Skip to main content

courier/sinks/
kafka.rs

1use std::time::Duration;
2
3use anyhow::{Context, Result, anyhow, bail};
4use async_trait::async_trait;
5use rdkafka::config::ClientConfig;
6use rdkafka::message::{Header, OwnedHeaders};
7use rdkafka::producer::{FutureProducer, FutureRecord};
8use serde::Deserialize;
9use serde_json::Value;
10
11use crate::config::{parse_config, redact_secret};
12use crate::envelope::Envelope;
13use crate::observability::trace_context::{TRACEPARENT, TRACESTATE};
14use crate::pipeline::ErrorPolicy;
15use crate::retry::RetryPolicy;
16use crate::sinks::{ManagedSink, Sink, WriteOne};
17
18/// Kafka producer sink. Serializes the envelope payload as JSON and sends
19/// it to the configured topic. Uses `meta.key` as the record key when set,
20/// falling back to `meta.source_id` otherwise.
21pub struct KafkaSink {
22    id: String,
23    topic: String,
24    producer: FutureProducer,
25}
26
27impl KafkaSink {
28    pub fn new(id: impl Into<String>, brokers: &str, topic: impl Into<String>) -> Result<Self> {
29        let producer: FutureProducer = ClientConfig::new()
30            .set("bootstrap.servers", brokers)
31            .set("message.timeout.ms", "5000")
32            .create()
33            .context("failed to create kafka producer")?;
34
35        Ok(Self {
36            id: id.into(),
37            topic: topic.into(),
38            producer,
39        })
40    }
41}
42
43#[async_trait]
44impl WriteOne for KafkaSink {
45    fn id(&self) -> &str {
46        &self.id
47    }
48
49    async fn write(&self, env: &Envelope) -> Result<()> {
50        let key = env
51            .meta
52            .key
53            .clone()
54            .unwrap_or_else(|| env.meta.source_id.clone());
55        let payload = serde_json::to_string(&env.payload)?;
56
57        let mut record = FutureRecord::to(&self.topic).key(&key).payload(&payload);
58        let mut headers = OwnedHeaders::new();
59        let mut has_trace_headers = false;
60        for header_key in [TRACEPARENT, TRACESTATE] {
61            if let Some(value) = env.meta.headers.get(header_key) {
62                headers = headers.insert(Header {
63                    key: header_key,
64                    value: Some(value.as_str()),
65                });
66                has_trace_headers = true;
67            }
68        }
69        if has_trace_headers {
70            record = record.headers(headers);
71        }
72        match self.producer.send(record, Duration::from_secs(0)).await {
73            Ok(status) => {
74                log::debug!(
75                    "[{}] delivered to topic={} partition={} offset={}",
76                    redact_secret(&self.id),
77                    redact_secret(&self.topic),
78                    status.partition,
79                    status.offset,
80                );
81                Ok(())
82            }
83            Err((e, _)) => Err(anyhow!("kafka delivery failed: {e:?}")),
84        }
85    }
86}
87
88#[derive(Debug, Deserialize)]
89struct KafkaSinkConfig {
90    brokers: String,
91    topic: String,
92}
93
94/// Registry factory for [`KafkaSink`]. Registered by
95/// `courier::registry::register_builtin` under kind `"kafka"`.
96///
97/// Retry and error policy are managed centrally by the registry and applied
98/// to every sink uniformly — no per-sink config needed.
99pub fn kafka_sink_factory(
100    id: &str,
101    config: Value,
102    on_error: ErrorPolicy,
103    retry: Option<RetryPolicy>,
104) -> Result<Box<dyn Sink>> {
105    let config: KafkaSinkConfig = parse_config("kafka", config)?;
106    if config.brokers.trim().is_empty() {
107        bail!("invalid config for component type 'kafka': brokers must not be empty");
108    }
109    if config.topic.trim().is_empty() {
110        bail!("invalid config for component type 'kafka': topic must not be empty");
111    }
112    let kafka = KafkaSink::new(id, &config.brokers, config.topic)?;
113    let mut sink = ManagedSink::new(kafka).with_error_policy(on_error);
114    if let Some(policy) = retry {
115        sink = sink.with_retry(policy);
116    }
117    Ok(Box::new(sink))
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use rdkafka::Message;
    use rdkafka::consumer::{Consumer, StreamConsumer};
    use rdkafka::message::Headers;
    use serde_json::json;
    use testcontainers_modules::kafka::apache::{self, KAFKA_PORT};
    use testcontainers_modules::testcontainers::runners::AsyncRunner;

    use crate::sinks::WriteOne;

    /// Traceparent header value attached to the explicit-key envelope.
    const SAMPLE_TRACEPARENT: &str = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";

    /// The factory must reject a blank topic before any producer is built.
    #[test]
    fn factory_rejects_empty_topic() {
        let config = json!({
            "brokers": "localhost:9092",
            "topic": ""
        });
        let outcome = kafka_sink_factory("kafka", config, ErrorPolicy::Drop, None);
        let err = outcome.err().expect("expected empty topic to fail");
        let msg = format!("{err:#}");
        assert!(msg.contains("topic must not be empty"), "{msg}");
    }

    /// Round-trips two envelopes through a containerized broker and checks
    /// the record key (explicit and fallback), the JSON payload, and the
    /// forwarded traceparent header.
    #[tokio::test]
    async fn delivers_payload_and_key_with_source_id_fallback() -> anyhow::Result<()> {
        let node = apache::Kafka::default().start().await?;
        let host_port = node.get_host_port_ipv4(KAFKA_PORT).await?;
        let brokers = format!("127.0.0.1:{host_port}");

        let topic = "courier-sink-test";
        let sink = KafkaSink::new("kafka-sink", &brokers, topic)?;

        // Envelope with an explicit meta.key: that key becomes the record key.
        let mut keyed = Envelope::new("src-1", json!({ "hello": "world" }));
        keyed.meta.key = Some("k-1".into());
        keyed
            .meta
            .headers
            .insert(TRACEPARENT.to_string(), SAMPLE_TRACEPARENT.to_string());
        sink.write(&keyed).await?;

        // Envelope without meta.key: the record key falls back to meta.source_id.
        let unkeyed = Envelope::new("src-2", json!({ "n": 42 }));
        assert!(unkeyed.meta.key.is_none());
        sink.write(&unkeyed).await?;

        let mut consumer_config = ClientConfig::new();
        consumer_config
            .set("bootstrap.servers", &brokers)
            .set("group.id", "courier-sink-test-consumer")
            .set("auto.offset.reset", "earliest")
            .set("enable.auto.commit", "false");
        let consumer: StreamConsumer = consumer_config.create()?;
        consumer.subscribe(&[topic])?;

        let mut received = Vec::new();
        let mut traceparent = None;
        for _ in 0..2 {
            let msg = tokio::time::timeout(Duration::from_secs(30), consumer.recv()).await??;
            let key = msg.key().map(|raw| String::from_utf8_lossy(raw).into_owned());
            let payload: serde_json::Value = serde_json::from_slice(msg.payload().unwrap())?;

            // Only the explicit-key record is expected to carry the header.
            if key.as_deref() == Some("k-1") {
                traceparent = msg.headers().and_then(|headers| {
                    headers
                        .iter()
                        .filter(|header| header.key == TRACEPARENT)
                        .find_map(|header| {
                            header
                                .value
                                .and_then(|bytes| std::str::from_utf8(bytes).ok())
                                .map(str::to_string)
                        })
                });
            }
            received.push((key, payload));
        }

        // True when a record with the given key and payload was consumed.
        let delivered = |want_key: &str, want_payload: &serde_json::Value| {
            received
                .iter()
                .any(|(k, p)| k.as_deref() == Some(want_key) && p == want_payload)
        };

        assert!(
            delivered("k-1", &json!({ "hello": "world" })),
            "missing explicit-key message in {received:?}",
        );
        assert_eq!(traceparent.as_deref(), Some(SAMPLE_TRACEPARENT));
        assert!(
            delivered("src-2", &json!({ "n": 42 })),
            "missing source-id-fallback message in {received:?}",
        );
        Ok(())
    }
}
221}