use crate::actor::{self, ActorId, NonBoxedErrorStatus};
use crate::dispatcher::Dispatcher;
use crate::reactive::{
InstantSource, InternalInstant, MessageAndDstId, ReactiveAddr, TimeoutScheduler,
};
use crate::{dispatcher, Addr, Behavior, Instant, Message, ProcessContext};
use std::ops::ControlFlow;
use std::sync::mpsc;
use std::time::Duration;
use std::vec::Vec;
use std::{thread, time};
/// Dispatcher that receives messages from a single bounded `std::sync::mpsc`
/// queue and hands each one to the behavior registered for its destination id.
pub struct MpscDispatcher {
/// Actor id of the dispatcher itself; messages addressed to this id are
/// handled as dispatcher requests instead of being routed to a behavior.
disp_actor_id: ActorId,
/// Receiving end of the message queue.
rx: mpsc::Receiver<MessageAndDstId>,
/// Sending end of the message queue; cloned into every address built for
/// the dispatcher or for one of its reactive actors.
pub(crate) tx: mpsc::SyncSender<MessageAndDstId>,
/// Registered behaviors paired with their actor id. Lookups use a binary
/// search on the id, so the list is kept sorted by id.
reactive_list: Vec<(ActorId, Box<dyn Behavior>)>,
}
/// Holds the queue and the dispatcher's actor id before the dispatcher is
/// built, so that the dispatcher address (or an accessor) can be obtained
/// ahead of calling `build`.
pub struct Builder {
/// Actor id the built dispatcher will answer to.
disp_actor_id: ActorId,
/// Receiving end of the message queue, moved into the dispatcher by `build`.
rx: mpsc::Receiver<MessageAndDstId>,
/// Sending end of the message queue, cloned into addresses handed out.
tx: mpsc::SyncSender<MessageAndDstId>,
}
impl Builder {
    /// Creates a builder with a fresh actor id and a bounded queue created by
    /// `sync_channel(queue_size)`.
    pub fn new(queue_size: usize) -> Builder {
        let (sender, receiver) = mpsc::sync_channel::<MessageAndDstId>(queue_size);
        Builder {
            disp_actor_id: actor::generate_actor_id(),
            rx: receiver,
            tx: sender,
        }
    }

    /// Returns an address that targets the dispatcher itself.
    ///
    /// The address owns a clone of the queue sender, so it can be used before
    /// and after `build` is called.
    pub fn dispatcher_addr(&self) -> Addr {
        let sender = self.tx.clone();
        ReactiveAddr::new(sender, self.disp_actor_id).into_addr()
    }

    /// Splits the builder into its dispatcher id, queue receiver and queue sender.
    fn into_parts(
        self,
    ) -> (
        ActorId,
        mpsc::Receiver<MessageAndDstId>,
        mpsc::SyncSender<MessageAndDstId>,
    ) {
        let Builder {
            disp_actor_id,
            rx,
            tx,
        } = self;
        (disp_actor_id, rx, tx)
    }

    /// Creates a `SyncAccessor` bound to the dispatcher address.
    pub fn to_accessor(&self) -> dispatcher::SyncAccessor {
        let addr = self.dispatcher_addr();
        dispatcher::SyncAccessor::new(&addr)
    }

