use std::future::Future;
use std::pin::Pin;
use log::warn;
use tokio::select;
use tokio::sync::mpsc::Receiver;
use tokio_util::task::TaskTracker;
use crate::{Actor, ActorRef};
use crate::control::Control;
/// Drives a single actor: owns the actor instance, pulls system messages
/// from its inbox and keeps track of the futures the actor asked to spawn.
pub(crate) struct ActorExecutor<T: Actor + Send> {
    /// The actor whose lifecycle hooks and message handlers are invoked.
    instance: T,
    /// Receiving half of the channel that carries the actor's system messages.
    inbox: Receiver<ActorSysMsg<T::SendMessage, T::CallMessage, T::ErrorType>>,
    /// Reference to this actor; handed to `on_initialization` and used for
    /// the terminate token and for shutdown requests.
    actor_ref: ActorRef<T>,
    /// Tracks futures spawned on behalf of the actor (`Control::SpawnFuture`).
    tasks: TaskTracker,
}
impl<T> ActorExecutor<T>
where
    T: Actor + Send + Sync,
{
    /// Creates an executor for `instance`.
    ///
    /// * `instance`  - the actor to run.
    /// * `inbox`     - receiving half of the actor's system-message channel.
    /// * `actor_ref` - reference to the actor, passed to `on_initialization`
    ///   and used for the terminate token and shutdown requests.
    ///
    /// The executor starts with an empty task tracker.
    pub(crate) fn new(
        instance: T,
        inbox: Receiver<ActorSysMsg<T::SendMessage, T::CallMessage, T::ErrorType>>,
        actor_ref: ActorRef<T>,
    ) -> Self {
        ActorExecutor {
            instance,
            inbox,
            actor_ref,
            tasks: TaskTracker::new(),
        }
    }

    /// Runs the actor until it stops.
    ///
    /// Sequence:
    /// 1. `on_initialization` is called; if applying its `Control` fails
    ///    (e.g. `Control::Terminate`) the executor returns immediately.
    /// 2. Messages are processed until the terminate token is cancelled, the
    ///    inbox is closed, a `Shutdown` message is handled, or applying a
    ///    handler's `Control` fails.
    /// 3. Unless the terminate token was cancelled (or the loop was left
    ///    because applying a `Control` failed), the task tracker is closed
    ///    and all outstanding spawned futures are awaited.
    pub(crate) async fn run(&mut self) {
        use ActorSysMsg::*;

        let init_control = self.instance.on_initialization(self.actor_ref.clone()).await;
        if self.handle_control(init_control).await.is_err() {
            return;
        }

        loop {
            select! {
                _ = self.actor_ref.terminate_token.cancelled() => { break; }
                received = self.inbox.recv() => {
                    let sys_msg = match received {
                        Some(sys_msg) => sys_msg,
                        // All senders are gone: nothing more can arrive.
                        None => { break; }
                    };
                    match sys_msg {
                        Shutdown => {
                            // The actor is stopping anyway, so only a spawn
                            // request needs to be acted upon here.
                            match self.instance.on_shutdown().await {
                                Control::Ok | Control::Shutdown | Control::Terminate => {}
                                Control::SpawnFuture(f) => {
                                    self.spawn_future(f);
                                }
                            }
                            break;
                        }
                        Send(msg) => {
                            let control = self.instance.handle_sends(msg).await;
                            // Previously this result was discarded, so a
                            // handler returning `Control::Terminate` kept the
                            // actor running. Stop exactly as the
                            // initialization path does.
                            if self.handle_control(control).await.is_err() {
                                return;
                            }
                        }
                        Call(msg, dest) => {
                            let (control, result) = self.instance.handle_calls(msg).await;
                            // Reply first so the caller gets its answer even
                            // when the control value stops the actor.
                            if dest.send(result).is_err() {
                                warn!("unable to send reply of call message to caller.");
                            }
                            if self.handle_control(control).await.is_err() {
                                return;
                            }
                        }
                    }
                }
            }
        }

        // Graceful exit: let spawned futures finish. Skipped when termination
        // was requested through the token.
        if !self.actor_ref.terminate_token.is_cancelled() {
            self.tasks.close();
            if !self.tasks.is_empty() {
                self.tasks.wait().await;
            }
        }
    }

    /// Applies a `Control` value returned by one of the actor's hooks.
    ///
    /// * `Control::Ok`          - nothing to do, returns `Ok(())`.
    /// * `Control::Terminate`   - returns `Err(Error::Terminated)`.
    /// * `Control::Shutdown`    - forwards to `actor_ref.shutdown()` and
    ///   returns its result.
    /// * `Control::SpawnFuture` - spawns the future on the task tracker and
    ///   returns `Ok(())`.
    async fn handle_control(&mut self, control: Control) -> crate::result::Result<()> {
        match control {
            Control::Ok => Ok(()),
            Control::Terminate => Err(crate::result::Error::Terminated),
            Control::Shutdown => self.actor_ref.shutdown().await,
            Control::SpawnFuture(f) => {
                self.spawn_future(f);
                Ok(())
            }
        }
    }

    /// Spawns `f` on the executor's task tracker so that `run` can wait for
    /// it before finishing.
    fn spawn_future(&mut self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
        self.tasks.spawn(f);
    }
}
/// System-level envelope delivered to an actor's inbox.
///
/// `S` is the payload of fire-and-forget messages, `C` the payload of call
/// messages and `E` the error type of a call's reply.
pub(crate) enum ActorSysMsg<S: Send, C: Send, E: Send> {
    /// Asks the executor to run the shutdown hook and leave its message loop.
    Shutdown,
    /// A message that expects no reply.
    Send(S),
    /// A message together with the one-shot channel on which the handler's
    /// result is sent back to the caller.
    Call(C, tokio::sync::oneshot::Sender<Result<C, E>>),
}
#[cfg(test)]
mod tests {
    use crate::create_actor;
    use crate::test_code::tests::SimpleCounter;

    /// Creates an actor from `SimpleCounter::new(true)` and checks that
    /// awaiting the returned handle completes with `Ok`.
    ///
    /// NOTE(review): judging by the test name, the `true` flag presumably
    /// makes the counter stop during initialization - confirm against
    /// `SimpleCounter`.
    #[tokio::test]
    async fn test_init_quit() {
        let (_actor, handle) = create_actor(SimpleCounter::new(true)).await.unwrap();
        assert!(handle.await.is_ok());
    }
}