use std::pin::Pin;
use anyhow::Result;
use arbiter_core::middleware::ArbiterMiddleware;
use futures_util::{Stream, StreamExt};
use tokio::task::JoinHandle;
use tracing::error;
use super::*;
/// A pinned, heap-allocated [`Stream`] trait object that yields items of type `E`.
///
/// The trait object is required to be `Send + Sync`.
pub type EventStream<E> = Pin<Box<dyn Stream<Item = E> + Send + Sync>>;
/// Instructions accepted by [`StateMachine::execute`].
#[derive(Clone, Debug)]
pub enum MachineInstruction {
/// Start the machine. Carries the middleware client and the messager;
/// `Engine::execute` forwards both to [`Behavior::startup`].
Start(Arc<ArbiterMiddleware>, Messager),
/// Process events. `Engine::execute` handles this by feeding events from its
/// stored event stream to [`Behavior::process`].
Process,
}
/// Value returned by [`Behavior::process`] that decides whether event processing goes on.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ControlFlow {
/// Stop processing: `Engine::execute` leaves its event loop when a behavior returns this.
Halt,
/// Keep going: the event loop moves on to the next event from the stream.
Continue,
}
/// Lifecycle state recorded by an [`Engine`].
#[derive(Clone, Copy, Debug)]
pub enum State {
/// The state assigned by `Engine::new`.
Uninitialized,
/// Assigned by `Engine::execute` when it handles [`MachineInstruction::Start`].
Starting,
/// NOTE(review): presumably the phase in which events are being processed
/// (cf. [`MachineInstruction::Process`]) — verify where this is assigned.
Processing,
}
/// A unit of logic that is started once and then reacts to events of type `E`.
///
/// Implementors must be serializable and deserializable as well as
/// `Send + Sync + Debug + 'static`.
#[async_trait::async_trait]
pub trait Behavior<E: Send + 'static>:
Serialize + DeserializeOwned + Send + Sync + Debug + 'static
{
/// Starts the behavior with the given middleware client and messager.
///
/// Returns `Ok(Some(stream))` with the events the behavior wants to handle, or
/// `Ok(None)` if there is nothing to process. `Engine::execute` only moves on
/// to event processing when a stream is returned.
///
/// # Errors
///
/// Whatever error the implementation reports for a failed startup.
async fn startup(
&mut self,
client: Arc<ArbiterMiddleware>,
messager: Messager,
) -> Result<Option<EventStream<E>>>;
/// Handles a single event and says whether processing should continue.
///
/// The default implementation ignores the event and returns
/// [`ControlFlow::Halt`].
async fn process(&mut self, _event: E) -> Result<ControlFlow> {
Ok(ControlFlow::Halt)
}
}
/// Conversion of a value into a boxed [`StateMachine`] trait object.
pub trait CreateStateMachine {
/// Consumes `self` and returns a boxed [`StateMachine`].
fn create_state_machine(self) -> Box<dyn StateMachine>;
}
/// An object that can be driven with [`MachineInstruction`]s.
///
/// Implementors must be `Send + Sync + Debug + 'static`, so they can be used as
/// `Box<dyn StateMachine>`.
#[async_trait::async_trait]
pub trait StateMachine: Send + Sync + Debug + 'static {
/// Carries out the given instruction.
///
/// # Errors
///
/// Returns an error if the instruction could not be carried out.
async fn execute(&mut self, _instruction: MachineInstruction) -> Result<()>;
}
/// Couples a [`Behavior`] with the lifecycle state and event stream needed to run it.
///
/// `Engine` implements [`StateMachine`]; all fields are private and only touched
/// through `Engine::new` and `execute`.
pub struct Engine<B, E>
where
B: Behavior<E>,
E: Send + 'static,
{
/// The wrapped behavior. Kept in an `Option` so `execute` can `take()` it and
/// move it into a spawned task.
behavior: Option<B>,
/// Current lifecycle state of the engine.
state: State,
/// Event stream returned by `Behavior::startup`, if any; taken by `execute`
/// when it handles `MachineInstruction::Process`.
event_stream: Option<EventStream<E>>,
}
impl<B, E> Debug for Engine<B, E>
where
    B: Behavior<E>,
    E: DeserializeOwned + Send + Sync + 'static,
{
    /// Formats the engine as a struct named `Engine` with its `behavior` and
    /// `state` fields. The `event_stream` field is left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pull out just the two fields that are printed.
        let Self {
            behavior, state, ..
        } = self;
        let mut builder = f.debug_struct("Engine");
        builder.field("behavior", behavior);
        builder.field("state", state);
        builder.finish()
    }
}
impl<B, E> Engine<B, E>
where
B: Behavior<E> + Debug,
E: DeserializeOwned + Send + Sync + 'static,
{
pub fn new(behavior: B) -> Self {
Self {
behavior: Some(behavior),
state: State::Uninitialized,
event_stream: None,
}
}
}
#[async_trait::async_trait]
impl<B, E> StateMachine for Engine<B, E>
where
B: Behavior<E> + Debug + Serialize + DeserializeOwned,
E: DeserializeOwned + Serialize + Send + Sync + Debug + 'static,
{
async fn execute(&mut self, instruction: MachineInstruction) -> Result<()> {
let id: Option<String>;
match instruction {
MachineInstruction::Start(client, messager) => {
id = messager.id.clone();
let id_clone = id.clone();
self.state = State::Starting;
let mut behavior = self.behavior.take().unwrap();
let behavior_task: JoinHandle<Result<(Option<EventStream<E>>, B)>> =
tokio::spawn(async move {
let stream = match behavior.startup(client, messager).await {
Ok(stream) => stream,
Err(e) => {
error!(
"startup failed for behavior {:?}: \n reason: {:?}",
id_clone, e
);
panic!();
}
};
debug!("startup complete for behavior {:?}", id_clone);
Ok((stream, behavior))
});
let (stream, behavior) = behavior_task.await??;
match stream {
Some(stream) => {
self.event_stream = Some(stream);
self.behavior = Some(behavior);
match self.execute(MachineInstruction::Process).await {
Ok(_) => {}
Err(e) => {
error!("process failed for behavior {:?}: \n reason: {:?}", id, e);
}
}
Ok(())
}
None => {
self.behavior = Some(behavior);
Ok(())
}
}
}
MachineInstruction::Process => {
trace!("Behavior is starting up.");
let mut behavior = self.behavior.take().unwrap();
let mut stream = self.event_stream.take().unwrap();
let behavior_task: JoinHandle<Result<B>> = tokio::spawn(async move {
while let Some(event) = stream.next().await {
match behavior.process(event).await? {
ControlFlow::Halt => {
break;
}
ControlFlow::Continue => {}
}
}
Ok(behavior)
});
self.behavior = Some(behavior_task.await??);
Ok(())
}
}
}
}