near_event_stream_processor/
lib.rs

1use std::sync::Arc;
2
3use config::StreamerConfig;
4use futures::StreamExt;
5use message::StreamerMessage;
6use rdkafka::{
7    consumer::{Consumer, StreamConsumer},
8    ClientConfig,
9};
10use tokio::sync::mpsc;
11use tracing::warn;
12
13pub mod config;
14pub mod error;
15pub mod message;
16
17#[allow(clippy::type_complexity)]
18pub fn streamer(
19    config: &StreamerConfig,
20) -> anyhow::Result<(
21    tokio::task::JoinHandle<Result<(), anyhow::Error>>,
22    mpsc::Receiver<StreamerMessage>,
23)> {
24    let (sender, receiver) = mpsc::channel(config.preload_pool_size);
25
26    let sender = tokio::spawn(start(
27        sender,
28        config.build_kafka_config(),
29        config.topics.clone(),
30    ));
31
32    Ok((sender, receiver))
33}
34
35async fn start(
36    sender: mpsc::Sender<StreamerMessage>,
37    kafka_config: ClientConfig,
38    topics: Vec<String>,
39) -> anyhow::Result<()> {
40    let consumer: StreamConsumer = kafka_config.create()?;
41
42    let topics_ref: Vec<&str> = topics.iter().map(|topic| topic.as_str()).collect();
43    consumer.subscribe(&topics_ref.to_vec())?;
44
45    let consumer_arc = Arc::new(consumer);
46    let mut stream = consumer_arc.stream();
47
48    while let Some(message_result) = stream.next().await {
49        match message_result {
50            Err(e) => warn!("Kafka error: {}", e),
51            Ok(borrowed_message) => {
52                let message = borrowed_message.detach();
53                if let Err(err) = sender
54                    .send(StreamerMessage::new(message, consumer_arc.clone()))
55                    .await
56                {
57                    tracing::error!("Channel closed, exiting with error {}", err);
58                    return Ok(());
59                }
60            }
61        }
62    }
63
64    Ok(())
65}