Skip to main content

courier/sources/
kafka.rs

1use anyhow::{Context, Result, bail};
2use async_trait::async_trait;
3use rdkafka::Message;
4use rdkafka::config::ClientConfig;
5use rdkafka::consumer::{Consumer, StreamConsumer};
6use rdkafka::message::Headers;
7use serde::Deserialize;
8use serde_json::Value;
9use tokio::sync::mpsc::Sender;
10use tokio_util::sync::CancellationToken;
11
12use crate::config::{parse_config, redact_secret};
13use crate::envelope::Envelope;
14use crate::observability::trace_context::{TRACEPARENT, TRACESTATE};
15use crate::observability::{NodeCtx, SendStopped, SourceCtx};
16use crate::retry::RetryPolicy;
17use crate::sources::Source;
18
/// Kafka consumer source. Deserializes each record's payload as JSON and
/// emits it as the envelope payload. Populates `meta.key` from the record
/// key and records `kafka.topic`, `kafka.partition`, `kafka.offset` in
/// `meta.headers` for downstream debugging.
///
/// Auto-commit is disabled and this source never commits offsets itself
/// (at-least-once semantics rely on downstream acking — TODO). Note that
/// `auto.offset.reset` is not configured, so librdkafka's default (`latest`)
/// applies: with no committed offsets, a restarted consumer group starts at
/// the end of each partition and skips records produced while it was down.
pub struct KafkaSource {
    // Component id: used as the log prefix and passed to `Envelope::new`.
    id: String,
    // Consumer created and subscribed in `KafkaSource::new`.
    consumer: StreamConsumer,
    // Context used to send envelopes downstream; replaced by `set_node_ctx`.
    source_ctx: SourceCtx,
}
31
32impl KafkaSource {
33    pub fn new(
34        id: impl Into<String>,
35        brokers: &str,
36        group_id: &str,
37        topics: Vec<&str>,
38    ) -> Result<Self> {
39        let id = id.into();
40        let consumer: StreamConsumer = ClientConfig::new()
41            .set("group.id", group_id)
42            .set("bootstrap.servers", brokers)
43            .set("enable.partition.eof", "false")
44            .set("session.timeout.ms", "6000")
45            .set("enable.auto.commit", "false")
46            .create()
47            .context("failed to create kafka consumer")?;
48
49        consumer
50            .subscribe(&topics)
51            .context("failed to subscribe kafka consumer to specified topics")?;
52
53        Ok(Self {
54            source_ctx: SourceCtx::new(&id),
55            id,
56            consumer,
57        })
58    }
59}
60
61#[async_trait]
62impl Source for KafkaSource {
63    fn id(&self) -> &str {
64        &self.id
65    }
66
67    fn set_node_ctx(&mut self, ctx: NodeCtx) {
68        self.source_ctx = SourceCtx::from_node_ctx(ctx);
69    }
70
71    async fn run(self: Box<Self>, tx: Sender<Envelope>, cancel: CancellationToken) {
72        log::info!("[{}] starting kafka consumer", redact_secret(&self.id));
73        let source_ctx = self.source_ctx.clone();
74
75        loop {
76            let msg = tokio::select! {
77                _ = cancel.cancelled() => {
78                    log::info!("[{}] cancelled", redact_secret(&self.id));
79                    return;
80                }
81                result = self.consumer.recv() => match result {
82                    Ok(m) => m,
83                    Err(e) => {
84                        log::error!("[{}] kafka recv error: {e}", redact_secret(&self.id));
85                        continue;
86                    }
87                },
88            };
89
90            let offset = msg.offset();
91            let partition = msg.partition();
92            let topic = msg.topic().to_string();
93
94            let key = msg
95                .key()
96                .and_then(|k| std::str::from_utf8(k).ok())
97                .map(str::to_owned);
98
99            let payload_bytes = match msg.payload() {
100                Some(p) => p,
101                None => {
102                    log::error!(
103                        "[{}] message at offset {offset} has no payload",
104                        redact_secret(&self.id)
105                    );
106                    continue;
107                }
108            };
109
110            let payload: Value = match serde_json::from_slice(payload_bytes) {
111                Ok(v) => v,
112                Err(e) => {
113                    log::error!(
114                        "[{}] failed to deserialize at offset {offset}: {e}",
115                        redact_secret(&self.id),
116                    );
117                    continue;
118                }
119            };
120
121            let mut env = Envelope::new(&self.id, payload);
122            env.meta.key = key;
123            env.meta.headers.insert("kafka.topic".into(), topic.clone());
124            env.meta
125                .headers
126                .insert("kafka.partition".into(), partition.to_string());
127            env.meta
128                .headers
129                .insert("kafka.offset".into(), offset.to_string());
130            if let Some(headers) = msg.headers() {
131                for header in headers.iter() {
132                    if matches!(header.key, TRACEPARENT | TRACESTATE)
133                        && let Some(value) = header.value.and_then(|v| std::str::from_utf8(v).ok())
134                    {
135                        env.meta
136                            .headers
137                            .insert(header.key.to_string(), value.to_string());
138                    }
139                }
140            }
141
142            log::debug!(
143                "[{}] received topic={topic} partition={partition} offset={offset}",
144                redact_secret(&self.id),
145                topic = redact_secret(&topic),
146            );
147
148            match source_ctx.send(&tx, env, &cancel).await {
149                Ok(()) => {}
150                Err(SendStopped::Cancelled) => return,
151                Err(SendStopped::DownstreamClosed) => {
152                    log::info!("[{}] downstream closed, stopping", redact_secret(&self.id));
153                    return;
154                }
155            }
156        }
157    }
158}
159
/// Deserialized `config` object for a `kafka` source component; validated
/// and consumed by `kafka_source_factory`.
#[derive(Debug, Deserialize)]
struct KafkaSourceConfig {
    /// Broker list, passed to the consumer as `bootstrap.servers`.
    brokers: String,
    /// Consumer group id, passed to the consumer as `group.id`.
    group_id: String,
    /// Topics to subscribe to; the factory requires at least one and rejects
    /// blank entries.
    topics: Vec<String>,
}
166
167/// Registry factory for [`KafkaSource`]. Registered by
168/// `courier::registry::register_builtin` under kind `"kafka"`.
169///
170/// `retry` is rejected at config time: kafka consumers are push-based
171/// (the broker drives delivery), so a retry/backoff knob has no role here.
172pub fn kafka_source_factory(
173    id: &str,
174    config: Value,
175    retry: Option<RetryPolicy>,
176) -> Result<Box<dyn Source>> {
177    if retry.is_some() {
178        bail!(
179            "invalid config for component type 'kafka': retry has no effect on push-based sources"
180        );
181    }
182    let config: KafkaSourceConfig = parse_config("kafka", config)?;
183    if config.brokers.trim().is_empty() {
184        bail!("invalid config for component type 'kafka': brokers must not be empty");
185    }
186    if config.group_id.trim().is_empty() {
187        bail!("invalid config for component type 'kafka': group_id must not be empty");
188    }
189    if config.topics.is_empty() {
190        bail!("invalid config for component type 'kafka': topics must not be empty");
191    }
192    if let Some(index) = config
193        .topics
194        .iter()
195        .position(|topic| topic.trim().is_empty())
196    {
197        bail!("invalid config for component type 'kafka': topics[{index}] must not be empty");
198    }
199    let topics: Vec<_> = config.topics.iter().map(String::as_str).collect();
200    Ok(Box::new(KafkaSource::new(
201        id,
202        &config.brokers,
203        &config.group_id,
204        topics,
205    )?))
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use rdkafka::message::{Header, OwnedHeaders};
    use rdkafka::producer::{FutureProducer, FutureRecord};
    use serde_json::json;
    use testcontainers_modules::kafka::apache::{self, KAFKA_PORT};
    use testcontainers_modules::testcontainers::runners::AsyncRunner;
    use tokio::sync::mpsc;

    // An empty `topics` list is rejected by the factory's validation, before
    // any consumer is created.
    #[test]
    fn factory_rejects_empty_topics() {
        let err = kafka_source_factory(
            "kafka",
            serde_json::json!({
                "brokers": "localhost:9092",
                "group_id": "courier",
                "topics": []
            }),
            None,
        )
        .err()
        .expect("expected empty topics to fail");
        // `{:#}` renders the full anyhow context chain.
        let msg = format!("{err:#}");
        assert!(msg.contains("topics must not be empty"), "{msg}");
    }

    // Any retry policy is a config error for this source, even with an
    // otherwise valid config.
    #[test]
    fn factory_rejects_retry_policy() {
        use crate::retry::{ExhaustedPolicy, RetryPolicy};

        let err = kafka_source_factory(
            "kafka",
            serde_json::json!({
                "brokers": "localhost:9092",
                "group_id": "courier",
                "topics": ["t"]
            }),
            Some(RetryPolicy {
                max_attempts: 3,
                initial_delay_ms: 100,
                backoff_multiplier: 2.0,
                max_delay_ms: 1000,
                on_exhausted: ExhaustedPolicy::Propagate,
            }),
        )
        .err()
        .expect("expected retry rejection");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("retry has no effect on push-based sources"),
            "{msg}"
        );
    }

    // Integration test: starts a Kafka broker via testcontainers (presumably
    // needs a local container runtime such as Docker — confirm for CI), then
    // checks that a produced record comes out as an Envelope with the key,
    // JSON payload, `kafka.*` headers and the propagated traceparent header.
    #[tokio::test]
    async fn emits_envelope_from_kafka_record() -> anyhow::Result<()> {
        let node = apache::Kafka::default().start().await?;
        let host_port = node.get_host_port_ipv4(KAFKA_PORT).await?;
        let brokers = format!("127.0.0.1:{host_port}");

        let topic = "courier-source-test";

        // Only build the producer here; nothing is sent yet. KafkaSource uses
        // the default `auto.offset.reset=latest`, so a fresh group ignores
        // records written before it joined. The real record is produced in a
        // loop below, after the source task has started. (The topic is not
        // pre-created; this relies on the broker auto-creating it on first
        // produce/subscribe — NOTE(review): confirm for the container image.)
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", &brokers)
            .set("message.timeout.ms", "5000")
            .create()?;

        let source = KafkaSource::new("src", &brokers, "courier-source-group", vec![topic])?;
        let (tx, mut rx) = mpsc::channel(8);
        let cancel = CancellationToken::new();

        let cancel_inner = cancel.clone();
        let handle = tokio::spawn(async move {
            Box::new(source).run(tx, cancel_inner).await;
        });

        // Produce in a loop until something lands; KafkaSource starts from
        // `latest`, so early sends may arrive before the group is assigned.
        // Send failures are ignored on purpose; the loop just retries.
        let produce_cancel = cancel.clone();
        let produce_handle = tokio::spawn(async move {
            let payload = r#"{"event":"login","user":"u-1"}"#;
            while !produce_cancel.is_cancelled() {
                let _ = producer
                    .send(
                        FutureRecord::to(topic).key("k-1").payload(payload).headers(
                            OwnedHeaders::new().insert(Header {
                                key: TRACEPARENT,
                                value: Some(
                                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
                                ),
                            }),
                        ),
                        Duration::from_secs(5),
                    )
                    .await;
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
        });

        // Fails the test (via `?` on the timeout) if nothing arrives within 30 s.
        let env = tokio::time::timeout(Duration::from_secs(30), rx.recv())
            .await?
            .expect("source closed before emitting");
        // Stop both background tasks; their results are deliberately ignored.
        cancel.cancel();
        let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
        let _ = tokio::time::timeout(Duration::from_secs(5), produce_handle).await;

        assert_eq!(env.meta.key.as_deref(), Some("k-1"));
        assert_eq!(env.payload, json!({ "event": "login", "user": "u-1" }));
        assert_eq!(
            env.meta.headers.get("kafka.topic").map(String::as_str),
            Some(topic),
        );
        // Assumes the auto-created topic has a single partition (broker
        // default) — TODO confirm if the image's `num.partitions` changes.
        assert_eq!(
            env.meta.headers.get("kafka.partition").map(String::as_str),
            Some("0"),
        );
        // The offset value depends on how many earlier sends landed, so only
        // its presence is checked.
        assert!(
            env.meta.headers.contains_key("kafka.offset"),
            "missing kafka.offset header",
        );
        assert_eq!(
            env.meta.headers.get(TRACEPARENT).map(String::as_str),
            Some("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"),
        );
        Ok(())
    }
}