near_event_stream_processor/
lib.rs1use std::sync::Arc;
2
3use config::StreamerConfig;
4use futures::StreamExt;
5use message::StreamerMessage;
6use rdkafka::{
7 consumer::{Consumer, StreamConsumer},
8 ClientConfig,
9};
10use tokio::sync::mpsc;
11use tracing::warn;
12
13pub mod config;
14pub mod error;
15pub mod message;
16
17#[allow(clippy::type_complexity)]
18pub fn streamer(
19 config: &StreamerConfig,
20) -> anyhow::Result<(
21 tokio::task::JoinHandle<Result<(), anyhow::Error>>,
22 mpsc::Receiver<StreamerMessage>,
23)> {
24 let (sender, receiver) = mpsc::channel(config.preload_pool_size);
25
26 let sender = tokio::spawn(start(
27 sender,
28 config.build_kafka_config(),
29 config.topics.clone(),
30 ));
31
32 Ok((sender, receiver))
33}
34
35async fn start(
36 sender: mpsc::Sender<StreamerMessage>,
37 kafka_config: ClientConfig,
38 topics: Vec<String>,
39) -> anyhow::Result<()> {
40 let consumer: StreamConsumer = kafka_config.create()?;
41
42 let topics_ref: Vec<&str> = topics.iter().map(|topic| topic.as_str()).collect();
43 consumer.subscribe(&topics_ref.to_vec())?;
44
45 let consumer_arc = Arc::new(consumer);
46 let mut stream = consumer_arc.stream();
47
48 while let Some(message_result) = stream.next().await {
49 match message_result {
50 Err(e) => warn!("Kafka error: {}", e),
51 Ok(borrowed_message) => {
52 let message = borrowed_message.detach();
53 if let Err(err) = sender
54 .send(StreamerMessage::new(message, consumer_arc.clone()))
55 .await
56 {
57 tracing::error!("Channel closed, exiting with error {}", err);
58 return Ok(());
59 }
60 }
61 }
62 }
63
64 Ok(())
65}