use std::any::Any;
use std::collections::{HashMap, VecDeque};
use std::mem;
use std::sync::{Arc, Mutex, RwLock, Weak};
use actors::{Actor, ActorPath, ActorRef, ActorSystem, Message, Props};
use actors::future::{Computation, Complete, Future, FutureState};
use actors::name_resolver::ResolveRequest;
use actors::props::ActorFactory;
/// Shared, thread-safe callback run when a monitored actor reports a failure.
///
/// It is called with the `Failure` notification and the `ActorCell` context of the actor
/// that received the notification.
pub type FailureHandler = Arc<Fn(Failure, ActorCell) + Send + Sync>;
/// Handle on a shared value that either owns it or merely observes it.
enum Ref<T>
    where T: ?Sized
{
    /// Owning handle: holds a strong `Arc` on the value.
    StrongRef(Arc<T>),
    /// Observing handle: holds a `Weak` pointer that has to be upgraded before use.
    WeakRef(Weak<T>),
}
/// Evaluates to a strong `Arc` on the value behind a `Ref`.
///
/// A `StrongRef` is cloned and a `WeakRef` is upgraded. When the upgrade fails, the block
/// `$b` is evaluated in its place, so it has to either diverge (`return`, `panic!`) or
/// produce an `Arc` of the same type.
macro_rules! unwrap_inner {
    ($r:expr, $b:block) => {
        match $r {
            Ref::StrongRef(ref strong) => strong.clone(),
            Ref::WeakRef(ref weak) => {
                match weak.upgrade() {
                    // The upgrade already yields an owned `Arc`.
                    Some(alive) => alive,
                    None => {
                        $b
                    }
                }
            }
        }
    }
}
/// Handle on the runtime state of an actor (`InnerActorCell`).
///
/// The handle built by `ActorCell::new` owns the state through a strong reference, while
/// every handle obtained through `clone` only observes it through a weak one.
pub struct ActorCell {
// Strong or weak pointer to the shared actor state.
inner_cell: Ref<InnerActorCell>,
}
impl Clone for ActorCell {
    /// Produces a non-owning copy of the cell.
    ///
    /// Whatever kind of handle `self` holds, the copy only carries a `Weak` pointer, so
    /// clones never keep the actor state alive on their own.
    fn clone(&self) -> ActorCell {
        let observer = match self.inner_cell {
            Ref::WeakRef(ref weak) => weak.clone(),
            Ref::StrongRef(ref strong) => Arc::downgrade(strong),
        };
        ActorCell { inner_cell: Ref::WeakRef(observer) }
    }
}
impl ActorCell {
pub fn new( props: Arc<ActorFactory>,
system: ActorSystem,
father: ActorRef,
path: Arc<ActorPath>)
-> ActorCell {
ActorCell {
inner_cell: Ref::StrongRef(Arc::new(InnerActorCell::new(props,
system,
father,
path))),
}
}
pub fn receive_message(&self, message: InnerMessage, sender: ActorRef) {
let inner = unwrap_inner!(self.inner_cell, {
warn!("A message was send to a ref to a stopped actor");
return;
});
inner.receive_message(message, sender);
inner.system.enqueue_actor(self.actor_ref());
}
pub fn receive_system_message(&self, system_message: SystemMessage) {
let inner = unwrap_inner!(self.inner_cell, {
warn!("A message was send to a ref to a stopped actor");
return;
});
inner.receive_system_message(system_message);
inner.system.enqueue_actor(self.actor_ref());
}
pub fn handle_envelope(&self) {
let inner = unwrap_inner!(self.inner_cell, {
warn!("A message was send to a ref to a stopped actor");
return;
});
inner.handle_envelope(self.clone());
}
}
/// Operations an actor can perform from inside its own execution context.
///
/// The notes on individual methods describe what the `ActorCell` implementation in this
/// file does.
pub trait ActorContext {
    /// Reference to the actor owning this context.
    fn actor_ref(&self) -> ActorRef;

    /// Creates a child actor called `name` from `props` and returns a reference to it.
    ///
    /// Returns `Err` with a description when the name is rejected (a name containing `/`).
    fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> Result<ActorRef, &'static str>;

    /// Sends `message` to the actor `to`, with this actor as the sender.
    fn tell<MessageTo>(&self, to: ActorRef, message: MessageTo) where MessageTo: Message;

