// rtactor 0.6.0
//
// An Actor framework specially designed for Real-Time constrained use cases.
// Documentation
use super::actor::{self, Message, RequestId};
use crate::Request;
use std::boxed::Box;
use std::collections::LinkedList;
use std::sync::mpsc;
use std::time::Duration;

////////////////////////////// public types /////////////////////////////////////

/// Actor that has its own message queue and actively manages how to wait on it.
pub struct ActiveMailbox {
    /// Identifier of this mailbox; copied into every address returned by `addr()`.
    id: actor::ActorId,
    /// Receiving end of the bounded channel that backs the mailbox queue.
    rx: mpsc::Receiver<Message>,
    /// Sending end of the same channel; cloned into every address returned by `addr()`.
    tx: mpsc::SyncSender<Message>,
    /// Most recent request id returned by `generate_request_id()` (0 before the first call).
    last_request_id: RequestId,
    /// List to store messages popped from rx but not consumed because of filtered receive.
    message_list: LinkedList<Message>,
}

/// Deprecated alias of [`ActiveMailbox`].
#[deprecated = "use ActiveMailbox instead"]
pub type ActiveActor = ActiveMailbox;

/// Super trait used by SyncRequester and SyncNotifier to send requests and notifications.
///
/// These traits are generated by `#[derive(SyncRequester)]` and `#[derive(SyncNotifier)]`
/// and provide a synchronous (blocking) interface to an actor.
pub trait SyncAccessor {
    /// Send `data` as a notification to the actor behind this accessor.
    ///
    /// Returns an error when the notification could not be handed over.
    fn send_notification<T>(&mut self, data: T) -> Result<(), crate::Error>
    where
        T: 'static + Send;

    /// Send `request_data` as a request and block until a response of type `TResponse`
    /// is available or `timeout` expires.
    ///
    /// Returns the response payload on success and an error otherwise.
    fn request_for<TRequest, TResponse>(
        &mut self,
        request_data: TRequest,
        timeout: Duration,
    ) -> Result<TResponse, crate::Error>
    where
        TRequest: 'static + Send + Sized,
        TResponse: 'static + Send + Sized;
}

////////////////////////////// internal types /////////////////////////////////////

/// Address implementation to an `ActiveMailbox`.
///
/// Cloning an address clones the channel sender, so every clone feeds the same queue.
#[derive(Debug, Clone)]
pub(crate) struct ActiveAddr {
    /// Identifier of the mailbox this address points to.
    id: actor::ActorId,
    /// Sending end of the mailbox queue; messages are pushed with a non-blocking `try_send`.
    tx: mpsc::SyncSender<Message>,
}

////////////////////////////// public macros /////////////////////////////////////

/// Create a struct that gives blocking access to an address and implements one or more
/// `SyncRequester` or `SyncNotifier` traits.
///
/// Example:
/// ```text
/// define_sync_accessor!(MySyncAccessorStructName, FirstSyncRequesterName, SecondSyncRequesterName, ...);
/// ```
///
/// The generated struct owns a private `ActiveMailbox` (queue size 1) used to receive the
/// responses, plus a clone of the target address. It provides:
///
/// * `new(&Addr) -> MySyncAccessorStructName`,
/// * `target_addr(&self) -> &Addr`,
/// * an implementation of `SyncAccessor` that forwards to the mailbox,
/// * an empty `impl` of every listed trait, i.e. their default implementations are used
///   unchanged.
///
/// All crate items are referenced through `$crate`, so the macro keeps working when the
/// dependency is renamed. A trailing comma after the last trait is accepted.
#[macro_export]
macro_rules! define_sync_accessor{
    ($sync_accessor_name:ident, $($sync_trait:ty),+ $(,)?)
    =>
    {
        /// Blocking accessor generated by `define_sync_accessor!`.
        pub struct $sync_accessor_name {
            mailbox: $crate::ActiveMailbox,
            target_addr: $crate::Addr,
        }

        impl $sync_accessor_name {
            /// Create an accessor that talks to `target_addr`.
            pub fn new(target_addr: &$crate::Addr) -> $sync_accessor_name {
                $sync_accessor_name {
                    // A single slot is enough: only one request is in flight at a time.
                    mailbox: $crate::ActiveMailbox::new(1),
                    target_addr: target_addr.clone(),
                }
            }

            /// Address of the actor this accessor sends to.
            pub fn target_addr(&self) -> &$crate::Addr {
                &self.target_addr
            }
        }

        impl $crate::SyncAccessor for $sync_accessor_name {
            fn send_notification<T>(&mut self, data: T) -> Result<(), $crate::Error>
            where
                T: 'static + Send,
            {
                // Clone first so the mailbox can be borrowed mutably.
                let addr = self.target_addr.clone();
                self.mailbox.send_notification(&addr, data)
            }

            fn request_for<TRequest, TResponse>(
                &mut self,
                request_data: TRequest,
                timeout: ::std::time::Duration,
            ) -> Result<TResponse, $crate::Error>
            where
                TRequest: 'static + Send + Sized,
                TResponse: 'static + Send + Sized,
            {
                // Clone first so the mailbox can be borrowed mutably.
                let addr = self.target_addr.clone();
                self.mailbox.request_for(&addr, request_data, timeout)
            }
        }

        $(
            impl $sync_trait for $sync_accessor_name {}
        )+
    }
}

