spawned_concurrency/tasks/
stream.rs

1use crate::tasks::{GenServer, GenServerHandle};
2use futures::{future::select, Stream, StreamExt};
3use spawned_rt::tasks::JoinHandle;
4
5/// Spawns a listener that listens to a stream and sends messages to a GenServer.
6///
7/// Items sent through the stream are required to be wrapped in a Result type.
8///
9/// This function returns a handle to the spawned task and a cancellation token
10/// to stop it.
11pub fn spawn_listener<T, S>(mut handle: GenServerHandle<T>, stream: S) -> JoinHandle<()>
12where
13    T: GenServer,
14    S: Send + Stream<Item = T::CastMsg> + 'static,
15{
16    let cancelation_token = handle.cancellation_token();
17    let join_handle = spawned_rt::tasks::spawn(async move {
18        let mut pinned_stream = core::pin::pin!(stream);
19        let is_cancelled = core::pin::pin!(cancelation_token.cancelled());
20        let listener_loop = core::pin::pin!(async {
21            loop {
22                match pinned_stream.next().await {
23                    Some(msg) => match handle.cast(msg).await {
24                        Ok(_) => tracing::trace!("Message sent successfully"),
25                        Err(e) => {
26                            tracing::error!("Failed to send message: {e:?}");
27                            break;
28                        }
29                    },
30                    None => {
31                        tracing::trace!("Stream finished");
32                        break;
33                    }
34                }
35            }
36        });
37        match select(is_cancelled, listener_loop).await {
38            futures::future::Either::Left(_) => tracing::trace!("GenServer stopped"),
39            futures::future::Either::Right(_) => (), // Stream finished or errored out
40        }
41    });
42    join_handle
43}