Skip to main content

diff_priv/publishing/
kafka_publisher.rs

1use crate::config::Config;
2use crate::data_manipulation::anonymizable::Anonymizable;
3use crate::data_manipulation::mueller::MuellerStream;
4use crate::publishing::publisher::Publisher;
5use avro_rs::{to_avro_datum, to_value};
6use kafka::producer::{Producer, Record, RequiredAcks};
7use std::time::Duration;
8use strm_privacy_driver::StrmPrivacyValue;
9use uuid::Uuid;
10
11pub struct KafkaPublisher {
12    producer: Producer,
13    pub confluent_bytes: Vec<u8>,
14    topic_out: String,
15    published: i32,
16}
17
18impl KafkaPublisher {
19    pub fn new(confluent_bytes: Vec<u8>) -> Self {
20        Self {
21            confluent_bytes,
22            ..Default::default()
23        }
24    }
25}
26
27impl Default for KafkaPublisher {
28    fn default() -> Self {
29        let config = Config::new(&"application.conf".to_string());
30        let producer = Producer::from_hosts(vec![config.kafka_bootstrap.to_owned()])
31            .with_ack_timeout(Duration::from_secs(1))
32            .with_required_acks(RequiredAcks::One)
33            .create()
34            .expect("Producer couldn't connect to kafka bootstrap");
35
36        let confluent_bytes: Vec<u8> = Vec::new();
37
38        Self {
39            confluent_bytes,
40            producer,
41            topic_out: config.topic_out,
42            published: 0,
43        }
44    }
45}
46
47impl Publisher for KafkaPublisher {
48    fn publish<M: Anonymizable>(&mut self, value: M, uuid: Uuid, dr: f64) {
49        let converted_value = to_value(value).unwrap();
50        let mut datum = to_avro_datum(
51            &MuellerStream::get_schema(MuellerStream::STRM_SCHEMA),
52            converted_value,
53        )
54        .unwrap();
55
56        let mut send = self.confluent_bytes.to_vec();
57        send.append(&mut datum);
58
59        self.published += 1;
60
61        debug!("{}", self.published);
62
63        match self
64            .producer
65            .send(&Record::from_value(self.topic_out.as_str(), send))
66        {
67            Ok(_) => {
68                println!("Produced!")
69            }
70            Err(e) => {
71                println!("{:?}", e)
72            }
73        }
74    }
75}