spawned_concurrency/tasks/
stream.rs

1use crate::tasks::{GenServer, GenServerHandle};
2use futures::{future::select, Stream, StreamExt};
3use spawned_rt::tasks::JoinHandle;
4
5/// Spawns a listener that listens to a stream and sends messages to a GenServer.
6///
7/// Items sent through the stream are required to be wrapped in a Result type.
8///
9/// This function returns a handle to the spawned task and a cancellation token
10/// to stop it.
11pub fn spawn_listener<T, F, S, I, E>(
12    mut handle: GenServerHandle<T>,
13    message_builder: F,
14    mut stream: S,
15) -> JoinHandle<()>
16where
17    T: GenServer + 'static,
18    F: Fn(I) -> T::CastMsg + Send + 'static + std::marker::Sync,
19    I: Send,
20    E: std::fmt::Debug + Send,
21    S: Unpin + Send + Stream<Item = Result<I, E>> + 'static,
22{
23    let cancelation_token = handle.cancellation_token();
24    let join_handle = spawned_rt::tasks::spawn(async move {
25        let result = select(
26            Box::pin(cancelation_token.cancelled()),
27            Box::pin(async {
28                loop {
29                    match stream.next().await {
30                        Some(Ok(i)) => match handle.cast(message_builder(i)).await {
31                            Ok(_) => tracing::trace!("Message sent successfully"),
32                            Err(e) => {
33                                tracing::error!("Failed to send message: {e:?}");
34                                break;
35                            }
36                        },
37                        Some(Err(e)) => {
38                            tracing::trace!("Received Error in msg {e:?}");
39                            break;
40                        }
41                        None => {
42                            tracing::trace!("Stream finished");
43                            break;
44                        }
45                    }
46                }
47            }),
48        )
49        .await;
50        match result {
51            futures::future::Either::Left(_) => tracing::trace!("GenServer stopped"),
52            futures::future::Either::Right(_) => (), // Stream finished or errored out
53        }
54    });
55    join_handle
56}