avantis_utils/kafka/
producer.rs

1use std::collections::HashMap;
2
3use anyhow::Error;
4use opentelemetry::global;
5use rdkafka::config::FromClientConfig;
6use rdkafka::error::{KafkaError, KafkaResult};
7use rdkafka::message::{OwnedHeaders, OwnedMessage};
8use rdkafka::ClientConfig;
9use tracing::instrument;
10use tracing::warn;
11use tracing_opentelemetry::OpenTelemetrySpanExt;
12
13use super::KafkaConfig;
14
15pub use rdkafka::producer::{FutureProducer, FutureRecord};
16pub use rdkafka::util::Timeout;
17
18pub fn with_trace_header(
19    record: FutureRecord<'_, String, [u8]>,
20) -> Result<FutureRecord<'_, String, [u8]>, Error> {
21    Ok(record.headers(create_tracing_header()))
22}
23
24fn create_tracing_header() -> OwnedHeaders {
25    let cx = tracing::Span::current().context();
26    let mut trace_metadata = HashMap::new();
27    global::get_text_map_propagator(|propagator| {
28        propagator.inject_context(&cx, &mut trace_metadata)
29    });
30
31    let mut headers = OwnedHeaders::new();
32
33    if let Some(traceparent) = trace_metadata.get("traceparent") {
34        headers = headers.add("traceparent", traceparent);
35    } else {
36        warn!("trace metadata don't have traceparent");
37    }
38
39    if let Some(tracestate) = trace_metadata.get("tracestate") {
40        headers = headers.add("tracestate", tracestate);
41    } else {
42        warn!("trace metadata don't have tracestate");
43    }
44
45    headers
46}
47
48impl KafkaConfig {
49    #[instrument(skip_all, name = "kafka::init_producer", fields(brokers = %self.brokers_csv))]
50    pub fn producer_config<T>(&self) -> KafkaResult<T>
51    where
52        T: FromClientConfig,
53    {
54        ClientConfig::new()
55            .set("bootstrap.servers", &self.brokers_csv)
56            .set("message.timeout.ms", "30000")
57            .set(
58                "security.protocol",
59                self.security_protocol
60                    .clone()
61                    .unwrap_or_else(|| "ssl".to_string()),
62            )
63            .set_log_level(rdkafka::config::RDKafkaLogLevel::Debug)
64            // .set("log.connection.close", "false")
65            .create()
66    }
67}
68
69pub fn process_error((error, message): (KafkaError, OwnedMessage)) -> (i32, i64) {
70    warn!(
71        "send kafka fail for message: `{:?}` with error `{}`",
72        message, error
73    );
74    (-1, -1)
75}