    /// Sends `message` to `to` through a newly created future actor called `future_name`
    /// and returns a reference to that future.
    fn ask<MessageTo>(&self, to: ActorRef, message: MessageTo, future_name: String) -> ActorRef
        where MessageTo: Message;

    /// Sends the value `complete` to the future `to`, wrapped in a `Complete` message.
    fn complete<MessageTo>(&self, to: ActorRef, complete: MessageTo) where MessageTo: Message;

    /// Asks `future` to send its result, downcast to `T`, to the actor `to` as a regular
    /// message.
    fn forward_result<T>(&self, future: ActorRef, to: ActorRef) where T: Message;

    /// Asks `future` to complete the future `to` with its result, downcast to `T`.
    fn forward_result_to_future<T>(&self, future: ActorRef, to: ActorRef) where T: Message;

    /// Asks `future` to apply `closure` to its value; the closure receives the boxed value
    /// and a context, and returns the new value.
    fn do_computation<T, F>(&self, future: ActorRef, closure: F)
        where T: Message,
              F: Fn(Box<Any + Send>, ActorCell) -> T + Send + Sync + 'static;

    /// Sends a `PoisonPill` control message to `actor_ref`.
    fn stop(&self, actor_ref: ActorRef);

    /// Asks the father of this actor to kill it (`KillMe` control message).
    fn kill_me(&self);

    /// Sender of the message currently recorded for this actor.
    fn sender(&self) -> ActorRef;

    /// Father (parent) of this actor.
    fn father(&self) -> ActorRef;

    /// Snapshot of the children of this actor, keyed by path.
    fn children(&self) -> HashMap<Arc<ActorPath>, ActorRef>;

    /// Snapshot of the actors monitored by this actor with their failure handlers, keyed
    /// by path.
    fn monitoring(&self) -> HashMap<Arc<ActorPath>, (ActorRef, FailureHandler)>;

    /// Snapshot of the actors that are notified when this actor fails.
    fn monitored_by(&self) -> Vec<ActorRef>;

    /// Starts monitoring `actor`: `handler` is run when it reports a failure.
    fn monitor(&self, actor: ActorRef, handler: FailureHandler);

    /// Path of this actor.
    fn path(&self) -> Arc<ActorPath>;

    /// Asks the name resolver for the actor at `logical_path`; the answer is delivered to a
    /// future actor called `request_name`, whose reference is returned.
    fn identify_actor(&self, logical_path: String, request_name: String) -> ActorRef;

    /// Sends the control message `message` to `actor`.
    fn tell_control(&self, actor: ActorRef, message: ControlMessage);

    /// Marks this actor as failed and notifies the actors monitoring it, giving `reason`.
    fn fail(&self, reason: &'static str);
}
impl ActorContext for ActorCell {
    /// Builds a reference to this actor out of a clone of the cell and the actor's path.
    ///
    /// # Panics
    /// Panics (through `path`) if the actor no longer exists.
    fn actor_ref(&self) -> ActorRef {
        ActorRef::with_cell(self.clone(), self.path())
    }

    /// Creates a child actor called `name` from `props`, registers it and starts it.
    ///
    /// The child is stored in `children`, monitored with `InnerActorCell::restart_child`
    /// as failure handler, sent `SystemMessage::Start`, and (unless it is the name resolver
    /// itself) announced to the name resolver.
    ///
    /// # Errors
    /// Returns `Err` when `name` contains a `/`.
    ///
    /// # Panics
    /// Panics if the actor owning this context no longer exists.
    fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> Result<ActorRef, &'static str> {
        let inner = unwrap_inner!(self.inner_cell, {
            panic!("Tried to create an actor from the context of a no longer existing actor");
        });
        if name.contains('/') {
            return Err("Used a '/' in the name of an actor, this is not allowed");
        }
        let path = self.path().child(name);
        info!("creating actor {}", path.logical_path());
        let inner_cell = InnerActorCell::new(props,
                                             inner.system.clone(),
                                             self.actor_ref(),
                                             path.clone());
        let actor_cell = ActorCell { inner_cell: Ref::StrongRef(Arc::new(inner_cell)) };
        // `internal_ref` wraps the owning cell and is the one kept in `children`.
        // NOTE(review): the clone handed out presumably only holds a weak cell (see
        // `ActorCell::clone`) -- confirm against `ActorRef`.
        let internal_ref = ActorRef::with_cell(actor_cell, path.clone());
        let external_ref = internal_ref.clone();
        inner.children.lock().unwrap().insert(path.clone(), internal_ref);
        inner.monitoring.lock().unwrap().insert(path.clone(), (external_ref.clone(), Arc::new(InnerActorCell::restart_child)));
        external_ref.receive_system_message(SystemMessage::Start);
        // The name resolver cannot register itself with itself.
        if *(path.logical_path()) != "/system/name_resolver" {
            self.tell(inner.system.name_resolver(), ResolveRequest::Add(external_ref.clone()));
        }
        Ok(external_ref)
    }

