use std::collections::{BTreeSet, HashMap};
use std::sync::{Mutex, MutexGuard};
use crate::{EngineError, Pid};
/// Identifier of the engine-level supervisor handed out by
/// `SupervisionTree::engine_supervisor`.
///
/// This is a unit type, so every value compares equal to every other value.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct EngineSupervisorId;
/// Identifier of a per-workflow-type supervisor.
///
/// Two ids are equal exactly when their workflow type names are equal.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TypeSupervisorId {
/// Name of the workflow type this id was created for.
workflow_type: String,
}
impl TypeSupervisorId {
    /// Borrows the workflow type name carried by this identifier.
    #[must_use]
    pub fn workflow_type(&self) -> &str {
        self.workflow_type.as_str()
    }
}
/// Snapshot-friendly record of one type supervisor and the workflow processes
/// placed under it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TypeSupervisorNode {
/// Identifier of this supervisor (carries the workflow type name).
id: TypeSupervisorId,
/// Pids of the workflow processes placed under this supervisor, kept ordered.
workflow_processes: BTreeSet<Pid>,
}
impl TypeSupervisorNode {
    /// Borrows the identifier of this type supervisor.
    #[must_use]
    pub fn id(&self) -> &TypeSupervisorId {
        let Self { id, .. } = self;
        id
    }

    /// Borrows the ordered set of workflow pids placed under this supervisor.
    #[must_use]
    pub fn workflow_processes(&self) -> &BTreeSet<Pid> {
        let Self {
            workflow_processes, ..
        } = self;
        workflow_processes
    }
}
/// Record of one placed workflow process and the activity processes recorded
/// beneath it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WorkflowNode {
/// Workflow type the process was placed under.
workflow_type: String,
/// Pid of the workflow process itself.
workflow_pid: Pid,
/// Pids of the activity processes recorded as children of this workflow.
activity_children: BTreeSet<Pid>,
}
impl WorkflowNode {
    /// Borrows the name of the workflow type this process was placed under.
    #[must_use]
    pub fn workflow_type(&self) -> &str {
        self.workflow_type.as_str()
    }

    /// Returns the pid of the workflow process.
    #[must_use]
    pub fn workflow_pid(&self) -> Pid {
        let Self { workflow_pid, .. } = self;
        *workflow_pid
    }

    /// Borrows the ordered set of activity pids recorded under this workflow.
    #[must_use]
    pub fn activity_children(&self) -> &BTreeSet<Pid> {
        let Self {
            activity_children, ..
        } = self;
        activity_children
    }

    /// Reports whether `activity_pid` has been recorded as a child of this workflow.
    #[must_use]
    pub fn has_activity_child(&self, activity_pid: Pid) -> bool {
        self.activity_children().contains(&activity_pid)
    }
}
/// Mutable contents of the supervision tree, guarded by the mutex in
/// `SupervisionTree`.
#[derive(Debug, Default)]
struct TreeState {
/// Type supervisors keyed by workflow type name.
type_supervisors: HashMap<String, TypeSupervisorNode>,
/// Placed workflow processes keyed by their pid.
workflows: HashMap<Pid, WorkflowNode>,
}
/// Registry of type supervisors, workflow processes and their activity children.
///
/// All state lives behind a single `Mutex`, so the methods take `&self`.
#[derive(Debug, Default)]
pub struct SupervisionTree {
/// Lock-protected tree contents; accessed through the private `state()` helper.
state: Mutex<TreeState>,
}
impl SupervisionTree {
    /// Creates an empty supervision tree with no type supervisors and no workflows.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the identifier of the engine-level supervisor.
    ///
    /// [`EngineSupervisorId`] is a unit type, so every call yields an equal value.
    #[must_use]
    pub const fn engine_supervisor(&self) -> EngineSupervisorId {
        EngineSupervisorId
    }

    /// Returns the id of the supervisor for `workflow_type`, creating the
    /// supervisor (with no workflow processes) if it does not exist yet.
    ///
    /// # Errors
    /// Returns [`EngineError::RegistryPoisoned`] if the tree lock is poisoned.
    pub fn ensure_type_supervisor(
        &self,
        workflow_type: impl Into<String>,
    ) -> Result<TypeSupervisorId, EngineError> {
        let workflow_type = workflow_type.into();
        let mut state = self.state()?;
        Ok(Self::supervisor_entry(&mut state, &workflow_type).id.clone())
    }

