use std::any::{Any, TypeId};
use std::collections::{HashMap};
use std::error::Error;
use std::panic;
use std::sync::Arc;
use crate::actor::ActorContext;
use crate::address::Addr;
use crate::message::Message;
/// Management messages understood by every built behavior.
///
/// `BehaviorBuilder::build` registers a tell handler for this type that maps each
/// variant onto the matching `ActorContext` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActorManageMessage {
    /// Handled by calling `kill()` on the receiving actor's context.
    Kill,
    /// Handled by calling `restart()` on the receiving actor's context.
    Restart,
}
/// Result returned by every message handler: `Ok(None)` is what `Behavior::keep` returns,
/// `Ok(Some(b))` is what `Behavior::change(b)` returns, and `Err` carries a boxed error.
// NOTE(review): rustc's `type_alias_bounds` lint reports bounds written on type-alias
// parameters (as on all aliases below) as not enforced — confirm they are intentional.
pub type BehaviorAction<S: Send + 'static> = Result<Option<Behavior<S>>, Box<dyn Error>>;
/// Type-erased handler as stored in the handler maps: it receives the raw `Message`,
/// the actor state and the actor context.
type HandlerFn<S: Send + 'static> = Arc<dyn Fn(Message, &mut S, &mut ActorContext) -> BehaviorAction<S> + Send + Sync>;
/// Function pointer accepted by `BehaviorBuilder::on_ask`: typed message, state,
/// the sender's `Addr`, and the context.
type UserDefinedAskHandlerFn<M: Any + Send, S: Send + 'static> = fn(M, &mut S, Addr, &mut ActorContext) -> BehaviorAction<S>;
/// Function pointer accepted by `BehaviorBuilder::on_tell`: typed message, state and context.
type UserDefinedTellHandlerFn<M: Any + Send, S: Send + 'static> = fn(M, &mut S, &mut ActorContext) -> BehaviorAction<S>;
/// Function pointer used for the `on_start`, `on_kill`, `on_error` and `on_restart` hooks.
type PlainActorAction<S: Send + 'static> = fn(&mut S, &mut ActorContext) -> ();
/// Boxed closure accepted by `BehaviorBuilder::on_tell_closure`; same arguments as
/// `UserDefinedTellHandlerFn`, but the closure may capture its environment.
type UserDefinedTellHandlerClosure<M: Any + Send, S: Send + 'static> = Box<dyn Fn(M, &mut S, &mut ActorContext) -> BehaviorAction<S> + Send + Sync>;
/// Boxed closure accepted by `BehaviorBuilder::on_ask_closure`; same arguments as
/// `UserDefinedAskHandlerFn`, but the closure may capture its environment.
type UserDefinedAskHandlerClosure<M: Any + Send, S: Send + 'static> = Box<dyn Fn(M, &mut S, Addr, &mut ActorContext) -> BehaviorAction<S> + Send + Sync>;
/// Collects message handlers and lifecycle hooks for an actor with state type `S`
/// and turns them into a `Behavior` via `build`.
pub struct BehaviorBuilder<S: Send + 'static> {
// Ask handlers, keyed by the `TypeId` of the message type they were registered for.
on_ask_handler: HashMap<TypeId, HandlerFn<S>>,
// Tell handlers, keyed by the `TypeId` of the message type they were registered for.
on_tell_handler: HashMap<TypeId, HandlerFn<S>>,
// Lifecycle hooks; each can be set at most once through the method of the same name.
// They are only stored here and copied into the built `Behavior`; the code that
// invokes them is not part of this file.
on_start: Option<PlainActorAction<S>>,
on_kill: Option<PlainActorAction<S>>,
on_error: Option<PlainActorAction<S>>,
on_restart: Option<PlainActorAction<S>>,
}
impl<S: Send + 'static> BehaviorBuilder<S> {
pub fn new() -> Self {
Self {
on_ask_handler: HashMap::new(),
on_tell_handler: HashMap::new(),
on_start: None,
on_kill: None,
on_error: None,
on_restart: None
}
}
pub fn on_ask<M: Any + Send>(mut self, h: UserDefinedAskHandlerFn<M, S>) -> Self {
let h_wrapper = move |msg: Message, state: &mut S, ctx: &mut ActorContext| -> BehaviorAction<S>{
if msg.instance_of::<M>() {
match &msg.sender {
Some(tx) => {
let sender = tx.clone();
let m = msg.downcast::<M>();
h(*m, state, sender, ctx)
},
None => {
println!("Sent message without a sender to on_ask, response not possible!");
Ok(None)
}
}
} else {
panic!("Invalid downcasting operation!")
}
};
if self.on_ask_handler.contains_key(&TypeId::of::<M>()) {
panic!("Ask handler for {} has already been defined on this behavior! Cannot define more than one ask handler per message type per actor!", std::any::type_name::<M>());
} else {
self.on_ask_handler.insert(TypeId::of::<M>(), Arc::new(h_wrapper));
self
}
}
pub fn on_tell<M: Any + Send>(mut self, h: UserDefinedTellHandlerFn<M, S>) -> Self {
let h_wrapper = move |msg: Message, state: &mut S, ctx: &mut ActorContext| -> BehaviorAction<S> {
if msg.instance_of::<M>() {
let m = msg.downcast::<M>();
h(*m, state, ctx)
} else {
panic!("Invalid downcasting operation!")
}
};
if self.on_tell_handler.contains_key(&TypeId::of::<M>()) {
panic!("Tell handler for {} has already been defined on this behavior! Cannot define more than one tell handler per message type per actor!", std::any::type_name::<M>());
} else {
self.on_tell_handler.insert(TypeId::of::<M>(), Arc::new(h_wrapper));
self
}
}
pub fn on_start(mut self, action: PlainActorAction<S>) -> Self {
if let Some(_) = self.on_start {
panic!("Cannot define more than one on_start methods for same actor!");
} else {
self.on_start = Some(action);
self
}
}
pub fn on_kill(mut self, action: PlainActorAction<S>) -> Self {
if let Some(_) = self.on_kill {
panic!("Cannot define more than one on_kill methods for same actor!");
} else {
self.on_kill = Some(action);
self
}
}
pub fn on_error(mut self, action: PlainActorAction<S>) -> Self {
if let Some(_) = self.on_error {
panic!("Cannot define more than one on_error methods for same actor!");
} else {
self.on_error = Some(action);
self
}
}
pub fn on_restart(mut self, action: PlainActorAction<S>) -> Self {
if let Some(_) = self.on_restart {
panic!("Cannot define more than one on_restart methods for same actor!");
} else {
self.on_restart = Some(action);
self
}
}
pub fn enable_state_checks(self) -> Self {
self.on_ask::<StateCheckMessage<S>>(|msg, state, reply_to, _ctx| -> BehaviorAction<S> {
match msg {
StateCheckMessage::Check(check_fn) => {
let res = check_fn(state);
reply_to.tell(StateCheckMessage::<S>::Result(res));
}
_ => {}
}
Behavior::keep()
})
}
pub fn on_tell_closure<M: Any + Send>(mut self, h: UserDefinedTellHandlerClosure<M, S>) -> Self {
let h_wrapper = move |msg: Message, state: &mut S, ctx: &mut ActorContext| -> BehaviorAction<S> {
if msg.instance_of::<M>() {
let m = msg.downcast::<M>();
h(*m, state, ctx)
} else {
panic!("Invalid downcasting operation!")
}
};
if self.on_tell_handler.contains_key(&TypeId::of::<M>()) {
panic!("Tell handler for {} has already been defined on this behavior! Cannot define more than one tell handler per message type per actor!", std::any::type_name::<M>());
} else {
self.on_tell_handler.insert(TypeId::of::<M>(), Arc::new(h_wrapper));
self
}
}
pub fn on_ask_closure<M: Any + Send>(mut self, h: UserDefinedAskHandlerClosure<M, S>) -> Self {
let h_wrapper = move |msg: Message, state: &mut S, ctx: &mut ActorContext| -> BehaviorAction<S>{
if msg.instance_of::<M>() {
match &msg.sender {
Some(tx) => {
let sender = tx.clone();
let m = msg.downcast::<M>();
h(*m, state, sender, ctx)
},
None => {
Ok(None)
}
}
} else {
panic!("Invalid downcasting operation!")
}
};
if self.on_ask_handler.contains_key(&TypeId::of::<M>()) {
panic!("Ask handler for {} has already been defined on this behavior! Cannot define more than one ask handler per message type per actor!", std::any::type_name::<M>());
} else {
self.on_ask_handler.insert(TypeId::of::<M>(), Arc::new(h_wrapper));
self
}
}
pub(crate) fn has_tell_handler(&self, type_id: TypeId) -> bool {
self.on_tell_handler.contains_key(&type_id)
}
pub(crate) fn has_ask_handler(&self, type_id: TypeId) -> bool {
self.on_ask_handler.contains_key(&type_id)
}
pub fn build(self) -> Behavior<S> {
let b = self.on_tell::<ActorManageMessage>(|msg, _state, ctx| -> BehaviorAction<S> {
match msg {
ActorManageMessage::Kill => {
ctx.kill()
},
ActorManageMessage::Restart => {
ctx.restart()
}
}
Behavior::keep()
});
Behavior {
on_ask_handler: b.on_ask_handler,
on_tell_handler: b.on_tell_handler,
on_start: b.on_start,
on_kill: b.on_kill,
on_error: b.on_error,
on_restart: b.on_restart
}
}
}
/// Message type handled by the ask handler that `BehaviorBuilder::enable_state_checks` installs.
pub enum StateCheckMessage<S> {
/// Request: the handler calls this predicate with the actor state.
Check(fn(&S) -> bool),
/// Reply: the predicate's return value, sent to the asker's address.
/// The installed handler ignores this variant when it is received as a request.
Result(bool)
}
/// The handlers and lifecycle hooks of an actor with state type `S`, as assembled by
/// `BehaviorBuilder::build`.
///
/// `Clone` is implemented by hand rather than derived: a derived impl would require
/// `S: Clone`, although no field stores an `S`. Cloning copies the hook pointers and
/// bumps the reference count of every handler.
pub struct Behavior<S: Send + 'static> {
    /// Ask handlers, keyed by the `TypeId` of the message type they were registered for.
    pub(crate) on_ask_handler: HashMap<TypeId, HandlerFn<S>>,
    /// Tell handlers, keyed by the `TypeId` of the message type they were registered for.
    pub(crate) on_tell_handler: HashMap<TypeId, HandlerFn<S>>,
    /// Hook taken from `BehaviorBuilder::on_start`, if one was set.
    pub(crate) on_start: Option<PlainActorAction<S>>,
    /// Hook taken from `BehaviorBuilder::on_kill`, if one was set.
    pub(crate) on_kill: Option<PlainActorAction<S>>,
    /// Hook taken from `BehaviorBuilder::on_error`, if one was set.
    pub(crate) on_error: Option<PlainActorAction<S>>,
    /// Hook taken from `BehaviorBuilder::on_restart`, if one was set.
    pub(crate) on_restart: Option<PlainActorAction<S>>,
}