    /// Sends `message` to `to` with this actor as sender.
    ///
    /// For a distant path the message is only logged, nothing is delivered here.
    fn tell<MessageTo: Message>(&self, to: ActorRef, message: MessageTo) {
        let path = to.path();
        match *path {
            ActorPath::Local(_) => to.receive(InnerMessage::Message(Box::new(message)), self.actor_ref()),
            ActorPath::Distant(ref path) => {
                info!("Sent a message of size {} to distant actor {}:{}", mem::size_of::<MessageTo>(),
                      path.distant_logical_path(), path.addr_port());
            },
        }
    }

    /// Creates a future actor called `name` as a child of this actor, has it send
    /// `message` to `to`, and returns a reference to the future.
    ///
    /// # Panics
    /// Panics if the future cannot be created (see `actor_of`).
    fn ask<MessageTo: Message>(&self, to: ActorRef, message: MessageTo, name: String) -> ActorRef {
        let future = self.actor_of(Props::new(Arc::new(Future::new), ()), name).unwrap();
        future.tell_to(to, message);
        future
    }

    /// Sends `complete`, wrapped in a `Complete` message, to `future`.
    ///
    /// For a distant path the message is only logged, nothing is delivered here.
    fn complete<MessageTo: Message>(&self, future: ActorRef, complete: MessageTo) {
        let path = future.path();
        match *path {
            ActorPath::Local(_) => future.receive(InnerMessage::Message(Box::new(Complete::new(Box::new(complete)))), self.actor_ref()),
            ActorPath::Distant(ref path) => {
                info!("Sent a message of size {} to distant future {}:{}", mem::size_of::<MessageTo>(),
                      path.distant_logical_path(), path.addr_port());
            },
        }
    }

    /// Tells `future` to forward its value, downcast to `T`, to `actor` as a regular
    /// message. The forwarding closure panics if the value is not a `T`.
    fn forward_result<T: Message>(&self, future: ActorRef, actor: ActorRef) {
        self.tell(future, Computation::Forward(actor, Arc::new(move |value, context, to| {
            let value = Box::<Any + Send>::downcast::<T>(value).expect("Message of the wrong type");
            context.tell(to, *value);
            FutureState::Extracted
        })));
    }

    /// Tells `future` to complete the future `actor` with its value, downcast to `T`.
    /// The forwarding closure panics if the value is not a `T`.
    fn forward_result_to_future<T: Message>(&self, future: ActorRef, actor: ActorRef) {
        self.tell(future, Computation::Forward(actor, Arc::new(move |value, context, to| {
            let value = Box::<Any + Send>::downcast::<T>(value).expect("Message of the wrong type");
            context.complete(to, *value);
            FutureState::Extracted
        })));
    }

    /// Tells `future` to apply `closure` to its value; the result is boxed and handed back
    /// as `FutureState::Computing`.
    fn do_computation<T: Message, F: Fn(Box<Any + Send>, ActorCell) -> T + Send + Sync + 'static>
        (&self, future: ActorRef, closure: F) {
        self.tell(future, Computation::Computation(Arc::new(move |value, context| {
            let v = closure(value, context);
            FutureState::Computing(Box::new(v))
        })));
    }

    /// Sender of the envelope most recently taken out of the mailbox.
    ///
    /// # Panics
    /// Panics if the actor no longer exists, or if no envelope has been handled yet (for
    /// instance while the actor is being started).
    fn sender(&self) -> ActorRef {
        let inner = unwrap_inner!(self.inner_cell, {
            panic!("Tried to get a sender from the context of a no longer existing actor");
        });
        let current_sender = inner.current_sender.lock().unwrap();
        current_sender.as_ref()
            .expect("Tried to get the sender before this actor handled any message")
            .clone()
    }

