Skip to main content

diff_priv/anonymization/
microagg_anonymizer.rs

1use crate::analysis::analyser::Analyser;
2use crate::analysis::cluster_analyser::ClusterAnalyser;
3use crate::analysis::disclosure_risk_analyser::DisclosureRiskAnalyser;
4use crate::analysis::mse_analyser::MseAnalyser;
5use crate::analysis::publishing_delay_analyser::PublishingDelayAnalyser;
6use crate::analysis::sse_analyser::SseAnalyser;
7use crate::anonymization::cluster::Cluster;
8use crate::data_manipulation::anonymizable::{Anonymizable, QuasiIdentifierType};
9use crate::noise::noiser::Noiser;
10use crate::publishing::publisher::Publisher;
11use rayon::prelude::*;
12use std::collections::BTreeMap;
13use std::sync::Mutex;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16// the micro aggregation differential privacy anonymization
17pub struct MicroaggAnonymizer<N, A, P>
18where
19    N: Noiser,
20    A: Anonymizable,
21    P: Publisher,
22{
23    k: usize,           // k-anonymity level
24    k_max: usize,       // maximum k-anonymity level before cluster is removed
25    l: usize,           // l-diversity level
26    c: i32,             // recursive (l,c)-diversity
27    delta: u128,        // life time delta in seconds
28    diff_thres: f64,    // difference threshold between data points
29    buffer_size: usize, // batch of data used to detect concept drift
30    pub publisher: P,
31    pub cluster_set: BTreeMap<u128, Cluster<A, N>>,
32    pub noiser: N,
33    pub analysers: Vec<Analyser<A>>,
34}
35
36#[allow(clippy::too_many_arguments)]
37impl<N, A, P> MicroaggAnonymizer<N, A, P>
38where
39    N: Noiser,
40    A: Anonymizable,
41    P: Publisher,
42{
43    pub fn new(
44        k: usize,
45        k_max: usize,
46        l: usize,
47        c: i32,
48        diff_thres: f64,
49        delta: u128,
50        buffer_size: usize,
51        publisher: P,
52        noiser: N,
53    ) -> Self {
54        let analysers = vec![
55            Analyser::Mse(MseAnalyser::default()),
56            Analyser::Sse(SseAnalyser::default()),
57            Analyser::PublishingDelay(PublishingDelayAnalyser::default()),
58            Analyser::ClusterAnalyser(ClusterAnalyser::default()),
59            Analyser::DisclosureRiskAnalyser(DisclosureRiskAnalyser::initialize(100)),
60        ];
61        Self {
62            k,
63            k_max,
64            l,
65            c,
66            diff_thres,
67            delta: delta * 1000000000,
68            buffer_size,
69            publisher,
70            cluster_set: Default::default(),
71            noiser,
72            analysers,
73        }
74    }
75
76    /// feed the data tuple through the differential privacy algorithm
77    pub fn anonymize(&mut self, value: A) {
78        // Borrowing the right cluster caused multiple ownership problems as we borrow
79        // self mutable and immutable.
80        debug!("cluster count: {}", self.cluster_set.len());
81        match self.find_best_cluster(&value) {
82            // create new cluster
83            None => {
84                info!("new cluster created");
85                let mut cluster = self.create_new_cluster();
86
87                cluster.add_tuple(value);
88                let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
89                self.cluster_set.insert(now.as_nanos(), cluster);
90                self.analysers.iter_mut().for_each(|analyser| {
91                    if let Analyser::ClusterAnalyser(cluster_analyser) = analyser {
92                        cluster_analyser.add_count()
93                    }
94                })
95            }
96            Some(mut cluster) => {
97                info!("cluster found");
98                // check life time and change cluster
99                if cluster.check_cluster_life_time() >= self.delta {
100                    cluster.publish_all(&mut self.publisher, &mut self.analysers);
101                    cluster = self.create_new_cluster();
102                    info!("cluster life time delta exceeded")
103                }
104
105                cluster.add_tuple(value);
106
107                // publishing value when k-anon level is met
108                if cluster.w_current.buffer.len() == self.k {
109                    cluster.publish_all(&mut self.publisher, &mut self.analysers);
110                    info!("k-level is met published all")
111                } else if self.k < cluster.w_current.buffer.len()
112                    && cluster.w_current.buffer.len() <= self.k_max + 1
113                {
114                    // if the cluster contains at least k records
115                    cluster.publish(&mut self.publisher, &mut self.analysers);
116                    info!("publishing")
117                }
118
119                // check if the w_current is full (max size of buffer)
120                // and reuse the buffer if concept drift is not detected
121                if cluster.is_full() {
122                    info!("cluster is full, checking concept drift");
123                    cluster.detect_concept_drift()
124                }
125
126                // we removed the cluster in the find best cluster method
127                // we need to insert it again
128                match cluster.complete_buffer_amount > self.k_max {
129                    true => {
130                        cluster.publish_all(&mut self.publisher, &mut self.analysers);
131                        info!("cluster is full removing..");
132                        cluster.print_domain_qis().into_iter().enumerate().for_each(
133                            |(index, domain)| match domain {
134                                (
135                                    QuasiIdentifierType::Integer(min),
136                                    QuasiIdentifierType::Integer(max),
137                                ) => {
138                                    debug!("QI {}| min: {:?}| max: {:?}", index + 1, min, max)
139                                }
140                                (
141                                    QuasiIdentifierType::Float(min),
142                                    QuasiIdentifierType::Float(max),
143                                ) => {
144                                    debug!("QI {}| min: {:?}| max: {:?}", index + 1, min, max)
145                                }
146                                _ => panic!("wrong QI"),
147                            },
148                        );
149                        // cluster has already been removed from cluster_set
150                        // and it does not need to be added again
151                        self.analysers.iter_mut().for_each(|analyser| {
152                            if let Analyser::ClusterAnalyser(cluster_analyser) = analyser {
153                                cluster_analyser.remove_count()
154                            }
155                        })
156                    }
157                    false => {
158                        // add the cluster again at its arrival time into the cluster,
159                        // if there already is another cluster there change the arrival time
160                        // and try again
161                        while self.cluster_set.contains_key(&cluster.last_arrival) {
162                            let now = SystemTime::now()
163                                .duration_since(UNIX_EPOCH)
164                                .unwrap()
165                                .as_nanos();
166                            cluster.last_arrival = now
167                        }
168                        self.cluster_set.insert(cluster.last_arrival, cluster);
169                    }
170                };
171            }
172        }
173    }
174
175    /// finding best cluster looking at the threshold
176    /// Some -> use cluster for further building
177    /// None -> create new cluster
178    /// TODO: create a check here for old cluster so that the looping through the cluster_set
179    /// is only done once, here we can improve massively on speed to use async to publishing the cluster set concurrently while
180    /// looping further maybe?
181    fn find_best_cluster(&mut self, value: &A) -> Option<Cluster<A, N>> {
182        // remove the cluster from self and return it
183        let mut best_cluster: Option<Cluster<A, N>> = None;
184        let best_cluster_key: Mutex<Option<u128>> = Mutex::new(None);
185        let least_info_loss: Mutex<Option<f64>> = Mutex::new(None);
186
187        self.cluster_set.par_iter().for_each(|(key, a)| {
188            if a.centroid.calculate_difference(value) <= self.diff_thres {
189                let info_loss = value.calculate_info_loss(&a.centroid);
190                let mut least_info = least_info_loss.lock().unwrap();
191                match *least_info {
192                    None => {
193                        *least_info = Some(info_loss);
194                        *best_cluster_key.lock().unwrap() = Some(*key)
195                    }
196                    Some(current_info_loss) => {
197                        if info_loss < current_info_loss {
198                            *least_info = Some(info_loss);
199                            *best_cluster_key.lock().unwrap() = Some(*key)
200                        }
201                    }
202                }
203            }
204        });
205
206        // remove the key from the hashmap and set the best_cluster
207        // variable
208        if let Some(key) = *best_cluster_key.lock().unwrap() {
209            best_cluster = self.cluster_set.remove(&key);
210        }
211
212        best_cluster
213    }
214
215    /// create new cluster
216    fn create_new_cluster(&self) -> Cluster<A, N> {
217        Cluster::new(
218            self.k,
219            self.l,
220            self.c,
221            self.buffer_size,
222            self.noiser.clone(),
223        )
224    }
225}
226
#[cfg(test)]
mod tests {
    use super::MicroaggAnonymizer;
    use crate::data_manipulation::mueller::MuellerStream;
    use crate::noise::laplace::laplace_noiser::LaplaceNoiser;
    use crate::test::dummy_publisher::DummyPublisher;

