use crate::error::AgentError;
use crate::integration::error::Result as IntegrationResult;
use crate::{Context, Error as StateError, Event, EventTrait, Machine};
use async_trait::async_trait;
use std::fmt::Debug;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::{debug, error};
/// Handle through which other code drives and inspects a child state machine.
///
/// In this file it is used as `Box<dyn ChildMachine + Send + Sync>` behind an
/// `Arc<Mutex<..>>` by the parent-machine actions in the `coordination`
/// module, which forward events with `send_event` and read `get_status`.
#[async_trait]
pub trait ChildMachine: Send + Sync {
/// Delivers `event` to the child machine.
///
/// NOTE(review): the `bool` presumably reports whether the event was handled
/// (the test in this file asserts it with "Child should handle COMPLETE
/// event") — confirm against `Machine::send`.
async fn send_event(&mut self, event: Event) -> Result<bool, StateError>;
/// Returns an optional status string for the child.
///
/// `DefaultChildMachine` answers with the wrapped machine's name.
async fn get_status(&self) -> IntegrationResult<Option<String>>;
/// Reports whether the child has reached a final state.
///
/// `DefaultChildMachine` implements this as a check for the state id
/// `"final"`.
fn is_in_final_state(&self) -> bool;
/// Reports whether the child is currently in the state identified by
/// `state_id`.
async fn is_in_state(&self, state_id: &str) -> Result<bool, StateError>;
/// Returns the ids of the states the child is currently in.
fn current_states(&self) -> Vec<String>;
/// Serializes the child machine to a JSON string.
fn to_json(&self) -> IntegrationResult<String>;
}
/// [`ChildMachine`] implementation backed by a single
/// `Machine<Context, Event, String>`.
pub struct DefaultChildMachine {
// The wrapped machine, shared through an `Arc` and guarded by a
// `tokio::sync::Mutex`.
machine: Arc<Mutex<Machine<Context, Event, String>>>,
}
impl DefaultChildMachine {
    /// Takes ownership of `machine` and places it behind a shared async mutex.
    pub fn new(machine: Machine<Context, Event, String>) -> Self {
        let machine = Arc::new(Mutex::new(machine));
        Self { machine }
    }