impl<S: Send + 'static> Clone for Behavior<S> {
    fn clone(&self) -> Self {
        Self {
            on_ask_handler: self.on_ask_handler.clone(),
            on_tell_handler: self.on_tell_handler.clone(),
            // Function pointers are `Copy`, so the hooks are copied as-is.
            on_start: self.on_start,
            on_kill: self.on_kill,
            on_error: self.on_error,
            on_restart: self.on_restart,
        }
    }
}
impl<S: Send + 'static> Behavior<S> {
    /// Returns `Ok(None)`: the `BehaviorAction` carrying no replacement behavior.
    ///
    /// Handlers in this file return it to leave the current behavior in place; how the
    /// value is acted upon is decided by code outside this file.
    pub fn keep() -> BehaviorAction<S> {
        Ok(None)
    }

    /// Returns `Ok(Some(new_behavior))`: the `BehaviorAction` carrying `new_behavior`
    /// as the replacement behavior.
    pub fn change(new_behavior: Behavior<S>) -> BehaviorAction<S> {
        Ok(Some(new_behavior))
    }
}
impl<S: Send + 'static> Behavior<S> {
    /// Routes `msg` to the handler registered for its type and returns that handler's result.
    ///
    /// A message that carries a sender is looked up among the ask handlers; a message
    /// without one is looked up among the tell handlers. If nothing is registered under
    /// `msg.type_id()`, no handler runs and `Ok(None)` is returned.
    // NOTE(review): presumably `Message` defines its own `type_id()` that reports the
    // payload type — confirm; `Any::type_id` would report `Message` itself.
    pub(crate) fn handle(&mut self, msg: Message, state: &mut S, ctx: &mut ActorContext) -> BehaviorAction<S> {
        // The presence of a sender decides which handler table applies.
        let handlers = if msg.sender.is_some() {
            &self.on_ask_handler
        } else {
            &self.on_tell_handler
        };
        match handlers.get(&msg.type_id()) {
            Some(handler) => handler(msg, state, ctx),
            None => Ok(None),
        }
    }
}