1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
//! The [`StateMachine`] trait, [`Behavior`] trait, and the [`Engine`] that runs
//! [`Behavior`]s.
use std::pin::Pin;
use anyhow::Result;
use arbiter_core::middleware::ArbiterMiddleware;
use futures_util::{Stream, StreamExt};
use tokio::task::JoinHandle;
use super::*;
/// A type alias for a pinned, boxed, dynamically dispatched stream of events.
///
/// The stream yields items of type `E`. The boxed trait object is additionally
/// bounded by `Send` and `Sync`, so it can be moved to and shared between
/// threads.
///
/// # Type Parameters
///
/// * `E`: The type of the items yielded by the stream.
pub type EventStream<E> = Pin<Box<dyn Stream<Item = E> + Send + Sync>>;
/// The instructions that can be sent to a [`StateMachine`].
#[derive(Clone, Debug)]
pub enum MachineInstruction {
/// Used to make a [`StateMachine`] start up.
///
/// Carries the shared [`ArbiterMiddleware`] client and the [`Messager`] that
/// the machine is started with.
Start(Arc<ArbiterMiddleware>, Messager),
/// Used to make a [`StateMachine`] process events.
/// This will offload the process into a task that can be halted by sending
/// a [`ControlFlow::Halt`] message from the [`Messager`]. For our purposes,
/// the [`crate::world::World`] will handle this.
Process,
}
/// The message that is used in a [`StateMachine`] to continue or halt its
/// processing.
///
/// This is the success value returned by [`Behavior::process`] for each
/// event it handles.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ControlFlow {
/// Used to halt the processing of a [`StateMachine`].
Halt,
/// Used to continue on the processing of a [`StateMachine`].
Continue,
}
/// The state used by any entity implementing [`StateMachine`].
///
/// The variants are listed in the order an entity is expected to move
/// through them.
#[derive(Clone, Copy, Debug)]
pub enum State {
/// The entity is not yet running any process.
/// This is the state adopted by the entity when it is first created.
Uninitialized,
/// The entity is starting up.
/// This is where the entity can engage in its specific start up activities
/// that it can do given the current state of the world.
/// These are usually quick one-shot activities that are not repeated.
Starting,
/// The entity is processing.
/// This is where the entity can engage in its specific processing
/// of events that can lead to actions being taken.
Processing,
}
// NOTE: `async_trait::async_trait` is used throughout so that traits with
// async methods stay usable as trait objects (e.g. `Box<dyn StateMachine>`),
// even though Rust >= 1.75 has stabilized `async fn` in traits.
/// The [`Behavior`] trait is the lowest level functionality that will be used
/// by a [`StateMachine`]. This constitutes what each state transition will do.
///
/// # Type Parameters
///
/// * `E`: The type of event the behavior processes.
#[async_trait::async_trait]
pub trait Behavior<E: Send + 'static>:
Serialize + DeserializeOwned + Send + Sync + Debug + 'static
{
/// Used to start the agent.
/// This is where the agent can engage in its specific start up activities
/// that it can do given the current state of the world.
///
/// # Returns
///
/// - `Ok(Some(stream))`: the stream of events that should subsequently be
///   fed to [`Behavior::process`].
/// - `Ok(None)`: the behavior has no events to process after starting up.
///
/// # Errors
///
/// Returns an error if the start up activities fail.
async fn startup(
&mut self,
client: Arc<ArbiterMiddleware>,
messager: Messager,
) -> Result<Option<EventStream<E>>>;
/// Used to process events.
/// This is where the agent can engage in its specific processing
/// of events that can lead to actions being taken.
///
/// The default implementation ignores the event and returns
/// [`ControlFlow::Halt`], so a behavior that does not override this method
/// stops after the first event it is handed.
async fn process(&mut self, _event: E) -> Result<ControlFlow> {
Ok(ControlFlow::Halt)
}
}
/// A trait for creating a state machine.
///
/// This trait is intended to be implemented by types that can be converted into
/// a state machine. A state machine, in this context, is an entity capable of
/// executing a set of instructions or operations based on its current state and
/// inputs it receives.
///
/// Implementers of this trait should provide the logic to initialize and return
/// a new instance of a state machine, encapsulated within a `Box<dyn
/// StateMachine>`. This allows for dynamic dispatch to the state machine's
/// methods, enabling polymorphism where different types of state machines can
/// be used interchangeably at runtime.
pub trait CreateStateMachine {
/// Creates and returns a new state machine instance.
///
/// This method consumes the implementer and returns a new instance of a
/// state machine encapsulated within a `Box<dyn StateMachine>`. The
/// specific type of the state machine returned can vary, allowing for
/// flexibility and reuse of the state machine logic across
/// different contexts.
///
/// # Returns
///
/// - `Box<dyn StateMachine>`: A boxed state machine object that can be
///   dynamically dispatched.
fn create_state_machine(self) -> Box<dyn StateMachine>;
}
#[async_trait::async_trait]
/// A trait defining the capabilities of a state machine within the system.
///
/// This trait is designed to be implemented by entities that can execute
/// instructions based on their current state and inputs they receive. The
/// execution of these instructions is asynchronous, allowing for non-blocking
/// operations within the state machine's logic.
///
/// Implementers of this trait must be able to be sent across threads and shared
/// among threads safely, hence the `Send`, `Sync`, and `'static` bounds. They
/// should also support debugging through the `Debug` trait.
pub trait StateMachine: Send + Sync + Debug + 'static {
/// Executes a given instruction asynchronously.
///
/// This method takes a mutable reference to self, allowing the state
/// machine to modify its state in response to the instruction. The
/// instruction to be executed is passed as an argument, encapsulating the
/// action to be performed by the state machine.
///
/// # Parameters
///
/// - `instruction`: The instruction that the state machine is to execute.
///
/// # Returns
///
/// `Ok(())` once the instruction has been carried out. The method carries
/// no payload on success, but it may result in state changes within the
/// implementing type.
///
/// # Errors
///
/// Returns an error if the instruction could not be carried out.
async fn execute(&mut self, _instruction: MachineInstruction) -> Result<()>;
}
/// The `Engine` struct represents the core logic unit of a state machine-based
/// entity, such as an agent. It encapsulates a behavior and manages the flow
/// of events to and from this behavior, effectively driving the entity's
/// response to external stimuli.
///
/// The `Engine` is generic over a behavior type `B` and an event type `E`,
/// allowing it to be used with a wide variety of behaviors and event sources.
/// It is itself a state machine, capable of executing instructions that
/// manipulate its behavior or react to events.
///
/// # Fields
///
/// - `behavior`: An optional behavior that the engine is currently managing.
///   This is where the engine's logic is primarily executed in response to
///   events.
/// - `state`: The [`State`] the engine is currently in.
/// - `event_stream`: An optional stream of events of type `E` for the behavior
///   to process.
pub struct Engine<B, E>
where
B: Behavior<E>,
E: Send + 'static,
{
/// The behavior the `Engine` runs.
///
/// Held in an `Option` so that it can be moved out of the engine while an
/// instruction is being executed and stored back afterwards.
behavior: Option<B>,
/// The current state of the [`Engine`].
state: State,
/// The stream of events of type `E` that the [`Engine`] will process.
///
/// This is `None` when the engine is created and is filled in with the
/// stream returned by [`Behavior::startup`], if any.
event_stream: Option<EventStream<E>>,
}
/// Debug formatting for [`Engine`].
///
/// Only the `behavior` and `state` fields are printed; the `event_stream`
/// field is not part of the output.
impl<B, E> Debug for Engine<B, E>
where
    B: Behavior<E>,
    E: DeserializeOwned + Send + Sync + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Borrow just the fields that are printed.
        let Self {
            behavior, state, ..
        } = self;
        let mut builder = f.debug_struct("Engine");
        builder.field("behavior", behavior);
        builder.field("state", state);
        builder.finish()
    }
}
impl<B, E> Engine<B, E>
where
    B: Behavior<E> + Debug,
    E: DeserializeOwned + Send + Sync + 'static,
{
    /// Creates a new [`Engine`] that wraps the given [`Behavior`].
    ///
    /// The engine begins in [`State::Uninitialized`] and holds no event
    /// stream yet.
    pub fn new(behavior: B) -> Self {
        let behavior = Some(behavior);
        Self {
            behavior,
            state: State::Uninitialized,
            event_stream: None,
        }
    }
}
#[async_trait::async_trait]
impl<B, E> StateMachine for Engine<B, E>
where
B: Behavior<E> + Debug + Serialize + DeserializeOwned,
E: DeserializeOwned + Serialize + Send + Sync + Debug + 'static,
{
async fn execute(&mut self, instruction: MachineInstruction) -> Result<()> {
// NOTE: The unwraps here are safe because the `Behavior` in an engine is only
// accessed here and it is private.
match instruction {
MachineInstruction::Start(client, messager) => {
self.state = State::Starting;
let mut behavior = self.behavior.take().unwrap();
let behavior_task: JoinHandle<Result<(Option<EventStream<E>>, B)>> =
tokio::spawn(async move {
let id = messager.id.clone();
let stream = behavior.startup(client, messager).await?;
debug!("startup complete for behavior {:?}", id);
Ok((stream, behavior))
});
let (stream, behavior) = behavior_task.await??;
match stream {
Some(stream) => {
self.event_stream = Some(stream);
self.behavior = Some(behavior);
self.execute(MachineInstruction::Process).await?;
Ok(())
}
None => {
self.behavior = Some(behavior);
Ok(())
}
}
}
MachineInstruction::Process => {
trace!("Behavior is starting up.");
let mut behavior = self.behavior.take().unwrap();
let mut stream = self.event_stream.take().unwrap();
let behavior_task: JoinHandle<Result<B>> = tokio::spawn(async move {
while let Some(event) = stream.next().await {
match behavior.process(event).await? {
ControlFlow::Halt => {
break;
}
ControlFlow::Continue => {}
}
}
Ok(behavior)
});
// TODO: We don't have to store the behavior again here, we could just discard
// it.
self.behavior = Some(behavior_task.await??);
Ok(())
}
}
}
}