diff_priv/kafka/
kafka_service.rs

1use crate::anonymization::microagg_anonymizer::MicroaggAnonymizer;
2use crate::config::Config;
3use crate::data_manipulation::mueller::MuellerStream;
4use crate::noise::laplace::laplace_noiser::LaplaceNoiser;
5use crate::publishing::kafka_publisher::KafkaPublisher;
6use avro_rs::from_value;
7use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
8use strm_privacy_driver::StrmPrivacyValue;
9
10pub struct KafkaService {
11    consumer: Consumer,
12}
13
14impl KafkaService {
15    pub fn consume(&mut self) {
16        let noiser = LaplaceNoiser::new(0.1, 3, 0.1);
17        let publisher = KafkaPublisher::default();
18        let mut microagg: MicroaggAnonymizer<LaplaceNoiser, MuellerStream, KafkaPublisher> =
19            MicroaggAnonymizer::new(3, 20, 2, 7, 0.1, 300, 5, publisher, noiser);
20        loop {
21            for ms in self.consumer.poll().unwrap().iter() {
22                for m in ms.messages() {
23                    let mut b = &m.value[5..];
24                    let confluent_bytes = &m.value[..5];
25                    microagg.publisher.confluent_bytes = confluent_bytes.to_vec();
26                    let mueller_value = avro_rs::from_avro_datum(
27                        &MuellerStream::get_schema(MuellerStream::STRM_SCHEMA),
28                        &mut b,
29                        None,
30                    )
31                    .expect("couldn't convert message to mueller");
32                    let mueller = from_value::<MuellerStream>(&mueller_value)
33                        .expect("couldn't convert from value");
34                    microagg.anonymize(mueller)
35                }
36                self.consumer
37                    .consume_messageset(ms)
38                    .expect("couldn't consume message");
39            }
40            self.consumer.commit_consumed().unwrap();
41        }
42    }
43}
44
45impl Default for KafkaService {
46    fn default() -> Self {
47        let config = Config::new(&"application.conf".to_string());
48
49        let consumer = Consumer::from_hosts(vec![config.kafka_bootstrap.to_owned()])
50            .with_topic_partitions(config.topic_in, &[0, 1])
51            .with_fallback_offset(FetchOffset::Earliest)
52            .with_group("my-group".to_owned())
53            .with_offset_storage(GroupOffsetStorage::Kafka)
54            .create()
55            .expect("Consumer couldn't connect to bootstrap");
56
57        Self { consumer }
58    }
59}