avantis_utils/kafka/consumer.rs

use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Display;
use std::future::Future;
use std::str::Utf8Error;

use anyhow::Result;
use async_trait::async_trait;
use opentelemetry::global;
use prost::DecodeError;
use rdkafka::config::FromClientConfig;
use rdkafka::consumer::{ConsumerContext, Rebalance};
use rdkafka::error::{KafkaError, KafkaResult};
use rdkafka::message::BorrowedMessage;
use rdkafka::message::Headers;
use rdkafka::{ClientConfig, ClientContext, Message, TopicPartitionList};
use thiserror::Error;
use tracing::instrument;
use tracing::{debug, error, info, warn};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use super::KafkaConfig;

pub use rdkafka::consumer::{
    CommitMode, Consumer, DefaultConsumerContext, MessageStream, StreamConsumer,
};

impl KafkaConfig {
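    /// Builds any `FromClientConfig` client (typically a `StreamConsumer`)
    /// preconfigured for this cluster: SSL by default, auto-commit disabled,
    /// offsets reset to `earliest`.
    ///
    /// A usage sketch; the `"my-service"` group id and `"orders"` topic are
    /// hypothetical:
    ///
    /// ```ignore
    /// // given some `config: KafkaConfig`
    /// let consumer: StreamConsumer = config.consumer_config("my-service")?;
    /// consumer.subscribe(&["orders"])?;
    /// ```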
    #[instrument(skip_all, name = "kafka::init_consumer", fields(brokers = %self.brokers_csv, group = group_id))]
    pub fn consumer_config<T>(&self, group_id: &str) -> KafkaResult<T>
    where
        T: FromClientConfig,
    {
        ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", &self.brokers_csv)
            .set("enable.partition.eof", "false")
            .set(
                "security.protocol",
                self.security_protocol
                    .clone()
                    .unwrap_or_else(|| "ssl".to_string()),
            )
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .create()
    }
}

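/// Extracts W3C trace context from the first two message headers and sets it
/// as the parent of the current tracing span.
///
/// Note: this assumes the producer wrote `traceparent` as header 0 and
/// `tracestate` as header 1; messages with a different header layout will
/// fail with `ParseHeaderError`.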
pub fn set_trace(message: &BorrowedMessage) -> Result<(), KafkaProcessError> {
    if let Some(header) = message.headers() {
        let trace_parent = std::str::from_utf8(
            header
                .get(0)
                .ok_or_else(|| {
                    KafkaProcessError::ParseHeaderError("header 0 not found".to_string())
                })?
                .1,
        )?;
        let trace_state = std::str::from_utf8(
            header
                .get(1)
                .ok_or_else(|| {
                    KafkaProcessError::ParseHeaderError("header 1 not found".to_string())
                })?
                .1,
        )?;

        let mut trace_metadata = HashMap::<String, String>::new();
        trace_metadata.insert("traceparent".to_string(), trace_parent.to_owned());
        trace_metadata.insert("tracestate".to_string(), trace_state.to_owned());

        let parent_cx = global::get_text_map_propagator(|prop| prop.extract(&trace_metadata));
        tracing::Span::current().set_parent(parent_cx);
    }
    Ok(())
}

#[async_trait]
pub trait ConsumerExt<C = DefaultConsumerContext>: Consumer<C>
where
    C: ConsumerContext,
{
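    /// Decodes a protobuf payload, runs `process_fn` on it, and commits the
    /// message offset only if processing succeeds.
    ///
    /// A consume-loop sketch; `MyEvent` and `handle` are hypothetical, with
    /// `handle: async fn(MyEvent) -> Result<(), MyError>`:
    ///
    /// ```ignore
    /// // `consumer: StreamConsumer`; `handle` is a hypothetical processing fn
    /// loop {
    ///     consumer
    ///         .process_protobuf_and_commit(consumer.recv().await, handle, CommitMode::Async)
    ///         .await
    ///         .unwrap_or_else(process_error);
    /// }
    /// ```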
    async fn process_protobuf_and_commit<F, T, Fut, E>(
        &self,
        message: Result<BorrowedMessage<'_>, KafkaError>,
        process_fn: F,
        mode: CommitMode,
    ) -> Result<(), KafkaProcessError>
    where
        T: prost::Message + Default,
        F: Fn(T) -> Fut + Send + Sync,
        Fut: Future<Output = Result<(), E>> + Send,
        E: Display,
    {
        let message = message?;

        set_trace(&message)
            .unwrap_or_else(|err| warn!("setting trace failed with error `{}`", err));

        let decoded_message = decode_protobuf::<T>(&message)?;

        process_fn(decoded_message)
            .await
            .map_err(|err| KafkaProcessError::ProcessError(err.to_string()))?;

        self.commit_message(&message, mode)?;

        Ok(())
    }
}

impl<C: ConsumerContext, R> ConsumerExt<C> for StreamConsumer<C, R> {}

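/// Like [`ConsumerExt::process_protobuf_and_commit`] but without committing,
/// for callers that manage offset commits themselves.
///
/// A sketch, with `handle` again standing in for a hypothetical async
/// processing function:
///
/// ```ignore
/// process_protobuf(consumer.recv().await, handle)
///     .await
///     .unwrap_or_else(process_error);
/// ```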
pub async fn process_protobuf<F, T, Fut, E>(
    message: Result<BorrowedMessage<'_>, KafkaError>,
    process_fn: F,
) -> Result<(), KafkaProcessError>
where
    T: prost::Message + Default,
    F: Fn(T) -> Fut + Send + Sync,
    Fut: Future<Output = Result<(), E>> + Send,
    E: Display,
{
    let message = message?;

    let decoded_message = decode_protobuf::<T>(&message)?;

    process_fn(decoded_message)
        .await
        .map_err(|err| KafkaProcessError::ProcessError(err.to_string()))?;

    Ok(())
}

/// Logs a processing error at `warn` level; useful as the error arm of a
/// consume loop.
pub fn process_error(error: KafkaProcessError) {
    warn!(
        "consuming and processing kafka message failed with error `{}`",
        error
    );
}

fn decode_protobuf<T>(message: &BorrowedMessage<'_>) -> Result<T, KafkaProcessError>
where
    T: prost::Message + Default,
{
    let payload = message
        .payload()
        .ok_or(KafkaProcessError::EmptyPayload)?;

    Ok(T::decode(payload)?)
}

#[derive(Error, Debug)]
pub enum KafkaProcessError {
    #[error("kafka error: {0}")]
    KafkaError(#[from] KafkaError),
    #[error("decode error: {0}")]
    DecodeError(#[from] DecodeError),
    #[error("utf-8 error: {0}")]
    Utf8Error(#[from] Utf8Error),
    #[error("message has an empty payload")]
    EmptyPayload,
    #[error("parse header error: {0}")]
    ParseHeaderError(String),
    #[error("process error: {0}")]
    ProcessError(String),
}

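/// A consumer context that logs rebalance events and commit results.
///
/// Because `consumer_config` only builds `FromClientConfig` clients, a
/// consumer using this context has to be created with an explicit context; a
/// sketch using rdkafka's `create_with_context` (broker address hypothetical):
///
/// ```ignore
/// let consumer: StreamConsumer<LoggingConsumerContext> = ClientConfig::new()
///     .set("group.id", "my-service")
///     .set("bootstrap.servers", "localhost:9092")
///     .create_with_context(LoggingConsumerContext)?;
/// ```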
pub struct LoggingConsumerContext;

impl ClientContext for LoggingConsumerContext {}

impl ConsumerContext for LoggingConsumerContext {
    fn pre_rebalance(&self, rebalance: &Rebalance) {
        match rebalance {
            Rebalance::Assign(tpl) => {
                info!("pre rebalance, partitions assigned: {:?}", tpl)
            }
            Rebalance::Revoke(tpl) => {
                info!("pre rebalance, all partitions revoked: {:?}", tpl)
            }
            Rebalance::Error(e) => {
                info!("pre rebalance error: {:?}", e)
            }
        }
    }

    fn post_rebalance(&self, rebalance: &Rebalance) {
        match rebalance {
            Rebalance::Assign(tpl) => {
                info!("post rebalance, partitions assigned: {:?}", tpl)
            }
            Rebalance::Revoke(tpl) => {
                info!("post rebalance, all partitions revoked: {:?}", tpl)
            }
            Rebalance::Error(e) => {
                info!("post rebalance error: {:?}", e)
            }
        }
    }

    fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {
        match result {
            Ok(_) => debug!("committed: {:?}", offsets),
            Err(e) => info!("commit error: {:?}", e),
        }
    }
}