////////////////////////////// public impl's /////////////////////////////////////
impl ActiveMailbox {
    /// Create an active actor with a given maximum queue.
    pub fn new(queue_size: usize) -> ActiveMailbox {
        let (tx, rx) = std::sync::mpsc::sync_channel::<Message>(queue_size);
        ActiveMailbox {
            id: actor::generate_actor_id(),
            rx,
            tx,
            last_request_id: 0,
            message_list: LinkedList::new(),
        }
    }

    /// Get the address of the actor.
    #[deprecated = "use better named addr()"]
    pub fn get_addr(&self) -> actor::Addr {
        self.addr()
    }

    /// Get the address of the actor.
    pub fn addr(&self) -> actor::Addr {
        actor::Addr {
            kind: actor::AddrKind::Active(ActiveAddr {
                id: self.id,
                tx: self.tx.clone(),
            }),
        }
    }

    /// Try to get a single message.
    ///
    /// In case of error, can be `Error::NoMessage` or an error during the
    /// pop of the queue.
    pub fn try_get_message(&mut self) -> Result<Message, actor::Error> {
        if let Some(message) = self.message_list.pop_back() {
            return Result::Ok(message);
        }

        match self.rx.try_recv() {
            Ok(message) => Ok(message),
            Err(mpsc::TryRecvError::Empty) => Err(actor::Error::NoMessage),
            Err(mpsc::TryRecvError::Disconnected) => Err(actor::Error::BrokenReceive),
        }
    }

    /// Wait indefinitely for a message.
    pub fn wait_message(&mut self) -> Result<Message, actor::Error> {
        self.wait_message_for(Duration::MAX)
    }

    /// Wait for a message for a given amount of duration.
    pub fn wait_message_for(&mut self, timeout: Duration) -> Result<Message, actor::Error> {
        // look in linked list
        if let Some(message) = self.message_list.pop_back() {
            return Result::Ok(message);
        }

        // wait in mpsc queue
        match self.rx.recv_timeout(timeout) {
            Ok(message) => Result::Ok(message),
            Err(err) => Result::Err(match err {
                mpsc::RecvTimeoutError::Disconnected => actor::Error::BrokenReceive,
                mpsc::RecvTimeoutError::Timeout => actor::Error::Timeout,
            }),
        }
    }

    /// Send notification.
    ///
    /// This method is equivalent to the function rtactor::send_notification()
    /// but is preferred because it allows possible future thread local memory allocation.
    pub fn send_notification<T>(&mut self, dst: &actor::Addr, data: T) -> Result<(), crate::Error>
    where
        T: 'static + Send,
    {
        dst.receive_notification(data)
    }

    /// Send back an ok response to this request.
    pub fn responds<T>(&mut self, request: Request, data: T) -> Result<(), crate::Error>
    where
        T: 'static + Send,
    {
        request.src.receive_ok_response(request.id, data)
    }

    /// Send a request and wait for the response for a given duration.
    pub fn request_for<TRequest, TResponse>(
        &mut self,
        dst: &actor::Addr,
        request_data: TRequest,
        timeout: Duration,
    ) -> Result<TResponse, actor::Error>
    where
        TRequest: 'static + Send + Sized,
        TResponse: 'static + Send + Sized,
    {
        let request_id = self.generate_request_id();
        dst.receive_request(&self.addr(), request_id, request_data);

        loop {
            // wait in mpsc queue
            // TODO use until
            let result = self.rx.recv_timeout(timeout);

            match result {
                Ok(actor::Message::Response(response)) => match response.result {
                    Ok(data) => match data.downcast::<TResponse>() {
                        Ok(out) => {
                            if response.request_id == request_id {
                                return Ok(*out);
                            } else {
                                continue;
                            }
                        }
                        Err(result) => {
                            self.message_list.push_back(actor::Message::Response(
                                actor::Response {
                                    request_id: response.request_id,
                                    result: Ok(result),
                                },
                            ));
                            continue;
                        }
                    },
                    Err(err) => {
                        return Err(err.error);
                    }
                },
                Ok(msg) => {
                    self.message_list.push_back(msg);
                    continue;
                }
                Err(mpsc::RecvTimeoutError::Disconnected) => {
                    return Err(actor::Error::BrokenReceive);
                }
                Err(mpsc::RecvTimeoutError::Timeout) => {
                    return Err(actor::Error::Timeout);
                }
            }
        }
    }

