use crate::analysis::analyser::Analyser;
use crate::analysis::cluster_analyser::ClusterAnalyser;
use crate::analysis::disclosure_risk_analyser::DisclosureRiskAnalyser;
use crate::analysis::mse_analyser::MseAnalyser;
use crate::analysis::publishing_delay_analyser::PublishingDelayAnalyser;
use crate::analysis::sse_analyser::SseAnalyser;
use crate::anonymization::cluster::Cluster;
use crate::data_manipulation::anonymizable::{Anonymizable, QuasiIdentifierType};
use crate::noise::noiser::Noiser;
use crate::publishing::publisher::Publisher;
use rayon::prelude::*;
use std::collections::BTreeMap;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
pub struct MicroaggAnonymizer<N, A, P>
where
N: Noiser,
A: Anonymizable,
P: Publisher,
{
k: usize, k_max: usize, l: usize, c: i32, delta: u128, diff_thres: f64, buffer_size: usize, pub publisher: P,
pub cluster_set: BTreeMap<u128, Cluster<A, N>>,
pub noiser: N,
pub analysers: Vec<Analyser<A>>,
}
#[allow(clippy::too_many_arguments)]
impl<N, A, P> MicroaggAnonymizer<N, A, P>
where
N: Noiser,
A: Anonymizable,
P: Publisher,
{
pub fn new(
k: usize,
k_max: usize,
l: usize,
c: i32,
diff_thres: f64,
delta: u128,
buffer_size: usize,
publisher: P,
noiser: N,
) -> Self {
let analysers = vec![
Analyser::Mse(MseAnalyser::default()),
Analyser::Sse(SseAnalyser::default()),
Analyser::PublishingDelay(PublishingDelayAnalyser::default()),
Analyser::ClusterAnalyser(ClusterAnalyser::default()),
Analyser::DisclosureRiskAnalyser(DisclosureRiskAnalyser::initialize(100)),
];
Self {
k,
k_max,
l,
c,
diff_thres,
delta: delta * 1000000000,
buffer_size,
publisher,
cluster_set: Default::default(),
noiser,
analysers,
}
}
pub fn anonymize(&mut self, value: A) {
debug!("cluster count: {}", self.cluster_set.len());
match self.find_best_cluster(&value) {
None => {
info!("new cluster created");
let mut cluster = self.create_new_cluster();
cluster.add_tuple(value);
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
self.cluster_set.insert(now.as_nanos(), cluster);
self.analysers.iter_mut().for_each(|analyser| {
if let Analyser::ClusterAnalyser(cluster_analyser) = analyser {
cluster_analyser.add_count()
}
})
}
Some(mut cluster) => {
info!("cluster found");
if cluster.check_cluster_life_time() >= self.delta {
cluster.publish_all(&mut self.publisher, &mut self.analysers);
cluster = self.create_new_cluster();
info!("cluster life time delta exceeded")
}
cluster.add_tuple(value);
if cluster.w_current.buffer.len() == self.k {
cluster.publish_all(&mut self.publisher, &mut self.analysers);
info!("k-level is met published all")
} else if self.k < cluster.w_current.buffer.len()
&& cluster.w_current.buffer.len() <= self.k_max + 1
{
cluster.publish(&mut self.publisher, &mut self.analysers);
info!("publishing")
}
if cluster.is_full() {
info!("cluster is full, checking concept drift");
cluster.detect_concept_drift()
}
match cluster.complete_buffer_amount > self.k_max {
true => {
cluster.publish_all(&mut self.publisher, &mut self.analysers);
info!("cluster is full removing..");
cluster.print_domain_qis().into_iter().enumerate().for_each(
|(index, domain)| match domain {
(
QuasiIdentifierType::Integer(min),
QuasiIdentifierType::Integer(max),
) => {
debug!("QI {}| min: {:?}| max: {:?}", index + 1, min, max)
}
(
QuasiIdentifierType::Float(min),
QuasiIdentifierType::Float(max),
) => {
debug!("QI {}| min: {:?}| max: {:?}", index + 1, min, max)
}
_ => panic!("wrong QI"),
},
);
self.analysers.iter_mut().for_each(|analyser| {
if let Analyser::ClusterAnalyser(cluster_analyser) = analyser {
cluster_analyser.remove_count()
}
})
}
false => {
while self.cluster_set.contains_key(&cluster.last_arrival) {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
cluster.last_arrival = now
}
self.cluster_set.insert(cluster.last_arrival, cluster);
}
};
}
}
}
fn find_best_cluster(&mut self, value: &A) -> Option<Cluster<A, N>> {
let mut best_cluster: Option<Cluster<A, N>> = None;
let best_cluster_key: Mutex<Option<u128>> = Mutex::new(None);
let least_info_loss: Mutex<Option<f64>> = Mutex::new(None);
self.cluster_set.par_iter().for_each(|(key, a)| {
if a.centroid.calculate_difference(value) <= self.diff_thres {
let info_loss = value.calculate_info_loss(&a.centroid);
let mut least_info = least_info_loss.lock().unwrap();
match *least_info {
None => {
*least_info = Some(info_loss);
*best_cluster_key.lock().unwrap() = Some(*key)
}
Some(current_info_loss) => {
if info_loss < current_info_loss {
*least_info = Some(info_loss);
*best_cluster_key.lock().unwrap() = Some(*key)
}
}
}
}
});
if let Some(key) = *best_cluster_key.lock().unwrap() {
best_cluster = self.cluster_set.remove(&key);
}
best_cluster
}
fn create_new_cluster(&self) -> Cluster<A, N> {
Cluster::new(
self.k,
self.l,
self.c,
self.buffer_size,
self.noiser.clone(),
)
}
}
#[cfg(test)]
mod tests {
use super::MicroaggAnonymizer;
use crate::data_manipulation::mueller::MuellerStream;
use crate::noise::laplace::laplace_noiser::LaplaceNoiser;
use crate::test::dummy_publisher::DummyPublisher;
#[test]
fn find_best_cluster() {
let noiser = LaplaceNoiser::new(0.1, 3, 0.1);
let publisher = DummyPublisher::default();
let mut anonymizer: MicroaggAnonymizer<LaplaceNoiser, MuellerStream, DummyPublisher> =
MicroaggAnonymizer::new(2, 10, 2, 2, 0.65, 10, 5, publisher, noiser);
let mueller1 = MuellerStream {
age: Some(30),
gender: Some("male".to_string()),
..MuellerStream::default()
};
assert!(anonymizer.find_best_cluster(&mueller1).is_none());
anonymizer.anonymize(mueller1);
assert_eq!(anonymizer.cluster_set.len(), 1);
let mueller2 = MuellerStream {
age: Some(30),
gender: Some("male".to_string()),
..MuellerStream::default()
};
assert!(anonymizer.find_best_cluster(&mueller2).is_some());
anonymizer.anonymize(mueller2);
let mueller3 = MuellerStream {
age: Some(50),
gender: Some("female".to_string()),
..MuellerStream::default()
};
assert_eq!(anonymizer.cluster_set.len(), 1);
anonymizer.anonymize(mueller3);
assert_eq!(anonymizer.cluster_set.len(), 2)
}
}