1use std::time::Duration;
2
3use anyhow::{Context, Result, anyhow, bail};
4use async_trait::async_trait;
5use rdkafka::config::ClientConfig;
6use rdkafka::message::{Header, OwnedHeaders};
7use rdkafka::producer::{FutureProducer, FutureRecord};
8use serde::Deserialize;
9use serde_json::Value;
10
11use crate::config::{parse_config, redact_secret};
12use crate::envelope::Envelope;
13use crate::observability::trace_context::{TRACEPARENT, TRACESTATE};
14use crate::pipeline::ErrorPolicy;
15use crate::retry::RetryPolicy;
16use crate::sinks::{ManagedSink, Sink, WriteOne};
17
18pub struct KafkaSink {
22 id: String,
23 topic: String,
24 producer: FutureProducer,
25}
26
27impl KafkaSink {
28 pub fn new(id: impl Into<String>, brokers: &str, topic: impl Into<String>) -> Result<Self> {
29 let producer: FutureProducer = ClientConfig::new()
30 .set("bootstrap.servers", brokers)
31 .set("message.timeout.ms", "5000")
32 .create()
33 .context("failed to create kafka producer")?;
34
35 Ok(Self {
36 id: id.into(),
37 topic: topic.into(),
38 producer,
39 })
40 }
41}
42
43#[async_trait]
44impl WriteOne for KafkaSink {
45 fn id(&self) -> &str {
46 &self.id
47 }
48
49 async fn write(&self, env: &Envelope) -> Result<()> {
50 let key = env
51 .meta
52 .key
53 .clone()
54 .unwrap_or_else(|| env.meta.source_id.clone());
55 let payload = serde_json::to_string(&env.payload)?;
56
57 let mut record = FutureRecord::to(&self.topic).key(&key).payload(&payload);
58 let mut headers = OwnedHeaders::new();
59 let mut has_trace_headers = false;
60 for header_key in [TRACEPARENT, TRACESTATE] {
61 if let Some(value) = env.meta.headers.get(header_key) {
62 headers = headers.insert(Header {
63 key: header_key,
64 value: Some(value.as_str()),
65 });
66 has_trace_headers = true;
67 }
68 }
69 if has_trace_headers {
70 record = record.headers(headers);
71 }
72 match self.producer.send(record, Duration::from_secs(0)).await {
73 Ok(status) => {
74 log::debug!(
75 "[{}] delivered to topic={} partition={} offset={}",
76 redact_secret(&self.id),
77 redact_secret(&self.topic),
78 status.partition,
79 status.offset,
80 );
81 Ok(())
82 }
83 Err((e, _)) => Err(anyhow!("kafka delivery failed: {e:?}")),
84 }
85 }
86}
87
88#[derive(Debug, Deserialize)]
89struct KafkaSinkConfig {
90 brokers: String,
91 topic: String,
92}
93
94pub fn kafka_sink_factory(
100 id: &str,
101 config: Value,
102 on_error: ErrorPolicy,
103 retry: Option<RetryPolicy>,
104) -> Result<Box<dyn Sink>> {
105 let config: KafkaSinkConfig = parse_config("kafka", config)?;
106 if config.brokers.trim().is_empty() {
107 bail!("invalid config for component type 'kafka': brokers must not be empty");
108 }
109 if config.topic.trim().is_empty() {
110 bail!("invalid config for component type 'kafka': topic must not be empty");
111 }
112 let kafka = KafkaSink::new(id, &config.brokers, config.topic)?;
113 let mut sink = ManagedSink::new(kafka).with_error_policy(on_error);
114 if let Some(policy) = retry {
115 sink = sink.with_retry(policy);
116 }
117 Ok(Box::new(sink))
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use rdkafka::Message;
    use rdkafka::consumer::{Consumer, StreamConsumer};
    use rdkafka::message::Headers;
    use serde_json::json;
    use testcontainers_modules::kafka::apache::{self, KAFKA_PORT};
    use testcontainers_modules::testcontainers::runners::AsyncRunner;

    use crate::sinks::WriteOne;

    /// The factory must refuse a config whose `topic` is an empty string.
    #[test]
    fn factory_rejects_empty_topic() {
        let config = json!({
            "brokers": "localhost:9092",
            "topic": ""
        });
        let err = match kafka_sink_factory("kafka", config, ErrorPolicy::Drop, None) {
            Err(err) => err,
            Ok(_) => panic!("expected empty topic to fail"),
        };
        let msg = format!("{err:#}");
        assert!(msg.contains("topic must not be empty"), "{msg}");
    }

    /// Writes one keyed and one unkeyed envelope to a containerized broker, then reads
    /// both back and checks the payloads, the keys (explicit key and source-id
    /// fallback), and the propagated `TRACEPARENT` header.
    #[tokio::test]
    async fn delivers_payload_and_key_with_source_id_fallback() -> anyhow::Result<()> {
        const EXPECTED_TRACEPARENT: &str =
            "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";

        let node = apache::Kafka::default().start().await?;
        let host_port = node.get_host_port_ipv4(KAFKA_PORT).await?;
        let brokers = format!("127.0.0.1:{host_port}");

        let topic = "courier-sink-test";
        let sink = KafkaSink::new("kafka-sink", &brokers, topic)?;

        // Envelope with an explicit key and a trace header.
        let mut keyed = Envelope::new("src-1", json!({ "hello": "world" }));
        keyed.meta.key = Some("k-1".into());
        keyed
            .meta
            .headers
            .insert(TRACEPARENT.to_string(), EXPECTED_TRACEPARENT.to_string());
        sink.write(&keyed).await?;

        // Envelope without a key: the sink is expected to fall back to the source id.
        let unkeyed = Envelope::new("src-2", json!({ "n": 42 }));
        assert!(unkeyed.meta.key.is_none());
        sink.write(&unkeyed).await?;

        let mut consumer_config = ClientConfig::new();
        consumer_config
            .set("bootstrap.servers", &brokers)
            .set("group.id", "courier-sink-test-consumer")
            .set("auto.offset.reset", "earliest")
            .set("enable.auto.commit", "false");
        let consumer: StreamConsumer = consumer_config.create()?;
        consumer.subscribe(&[topic])?;

        let mut received = Vec::new();
        let mut traceparent = None;
        while received.len() < 2 {
            let msg = tokio::time::timeout(Duration::from_secs(30), consumer.recv()).await??;
            let key = msg.key().map(|raw| String::from_utf8_lossy(raw).into_owned());
            let payload: serde_json::Value = serde_json::from_slice(msg.payload().unwrap())?;
            if key.as_deref() == Some("k-1") {
                traceparent = msg.headers().and_then(|headers| {
                    headers
                        .iter()
                        .filter(|header| header.key == TRACEPARENT)
                        .find_map(|header| {
                            let raw = header.value?;
                            std::str::from_utf8(raw).ok().map(str::to_string)
                        })
                });
            }
            received.push((key, payload));
        }

        let has_message = |expected_key: &str, expected_payload: serde_json::Value| {
            received
                .iter()
                .any(|(k, p)| k.as_deref() == Some(expected_key) && *p == expected_payload)
        };

        assert!(
            has_message("k-1", json!({ "hello": "world" })),
            "missing explicit-key message in {received:?}",
        );
        assert_eq!(traceparent.as_deref(), Some(EXPECTED_TRACEPARENT));
        assert!(
            has_message("src-2", json!({ "n": 42 })),
            "missing source-id-fallback message in {received:?}",
        );
        Ok(())
    }
}