    /// Consumes the builder and returns a dispatcher with no registered behavior.
    pub fn build(self) -> MpscDispatcher {
        let (disp_actor_id, rx, tx) = self.into_parts();
        let reactive_list = Vec::new();
        MpscDispatcher {
            disp_actor_id,
            rx,
            tx,
            reactive_list,
        }
    }
}
impl MpscDispatcher {
    /// Runs the dispatcher loop on the calling thread until a stop is signalled.
    ///
    /// Each round first handles every message that is already queued, then
    /// calls `try_send_next_pending_timeout` on the context. While that call
    /// yields `ControlFlow::Continue`, the queue is drained again; once it
    /// yields `ControlFlow::Break(duration)`, the loop blocks on the queue for
    /// at most `duration`. The loop ends as soon as message handling reports
    /// a stop (a `StopDispatcher` request, or a disconnected queue).
    pub fn process(&mut self) {
        let instant_source = StdTimeInstantSource();
        let mut timeout_scheduler = TimeoutScheduler::new();
        let mut context = ProcessContext::new(self, 0, &instant_source, &mut timeout_scheduler);
        'run: loop {
            let wait = loop {
                // Handle everything that is already queued without blocking.
                loop {
                    let (message_processed, stop) = self.try_process_message(&mut context);
                    if stop {
                        break 'run;
                    }
                    if !message_processed {
                        break;
                    }
                }
                // `Continue` means the queue has to be drained again before
                // looking at the next timeout; `Break` carries how long the
                // dispatcher may block.
                if let ControlFlow::Break(duration) = context.try_send_next_pending_timeout() {
                    break duration;
                }
            };
            let (_message_processed, stop) = self.block_process_message(&mut context, wait);
            if stop {
                break;
            }
        }
    }

    /// Builds an address that targets actor `id` through this dispatcher's queue.
    fn build_owned_reactive_addr(&self, id: ActorId) -> Addr {
        ReactiveAddr::new(self.tx.clone(), id).into_addr()
    }

    /// Removes the behavior registered under `id` and returns it, or `None`
    /// when no behavior has that id.
    fn unregister_reactive_by_id(&mut self, id: ActorId) -> Option<Box<dyn Behavior>> {
        self.get_behavior_index(id)
            .map(|index| self.reactive_list.remove(index).1)
    }

    /// Installs `behavior` under `id` in place of the current one.
    ///
    /// Returns `Ok` with the previous behavior, or `Err` with the untouched
    /// `behavior` when `id` is not registered.
    fn replace_reactive_by_id(
        &mut self,
        id: ActorId,
        behavior: Box<dyn Behavior>,
    ) -> Result<Box<dyn Behavior>, Box<dyn Behavior>> {
        match self.get_behavior_index(id) {
            Some(index) => Ok(std::mem::replace(
                &mut self.reactive_list[index].1,
                behavior,
            )),
            None => Err(behavior),
        }
    }

    /// Returns the position of `id` in `reactive_list`, if registered.
    ///
    /// Uses a binary search, so it relies on the list being sorted by id.
    fn get_behavior_index(&self, id: ActorId) -> Option<usize> {
        self.reactive_list
            .binary_search_by_key(&id, |element| element.0)
            .ok()
    }

    /// Answers `message` with an `ActorDisappeared` error when it is a request.
    ///
    /// Responses and notifications are dropped. A failure to deliver the error
    /// response is deliberately ignored.
    fn reject_undeliverable(message: Message) {
        if let Message::Request(request) = message {
            let _ = request.src.receive_err_response(
                request.id,
                NonBoxedErrorStatus {
                    error: crate::Error::ActorDisappeared,
                    request_data: request.data,
                },
            );
        }
    }

    /// Empties the queue without delivering anything; every queued request is
    /// answered with an `ActorDisappeared` error.
    fn drop_queued_messages(&mut self) {
        while let Ok(msg_and_id) = self.rx.try_recv() {
            Self::reject_undeliverable(msg_and_id.message);
        }
    }

    /// Handles a message addressed to the dispatcher itself.
    ///
    /// Returns `true` when the dispatcher has to stop (after a
    /// `StopDispatcher` request), `false` otherwise.
    ///
    /// # Panics
    ///
    /// Panics when `message` is not a request, or when the request payload is
    /// not a `dispatcher::Request`.
    fn process_dispatcher_message(
        &mut self,
        context: &mut ProcessContext,
        message: &Message,
    ) -> bool {
        let request = match message {
            Message::Request(request) => request,
            Message::Response(_) => panic!("dispatcher received an unexpected response"),
            Message::Notification(_) => {
                panic!("dispatcher received an unexpected notification")
            }
        };
        let disp_request = match request.data.downcast_ref::<dispatcher::Request>() {
            Some(disp_request) => disp_request,
            None => panic!("dispatcher take only dispatcher::Request"),
        };
        match disp_request {
            dispatcher::Request::RegisterReactive { behavior } => {
                // The behavior is taken out of the request (leaving `None`
                // behind); a request that no longer holds one is answered
                // with `Addr::INVALID`.
                let addr = match behavior.replace(None) {
                    Some(behavior) => self.register_reactive(behavior),
                    None => Addr::INVALID,
                };
                context.send_response(request, dispatcher::Response::RegisterReactive(addr));
                false
            }
            dispatcher::Request::ExecuteFn {
                executable_fn: boxed_fn,
            } => {
                // Swap the stored function for a no-op so it can be called by value.
                let response_data = (boxed_fn.replace(Box::new(|_| Box::new(()))))(self);
                context.send_response(request, response_data);
                false
            }
            // The variant is deprecated: it is accepted but ignored, and no
            // response is sent for it.
            #[allow(deprecated)]
            dispatcher::Request::StopReactive { addr: _ } => false,
            dispatcher::Request::StopDispatcher {} => {
                // Reject whatever is still queued and drop all behaviors
                // before acknowledging the stop.
                self.drop_queued_messages();
                self.reactive_list.clear();
                context.send_response(request, dispatcher::Response::StopDispatcher());
                true
            }
        }
    }

    /// Routes one received message to the dispatcher or to the behavior
    /// registered under its destination id.
    ///
    /// A request for an unknown destination is answered with an
    /// `ActorDisappeared` error; other messages for an unknown destination are
    /// dropped. Returns `true` when the dispatcher has to stop.
    fn process_current_message(
        &mut self,
        context: &mut ProcessContext,
        message_and_id: MessageAndDstId,
    ) -> bool {
        if message_and_id.dst_id == self.disp_actor_id {
            return self.process_dispatcher_message(context, &message_and_id.message);
        }
        context.own_actor_id = message_and_id.dst_id;
        match self.get_behavior_index(context.own_actor_id) {
            Some(index) => self.reactive_list[index]
                .1
                .process_message(context, &message_and_id.message),
            None => Self::reject_undeliverable(message_and_id.message),
        }
        false
    }

    /// Handles at most one queued message without blocking.
    ///
    /// Returns `(message_processed, stop)`: `message_processed` is `true` when
    /// a message was taken from the queue, `stop` is `true` when the
    /// dispatcher has to stop (stop request handled or queue disconnected).
    pub(crate) fn try_process_message(&mut self, context: &mut ProcessContext) -> (bool, bool) {
        match self.rx.try_recv() {
            Ok(message_and_id) => (true, self.process_current_message(context, message_and_id)),
            Err(mpsc::TryRecvError::Empty) => (false, false),
            Err(mpsc::TryRecvError::Disconnected) => (false, true),
        }
    }

    /// Waits up to `timeout` for a message and handles it.
    ///
    /// Returns `(message_processed, stop)` with the same meaning as
    /// `try_process_message`; a timeout yields `(false, false)`.
    pub(crate) fn block_process_message(
        &mut self,
        context: &mut ProcessContext,
        timeout: Duration,
    ) -> (bool, bool) {
        match self.rx.recv_timeout(timeout) {
            Ok(message_and_id) => {
                let stop = self.process_current_message(context, message_and_id);
                (true, stop)
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => (false, true),
            Err(mpsc::RecvTimeoutError::Timeout) => (false, false),
        }
    }
}
impl dispatcher::Dispatcher for MpscDispatcher {
    /// Returns an address that targets the dispatcher itself.
    fn addr(&self) -> actor::Addr {
        self.build_owned_reactive_addr(self.disp_actor_id)
    }

