actm 0.1.0

Tiny async actor framework for Rust
Documentation
//! Basic actor implementation on top of flume queues

use std::{
    future::Future,
    marker::PhantomData,
    sync::atomic::{AtomicU64, Ordering},
};

use async_trait::async_trait;
use flume::Sender;
use futures::{future::join_all, select, StreamExt};
use once_cell::sync::Lazy;
use snafu::Snafu;
use tracing::{error, info, info_span, instrument, warn, Instrument};

use super::TokenManager;
use crate::{
    executor::Executor,
    traits::{Actor, Event, EventConsumer, EventProducer},
    types::{CompletionToken, DynamicConsumer, DynamicError, Trigger, Waiter},
};

mod newtype_macro;

/// Error returned when communicating with an [`AsyncActor`]
///
/// Every fallible channel send performed by the [`AsyncActor`] handle in this module maps its
/// failure to [`AsyncActorError::Shutdown`].
// NOTE(review): `snafu` may derive a variant's `Display` text from its doc comment when no
// explicit `display` attribute is present, so the variant documentation below is deliberately
// left untouched -- confirm against the snafu version in use before rewording it.
#[derive(Debug, Snafu)]
#[non_exhaustive]
pub enum AsyncActorError {
    /// Background task shutdown
    Shutdown,
}

/// A basic actor that listens to its `Inbox` asynchronously on a background task
///
/// The value itself is only a handle: it holds the sending halves of the channels that feed the
/// background task started by [`AsyncActor::spawn`] / [`AsyncActor::spawn_async`].
///
/// * `I` - type of the inbound events
/// * `O` - type of the outbound events
/// * `X` - executor used to spawn the background task
pub struct AsyncActor<I: Event, O: Event, X: Executor> {
    /// A channel into our task, carrying the inbound events
    inbox: Sender<I>,
    /// A channel to register consumers over; each consumer is handed to the background task
    consumers: Sender<Box<dyn EventConsumer<O, Error = DynamicError>>>,
    /// A channel to register callbacks over, each paired with the token it is registered for
    #[allow(clippy::type_complexity)] // This type isn't actually that complex
    callbacks: Sender<(CompletionToken, Box<dyn FnOnce(O) + Send + Sync + 'static>)>,
    /// A channel for shutdown triggers
    shutdown: Sender<Trigger>,
    /// A channel for catch up triggers
    catchup: Sender<Trigger>,
    /// Phantom for the executor type; no executor value is stored in the handle
    _executor: PhantomData<X>,
}

// This clone needs to be explicit, as the derive macro for clone adds a requirement for the
// struct's generics to be clone, which we can't do here as not all events are clone
impl<X: Executor, I: Event, O: Event> Clone for AsyncActor<I, O, X> {
    fn clone(&self) -> Self {
        Self {
            inbox: self.inbox.clone(),
            consumers: self.consumers.clone(),
            callbacks: self.callbacks.clone(),
            shutdown: self.shutdown.clone(),
            catchup: self.catchup.clone(),
            _executor: PhantomData,
        }
    }
}

impl<X: Executor, I: Event, O: Event> AsyncActor<I, O, X> {
    /// Builds an actor around a synchronous closure and starts its background task
    ///
    /// `logic` is the business logic of the [`Actor`]: it is handed the current context together
    /// with one inbound [`Event`] and returns the next context plus, optionally, an outbound
    /// [`Event`]. When the outbound event answers an incoming request, the [`CompletionToken`]
    /// of that request must be attached to it.
    ///
    /// The closure may take a long time to complete, but you are encouraged to move such work
    /// onto the blocking thread pool.
    ///
    /// `context` is the initial context; it is moved into `logic` on every invocation and
    /// replaced by whatever context `logic` returns.
    ///
    /// If `bound` is `Some(_)`, bounded queues with the specified limit are created, otherwise
    /// unbounded queues are used.
    #[instrument(skip(logic, context))]
    pub fn spawn<F, C>(logic: F, context: C, bound: Option<usize>) -> Self
    where
        F: Fn(C, I) -> (C, Option<O>) + Send + Sync + 'static,
        C: Send + Sync + 'static,
    {
        // Adapt the synchronous closure to the future-returning shape `spawn_async` expects: the
        // closure runs eagerly and its result is wrapped in a future that is immediately ready
        let adapter = move |state: C, event: I| {
            let outcome = logic(state, event);
            async move { outcome }
        };
        Self::spawn_async(adapter, context, bound)
    }

