use std::any::Any;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::Sender;
use actors::{Actor, ActorCell, ActorContext, ActorRef, Message};
pub struct Complete {
complete: Box<Any + Send>,
}
impl Complete {
pub fn new(complete: Box<Any + Send>) -> Complete {
Complete {
complete: complete,
}
}
}
#[derive(Clone)]
pub enum Computation {
Forward(ActorRef, Arc<Fn(Box<Any + Send>, ActorCell, ActorRef) -> FutureState + Send + Sync>),
Computation(Arc<Fn(Box<Any + Send>, ActorCell) -> FutureState + Send + Sync>),
}
pub enum FutureState {
Uncompleted,
Computing(Box<Any + Send>),
Extracted,
}
pub struct Future {
state: Mutex<Option<FutureState>>,
scheduled_calculations: Mutex<VecDeque<Computation>>,
}
impl Future {
pub fn new(_dummy: ()) -> Future {
Future {
state: Mutex::new(Some(FutureState::Uncompleted)),
scheduled_calculations: Mutex::new(VecDeque::new()),
}
}
fn handle_computation(&self, computation: Computation, context: ActorCell) {
info!("{} handling a computation", context.actor_ref().path().logical_path());
let mut state = self.state.lock().unwrap();
let s = state.take().unwrap();
match s {
FutureState::Computing(value) => {
match computation {
Computation::Forward(to, func) => {
info!("{} forwarding to {}", context.actor_ref().path().logical_path(), to.path().logical_path());
*state = Some((*func)(value, context.clone(), to));
},
Computation::Computation(func) => {
*state = Some((*func)(value, context.clone()));
}
}
match *state {
Some(FutureState::Computing(_)) => {},
Some(FutureState::Extracted) => context.kill_me(),
Some(FutureState::Uncompleted) => panic!("A future closure returned Uncompleted, this should not happen"),
None => unreachable!(),
}
}
FutureState::Uncompleted => {
*state = Some(s);
info!("{} is keeping the computation for later", context.actor_ref().path().logical_path());
self.scheduled_calculations.lock().unwrap().push_back(computation);
},
FutureState::Extracted => {
*state = Some(s);
panic!("A closure was called on an Extracted Future.");
},
}
}
}
impl Actor for Future {
fn receive(&self, message: Box<Any>, context: ActorCell) {
match Box::<Any>::downcast::<Computation>(message) {
Ok(computation) => {
self.handle_computation(*computation, context);
},
Err(message) => {
if let Ok(msg) = Box::<Any>::downcast::<Complete>(message) {
{
let mut state = self.state.lock().unwrap();
let s = state.take().unwrap();
match s {
FutureState::Uncompleted => {
*state = Some(FutureState::Computing((*msg).complete));
info!("{} has been completed", context.actor_ref().path().logical_path());
},
_ => {
panic!("Tried to complete a {} twice", context.actor_ref().path().logical_path());
},
}
}
let mut scheduled_calculations = self.scheduled_calculations.lock().unwrap();
while let Some(func) = scheduled_calculations.pop_front() {
self.handle_computation(func, context.clone());
}
} else {
info!("{} received a message of the wrong format from {}", context.actor_ref().path().logical_path(),
context.sender().path().logical_path());
}
}
}
}
}
pub struct FutureExtractor<T: Message> {
future: ActorRef,
channel: Arc<Mutex<Sender<T>>>,
}
impl<T: Message> FutureExtractor<T> {
pub fn new(args: (ActorRef, Arc<Mutex<Sender<T>>>)) -> FutureExtractor<T> {
FutureExtractor {
future: args.0,
channel: args.1,
}
}
}
impl<T: Message> Actor for FutureExtractor<T> {
fn pre_start(&self, context: ActorCell) {
context.forward_result::<T>(self.future.clone(), context.actor_ref());
}
fn receive(&self, message: Box<Any>, context: ActorCell) {
if let Ok(message) = Box::<Any>::downcast::<T>(message) {
info!("The extractor {} received the type it wants to extract", context.actor_ref().path().logical_path());
let _res = self.channel.lock().unwrap().send(*message);
context.kill_me();
}
}
}