    /// Places `workflow_pid` under the supervisor for `workflow_type` and
    /// returns that supervisor's id.
    ///
    /// The supervisor is created on demand. Placing the same pid under the
    /// same type again is a no-op that keeps the existing [`WorkflowNode`]
    /// (including its recorded activity children).
    ///
    /// NOTE(review): this does not check whether `workflow_pid` was previously
    /// recorded as an activity child of some workflow; that overlap is only
    /// rejected in the other direction by `record_activity_child`.
    ///
    /// # Errors
    /// * [`EngineError::RegistryPoisoned`] if the tree lock is poisoned.
    /// * [`EngineError::Runtime`] if `workflow_pid` is already placed under a
    ///   different workflow type; the tree is left unchanged in that case.
    pub fn place_workflow(
        &self,
        workflow_type: impl Into<String>,
        workflow_pid: Pid,
    ) -> Result<TypeSupervisorId, EngineError> {
        let workflow_type = workflow_type.into();
        let mut state = self.state()?;

        // Validate before mutating anything so a rejected move leaves no trace
        // (in particular, no supervisor is created for the rejected type).
        let already_placed = match state.workflows.get(&workflow_pid) {
            Some(existing) if existing.workflow_type != workflow_type => {
                return Err(tree_runtime_error(format!(
                    "workflow process {workflow_pid} is already placed under workflow type `{}`",
                    existing.workflow_type
                )));
            }
            Some(_) => true,
            None => false,
        };

        let supervisor = Self::supervisor_entry(&mut state, &workflow_type);
        supervisor.workflow_processes.insert(workflow_pid);
        let id = supervisor.id.clone();

        // Only create the node on first placement; re-placement must not
        // overwrite the children recorded so far.
        if !already_placed {
            state.workflows.insert(
                workflow_pid,
                WorkflowNode {
                    workflow_type,
                    workflow_pid,
                    activity_children: BTreeSet::new(),
                },
            );
        }
        Ok(id)
    }

    /// Records `activity_pid` as an activity child of the workflow `workflow_pid`.
    ///
    /// Recording the same child twice is harmless because children are kept in a set.
    ///
    /// # Errors
    /// * [`EngineError::RegistryPoisoned`] if the tree lock is poisoned (the
    ///   lock is taken before any argument validation).
    /// * [`EngineError::Runtime`] if `workflow_pid == activity_pid`, if
    ///   `activity_pid` is itself a placed workflow process, or if
    ///   `workflow_pid` has not been placed in the tree.
    pub fn record_activity_child(
        &self,
        workflow_pid: Pid,
        activity_pid: Pid,
    ) -> Result<(), EngineError> {
        let mut state = self.state()?;
        if workflow_pid == activity_pid {
            return Err(tree_runtime_error(format!(
                "workflow process {workflow_pid} cannot be its own activity child"
            )));
        }
        if state.workflows.contains_key(&activity_pid) {
            return Err(tree_runtime_error(format!(
                "process {activity_pid} is already placed as a workflow process"
            )));
        }
        let workflow = state.workflows.get_mut(&workflow_pid).ok_or_else(|| {
            tree_runtime_error(format!(
                "workflow process {workflow_pid} is not in the supervision tree"
            ))
        })?;
        workflow.activity_children.insert(activity_pid);
        Ok(())
    }

    /// Returns how many type supervisors currently exist.
    ///
    /// # Errors
    /// Returns [`EngineError::RegistryPoisoned`] if the tree lock is poisoned.
    pub fn type_supervisor_count(&self) -> Result<usize, EngineError> {
        Ok(self.state()?.type_supervisors.len())
    }