    /// Initializes this actor from a closure returning a future (an "async closure"), spawning
    /// off the background task.
    ///
    /// Otherwise behaves identically to [`AsyncActor::spawn`].
    ///
    /// # Arguments
    ///
    /// * `logic` - Called with the current context and each inbound event; the future it returns
    ///   resolves to the next context and an optional outbound event.
    /// * `context` - Initial context value; it is moved into `logic` on every call and replaced
    ///   by the context that `logic` hands back.
    /// * `bound` - If `Some(n)`, the inbox, consumer-registration and callback-registration
    ///   channels are each created with capacity `n`; if `None` they are unbounded. The shutdown
    ///   and catchup channels always have capacity 1.
    ///
    /// # Background task
    ///
    /// The task spawned on `X` loops over its five input channels. Each outbound event returned
    /// by `logic` is first passed through the task's `TokenManager` and then offered to every
    /// registered consumer; consumer errors are logged and otherwise ignored. A shutdown or
    /// catchup trigger makes the task drain and handle everything currently queued before firing
    /// the trigger; after a shutdown trigger the task also exits.
    #[instrument(skip(logic, context))]
    pub fn spawn_async<R, F, C>(logic: F, mut context: C, bound: Option<usize>) -> Self
    where
        R: Future<Output = (C, Option<O>)> + Send,
        F: Fn(C, I) -> R + Send + Sync + 'static,
        C: Send + Sync + 'static,
    {
        /// Counter for generating logging ids
        static COUNTER: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(0));
        // Create our channels. The inbox, consumers and callbacks channels all share the caller
        // provided `bound`
        let (inbox_tx, inbox_rx): (Sender<I>, _) = match bound {
            Some(x) => flume::bounded(x),
            None => flume::unbounded(),
        };
        let (consumers_tx, consumers_rx): (
            Sender<Box<dyn EventConsumer<O, Error = DynamicError>>>,
            _,
        ) = match bound {
            Some(x) => flume::bounded(x),
            None => flume::unbounded(),
        };
        #[allow(clippy::type_complexity)]
        let (callbacks_tx, callbacks_rx): (
            Sender<(CompletionToken, Box<dyn FnOnce(O) + Send + Sync + 'static>)>,
            _,
        ) = match bound {
            Some(x) => flume::bounded(x),
            None => flume::unbounded(),
        };
        // The trigger channels are always bounded to a single pending trigger, independent of
        // `bound`
        let (shutdown_tx, shutdown_rx) = flume::bounded::<Trigger>(1);
        let (catchup_tx, catchup_rx) = flume::bounded::<Trigger>(1);
        // Get a task id
        let id = COUNTER.fetch_add(1, Ordering::SeqCst);
        // NOTE(review): the message below says `BasicActor` although this type is `AsyncActor`
        // -- presumably a leftover from an earlier name; confirm before changing the log text
        info!(?id, "Spawning BasicActor task");
        // Spawn the background processing task that implements the actor
        X::spawn_async(
            async move {
                // Create the token manager
                // Every outbound event is passed through it before being handed to the
                // consumers, so that callbacks registered with it can be called back
                let mut token_manager: TokenManager<O> = TokenManager::new();
                // Convert channels into streams. The first three receivers are cloned because
                // the originals are still needed to drain the queues on shutdown/catchup
                let mut inbox_stream = inbox_rx.clone().into_stream();
                let mut consumers_inbox_stream = consumers_rx.clone().into_stream();
                let mut callbacks_stream = callbacks_rx.clone().into_stream();
                let mut shutdown_stream = shutdown_rx.into_stream();
                let mut catchup_stream = catchup_rx.into_stream();
                // Store our consumers
                let mut consumers = Vec::<Box<dyn EventConsumer<O, Error = DynamicError>>>::new();
                // Enter our event handling loop
                loop {
                    // Select over our five inputs: the inbox, the consumer and callback
                    // registration channels, and the shutdown and catchup trigger channels. If
                    // any of the first three reports that the other side of its channel has been
                    // closed, we log an error and break out of the loop, implicitly closing down
                    // the task. A closed shutdown or catchup channel is ignored.
                    //
                    // Because `select!` chooses between futures ready at the same time in a
                    // ""semi-random"" way, this should give roughly equal attention to all the
                    // inputs under load
                    //
                    // NOTE(review): as a consequence an inbound event may be handled before a
                    // callback or consumer registration that was sent earlier has been picked
                    // up; the shutdown/catchup branches below avoid this by connecting pending
                    // registrations first -- confirm callers are fine with this
                    //
                    // NOTE(review): the "process through the token manager, distribute to the
                    // consumers, log the errors" sequence appears three times below with
                    // identical code; a shared helper looks like a worthwhile follow-up
                    select! {
                        x = inbox_stream.next() => {
                            if let Some(x) = x {
                                // Process the event with the caller provided closure and context,
                                // and then perform bookkeeping if the closure returns an outbound
                                // event. The closure's future is awaited to completion here, so
                                // no other input is looked at while it runs
                                let (new_context, output) = logic(context, x).await;
                                context = new_context;
                                if let Some(output) = output {
                                    // First process the event through the token manager, so any
                                    // callbacks can be called back
                                    let output = token_manager.process(output);
                                    // Then distribute it to all of our consumers, each of which
                                    // receives its own `stateless_clone` of the event
                                    let results = join_all(
                                        consumers
                                            .iter_mut()
                                            .map(|x| async { x.accept(output.stateless_clone()).await})
                                    ).await;
                                    // Log all of our errors
                                    for result in results {
                                        if let Err(e) = result {
                                            warn!(?e, "Error occurred feeding event into consumer");
                                        }
                                    }
                                }
                            } else {
                                error!("Inbox channel unexpectedly shutdown");
                                break;
                            }
                        }
                        x = consumers_inbox_stream.next() => {
                            if let Some(x) = x {
                                // Add the new consumer to the list
                                consumers.push(x);
                            } else {
                                error!("Consumers channel unexpectedly shutdown");
                                break;
                            }
                        }
                        x = callbacks_stream.next() => {
                            if let Some(x) = x {
                                // Register the callback with the token manager; the channel
                                // carries `(token, callback)` pairs
                                token_manager.register_callback(x.1, x.0);
                            } else {
                                error!("Callbacks channel unexpectedly shutdown");
                                break;
                            }
                        }
                        shutdown = shutdown_stream.next() => {
                            // A `None` here is deliberately not treated as an error
                            if let Some(shutdown) = shutdown {
                                // Drain the queues, taking everything that is queued right now
                                let inbox = inbox_rx.drain().collect::<Vec<_>>();
                                let consumers_inbox = consumers_rx.drain().collect::<Vec<_>>();
                                let callbacks = callbacks_rx.drain().collect::<Vec<_>>();
                                // Handle connecting the remaining callbacks and consumers first
                                for callback in callbacks {
                                    token_manager.register_callback(callback.1, callback.0);
                                }
                                for consumer in consumers_inbox {
                                    consumers.push(consumer);
                                }
                                // Then handle any remaining events
                                for event in inbox {
                                    let (new_context, output) = logic(context, event).await;
                                    context = new_context;
                                    if let Some(output) = output {
                                        // First process the event through the token manager, so any
                                        // callbacks can be called back
                                        let output = token_manager.process(output);
                                        // Then distribute it to all of our consumers
                                        let results = join_all(
                                            consumers
                                                .iter_mut()
                                                .map(|x| async { x.accept(output.stateless_clone()).await})
                                        ).await;
                                        // Log all of our errors
                                        for result in results {
                                            if let Err(e) = result {
                                                warn!(?e, "Error occurred feeding event into consumer");
                                            }
                                        }
                                    }

                                }
                                // Tell the caller we are done, then leave the loop so the task
                                // ends
                                shutdown.trigger();
                                break;
                            }
                        }
                        catchup = catchup_stream.next() => {
                            // Same handling as the shutdown branch, except that the task keeps
                            // running afterwards
                            if let Some(catchup) = catchup {
                                // Drain the queues, taking everything that is queued right now
                                let inbox = inbox_rx.drain().collect::<Vec<_>>();
                                let consumers_inbox = consumers_rx.drain().collect::<Vec<_>>();
                                let callbacks = callbacks_rx.drain().collect::<Vec<_>>();
                                // Handle connecting the remaining callbacks and consumers first
                                for callback in callbacks {
                                    token_manager.register_callback(callback.1, callback.0);
                                }
                                for consumer in consumers_inbox {
                                    consumers.push(consumer);
                                }
                                // Then handle any remaining events
                                for event in inbox {
                                    let (new_context, output) = logic(context, event).await;
                                    context = new_context;
                                    if let Some(output) = output {
                                        // First process the event through the token manager, so any
                                        // callbacks can be called back
                                        let output = token_manager.process(output);
                                        // Then distribute it to all of our consumers
                                        let results = join_all(
                                            consumers
                                                .iter_mut()
                                                .map(|x| async { x.accept(output.stateless_clone()).await})
                                        ).await;
                                        // Log all of our errors
                                        for result in results {
                                            if let Err(e) = result {
                                                warn!(?e, "Error occurred feeding event into consumer");
                                            }
                                        }
                                    }

                                }
                                // Tell the caller we are done
                                catchup.trigger();
                            }
                        }
                        // `select!` takes this branch once none of the inputs can produce
                        // anything anymore; there is nothing left to do but end the task
                        complete => {
                            break;
                        }
                    }
                }
            }
            .instrument(info_span!(target: "AsyncActor","AsyncActor Inner Task", id = id)),
        );

        // Hand the sending halves back to the caller as the actor handle
        Self {
            inbox: inbox_tx,
            consumers: consumers_tx,
            callbacks: callbacks_tx,
            shutdown: shutdown_tx,
            catchup: catchup_tx,
            _executor: PhantomData,
        }
    }
}

#[async_trait]
impl<X: Executor, I: Event, O: Event> EventConsumer<I> for AsyncActor<I, O, X> {
    type Error = AsyncActorError;

