Skip to main content

manta_shared/common/
kafka.rs

1//! Lazily-initialised Kafka producer used by the audit subsystem.
2//!
//! The producer is a `FutureProducer` cached behind a `OnceLock`, so
//! the first audit message pays the client-creation cost and subsequent
//! messages reuse the same client. Sends use a zero queue timeout, so a
//! full local producer queue fails immediately instead of blocking; the
//! send itself is still awaited until librdkafka reports delivery, which
//! is bounded by `message.timeout.ms`.
8
9use std::{fmt, sync::OnceLock, time::Duration};
10
11use rdkafka::{
12  ClientConfig,
13  producer::{FutureProducer, FutureRecord},
14};
15use serde::{Deserialize, Serialize};
16
17use super::{audit::Audit, error::MantaError};
18
/// Kafka message delivery timeout in milliseconds, passed to librdkafka
/// as `message.timeout.ms`.
///
/// This bounds how long librdkafka keeps trying to deliver a message
/// before reporting it as failed. It therefore also bounds how long an
/// awaited send can take.
const KAFKA_MESSAGE_TIMEOUT_MS: &str = "5000";

/// Queue timeout handed to `FutureProducer::send`.
///
/// This is *not* a delivery wait. It only controls how long `send`
/// keeps retrying to enqueue a record when librdkafka's local producer
/// queue is full. Zero means "fail immediately with a queue-full error"
/// rather than block. The future returned by `send` still resolves only
/// once a delivery report arrives, which is bounded by
/// [`KAFKA_MESSAGE_TIMEOUT_MS`].
const KAFKA_DELIVERY_WAIT: Duration = Duration::from_secs(0);
25
/// Kafka client configuration for audit message production.
///
/// The [`FutureProducer`] is lazily created on the first
/// call to [`Audit::produce_message`] and reused for all
/// subsequent calls via an internal [`OnceLock`].
///
/// Only `brokers` and `topic` take part in (de)serialization; the
/// producer cache is skipped, so a deserialized value starts with no
/// producer and builds one on first use.
#[derive(Serialize, Deserialize)]
pub struct Kafka {
  /// Bootstrap broker list, e.g. `vec!["kafka.example.com:9092"]`.
  /// The entries are comma-joined into librdkafka's `bootstrap.servers`.
  pub brokers: Vec<String>,
  /// Kafka topic that audit messages are published to.
  pub topic: String,
  // Lazily-initialised producer. Excluded from serde (`OnceLock`'s
  // `Default` is an empty cell) and reset, not shared, by `Clone`.
  #[serde(skip)]
  producer: OnceLock<FutureProducer>,
}
35  /// Kafka topic that audit messages are published to.
36  pub topic: String,
37  #[serde(skip)]
38  producer: OnceLock<FutureProducer>,
39}
40
41impl fmt::Debug for Kafka {
42  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43    f.debug_struct("Kafka")
44      .field("brokers", &self.brokers)
45      .field("topic", &self.topic)
46      .field(
47        "producer",
48        &if self.producer.get().is_some() {
49          "Some(<FutureProducer>)"
50        } else {
51          "None"
52        },
53      )
54      .finish()
55  }
56}
57
58impl Clone for Kafka {
59  /// Clone the configuration only; the cached producer is
60  /// not cloned and will be lazily recreated.
61  fn clone(&self) -> Self {
62    Self {
63      brokers: self.brokers.clone(),
64      topic: self.topic.clone(),
65      producer: OnceLock::new(),
66    }
67  }
68}
69
70impl Kafka {
71  /// Create a new `Kafka` instance with the given broker
72  /// list and topic name.
73  ///
74  /// The actual `FutureProducer` is built lazily on the first
75  /// `produce_message` call, so this constructor is cheap and
76  /// infallible.
77  ///
78  /// # Examples
79  ///
80  /// ```
81  /// use manta_shared::common::kafka::Kafka;
82  ///
83  /// let k = Kafka::new(
84  ///   vec!["kafka1.example.com:9092".into(), "kafka2.example.com:9092".into()],
85  ///   "manta-audit".into(),
86  /// );
87  /// assert_eq!(k.topic, "manta-audit");
88  /// assert_eq!(k.brokers.len(), 2);
89  /// ```
90  pub fn new(brokers: Vec<String>, topic: String) -> Self {
91    Self {
92      brokers,
93      topic,
94      producer: OnceLock::new(),
95    }
96  }
97
98  /// Return the cached [`FutureProducer`], creating it on
99  /// first call.
100  fn get_or_init_producer(&self) -> Result<&FutureProducer, MantaError> {
101    if let Some(p) = self.producer.get() {
102      return Ok(p);
103    }
104    let brokers = self.brokers.join(",");
105    let p: FutureProducer = ClientConfig::new()
106      .set("bootstrap.servers", &brokers)
107      .set("message.timeout.ms", KAFKA_MESSAGE_TIMEOUT_MS)
108      .create()
109      .map_err(|e| {
110        MantaError::KafkaError(format!("Failed to create Kafka producer: {e}"))
111      })?;
112    // Another thread may have raced us; either value is
113    // fine since they are configured identically.
114    Ok(self.producer.get_or_init(|| p))
115  }
116}
117
118impl Audit for Kafka {
119  async fn produce_message(&self, data: &[u8]) -> Result<(), MantaError> {
120    let producer = self.get_or_init_producer()?;
121
122    let delivery_status = producer
123      .send::<Vec<u8>, _, _>(
124        FutureRecord::to(&self.topic).payload(data),
125        KAFKA_DELIVERY_WAIT,
126      )
127      .await;
128
129    match delivery_status {
130      Ok(_) => {
131        tracing::info!("Delivery status for message received");
132      }
133      Err(e) => {
134        return Err(MantaError::KafkaError(format!(
135          "Delivery status for message failed: {:?}",
136          e.0
137        )));
138      }
139    }
140
141    Ok(())
142  }
143}
144
145#[cfg(test)]
146mod tests {
147  //! Tests for the non-IO parts of [`Kafka`]: configuration plumbing,
148  //! clone semantics, and the redacted Debug representation.
149  //! `produce_message` and `get_or_init_producer` require a broker
150  //! (or a librdkafka mock) and are exercised via integration tests.
151
152  use super::*;
153
154  #[test]
155  fn new_round_trips_brokers_and_topic() {
156    let k = Kafka::new(
157      vec!["broker1:9092".into(), "broker2:9092".into()],
158      "audit-events".into(),
159    );
160    assert_eq!(k.brokers, vec!["broker1:9092", "broker2:9092"]);
161    assert_eq!(k.topic, "audit-events");
162    assert!(
163      k.producer.get().is_none(),
164      "producer must be uninitialised on construction (lazy init)"
165    );
166  }
167
168  #[test]
169  fn clone_resets_the_producer_cache() {
170    // The `Clone` impl deliberately drops the cached producer —
171    // otherwise two `Kafka` values would share rdkafka state in a
172    // way the rdkafka APIs don't sanction. A future "fix" that
173    // shares the OnceLock would break the lazy-init contract; this
174    // test makes that change deliberate.
175    let original = Kafka::new(vec!["b:9092".into()], "t".into());
176    let cloned = original.clone();
177    assert_eq!(cloned.brokers, original.brokers);
178    assert_eq!(cloned.topic, original.topic);
179    assert!(
180      cloned.producer.get().is_none(),
181      "cloned producer cache must be empty regardless of source state"
182    );
183  }
184
185  #[test]
186  fn debug_masks_the_producer_and_shows_init_state() {
187    // The Debug impl deliberately substitutes a placeholder string
188    // for the FutureProducer — librdkafka internals would otherwise
189    // appear in log lines if a Kafka value is debug-printed. Pin
190    // both the placeholder string and that brokers/topic remain
191    // visible (they're not secret).
192    let uninit = Kafka::new(vec!["b:9092".into()], "audit".into());
193    let s = format!("{uninit:?}");
194    assert!(s.contains("brokers"), "brokers field must be visible");
195    assert!(s.contains("\"b:9092\""), "broker value must be visible");
196    assert!(s.contains("audit"), "topic must be visible");
197    assert!(
198      s.contains("None"),
199      "uninitialised producer must show as `None`, got: {s}"
200    );
201    // The literal placeholder string used for an initialised producer
202    // is pinned indirectly: if `Some(<FutureProducer>)` ever leaks
203    // through to Debug output of an uninit Kafka, this assertion
204    // catches it.
205    assert!(
206      !s.contains("FutureProducer"),
207      "uninitialised Kafka must not mention FutureProducer in Debug output"
208    );
209  }
210}