avantis_utils/kafka/
producer.rs1use std::collections::HashMap;
2
3use anyhow::Error;
4use opentelemetry::global;
5use rdkafka::config::FromClientConfig;
6use rdkafka::error::{KafkaError, KafkaResult};
7use rdkafka::message::{OwnedHeaders, OwnedMessage};
8use rdkafka::ClientConfig;
9use tracing::instrument;
10use tracing::warn;
11use tracing_opentelemetry::OpenTelemetrySpanExt;
12
13use super::KafkaConfig;
14
15pub use rdkafka::producer::{FutureProducer, FutureRecord};
16pub use rdkafka::util::Timeout;
17
18pub fn with_trace_header(
19 record: FutureRecord<'_, String, [u8]>,
20) -> Result<FutureRecord<'_, String, [u8]>, Error> {
21 Ok(record.headers(create_tracing_header()))
22}
23
24fn create_tracing_header() -> OwnedHeaders {
25 let cx = tracing::Span::current().context();
26 let mut trace_metadata = HashMap::new();
27 global::get_text_map_propagator(|propagator| {
28 propagator.inject_context(&cx, &mut trace_metadata)
29 });
30
31 let mut headers = OwnedHeaders::new();
32
33 if let Some(traceparent) = trace_metadata.get("traceparent") {
34 headers = headers.add("traceparent", traceparent);
35 } else {
36 warn!("trace metadata don't have traceparent");
37 }
38
39 if let Some(tracestate) = trace_metadata.get("tracestate") {
40 headers = headers.add("tracestate", tracestate);
41 } else {
42 warn!("trace metadata don't have tracestate");
43 }
44
45 headers
46}
47
48impl KafkaConfig {
49 #[instrument(skip_all, name = "kafka::init_producer", fields(brokers = %self.brokers_csv))]
50 pub fn producer_config<T>(&self) -> KafkaResult<T>
51 where
52 T: FromClientConfig,
53 {
54 ClientConfig::new()
55 .set("bootstrap.servers", &self.brokers_csv)
56 .set("message.timeout.ms", "30000")
57 .set(
58 "security.protocol",
59 self.security_protocol
60 .clone()
61 .unwrap_or_else(|| "ssl".to_string()),
62 )
63 .set_log_level(rdkafka::config::RDKafkaLogLevel::Debug)
64 .create()
66 }
67}
68
69pub fn process_error((error, message): (KafkaError, OwnedMessage)) -> (i32, i64) {
70 warn!(
71 "send kafka fail for message: `{:?}` with error `{}`",
72 message, error
73 );
74 (-1, -1)
75}