    /// Queues `event` on the actor's inbox, waiting asynchronously for the send to complete
    ///
    /// # Errors
    ///
    /// Returns [`AsyncActorError::Shutdown`] if the inbox channel rejects the event.
    async fn accept(&self, event: I) -> Result<(), Self::Error> {
        let delivery = self.inbox.send_async(event).await;
        // The rejected event carried by the channel error is dropped here
        delivery.map_err(|_closed| AsyncActorError::Shutdown)
    }

    /// Queues `event` on the actor's inbox using the synchronous send of the channel
    ///
    /// # Errors
    ///
    /// Returns [`AsyncActorError::Shutdown`] if the inbox channel rejects the event.
    fn accept_sync(&self, event: I) -> Result<(), Self::Error> {
        let delivery = self.inbox.send(event);
        // The rejected event carried by the channel error is dropped here
        delivery.map_err(|_closed| AsyncActorError::Shutdown)
    }
}

#[async_trait]
impl<X: Executor, I: Event, O: Event> EventProducer<O> for AsyncActor<I, O, X> {
    type Error = AsyncActorError;

    async fn register_consumer<C>(&self, consumer: C) -> Result<(), Self::Error>
    where
        C: EventConsumer<O> + Send + Sync + 'static,
    {
        self.consumers
            .send_async(Box::new(DynamicConsumer::from(consumer)))
            .await
            .map_err(|_| AsyncActorError::Shutdown)
    }

