1use crate::analysis::analyser::Analyser;
2use crate::analysis::cluster_analyser::ClusterAnalyser;
3use crate::analysis::disclosure_risk_analyser::DisclosureRiskAnalyser;
4use crate::analysis::mse_analyser::MseAnalyser;
5use crate::analysis::publishing_delay_analyser::PublishingDelayAnalyser;
6use crate::analysis::sse_analyser::SseAnalyser;
7use crate::anonymization::cluster::Cluster;
8use crate::data_manipulation::anonymizable::{Anonymizable, QuasiIdentifierType};
9use crate::noise::noiser::Noiser;
10use crate::publishing::publisher::Publisher;
11use rayon::prelude::*;
12use std::collections::BTreeMap;
13use std::sync::Mutex;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16pub struct MicroaggAnonymizer<N, A, P>
18where
19 N: Noiser,
20 A: Anonymizable,
21 P: Publisher,
22{
23 k: usize, k_max: usize, l: usize, c: i32, delta: u128, diff_thres: f64, buffer_size: usize, pub publisher: P,
31 pub cluster_set: BTreeMap<u128, Cluster<A, N>>,
32 pub noiser: N,
33 pub analysers: Vec<Analyser<A>>,
34}
35
36#[allow(clippy::too_many_arguments)]
37impl<N, A, P> MicroaggAnonymizer<N, A, P>
38where
39 N: Noiser,
40 A: Anonymizable,
41 P: Publisher,
42{
43 pub fn new(
44 k: usize,
45 k_max: usize,
46 l: usize,
47 c: i32,
48 diff_thres: f64,
49 delta: u128,
50 buffer_size: usize,
51 publisher: P,
52 noiser: N,
53 ) -> Self {
54 let analysers = vec![
55 Analyser::Mse(MseAnalyser::default()),
56 Analyser::Sse(SseAnalyser::default()),
57 Analyser::PublishingDelay(PublishingDelayAnalyser::default()),
58 Analyser::ClusterAnalyser(ClusterAnalyser::default()),
59 Analyser::DisclosureRiskAnalyser(DisclosureRiskAnalyser::initialize(100)),
60 ];
61 Self {
62 k,
63 k_max,
64 l,
65 c,
66 diff_thres,
67 delta: delta * 1000000000,
68 buffer_size,
69 publisher,
70 cluster_set: Default::default(),
71 noiser,
72 analysers,
73 }
74 }
75
76 pub fn anonymize(&mut self, value: A) {
78 debug!("cluster count: {}", self.cluster_set.len());
81 match self.find_best_cluster(&value) {
82 None => {
84 info!("new cluster created");
85 let mut cluster = self.create_new_cluster();
86
87 cluster.add_tuple(value);
88 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
89 self.cluster_set.insert(now.as_nanos(), cluster);
90 self.analysers.iter_mut().for_each(|analyser| {
91 if let Analyser::ClusterAnalyser(cluster_analyser) = analyser {
92 cluster_analyser.add_count()
93 }
94 })
95 }
96 Some(mut cluster) => {
97 info!("cluster found");
98 if cluster.check_cluster_life_time() >= self.delta {
100 cluster.publish_all(&mut self.publisher, &mut self.analysers);
101 cluster = self.create_new_cluster();
102 info!("cluster life time delta exceeded")
103 }
104
105 cluster.add_tuple(value);
106
107 if cluster.w_current.buffer.len() == self.k {
109 cluster.publish_all(&mut self.publisher, &mut self.analysers);
110 info!("k-level is met published all")
111 } else if self.k < cluster.w_current.buffer.len()
112 && cluster.w_current.buffer.len() <= self.k_max + 1
113 {
114 cluster.publish(&mut self.publisher, &mut self.analysers);
116 info!("publishing")
117 }
118
119 if cluster.is_full() {
122 info!("cluster is full, checking concept drift");
123 cluster.detect_concept_drift()
124 }
125
126 match cluster.complete_buffer_amount > self.k_max {
129 true => {
130 cluster.publish_all(&mut self.publisher, &mut self.analysers);
131 info!("cluster is full removing..");
132 cluster.print_domain_qis().into_iter().enumerate().for_each(
133 |(index, domain)| match domain {
134 (
135 QuasiIdentifierType::Integer(min),
136 QuasiIdentifierType::Integer(max),
137 ) => {
138 debug!("QI {}| min: {:?}| max: {:?}", index + 1, min, max)
139 }
140 (
141 QuasiIdentifierType::Float(min),
142 QuasiIdentifierType::Float(max),
143 ) => {
144 debug!("QI {}| min: {:?}| max: {:?}", index + 1, min, max)
145 }
146 _ => panic!("wrong QI"),
147 },
148 );
149 self.analysers.iter_mut().for_each(|analyser| {
152 if let Analyser::ClusterAnalyser(cluster_analyser) = analyser {
153 cluster_analyser.remove_count()
154 }
155 })
156 }
157 false => {
158 while self.cluster_set.contains_key(&cluster.last_arrival) {
162 let now = SystemTime::now()
163 .duration_since(UNIX_EPOCH)
164 .unwrap()
165 .as_nanos();
166 cluster.last_arrival = now
167 }
168 self.cluster_set.insert(cluster.last_arrival, cluster);
169 }
170 };
171 }
172 }
173 }
174
175 fn find_best_cluster(&mut self, value: &A) -> Option<Cluster<A, N>> {
182 let mut best_cluster: Option<Cluster<A, N>> = None;
184 let best_cluster_key: Mutex<Option<u128>> = Mutex::new(None);
185 let least_info_loss: Mutex<Option<f64>> = Mutex::new(None);
186
187 self.cluster_set.par_iter().for_each(|(key, a)| {
188 if a.centroid.calculate_difference(value) <= self.diff_thres {
189 let info_loss = value.calculate_info_loss(&a.centroid);
190 let mut least_info = least_info_loss.lock().unwrap();
191 match *least_info {
192 None => {
193 *least_info = Some(info_loss);
194 *best_cluster_key.lock().unwrap() = Some(*key)
195 }
196 Some(current_info_loss) => {
197 if info_loss < current_info_loss {
198 *least_info = Some(info_loss);
199 *best_cluster_key.lock().unwrap() = Some(*key)
200 }
201 }
202 }
203 }
204 });
205
206 if let Some(key) = *best_cluster_key.lock().unwrap() {
209 best_cluster = self.cluster_set.remove(&key);
210 }
211
212 best_cluster
213 }
214
215 fn create_new_cluster(&self) -> Cluster<A, N> {
217 Cluster::new(
218 self.k,
219 self.l,
220 self.c,
221 self.buffer_size,
222 self.noiser.clone(),
223 )
224 }
225}
226
#[cfg(test)]
mod tests {
    use super::MicroaggAnonymizer;
    use crate::data_manipulation::mueller::MuellerStream;
    use crate::noise::laplace::laplace_noiser::LaplaceNoiser;
    use crate::test::dummy_publisher::DummyPublisher;

