messages 0.3.1

Runtime-agnostic actor library.
Documentation
//! An address of an actor.
//!
//! See [`Actor`] documentation for details.

use std::sync::Arc;

use crate::{
    actor::Actor,
    cfg_runtime,
    context::{InputHandle, Signal},
    envelope::{EnvelopeProxy, MessageEnvelope, NotificationEnvelope},
    errors::SendError,
    handler::{Handler, Notifiable},
};
use futures::{lock::Mutex, Stream, StreamExt};

/// `Address` is an object used to communicate with [`Actor`]s.
///
/// Assuming that [`Actor`] is capable of processing messages of a certain
/// type, the [`Address`] can be used to interact with [`Actor`] by using
/// either [`Address::send`] (for messages) or [`Address::notify`] (for notifications).
pub struct Address<A> {
    sender: async_channel::Sender<Signal<InputHandle<A>>>,
    stop_handle: Arc<Mutex<()>>,
}

impl<A> std::fmt::Debug for Address<A> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Address").finish()
    }
}

impl<A> Clone for Address<A> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
            stop_handle: self.stop_handle.clone(),
        }
    }
}

impl<A> Address<A> {
    pub(crate) fn new(
        sender: async_channel::Sender<Signal<InputHandle<A>>>,
        stop_handle: Arc<Mutex<()>>,
    ) -> Self {
        Self {
            sender,
            stop_handle,
        }
    }

    /// Sends a message to the [`Actor`] and receives the response.
    ///
    /// ## Examples
    ///
    /// This example assumes that `messages` is used with `rt-tokio` feature enabled.
    ///
    /// ```rust
    /// # use messages::prelude::*;
    ///
    /// struct Sum;
    ///
    /// #[async_trait]
    /// impl Actor for Sum {}
    ///
    /// #[async_trait]
    /// impl Handler<(u8, u8)> for Sum {
    ///     type Result = u16;
    ///     // Implementation omitted.
    ///     # async fn handle(&mut self, (a, b): (u8, u8), context: &Context<Self>) -> u16 {
    ///     #    (a as u16) + (b as u16)
    ///     # }
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///    let mut addr = Sum.spawn();
    ///    let result = addr.send((22, 20)).await.unwrap();
    ///    assert_eq!(result, 42);
    ///    # addr.stop().await;
    ///    # addr.wait_for_stop().await;  
    /// }
    /// ```
    ///
    /// ## Errors
    ///
    /// Will return an error in case associated actor stopped working.
    pub async fn send<IN>(&mut self, message: IN) -> Result<A::Result, SendError>
    where
        A: Actor + Send + Handler<IN> + 'static,
        IN: Send + 'static,
        A::Result: Send + Sync + 'static,
    {
        let (sender, receiver) = async_oneshot::oneshot();
        let envelope: MessageEnvelope<A, IN> = MessageEnvelope::new(message, sender);

        let message = Box::new(envelope) as Box<dyn EnvelopeProxy<A> + Send + 'static>;

        self.sender
            .send(Signal::Message(message))
            .await
            .map_err(|_| SendError::ReceiverDisconnected)?;

        receiver.await.map_err(|_| SendError::ReceiverDisconnected)
    }

    /// Sends a notification to the [`Actor`] without receiving any kind of response.
    ///
    /// ## Examples
    ///
    /// This example assumes that `messages` is used with `rt-tokio` feature enabled.
    ///
    /// ```rust
    /// # use messages::prelude::*;
    ///
    /// struct Ping;
    ///
    /// #[async_trait]
    /// impl Actor for Ping {}
    ///
    /// #[async_trait]
    /// impl Notifiable<u8> for Ping {
    ///     async fn notify(&mut self, input: u8, context: &Context<Self>) {
    ///         println!("Received number {}", input);
    ///     }
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///    let mut addr = Ping.spawn();
    ///    addr.notify(42).await.unwrap();
    ///    # addr.stop().await;
    ///    # addr.wait_for_stop().await;  
    /// }
    /// ```
    ///
    /// ## Errors
    ///
    /// Will return an error in case associated actor stopped working.
    pub async fn notify<IN>(&mut self, message: IN) -> Result<(), SendError>
    where
        A: Actor + Send + Notifiable<IN> + 'static,
        IN: Send + 'static,
    {
        let envelope: NotificationEnvelope<A, IN> = NotificationEnvelope::new(message);

        let message = Box::new(envelope) as Box<dyn EnvelopeProxy<A> + Send + 'static>;

        self.sender
            .send(Signal::Message(message))
            .await
            .map_err(|_| SendError::ReceiverDisconnected)?;

        Ok(())
    }