    async fn register_callback<F>(
        &self,
        callback: F,
        token: CompletionToken,
    ) -> Result<(), Self::Error>
    where
        F: FnOnce(O) + Send + Sync + 'static,
    {
        self.callbacks
            .send_async((token, Box::new(callback)))
            .await
            .map_err(|_| AsyncActorError::Shutdown)
    }

    fn register_consumer_sync<C>(&self, consumer: C) -> Result<(), Self::Error>
    where
        C: EventConsumer<O> + Send + Sync + 'static,
    {
        self.consumers
            .send(Box::new(DynamicConsumer::from(consumer)))
            .map_err(|_| AsyncActorError::Shutdown)
    }

    fn register_callback_sync<F>(
        &self,
        callback: F,
        token: CompletionToken,
    ) -> Result<(), Self::Error>
    where
        F: FnOnce(O) + Send + Sync + 'static,
    {
        self.callbacks
            .send((token, Box::new(callback)))
            .map_err(|_| AsyncActorError::Shutdown)
    }
}

impl<X: Executor, I: Event, O: Event> Actor<I, O, X> for AsyncActor<I, O, X> {
    /// The handle is its own inbox
    type Inbox = Self;
    /// The handle is its own outbox
    type Outbox = Self;

    /// Returns the handle itself, which accepts inbound events
    fn inbox(&self) -> &Self::Inbox {
        self
    }

