diff_priv/publishing/
kafka_publisher.rs1use crate::config::Config;
2use crate::data_manipulation::anonymizable::Anonymizable;
3use crate::data_manipulation::mueller::MuellerStream;
4use crate::publishing::publisher::Publisher;
5use avro_rs::{to_avro_datum, to_value};
6use kafka::producer::{Producer, Record, RequiredAcks};
7use std::time::Duration;
8use strm_privacy_driver::StrmPrivacyValue;
9use uuid::Uuid;
10
11pub struct KafkaPublisher {
12 producer: Producer,
13 pub confluent_bytes: Vec<u8>,
14 topic_out: String,
15 published: i32,
16}
17
18impl KafkaPublisher {
19 pub fn new(confluent_bytes: Vec<u8>) -> Self {
20 Self {
21 confluent_bytes,
22 ..Default::default()
23 }
24 }
25}
26
27impl Default for KafkaPublisher {
28 fn default() -> Self {
29 let config = Config::new(&"application.conf".to_string());
30 let producer = Producer::from_hosts(vec![config.kafka_bootstrap.to_owned()])
31 .with_ack_timeout(Duration::from_secs(1))
32 .with_required_acks(RequiredAcks::One)
33 .create()
34 .expect("Producer couldn't connect to kafka bootstrap");
35
36 let confluent_bytes: Vec<u8> = Vec::new();
37
38 Self {
39 confluent_bytes,
40 producer,
41 topic_out: config.topic_out,
42 published: 0,
43 }
44 }
45}
46
47impl Publisher for KafkaPublisher {
48 fn publish<M: Anonymizable>(&mut self, value: M, uuid: Uuid, dr: f64) {
49 let converted_value = to_value(value).unwrap();
50 let mut datum = to_avro_datum(
51 &MuellerStream::get_schema(MuellerStream::STRM_SCHEMA),
52 converted_value,
53 )
54 .unwrap();
55
56 let mut send = self.confluent_bytes.to_vec();
57 send.append(&mut datum);
58
59 self.published += 1;
60
61 debug!("{}", self.published);
62
63 match self
64 .producer
65 .send(&Record::from_value(self.topic_out.as_str(), send))
66 {
67 Ok(_) => {
68 println!("Produced!")
69 }
70 Err(e) => {
71 println!("{:?}", e)
72 }
73 }
74 }
75}