    /// Sends the control message `message` to `actor` with this actor as sender.
    /// Nothing is done for a distant actor.
    fn tell_control(&self, actor: ActorRef, message: ControlMessage) {
        let path = actor.path();
        match *path {
            ActorPath::Local(_) => actor.receive(InnerMessage::Control(message), self.actor_ref()),
            ActorPath::Distant(_) => {},
        }
    }

    /// Sends a `PoisonPill` to `actor_ref`.
    fn stop(&self, actor_ref: ActorRef) {
        self.tell_control(actor_ref, ControlMessage::PoisonPill);
    }

    /// Asks the father of this actor to kill it.
    fn kill_me(&self) {
        self.tell_control(self.father(), ControlMessage::KillMe(self.actor_ref()));
    }

    /// Father of this actor. Panics if the actor no longer exists.
    fn father(&self) -> ActorRef {
        let inner = unwrap_inner!(self.inner_cell, {
            panic!("Tried to get the father from the context of a no longer existing actor");
        });
        inner.father.clone()
    }

    /// Copy of the children map. Panics if the actor no longer exists.
    fn children(&self) -> HashMap<Arc<ActorPath>, ActorRef> {
        let inner = unwrap_inner!(self.inner_cell, {
            panic!("Tried to get the children from the context of a no longer existing actor");
        });
        let children = inner.children.lock().unwrap();
        children.clone()
    }

    /// Copy of the map of monitored actors and their failure handlers. Panics if the actor
    /// no longer exists.
    fn monitoring(&self) -> HashMap<Arc<ActorPath>, (ActorRef, FailureHandler)> {
        let inner = unwrap_inner!(self.inner_cell, {
            panic!("Tried to get the monitored actors from the context of a no longer existing \
                    actor");
        });
        let monitoring = inner.monitoring.lock().unwrap();
        monitoring.clone()
    }

    /// Copy of the list of actors notified when this actor fails. Panics if the actor no
    /// longer exists.
    fn monitored_by(&self) -> Vec<ActorRef> {
        let inner = unwrap_inner!(self.inner_cell, {
            panic!("Tried to get the monitoring actors from the context of a no longer existing actor");
        });
        let monitored_by = inner.monitored_by.lock().unwrap();
        monitored_by.clone()
    }

    /// Registers `handler` as the failure handler for `actor` and asks `actor` to notify
    /// this actor of its failures.
    ///
    /// # Panics
    /// Panics if the actor owning this context no longer exists.
    fn monitor(&self, actor: ActorRef, handler: FailureHandler) {
        let inner = unwrap_inner!(self.inner_cell, {
            panic!("tried to have a no longer existing actor monitor an other actor?")
        });
        // Store the handler *before* asking for notifications: a failure notification is
        // looked up in `monitoring` and must never arrive while the entry is still missing.
        // The lock guard is a temporary, so it is released before the message is sent.
        inner.monitoring.lock().unwrap().insert(actor.path(), (actor.clone(), handler));
        self.tell_control(actor, ControlMessage::RegisterMonitoring);
    }

    /// Path of this actor. Panics if the actor no longer exists.
    fn path(&self) -> Arc<ActorPath> {
        let inner = unwrap_inner!(self.inner_cell, {
            panic!("Tried to get the path from the context of a no longer existing actor");
        });
        inner.path.clone()
    }

    /// Asks the name resolver for the actor registered as `name`, through a future called
    /// `request_name` whose reference is returned.
    fn identify_actor(&self, name: String, request_name: String) -> ActorRef {
        let inner = unwrap_inner!(self.inner_cell, {
            panic!("Tried to get the actor system of a no longer existing actor while resolving \
                    a path. This should *never* happen");
        });
        self.ask(inner.system.name_resolver(), ResolveRequest::Get(name), request_name)
    }

