// spawned-concurrency 0.5.0
//
// Spawned Concurrency
// Documentation
use crate::message::Message;
use futures::{future::select, Stream, StreamExt};
use spawned_rt::tasks::JoinHandle;

use super::actor::{Actor, Context, Handler};

/// Forward items from an async stream to an actor as messages.
///
/// Stops when the stream is exhausted, the actor stops, or sending fails.
pub fn spawn_listener<A, M, S>(ctx: Context<A>, stream: S) -> JoinHandle<()>
where
    A: Actor + Handler<M>,
    M: Message,
    S: Send + Stream<Item = M> + 'static,
{
    let cancellation_token = ctx.cancellation_token();
    let join_handle = spawned_rt::tasks::spawn(async move {
        let mut pinned_stream = core::pin::pin!(stream);
        let is_cancelled = core::pin::pin!(cancellation_token.cancelled());
        let listener_loop = core::pin::pin!(async {
            loop {
                match pinned_stream.next().await {
                    Some(msg) => match ctx.send(msg) {
                        Ok(_) => {}
                        Err(e) => {
                            tracing::error!("Failed to send message: {e:?}");
                            break;
                        }
                    },
                    None => {
                        tracing::trace!("Stream finished");
                        break;
                    }
                }
            }
        });
        match select(is_cancelled, listener_loop).await {
            futures::future::Either::Left(_) => tracing::trace!("Actor stopped"),
            futures::future::Either::Right(_) => (),
        }
    });
    join_handle
}