spawned_concurrency/tasks/
stream.rs1use crate::tasks::{GenServer, GenServerHandle};
2use futures::{future::select, Stream, StreamExt};
3use spawned_rt::tasks::{CancellationToken, JoinHandle};
4
5pub fn spawn_listener<T, F, S, I, E>(
12 mut handle: GenServerHandle<T>,
13 message_builder: F,
14 mut stream: S,
15) -> (JoinHandle<()>, CancellationToken)
16where
17 T: GenServer + 'static,
18 F: Fn(I) -> T::CastMsg + Send + 'static + std::marker::Sync,
19 I: Send,
20 E: std::fmt::Debug + Send,
21 S: Unpin + Send + Stream<Item = Result<I, E>> + 'static,
22{
23 let cancelation_token = CancellationToken::new();
24 let cloned_token = cancelation_token.clone();
25 let join_handle = spawned_rt::tasks::spawn(async move {
26 let result = select(
27 Box::pin(cloned_token.cancelled()),
28 Box::pin(async {
29 loop {
30 match stream.next().await {
31 Some(Ok(i)) => match handle.cast(message_builder(i)).await {
32 Ok(_) => tracing::trace!("Message sent successfully"),
33 Err(e) => {
34 tracing::error!("Failed to send message: {e:?}");
35 break;
36 }
37 },
38 Some(Err(e)) => {
39 tracing::trace!("Received Error in msg {e:?}");
40 break;
41 }
42 None => {
43 tracing::trace!("Stream finished");
44 break;
45 }
46 }
47 }
48 }),
49 )
50 .await;
51 match result {
52 futures::future::Either::Left(_) => tracing::trace!("Listener cancelled"),
53 futures::future::Either::Right(_) => (), }
55 });
56 (join_handle, cancelation_token)
57}