    /// Waits for the internal mutex and hands back its guard, giving the
    /// caller direct access to the wrapped machine for as long as the guard
    /// is alive.
    pub async fn machine_locked(
        &self,
    ) -> tokio::sync::MutexGuard<'_, Machine<Context, Event, String>> {
        Mutex::lock(&self.machine).await
    }
}
#[async_trait]
impl ChildMachine for DefaultChildMachine {
async fn send_event(&mut self, event: Event) -> Result<bool, StateError> {
let machine_arc = Arc::clone(&self.machine);
let event_owned = event;
let mut guard = machine_arc.lock().await;
guard.send(event_owned).await
}
async fn get_status(&self) -> IntegrationResult<Option<String>> {
let machine_arc: Arc<Mutex<Machine<Context, Event, String>>> = Arc::clone(&self.machine);
let guard = machine_arc.lock().await;
Ok(Some(guard.name.clone()))
}
fn is_in_final_state(&self) -> bool {
futures::executor::block_on(async {
let guard = self.machine.lock().await;
guard.is_in(&"final".to_string())
})
}
async fn is_in_state(&self, state_id: &str) -> Result<bool, StateError> {
let state_id_owned = state_id.to_string();
let machine_arc: Arc<Mutex<Machine<Context, Event, String>>> = Arc::clone(&self.machine);
let guard = machine_arc.lock().await;
Ok(guard.is_in(&state_id_owned))
}
fn current_states(&self) -> Vec<String> {
futures::executor::block_on(async {
let guard = self.machine.lock().await;
guard.current_states.iter().cloned().collect()
})
}
fn to_json(&self) -> IntegrationResult<String> {
futures::executor::block_on(async {
let guard = self.machine.lock().await;
Ok(guard.to_json()?)
})
}
}
/// Example wiring of a parent machine that supervises a child through
/// [`ChildMachine`], together with a test that drives both machines.
pub mod coordination {
use super::*;
use crate::{Action, Context, Event, Guard, MachineBuilder, State, Transition, TransitionType};
use futures::FutureExt;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
/// Builds the child machine `"childMachine"`: it starts in `"initial"` and
/// moves to the final state `"final"` on a `COMPLETE` event.
///
/// # Panics
///
/// Panics if the build future is not ready on its first poll
/// (`now_or_never` yields `None`) or if the build itself returns an error.
// NOTE(review): `dead_code` is presumably allowed because the only caller
// visible here is the test below — confirm.
#[allow(dead_code)] fn create_child_machine() -> Machine<Context, Event, String> {
let initial_state: State<String, Context, Event> = State::new("initial".to_string());
let final_state: State<String, Context, Event> = State::new_final("final".to_string());
MachineBuilder::<Context, Event, String, ()>::new(
"childMachine".to_string(),
"initial".to_string(),
)
.state(initial_state)
.state(final_state)
.transition(Transition::new(
"initial".to_string(),
Some("final".to_string()),
Some(Event::from("COMPLETE")),
None,
vec![],
TransitionType::External,
))
.build()
.now_or_never()
.expect("Child machine build failed")
.unwrap()
}
/// Builds the parent machine `"parentMachine"` around the shared `child`.
///
/// States: `"monitoring"` (initial), `"childComplete"` and the final state
/// `"done"`. Transitions:
/// - `START`: internal transition on `"monitoring"` whose action forwards the
///   event to the child.
/// - `CHECK`: `"monitoring"` -> `"childComplete"`, guarded by the context key
///   `"childComplete"`.
/// - `FINISH`: `"childComplete"` -> `"done"`.
///
/// An entry action on `"monitoring"` copies the child's status into the
/// parent context.
///
/// # Panics
///
/// Panics if the build future is not ready on its first poll or if the build
/// returns an error.
#[allow(dead_code)] fn create_parent_machine(
child: Arc<Mutex<Box<dyn ChildMachine + Send + Sync + 'static>>>,
) -> Machine<Context, Event, String> {
let monitoring: State<String, Context, Event> = State::new("monitoring".to_string());
let child_complete: State<String, Context, Event> = State::new("childComplete".to_string());
let done: State<String, Context, Event> = State::new_final("done".to_string());
// Each closure below is `move`, so each gets its own handle to the child.
let child_arc_for_monitor: Arc<Mutex<Box<dyn ChildMachine + Send + Sync + 'static>>> =
Arc::clone(&child);
let child_arc_for_forward: Arc<Mutex<Box<dyn ChildMachine + Send + Sync + 'static>>> =
Arc::clone(&child);
// Entry action: read the child's status and, when present, store it in the
// parent context under "child_status". A status error becomes `ActionFailed`.
let monitor_closure = move |ctx: Arc<RwLock<Context>>, _evt: &Event| {
let child_lock: Arc<Mutex<Box<dyn ChildMachine + Send + Sync + 'static>>> =
Arc::clone(&child_arc_for_monitor);
Box::pin(async move {
let child = child_lock.lock().await;
match child.get_status().await {
Ok(Some(status)) => {
debug!("Child status observed by parent: {}", status);
let mut ctx_guard = ctx.write().await;
ctx_guard.set("child_status", status)?;
Ok(())
}
Ok(None) => {
debug!("Child status is None.");
Ok(())
}
Err(e) => {
error!("Error getting child status: {:?}", e);
Err(StateError::ActionFailed(format!(
"Failed to get child status: {:?}",
e
)))
}
}
})
};
// Transition action: send a clone of the triggering event to the child. The
// child's `bool` result is ignored; only an error fails the action.
let forward_closure = move |_ctx: Arc<RwLock<Context>>, evt: &Event| {
let child_lock: Arc<Mutex<Box<dyn ChildMachine + Send + Sync + 'static>>> =
Arc::clone(&child_arc_for_forward);
let event_to_forward = evt.clone(); Box::pin(async move {
debug!(
"Parent forwarding event '{}' to child",
event_to_forward.event_type()
);
let mut child = child_lock.lock().await;
match child.send_event(event_to_forward).await {
Ok(_) => Ok(()),
Err(e) => {
error!("Error forwarding event to child: {:?}", e);
Err(StateError::ActionFailed(format!(
"Failed to forward event to child: {:?}",
e
)))
}
}
})
};
// Guard for CHECK: passes only when the context holds `true` under
// "childComplete"; every other outcome of the lookup counts as `false`.
let check_completion_guard =
Guard::new("checkChildComplete", |ctx: &Context, _: &Event| {
ctx.get::<bool>("childComplete")
.map(|res| res.unwrap_or(false))
.unwrap_or(false)
});
let start_action = Action::from_fn(forward_closure);
let start_transition = Transition::new(
"monitoring".to_string(),
Some("monitoring".to_string()), Some(Event::from("START")), None, vec![start_action], TransitionType::Internal, );
let check_transition = Transition::new(
"monitoring".to_string(), Some("childComplete".to_string()), Some(Event::from("CHECK")), Some(check_completion_guard), vec![], TransitionType::External, );
let finish_transition = Transition::new(
"childComplete".to_string(), Some("done".to_string()), Some(Event::from("FINISH")), None, vec![], TransitionType::External, );
MachineBuilder::<Context, Event, String, ()>::new(
"parentMachine".to_string(),
"monitoring".to_string(),
)
.state(monitoring)
.state(child_complete)
.state(done)
.transition(start_transition)
.transition(check_transition)
.transition(finish_transition)
.on_entry(&"monitoring".to_string(), monitor_closure)
.build()
.now_or_never()
.expect("Parent machine build failed")
.unwrap()
}
/// Drives a parent and a child machine through the full scenario: `START` on
/// the parent, `COMPLETE` sent directly to the child, the `"childComplete"`
/// flag set by hand in the parent context, then `CHECK` and `FINISH` on the
/// parent, asserting the expected state after each step.
#[tokio::test]
async fn test_hierarchical_integration() -> crate::Result<()> {
let child_machine = create_child_machine();
let child_wrapper = DefaultChildMachine::new(child_machine);
let child_ref: Arc<Mutex<Box<dyn ChildMachine + Send + Sync + 'static>>> =
Arc::new(Mutex::new(Box::new(child_wrapper)));
let mut parent_machine = create_parent_machine(child_ref.clone());
println!("Debug: Sending START event to parent...");
let result = parent_machine.send(Event::from("START")).await?;
println!("Debug: Parent machine START event result: {:?}", result);
assert!(result, "Parent should handle START event");
println!("Debug: Sending COMPLETE event to child...");
// Scoped so the child lock is released before the next step locks it again.
let child_result = {
let mut child_guard = child_ref.lock().await;
child_guard.send_event(Event::from("COMPLETE")).await?
};
println!(
"Debug: Child machine COMPLETE event result: {:?}",
child_result
);
assert!(child_result, "Child should handle COMPLETE event");
{
let child_guard = child_ref.lock().await;
assert!(
child_guard.is_in_final_state(),
"Child should be in final state after COMPLETE"
);
}
// Nothing in this test propagates completion from child to parent, so the
// flag read by the CHECK guard is written directly.
parent_machine
.context
.write()
.await
.set("childComplete", true)?;
println!("Debug: Sending CHECK event to parent...");
let result_check = parent_machine.send(Event::from("CHECK")).await?;
println!(
"Debug: Parent machine CHECK event result: {:?}",
result_check
);
assert!(result_check); assert!(
parent_machine.is_in(&"childComplete".to_string()),
"Parent should be in childComplete"
);
println!("Debug: Sending FINISH event to parent...");
let result_finish = parent_machine.send(Event::from("FINISH")).await?;
println!(
"Debug: Parent machine FINISH event result: {:?}",
result_finish
);
assert!(result_finish);
assert!(
parent_machine.is_in(&"done".to_string()),
"Parent should be in done"
);
Ok(())
}
}
/// Errors for parent/child machine coordination.
///
/// `StateError` and `AgentError` values convert into this type automatically
/// through the `#[from]` attributes, so `?` can be used on both.
#[derive(Error, Debug)]
pub enum HierarchicalError {
/// The lock guarding a child machine was poisoned.
// NOTE(review): not constructed anywhere in the visible part of this file.
#[error("Child machine lock poisoned")]
LockPoisoned,
/// An error from the underlying state machine.
#[error("State machine error: {0}")]
StateMachine(#[from] StateError),
/// An error from the integration layer, carried as an `AgentError`.
#[error("Integration error: {0}")]
Integration(#[from] AgentError),
/// Sending failed; the payload is the message shown after "Send error: ".
#[error("Send error: {0}")]
SendError(String),
/// The child machine reported a failure; the payload is a free-form message.
#[error("Child machine error: {0}")]
ChildError(String),
}
/// Builds a guard named `"checkChildStateInContext"` that passes only when
/// the captured `shared_context` holds `true` under the key
/// `"childComplete"`.
///
/// A failed lookup and a missing value both count as `false`. The context
/// argument handed to the guard at evaluation time is not consulted; only
/// `shared_context` is read.
#[cfg(feature = "integration")]
fn create_child_check_guard(shared_context: crate::SharedContext) -> crate::Guard<Context, Event> {
    crate::Guard::new("checkChildStateInContext", move |_ctx, _event| {
        // The lookup involves no `.await`, so it is evaluated directly rather
        // than being wrapped in a nested executor on every guard evaluation.
        shared_context
            .get::<bool>("childComplete")
            .ok()
            .flatten()
            .unwrap_or(false)
    })
}