use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread;
use actors::{ActorPath, ActorRef, Message, Props};
use actors::actor_cell::{ActorCell, SystemMessage};
use actors::cthulhu::Cthulhu;
use actors::future::{Future, FutureExtractor};
use actors::name_resolver::NameResolver;
use actors::props::ActorFactory;
use actors::root_actor::RootActor;
struct Relauncher {
actor_system: ActorSystem,
active: bool,
}
impl Relauncher {
fn new(actor_system: ActorSystem) -> Relauncher {
Relauncher {
actor_system: actor_system,
active: true,
}
}
fn cancel(mut self) {
self.active = false;
}
}
impl Drop for Relauncher {
fn drop(&mut self) {
if self.active {
self.actor_system.spawn_thread();
}
}
}
pub struct ActorSystem {
inner: Arc<InnerActorSystem>,
}
impl ActorSystem {
pub fn new(name: String) -> ActorSystem {
let actor_system = ActorSystem { inner: Arc::new(InnerActorSystem::new(name)) };
let cthulhu = Cthulhu::new(actor_system.clone());
let cthulhu = ActorRef::with_cthulhu(cthulhu);
info!("Created cthulhu");
*actor_system.inner.cthulhu.write().unwrap() = Some(cthulhu.clone());
let user_actor_path = ActorPath::new_local("/user".to_owned());
let user_actor_cell = ActorCell::new(Props::new(Arc::new(RootActor::new), ()),
actor_system.clone(),
cthulhu.clone(),
user_actor_path.clone());
let user_actor = ActorRef::with_cell(user_actor_cell, user_actor_path);
user_actor.receive_system_message(SystemMessage::Start);
info!("Created /user actor");
*actor_system.inner.user_actor.write().unwrap() = Some(user_actor);
let system_actor_path = ActorPath::new_local("/system".to_owned());
let system_actor_cell = ActorCell::new(Props::new(Arc::new(RootActor::new), ()),
actor_system.clone(),
cthulhu.clone(),
system_actor_path.clone());
let system_actor = ActorRef::with_cell(system_actor_cell, system_actor_path);
system_actor.receive_system_message(SystemMessage::Start);
info!("Created /system actor");
*actor_system.inner.system_actor.write().unwrap() = Some(system_actor);
actor_system.spawn_threads(1);
info!("Launched the first thread");
let name_resolver = actor_system.system_actor_of(Props::new(Arc::new(NameResolver::new), ()), "name_resolver".to_owned());
info!("Created the /system/name_resolver actor");
*actor_system.inner.name_resolver.write().unwrap() = Some(name_resolver);
actor_system
}
pub fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
self.inner.actor_of(props, name)
}
pub fn system_actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
self.inner.system_actor_of(props, name)
}
pub fn shutdown(&self) {
self.inner.shutdown();
}
pub fn enqueue_actor(&self, actor_ref: ActorRef) {
self.inner.enqueue_actor(actor_ref);
}
pub fn spawn_thread(&self) {
let actors_queue = self.inner.actors_queue_receiver.clone();
let rx = self.inner.consumer_threads_receiver.clone();
let actor_system = self.clone();
let _ = thread::spawn(move || {
let relauncher = Relauncher::new(actor_system.clone());
loop {
match rx.lock().unwrap().try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => {
relauncher.cancel();
break;
}
Err(TryRecvError::Empty) => {}
};
let actor_ref = {
let lock = actors_queue.lock().unwrap();
lock.try_recv()
};
match actor_ref {
Ok(actor_ref) => actor_ref.handle(),
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Disconnected) => {
relauncher.cancel();
actor_system.shutdown();
panic!("The actors queue failed, something is very wrong");
}
}
}
});
*self.inner.n_threads.lock().unwrap() += 1;
}
pub fn terminate_thread(&self) {
self.inner.terminate_thread();
}
pub fn spawn_threads(&self, n: u32) {
for _ in 0..n {
self.spawn_thread();
}
}
pub fn terminate_threads(&self, n: u32) {
self.inner.terminate_threads(n);
}
pub fn name_resolver(&self) -> ActorRef {
match self.inner.name_resolver.read().unwrap().as_ref() {
None => panic!("The name resolver is not initialized."),
Some(resolver) => resolver.clone(),
}
}
pub fn tell<M: Message>(&self, to: ActorRef, message: M) {
match self.inner.user_actor.read().unwrap().as_ref() {
Some(user_actor) => user_actor.tell_to(to, message),
None => unreachable!(),
}
}
pub fn ask<M: Message>(&self, to: ActorRef, message: M, name: String) -> ActorRef {
let future = self.actor_of(Props::new(Arc::new(Future::new), ()), name);
future.tell_to(to, message);
future
}
pub fn extract_result<M: Message>(&self, future: ActorRef) -> M {
let (tx, rx) = channel();
let _extractor = self.actor_of(Props::new(Arc::new(FutureExtractor::new), (future, Arc::new(Mutex::new(tx)))), "extractor".to_owned());
rx.recv().unwrap()
}
}
impl Clone for ActorSystem {
fn clone(&self) -> ActorSystem {
ActorSystem { inner: self.inner.clone() }
}
}
struct InnerActorSystem {
_name: String,
consumer_threads_sender: Mutex<Sender<()>>,
consumer_threads_receiver: Arc<Mutex<Receiver<()>>>,
n_threads: Mutex<u32>,
actors_queue_sender: Mutex<Sender<ActorRef>>,
actors_queue_receiver: Arc<Mutex<Receiver<ActorRef>>>,
cthulhu: RwLock<Option<ActorRef >>,
user_actor: RwLock<Option<ActorRef>>,
system_actor: RwLock<Option<ActorRef>>,
name_resolver: RwLock<Option<ActorRef>>,
}
impl InnerActorSystem {
fn new(name: String) -> InnerActorSystem {
let (tx_queue, rx_queue) = channel();
let (tx_thread, rx_thread) = channel();
InnerActorSystem {
_name: name,
consumer_threads_sender: Mutex::new(tx_thread),
consumer_threads_receiver: Arc::new(Mutex::new(rx_thread)),
n_threads: Mutex::new(0u32),
actors_queue_sender: Mutex::new(tx_queue),
actors_queue_receiver: Arc::new(Mutex::new(rx_queue)),
cthulhu: RwLock::new(None),
user_actor: RwLock::new(None),
system_actor: RwLock::new(None),
name_resolver: RwLock::new(None),
}
}
fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
match self.user_actor.read().unwrap().clone() {
Some(user_actor) => {
let (tx, rx) = channel::<Result<ActorRef, &'static str>>();
info!("Created the channel to get an ActorRef from a root actor");
self.cthulhu.read().unwrap().as_ref().unwrap().tell_to(user_actor, (props, name, Arc::new(Mutex::new(tx))));
rx.recv().unwrap().unwrap()
},
None => panic!("The user actor is not initialised"),
}
}
fn system_actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
match self.system_actor.read().unwrap().clone() {
Some(system_actor) => {
let (tx, rx) = channel::<Result<ActorRef, &'static str>>();
info!("Created the channel to get an ActorRef from a root actor");
self.cthulhu.read().unwrap().as_ref().unwrap().tell_to(system_actor, (props, name, Arc::new(Mutex::new(tx))));
rx.recv().unwrap().unwrap()
},
None => panic!("The system actor is not initialised"),
}
}
fn shutdown(&self) {
let n = {*self.n_threads.lock().unwrap()};
self.terminate_threads(n);
*self.user_actor.write().unwrap() = None;
*self.system_actor.write().unwrap() = None;
*self.cthulhu.write().unwrap() = None;
}
fn enqueue_actor(&self, actor_ref: ActorRef) {
match self.actors_queue_sender.lock().unwrap().send(actor_ref) {
Ok(_) => return,
Err(_) => {
self.shutdown();
panic!("The communication channel for messages is disconnected, this is bad!");
}
}
}
fn terminate_thread(&self) {
let _ = self.consumer_threads_sender.lock().unwrap().send(());
*self.n_threads.lock().unwrap() -= 1;
}
fn terminate_threads(&self, n: u32) {
for _ in 0..n {
self.terminate_thread();
}
}
}
impl Drop for InnerActorSystem {
fn drop(&mut self) { }
}