1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::fmt::Display;
4use std::future::Future;
5use std::str::Utf8Error;
6
7use anyhow::Result;
8use async_trait::async_trait;
9use opentelemetry::global;
10use prost::DecodeError;
11use rdkafka::config::FromClientConfig;
12use rdkafka::consumer::{ConsumerContext, Rebalance};
13use rdkafka::error::{KafkaError, KafkaResult};
14use rdkafka::message::BorrowedMessage;
15use rdkafka::message::Headers;
16use rdkafka::{ClientConfig, ClientContext, Message, TopicPartitionList};
17use thiserror::Error;
18use tracing::instrument;
19use tracing::{debug, error, info, warn};
20use tracing_opentelemetry::OpenTelemetrySpanExt;
21
22use super::KafkaConfig;
23
24pub use rdkafka::consumer::{
25 CommitMode, Consumer, DefaultConsumerContext, MessageStream, StreamConsumer,
26};
27
28impl KafkaConfig {
29 #[instrument(skip_all, name = "kafka::init_consumer", fields(brokers = %self.brokers_csv, group = group_id))]
30 pub fn consumer_config<T>(&self, group_id: &str) -> KafkaResult<T>
31 where
32 T: FromClientConfig,
33 {
34 ClientConfig::new()
35 .set("group.id", group_id)
36 .set("bootstrap.servers", &self.brokers_csv)
37 .set("enable.partition.eof", "false")
38 .set(
39 "security.protocol",
40 self.security_protocol
41 .clone()
42 .unwrap_or_else(|| "ssl".to_string()),
43 )
44 .set("session.timeout.ms", "6000")
45 .set("enable.auto.commit", "false")
46 .set("auto.offset.reset", "earliest")
47 .create()
48 }
49}
50
51pub fn set_trace(message: &BorrowedMessage) -> Result<(), KakfaProcessError> {
52 if let Some(header) = message.headers() {
53 let trace_parent = std::str::from_utf8(
54 header
55 .get(0)
56 .ok_or_else(|| {
57 KakfaProcessError::ParseHeaderError("header 0 not found".to_string())
58 })?
59 .1,
60 )?;
61 let trace_state = std::str::from_utf8(
62 header
63 .get(1)
64 .ok_or_else(|| {
65 KakfaProcessError::ParseHeaderError("header 1 not found".to_string())
66 })?
67 .1,
68 )?;
69
70 let mut trace_metadata = HashMap::<String, String>::new();
71 trace_metadata.insert("traceparent".to_string(), trace_parent.to_owned());
72 trace_metadata.insert("tracestate".to_string(), trace_state.to_owned());
73
74 let parent_cx = global::get_text_map_propagator(|prop| prop.extract(&trace_metadata));
75 tracing::Span::current().set_parent(parent_cx);
76 }
77 Ok(())
78}
79
80#[async_trait]
81pub trait ConsumerExt<C = DefaultConsumerContext>: Consumer<C>
82where
83 C: ConsumerContext,
84{
85 async fn process_protobuf_and_commit<F, T, Fut, E>(
86 &self,
87 message: Result<BorrowedMessage<'_>, KafkaError>,
88 process_fn: F,
89 mode: CommitMode,
90 ) -> Result<(), KakfaProcessError>
91 where
92 T: prost::Message + Default,
93 F: Fn(T) -> Fut + Send + Sync,
94 Fut: Future<Output = Result<(), E>> + Send,
95 E: Display,
96 {
97 let message = message?;
98
99 set_trace(&message).unwrap_or_else(|err| warn!("set trace fail with error `{}`", err));
100
101 let decoded_message = decode_protobuf::<T>(&message)?;
102
103 process_fn(decoded_message)
104 .await
105 .map_err(|err| KakfaProcessError::ProcessError(err.to_string()))?;
106
107 self.commit_message(&message, mode)?;
108
109 Ok(())
110 }
111}
112
113impl<C: ConsumerContext, R> ConsumerExt<C> for StreamConsumer<C, R> {}
114
115pub async fn process_protobuf<F, T, Fut, E>(
116 message: Result<BorrowedMessage<'_>, KafkaError>,
117 process_fn: F,
118) -> Result<(), KakfaProcessError>
119where
120 T: prost::Message + Default,
121 F: Fn(T) -> Fut + Send + Sync,
122 Fut: Future<Output = Result<(), E>> + Send,
123 E: Display,
124{
125 let message = message?;
126
127 let decoded_message = decode_protobuf::<T>(&message)?;
128
129 process_fn(decoded_message)
130 .await
131 .map_err(|err| KakfaProcessError::ProcessError(err.to_string()))?;
132
133 Ok(())
134}
135
136pub fn process_error(error: KakfaProcessError) {
137 warn!(
138 "consume and process kafka message fail with error `{}`",
139 error
140 );
141}
142
143#[allow(clippy::unnecessary_lazy_evaluations)]
144fn decode_protobuf<T>(message: &BorrowedMessage<'_>) -> Result<T, KakfaProcessError>
145where
146 T: prost::Message + Default,
147{
148 let payload = message
149 .payload()
150 .ok_or_else(|| KakfaProcessError::EmptyPayload)?;
151
152 Ok(T::decode(payload)?)
153}
154
155#[derive(Error, Debug)]
156pub enum KakfaProcessError {
157 #[error("kafka error: {0}")]
158 KafkaError(#[from] KafkaError),
159 #[error("decode error: {0}")]
160 DecodeError(#[from] DecodeError),
161 #[error("utf 8 error: {0}")]
162 Utf8Error(#[from] Utf8Error),
163 #[error("No messages available right now")]
164 EmptyPayload,
165 #[error("parse header error: {0}")]
166 ParseHeaderError(String),
167 #[error("any error: {0}")]
168 ProcessError(String),
169}
170
171pub struct LoggingConsumerContext;
172
173impl ClientContext for LoggingConsumerContext {}
174
175impl ConsumerContext for LoggingConsumerContext {
176 fn pre_rebalance(&self, rebalance: &Rebalance) {
177 match rebalance {
178 Rebalance::Assign(tpl) => {
179 info!("pre rebalance: {:?}", tpl)
180 }
181 Rebalance::Revoke(tpl) => {
182 info!("pre rebalance all partitions are revoke: {:?}", tpl)
183 }
184 Rebalance::Error(e) => {
185 info!("pre rebalance error: {:?}", e)
186 }
187 }
188 }
189
190 fn post_rebalance(&self, rebalance: &Rebalance) {
191 match rebalance {
192 Rebalance::Assign(tpl) => {
193 info!("post rebalance: {:?}", tpl)
194 }
195 Rebalance::Revoke(tpl) => {
196 info!("post rebalance all partitions are revoke: {:?}", tpl)
197 }
198 Rebalance::Error(e) => {
199 info!("post rebalance error: {:?}", e)
200 }
201 }
202 }
203
204 fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {
205 match result {
206 Ok(_) => debug!("committed: {:?}", offsets),
207 Err(e) => info!("committed error: {:?}", e),
208 }
209 }
210}