    /// Marks this actor as `Failed` and sends a `Failure` carrying `reason` to every actor
    /// in `monitored_by`.
    ///
    /// # Panics
    /// Panics if the actor no longer exists.
    fn fail(&self, reason: &'static str) {
        let inner = unwrap_inner!(self.inner_cell, {
            panic!("Tried to mark a no longer existing actor as failed. \
                    This should *never* happen");
        });
        // Scoped so the write guard is released before the monitors are notified.
        {*inner.actor_state.write().unwrap() = ActorState::Failed;}
        for actor in self.monitored_by().iter() {
            self.tell_control(actor.clone(), ControlMessage::Failure(Failure::new(self.actor_ref(), reason)));
        }
    }
}
/// Lifecycle state of an actor, as tracked by its cell.
#[derive(Clone, Copy, PartialEq)]
enum ActorState {
    /// The actor reported a failure through `fail`.
    Failed,
    /// The actor has been started or restarted; regular messages are handled in this state.
    Running,
    /// Initial state of a freshly created cell.
    Unstarted,
}
/// Guard that reports a failure on `context` when it is dropped while still active.
struct Failsafe {
// Context of the actor to mark as failed.
context: ActorCell,
// `true` until `cancel` is called; only an active guard reports a failure on drop.
active: bool,
}
impl Failsafe {
    /// Arms a new guard for the actor behind `context`.
    fn new(context: ActorCell) -> Failsafe {
        Failsafe { active: true, context: context }
    }

    /// Consumes the guard after disarming it, so that dropping it reports nothing.
    fn cancel(self) {
        let mut disarmed = self;
        disarmed.active = false;
    }
}
impl Drop for Failsafe {
    /// Reports the failure reason `"panic"` on the guarded context unless the guard was
    /// cancelled.
    fn drop(&mut self) {
        if !self.active {
            return;
        }
        self.context.fail("panic");
    }
}
/// Lifecycle orders delivered through an actor's system mailbox.
#[derive(Copy, Clone)]
pub enum SystemMessage {
    /// Recreate the actor and put it back in the running state.
    Restart,
    /// Run the actor's start hook and put it in the running state.
    Start,
}
/// A message waiting in a mailbox together with the actor that sent it.
struct Envelope {
// Payload: either a user message or a control message.
message: InnerMessage,
// Actor recorded as the current sender while the message is handled.
sender: ActorRef,
}
/// Content of an envelope.
pub enum InnerMessage {
// User message, handed as-is to the actor's `receive`.
Message(Box<Any + Send>),
// Control message, handled by the cell itself rather than by the actor.
Control(ControlMessage),
}
/// Messages handled by the actor cell instead of the actor's `receive`.
#[derive(Clone)]
pub enum ControlMessage {
// The receiver asks its father to kill it.
PoisonPill,
// A monitored actor failed; the receiver runs the handler registered for it.
Failure(Failure),
// The receiver removes the given actor from its children and from the name resolver.
KillMe(ActorRef),
// The receiver adds the sender to the actors it notifies when it fails.
RegisterMonitoring,
}
/// Notification that an actor failed.
#[derive(Clone)]
pub struct Failure {
// Actor that failed.
source: ActorRef,
// Short description of the failure.
reason: &'static str,
}
impl Failure {
    /// Builds the notification that `source` failed because of `reason`.
    fn new(source: ActorRef, reason: &'static str) -> Failure {
        Failure { source: source, reason: reason }
    }

    /// Reference to the actor that failed.
    pub fn actor(&self) -> ActorRef {
        let origin = &self.source;
        origin.clone()
    }