    /// Returns a snapshot of every type supervisor, sorted by workflow type name.
    ///
    /// The supervisors live in a `HashMap`, whose iteration order is arbitrary;
    /// sorting makes the snapshot stable across calls and runs.
    ///
    /// # Errors
    /// Returns [`EngineError::RegistryPoisoned`] if the tree lock is poisoned.
    pub fn type_supervisors(&self) -> Result<Vec<TypeSupervisorNode>, EngineError> {
        // The guard is a temporary of this statement, so the lock is released
        // before the sort below runs.
        let mut supervisors: Vec<TypeSupervisorNode> =
            self.state()?.type_supervisors.values().cloned().collect();
        // Keys are unique map keys, so an unstable sort cannot reorder equals.
        supervisors
            .sort_unstable_by(|left, right| left.id.workflow_type.cmp(&right.id.workflow_type));
        Ok(supervisors)
    }

    /// Returns a copy of the node for `workflow_pid`, or `None` if that pid has
    /// not been placed.
    ///
    /// # Errors
    /// Returns [`EngineError::RegistryPoisoned`] if the tree lock is poisoned.
    pub fn workflow(&self, workflow_pid: Pid) -> Result<Option<WorkflowNode>, EngineError> {
        Ok(self.state()?.workflows.get(&workflow_pid).cloned())
    }

