use anyhow::{Context, Result, bail};
use async_trait::async_trait;
use rdkafka::Message;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::Headers;
use serde::Deserialize;
use serde_json::Value;
use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;

use crate::config::{parse_config, redact_secret};
use crate::envelope::Envelope;
use crate::observability::trace_context::{TRACEPARENT, TRACESTATE};
use crate::observability::{NodeCtx, SendStopped, SourceCtx};
use crate::retry::RetryPolicy;
use crate::sources::Source;
18
/// Push-based source that reads records from Kafka topics, parses each payload
/// as JSON and forwards it downstream as an [`Envelope`].
pub struct KafkaSource {
    /// Component identifier; passed to `Envelope::new` and used (redacted) as
    /// the log prefix.
    id: String,
    /// Consumer that [`KafkaSource::new`] has already subscribed to the
    /// configured topics.
    consumer: StreamConsumer,
    /// Observability context used to send envelopes downstream; replaced by
    /// `set_node_ctx`.
    source_ctx: SourceCtx,
}
31
32impl KafkaSource {
33 pub fn new(
34 id: impl Into<String>,
35 brokers: &str,
36 group_id: &str,
37 topics: Vec<&str>,
38 ) -> Result<Self> {
39 let id = id.into();
40 let consumer: StreamConsumer = ClientConfig::new()
41 .set("group.id", group_id)
42 .set("bootstrap.servers", brokers)
43 .set("enable.partition.eof", "false")
44 .set("session.timeout.ms", "6000")
45 .set("enable.auto.commit", "false")
46 .create()
47 .context("failed to create kafka consumer")?;
48
49 consumer
50 .subscribe(&topics)
51 .context("failed to subscribe kafka consumer to specified topics")?;
52
53 Ok(Self {
54 source_ctx: SourceCtx::new(&id),
55 id,
56 consumer,
57 })
58 }
59}
60
61#[async_trait]
62impl Source for KafkaSource {
63 fn id(&self) -> &str {
64 &self.id
65 }
66
67 fn set_node_ctx(&mut self, ctx: NodeCtx) {
68 self.source_ctx = SourceCtx::from_node_ctx(ctx);
69 }
70
71 async fn run(self: Box<Self>, tx: Sender<Envelope>, cancel: CancellationToken) {
72 log::info!("[{}] starting kafka consumer", redact_secret(&self.id));
73 let source_ctx = self.source_ctx.clone();
74
75 loop {
76 let msg = tokio::select! {
77 _ = cancel.cancelled() => {
78 log::info!("[{}] cancelled", redact_secret(&self.id));
79 return;
80 }
81 result = self.consumer.recv() => match result {
82 Ok(m) => m,
83 Err(e) => {
84 log::error!("[{}] kafka recv error: {e}", redact_secret(&self.id));
85 continue;
86 }
87 },
88 };
89
90 let offset = msg.offset();
91 let partition = msg.partition();
92 let topic = msg.topic().to_string();
93
94 let key = msg
95 .key()
96 .and_then(|k| std::str::from_utf8(k).ok())
97 .map(str::to_owned);
98
99 let payload_bytes = match msg.payload() {
100 Some(p) => p,
101 None => {
102 log::error!(
103 "[{}] message at offset {offset} has no payload",
104 redact_secret(&self.id)
105 );
106 continue;
107 }
108 };
109
110 let payload: Value = match serde_json::from_slice(payload_bytes) {
111 Ok(v) => v,
112 Err(e) => {
113 log::error!(
114 "[{}] failed to deserialize at offset {offset}: {e}",
115 redact_secret(&self.id),
116 );
117 continue;
118 }
119 };
120
121 let mut env = Envelope::new(&self.id, payload);
122 env.meta.key = key;
123 env.meta.headers.insert("kafka.topic".into(), topic.clone());
124 env.meta
125 .headers
126 .insert("kafka.partition".into(), partition.to_string());
127 env.meta
128 .headers
129 .insert("kafka.offset".into(), offset.to_string());
130 if let Some(headers) = msg.headers() {
131 for header in headers.iter() {
132 if matches!(header.key, TRACEPARENT | TRACESTATE)
133 && let Some(value) = header.value.and_then(|v| std::str::from_utf8(v).ok())
134 {
135 env.meta
136 .headers
137 .insert(header.key.to_string(), value.to_string());
138 }
139 }
140 }
141
142 log::debug!(
143 "[{}] received topic={topic} partition={partition} offset={offset}",
144 redact_secret(&self.id),
145 topic = redact_secret(&topic),
146 );
147
148 match source_ctx.send(&tx, env, &cancel).await {
149 Ok(()) => {}
150 Err(SendStopped::Cancelled) => return,
151 Err(SendStopped::DownstreamClosed) => {
152 log::info!("[{}] downstream closed, stopping", redact_secret(&self.id));
153 return;
154 }
155 }
156 }
157 }
158}
159
/// Configuration for the `kafka` source component, deserialized via
/// `parse_config("kafka", ...)` in `kafka_source_factory`.
#[derive(Debug, Deserialize)]
struct KafkaSourceConfig {
    /// Broker list, passed to the consumer as `bootstrap.servers`; must not be blank.
    brokers: String,
    /// Consumer group id, passed to the consumer as `group.id`; must not be blank.
    group_id: String,
    /// Topics to subscribe to; must be non-empty and contain no blank entries.
    topics: Vec<String>,
}
166
167pub fn kafka_source_factory(
173 id: &str,
174 config: Value,
175 retry: Option<RetryPolicy>,
176) -> Result<Box<dyn Source>> {
177 if retry.is_some() {
178 bail!(
179 "invalid config for component type 'kafka': retry has no effect on push-based sources"
180 );
181 }
182 let config: KafkaSourceConfig = parse_config("kafka", config)?;
183 if config.brokers.trim().is_empty() {
184 bail!("invalid config for component type 'kafka': brokers must not be empty");
185 }
186 if config.group_id.trim().is_empty() {
187 bail!("invalid config for component type 'kafka': group_id must not be empty");
188 }
189 if config.topics.is_empty() {
190 bail!("invalid config for component type 'kafka': topics must not be empty");
191 }
192 if let Some(index) = config
193 .topics
194 .iter()
195 .position(|topic| topic.trim().is_empty())
196 {
197 bail!("invalid config for component type 'kafka': topics[{index}] must not be empty");
198 }
199 let topics: Vec<_> = config.topics.iter().map(String::as_str).collect();
200 Ok(Box::new(KafkaSource::new(
201 id,
202 &config.brokers,
203 &config.group_id,
204 topics,
205 )?))
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use rdkafka::message::{Header, OwnedHeaders};
    use rdkafka::producer::{FutureProducer, FutureRecord};
    use serde_json::json;
    use testcontainers_modules::kafka::apache::{self, KAFKA_PORT};
    use testcontainers_modules::testcontainers::runners::AsyncRunner;
    use tokio::sync::mpsc;

    /// The factory must reject a config whose `topics` list is empty.
    #[test]
    fn factory_rejects_empty_topics() {
        let err = kafka_source_factory(
            "kafka",
            serde_json::json!({
                "brokers": "localhost:9092",
                "group_id": "courier",
                "topics": []
            }),
            None,
        )
        .err()
        .expect("expected empty topics to fail");
        // `{:#}` renders the full anyhow context chain.
        let msg = format!("{err:#}");
        assert!(msg.contains("topics must not be empty"), "{msg}");
    }

    /// The factory must refuse any retry policy for this push-based source,
    /// even when the rest of the config is valid.
    #[test]
    fn factory_rejects_retry_policy() {
        use crate::retry::{ExhaustedPolicy, RetryPolicy};

        let err = kafka_source_factory(
            "kafka",
            serde_json::json!({
                "brokers": "localhost:9092",
                "group_id": "courier",
                "topics": ["t"]
            }),
            Some(RetryPolicy {
                max_attempts: 3,
                initial_delay_ms: 100,
                backoff_multiplier: 2.0,
                max_delay_ms: 1000,
                on_exhausted: ExhaustedPolicy::Propagate,
            }),
        )
        .err()
        .expect("expected retry rejection");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("retry has no effect on push-based sources"),
            "{msg}"
        );
    }

    /// End-to-end check against a Kafka broker started with testcontainers.
    ///
    /// Produces a JSON record with a key and a `traceparent` header, then
    /// asserts that the emitted envelope carries the key, payload, Kafka
    /// metadata headers and trace context.
    #[tokio::test]
    async fn emits_envelope_from_kafka_record() -> anyhow::Result<()> {
        let node = apache::Kafka::default().start().await?;
        let host_port = node.get_host_port_ipv4(KAFKA_PORT).await?;
        let brokers = format!("127.0.0.1:{host_port}");

        let topic = "courier-source-test";

        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", &brokers)
            .set("message.timeout.ms", "5000")
            .create()?;

        let source = KafkaSource::new("src", &brokers, "courier-source-group", vec![topic])?;
        let (tx, mut rx) = mpsc::channel(8);
        let cancel = CancellationToken::new();

        let cancel_inner = cancel.clone();
        let handle = tokio::spawn(async move {
            Box::new(source).run(tx, cancel_inner).await;
        });

        // Re-send the same record every 500 ms until the test cancels.
        // NOTE(review): presumably needed because the consumer (default
        // `auto.offset.reset`) may skip records produced before it is assigned
        // the partition. Confirm. Send errors are deliberately ignored; a
        // later iteration retries.
        let produce_cancel = cancel.clone();
        let produce_handle = tokio::spawn(async move {
            let payload = r#"{"event":"login","user":"u-1"}"#;
            while !produce_cancel.is_cancelled() {
                let _ = producer
                    .send(
                        FutureRecord::to(topic).key("k-1").payload(payload).headers(
                            OwnedHeaders::new().insert(Header {
                                key: TRACEPARENT,
                                value: Some(
                                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
                                ),
                            }),
                        ),
                        Duration::from_secs(5),
                    )
                    .await;
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
        });

        // The outer `?` fails the test if nothing arrives within 30 s; the
        // `expect` fires if the source exits without emitting anything.
        let env = tokio::time::timeout(Duration::from_secs(30), rx.recv())
            .await?
            .expect("source closed before emitting");
        cancel.cancel();
        // Best-effort shutdown: wait briefly for both tasks but don't fail on timeout.
        let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
        let _ = tokio::time::timeout(Duration::from_secs(5), produce_handle).await;

        assert_eq!(env.meta.key.as_deref(), Some("k-1"));
        assert_eq!(env.payload, json!({ "event": "login", "user": "u-1" }));
        assert_eq!(
            env.meta.headers.get("kafka.topic").map(String::as_str),
            Some(topic),
        );
        // NOTE(review): assumes the auto-created test topic has one partition.
        assert_eq!(
            env.meta.headers.get("kafka.partition").map(String::as_str),
            Some("0"),
        );
        // The exact offset depends on how many records were produced first,
        // so only its presence is checked.
        assert!(
            env.meta.headers.contains_key("kafka.offset"),
            "missing kafka.offset header",
        );
        assert_eq!(
            env.meta.headers.get(TRACEPARENT).map(String::as_str),
            Some("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"),
        );
        Ok(())
    }
}