arbiter_engine/
machine.rs

1//! The [`StateMachine`] trait, [`Behavior`] trait, and the [`Engine`] that runs
2//! [`Behavior`]s.
3
4use std::pin::Pin;
5
6use anyhow::Result;
7use arbiter_core::middleware::ArbiterMiddleware;
8use futures_util::{Stream, StreamExt};
9use tokio::task::JoinHandle;
10use tracing::error;
11
12use super::*;
13
/// A type alias for a pinned, boxed stream of events.
///
/// The alias names a pinned, heap-allocated trait object that implements
/// [`Stream`] and yields items of type `E`. The trait object is additionally
/// required to be `Send` and `Sync`, so it can be moved to, and shared
/// between, threads.
///
/// # Type Parameters
///
/// * `E`: The type of the items yielded by the stream.
pub type EventStream<E> = Pin<Box<dyn Stream<Item = E> + Send + Sync>>;
24
/// The instructions that can be sent to a [`StateMachine`].
#[derive(Clone, Debug)]
pub enum MachineInstruction {
    /// Used to make a [`StateMachine`] start up.
    ///
    /// Carries the [`ArbiterMiddleware`] client and the [`Messager`] that the
    /// [`Engine`] passes on to [`Behavior::startup`].
    Start(Arc<ArbiterMiddleware>, Messager),

