stream_consumer_task/
lib.rs1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3use futures::{Future, StreamExt};
4use tokio::{
5 select, spawn,
6 sync::watch::{channel, Receiver, Sender},
7 task::{JoinHandle, JoinSet},
8};
9use tracing::{trace, warn};
10
11pub trait Stream<ITEM> {
15 fn next(&mut self) -> impl Future<Output = Option<ITEM>> + Send;
17}
18
19impl<ITEM, STREAM: StreamExt<Item = ITEM> + Send + Unpin> Stream<ITEM> for STREAM {
22 async fn next(&mut self) -> Option<ITEM> {
23 self.next().await
24 }
25}
26
27#[cfg(feature = "kafka")]
31#[cfg_attr(docrs, doc(cfg(feature = "kafka")))]
32pub struct KafkaStreamConsumerAdapter(rdkafka::consumer::StreamConsumer);
33
34#[cfg(feature = "kafka")]
35impl KafkaStreamConsumerAdapter {
36 pub fn new(csm: rdkafka::consumer::StreamConsumer) -> Self {
38 Self(csm)
39 }
40}
41
42#[cfg(feature = "kafka")]
43impl AsMut<rdkafka::consumer::StreamConsumer> for KafkaStreamConsumerAdapter {
44 fn as_mut(&mut self) -> &mut rdkafka::consumer::StreamConsumer {
45 &mut self.0
46 }
47}
48
49#[cfg(feature = "kafka")]
50impl AsRef<rdkafka::consumer::StreamConsumer> for KafkaStreamConsumerAdapter {
51 fn as_ref(&self) -> &rdkafka::consumer::StreamConsumer {
52 &self.0
53 }
54}
55
56#[cfg(feature = "kafka")]
57impl std::ops::Deref for KafkaStreamConsumerAdapter {
58 type Target = rdkafka::consumer::StreamConsumer;
59
60 fn deref(&self) -> &Self::Target {
61 &self.0
62 }
63}
64
65#[cfg(feature = "kafka")]
66impl std::ops::DerefMut for KafkaStreamConsumerAdapter {
67 fn deref_mut(&mut self) -> &mut Self::Target {
68 &mut self.0
69 }
70}
71
72#[cfg(feature = "kafka")]
73impl std::borrow::Borrow<rdkafka::consumer::StreamConsumer> for KafkaStreamConsumerAdapter {
74 fn borrow(&self) -> &rdkafka::consumer::StreamConsumer {
75 &self.0
76 }
77}
78
79#[cfg(feature = "kafka")]
80impl std::borrow::BorrowMut<rdkafka::consumer::StreamConsumer> for KafkaStreamConsumerAdapter {
81 fn borrow_mut(&mut self) -> &mut rdkafka::consumer::StreamConsumer {
82 &mut self.0
83 }
84}
85
86#[cfg(feature = "kafka")]
87impl From<rdkafka::consumer::StreamConsumer> for KafkaStreamConsumerAdapter {
88 fn from(csm: rdkafka::consumer::StreamConsumer) -> Self {
89 Self(csm)
90 }
91}
92
93#[cfg(feature = "kafka")]
94impl Stream<rdkafka::error::KafkaResult<rdkafka::message::OwnedMessage>>
95 for KafkaStreamConsumerAdapter
96{
97 async fn next(
98 &mut self,
99 ) -> Option<rdkafka::error::KafkaResult<rdkafka::message::OwnedMessage>> {
100 Some(self.0.recv().await.map(|msg| msg.detach()))
101 }
102}
103
104pub struct StreamConsumerTask {
108 stop_tx: Sender<()>,
109 task: JoinHandle<()>,
110}
111
112impl<'a> StreamConsumerTask {
113 pub fn start<ITEM, FUT: Future<Output = ()> + Send + 'static>(
115 mut stream: impl Stream<ITEM> + Send + 'static,
116 handle: impl Fn(ITEM, Receiver<()>) -> FUT + Send + 'static,
117 ) -> Self {
118 let (stop_tx, mut stop_rx) = channel(());
119 let task = spawn(async move {
120 let mut tasks = JoinSet::new();
121 loop {
122 select! {
123 item = stream.next() => {
124 match item {
125 Some(item) => {
126 trace!("spawning task to handle stream item");
127 tasks.spawn(handle(item, stop_rx.clone()));
128 }
129 None => {
130 trace!("stream is closed");
131 break;
132 },
133 }
134 }
135 _ = stop_rx.changed() => {
136 trace!("stop signal received");
137 break;
138 }
139 }
140 }
141 trace!("waiting for running tasks");
142 while let Some(res) = tasks.join_next().await {
143 if let Err(err) = res {
144 warn!("failed to wait task: {err}");
145 }
146 }
147 trace!("consumer stopped");
148 });
149 Self { stop_tx, task }
150 }
151
152 pub async fn stop(self) {
154 trace!("sending stop signal");
155 if let Err(err) = self.stop_tx.send(()) {
156 warn!("failed to send stop signal: {err}");
157 }
158 trace!("waiting for running task");
159 if let Err(err) = self.task.await {
160 warn!("failed to wait task: {err}");
161 }
162 }
163}