spawned_concurrency/tasks/
stream.rs1use crate::tasks::{GenServer, GenServerHandle};
2use futures::{future::select, Stream, StreamExt};
3use spawned_rt::tasks::JoinHandle;
4
5pub fn spawn_listener<T, F, S, I, E>(
12 mut handle: GenServerHandle<T>,
13 message_builder: F,
14 mut stream: S,
15) -> JoinHandle<()>
16where
17 T: GenServer + 'static,
18 F: Fn(I) -> T::CastMsg + Send + 'static + std::marker::Sync,
19 I: Send,
20 E: std::fmt::Debug + Send,
21 S: Unpin + Send + Stream<Item = Result<I, E>> + 'static,
22{
23 let cancelation_token = handle.cancellation_token();
24 let join_handle = spawned_rt::tasks::spawn(async move {
25 let result = select(
26 Box::pin(cancelation_token.cancelled()),
27 Box::pin(async {
28 loop {
29 match stream.next().await {
30 Some(Ok(i)) => match handle.cast(message_builder(i)).await {
31 Ok(_) => tracing::trace!("Message sent successfully"),
32 Err(e) => {
33 tracing::error!("Failed to send message: {e:?}");
34 break;
35 }
36 },
37 Some(Err(e)) => {
38 tracing::trace!("Received Error in msg {e:?}");
39 break;
40 }
41 None => {
42 tracing::trace!("Stream finished");
43 break;
44 }
45 }
46 }
47 }),
48 )
49 .await;
50 match result {
51 futures::future::Either::Left(_) => tracing::trace!("GenServer stopped"),
52 futures::future::Either::Right(_) => (), }
54 });
55 join_handle
56}