stream_consumer_task/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3use futures::{Future, StreamExt};
4use tokio::{
5    select, spawn,
6    sync::watch::{channel, Receiver, Sender},
7    task::{JoinHandle, JoinSet},
8};
9use tracing::{trace, warn};
10
11// Stream
12
/// A minimal asynchronous stream: a source of `ITEM` values that are pulled one at a time.
pub trait Stream<ITEM> {
    /// Returns a future resolving to the next item, or to `None` when there is nothing
    /// more to yield (consumers in this file treat `None` as "stream is closed").
    ///
    /// The returned future must be [`Send`].
    fn next(&mut self) -> impl Future<Output = Option<ITEM>> + Send;
}
18
19// StreamExt
20
21impl<ITEM, STREAM: StreamExt<Item = ITEM> + Send + Unpin> Stream<ITEM> for STREAM {
22    async fn next(&mut self) -> Option<ITEM> {
23        self.next().await
24    }
25}
26
27// KafkaStreamConsumerAdapter
28
/// A newtype around rdkafka's [`StreamConsumer`](https://docs.rs/rdkafka/latest/rdkafka/consumer/struct.StreamConsumer.html)
/// that makes it usable as a [`Stream`].
///
/// Only available when the `kafka` feature is enabled.
#[cfg(feature = "kafka")]
// `docsrs` (not `docrs`) is the cfg enabled by the crate-level `cfg_attr(docsrs, ...)`;
// the misspelled name meant the feature badge was never rendered.
#[cfg_attr(docsrs, doc(cfg(feature = "kafka")))]
pub struct KafkaStreamConsumerAdapter(rdkafka::consumer::StreamConsumer);
33
#[cfg(feature = "kafka")]
impl KafkaStreamConsumerAdapter {
    /// Wraps the given consumer in a new adapter.
    pub fn new(csm: rdkafka::consumer::StreamConsumer) -> Self {
        KafkaStreamConsumerAdapter(csm)
    }
}
41
#[cfg(feature = "kafka")]
impl AsMut<rdkafka::consumer::StreamConsumer> for KafkaStreamConsumerAdapter {
    /// Gives mutable access to the wrapped consumer.
    fn as_mut(&mut self) -> &mut rdkafka::consumer::StreamConsumer {
        let Self(consumer) = self;
        consumer
    }
}
48
#[cfg(feature = "kafka")]
impl AsRef<rdkafka::consumer::StreamConsumer> for KafkaStreamConsumerAdapter {
    /// Gives shared access to the wrapped consumer.
    fn as_ref(&self) -> &rdkafka::consumer::StreamConsumer {
        let Self(consumer) = self;
        consumer
    }
}
55
#[cfg(feature = "kafka")]
impl std::ops::Deref for KafkaStreamConsumerAdapter {
    type Target = rdkafka::consumer::StreamConsumer;

    /// Dereferences to the wrapped consumer, so its methods can be called on the adapter.
    fn deref(&self) -> &Self::Target {
        let Self(consumer) = self;
        consumer
    }
}
64
#[cfg(feature = "kafka")]
impl std::ops::DerefMut for KafkaStreamConsumerAdapter {
    /// Mutably dereferences to the wrapped consumer.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(consumer) = self;
        consumer
    }
}
71
#[cfg(feature = "kafka")]
impl std::borrow::Borrow<rdkafka::consumer::StreamConsumer> for KafkaStreamConsumerAdapter {
    /// Borrows the wrapped consumer.
    fn borrow(&self) -> &rdkafka::consumer::StreamConsumer {
        let KafkaStreamConsumerAdapter(consumer) = self;
        consumer
    }
}
78
#[cfg(feature = "kafka")]
impl std::borrow::BorrowMut<rdkafka::consumer::StreamConsumer> for KafkaStreamConsumerAdapter {
    /// Mutably borrows the wrapped consumer.
    fn borrow_mut(&mut self) -> &mut rdkafka::consumer::StreamConsumer {
        let KafkaStreamConsumerAdapter(consumer) = self;
        consumer
    }
}
85
#[cfg(feature = "kafka")]
impl From<rdkafka::consumer::StreamConsumer> for KafkaStreamConsumerAdapter {
    /// Converts a consumer into an adapter wrapping it.
    fn from(csm: rdkafka::consumer::StreamConsumer) -> Self {
        KafkaStreamConsumerAdapter(csm)
    }
}
92
#[cfg(feature = "kafka")]
impl Stream<rdkafka::error::KafkaResult<rdkafka::message::OwnedMessage>>
    for KafkaStreamConsumerAdapter
{
    /// Waits for the wrapped consumer's next `recv()` result and returns it with the
    /// message detached into an owned one.
    ///
    /// The result is always wrapped in `Some`, so this stream never reports being closed.
    async fn next(
        &mut self,
    ) -> Option<rdkafka::error::KafkaResult<rdkafka::message::OwnedMessage>> {
        let received = self.0.recv().await;
        // Detach so the returned message no longer borrows from the consumer.
        Some(received.map(|borrowed| borrowed.detach()))
    }
}
103
104// StreamConsumerTask
105
/// A task that consumes a stream in the background.
pub struct StreamConsumerTask {
    // Sending half of the watch channel used to tell the background task to stop.
    stop_tx: Sender<()>,
    // Handle of the spawned background task; awaited by `stop`.
    task: JoinHandle<()>,
}
111
112impl<'a> StreamConsumerTask {
113    /// Starts a new task.
114    pub fn start<ITEM, FUT: Future<Output = ()> + Send + 'static>(
115        mut stream: impl Stream<ITEM> + Send + 'static,
116        handle: impl Fn(ITEM, Receiver<()>) -> FUT + Send + 'static,
117    ) -> Self {
118        let (stop_tx, mut stop_rx) = channel(());
119        let task = spawn(async move {
120            let mut tasks = JoinSet::new();
121            loop {
122                select! {
123                    item = stream.next() => {
124                        match item {
125                            Some(item) => {
126                                trace!("spawning task to handle stream item");
127                                tasks.spawn(handle(item, stop_rx.clone()));
128                            }
129                            None => {
130                                trace!("stream is closed");
131                                break;
132                            },
133                        }
134                    }
135                    _ = stop_rx.changed() => {
136                        trace!("stop signal received");
137                        break;
138                    }
139                }
140            }
141            trace!("waiting for running tasks");
142            while let Some(res) = tasks.join_next().await {
143                if let Err(err) = res {
144                    warn!("failed to wait task: {err}");
145                }
146            }
147            trace!("consumer stopped");
148        });
149        Self { stop_tx, task }
150    }
151
152    /// Stops a task and wait for its termination.
153    pub async fn stop(self) {
154        trace!("sending stop signal");
155        if let Err(err) = self.stop_tx.send(()) {
156            warn!("failed to send stop signal: {err}");
157        }
158        trace!("waiting for running task");
159        if let Err(err) = self.task.await {
160            warn!("failed to wait task: {err}");
161        }
162    }
163}