use super::actor::{self, Message, RequestId};
use crate::Request;
use std::boxed::Box;
use std::collections::LinkedList;
use std::sync::mpsc;
use std::time::Duration;
pub struct ActiveMailbox {
id: actor::ActorId,
rx: mpsc::Receiver<Message>,
tx: mpsc::SyncSender<Message>,
last_request_id: RequestId,
message_list: LinkedList<Message>,
}
#[deprecated = "use ActiveMailbox instead"]
pub type ActiveActor = ActiveMailbox;
pub trait SyncAccessor {
fn send_notification<T>(&mut self, data: T) -> Result<(), crate::Error>
where
T: 'static + Send;
fn request_for<TRequest, TResponse>(
&mut self,
request_data: TRequest,
timeout: Duration,
) -> Result<TResponse, crate::Error>
where
TRequest: 'static + Send + Sized,
TResponse: 'static + Send + Sized;
}
#[derive(Debug, Clone)]
pub(crate) struct ActiveAddr {
id: actor::ActorId,
tx: mpsc::SyncSender<Message>,
}
#[macro_export]
macro_rules! define_sync_accessor{
($sync_accessor_name:ident, $($sync_trait:ty),+)
=>
{
pub struct $sync_accessor_name {
mailbox: ::rtactor::ActiveMailbox,
target_addr: ::rtactor::Addr,
}
impl $sync_accessor_name {
pub fn new(target_addr: &::rtactor::Addr) -> $sync_accessor_name {
$sync_accessor_name {
mailbox: ::rtactor::ActiveMailbox::new(1),
target_addr: target_addr.clone(),
}
}
pub fn target_addr(&self)-> &::rtactor::Addr {&self.target_addr}
}
impl ::rtactor::SyncAccessor for $sync_accessor_name {
fn send_notification<T>(&mut self, data: T) -> Result<(), ::rtactor::Error>
where
T: 'static + Send,
{
let addr = self.target_addr.clone();
self.mailbox.send_notification(&addr, data)
}
fn request_for<TRequest, TResponse>(
&mut self,
request_data: TRequest,
timeout: std::time::Duration,
) -> Result<TResponse, ::rtactor::Error>
where
TRequest: 'static + Send + Sized,
TResponse: 'static + Send + Sized
{
let addr = self.target_addr.clone();
self.mailbox.request_for(&addr, request_data, timeout)
}
}
$(
impl $sync_trait for $sync_accessor_name {}
)*
}
}
impl ActiveMailbox {
pub fn new(queue_size: usize) -> ActiveMailbox {
let (tx, rx) = std::sync::mpsc::sync_channel::<Message>(queue_size);
ActiveMailbox {
id: actor::generate_actor_id(),
rx,
tx,
last_request_id: 0,
message_list: LinkedList::new(),
}
}
#[deprecated = "use better named addr()"]
pub fn get_addr(&self) -> actor::Addr {
self.addr()
}
pub fn addr(&self) -> actor::Addr {
actor::Addr {
kind: actor::AddrKind::Active(ActiveAddr {
id: self.id,
tx: self.tx.clone(),
}),
}
}
pub fn try_get_message(&mut self) -> Result<Message, actor::Error> {
if let Some(message) = self.message_list.pop_back() {
return Result::Ok(message);
}
match self.rx.try_recv() {
Ok(message) => Ok(message),
Err(mpsc::TryRecvError::Empty) => Err(actor::Error::NoMessage),
Err(mpsc::TryRecvError::Disconnected) => Err(actor::Error::BrokenReceive),
}
}
pub fn wait_message(&mut self) -> Result<Message, actor::Error> {
self.wait_message_for(Duration::MAX)
}
pub fn wait_message_for(&mut self, timeout: Duration) -> Result<Message, actor::Error> {
if let Some(message) = self.message_list.pop_back() {
return Result::Ok(message);
}
match self.rx.recv_timeout(timeout) {
Ok(message) => Result::Ok(message),
Err(err) => Result::Err(match err {
mpsc::RecvTimeoutError::Disconnected => actor::Error::BrokenReceive,
mpsc::RecvTimeoutError::Timeout => actor::Error::Timeout,
}),
}
}
pub fn send_notification<T>(&mut self, dst: &actor::Addr, data: T) -> Result<(), crate::Error>
where
T: 'static + Send,
{
dst.receive_notification(data)
}
pub fn responds<T>(&mut self, request: Request, data: T) -> Result<(), crate::Error>
where
T: 'static + Send,
{
request.src.receive_ok_response(request.id, data)
}
pub fn request_for<TRequest, TResponse>(
&mut self,
dst: &actor::Addr,
request_data: TRequest,
timeout: Duration,
) -> Result<TResponse, actor::Error>
where
TRequest: 'static + Send + Sized,
TResponse: 'static + Send + Sized,
{
let request_id = self.generate_request_id();
dst.receive_request(&self.addr(), request_id, request_data);
loop {
let result = self.rx.recv_timeout(timeout);
match result {
Ok(actor::Message::Response(response)) => match response.result {
Ok(data) => match data.downcast::<TResponse>() {
Ok(out) => {
if response.request_id == request_id {
return Ok(*out);
} else {
continue;
}
}
Err(result) => {
self.message_list.push_back(actor::Message::Response(
actor::Response {
request_id: response.request_id,
result: Ok(result),
},
));
continue;
}
},
Err(err) => {
return Err(err.error);
}
},
Ok(msg) => {
self.message_list.push_back(msg);
continue;
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
return Err(actor::Error::BrokenReceive);
}
Err(mpsc::RecvTimeoutError::Timeout) => {
return Err(actor::Error::Timeout);
}
}
}
}
pub(crate) fn generate_request_id(&mut self) -> RequestId {
self.last_request_id = self.last_request_id.wrapping_add(1);
self.last_request_id
}
}
impl ActiveAddr {
pub(crate) fn actor_id(&self) -> actor::ActorId {
self.id
}
pub fn receive_notification<T>(&self, data: T) -> Result<(), actor::Error>
where
T: 'static + Send,
{
let datagram = Message::Notification(actor::Notification {
data: Box::new(data),
});
match self.tx.try_send(datagram) {
Ok(_) => Result::Ok(()),
Err(err) => Result::Err(match err {
mpsc::TrySendError::Full(_) => actor::Error::QueueFull,
mpsc::TrySendError::Disconnected(..) => actor::Error::AddrUnreachable,
}),
}
}
pub fn receive_request<T>(&self, src: &actor::Addr, id: RequestId, data: T)
where
T: 'static + Send + Sized,
{
let message = Message::Request(actor::Request {
src: src.clone(),
id,
data: Box::new(data),
});
if let Err(err) = self.tx.try_send(message) {
let (actor_err, returned_message) = match err {
mpsc::TrySendError::Full(a_message) => (actor::Error::QueueFull, a_message),
mpsc::TrySendError::Disconnected(a_message) => {
(actor::Error::AddrUnreachable, a_message)
}
};
if let Message::Request(request) = returned_message {
let _ = src.receive_err_response(
id,
actor::NonBoxedErrorStatus {
error: actor_err,
request_data: request.data,
},
);
}
}
}
pub(super) fn receive_ok_response<T>(
&self,
request_id: RequestId,
result: T,
) -> Result<(), actor::Error>
where
T: 'static + Send + Sized,
{
let response = actor::Response {
request_id,
result: Result::Ok(Box::new(result)),
};
match self.tx.try_send(actor::Message::Response(response)) {
Ok(_) => Result::Ok(()),
Err(err) => Result::Err(match err {
mpsc::TrySendError::Full(_) => actor::Error::QueueFull,
mpsc::TrySendError::Disconnected(..) => actor::Error::AddrUnreachable,
}),
}
}
pub(super) fn receive_err_response<T>(
&self,
request_id: RequestId,
result: actor::NonBoxedErrorStatus<T>,
) -> Result<(), actor::Error>
where
T: 'static + Send + Sized,
{
let boxed_result = actor::ErrorStatus {
error: result.error,
request_data: Box::new(result.request_data),
};
let response = actor::Response {
request_id,
result: Result::Err(boxed_result),
};
match self.tx.try_send(actor::Message::Response(response)) {
Ok(_) => Result::Ok(()),
Err(err) => Result::Err(match err {
mpsc::TrySendError::Full(_) => actor::Error::QueueFull,
mpsc::TrySendError::Disconnected(..) => actor::Error::AddrUnreachable,
}),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn generate_request_id() {
let mut actor = ActiveMailbox::new(1);
assert_eq!(actor.generate_request_id(), 1);
assert_eq!(actor.generate_request_id(), 2);
actor.last_request_id = RequestId::MAX - 1;
assert_eq!(actor.generate_request_id(), RequestId::MAX);
assert_eq!(actor.generate_request_id(), 0);
assert_eq!(actor.generate_request_id(), 1);
}
}