    /// Registers `behavior` under a newly generated actor id and returns the
    /// address of that actor.
    fn register_reactive(&mut self, behavior: Box<dyn Behavior>) -> actor::Addr {
        let id = actor::generate_actor_id();
        // The list is sorted by id (lookups binary-search it), so insert at
        // the sorted position instead of appending and re-sorting everything.
        let position = self
            .reactive_list
            .binary_search_by_key(&id, |element| element.0)
            .unwrap_or_else(|insert_at| insert_at);
        self.reactive_list.insert(position, (id, behavior));
        self.build_owned_reactive_addr(id)
    }

    /// Replaces the behavior of the reactive actor designated by `addr`.
    ///
    /// Returns `Ok` with the previous behavior, or `Err` handing `behavior`
    /// back when `addr` is not a reactive address or its id is not registered.
    fn replace_reactive(
        &mut self,
        addr: &actor::Addr,
        behavior: Box<dyn Behavior>,
    ) -> Result<Box<dyn Behavior>, Box<dyn Behavior>> {
        match &addr.kind {
            actor::AddrKind::Reactive(reactive_addr) => {
                self.replace_reactive_by_id(reactive_addr.dst_id, behavior)
            }
            _ => Err(behavior),
        }
    }

    /// Removes the reactive actor designated by `addr` and returns its
    /// behavior, or `None` when `addr` is not a reactive address or its id is
    /// not registered.
    fn unregister_reactive(&mut self, addr: &actor::Addr) -> Option<Box<dyn Behavior>> {
        match &addr.kind {
            actor::AddrKind::Reactive(reactive_addr) => {
                self.unregister_reactive_by_id(reactive_addr.dst_id)
            }
            _ => None,
        }
    }
}
// Dropping the dispatcher empties its queue through `drop_queued_messages`,
// which answers every request still queued with an `ActorDisappeared` error.
impl Drop for MpscDispatcher {
fn drop(&mut self) {
self.drop_queued_messages();
}
}
/// Starts a dispatcher on a new thread and submits `setup_func` to it.
///
/// `queue_size` is passed to `Builder::new` as the size of the message queue.
/// `setup_func` is handed to the dispatcher through the accessor's
/// `execute_fn` with a `Duration::MAX` timeout, and its result is returned
/// once available.
///
/// Returns the dispatcher address, the join handle of the dispatcher thread
/// and the value produced by `setup_func`.
///
/// # Panics
///
/// Panics when `execute_fn` returns an error.
pub fn spawn_dispatcher<F, T>(
    queue_size: usize,
    setup_func: F,
) -> (actor::Addr, thread::JoinHandle<()>, T)
where
    F: FnOnce(&mut dyn Dispatcher) -> T,
    F: Send + 'static,
    T: Send + 'static + Sized,
{
    let dispatcher_builder = Builder::new(queue_size);
    // The accessor is created before the builder moves into the thread.
    let mut accessor = dispatcher_builder.to_accessor();
    let join_handle = thread::spawn(move || {
        let mut dispatcher = dispatcher_builder.build();
        dispatcher.process();
    });
    let setup_output = accessor.execute_fn(setup_func, Duration::MAX).unwrap();
    let dispatcher_addr = accessor.dispatcher_addr().clone();
    (dispatcher_addr, join_handle, setup_output)
}
/// Stateless `InstantSource` that reads the current time from `std::time::Instant`.
struct StdTimeInstantSource();
impl InstantSource for StdTimeInstantSource {
    /// Returns the current `std::time::Instant` wrapped as a finite `Instant`.
    fn now(&self) -> Instant {
        let current = time::Instant::now();
        InternalInstant::Finite(current).into_instant()
    }
}
#[cfg(test)]
mod tests {
    use crate::{actor::AddrKind, dispatcher::Dispatcher};
    use super::*;

