spawned_concurrency/tasks/
stream.rs

1use crate::tasks::{GenServer, GenServerHandle};
2use futures::{future::select, Stream, StreamExt};
3use spawned_rt::tasks::{CancellationToken, JoinHandle};
4
5/// Spawns a listener that listens to a stream and sends messages to a GenServer.
6///
7/// Items sent through the stream are required to be wrapped in a Result type.
8///
9/// This function returns a handle to the spawned task and a cancellation token
10/// to stop it.
11pub fn spawn_listener<T, F, S, I, E>(
12    mut handle: GenServerHandle<T>,
13    message_builder: F,
14    mut stream: S,
15) -> (JoinHandle<()>, CancellationToken)
16where
17    T: GenServer + 'static,
18    F: Fn(I) -> T::CastMsg + Send + 'static + std::marker::Sync,
19    I: Send,
20    E: std::fmt::Debug + Send,
21    S: Unpin + Send + Stream<Item = Result<I, E>> + 'static,
22{
23    let cancelation_token = CancellationToken::new();
24    let cloned_token = cancelation_token.clone();
25    let join_handle = spawned_rt::tasks::spawn(async move {
26        let result = select(
27            Box::pin(cloned_token.cancelled()),
28            Box::pin(async {
29                loop {
30                    match stream.next().await {
31                        Some(Ok(i)) => match handle.cast(message_builder(i)).await {
32                            Ok(_) => tracing::trace!("Message sent successfully"),
33                            Err(e) => {
34                                tracing::error!("Failed to send message: {e:?}");
35                                break;
36                            }
37                        },
38                        Some(Err(e)) => {
39                            tracing::trace!("Received Error in msg {e:?}");
40                            break;
41                        }
42                        None => {
43                            tracing::trace!("Stream finished");
44                            break;
45                        }
46                    }
47                }
48            }),
49        )
50        .await;
51        match result {
52            futures::future::Either::Left(_) => tracing::trace!("Listener cancelled"),
53            futures::future::Either::Right(_) => (), // Stream finished or errored out
54        }
55    });
56    (join_handle, cancelation_token)
57}