    /// Description of the failure.
    pub fn reason(&self) -> &'static str {
        self.reason
    }
}
/// Shared runtime state of one actor: its mailboxes, its actor instance and its relations
/// with other actors.
struct InnerActorCell {
// Pending envelopes (user and control messages), handled in arrival order.
mailbox: Mutex<VecDeque<Envelope>>,
// Pending start/restart orders; checked before the regular mailbox.
system_mailbox: Mutex<VecDeque<SystemMessage>>,
// Factory used to create the actor instance, and to recreate it on restart.
props: Arc<ActorFactory>,
// Actor system used to enqueue actors and to reach the name resolver.
system: ActorSystem,
// Path of this actor.
path: Arc<ActorPath>,
// Sender of the envelope taken most recently from the mailbox; `None` before the first one.
current_sender: Mutex<Option<ActorRef>>,
// Locked for the whole duration of `handle_envelope`.
busy: Mutex<()>,
// Father of this actor.
father: ActorRef,
// Children of this actor, keyed by path.
children: Mutex<HashMap<Arc<ActorPath>, ActorRef>>,
// Actors monitored by this one, with the handler to run when they fail.
monitoring: Mutex<HashMap<Arc<ActorPath>, (ActorRef, FailureHandler)>>,
// Lifecycle state; regular messages are only handled while it is `Running`.
actor_state: Arc<RwLock<ActorState>>,
// Actors notified when this one fails; initially just the father.
monitored_by: Mutex<Vec<ActorRef>>,
// Current actor instance; replaced on restart.
actor: RwLock<Arc<Actor>>,
}
impl InnerActorCell {
fn new(props: Arc<ActorFactory>,
system: ActorSystem,
father: ActorRef,
path: Arc<ActorPath>)
-> InnerActorCell {
InnerActorCell {
actor: RwLock::new(props.create()),
mailbox: Mutex::new(VecDeque::new()),
system_mailbox: Mutex::new(VecDeque::new()),
props: props,
system: system,
path: path,
current_sender: Mutex::new(None),
busy: Mutex::new(()),
father: father.clone(),
children: Mutex::new(HashMap::new()),
monitoring: Mutex::new(HashMap::new()),
actor_state: Arc::new(RwLock::new(ActorState::Unstarted)),
monitored_by: Mutex::new(vec![father.clone()]),
}
}
fn receive_envelope(&self, envelope: Envelope) {
self.mailbox.lock().unwrap().push_back(envelope);
}
fn receive_message(&self, message: InnerMessage, sender: ActorRef) {
self.receive_envelope(Envelope {
message: message,
sender: sender,
});
}
fn receive_system_message(&self, system_message: SystemMessage) {
self.system_mailbox.lock().unwrap().push_back(system_message);
}
fn handle_envelope(&self, context: ActorCell) {
let _lock = self.busy.lock();
let failsafe = Failsafe::new(context.clone());
if let Some(message) = self.system_mailbox.lock().unwrap().pop_front() {
match message {
SystemMessage::Restart => self.restart(context),
SystemMessage::Start => self.start(context),
}
failsafe.cancel();
return;
}
let state = {self.actor_state.read().unwrap().clone()};
if state == ActorState::Running {
let envelope = match self.mailbox.lock().unwrap().pop_front() {
Some(envelope) => envelope,
None => {
failsafe.cancel();
return;
}
};
{
let mut current_sender = self.current_sender.lock().unwrap();
*current_sender = Some(envelope.sender.clone());
};
{
let actor = self.actor.read().unwrap();
match envelope.message {
InnerMessage::Message(message) => {
actor.receive(message, context);
},
InnerMessage::Control(message) => {
match message {
ControlMessage::PoisonPill => context.kill_me(),
ControlMessage::Failure(failure) => {
let monitoring = self.monitoring.lock().unwrap();
let handler = monitoring.get(&failure.actor().path())
.expect("Received a failure notification from an unknown actor");
(*handler.1)(failure, context);
},
ControlMessage::KillMe(actor_ref) => self.kill(actor_ref, context),
ControlMessage::RegisterMonitoring => {
let mut mon = self.monitored_by.lock().unwrap();
mon.push(context.sender());
},
}
}
}
}
} else {
self.system.enqueue_actor(context.actor_ref());
}
failsafe.cancel();
}
fn kill(&self, actor: ActorRef, context: ActorCell) {
self.children.lock().unwrap().remove(&actor.path()).expect(&format!("actor {} was asked to kill {} and cannot do that",
self.path.logical_path(),
actor.path().logical_path()));
context.tell(self.system.name_resolver(), ResolveRequest::Remove(actor.path()));
}
fn start(&self, context: ActorCell) {
self.actor.write().unwrap().pre_start(context);
*self.actor_state.write().unwrap() = ActorState::Running;
}
fn restart(&self, context: ActorCell) {
let mut actor = self.actor.write().unwrap();
actor.pre_restart(context.clone());
*actor = self.props.create();
actor.post_restart(context);
*self.actor_state.write().unwrap() = ActorState::Running;
}
fn restart_child(failure: Failure, _context: ActorCell) {
failure.actor().receive_system_message(SystemMessage::Restart);
}
}
impl Drop for InnerActorCell {
    /// Logs that the actor is dropped and runs its `post_stop` hook.
    fn drop(&mut self) {
        // Never unwrap here: the lock is poisoned if a hook panicked while it was held for
        // writing, and a panic inside `drop` aborts the process when it happens during
        // unwinding. A poisoned lock still guards a usable actor, so recover the guard.
        let actor = match self.actor.write() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        info!("Actor {} is dropped", *self.path.logical_path());
        actor.post_stop();
    }
}