manta_shared/common/
kafka.rs1use std::{fmt, sync::OnceLock, time::Duration};
10
11use rdkafka::{
12 ClientConfig,
13 producer::{FutureProducer, FutureRecord},
14};
15use serde::{Deserialize, Serialize};
16
17use super::{audit::Audit, error::MantaError};
18
/// Value handed to the producer's `message.timeout.ms` setting when the
/// client is configured. It is kept as a string because that is the form
/// in which it is passed to `ClientConfig::set`.
const KAFKA_MESSAGE_TIMEOUT_MS: &str = "5000";

/// Timeout argument passed to `FutureProducer::send` for every message.
///
/// NOTE(review): in rdkafka this argument is the *queue* timeout (how long
/// to keep retrying the enqueue when the local producer queue is full), so
/// zero would mean "never block on a full queue" — confirm this is the
/// intended meaning of "delivery wait".
const KAFKA_DELIVERY_WAIT: Duration = Duration::from_secs(0);
25
/// Kafka connection settings plus a lazily created producer for one topic.
///
/// Only `brokers` and `topic` take part in (de)serialization; the producer
/// is runtime state that is created on first use.
#[derive(Serialize, Deserialize)]
pub struct Kafka {
    /// Broker addresses; they are joined with `,` to form the client's
    /// `bootstrap.servers` value when the producer is created.
    pub brokers: Vec<String>,
    /// Topic every produced record is sent to.
    pub topic: String,
    /// Producer cache, empty until the first message is produced.
    /// `#[serde(skip)]` keeps it out of the serialized form and fills it
    /// with its `Default` (an empty cell) when deserializing.
    #[serde(skip)]
    producer: OnceLock<FutureProducer>,
}
40
41impl fmt::Debug for Kafka {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 f.debug_struct("Kafka")
44 .field("brokers", &self.brokers)
45 .field("topic", &self.topic)
46 .field(
47 "producer",
48 &if self.producer.get().is_some() {
49 "Some(<FutureProducer>)"
50 } else {
51 "None"
52 },
53 )
54 .finish()
55 }
56}
57
58impl Clone for Kafka {
59 fn clone(&self) -> Self {
62 Self {
63 brokers: self.brokers.clone(),
64 topic: self.topic.clone(),
65 producer: OnceLock::new(),
66 }
67 }
68}
69
70impl Kafka {
71 pub fn new(brokers: Vec<String>, topic: String) -> Self {
91 Self {
92 brokers,
93 topic,
94 producer: OnceLock::new(),
95 }
96 }
97
98 fn get_or_init_producer(&self) -> Result<&FutureProducer, MantaError> {
101 if let Some(p) = self.producer.get() {
102 return Ok(p);
103 }
104 let brokers = self.brokers.join(",");
105 let p: FutureProducer = ClientConfig::new()
106 .set("bootstrap.servers", &brokers)
107 .set("message.timeout.ms", KAFKA_MESSAGE_TIMEOUT_MS)
108 .create()
109 .map_err(|e| {
110 MantaError::KafkaError(format!("Failed to create Kafka producer: {e}"))
111 })?;
112 Ok(self.producer.get_or_init(|| p))
115 }
116}
117
118impl Audit for Kafka {
119 async fn produce_message(&self, data: &[u8]) -> Result<(), MantaError> {
120 let producer = self.get_or_init_producer()?;
121
122 let delivery_status = producer
123 .send::<Vec<u8>, _, _>(
124 FutureRecord::to(&self.topic).payload(data),
125 KAFKA_DELIVERY_WAIT,
126 )
127 .await;
128
129 match delivery_status {
130 Ok(_) => {
131 tracing::info!("Delivery status for message received");
132 }
133 Err(e) => {
134 return Err(MantaError::KafkaError(format!(
135 "Delivery status for message failed: {:?}",
136 e.0
137 )));
138 }
139 }
140
141 Ok(())
142 }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` stores its arguments unchanged and leaves the producer unset.
    #[test]
    fn new_round_trips_brokers_and_topic() {
        let brokers = vec![String::from("broker1:9092"), String::from("broker2:9092")];
        let kafka = Kafka::new(brokers, String::from("audit-events"));

        assert_eq!(kafka.brokers, vec!["broker1:9092", "broker2:9092"]);
        assert_eq!(kafka.topic, "audit-events");
        assert!(
            kafka.producer.get().is_none(),
            "producer must be uninitialised on construction (lazy init)"
        );
    }

    /// A clone copies the settings and starts with an empty producer cache.
    /// The source's producer is never created here, so only the
    /// uninitialised-source case is exercised.
    #[test]
    fn clone_resets_the_producer_cache() {
        let source = Kafka::new(vec![String::from("b:9092")], String::from("t"));
        let copy = source.clone();

        assert_eq!(copy.brokers, source.brokers);
        assert_eq!(copy.topic, source.topic);
        assert!(
            copy.producer.get().is_none(),
            "cloned producer cache must be empty regardless of source state"
        );
    }

    /// Debug output shows the settings and reports an unset producer as
    /// `None` without naming the producer type.
    #[test]
    fn debug_masks_the_producer_and_shows_init_state() {
        let kafka = Kafka::new(vec![String::from("b:9092")], String::from("audit"));
        let s = format!("{:?}", kafka);

        assert!(s.contains("brokers"), "brokers field must be visible");
        assert!(s.contains("\"b:9092\""), "broker value must be visible");
        assert!(s.contains("audit"), "topic must be visible");
        assert!(
            s.contains("None"),
            "uninitialised producer must show as `None`, got: {s}"
        );
        assert!(
            !s.contains("FutureProducer"),
            "uninitialised Kafka must not mention FutureProducer in Debug output"
        );
    }
}