    /// Combines provided stream and this `Address` object, returning a future
    /// that will run while stream yields messages and send them to the server.
    ///
    /// [`Actor`] associated with this `Address` must implmenet [`Notifiable`] trait
    /// to process messages from the stream.
    ///
    /// Future returned by this method should not normally be directly `await`ed,
    /// but rather is expected to be used in some kind of `spawn` function of
    /// the used runtime (e.g. `tokio::spawn` or `async_std::task::spawn`).
    ///
    /// ## Errors
    ///
    /// Will return an error in case associated actor stopped working.
    pub async fn into_stream_forwarder<IN, S>(mut self, mut stream: S) -> Result<(), SendError>
    where
        A: Actor + Send + Notifiable<IN> + 'static,
        S: Send + Stream<Item = IN> + Unpin,
        IN: Send + 'static,
    {
        while let Some(message) = stream.next().await {
            self.notify(message).await?;
        }
        Ok(())
    }

    /// Returns `true` if `Address` is still connected to the [`Actor`].
    #[must_use]
    pub fn connected(&self) -> bool {
        !self.sender.is_closed()
    }

    /// Sends a stop request to the corresponding [`Actor`].
    ///
    /// Sending this message does not mean that actor will be stopped immediately.
    /// In order to make sure that the actor is stopped, [`Address::wait_for_stop`]
    /// should be used.
    ///
    /// Does nothing if address is disconnected from the actor or actor already has
    /// been stopped.
    pub async fn stop(&mut self) {
        // If actor is already stopped, we're fine with it.
        drop(self.sender.send(Signal::Stop).await);
    }

    /// Creates a future that waits for actor to be fully stopped.
    ///
    /// Note that this method does not request an actor to stop, it only waits for it
    /// in order to stop actor, [`Address::stop`] should be used.
    pub async fn wait_for_stop(&self) {
        // We will only able to obtain the lock when context will release it.
        // However, we don't want to exit early in case this method is called
        // before actor is actually started, so we do it in the loop until
        // the channel is disconnected.
        while self.connected() {
            self.stop_handle.lock().await;
        }
    }
}

cfg_runtime! {

use crate::{
    handler::Coroutine,
    envelope::CoroutineEnvelope
};

impl<A> Address<A> {
    /// Version of [`Address::into_stream_forwarder`] that automatically spawns the future.
    ///
    /// Returned future is the join handle of the spawned task, e.g. it can be awaited
    /// if the user is interested in the moment when the stream stopped sending messages.
    pub fn spawn_stream_forwarder<IN, S>(self, stream: S) -> crate::runtime::JoinHandle<Result<(), SendError>>
    where
        A: Actor + Send + Notifiable<IN> + 'static,
        S: Send + Stream<Item = IN> + Unpin + 'static,
        IN: Send + 'static,
    {
        crate::runtime::spawn(self.into_stream_forwarder(stream))
    }


    /// Sends a message to the [`Actor`] and receives the response.
    /// Unlike in [`Address::send`], `calculate` supports parallel execution.
    ///
    /// ## Examples
    ///
    /// This example assumes that `messages` is used with `rt-tokio` feature enabled.
    ///
    /// ```rust
    /// # use messages::prelude::*;
    /// #[derive(Clone)]
    /// struct Sum;
    ///
    /// #[async_trait]
    /// impl Actor for Sum {}
    ///
    /// #[async_trait]
    /// impl Coroutine<(u8, u8)> for Sum {
    ///     type Result = u16;
    ///     async fn calculate(self, (a, b): (u8, u8)) -> u16 {
    ///         (a as u16) + (b as u16)
    ///     }
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///    let mut addr = Sum.spawn();
    ///    let result = addr.calculate((22, 20)).await.unwrap();
    ///    assert_eq!(result, 42);
    ///    # addr.stop().await;
    ///    # addr.wait_for_stop().await;
    /// }
    /// ```
    pub async fn calculate<IN>(&self, message: IN) -> Result<A::Result, SendError>
    where
        A: Actor + Send + Coroutine<IN> + 'static,
        IN: Send + 'static,
        A::Result: Send + Sync + 'static,
    {
        let addr = self.sender.clone();
        let (sender, receiver) = async_oneshot::oneshot();
        let envelope: CoroutineEnvelope<A, IN> = CoroutineEnvelope::new(message, sender);

        let message = Box::new(envelope) as Box<dyn EnvelopeProxy<A> + Send + 'static>;

        addr
            .send(Signal::Message(message))
            .await
            .map_err(|_| SendError::ReceiverDisconnected)?;

        receiver.await.map_err(|_| SendError::ReceiverDisconnected)
    }
}
}