1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
use crate::block::BlockContents;
use crate::filter::BoxedFilterPolicy;

use std::sync::Arc;

use integer_encoding::FixedInt;

/// log2 of the filter base: the builder starts a new filter for every 2^11 = 2048 bytes of
/// data-block offsets.
const FILTER_BASE_LOG2: u32 = 11;

/// For a given byte offset, returns the index of the filter that includes the key at that offset.
///
/// This is `offset >> base_lg2`, i.e. `offset / 2^base_lg2`. On the read path `base_lg2` comes
/// from the last byte of a filter block, so it may be arbitrary for corrupt input:
/// - a shift by `usize::BITS` or more yields 0 instead of panicking with shift overflow;
/// - an index that does not fit in `u32` saturates to `u32::MAX` instead of being truncated
///   (truncation could alias it onto a valid, unrelated filter).
#[inline]
fn get_filter_index(offset: usize, base_lg2: u32) -> u32 {
    let index = offset.checked_shr(base_lg2).unwrap_or(0);
    if index > u32::MAX as usize {
        u32::MAX
    } else {
        index as u32
    }
}

/// Builds a filter block, which is laid out like this:
///
/// [filter0, filter1, filter2, ..., offset of filter0, offset of filter1, ..., offset of offsets
/// array, log2 of FILTER_BASE]
///
/// where offsets are 4 bytes, offset of offsets is 4 bytes, and log2 of FILTER_BASE is 1 byte.
/// Two consecutive filter offsets may be the same; that denotes an empty filter, e.g. for a range
/// of block offsets in which no data block started.
pub struct FilterBlockBuilder {
    /// Policy used to turn the pending keys into a filter.
    policy: BoxedFilterPolicy,
    /// All filters generated so far, concatenated.
    filters: Vec<u8>,
    /// Start position of each generated filter within `filters`.
    filter_offsets: Vec<usize>,

    // Pending keys for the next filter. Cleared whenever a non-empty filter is generated, which
    // happens when start_block() crosses a filter boundary or in finish() -- not on every
    // start_block() call.
    /// Start position of each pending key within `keys`.
    key_offsets: Vec<usize>,
    /// Pending keys, concatenated.
    keys: Vec<u8>,
}

impl FilterBlockBuilder {
    /// Creates an empty builder that generates filters with the policy `fp`.
    pub fn new(fp: BoxedFilterPolicy) -> FilterBlockBuilder {
        FilterBlockBuilder {
            policy: fp,
            // some pre-allocation
            filters: Vec::with_capacity(1024),
            filter_offsets: Vec::with_capacity(1024),
            key_offsets: Vec::with_capacity(1024),
            keys: Vec::with_capacity(1024),
        }
    }

    /// Returns the serialized size of what has been generated so far.
    ///
    /// This is the filter bytes, plus 4 bytes per filter offset, plus the 5-byte footer. Keys that
    /// are still pending (not yet turned into a filter) are not counted.
    pub fn size_estimate(&self) -> usize {
        self.filters.len() + 4 * self.filter_offsets.len() + 4 + 1
    }