    /// Used to make a [`StateMachine`] process events.
    /// This will offload the process into a task that can be halted by sending
    /// a [`ControlFlow::Halt`] message from the [`Messager`]. For our purposes,
    /// the [`crate::world::World`] will handle this.
    ///
    /// In the [`Engine`], the task runs until the event stream ends or
    /// [`Behavior::process`] returns [`ControlFlow::Halt`].
    Process,
}
37
38/// The message that is used in a [`StateMachine`] to continue or halt its
39/// processing.
40#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
41pub enum ControlFlow {
42    /// Used to halt the processing of a [`StateMachine`].
43    Halt,
44
45    /// Used to continue on the processing of a [`StateMachine`].
46    Continue,
47}
48
/// The state used by any entity implementing [`StateMachine`].
///
/// The type is comparable (`PartialEq`/`Eq`) so that the current state can be
/// checked with `==` or `assert_eq!` instead of a pattern match.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    /// The entity is not yet running any process.
    /// This is the state adopted by the entity when it is first created.
    Uninitialized,

    /// The entity is starting up.
    /// This is where the entity can engage in its specific start up activities
    /// that it can do given the current state of the world.
    /// These are usually quick one-shot activities that are not repeated.
    Starting,

    /// The entity is processing.
    /// This is where the entity can engage in its specific processing
    /// of events that can lead to actions being taken.
    Processing,
}
67
// NOTE: `async_trait::async_trait` is used throughout so that the traits stay
// object safe (usable as `dyn Trait`); the native `async fn` in traits that
// was stabilized in Rust 1.75 is not object safe.
70
/// The [`Behavior`] trait is the lowest level functionality that will be used
/// by a [`StateMachine`]. This constitutes what each state transition will do.
///
/// The type parameter `E` is the type of event handed to
/// [`Behavior::process`].
#[async_trait::async_trait]
pub trait Behavior<E: Send + 'static>:
    Serialize + DeserializeOwned + Send + Sync + Debug + 'static
{
    /// Used to start the agent.
    /// This is where the agent can engage in its specific start up activities
    /// that it can do given the current state of the world.
    ///
    /// # Parameters
    ///
    /// - `client`: The shared [`ArbiterMiddleware`] handed to the behavior.
    /// - `messager`: The [`Messager`] handed to the behavior.
    ///
    /// # Returns
    ///
    /// - `Ok(Some(stream))`: The stream of events the behavior wants to
    ///   process. The [`Engine`] stores it and moves on to processing.
    /// - `Ok(None)`: The behavior has no events to process; the [`Engine`]
    ///   is done once start up completes.
    ///
    /// # Errors
    ///
    /// Returns an error when the start up activities fail.
    async fn startup(
        &mut self,
        client: Arc<ArbiterMiddleware>,
        messager: Messager,
    ) -> Result<Option<EventStream<E>>>;

    /// Used to process events.
    /// This is where the agent can engage in its specific processing
    /// of events that can lead to actions being taken.
    ///
    /// # Returns
    ///
    /// [`ControlFlow::Continue`] to keep receiving events, or
    /// [`ControlFlow::Halt`] to stop. The default implementation ignores the
    /// event and returns [`ControlFlow::Halt`], which suits behaviors that
    /// only act during start up.
    async fn process(&mut self, _event: E) -> Result<ControlFlow> {
        Ok(ControlFlow::Halt)
    }
}
/// A trait for creating a state machine.
///
/// This trait is intended to be implemented by types that can be converted into
/// a state machine. A state machine, in this context, is an entity capable of
/// executing a set of instructions or operations based on its current state and
/// inputs it receives.
///
/// Implementers of this trait should provide the logic to initialize and return
/// a new instance of a state machine, encapsulated within a `Box<dyn
/// StateMachine>`. This allows for dynamic dispatch to the state machine's
/// methods, enabling polymorphism where different types of state machines can
/// be used interchangeably at runtime.
pub trait CreateStateMachine {
    /// Creates and returns a new state machine instance.
    ///
    /// This method consumes the implementer and returns a new instance of a
    /// state machine encapsulated within a `Box<dyn StateMachine>`. The
    /// specific type of the state machine returned can vary, allowing for
    /// flexibility and reuse of the state machine logic across
    /// different contexts.
    ///
    /// # Returns
    ///
    /// - `Box<dyn StateMachine>`: A boxed state machine object that can be
    ///   dynamically dispatched.
    fn create_state_machine(self) -> Box<dyn StateMachine>;
}
#[async_trait::async_trait]
/// A trait defining the capabilities of a state machine within the system.
///
/// This trait is designed to be implemented by entities that can execute
/// instructions based on their current state and inputs they receive. The
/// execution of these instructions is asynchronous, allowing for non-blocking
/// operations within the state machine's logic.
///
/// Implementers of this trait must be able to be sent across threads and shared
/// among threads safely, hence the `Send`, `Sync`, and `'static` bounds. They
/// should also support debugging through the `Debug` trait.
pub trait StateMachine: Send + Sync + Debug + 'static {
    /// Executes a given instruction asynchronously.
    ///
    /// This method takes a mutable reference to self, allowing the state
    /// machine to modify its state in response to the instruction. The
    /// instruction to be executed is passed as an argument, encapsulating the
    /// action to be performed by the state machine.
    ///
    /// # Parameters
    ///
    /// - `instruction`: The instruction that the state machine is to execute.
    ///
    /// # Returns
    ///
    /// `Ok(())` once the instruction has been carried out. Executing an
    /// instruction may result in state changes within the implementing type
    /// or the generation of further instructions or events.
    ///
    /// # Errors
    ///
    /// Returns an error when the instruction could not be carried out.
    async fn execute(&mut self, _instruction: MachineInstruction) -> Result<()>;
}
150
/// The `Engine` struct represents the core logic unit of a state machine-based
/// entity, such as an agent. It encapsulates a behavior and manages the flow
/// of events to and from this behavior, effectively driving the entity's
/// response to external stimuli.
///
/// The `Engine` is generic over a behavior type `B` and an event type `E`,
/// allowing it to be used with a wide variety of behaviors and event sources.
/// It is itself a state machine, capable of executing instructions that
/// manipulate its behavior or react to events.
///
/// # Fields
///
/// - `behavior`: An optional behavior that the engine is currently managing.
///   This is where the engine's logic is primarily executed in response to
///   events.
/// - `state`: The [`State`] the engine is currently in.
/// - `event_stream`: The optional stream of events that the behavior will
///   process.
pub struct Engine<B, E>
where
    B: Behavior<E>,
    E: Send + 'static,
{
    /// The behavior the `Engine` runs.
    ///
    /// It is taken out (leaving `None`) while a start up or processing task
    /// owns it, and it is put back once that task completes successfully.
    behavior: Option<B>,

    /// The current state of the [`Engine`].
    state: State,

    /// The stream of events that the [`Engine`] will process.
    ///
    /// It is `None` until [`Behavior::startup`] returns a stream, and it is
    /// taken out again when processing begins. The items are already of the
    /// event type `E`.
    event_stream: Option<EventStream<E>>,
}
182
183impl<B, E> Debug for Engine<B, E>
184where
185    B: Behavior<E>,
186    E: DeserializeOwned + Send + Sync + 'static,
187{
188    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189        f.debug_struct("Engine")
190            .field("behavior", &self.behavior)
191            .field("state", &self.state)
192            .finish()
193    }
194}
195
196impl<B, E> Engine<B, E>
197where
198    B: Behavior<E> + Debug,
199    E: DeserializeOwned + Send + Sync + 'static,
200{
201    /// Creates a new [`Engine`] with the given [`Behavior`] and [`Receiver`].
202    pub fn new(behavior: B) -> Self {
203        Self {
204            behavior: Some(behavior),
205            state: State::Uninitialized,
206            event_stream: None,
207        }
208    }
209}
210
211#[async_trait::async_trait]
212impl<B, E> StateMachine for Engine<B, E>
213where
214    B: Behavior<E> + Debug + Serialize + DeserializeOwned,
215    E: DeserializeOwned + Serialize + Send + Sync + Debug + 'static,
216{
217    async fn execute(&mut self, instruction: MachineInstruction) -> Result<()> {
218        // NOTE: The unwraps here are safe because the `Behavior` in an engine is only
219        // accessed here and it is private.
220        let id: Option<String>;
221        match instruction {
222            MachineInstruction::Start(client, messager) => {
223                id = messager.id.clone();
224                let id_clone = id.clone();
225                self.state = State::Starting;
226                let mut behavior = self.behavior.take().unwrap();
227                let behavior_task: JoinHandle<Result<(Option<EventStream<E>>, B)>> =
228                    tokio::spawn(async move {
229                        let stream = match behavior.startup(client, messager).await {
230                            Ok(stream) => stream,
231                            Err(e) => {
232                                error!(
233                                    "startup failed for behavior {:?}: \n reason: {:?}",
234                                    id_clone, e
235                                );
236                                // Throw a panic as we cannot recover from this for now.
237                                panic!();
238                            }
239                        };
240                        debug!("startup complete for behavior {:?}", id_clone);
241                        Ok((stream, behavior))
242                    });
243                let (stream, behavior) = behavior_task.await??;
244                match stream {
245                    Some(stream) => {
246                        self.event_stream = Some(stream);
247                        self.behavior = Some(behavior);
248                        match self.execute(MachineInstruction::Process).await {
249                            Ok(_) => {}
250                            Err(e) => {
251                                error!("process failed for behavior {:?}: \n reason: {:?}", id, e);
252                            }
253                        }
254                        Ok(())
255                    }
256                    None => {
257                        self.behavior = Some(behavior);
258                        Ok(())
259                    }
260                }
261            }
262            MachineInstruction::Process => {
263                trace!("Behavior is starting up.");
264                let mut behavior = self.behavior.take().unwrap();
265                let mut stream = self.event_stream.take().unwrap();
266                let behavior_task: JoinHandle<Result<B>> = tokio::spawn(async move {
267                    while let Some(event) = stream.next().await {
268                        match behavior.process(event).await? {
269                            ControlFlow::Halt => {
270                                break;
271                            }
272                            ControlFlow::Continue => {}
273                        }
274                    }
275                    Ok(behavior)
276                });
277                // TODO: We don't have to store the behavior again here, we could just discard
278                // it.
279                self.behavior = Some(behavior_task.await??);
280                Ok(())
281            }
282        }
283    }
284}