    pub(crate) fn generate_request_id(&mut self) -> RequestId {
        self.last_request_id = self.last_request_id.wrapping_add(1);
        self.last_request_id
    }
}

impl ActiveAddr {
    pub(crate) fn actor_id(&self) -> actor::ActorId {
        self.id
    }

    pub fn receive_notification<T>(&self, data: T) -> Result<(), actor::Error>
    where
        T: 'static + Send,
    {
        let datagram = Message::Notification(actor::Notification {
            data: Box::new(data),
        });

        match self.tx.try_send(datagram) {
            Ok(_) => Result::Ok(()),
            Err(err) => Result::Err(match err {
                mpsc::TrySendError::Full(_) => actor::Error::QueueFull,
                mpsc::TrySendError::Disconnected(..) => actor::Error::AddrUnreachable,
            }),
        }
    }

    pub fn receive_request<T>(&self, src: &actor::Addr, id: RequestId, data: T)
    where
        T: 'static + Send + Sized,
    {
        let message = Message::Request(actor::Request {
            src: src.clone(),
            id,
            data: Box::new(data),
        });

        if let Err(err) = self.tx.try_send(message) {
            let (actor_err, returned_message) = match err {
                mpsc::TrySendError::Full(a_message) => (actor::Error::QueueFull, a_message),
                mpsc::TrySendError::Disconnected(a_message) => {
                    (actor::Error::AddrUnreachable, a_message)
                }
            };
            if let Message::Request(request) = returned_message {
                let _ = src.receive_err_response(
                    id,
                    actor::NonBoxedErrorStatus {
                        error: actor_err,
                        request_data: request.data,
                    },
                );
            }
        }
    }
    pub(super) fn receive_ok_response<T>(
        &self,
        request_id: RequestId,
        result: T,
    ) -> Result<(), actor::Error>
    where
        T: 'static + Send + Sized,
    {
        let response = actor::Response {
            request_id,
            result: Result::Ok(Box::new(result)),
        };

        match self.tx.try_send(actor::Message::Response(response)) {
            Ok(_) => Result::Ok(()),
            Err(err) => Result::Err(match err {
                mpsc::TrySendError::Full(_) => actor::Error::QueueFull,
                mpsc::TrySendError::Disconnected(..) => actor::Error::AddrUnreachable,
            }),
        }
    }

    pub(super) fn receive_err_response<T>(
        &self,
        request_id: RequestId,
        result: actor::NonBoxedErrorStatus<T>,
    ) -> Result<(), actor::Error>
    where
        T: 'static + Send + Sized,
    {
        let boxed_result = actor::ErrorStatus {
            error: result.error,
            request_data: Box::new(result.request_data),
        };

        let response = actor::Response {
            request_id,
            result: Result::Err(boxed_result),
        };

        match self.tx.try_send(actor::Message::Response(response)) {
            Ok(_) => Result::Ok(()),
            Err(err) => Result::Err(match err {
                mpsc::TrySendError::Full(_) => actor::Error::QueueFull,
                mpsc::TrySendError::Disconnected(..) => actor::Error::AddrUnreachable,
            }),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Request ids count up from 1 and wrap around to 0 after `RequestId::MAX`.
    #[test]
    fn generate_request_id() {
        let mut mailbox = ActiveMailbox::new(1);

        // A fresh mailbox hands out 1, then 2.
        let first = mailbox.generate_request_id();
        assert_eq!(first, 1);
        let second = mailbox.generate_request_id();
        assert_eq!(second, 2);

        // Jump close to the upper bound to observe the wrap-around.
        mailbox.last_request_id = RequestId::MAX - 1;
        let at_max = mailbox.generate_request_id();
        assert_eq!(at_max, RequestId::MAX);
        let wrapped = mailbox.generate_request_id();
        assert_eq!(wrapped, 0);
        let after_wrap = mailbox.generate_request_id();
        assert_eq!(after_wrap, 1);
    }
}