use crate::cancel_token::CancellationFlag;
use crate::flow_execution_event::FlowExecutionEvent;
use crate::ir_nodes::IRFlowNode;
use crate::stream_effect::BackpressurePolicy;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
pub mod pure_shape;
pub mod orchestration;
pub mod parallel;
pub mod effects_bridge;
pub mod cognitive;
pub mod algebraic_handlers;
pub mod wire_integrations;
pub mod pix;
pub mod lambda_tools;
pub mod unified_stream;
#[derive(Clone)]
pub struct DispatchCtx {
pub flow_name: String,
pub backend_name: String,
pub system_prompt: String,
pub cancel: CancellationFlag,
pub tx: mpsc::UnboundedSender<FlowExecutionEvent>,
pub enforcement_summaries: Arc<
Mutex<HashMap<String, crate::axon_server::EnforcementSummaryWire>>,
>,
pub step_audit_records: Arc<
Mutex<Vec<crate::axonendpoint_replay::StepAuditRecord>>,
>,
pub runtime_warnings: Arc<
Mutex<Vec<crate::runtime_warnings::RuntimeWarning>>,
>,
pub branch_path: Vec<String>,
pub step_counter: usize,
pub pem_backend: Option<std::sync::Arc<dyn crate::pem::PersistenceBackend>>,
pub session_id: String,
pub tenant_id: String,
pub let_bindings: std::collections::HashMap<String, String>,
pub pending_effect_policy: Option<BackpressurePolicy>,
pub tool_registry: Option<std::sync::Arc<crate::tool_registry::ToolRegistry>>,
pub store_registry: Option<std::sync::Arc<crate::store::registry::StoreRegistry>>,
pub held_capabilities: Option<Vec<String>>,
pub audit_chain:
std::sync::Arc<std::sync::Mutex<crate::store::audit_chain::StoreAuditChain>>,
}
impl DispatchCtx {
pub fn new(
flow_name: impl Into<String>,
backend_name: impl Into<String>,
system_prompt: impl Into<String>,
cancel: CancellationFlag,
tx: mpsc::UnboundedSender<FlowExecutionEvent>,
) -> Self {
let flow_name = flow_name.into();
let session_id = flow_name.clone();
Self {
flow_name,
backend_name: backend_name.into(),
system_prompt: system_prompt.into(),
cancel,
tx,
enforcement_summaries: Arc::new(Mutex::new(HashMap::new())),
step_audit_records: Arc::new(Mutex::new(Vec::new())),
runtime_warnings: Arc::new(Mutex::new(Vec::new())),
branch_path: Vec::new(),
step_counter: 0,
pem_backend: None,
session_id,
tenant_id: String::new(),
let_bindings: std::collections::HashMap::new(),
pending_effect_policy: None,
tool_registry: None,
store_registry: None,
held_capabilities: None,
audit_chain: std::sync::Arc::new(std::sync::Mutex::new(
crate::store::audit_chain::StoreAuditChain::new(),
)),
}
}
pub fn with_store_registry(
mut self,
registry: std::sync::Arc<crate::store::registry::StoreRegistry>,
) -> Self {
self.store_registry = Some(registry);
self
}
pub fn with_held_capabilities(mut self, capabilities: Vec<String>) -> Self {
self.held_capabilities = Some(capabilities);
self
}
pub fn with_tool_registry(
mut self,
registry: std::sync::Arc<crate::tool_registry::ToolRegistry>,
) -> Self {
self.tool_registry = Some(registry);
self
}
pub fn with_pem(
mut self,
backend: std::sync::Arc<dyn crate::pem::PersistenceBackend>,
) -> Self {
self.pem_backend = Some(backend);
self
}
pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
self.session_id = session_id.into();
self
}
pub fn with_tenant_id(mut self, tenant_id: impl Into<String>) -> Self {
self.tenant_id = tenant_id.into();
self
}
pub fn with_effect_policy(mut self, policy: BackpressurePolicy) -> Self {
self.pending_effect_policy = Some(policy);
self
}
pub fn with_external_side_channels(
mut self,
enforcement_summaries: std::sync::Arc<
tokio::sync::Mutex<
std::collections::HashMap<String, crate::axon_server::EnforcementSummaryWire>,
>,
>,
step_audit_records: std::sync::Arc<
tokio::sync::Mutex<Vec<crate::axonendpoint_replay::StepAuditRecord>>,
>,
runtime_warnings: std::sync::Arc<
tokio::sync::Mutex<Vec<crate::runtime_warnings::RuntimeWarning>>,
>,
) -> Self {
self.enforcement_summaries = enforcement_summaries;
self.step_audit_records = step_audit_records;
self.runtime_warnings = runtime_warnings;
self
}
pub fn take_pending_effect_policy(&mut self) -> Option<BackpressurePolicy> {
self.pending_effect_policy.take()
}
pub fn branch_path_string(&self) -> String {
self.branch_path.join(".")
}
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum NodeOutcome {
Completed {
output: String,
tokens_emitted: u64,
step_index: usize,
},
Break,
LoopContinue,
Return { value: String },
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DispatchError {
BackendError { name: String, message: String },
UpstreamCancelled,
MissingDependency { name: &'static str },
ChannelClosed,
}
impl std::fmt::Display for DispatchError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::BackendError { name, message } => {
write!(f, "backend '{name}' stream() failed: {message}")
}
Self::UpstreamCancelled => write!(f, "upstream cancelled mid-dispatch"),
Self::MissingDependency { name } => {
write!(f, "dispatcher missing dependency: {name}")
}
Self::ChannelClosed => write!(f, "channel closed (consumer dropped)"),
}
}
}
impl std::error::Error for DispatchError {}
pub async fn dispatch_node(
node: &IRFlowNode,
ctx: &mut DispatchCtx,
) -> Result<NodeOutcome, DispatchError> {
match node {
IRFlowNode::Step(step) => pure_shape::run_step(step, ctx).await,
IRFlowNode::Probe(probe) => pure_shape::run_probe(probe, ctx).await,
IRFlowNode::Reason(reason) => pure_shape::run_reason(reason, ctx).await,
IRFlowNode::Validate(validate) => pure_shape::run_validate(validate, ctx).await,
IRFlowNode::Refine(refine) => pure_shape::run_refine(refine, ctx).await,
IRFlowNode::Weave(weave) => pure_shape::run_weave(weave, ctx).await,
IRFlowNode::UseTool(node) => lambda_tools::run_use_tool(node, ctx).await,
IRFlowNode::Remember(node) => cognitive::run_remember(node, ctx).await,
IRFlowNode::Recall(node) => cognitive::run_recall(node, ctx).await,
IRFlowNode::Conditional(cond) => orchestration::run_conditional(cond, ctx).await,
IRFlowNode::ForIn(for_in) => orchestration::run_for_in(for_in, ctx).await,
IRFlowNode::Let(let_bind) => orchestration::run_let(let_bind, ctx).await,
IRFlowNode::Return(ret) => orchestration::run_return(ret, ctx).await,
IRFlowNode::Break(brk) => orchestration::run_break(brk, ctx).await,
IRFlowNode::Continue(cont) => orchestration::run_continue(cont, ctx).await,
IRFlowNode::LambdaDataApply(node) => lambda_tools::run_lambda_data_apply(node, ctx).await,
IRFlowNode::Par(par) => parallel::run_par(par, ctx).await,
IRFlowNode::Hibernate(node) => pix::run_hibernate(node, ctx).await,
IRFlowNode::Deliberate(node) => wire_integrations::run_deliberate(node, ctx).await,
IRFlowNode::Consensus(node) => wire_integrations::run_consensus(node, ctx).await,
IRFlowNode::Forge(node) => cognitive::run_forge(node, ctx).await,
IRFlowNode::Focus(node) => cognitive::run_focus(node, ctx).await,
IRFlowNode::Associate(node) => cognitive::run_associate(node, ctx).await,
IRFlowNode::Aggregate(node) => cognitive::run_aggregate(node, ctx).await,
IRFlowNode::Explore(node) => cognitive::run_explore(node, ctx).await,
IRFlowNode::Ingest(node) => cognitive::run_ingest(node, ctx).await,
IRFlowNode::ShieldApply(node) => algebraic_handlers::run_shield_apply(node, ctx).await,
IRFlowNode::Stream(stream) => effects_bridge::run_stream(stream, ctx).await,
IRFlowNode::Navigate(node) => cognitive::run_navigate(node, ctx).await,
IRFlowNode::Drill(node) => pix::run_drill(node, ctx).await,
IRFlowNode::Trail(node) => pix::run_trail(node, ctx).await,
IRFlowNode::Corroborate(node) => cognitive::run_corroborate(node, ctx).await,
IRFlowNode::OtsApply(node) => algebraic_handlers::run_ots_apply(node, ctx).await,
IRFlowNode::MandateApply(node) => algebraic_handlers::run_mandate_apply(node, ctx).await,
IRFlowNode::ComputeApply(node) => algebraic_handlers::run_compute_apply(node, ctx).await,
IRFlowNode::Listen(node) => algebraic_handlers::run_listen(node, ctx).await,
IRFlowNode::DaemonStep(node) => algebraic_handlers::run_daemon_step(node, ctx).await,
IRFlowNode::Emit(node) => wire_integrations::run_emit(node, ctx).await,
IRFlowNode::Publish(node) => wire_integrations::run_publish(node, ctx).await,
IRFlowNode::Discover(node) => wire_integrations::run_discover(node, ctx).await,
IRFlowNode::Persist(node) => wire_integrations::run_persist(node, ctx).await,
IRFlowNode::Retrieve(node) => wire_integrations::run_retrieve(node, ctx).await,
IRFlowNode::Mutate(node) => wire_integrations::run_mutate(node, ctx).await,
IRFlowNode::Purge(node) => wire_integrations::run_purge(node, ctx).await,
IRFlowNode::Transact(node) => wire_integrations::run_transact(node, ctx).await,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cancel_token::CancellationFlag;
#[test]
fn dispatch_ctx_branch_path_empty_at_root() {
let (tx, _rx) = mpsc::unbounded_channel();
let ctx = DispatchCtx::new(
"F",
"stub",
"",
CancellationFlag::new(),
tx,
);
assert!(ctx.branch_path.is_empty());
assert_eq!(ctx.branch_path_string(), "");
assert_eq!(ctx.step_counter, 0);
}
#[test]
fn dispatch_ctx_branch_path_joins_segments() {
let (tx, _rx) = mpsc::unbounded_channel();
let mut ctx = DispatchCtx::new(
"F",
"stub",
"",
CancellationFlag::new(),
tx,
);
ctx.branch_path.push("par[0]".to_string());
ctx.branch_path.push("step[1]".to_string());
assert_eq!(ctx.branch_path_string(), "par[0].step[1]");
}
#[test]
fn dispatch_error_display_surface() {
let cases: Vec<(DispatchError, &str)> = vec![
(
DispatchError::BackendError {
name: "anthropic".to_string(),
message: "rate limited".to_string(),
},
"backend 'anthropic' stream() failed: rate limited",
),
(DispatchError::UpstreamCancelled, "upstream cancelled mid-dispatch"),
(
DispatchError::MissingDependency { name: "pem_async" },
"dispatcher missing dependency: pem_async",
),
(DispatchError::ChannelClosed, "channel closed (consumer dropped)"),
];
for (err, expected) in cases {
assert_eq!(format!("{err}"), expected);
}
}
#[tokio::test]
async fn dispatch_node_step_routes_to_pure_shape_handler() {
use crate::ir_nodes::*;
let step = IRStep {
node_type: "step",
source_line: 0,
source_column: 0,
name: "Generate".to_string(),
persona_ref: String::new(),
given: String::new(),
ask: "hi".to_string(),
use_tool: None,
probe: None,
reason: None,
weave: None,
output_type: String::new(),
confidence_floor: None,
navigate_ref: String::new(),
apply_ref: String::new(),
body: Vec::new(),
};
let node = IRFlowNode::Step(step);
let (tx, _rx) = mpsc::unbounded_channel();
let mut ctx = DispatchCtx::new(
"F",
"stub",
"",
CancellationFlag::new(),
tx,
);
let outcome = dispatch_node(&node, &mut ctx).await.unwrap();
match outcome {
NodeOutcome::Completed {
output,
tokens_emitted,
step_index,
} => {
assert_eq!(output, "(stub)");
assert_eq!(tokens_emitted, 1);
assert_eq!(step_index, 0);
}
other => panic!("post-33.y.c: Step routes to pure_shape handler returning Completed; got {other:?}"),
}
}
}