    /// Returns the name of the filter policy in use.
    pub fn filter_name(&self) -> &'static str {
        self.policy.name()
    }

    /// Adds `key` to the keys collected for the next filter. Empty keys are allowed.
    pub fn add_key(&mut self, key: &[u8]) {
        self.key_offsets.push(self.keys.len());
        self.keys.extend_from_slice(key);
    }

    /// Announces that a data block starts at byte `offset`.
    ///
    /// Generates filters (possibly empty ones) until the filter covering `offset` is the next one
    /// to be generated, so that keys added afterwards land in that filter.
    ///
    /// # Panics
    ///
    /// Panics if the filter index for `offset` is below the number of filters already generated,
    /// i.e. if block offsets moved backwards across a filter boundary.
    pub fn start_block(&mut self, offset: usize) {
        let filter_ix = get_filter_index(offset, FILTER_BASE_LOG2);
        assert!(filter_ix >= self.filter_offsets.len() as u32);

        while filter_ix > self.filter_offsets.len() as u32 {
            self.generate_filter();
        }
    }

    /// Appends one filter entry built from the pending keys, then clears them.
    ///
    /// If there are no pending keys, only an offset is recorded, which yields an empty filter.
    fn generate_filter(&mut self) {
        self.filter_offsets.push(self.filters.len());
        // Test the key *count*, not the byte length of `keys`. Otherwise a filter whose keys are
        // all empty would be skipped, and its stale `key_offsets` entries would leak into the
        // next filter.
        if self.key_offsets.is_empty() {
            return;
        }

        let filter = self.policy.create_filter(&self.keys, &self.key_offsets);
        self.filters.extend_from_slice(&filter);

        self.keys.clear();
        self.key_offsets.clear();
    }

    /// Turns any pending keys into a final filter and returns the serialized filter block.
    pub fn finish(mut self) -> Vec<u8> {
        // Key count, not byte length: a trailing block holding only empty keys still needs a
        // filter.
        if !self.key_offsets.is_empty() {
            self.generate_filter();
        }

        let mut result = self.filters;
        let offsets_offset = result.len();
        let mut ix = result.len();
        result.resize(ix + 4 * self.filter_offsets.len() + 5, 0);

        // Put filter offsets at the end. They are stored as u32; `as` would truncate positions
        // beyond u32::MAX, so this assumes the filter data stays below 4 GiB.
        for offset in self.filter_offsets.into_iter() {
            (offset as u32).encode_fixed(&mut result[ix..ix + 4]);
            ix += 4;
        }

        // Footer: where the offsets array starts, then the filter base log2.
        (offsets_offset as u32).encode_fixed(&mut result[ix..ix + 4]);
        ix += 4;
        result[ix] = FILTER_BASE_LOG2 as u8;

        result
    }
}

/// Read-only view over a serialized filter block, answering "may this key be in that block?".
#[derive(Clone)]
pub struct FilterBlockReader {
    /// Policy that interprets the individual filters.
    policy: BoxedFilterPolicy,
    /// log2 of the byte range of block offsets covered by one filter (the block's last byte).
    filter_base_lg2: u32,
    /// Position within `block` where the array of 4-byte filter offsets begins.
    offsets_offset: usize,
    /// The whole serialized filter block, footer included.
    block: Arc<BlockContents>,
}

impl FilterBlockReader {
    /// Creates a reader over an owned serialized filter block.
    ///
    /// # Panics
    ///
    /// Panics if `data` is shorter than the 5-byte footer.
    pub fn new_owned(pol: BoxedFilterPolicy, data: Vec<u8>) -> FilterBlockReader {
        FilterBlockReader::new(pol, Arc::new(data))
    }

    /// Creates a reader over a shared serialized filter block.
    ///
    /// The block has the layout written by `FilterBlockBuilder::finish`:
    /// - the last byte is the filter base log2;
    /// - the 4 bytes before it are the position of the filter offsets array.
    ///
    /// If that position points past the footer (corrupt data), the block is treated as holding no
    /// filters, so every lookup reports a potential match.
    ///
    /// # Panics
    ///
    /// Panics if `data` is shorter than the 5-byte footer.
    pub fn new(pol: BoxedFilterPolicy, data: Arc<Vec<u8>>) -> FilterBlockReader {
        assert!(data.len() >= 5);

        let footer_start = data.len() - 5;
        let fbase = data[data.len() - 1] as u32;
        let offset = u32::decode_fixed(&data[footer_start..data.len() - 1]) as usize;

        FilterBlockReader {
            policy: pol,
            block: data,
            filter_base_lg2: fbase,
            // Clamp so that `num()` cannot underflow; a clamped value means zero filters.
            offsets_offset: offset.min(footer_start),
        }
    }

    /// Returns number of filters
    pub fn num(&self) -> u32 {
        ((self.block.len() - self.offsets_offset - 5) / 4) as u32
    }

    /// Returns the start position (within the block) of filter `i`.
    ///
    /// For `i == num()`, this reads the footer word stored right after the offsets array. That
    /// word holds `offsets_offset`, which is the end of the last filter.
    fn offset_of(&self, i: u32) -> usize {
        let offset_offset = self.offsets_offset + 4 * i as usize;
        u32::decode_fixed(&self.block[offset_offset..offset_offset + 4]) as usize
    }

