diff_priv/kafka/
kafka_service.rs1use crate::anonymization::microagg_anonymizer::MicroaggAnonymizer;
2use crate::config::Config;
3use crate::data_manipulation::mueller::MuellerStream;
4use crate::noise::laplace::laplace_noiser::LaplaceNoiser;
5use crate::publishing::kafka_publisher::KafkaPublisher;
6use avro_rs::from_value;
7use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
8use strm_privacy_driver::StrmPrivacyValue;
9
10pub struct KafkaService {
11 consumer: Consumer,
12}
13
14impl KafkaService {
15 pub fn consume(&mut self) {
16 let noiser = LaplaceNoiser::new(0.1, 3, 0.1);
17 let publisher = KafkaPublisher::default();
18 let mut microagg: MicroaggAnonymizer<LaplaceNoiser, MuellerStream, KafkaPublisher> =
19 MicroaggAnonymizer::new(3, 20, 2, 7, 0.1, 300, 5, publisher, noiser);
20 loop {
21 for ms in self.consumer.poll().unwrap().iter() {
22 for m in ms.messages() {
23 let mut b = &m.value[5..];
24 let confluent_bytes = &m.value[..5];
25 microagg.publisher.confluent_bytes = confluent_bytes.to_vec();
26 let mueller_value = avro_rs::from_avro_datum(
27 &MuellerStream::get_schema(MuellerStream::STRM_SCHEMA),
28 &mut b,
29 None,
30 )
31 .expect("couldn't convert message to mueller");
32 let mueller = from_value::<MuellerStream>(&mueller_value)
33 .expect("couldn't convert from value");
34 microagg.anonymize(mueller)
35 }
36 self.consumer
37 .consume_messageset(ms)
38 .expect("couldn't consume message");
39 }
40 self.consumer.commit_consumed().unwrap();
41 }
42 }
43}
44
45impl Default for KafkaService {
46 fn default() -> Self {
47 let config = Config::new(&"application.conf".to_string());
48
49 let consumer = Consumer::from_hosts(vec![config.kafka_bootstrap.to_owned()])
50 .with_topic_partitions(config.topic_in, &[0, 1])
51 .with_fallback_offset(FetchOffset::Earliest)
52 .with_group("my-group".to_owned())
53 .with_offset_storage(GroupOffsetStorage::Kafka)
54 .create()
55 .expect("Consumer couldn't connect to bootstrap");
56
57 Self { consumer }
58 }
59}