    #[test]
    fn find_best_cluster() {
        let noiser = LaplaceNoiser::new(0.1, 3, 0.1);
        let publisher = DummyPublisher::default();
        let mut anonymizer: MicroaggAnonymizer<LaplaceNoiser, MuellerStream, DummyPublisher> =
            MicroaggAnonymizer::new(2, 10, 2, 2, 0.65, 10, 5, publisher, noiser);

        let mueller1 = MuellerStream {
            age: Some(30),
            gender: Some("male".to_string()),
            ..MuellerStream::default()
        };

        // With no clusters there is nothing to match against.
        assert!(anonymizer.find_best_cluster(&mueller1).is_none());

        anonymizer.anonymize(mueller1);
        assert_eq!(anonymizer.cluster_set.len(), 1);

        let mueller2 = MuellerStream {
            age: Some(30),
            gender: Some("male".to_string()),
            ..MuellerStream::default()
        };

        // `find_best_cluster` removes the cluster it returns. Put it back under its
        // original key. Otherwise the first cluster would be silently dropped, and the
        // following `anonymize` would never take the "cluster found" path.
        let key = *anonymizer
            .cluster_set
            .keys()
            .next()
            .expect("a cluster was just created");
        let found = anonymizer
            .find_best_cluster(&mueller2)
            .expect("an identical tuple must match the existing cluster");
        anonymizer.cluster_set.insert(key, found);

        anonymizer.anonymize(mueller2);
        assert_eq!(anonymizer.cluster_set.len(), 1);

        let mueller3 = MuellerStream {
            age: Some(50),
            gender: Some("female".to_string()),
            ..MuellerStream::default()
        };

        anonymizer.anonymize(mueller3);

        assert_eq!(anonymizer.cluster_set.len(), 2)
    }
}