    /// Returns whether `key` may be present in the data block that starts at `blk_offset`.
    ///
    /// `false` means the key is definitely absent from that block. Missing, empty or corrupt
    /// filter data is reported as a potential match (`true`), since a filter may give false
    /// positives but never false negatives.
    pub fn key_may_match(&self, blk_offset: usize, key: &[u8]) -> bool {
        let filter_ix = get_filter_index(blk_offset, self.filter_base_lg2);
        // `>=`: for an index equal to num() there is no filter. Reading its end offset would run
        // past the footer and panic.
        if filter_ix >= self.num() {
            return true;
        }

        let filter_begin = self.offset_of(filter_ix);
        let filter_end = self.offset_of(filter_ix + 1);

        // An empty range is not proof of absence. For example, builders that check pending key
        // bytes rather than key count emit no filter for a block whose keys are all empty. A
        // reversed range or one reaching into the offsets array means corrupt data.
        if filter_begin >= filter_end || filter_end > self.offsets_offset {
            return true;
        }

        self.policy
            .key_may_match(key, &self.block[filter_begin..filter_end])
    }
}

#[cfg(test)]
mod tests {
    // `super::*` also brings the private `get_filter_index` and `FILTER_BASE_LOG2` into scope.
    use super::*;
    use crate::filter::BloomPolicy;

    #[test]
    fn test_filter_index() {
        assert_eq!(get_filter_index(3777, FILTER_BASE_LOG2), 1);
        assert_eq!(get_filter_index(10000, FILTER_BASE_LOG2), 4);
    }

    fn get_keys() -> Vec<&'static [u8]> {
        vec![
            "abcd".as_bytes(),
            "efgh".as_bytes(),
            "ijkl".as_bytes(),
            "mnopqrstuvwxyz".as_bytes(),
        ]
    }

    /// Builds a filter block holding the same four keys twice:
    /// - once for a block at offset 0 (filter 0);
    /// - once for a block at offset 5000 (filter 2).
    ///
    /// Filter 1 is left empty.
    fn produce_filter_block() -> Vec<u8> {
        let keys = get_keys();
        let mut bld = FilterBlockBuilder::new(Arc::new(Box::new(BloomPolicy::new(32))));

        bld.start_block(0);

        for k in keys.iter() {
            bld.add_key(k);
        }

        // second block
        bld.start_block(5000);

        for k in keys.iter() {
            bld.add_key(k);
        }

        bld.finish()
    }

    #[test]
    fn test_filter_block_builder() {
        let result = produce_filter_block();
        // The expected size is made up of:
        // - 2 non-empty filters, each 4 keys x 4 bytes (presumably 32 bits per key, from
        //   `BloomPolicy::new(32)`) plus 1 byte for `k`;
        // - 3 filter offsets (block offset 5000 maps to filter 2, so filter 1 is empty);
        // - the 5-byte footer.
        assert_eq!(result.len(), 2 * (get_keys().len() * 4 + 1) + (3 * 4) + 5);
        assert_eq!(
            result,
            vec![
                234, 195, 25, 155, 61, 141, 173, 140, 221, 28, 222, 92, 220, 112, 234, 227, 22,
                234, 195, 25, 155, 61, 141, 173, 140, 221, 28, 222, 92, 220, 112, 234, 227, 22, 0,
                0, 0, 0, 17, 0, 0, 0, 17, 0, 0, 0, 34, 0, 0, 0, 11,
            ]
        );
    }

    #[test]
    fn test_filter_block_build_read() {
        let result = produce_filter_block();
        let reader = FilterBlockReader::new_owned(Arc::new(Box::new(BloomPolicy::new(32))), result);

        // Offset 5121 falls into the third filter (index 2), which starts at byte 17.
        assert_eq!(
            reader.offset_of(get_filter_index(5121, FILTER_BASE_LOG2)),
            17
        );

        let unknown_keys = vec![
            "xsb".as_bytes(),
            "9sad".as_bytes(),
            "assssaaaass".as_bytes(),
        ];

        for &block_offset in [0, 1024, 5000, 6025].iter() {
            for key in get_keys().iter() {
                assert!(
                    reader.key_may_match(block_offset, key),
                    "{} {:?} ",
                    block_offset,
                    key
                );
            }
            for key in unknown_keys.iter() {
                assert!(
                    !reader.key_may_match(block_offset, key),
                    "{} {:?} ",
                    block_offset,
                    key
                );
            }
        }
    }
}