diff-priv 0.1.0

k-anonymity, (c,l)-diversity and ε-differential privacy framework
Documentation
use crate::anonymization::microagg_anonymizer::MicroaggAnonymizer;
use crate::config::Config;
use crate::data_manipulation::mueller::MuellerStream;
use crate::noise::laplace::laplace_noiser::LaplaceNoiser;
use crate::publishing::kafka_publisher::KafkaPublisher;
use avro_rs::from_value;
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use strm_privacy_driver::StrmPrivacyValue;

pub struct KafkaService {
    consumer: Consumer,
}

impl KafkaService {
    pub fn consume(&mut self) {
        let noiser = LaplaceNoiser::new(0.1, 3, 0.1);
        let publisher = KafkaPublisher::default();
        let mut microagg: MicroaggAnonymizer<LaplaceNoiser, MuellerStream, KafkaPublisher> =
            MicroaggAnonymizer::new(3, 20, 2, 7, 0.1, 300, 5, publisher, noiser);
        loop {
            for ms in self.consumer.poll().unwrap().iter() {
                for m in ms.messages() {
                    let mut b = &m.value[5..];
                    let confluent_bytes = &m.value[..5];
                    microagg.publisher.confluent_bytes = confluent_bytes.to_vec();
                    let mueller_value = avro_rs::from_avro_datum(
                        &MuellerStream::get_schema(MuellerStream::STRM_SCHEMA),
                        &mut b,
                        None,
                    )
                    .expect("couldn't convert message to mueller");
                    let mueller = from_value::<MuellerStream>(&mueller_value)
                        .expect("couldn't convert from value");
                    microagg.anonymize(mueller)
                }
                self.consumer
                    .consume_messageset(ms)
                    .expect("couldn't consume message");
            }
            self.consumer.commit_consumed().unwrap();
        }
    }
}

impl Default for KafkaService {
    fn default() -> Self {
        let config = Config::new(&"application.conf".to_string());

        let consumer = Consumer::from_hosts(vec![config.kafka_bootstrap.to_owned()])
            .with_topic_partitions(config.topic_in, &[0, 1])
            .with_fallback_offset(FetchOffset::Earliest)
            .with_group("my-group".to_owned())
            .with_offset_storage(GroupOffsetStorage::Kafka)
            .create()
            .expect("Consumer couldn't connect to bootstrap");

        Self { consumer }
    }
}