use crate::cancel_token::CancellationFlag;
use crate::flow_dispatcher::{dispatch_node, DispatchCtx, NodeOutcome};
use crate::flow_execution_event::{now_ms, FlowExecutionEvent};
use tokio::sync::mpsc::UnboundedSender;
/// Compiles `source`, executes the flow named `flow_name` node by node through
/// [`dispatch_node`], and reports progress as [`FlowExecutionEvent`]s on `tx`.
///
/// # Event contract
///
/// * `FlowStart` is sent first. If that send fails — because `cancel` is
///   already set or the receiver is gone — the function returns immediately
///   without compiling or executing anything.
/// * Afterwards exactly one terminator is attempted: `FlowError` (compilation
///   failure, axonstore registry failure, unknown flow, or a node dispatch
///   error) or `FlowComplete`. A `FlowComplete` is never attempted after a
///   `FlowError`.
/// * Every event sent from this function is suppressed once `cancel` is set,
///   so a cancelled run may end without any terminator.
///
/// # Parameters
///
/// * `source`, `source_file` — handed to `flow_plan::compile_source_to_ir`.
/// * `flow_name` — the flow to look up in the compiled IR and execute.
/// * `backend` — echoed in `FlowStart` / `FlowComplete` and passed to the
///   dispatch context.
/// * `cancel` — checked before every event send and before every node.
/// * `tx` — event sink; a clone is also given to the dispatch context.
/// * `enforcement_summaries`, `step_audit_records`, `runtime_warnings` —
///   forwarded untouched to the dispatch context as external side channels.
/// * `held_capabilities` — stored on the dispatch context.
/// * `request_body`, `request_path`, `request_query` — passed to
///   `request_binding::bind_request`; every `(name, value)` pair it yields is
///   inserted into the context's `let_bindings`.
/// * `tool_base_url` — when present, relative tool endpoints in the tool
///   registry are resolved against it.
///
/// The function returns nothing; all outcomes are reported through `tx` and
/// `tracing`.
pub async fn run_streaming_via_dispatcher(
    source: String,
    source_file: String,
    flow_name: String,
    backend: String,
    cancel: CancellationFlag,
    tx: UnboundedSender<FlowExecutionEvent>,
    enforcement_summaries: std::sync::Arc<
        tokio::sync::Mutex<
            std::collections::HashMap<
                String,
                crate::axon_server::EnforcementSummaryWire,
            >,
        >,
    >,
    step_audit_records: std::sync::Arc<
        tokio::sync::Mutex<Vec<crate::axonendpoint_replay::StepAuditRecord>>,
    >,
    runtime_warnings: std::sync::Arc<
        tokio::sync::Mutex<Vec<crate::runtime_warnings::RuntimeWarning>>,
    >,
    held_capabilities: Option<Vec<String>>,
    request_body: Option<serde_json::Value>,
    request_path: std::collections::HashMap<String, String>,
    request_query: std::collections::HashMap<String, String>,
    tool_base_url: Option<String>,
) {
    use crate::ir_nodes::IRFlowNode;

    // Single choke point for events sent from this function: nothing is sent
    // once cancellation has been requested, and a closed receiver is reported
    // as the same opaque error.
    let emit = |event: FlowExecutionEvent| -> Result<(), ()> {
        if cancel.is_cancelled() {
            return Err(());
        }
        tx.send(event).map_err(|_| ())
    };

    let exec_start = std::time::Instant::now();

    if emit(FlowExecutionEvent::FlowStart {
        flow_name: flow_name.clone(),
        backend: backend.clone(),
        timestamp_ms: now_ms(),
    })
    .is_err()
    {
        // Cancelled before start, or nobody is listening: do no work at all.
        return;
    }

    // --- 1. Compile the source into AST + IR. ---------------------------------
    let (ast_program, ir) = match crate::flow_plan::compile_source_to_ir(&source, &source_file) {
        Ok(pair) => pair,
        Err(plan_error) => {
            let detail = format!("compilation failed: {plan_error:?}");
            tracing::error!(
                flow = %flow_name,
                source_file = %source_file,
                detail = %detail,
                "axon streaming flow failed — source did not compile"
            );
            let _ = emit(FlowExecutionEvent::FlowError {
                flow_name: flow_name.clone(),
                error: detail,
                timestamp_ms: now_ms(),
            });
            return;
        }
    };

    // --- 2. Build the axonstore registry from the IR's store specs. -----------
    let store_registry = match crate::store::registry::StoreRegistry::build(
        &ir.axonstore_specs,
    ) {
        Ok(registry) => std::sync::Arc::new(registry),
        Err(reg_error) => {
            let detail = format!("axonstore registry: {reg_error}");
            tracing::error!(
                flow = %flow_name,
                detail = %detail,
                "axon streaming flow failed — axonstore registry build"
            );
            let _ = emit(FlowExecutionEvent::FlowError {
                flow_name: flow_name.clone(),
                error: detail,
                timestamp_ms: now_ms(),
            });
            return;
        }
    };

    // --- 3. Locate the requested flow. ----------------------------------------
    // Done BEFORE pin acquisition so that a request naming an unknown flow
    // fails fast without checking any connection out of a pool.
    let flow = match ir.flows.iter().find(|f| f.name == flow_name) {
        Some(f) => f.clone(),
        None => {
            let available: Vec<String> = ir.flows.iter().map(|f| f.name.clone()).collect();
            let detail = format!(
                "flow '{}' not found in compiled IR; available: {:?}",
                flow_name, available
            );
            tracing::error!(
                flow = %flow_name,
                detail = %detail,
                "axon streaming flow failed — flow not found"
            );
            let _ = emit(FlowExecutionEvent::FlowError {
                flow_name: flow_name.clone(),
                error: detail,
                timestamp_ms: now_ms(),
            });
            return;
        }
    };

    // --- 4. Eagerly pin one connection per Postgres-backed store. -------------
    // The scan covers the top-level Persist / Retrieve / Mutate / Purge nodes of
    // every flow in the IR (not only the requested one). The map is built as a
    // plain local and wrapped in `Arc<Mutex<_>>` afterwards: nothing else can
    // observe it during acquisition, so locking here would only add a panic
    // path.
    let mut pinned: std::collections::HashMap<
        String,
        sqlx::pool::PoolConnection<sqlx::Postgres>,
    > = std::collections::HashMap::new();
    {
        let mut needed: std::collections::HashSet<String> =
            std::collections::HashSet::new();
        for f in &ir.flows {
            for node in &f.steps {
                let store_ref: Option<&str> = match node {
                    IRFlowNode::Persist(p) => Some(p.store_name.as_str()),
                    IRFlowNode::Retrieve(r) => Some(r.store_name.as_str()),
                    IRFlowNode::Mutate(m) => Some(m.store_name.as_str()),
                    IRFlowNode::Purge(p) => Some(p.store_name.as_str()),
                    _ => None,
                };
                if let Some(store_name) = store_ref {
                    if store_registry.backend_kind(store_name)
                        == Some(crate::store::registry::StoreBackendKind::Postgresql)
                    {
                        needed.insert(store_name.to_string());
                    }
                }
            }
        }

        for store_name in &needed {
            match store_registry.resolve(store_name) {
                Ok(crate::store::registry::StoreHandle::Postgres(pg_backend)) => {
                    match pg_backend.acquire_pin().await {
                        Ok(conn) => {
                            crate::store::pin_observability::emit_pin_acquire(
                                store_name,
                                &flow_name,
                                "",
                                "eager",
                                None,
                            );
                            pinned.insert(store_name.clone(), conn);
                        }
                        // Pin failure is not fatal: it is logged and the store
                        // is simply left out of the pinned map.
                        Err(e) => {
                            tracing::warn!(
                                target: "axon::store::pin",
                                store_name = %store_name,
                                error = %e,
                                d_letter = "37.x.j.D2",
                                "streaming flow failed to acquire pin; \
                                 falling back to per-step pool acquisition \
                                 (legacy path). Adopter under transaction-\
                                 mode pooler may observe the unnamed-\
                                 prepared-statement race for this store."
                            );
                        }
                    }
                }
                // Resolved to a non-Postgres handle: nothing to pin.
                Ok(_) => {}
                Err(e) => {
                    tracing::warn!(
                        target: "axon::store::pin",
                        store_name = %store_name,
                        error = %e,
                        d_letter = "37.x.j.D2",
                        "streaming flow failed to resolve axonstore for \
                         pin acquisition; falling back to per-step pool \
                         acquisition (legacy path)."
                    );
                }
            }
        }
    }
    let pinned_conns = std::sync::Arc::new(std::sync::Mutex::new(pinned));

    // --- 5. Assemble the dispatch context. ------------------------------------
    let system_prompt = crate::flow_plan::compose_system_prompt_public(&flow, &ir, None);
    let tool_registry = {
        let mut reg = crate::tool_registry::ToolRegistry::new();
        reg.register_from_ir(&ir.tools);
        if let Some(base) = tool_base_url.as_deref() {
            reg.resolve_relative_endpoints(base);
        }
        std::sync::Arc::new(reg)
    };
    let mut ctx = DispatchCtx::new(
        flow_name.clone(),
        backend.clone(),
        system_prompt,
        cancel.clone(),
        tx.clone(),
    )
    .with_external_side_channels(
        enforcement_summaries,
        step_audit_records,
        runtime_warnings,
    )
    .with_store_registry(store_registry)
    .with_tool_registry(tool_registry)
    .with_pinned_conns(pinned_conns);
    ctx.held_capabilities = held_capabilities;

    // Seed the context's let-bindings with whatever the request binder yields.
    for (name, value) in crate::request_binding::bind_request(
        &flow,
        &request_path,
        &request_query,
        request_body.as_ref(),
    ) {
        ctx.let_bindings.insert(name, value);
    }

    // --- 6. Execute the flow's nodes in order. --------------------------------
    let mut total_tokens_output: u64 = 0;
    let mut steps_executed: usize = 0;
    // Set once a FlowError terminator has been attempted; suppresses the
    // FlowComplete below.
    let mut flow_errored = false;

    // AST counterpart of the flow, used only to resolve per-step stream-effect
    // policies.
    let ast_flow = ast_program
        .declarations
        .iter()
        .find_map(|d| match d {
            crate::ast::Declaration::Flow(f) if f.name == flow_name => Some(f),
            _ => None,
        });

    for (node_index, node) in flow.steps.iter().enumerate() {
        if cancel.is_cancelled() {
            break;
        }

        // For a named Step that has an AST flow, resolve its stream-effect
        // policy and stage it on the context before dispatching.
        if let (Some(ast_f), IRFlowNode::Step(ir_step)) = (ast_flow, node) {
            if !ir_step.name.is_empty() {
                ctx.pending_effect_policy =
                    crate::stream_effect_dispatcher::resolve_stream_effect_for_step(
                        &ir_step.name,
                        ast_f,
                        &ast_program,
                    );
            }
        }

        match dispatch_node(node, &mut ctx).await {
            Ok(NodeOutcome::Completed {
                tokens_emitted, ..
            }) => {
                total_tokens_output += tokens_emitted;
                steps_executed += 1;
            }
            // Ignored at this level: neither counted nor terminating.
            Ok(NodeOutcome::Break) | Ok(NodeOutcome::LoopContinue) => {}
            Ok(NodeOutcome::Return { .. }) => {
                steps_executed += 1;
                break;
            }
            // Cancellation is not an error: stop quietly.
            Err(crate::flow_dispatcher::DispatchError::UpstreamCancelled) => {
                break;
            }
            Err(e) => {
                flow_errored = true;
                let node_label = match node {
                    IRFlowNode::Step(s) if !s.name.is_empty() => {
                        format!("step '{}'", s.name)
                    }
                    IRFlowNode::Retrieve(r) => {
                        format!("retrieve from '{}'", r.store_name)
                    }
                    IRFlowNode::Persist(p) => {
                        format!("persist into '{}'", p.store_name)
                    }
                    IRFlowNode::Mutate(m) => format!("mutate '{}'", m.store_name),
                    IRFlowNode::Purge(p) => format!("purge '{}'", p.store_name),
                    // 1-based position within the flow. The count of completed
                    // steps would be wrong here whenever an earlier node
                    // yielded Break / LoopContinue.
                    _ => format!("node #{}", node_index + 1),
                };
                let detail =
                    format!("flow '{flow_name}' failed at {node_label}: {e:?}");
                tracing::error!(
                    flow = %flow_name,
                    node = %node_label,
                    detail = %detail,
                    "axon streaming flow failed — node dispatch error"
                );
                let _ = emit(FlowExecutionEvent::FlowError {
                    flow_name: flow_name.clone(),
                    error: detail,
                    timestamp_ms: now_ms(),
                });
                break;
            }
        }
    }

    // --- 7. Terminator. -------------------------------------------------------
    // FlowComplete is only attempted when no FlowError was, so `success` is
    // necessarily true here. `emit` still drops it if the run was cancelled.
    if !flow_errored {
        let _ = emit(FlowExecutionEvent::FlowComplete {
            flow_name,
            backend,
            success: true,
            steps_executed,
            // Input tokens are not tracked by this function.
            tokens_input: 0,
            tokens_output: total_tokens_output,
            latency_ms: exec_start.elapsed().as_millis() as u64,
            timestamp_ms: now_ms(),
        });
    }

    // The context was given a clone of `tx` and the pinned-connection map;
    // dropping it explicitly makes the release point visible.
    drop(ctx);
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cancel_token::CancellationFlag;
    use tokio::sync::mpsc;

    /// Program with a single `Chat` flow containing one streaming step, shared
    /// by every test that needs source that compiles.
    const CHAT_SOURCE: &str = "flow Chat() -> Unit {\n\
        step Generate { ask: \"hi\" output: Stream<Token> }\n\
        }\n\
        axonendpoint E { method: POST path: \"/c\" execute: Chat transport: sse }";

    /// Per-variant counts of the events the tests assert on.
    #[derive(Default)]
    struct EventTally {
        starts: usize,
        tokens: usize,
        completes: usize,
        errors: usize,
    }

    /// Counts `FlowStart`, `StepToken`, `FlowComplete` and `FlowError` events;
    /// every other variant is ignored.
    fn tally(events: &[FlowExecutionEvent]) -> EventTally {
        let mut counts = EventTally::default();
        for event in events {
            match event {
                FlowExecutionEvent::FlowStart { .. } => counts.starts += 1,
                FlowExecutionEvent::StepToken { .. } => counts.tokens += 1,
                FlowExecutionEvent::FlowComplete { .. } => counts.completes += 1,
                FlowExecutionEvent::FlowError { .. } => counts.errors += 1,
                _ => {}
            }
        }
        counts
    }

    /// Runs `run_streaming_via_dispatcher` against the `stub` backend with
    /// empty side channels and no request data, then drains the channel and
    /// returns every event in the order it was received.
    async fn run_and_collect(
        src: &str,
        source_file: &str,
        flow_name: &str,
        cancel: CancellationFlag,
    ) -> Vec<FlowExecutionEvent> {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let enforcement = std::sync::Arc::new(tokio::sync::Mutex::new(
            std::collections::HashMap::new(),
        ));
        let audit = std::sync::Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let warnings = std::sync::Arc::new(tokio::sync::Mutex::new(Vec::new()));

        run_streaming_via_dispatcher(
            src.to_string(),
            source_file.to_string(),
            flow_name.to_string(),
            "stub".to_string(),
            cancel,
            tx,
            enforcement,
            audit,
            warnings,
            None,
            None,
            std::collections::HashMap::new(),
            std::collections::HashMap::new(),
            None,
        )
        .await;

        // `tx` was moved into the call above, so `recv` yields `None` once the
        // buffered events are exhausted.
        let mut collected = Vec::new();
        while let Some(event) = rx.recv().await {
            collected.push(event);
        }
        collected
    }

    #[tokio::test]
    async fn canonical_step_emits_single_stub_token() {
        let events =
            run_and_collect(CHAT_SOURCE, "test.axon", "Chat", CancellationFlag::new()).await;
        let counts = tally(&events);

        assert_eq!(counts.starts, 1, "exactly 1 FlowStart");
        assert_eq!(counts.tokens, 1, "stub backend emits exactly 1 token");
        assert_eq!(counts.completes, 1, "exactly 1 FlowComplete");
    }

    #[tokio::test]
    async fn compilation_error_emits_exactly_one_flow_error() {
        let events = run_and_collect(
            "not valid axon source ::: <<< broken",
            "broken.axon",
            "Whatever",
            CancellationFlag::new(),
        )
        .await;
        let counts = tally(&events);

        assert_eq!(counts.errors, 1, "compilation error emits exactly one FlowError");
        assert_eq!(counts.tokens, 0, "no tokens on compilation failure");
        assert_eq!(
            counts.completes, 0,
            "no FlowComplete after a FlowError — exactly one terminator (36.x.c/D1)"
        );
        assert!(
            matches!(events.last(), Some(FlowExecutionEvent::FlowError { .. })),
            "the FlowError is the final event — nothing follows the terminator"
        );
    }

    #[tokio::test]
    async fn missing_flow_name_emits_flow_error() {
        let events = run_and_collect(
            CHAT_SOURCE,
            "test.axon",
            "NonExistent",
            CancellationFlag::new(),
        )
        .await;

        let error_found = events.iter().any(|event| match event {
            FlowExecutionEvent::FlowError { error, .. } => error.contains("not found"),
            _ => false,
        });
        assert!(error_found, "missing flow surfaces structured 'not found' error");
    }

    #[tokio::test]
    async fn pre_cancel_short_circuits() {
        // Cancel before the run starts: even FlowStart must be suppressed.
        let cancel = CancellationFlag::new();
        cancel.cancel();

        let events = run_and_collect(CHAT_SOURCE, "test.axon", "Chat", cancel).await;

        assert_eq!(events.len(), 0, "pre-cancel → zero events emitted");
    }
}