    /// Returns the handle itself, which registers consumers and callbacks
    fn outbox(&self) -> &Self::Outbox {
        self
    }

    /// Sends a shutdown trigger to the background task and returns the matching [`Waiter`]
    ///
    /// A failure to send the trigger is ignored; the [`Waiter`] is returned either way.
    // NOTE(review): the trigger channel is created with capacity 1 and this is the synchronous
    // `send` -- presumably it can block the calling thread while another trigger is still
    // pending; confirm this is acceptable for async callers
    fn shutdown(&self) -> Waiter {
        let (done, signal) = Waiter::new();
        // Best effort: the outcome is kept alive until the end of the function but never
        // inspected
        let _outcome = self.shutdown.send(signal);
        done
    }

    /// Sends a catchup trigger to the background task and returns the matching [`Waiter`]
    ///
    /// A failure to send the trigger is ignored; the [`Waiter`] is returned either way.
    // NOTE(review): same capacity-1 / synchronous `send` caveat as for `shutdown` -- confirm
    fn catchup(&self) -> Waiter {
        let (done, signal) = Waiter::new();
        // Best effort: the outcome is kept alive until the end of the function but never
        // inspected
        let _outcome = self.catchup.send(signal);
        done
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        executor::Threads,
        testing_util::{Add, Math, MathEvent, MathEventType, Output, OutputEvent},
        traits::ActorExt,
    };
    // Basic smoke test, generic over the executor: add one to the actor's counter 100 times and
    // check that the registered callbacks and a stream attached to the outbox both observe the
    // same, expected, outputs
    async fn smoke<X: Executor>() {
        // Create our actor; its context is the running `i64` total, starting at 0, and its
        // queues are unbounded
        let actor: AsyncActor<MathEvent, OutputEvent, X> = AsyncActor::spawn(
            |mut value: i64, mut math: MathEvent| {
                // Pull out the completion token, if there is any
                let token = math.token();
                // Perform the operation, remembering the value from before it
                let old_value = value;
                let math = math.into_inner();
                value = math.operate(value);
                // Check to see if there was a completion token, if so, send back an Output
                if let Some(token) = token {
                    // Make our output
                    let output = Output {
                        before: old_value,
                        after: value,
                        input: math,
                    };
                    // Wrap it up
                    let mut output = OutputEvent::from(output);
                    // Attach the token
                    output.set_completion_token(token);
                    // Send it up
                    (value, Some(output))
                } else {
                    // No token means nobody is waiting for a response
                    (value, None)
                }
            },
            0,
            None,
        );
        // Create a channel to collect the outputs delivered to our callbacks
        let (tx, rx) = flume::unbounded();
        // Create 100 events that each add 1 to the actor, registering a callback for each one
        let events = join_all(
            (0..100)
                // Create 100 tokenized events
                .map(|_| {
                    let event_type = MathEventType::Add(Add(1));
                    let mut event = MathEvent::from(event_type);
                    let token = event.tokenize().unwrap();
                    (event, token)
                })
                // Register the callbacks
                .map(|(event, token)| {
                    // Make a copy of our stream input
                    let tx = tx.clone();
                    // Clone the actor, cheating a bit, a real application wouldn't be spawning tasks
                    // like this
                    let actor = actor.clone();
                    async move {
                        // Register the callback, which forwards the output into our channel
                        actor
                            .outbox()
                            .register_callback(
                                move |event| {
                                    tx.send(event).unwrap();
                                },
                                token,
                            )
                            .await
                            .unwrap();
                        // Return the event so it can be sent to the actor afterwards
                        event
                    }
                }),
        )
        .await;
        // Hook up our collector
        let collector_out = actor.stream(None).await.unwrap();
        // Fill our actor's inbox, sending all of the events concurrently
        let _tasks = join_all(events.into_iter().map(|x| {
            // Same cheeky cloning of the actor
            let actor = actor.clone();
            async move {
                actor.inbox().accept(x).await.unwrap();
            }
        }))
        .await;
        // Pull out of our collector and sort the results
        let mut collector_out: Vec<_> = collector_out.into_stream().take(100).collect().await;
        collector_out.sort();
        // Pull out of our callback stream and sort the results
        let mut callbacks: Vec<_> = rx.into_stream().take(100).collect().await;
        callbacks.sort();
        // They should be equal
        assert_eq!(collector_out, callbacks);
        // Generate the expected list: the n-th addition takes the total from n to n + 1
        let expected: Vec<OutputEvent> = (0..100)
            .map(|x| {
                OutputEvent::from(Output {
                    before: x,
                    after: x + 1,
                    input: MathEventType::Add(Add(1)),
                })
            })
            .collect();
        // Double check equality
        assert_eq!(collector_out, expected);
    }

