1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
//! SBF data structure module

use std::{io::Cursor, ops};
use std::sync::Mutex;

use byteorder::ReadBytesExt;
#[cfg(feature = "md4_hash")]
use md4::Digest;
#[cfg(feature = "md5_hash")]
use md5::compute as md5_compute;
use num::{
    Bounded, cast::AsPrimitive,
    FromPrimitive, ToPrimitive,
    Unsigned, Zero};
use rand::{Rng, rngs::OsRng};
use rayon::{iter::repeatn, prelude::*};
#[cfg(feature = "serde_support")]
use serde::{Deserialize, Serialize};

use crate::{
    error::Error,
    types::{HashFunction, Salt}};
#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

/// Spatial Bloom Filter data structure
///
/// This data structure uses a multi level bloom filter to identify if a content has already been
/// inserted in the filter and of which of a finite number of disjoint subsets of the origin space
/// it belongs to.
/// This is a probabilistic data structure
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde_support", derive(Serialize, Deserialize))]
pub struct SBF<U> where U: Unsigned + Bounded + Clone + Copy + PartialOrd + Eq {
    /// Hash salt container
    salts: Vec<Salt>,
    /// Filter
    pub(crate) filter: Vec<U>,
    /// Hash function to use during the calculation of the insertion and query indexes
    hash_function: HashFunction,
    #[cfg(feature = "metrics")]
    /// SBF metrics structure
    ///
    /// Can be activated enabling the `metrics` feature.
    /// Can be queried to retrieve information about the state of the filter.
    pub metrics: Metrics,
}