    #[test]
    fn find_best_cluster() {
        let noiser = LaplaceNoiser::new(0.1, 3, 0.1);
        let publisher = DummyPublisher::default();
        let mut anonymizer: MicroaggAnonymizer<LaplaceNoiser, MuellerStream, DummyPublisher> =
            MicroaggAnonymizer::new(2, 10, 2, 2, 0.65, 10, 5, publisher, noiser);

        let mueller1 = MuellerStream {
            age: Some(30),
            gender: Some("male".to_string()),
            ..MuellerStream::default()
        };

        // No clusters exist yet, so nothing can match.
        assert!(anonymizer.find_best_cluster(&mueller1).is_none());

        anonymizer.anonymize(mueller1);
        assert_eq!(anonymizer.cluster_set.len(), 1);

        let mueller2 = MuellerStream {
            age: Some(30),
            gender: Some("male".to_string()),
            ..MuellerStream::default()
        };

        // `find_best_cluster` removes the cluster it returns. Put it back under
        // its original key; otherwise the following `anonymize` would silently
        // start a fresh cluster and the "join an existing cluster" path would go
        // untested.
        let key = *anonymizer
            .cluster_set
            .keys()
            .next()
            .expect("exactly one cluster exists");
        let cluster = anonymizer
            .find_best_cluster(&mueller2)
            .expect("an identical record should match the existing cluster");
        anonymizer.cluster_set.insert(key, cluster);

        anonymizer.anonymize(mueller2);
        // mueller2 joined the existing cluster instead of creating a new one.
        assert_eq!(anonymizer.cluster_set.len(), 1);

        let mueller3 = MuellerStream {
            age: Some(50),
            gender: Some("female".to_string()),
            ..MuellerStream::default()
        };

        anonymizer.anonymize(mueller3);
        // A clearly different record starts its own cluster.
        assert_eq!(anonymizer.cluster_set.len(), 2)
    }
}