    // Runs the generic smoke test on the `AsyncStd` executor; only built when the `async-std`
    // feature is enabled
    #[cfg(feature = "async-std")]
    #[async_std::test]
    async fn smoke_async_std() {
        smoke::<crate::executor::AsyncStd>().await;
    }

    // Runs the generic smoke test on the `Threads` executor, driven from an async test
    // NOTE(review): unlike `smoke_async_std` this uses `async_std::test` without a feature gate
    // -- presumably `async-std` is always available to tests as a dev-dependency; confirm
    #[async_std::test]
    async fn smoke_threads_async() {
        smoke::<Threads>().await;
    }

    // Same scenario as the async smoke test, but driven entirely through the synchronous API of
    // the actor on the `Threads` executor
    #[test]
    fn smoke_threads_sync() {
        // Create our actor; its context is the running `i64` total, starting at 0, and its
        // queues are unbounded
        let actor: AsyncActor<MathEvent, OutputEvent, Threads> = AsyncActor::spawn(
            |mut value: i64, mut math: MathEvent| {
                // Pull out the completion token, if there is any
                let token = math.token();
                // Perform the operation, remembering the value from before it
                let old_value = value;
                let math = math.into_inner();
                value = math.operate(value);
                // Check to see if there was a completion token, if so, send back an Output
                if let Some(token) = token {
                    // Make our output
                    let output = Output {
                        before: old_value,
                        after: value,
                        input: math,
                    };
                    // Wrap it up
                    let mut output = OutputEvent::from(output);
                    // Attach the token
                    output.set_completion_token(token);
                    // Send it up
                    (value, Some(output))
                } else {
                    // No token means nobody is waiting for a response
                    (value, None)
                }
            },
            0,
            None,
        );
        // Create a channel to collect the outputs delivered to our callbacks
        let (tx, rx) = flume::unbounded();
        // Create 100 events that each add 1 to the actor, registering a callback for each one
        let mut events = vec![];
        for _ in 0..100 {
            // Create a tokenized event
            let event_type = MathEventType::Add(Add(1));
            let mut event = MathEvent::from(event_type);
            let token = event.tokenize().unwrap();
            let tx = tx.clone();
            // Register the callback, which forwards the output into our channel
            actor
                .outbox()
                .register_callback_sync(
                    move |event| {
                        tx.send(event).unwrap();
                    },
                    token,
                )
                .unwrap();
            // Keep the event so it can be sent to the actor afterwards
            events.push(event);
        }
        // Hook up our collector
        let collector_out = actor.stream_sync(None).unwrap();
        // Fill our actor's inbox with some threads, one per event; the threads are not joined
        for x in events {
            // Some cheeky cloning of the actor
            let actor = actor.clone();
            std::thread::spawn(move || actor.inbox().accept_sync(x).unwrap());
        }
        // Pull out of our collector and sort the results
        let mut collector_out: Vec<_> = collector_out.into_iter().take(100).collect();
        collector_out.sort();
        // Pull out of our callback stream and sort the results
        let mut callbacks: Vec<_> = rx.into_iter().take(100).collect();
        callbacks.sort();
        // They should be equal
        assert_eq!(collector_out, callbacks);
        // Generate the expected list: the n-th addition takes the total from n to n + 1
        let expected: Vec<OutputEvent> = (0..100)
            .map(|x| {
                OutputEvent::from(Output {
                    before: x,
                    after: x + 1,
                    input: MathEventType::Add(Add(1)),
                })
            })
            .collect();
        // Double check equality
        assert_eq!(collector_out, expected);
    }
}