near_event_stream_processor/
message.rs

1use std::{sync::Arc, time::Duration};
2
3use rdkafka::{
4    consumer::{CommitMode, Consumer, StreamConsumer},
5    error::KafkaResult,
6    message::OwnedMessage,
7    types::RDKafkaErrorCode,
8    Message, Offset, TopicPartitionList,
9};
10use serde::de::DeserializeOwned;
11use tracing::warn;
12
13use crate::error::Error;
14
15pub struct StreamerMessage {
16    pub message: OwnedMessage,
17    consumer: Arc<StreamConsumer>,
18}
19
20impl StreamerMessage {
21    pub fn new(message: OwnedMessage, consumer: Arc<StreamConsumer>) -> Self {
22        Self { message, consumer }
23    }
24
25    pub async fn commit(&self) -> KafkaResult<()> {
26        let mut tpl = TopicPartitionList::new();
27        tpl.add_partition_offset(
28            self.message.topic(),
29            self.message.partition(),
30            Offset::Offset(self.message.offset() + 1),
31        )?;
32
33        loop {
34            let result = self.consumer.commit(&tpl, CommitMode::Sync);
35            match result {
36                Ok(()) => {
37                    return Ok(());
38                }
39                Err(err) => {
40                    if err.rdkafka_error_code() == Some(RDKafkaErrorCode::RebalanceInProgress) {
41                        warn!("RebalanceInProgress, retry after 5s");
42                        tokio::time::sleep(Duration::from_secs(5)).await;
43                        continue;
44                    }
45                    return Err(err);
46                }
47            }
48        }
49    }
50
51    pub fn fetch_all_topics(&self) -> anyhow::Result<Vec<String>> {
52        let metadata = self.consumer.fetch_metadata(None, Duration::from_secs(1))?;
53
54        let topics: Vec<String> = metadata
55            .topics()
56            .iter()
57            .map(|t| t.name().to_string())
58            .collect();
59
60        Ok(topics)
61    }
62
63    pub fn event<T: DeserializeOwned>(&self) -> Result<T, Error> {
64        match self.message.payload_view::<str>() {
65            Some(Ok(payload)) => {
66                let event = serde_json::from_str::<T>(payload);
67                match event {
68                    Ok(event) => Ok(event),
69                    Err(err) => Err(Error::PayloadDeserializedError(err)),
70                }
71            }
72            Some(Err(_)) => Err(Error::PayloadIsNotString),
73            None => Err(Error::NoPayload),
74        }
75    }
76}