use std::sync::mpsc;
use std::thread;
use super::controller::Controller;
use super::message::{Message, Response};
use crate::error::RuntimeError;
pub trait Actor<M>: Send + Sync
where
M: PartialEq + Eq + Send + Sync,
{
fn step(&mut self) -> Result<(), RuntimeError>;
fn stop(self) -> Result<(), RuntimeError>;
fn pause(&mut self) -> Result<(), RuntimeError>;
fn resume(&mut self) -> Result<(), RuntimeError>;
fn hibernate(&mut self) -> Result<(), RuntimeError>;
fn wake_up(&mut self) -> Result<(), RuntimeError>;
fn request(&mut self, message: M) -> Result<Response, RuntimeError>;
}
pub struct ActorLoop<M, T>
where
M: PartialEq + Eq + Send + Sync,
T: Actor<M> + Send + Sync,
{
tx: mpsc::Sender<Response>,
rx: mpsc::Receiver<Message<M>>,
inner: T,
}
impl<M, T> ActorLoop<M, T>
where
M: 'static + PartialEq + Eq + Send + Sync,
T: 'static + Actor<M> + Send + Sync,
{
pub fn spawn(inner: T) -> Controller<M> {
let (tx_c, rx) = mpsc::channel();
let (tx, rx_c) = mpsc::channel();
let actor = Self { tx, rx, inner };
let t = thread::spawn(move || actor.loop_actor());
Controller::new(tx_c, rx_c, t)
}
#[inline]
fn loop_actor(mut self) -> Result<(), RuntimeError> {
'main: loop {
for m in self.rx.try_iter() {
match m {
Message::Stop => {
self.send_ok()?;
self.inner.stop()?;
break 'main Ok(());
}
Message::Pause => {
self.send_ok()?;
self.inner.pause()?;
self.assert_recv_then_send_ok(Message::Resume)?;
self.inner.resume()?;
}
Message::Hibernate => {
self.send_ok()?;
self.inner.hibernate()?;
self.assert_recv_then_send_ok(Message::WakeUp)?;
self.inner.wake_up()?;
}
Message::Resume | Message::WakeUp => break 'main RuntimeError::unexpected(),
Message::Custom(m) => {
let response = self.inner.request(m)?;
self.send(response)?;
}
}
}
match self.inner.step() {
Ok(()) => continue 'main,
Err(e) => {
dbg!(&e);
break 'main Err(e);
}
}
}
}
#[inline]
fn assert_recv_then_send_ok(&self, message: Message<M>) -> Result<(), RuntimeError> {
match self.rx.recv()? == message {
true => self.send_ok(),
false => RuntimeError::unexpected(),
}
}
#[inline]
fn send(&self, message: Response) -> Result<(), RuntimeError> {
Ok(self.tx.send(message)?)
}
#[inline]
fn send_ok(&self) -> Result<(), RuntimeError> {
self.send(Box::new(()))
}
}