impl<U> SBF<U> where
    U: 'static + Send + Sync + Clone + Copy + Ord + PartialOrd + Eq +
    Unsigned + Bounded + Zero + FromPrimitive + ToPrimitive + ops::AddAssign + ops::SubAssign,
    usize: num::cast::AsPrimitive<U> {
    /// Adapter for the hash function used by the filter
    fn hash(&self, buff: &[u8]) -> Vec<u8> {
        match &self.hash_function {
            #[cfg(feature = "md5_hash")]
            HashFunction::MD5 => md5_compute(buff).to_vec(),
            #[cfg(feature = "md4_hash")]
            HashFunction::MD4 => md4::Md4::digest(buff).to_vec(),
        }
    }

    /// Calculates the indexed of the cells pointed by each of the hashes generated from the input
    fn calc_indexes(&self, content: Vec<u8>) -> Vec<U> {
        self
            .salts
            .par_iter()
            .map(|salt: &Salt| {
                // Iter over salt u8 values
                let salt_iterator = salt.par_iter();

                // Repeat 0, the length of the salt is the upper bound
                let zeros = repeatn(&(0 as u8), salt.len());

                // Content input with padding
                let content = content.par_iter().chain(zeros);

                // XORed content
                let xor_content: Vec<u8> = content.zip(salt_iterator)
                    .map(|(h, v)| h ^ v).collect();

                // First 8 u8 of the hash
                let digest = self
                    .hash(&xor_content)
                    .drain(0..8)
                    .collect::<Vec<u8>>();

                // Read digest as a u64
                let digest_value = Cursor::new(digest)
                    .read_u64::<byteorder::NativeEndian>().unwrap();

                // Return cell index
                    #[allow(clippy::integer_arithmetic)]
                (digest_value as usize % self.filter.len()).as_()
            })
            .collect::<Vec<U>>()
    }

    /// Returns the content of a cell
    fn get_cell(&self, index: U) -> Result<&U, Error> {
        self.filter.get(index.to_usize().unwrap()).ok_or(Error::IndexOutOfBounds)
    }

    /// Sets the content of the cell if the input area is higher than the one in the filter
    fn set_cell(&mut self, index: U, area: U) -> Result<&U, Error> {
        if let Some(v) = self.filter.get_mut(index.to_usize().unwrap()) {
            if *v == U::zero() || *v < area {
                // Cell is not marked or cell has lower value than the input area
                *v = area;
            } else if *v >= area {
                // Cell hash same or higher value than input area
            }

            #[cfg(feature = "metrics")]
                #[allow(clippy::integer_arithmetic)]
                {
                    if *v == U::zero() {
                        // Cell is not marked
                        self.metrics.area_cells[area.to_usize().unwrap()] += 1;
                    } else if *v < area {
                        // Cell hash lower value than the input area
                        // v is not zero ()
                        if area > U::zero() {
                            self.metrics.area_cells[v.to_usize().unwrap()] -= 1;
                        }
                        self.metrics.area_cells[area.to_usize().unwrap()] += 1;
                        self.metrics.collisions += 1;
                    } else if *v == area {
                        // Cell hash same value than input area
                        self.metrics.collisions += 1;
                        self.metrics.area_self_collisions[v.to_usize().unwrap()] += 1;
                    } else if *v > area {
                        // Cell hash higher value than input area
                        self.metrics.collisions += 1;
                    }
                }

            Ok(v)
        } else {
            Err(Error::IndexOutOfBounds)
        }
    }

    #[allow(unused_variables)]
    /// Constructor of the SBF data structure
    ///
    /// - `cells`: Number of cells in the filter,
    /// - `hash_number`: Number of hash functions used,
    /// - `max_input_size`: Maximum input dimension, if a larger one is used it will be truncated,
    /// - `hash_function`: Kind of hash function to use,
    /// - `area_number`: Number of different areas (only used in metrics).
    pub fn new(
        cells: U,
        hash_number: usize,
        max_input_size: usize,
        hash_function: HashFunction,
        area_number: U,
    ) -> Result<Self, Error> {
        assert!(cells > U::zero());

        // Cryptography safe RNG
        let rng = Mutex::new(OsRng);

        // Generate hash salts
        let salts = (0..hash_number)
            .into_par_iter()
            .map(|_| (0..max_input_size)
                .into_par_iter()
                .map(|_| rng.lock().unwrap().gen())
                .collect::<Salt>())
            .collect::<Vec<Salt>>();

        Ok(SBF {
            filter: vec![U::zero(); cells.to_usize().ok_or(Error::IndexOutOfBounds)?],
            hash_function,
            salts,

            #[cfg(feature = "metrics")]
            metrics: Metrics {
                cells: cells.to_usize().unwrap(),
                hash_number,
                members: 0,
                collisions: 0,
                safeness: 0.0,
                area_number: area_number.to_usize().unwrap(),
                area_members: vec![0; area_number.to_usize().ok_or(Error::IndexOutOfBounds)?],
                area_cells: vec![0; area_number.to_usize().ok_or(Error::IndexOutOfBounds)?],
                area_expected_cells: vec![-1; area_number.to_usize().ok_or(Error::IndexOutOfBounds)?],
                area_self_collisions: vec![0; area_number.to_usize().ok_or(Error::IndexOutOfBounds)?],
                area_fpp: vec![-1.0; area_number.to_usize().ok_or(Error::IndexOutOfBounds)?],
                area_isep: vec![-1.0; area_number.to_usize().ok_or(Error::IndexOutOfBounds)?],
                area_prior_fpp: vec![-1.0; area_number.to_usize().ok_or(Error::IndexOutOfBounds)?],
                area_prior_isep: vec![-1.0; area_number.to_usize().ok_or(Error::IndexOutOfBounds)?],
                area_prior_safep: vec![-1.0; area_number.to_usize().ok_or(Error::IndexOutOfBounds)?],
            },
        })
    }

    /// Constructor of the SBF data structure using optimal parameters
    pub fn new_optimal(expected_inserts: usize, area_number: U, max_fpp: f64, max_input_size: usize, hash_function: HashFunction) -> Result<Self, Error> {
        let optimal_cells = (-(expected_inserts as f64) * max_fpp.ln() / (2.0f64.ln().powi(2))) as u64;
        let hash_number = (optimal_cells as f64 / (expected_inserts as f64) * 2.0f64.ln()).ceil() as usize;
        Self::new(
            U::from_u64(optimal_cells).ok_or(Error::IndexOutOfBounds)?,
            hash_number,
            max_input_size,
            hash_function,
            area_number,
        )
    }
    /// Check an input for presence in the filter.
    /// It will return `0` if the input is not been inserted or the index of the area it belongs to
    /// if it has been inserted previously.
    ///
    /// Because of the probabilistic nature of this data structure, it is possible for it to return
    /// a false positive.
    pub fn check(&self, content: Vec<u8>) -> Result<&U, Error> {
        self.calc_indexes(content)
            .par_iter()
            .map(|i| self.get_cell(*i))
            .try_reduce_with(|a, b| Ok(a.min(b)))
            .expect("Some value, since the iterator is not empty")
    }

    /// Insert the content in the filter associated to the given area.
    pub fn insert(&mut self, content: Vec<u8>, area: U) -> Result<(), Error> {
        self.calc_indexes(content)
            .iter()
            .map(|i| self.set_cell(*i, area).map(|_| ()))
            .collect::<Result<(), Error>>()
            .map(|_| {
                #[cfg(feature = "metrics")]
                #[allow(clippy::integer_arithmetic)] {
                    self.metrics.members += 1;
                    self.metrics.area_members[area.to_usize().unwrap()] += 1;
                };
            })
    }
}