spawned_concurrency/tasks/
stream.rs1use crate::tasks::{GenServer, GenServerHandle};
2use futures::{future::select, Stream, StreamExt};
3use spawned_rt::tasks::JoinHandle;
4
5pub fn spawn_listener<T, S>(mut handle: GenServerHandle<T>, stream: S) -> JoinHandle<()>
12where
13 T: GenServer,
14 S: Send + Stream<Item = T::CastMsg> + 'static,
15{
16 let cancelation_token = handle.cancellation_token();
17 let join_handle = spawned_rt::tasks::spawn(async move {
18 let mut pinned_stream = core::pin::pin!(stream);
19 let is_cancelled = core::pin::pin!(cancelation_token.cancelled());
20 let listener_loop = core::pin::pin!(async {
21 loop {
22 match pinned_stream.next().await {
23 Some(msg) => match handle.cast(msg).await {
24 Ok(_) => tracing::trace!("Message sent successfully"),
25 Err(e) => {
26 tracing::error!("Failed to send message: {e:?}");
27 break;
28 }
29 },
30 None => {
31 tracing::trace!("Stream finished");
32 break;
33 }
34 }
35 }
36 });
37 match select(is_cancelled, listener_loop).await {
38 futures::future::Either::Left(_) => tracing::trace!("GenServer stopped"),
39 futures::future::Either::Right(_) => (), }
41 });
42 join_handle
43}