    /// Locks the tree state, mapping a poisoned lock to
    /// [`EngineError::RegistryPoisoned`].
    fn state(&self) -> Result<MutexGuard<'_, TreeState>, EngineError> {
        self.state.lock().map_err(|_| EngineError::RegistryPoisoned)
    }

    /// Returns the supervisor node for `workflow_type`, inserting an empty one
    /// if none exists yet. Shared by every path that needs get-or-create.
    fn supervisor_entry<'a>(
        state: &'a mut TreeState,
        workflow_type: &str,
    ) -> &'a mut TypeSupervisorNode {
        state
            .type_supervisors
            .entry(workflow_type.to_owned())
            .or_insert_with(|| TypeSupervisorNode {
                id: TypeSupervisorId {
                    workflow_type: workflow_type.to_owned(),
                },
                workflow_processes: BTreeSet::new(),
            })
    }
}
/// Wraps `reason` in an `EngineError::Runtime`, the error variant used for
/// every rejected supervision-tree operation in this file.
fn tree_runtime_error(reason: String) -> EngineError {
EngineError::Runtime { reason }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the supervision tree: supervisor creation, workflow
    //! placement, activity-child recording, and lock-poisoning behavior.

    use std::sync::Arc;

    use crate::EngineError;

    use super::SupervisionTree;

    /// Ensuring the same workflow type twice yields the same supervisor id and
    /// does not create a second supervisor.
    #[test]
    fn one_engine_root_has_one_supervisor_per_workflow_type() -> Result<(), EngineError> {
        let tree = SupervisionTree::new();
        let root = tree.engine_supervisor();
        let checkout = tree.ensure_type_supervisor("checkout")?;
        let billing = tree.ensure_type_supervisor("billing")?;
        let checkout_again = tree.ensure_type_supervisor("checkout")?;
        assert_eq!(root, tree.engine_supervisor());
        assert_eq!(checkout.workflow_type(), "checkout");
        assert_eq!(billing.workflow_type(), "billing");
        assert_eq!(checkout, checkout_again);
        assert_eq!(tree.type_supervisor_count()?, 2);
        Ok(())
    }

    /// Two workflows of the same type share one type supervisor.
    #[test]
    fn workflows_sit_under_type_supervisors_not_new_supervisors() -> Result<(), EngineError> {
        let tree = SupervisionTree::new();
        tree.place_workflow("checkout", 10)?;
        tree.place_workflow("checkout", 11)?;
        assert_eq!(tree.type_supervisor_count()?, 1);
        let supervisors = tree.type_supervisors()?;
        let checkout = supervisors
            .iter()
            .find(|node| node.id().workflow_type() == "checkout");
        assert!(checkout.is_some());
        if let Some(checkout) = checkout {
            assert_eq!(checkout.workflow_processes().len(), 2);
            assert!(checkout.workflow_processes().contains(&10));
            assert!(checkout.workflow_processes().contains(&11));
        }
        Ok(())
    }

    /// Activity children are stored on the workflow node they were recorded under.
    #[test]
    fn activity_children_are_one_level_below_workflow_process() -> Result<(), EngineError> {
        let tree = SupervisionTree::new();
        tree.place_workflow("checkout", 10)?;
        tree.record_activity_child(10, 20)?;
        tree.record_activity_child(10, 21)?;
        let workflow = tree.workflow(10)?;
        assert!(workflow.is_some());
        if let Some(workflow) = workflow {
            assert_eq!(workflow.workflow_type(), "checkout");
            assert_eq!(workflow.workflow_pid(), 10);
            assert!(workflow.has_activity_child(20));
            assert!(workflow.has_activity_child(21));
            assert_eq!(workflow.activity_children().len(), 2);
        }
        Ok(())
    }

    /// Recording a child under a workflow that was never placed is a runtime error.
    #[test]
    fn missing_workflow_activity_parent_is_typed_error() {
        let tree = SupervisionTree::new();
        let error = tree.record_activity_child(99, 20);
        assert!(matches!(error, Err(EngineError::Runtime { .. })));
    }

    /// Re-placing a workflow under the same type returns the same supervisor id
    /// and keeps the children recorded before the second placement.
    #[test]
    fn placing_existing_workflow_is_idempotent_and_preserves_children() -> Result<(), EngineError> {
        let tree = SupervisionTree::new();
        let first = tree.place_workflow("checkout", 10)?;
        tree.record_activity_child(10, 20)?;
        let second = tree.place_workflow("checkout", 10)?;
        assert_eq!(first, second);
        assert_eq!(tree.type_supervisor_count()?, 1);
        let workflow = tree.workflow(10)?;
        assert!(workflow.is_some());
        if let Some(workflow) = workflow {
            assert!(workflow.has_activity_child(20));
        }
        Ok(())
    }

    /// A placed pid cannot be re-placed under a different workflow type, and the
    /// rejected attempt must not create a supervisor for that type.
    #[test]
    fn workflow_pid_cannot_move_between_type_supervisors() -> Result<(), EngineError> {
        let tree = SupervisionTree::new();
        tree.place_workflow("checkout", 10)?;
        let error = tree.place_workflow("billing", 10);
        assert!(matches!(error, Err(EngineError::Runtime { .. })));
        assert_eq!(tree.type_supervisor_count()?, 1);
        Ok(())
    }

    /// A workflow cannot be recorded as its own activity child.
    #[test]
    fn activity_child_cannot_alias_workflow_process() -> Result<(), EngineError> {
        let tree = SupervisionTree::new();
        tree.place_workflow("checkout", 10)?;
        assert!(matches!(
            tree.record_activity_child(10, 10),
            Err(EngineError::Runtime { .. })
        ));
        Ok(())
    }

    /// A pid that is already placed as a workflow cannot be recorded as another
    /// workflow's activity child, and the rejected call records nothing.
    #[test]
    fn activity_child_cannot_be_an_existing_workflow_process() -> Result<(), EngineError> {
        let tree = SupervisionTree::new();
        tree.place_workflow("checkout", 10)?;
        tree.place_workflow("billing", 11)?;
        assert!(matches!(
            tree.record_activity_child(10, 11),
            Err(EngineError::Runtime { .. })
        ));
        let workflow = tree.workflow(10)?;
        assert!(workflow.is_some());
        if let Some(workflow) = workflow {
            assert!(!workflow.has_activity_child(11));
            assert!(workflow.activity_children().is_empty());
        }
        Ok(())
    }

    /// A poisoned lock surfaces as `EngineError::RegistryPoisoned`.
    #[test]
    fn poisoned_tree_lock_returns_typed_registry_error() {
        let tree = Arc::new(SupervisionTree::new());
        let poisoner_tree = Arc::clone(&tree);
        let poisoner = std::thread::spawn(move || {
            let guard = poisoner_tree.state.lock();
            assert!(guard.is_ok());
            // Unwind while `guard` is still alive so the mutex becomes poisoned.
            std::panic::resume_unwind(Box::new("poison supervision tree lock"));
        });
        assert!(poisoner.join().is_err());
        assert!(matches!(
            tree.type_supervisor_count(),
            Err(EngineError::RegistryPoisoned)
        ));
    }
}