near_event_stream_processor/message.rs

use std::{sync::Arc, time::Duration};

use rdkafka::{
    consumer::{CommitMode, Consumer, StreamConsumer},
    error::KafkaResult,
    message::OwnedMessage,
    types::RDKafkaErrorCode,
    Message, Offset, TopicPartitionList,
};
use serde::de::DeserializeOwned;
use tracing::warn;

use crate::error::Error;

/// A Kafka message paired with the consumer it was read from, so that the
/// message's offset can be committed once it has been processed.
pub struct StreamerMessage {
    pub message: OwnedMessage,
    consumer: Arc<StreamConsumer>,
}

impl StreamerMessage {
    pub fn new(message: OwnedMessage, consumer: Arc<StreamConsumer>) -> Self {
        Self { message, consumer }
    }

    /// Commit this message's offset synchronously. Kafka commits store the
    /// *next* offset to consume, hence `offset() + 1`. If the consumer group
    /// is rebalancing, the commit is retried every five seconds until it
    /// either succeeds or fails with a different error.
    pub async fn commit(&self) -> KafkaResult<()> {
        let mut tpl = TopicPartitionList::new();
        tpl.add_partition_offset(
            self.message.topic(),
            self.message.partition(),
            Offset::Offset(self.message.offset() + 1),
        )?;

        loop {
            match self.consumer.commit(&tpl, CommitMode::Sync) {
                Ok(()) => return Ok(()),
                Err(err) => {
                    if err.rdkafka_error_code() == Some(RDKafkaErrorCode::RebalanceInProgress) {
                        warn!("RebalanceInProgress, retrying in 5s");
                        tokio::time::sleep(Duration::from_secs(5)).await;
                        continue;
                    }
                    return Err(err);
                }
            }
        }
    }

    /// Fetch the names of all topics known to the broker, using a one-second
    /// metadata request timeout.
    pub fn fetch_all_topics(&self) -> anyhow::Result<Vec<String>> {
        let metadata = self.consumer.fetch_metadata(None, Duration::from_secs(1))?;

        let topics: Vec<String> = metadata
            .topics()
            .iter()
            .map(|t| t.name().to_string())
            .collect();

        Ok(topics)
    }

    /// Deserialize the message payload from JSON into `T`.
    pub fn event<T: DeserializeOwned>(&self) -> Result<T, Error> {
        match self.message.payload_view::<str>() {
            Some(Ok(payload)) => {
                serde_json::from_str::<T>(payload).map_err(Error::PayloadDeserializedError)
            }
            Some(Err(_)) => Err(Error::PayloadIsNotString),
            None => Err(Error::NoPayload),
        }
    }
}
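
// ---------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the original file). One way a
// consumer loop might drive `StreamerMessage`; the broker address, group id,
// "events" topic, and `MyEvent` type are assumptions made for this example.
// ---------------------------------------------------------------------------

#[derive(serde::Deserialize)]
struct MyEvent {
    id: u64,
}

async fn run_sketch() -> anyhow::Result<()> {
    use rdkafka::ClientConfig;

    let consumer: Arc<StreamConsumer> = Arc::new(
        ClientConfig::new()
            .set("bootstrap.servers", "localhost:9092")
            .set("group.id", "example-group")
            // Offsets are committed manually through `StreamerMessage::commit`.
            .set("enable.auto.commit", "false")
            .create()?,
    );
    consumer.subscribe(&["events"])?;

    loop {
        // `recv()` yields a `BorrowedMessage`; `detach()` turns it into the
        // `OwnedMessage` that `StreamerMessage` expects.
        let message = consumer.recv().await?.detach();
        let streamer_message = StreamerMessage::new(message, Arc::clone(&consumer));

        let event: MyEvent = match streamer_message.event() {
            Ok(event) => event,
            // In this sketch, undecodable payloads are simply skipped.
            Err(_) => continue,
        };
        let _ = event.id; // ... handle the event here ...

        // Commit only after the event has been handled (at-least-once delivery).
        streamer_message.commit().await?;
    }
}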