    /// Behavior that checks the payload of the notifications sent by the tests.
    struct TestBehavior();

    impl Behavior for TestBehavior {
        fn process_message(&mut self, _context: &mut ProcessContext, msg: &Message) {
            // Only notifications are inspected; any other message is ignored.
            let notif = match msg {
                Message::Notification(notif) => notif,
                _ => return,
            };
            if let Some(&float) = notif.data.downcast_ref::<f32>() {
                assert!(float == 3.4);
            } else if let Some(&int) = notif.data.downcast_ref::<i32>() {
                assert!(int == -567);
            }
        }
    }

    /// A registered behavior can be removed again through its actor id.
    #[test]
    fn simple_reactive_register_unregister() {
        let mut disp = crate::mpsc_dispatcher::Builder::new(10).build();
        let addr = disp.register_reactive(Box::new(TestBehavior()));
        let dst_id = match addr.kind {
            AddrKind::Reactive(reactive_addr) => reactive_addr.dst_id,
            _ => panic!(),
        };
        let removed = disp.unregister_reactive_by_id(dst_id);
        assert!(removed.is_some());
    }

    /// Two queued notifications are processed one by one, then the queue is empty.
    #[test]
    fn simple_send_message() {
        let mut disp = crate::mpsc_dispatcher::Builder::new(10).build();
        let instant_source = StdTimeInstantSource();
        let mut timeout_scheduler = TimeoutScheduler::new();
        let mut context = ProcessContext::new(&disp, 0, &instant_source, &mut timeout_scheduler);
        let addr = disp.register_reactive(Box::new(TestBehavior()));

        let float_sent = addr.receive_notification(3.4f32);
        assert!(float_sent.is_ok());
        let int_sent = addr.receive_notification(-567i32);
        assert!(int_sent.is_ok());

        // First notification.
        let (message_processed, stop) = disp.try_process_message(&mut context);
        assert!(!stop);
        assert!(message_processed);

        // Second notification.
        let (message_processed, stop) = disp.try_process_message(&mut context);
        assert!(!stop);
        assert!(message_processed);

        // Nothing is left in the queue.
        let (message_processed, stop) = disp.try_process_message(&mut context);
        assert!(!stop);
        assert!(!message_processed);
    }
}