axon/flow_dispatcher/mod.rs
1//! §Fase 33.y.b — Per-IRFlowNode async dispatcher skeleton.
2//!
3//! This module ships the **structural foundation** of the Fase 33.y
4//! universal-algebraic-streaming cycle: a closed-catalog, compiler-
5//! enforced exhaustive dispatch table over the 45-variant
6//! [`crate::ir_nodes::IRFlowNode`] enum. Each arm in
7//! [`dispatch_node`]'s match is named explicitly; adding a 46th
8//! IRFlowNode variant fails the Rust build at this match until the
9//! corresponding dispatcher arm is added (D1 totality invariant).
10//!
11//! # What 33.y.b ships
12//!
13//! - [`DispatchCtx`] — the shared per-flow context every per-variant
14//! handler reads + writes. Carries the FlowExecutionEvent producer,
15//! the cancellation flag, the side-channels for enforcement summary
16//! / step audit / runtime warnings, the orchestration `branch_path`
17//! (for D6 per-step replay binding under Par/ForIn/Conditional
18//! nesting), and the per-step counter.
19//! - [`NodeOutcome`] — closed catalog of dispatcher outcomes. In
20//! 33.y.b only the transitional [`NodeOutcome::LegacyShimHandled`]
21//! variant exists; subsequent sub-fases 33.y.c–j add real outcomes
22//! (`Completed`, `Break`, `LoopContinue`, `Return`, etc.) and
23//! 33.y.l removes `LegacyShimHandled` once every variant has its
24//! real handler.
25//! - [`DispatchError`] — closed catalog of dispatch errors. Five
26//! variants today: BackendError / UpstreamCancelled /
27//! LegacyShimFailed / MissingDependency / ChannelClosed.
28//! - [`ShimReason`] — per-IRFlowNode-variant tag for the 33.y.b
29//! transitional shim. The drift gate
30//! [`tests::shim_reason_covers_full_ir_flow_node_catalog`]
31//! asserts the ShimReason set has 1-to-1 cardinality with the
32//! IRFlowNode set.
33//! - [`dispatch_node`] — the dispatcher entry point. Exhaustive
34//! match over 45 arms; each arm delegates to [`legacy_shim`] which
35//! returns `Ok(NodeOutcome::LegacyShimHandled)`. **No node is
36//! actually executed in 33.y.b.** The module is standalone — not
37//! wired into `server_execute_streaming` — so production behavior
38//! is byte-identical with v1.25.0 (D4).
39//!
40//! # What 33.y.b does NOT ship
41//!
42//! - Real per-variant async logic (lands per sub-fase 33.y.c–j).
43//! - Integration with `server_execute_streaming` (lands incrementally
44//! per sub-fase as each variant comes online).
45//! - Wire-format extensions (per-step `wire_status`, `branch_path`
46//! field on `StepAuditRecord`, `axon-W003 partial-streaming-
47//! activation` warning — all land in 33.y.c–l).
48//!
49//! # D-letter anchors
50//!
51//! - **D1** — Per-IRFlowNode async dispatch is total. The exhaustive
52//! match below is the compiler-enforced totality witness.
53//! - **D4** — Wire byte-compat preserved. No production code path
54//! calls `dispatch_node` in 33.y.b; the module exists to lock the
55//! shape that subsequent sub-fases extend.
56//! - **D7** — Production-grade per-variant handler discipline. The
57//! shim is INTENTIONALLY a no-op transition — not an
58//! `unimplemented!()` panic, not a `todo!()`, not a `_ =>` catch-all.
59//! Each arm is named; each shim invocation tags its IR variant
60//! precisely. The shim is structural plumbing, not a stub.
61
62use crate::cancel_token::CancellationFlag;
63use crate::flow_execution_event::FlowExecutionEvent;
64use crate::ir_nodes::IRFlowNode;
65use crate::stream_effect::BackpressurePolicy;
66use std::collections::HashMap;
67use std::sync::Arc;
68use tokio::sync::{mpsc, Mutex};
69
70/// §Fase 33.y.c — Pure-shape variant handlers (Step / Probe / Reason /
71/// Validate / Refine / Weave). All 6 variants reduce to "produce a
72/// single LLM response from a prompt + cognitive framing"; the module
73/// houses the shared async core (`run_pure_shape`) + 6 thin
74/// per-variant entry points that build the variant's framing.
75pub mod pure_shape;
76
77/// §Fase 33.y.d — Orchestration variant handlers (Let / Conditional /
78/// ForIn / Break / Continue / Return). 6 variants — control-flow
79/// constructs that compose child handlers via recursive
80/// `dispatch_node` calls + sentinel-driven loop semantics +
81/// `branch_path` segments threading the orchestration tree.
82pub mod orchestration;
83
84/// §Fase 33.y.e — Parallel variant handler (`Par`) + public helper
85/// [`parallel::run_branches_concurrently`] for concurrent dispatch
86/// via `futures::future::join_all` with per-branch DispatchCtx
87/// clones + post-join step_counter merge + Return-sentinel
88/// propagation. `IRParallelBlock` is payload-free in v1.25.0; the
89/// handler emits the `step_type: "par"` wire shape with zero
90/// token events. Future IR extensions wire branches into the
91/// public helper.
92pub mod parallel;
93
94/// §Fase 33.y.e — Stream variant handler (`Stream`) + public bridge
95/// [`effects_bridge::bridge_effect_stream_yield`] integrating the
96/// Fase 23 algebraic-effects runtime: scans the instruction block
97/// for `perform Stream.Yield x` (statically + via trace), runs the
98/// `EffectRuntime`, and emits one `axon.token` per Yield with the
99/// resolved value. `IRStreamBlock` is payload-free in v1.25.0; the
100/// handler emits the `step_type: "stream"` wire shape with zero
101/// token events. Future IR extensions wire instruction blocks into
102/// the public bridge.
103pub mod effects_bridge;
104
105/// §Fase 33.y.f — Cognitive primitives (Fase 11 neuro-symbolic).
106/// 10 variants: `Remember` / `Recall` are PEM-bound (write-through
107/// + read-back via the optional [`DispatchCtx::pem_backend`]);
108/// `Forge` is payload-free wire shape (canonical
109/// `step_type: "forge"`); `Focus` / `Associate` / `Aggregate` /
110/// `Explore` / `Ingest` / `Navigate` / `Corroborate` reuse the
111/// pure-shape async core ([`pure_shape::run_pure_shape`]) with
112/// each variant's cognitive framing addendum reflected in the
113/// system prompt.
114pub mod cognitive;
115
116/// §Fase 33.y.g — Algebraic-effect handler nodes.
117/// 6 variants: `ShieldApply` / `OtsApply` / `MandateApply` — apply
118/// a named capability to a target with structured output binding;
119/// `ComputeApply` — invoke a compute capability with positional
120/// arguments; `Listen` — wait on a Fase 13 typed channel for an
121/// event; `DaemonStep` — invoke a Fase 16 daemon supervisor by
122/// reference. Each handler emits wire shape with the canonical
123/// `step_type` slug + public `apply_*` helpers that enterprise
124/// integrations override (per the OSS/ENTERPRISE/SPLIT charter
125/// — the shield/OTS/mandate scanner registries live in
126/// `axon_enterprise.shield`).
127pub mod algebraic_handlers;
128
129/// §Fase 33.y.h — Wire-integration handler nodes (π-calc +
130/// persistence + multi-agent deliberation). 10 variants:
131/// **Emit / Publish / Discover** (Fase 13 typed channels — π-calc
132/// output prefix + capability extrusion + dual discovery);
133/// **Persist / Retrieve / Mutate / Purge / Transact** (persistence
134/// primitives — snapshot / load / update / delete / transactional
135/// block); **Deliberate / Consensus** (multi-agent payload-free
136/// blocks). Each ships wire shape + public helper that enterprise
137/// integrations (Postgres / Redis / MQ / typed-channel runtime)
138/// override.
139pub mod wire_integrations;
140
141/// §Fase 33.y.i — PIX variants (paper §6 hidden-state primitives).
142/// 3 variants: **Hibernate** (CPS-style event-await with timeout
143/// — Fase 11.e + Fase 16 supervisor); **Drill** (PIX subtree
144/// navigation); **Trail** (breadcrumb walk over a prior
145/// navigation). OSS reference impl uses `__pix_*` /
146/// `__hibernating_*` namespaced let_bindings keys; enterprise R&D
147/// (axon_enterprise.cognitive_states + .supervisor) wires real
148/// continuation-passing semantics + PIX state machines.
149pub mod pix;
150
151/// §Fase 33.y.j — Lambda + UseTool (the final 2 variants).
152/// **LambdaDataApply** — Fase 15 ΛD apply (the sync runner walks
153/// a CPS dispatcher mapping lambda data structures to expressions;
154/// 33.y.j ships the OSS wire shape + helper). **UseTool** —
155/// mid-step tool invocation (Fase 22 backend tools; the
156/// `ChatRequest.tools` cross-cutting plumb-through lands in
157/// 33.y.k D8). Completes the 45-variant total coverage.
158pub mod lambda_tools;
159
160/// §Fase 34.g — Unified stream handler (4-disjunction convergence).
161/// Pre-34.g the four streaming-effect disjunctions (LLM-side
162/// `output: Stream<T>`, `apply: <stream-tool>`, `use_tool` syntax,
163/// `perform Stream.Yield`) had divergent drain paths — disjunct (a)
164/// enforced `BackpressurePolicy` at chunk granularity while (b)/(d)
165/// only captured the policy slug in audit without enforcement. This
166/// module ships [`unified_stream::unified_stream_handler`] — the
167/// single drain loop that ALL `Stream<ToolChunk>`-producing
168/// disjunctions route through; the handler integrates a
169/// [`crate::stream_runtime::Stream<ToolChunk>`] policy primitive +
170/// returns a [`unified_stream::ToolStreamSummary`] with real
171/// `chunks_dropped`/`chunks_degraded` counters. Also ships the
172/// [`unified_stream::chat_chunk_to_tool_chunk`] type-bridge for
173/// disjunct (a) symmetry tests + [`unified_stream::unified_stream_from_chunks`]
174/// adapter for disjunct (d)'s static-scan output.
175pub mod unified_stream;
176
177// ────────────────────────────────────────────────────────────────────
178// DispatchCtx — shared per-flow async surface
179// ────────────────────────────────────────────────────────────────────
180
181/// Per-flow dispatcher context. Carries the producer-side wire
182/// surface (`tx` for FlowExecutionEvent), cancel-in-body propagation
183/// (`cancel`), the audit/enforcement/warning side-channels (read by
184/// the SSE handler at `axon.complete` time), and the orchestration
185/// `branch_path` for D6 per-step replay binding.
186///
187/// # `branch_path` semantics
188///
189/// Empty at flow root. Parent handlers push a segment when descending
190/// into a child:
191/// - `par[0]`, `par[1]`, `par[2]` for the n-th branch of a Par block.
192/// - `for_in[0]`, `for_in[1]` for the n-th iteration of a ForIn loop.
193/// - `conditional.then`, `conditional.else` for the chosen branch
194/// of an `if`.
195/// - Children inside a branch concat: `par[0].step[0]` for the first
196/// child step of the first Par branch.
197///
198/// The path is observable in `StepAuditRecord.branch_path` (extended
199/// in 33.y.f when the audit row writer gains the field) so regulators
200/// replaying a flow on appeal see the full execution tree, not just a
201/// flattened step sequence.
202///
203/// # `step_counter`
204///
205/// Monotonic per-flow counter. Each Step (or pure-shape variant
206/// promoted to Step) increments. Surface fed into `step_audit` so
207/// the row index is correct under nested orchestration.
208#[derive(Clone)]
209pub struct DispatchCtx {
210 pub flow_name: String,
211 pub backend_name: String,
212 pub system_prompt: String,
213 pub cancel: CancellationFlag,
214 pub tx: mpsc::UnboundedSender<FlowExecutionEvent>,
215 pub enforcement_summaries: Arc<
216 Mutex<HashMap<String, crate::axon_server::EnforcementSummaryWire>>,
217 >,
218 pub step_audit_records: Arc<
219 Mutex<Vec<crate::axonendpoint_replay::StepAuditRecord>>,
220 >,
221 pub runtime_warnings: Arc<
222 Mutex<Vec<crate::runtime_warnings::RuntimeWarning>>,
223 >,
224 pub branch_path: Vec<String>,
225 pub step_counter: usize,
226 /// §Fase 33.y.f — Optional PEM async surface for cognitive
227 /// primitives (Remember / Recall etc.). When `Some(backend)`,
228 /// `run_remember` write-through persists to PEM and `run_recall`
229 /// restores from PEM as a write-back cache layered over
230 /// `let_bindings`. When `None`, both handlers degrade to
231 /// `let_bindings`-only (in-memory) — the canonical adopter
232 /// path for tests + adopters that don't opt into persistent
233 /// cognitive state. Arc-cloned per branch for concurrent
234 /// dispatch (Fase 33.y.e parity).
235 pub pem_backend: Option<std::sync::Arc<dyn crate::pem::PersistenceBackend>>,
236 /// §Fase 33.y.f — Session anchor for PEM persistence. Defaults
237 /// to `flow_name` in [`DispatchCtx::new`]; adopters override
238 /// for multi-session flows.
239 pub session_id: String,
240 /// §Fase 33.y.f — Tenant routing tag for PEM persistence.
241 /// Defaults to empty in [`DispatchCtx::new`]; multi-tenant
242 /// adopters set this before dispatch.
243 pub tenant_id: String,
244 /// §Fase 33.y.d — Let-binding scope. Map from binding name to its
245 /// resolved value. `run_let` inserts; `run_conditional` reads to
246 /// evaluate the condition; `run_for_in` inserts the iteration
247 /// variable per iter. Bindings persist through the flow's
248 /// lifetime — sub-scoping is NOT introduced in 33.y.d (the
249 /// sync runner's let semantics are flow-scoped + monotonic,
250 /// matching this discipline for D10 parity). The `HashMap` is
251 /// cheap to clone for branch isolation when sub-fases 33.y.e
252 /// introduce parallel branches with private scopes (Par block).
253 pub let_bindings: std::collections::HashMap<String, String>,
254 /// §Fase 33.y.c — Per-node declared `<stream:<policy>>` resolved
255 /// by the caller BEFORE invoking `dispatch_node`. The pure-shape
256 /// handlers read + consume this field (set back to `None` on
257 /// entry) so each handler observes the policy intended for ITS
258 /// node, never the previous node's residue. When `None`, the
259 /// handler skips `StreamPolicyEnforcer` wrapping + emits chunks
260 /// directly to the wire.
261 ///
262 /// Subsequent sub-fases 33.y.d-l adopt the same pattern for
263 /// orchestration handlers (`Par` / `ForIn`) when child nodes
264 /// declare effects.
265 pub pending_effect_policy: Option<BackpressurePolicy>,
266 /// §Fase 34.d (v1.29.0) — Tool registry surface for the
267 /// streaming-tool dispatcher branch. When `Some(registry)`,
268 /// `pure_shape::run_step` resolves `step.apply_ref` against
269 /// the registry; if the entry's `is_streaming` flag is true,
270 /// the step bypasses `Backend::stream()` entirely + invokes
271 /// `tool.stream(args, ctx)` via the
272 /// [`crate::tool_dispatch_bridge::resolve_streaming_tool`]
273 /// factory. When `None` (D9 backwards-compat), the legacy
274 /// LLM-side path is taken regardless of source-declared
275 /// `effects: <stream:<policy>>` — adopters who haven't wired
276 /// the registry yet see no behavior change. Arc-shared for
277 /// concurrent dispatch (Fase 33.y.e parity).
278 pub tool_registry: Option<std::sync::Arc<crate::tool_registry::ToolRegistry>>,
279 /// §Fase 35.f (v1.30.0) — axonstore registry for SQL-vs-KV
280 /// dispatch. When `Some(registry)`, `run_persist` / `run_retrieve`
281 /// / `run_mutate` / `run_purge` resolve `store_name` against it: a
282 /// `postgresql`-backed store routes through `PostgresStoreBackend`,
283 /// every other (and every undeclared) store takes the byte-
284 /// identical key-value path. When `None` (the `DispatchCtx::new`
285 /// default), every store op is key-value — the pre-35 behavior,
286 /// unchanged (D3). Arc-shared so concurrent branches share one
287 /// per-DSN pool cache.
288 pub store_registry: Option<std::sync::Arc<crate::store::registry::StoreRegistry>>,
289 /// §Fase 35.j (v1.30.0) — Pillar IV: the capability slugs the
290 /// current request carries (the JWT bearer's `capabilities`
291 /// claim). When `Some`, the store handlers re-check a
292 /// capability-gated store against this set before any access —
293 /// defense-in-depth behind the type-checker's compile-time
294 /// guarantee. When `None` (the `DispatchCtx::new` default), there
295 /// is no capability context at this layer and the runtime
296 /// re-check is a no-op: the compile-time check + the endpoint's
297 /// Fase 32.g `requires:` gate stand.
298 pub held_capabilities: Option<Vec<String>>,
299 /// §Fase 35.h (v1.30.0) — Pillar II: the flow's tamper-evident
300 /// HMAC-Merkle mutation chain. Every `persist`/`mutate`/`purge`
301 /// appends a delta. Shared (`Arc`) across concurrent branches so a
302 /// `Par` block's mutations land in one chain; the Merkle head is a
303 /// verifiable fingerprint of the flow's complete mutation history.
304 pub audit_chain:
305 std::sync::Arc<std::sync::Mutex<crate::store::audit_chain::StoreAuditChain>>,
306 /// §Fase 37.x.j (D2) — Per-flow pinned Postgres connections.
307 /// Populated at stream start by `run_streaming_via_dispatcher`:
308 /// the IR is walked, every postgresql-backed `axonstore` referenced
309 /// by the flow body has ONE `PoolConnection<Postgres>` acquired,
310 /// and the map holds them by axonstore name for the flow's
311 /// lifetime. The map drops at the end of the streaming task,
312 /// returning every conn to the pool via the `after_release
313 /// DEALLOCATE ALL` hook (Fase 38.x.a D2 composing with 37.x.j D1).
314 ///
315 /// Wire-integration store handlers consult this map per op:
316 /// `take` the pin out → run the SQL via `StoreConn::Pinned(&mut pin)`
317 /// → `insert` the pin back. The take/return discipline preserves
318 /// the Arc<Mutex<>> sharing pattern across cloned (par-branched)
319 /// contexts while keeping individual ops borrow-checker friendly.
320 ///
321 /// Empty map ≡ no pinning held (legacy path) → handlers fall back
322 /// to `StoreConn::Pool(backend.pool())`. This is the case for
323 /// callers that haven't eager-acquired (non-streaming RPC paths,
324 /// CLI tests, etc.) — D5 byte-identical backwards-compat.
325 ///
326 /// Per D6.b (sub-fase 37.x.j.6): `par {}` branches that share this
327 /// Arc serialize on its mutex. The D6.a default (per-branch
328 /// sub-pin) replaces this Arc with a fresh empty map at par-branch
329 /// clone time so branches do NOT serialize on the parent's pins.
330 pub pinned_conns: std::sync::Arc<
331 std::sync::Mutex<
332 std::collections::HashMap<
333 String,
334 sqlx::pool::PoolConnection<sqlx::Postgres>,
335 >,
336 >,
337 >,
338}
339
340impl DispatchCtx {
341 /// Construct a fresh context for a new flow. Subsequent sub-fases
342 /// extend this with builder methods as the surface grows (PEM /
343 /// ReplayToken / CognitiveState plumbing in 33.y.f, tool registry
344 /// in 33.y.k, etc.).
345 pub fn new(
346 flow_name: impl Into<String>,
347 backend_name: impl Into<String>,
348 system_prompt: impl Into<String>,
349 cancel: CancellationFlag,
350 tx: mpsc::UnboundedSender<FlowExecutionEvent>,
351 ) -> Self {
352 let flow_name = flow_name.into();
353 let session_id = flow_name.clone();
354 Self {
355 flow_name,
356 backend_name: backend_name.into(),
357 system_prompt: system_prompt.into(),
358 cancel,
359 tx,
360 enforcement_summaries: Arc::new(Mutex::new(HashMap::new())),
361 step_audit_records: Arc::new(Mutex::new(Vec::new())),
362 runtime_warnings: Arc::new(Mutex::new(Vec::new())),
363 branch_path: Vec::new(),
364 step_counter: 0,
365 pem_backend: None,
366 session_id,
367 tenant_id: String::new(),
368 let_bindings: std::collections::HashMap::new(),
369 pending_effect_policy: None,
370 tool_registry: None,
371 store_registry: None,
372 held_capabilities: None,
373 audit_chain: std::sync::Arc::new(std::sync::Mutex::new(
374 crate::store::audit_chain::StoreAuditChain::new(),
375 )),
376 // §Fase 37.x.j (D2) — empty pin map by default; populated
377 // by `run_streaming_via_dispatcher` via `with_pinned_conns`.
378 pinned_conns: std::sync::Arc::new(std::sync::Mutex::new(
379 std::collections::HashMap::new(),
380 )),
381 }
382 }
383
384 /// §Fase 37.x.j (D2) — Builder: attach an Arc-shared pinned
385 /// connection map populated by the caller. `run_streaming_via_dispatcher`
386 /// uses this to install the eagerly-acquired flow-scoped pins
387 /// BEFORE the dispatcher walks any node. Returns `self` so the
388 /// builder pattern chains with `with_store_registry`, `with_pem`,
389 /// `with_tool_registry`, `with_held_capabilities`.
390 pub fn with_pinned_conns(
391 mut self,
392 conns: std::sync::Arc<
393 std::sync::Mutex<
394 std::collections::HashMap<
395 String,
396 sqlx::pool::PoolConnection<sqlx::Postgres>,
397 >,
398 >,
399 >,
400 ) -> Self {
401 self.pinned_conns = conns;
402 self
403 }
404
405 /// §Fase 35.f — Builder: attach the `axonstore` registry so the
406 /// wire-integration store handlers route postgresql-backed stores
407 /// to SQL. Without it, every store op stays key-value (D3).
408 /// Returns `self` so builders chain.
409 pub fn with_store_registry(
410 mut self,
411 registry: std::sync::Arc<crate::store::registry::StoreRegistry>,
412 ) -> Self {
413 self.store_registry = Some(registry);
414 self
415 }
416
417 /// §Fase 35.j — Builder: attach the request's held capability
418 /// slugs so the store handlers re-check capability-gated stores
419 /// (Pillar IV). Returns `self` so builders chain.
420 pub fn with_held_capabilities(mut self, capabilities: Vec<String>) -> Self {
421 self.held_capabilities = Some(capabilities);
422 self
423 }
424
425 /// §Fase 34.d — Builder: attach a tool registry so the
426 /// dispatcher's streaming-tool branch can resolve `apply_ref`
427 /// against it. Returns `self` so builders chain.
428 pub fn with_tool_registry(
429 mut self,
430 registry: std::sync::Arc<crate::tool_registry::ToolRegistry>,
431 ) -> Self {
432 self.tool_registry = Some(registry);
433 self
434 }
435
436 /// Builder: attach a PEM persistence backend. Returns `self` so
437 /// callers can chain `DispatchCtx::new(...).with_pem(backend)`.
438 pub fn with_pem(
439 mut self,
440 backend: std::sync::Arc<dyn crate::pem::PersistenceBackend>,
441 ) -> Self {
442 self.pem_backend = Some(backend);
443 self
444 }
445
446 /// Builder: set the session id (defaults to flow_name).
447 pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
448 self.session_id = session_id.into();
449 self
450 }
451
452 /// Builder: set the tenant id (defaults to empty).
453 pub fn with_tenant_id(mut self, tenant_id: impl Into<String>) -> Self {
454 self.tenant_id = tenant_id.into();
455 self
456 }
457
458 /// Builder-style setter for the pending effect policy. Returns
459 /// `self` so callers can chain `ctx.with_effect_policy(policy)`
460 /// before invoking `dispatch_node`. Handlers read + clear the
461 /// field via [`Self::take_pending_effect_policy`].
462 pub fn with_effect_policy(mut self, policy: BackpressurePolicy) -> Self {
463 self.pending_effect_policy = Some(policy);
464 self
465 }
466
467 /// §Fase 33.z.c — Builder: inject external Arc-backed side-channels
468 /// so the dispatcher's per-variant handlers populate the SAME
469 /// Mutexes that `server_execute_streaming` reads from for the SSE
470 /// wire's `enforcement_summary`, `step_audit`, and `runtime_warnings`
471 /// fields.
472 ///
473 /// Without this builder, `DispatchCtx::new` creates FRESH Arcs that
474 /// the dispatcher populates but the production hot path can't read.
475 /// That gap broke `axon.complete.enforcement_summary` wire emission
476 /// on the canonical Step shape when the dispatcher graft (33.z.b)
477 /// activated — the 33.x.d production-path tests detected the
478 /// regression at the `assert_eq!(generate_summary["chunks_pushed"], 1)`
479 /// line because the side-channel the wire reads from stayed empty
480 /// while the dispatcher's fresh Arc carried the counters.
481 ///
482 /// Used exclusively by `streaming_via_dispatcher::run_streaming_via_dispatcher`
483 /// to thread the side-channels the SSE handler constructs into the
484 /// dispatcher. Downstream-crate consumers driving `dispatch_node`
485 /// directly continue to use `DispatchCtx::new` + the fresh internal
486 /// Arcs.
487 pub fn with_external_side_channels(
488 mut self,
489 enforcement_summaries: std::sync::Arc<
490 tokio::sync::Mutex<
491 std::collections::HashMap<String, crate::axon_server::EnforcementSummaryWire>,
492 >,
493 >,
494 step_audit_records: std::sync::Arc<
495 tokio::sync::Mutex<Vec<crate::axonendpoint_replay::StepAuditRecord>>,
496 >,
497 runtime_warnings: std::sync::Arc<
498 tokio::sync::Mutex<Vec<crate::runtime_warnings::RuntimeWarning>>,
499 >,
500 ) -> Self {
501 self.enforcement_summaries = enforcement_summaries;
502 self.step_audit_records = step_audit_records;
503 self.runtime_warnings = runtime_warnings;
504 self
505 }
506
507 /// Read + clear the pending effect policy. Returns `None` when no
508 /// policy was set by the caller. The take-semantics (vs. peek)
509 /// prevents a stale policy from a previous node leaking into the
510 /// next handler's invocation if the caller forgets to clear.
511 pub fn take_pending_effect_policy(&mut self) -> Option<BackpressurePolicy> {
512 self.pending_effect_policy.take()
513 }
514
515 /// Render the current `branch_path` as a wire-stable string. Empty
516 /// path returns `""` (flow root); single segment `"par[0]"`; multi
517 /// `"par[0].step[1]"`. The format is byte-stable across calls.
518 pub fn branch_path_string(&self) -> String {
519 self.branch_path.join(".")
520 }
521}
522
523// ────────────────────────────────────────────────────────────────────
524// NodeOutcome — closed catalog of dispatcher outcomes
525// ────────────────────────────────────────────────────────────────────
526
527/// Closed catalog of dispatcher outcomes. 33.y.b ships only the
528/// transitional [`LegacyShimHandled`] variant; subsequent sub-fases
529/// 33.y.c–j add real outcomes:
530///
531/// - `Completed { output, tokens_emitted }` — handler ran to
532/// completion; output captured + tokens forwarded on the wire.
533/// - `Break` — sentinel from an in-loop `break`. The For-In handler
534/// short-circuits remaining iterations + propagates up.
535/// - `LoopContinue` — sentinel from `continue`. Skips to next
536/// iteration.
537/// - `Return { value }` — sentinel from `return`. Flow loop
538/// terminates.
539///
540/// 33.y.l removes [`LegacyShimHandled`] once every variant has its
541/// real handler.
542///
543/// # Why a closed catalog vs `Result<String, _>`
544///
545/// Sentinel values (Break / LoopContinue / Return) need to flow up
546/// through nested handler stacks WITHOUT being mistaken for content.
547/// A `Result<String, _>` would force the caller to encode sentinels
548/// in-band, which is unsound under serde + adopter-observable output.
549/// The closed enum is the only sound algebraic representation.
550#[derive(Debug, Clone)]
551#[non_exhaustive]
552pub enum NodeOutcome {
553 /// §Fase 33.y.c+ — Handler ran to completion. `output` is the
554 /// concatenated chunk content captured during streaming;
555 /// `tokens_emitted` is the count of non-empty `StepToken` events
556 /// fanned to the wire (post-policy enforcement for steps with a
557 /// declared `<stream:<policy>>`). The `step_index` is the value
558 /// of `ctx.step_counter` at the moment the handler started (i.e.
559 /// the index the handler reserved for itself before
560 /// incrementing); orchestration handlers in 33.y.d–e use this
561 /// to surface per-branch index trails on `branch_path`.
562 Completed {
563 output: String,
564 tokens_emitted: u64,
565 step_index: usize,
566 },
567 /// §Fase 33.y.d sentinel — emitted by the `Break` handler. The
568 /// enclosing `ForIn` handler observes this outcome from its
569 /// child dispatch + terminates the loop (skips remaining
570 /// iterations). Parser scope check guarantees `Break` only
571 /// appears inside a `ForIn` body, so non-loop ancestors that
572 /// observe this outcome MUST propagate it upward unchanged.
573 Break,
574 /// §Fase 33.y.d sentinel — emitted by the `Continue` handler.
575 /// The enclosing `ForIn` handler observes this + skips to the
576 /// next iteration. Same propagation discipline as
577 /// [`NodeOutcome::Break`].
578 LoopContinue,
579 /// §Fase 33.y.d sentinel — emitted by the `Return` handler.
580 /// Terminates the flow loop with the carried `value` as the
581 /// final flow output. Parents propagate unchanged until the
582 /// flow-loop level observes it.
583 Return { value: String },
584}
585
586// ────────────────────────────────────────────────────────────────────
587// DispatchError — closed catalog of dispatcher errors
588// ────────────────────────────────────────────────────────────────────
589
590/// Closed catalog of dispatcher errors. Adopter-reachable error
591/// surfaces are NAMED (D7 mandate: zero `unwrap()` / zero
592/// `unimplemented!()` / zero `_ =>` catch-all). Each variant carries
593/// adopter-actionable structured data.
594#[derive(Debug, Clone)]
595#[non_exhaustive]
596pub enum DispatchError {
597 /// `Backend::stream()` failed on a per-variant handler that
598 /// invoked it. Carries the backend name + the upstream error
599 /// message so the SSE handler can surface a structured
600 /// `axon.error` event.
601 BackendError { name: String, message: String },
602
603 /// Cancellation flag fired mid-dispatch (client disconnected or
604 /// upstream `tokio::select!` raced the cancel branch). Caller
605 /// MUST treat this as a clean exit (no `axon.error` event
606 /// surfaced — the consumer is already gone).
607 UpstreamCancelled,
608
609 /// A per-variant handler needed a dependency that wasn't
610 /// available on the DispatchCtx (e.g., PEM async surface for a
611 /// `Remember`/`Recall` handler before 33.y.f wires it in). The
612 /// `name` field tags which dependency.
613 MissingDependency { name: &'static str },
614
615 /// The mpsc sender returned `Err(_)` — consumer dropped. Caller
616 /// MUST treat this as a clean exit (same posture as
617 /// `UpstreamCancelled`).
618 ChannelClosed,
619}
620
621impl std::fmt::Display for DispatchError {
622 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
623 match self {
624 Self::BackendError { name, message } => {
625 write!(f, "backend '{name}' stream() failed: {message}")
626 }
627 Self::UpstreamCancelled => write!(f, "upstream cancelled mid-dispatch"),
628 Self::MissingDependency { name } => {
629 write!(f, "dispatcher missing dependency: {name}")
630 }
631 Self::ChannelClosed => write!(f, "channel closed (consumer dropped)"),
632 }
633 }
634}
635
636impl std::error::Error for DispatchError {}
637
638// ────────────────────────────────────────────────────────────────────
639// §Fase 33.y.l — ShimReason enum + legacy_shim function retired
640// ────────────────────────────────────────────────────────────────────
641//
642// After 33.y.j reached 45/45 IRFlowNode graduation, the
643// transitional `ShimReason` enum + `legacy_shim` function + the
644// `NodeOutcome::LegacyShimHandled` variant became structurally
645// unreachable from `dispatch_node`'s exhaustive match. 33.y.l
646// retires the entire shim infrastructure in this lockstep cleanup:
647//
648// - ShimReason enum — DELETED
649// - ShimReason::ALL constant — DELETED
650// - ShimReason::slug() method — DELETED
651// - legacy_shim() async helper — DELETED
652// - NodeOutcome::LegacyShimHandled — DELETED variant
653//
654// Drift-gate slug catalog now uses `flow_plan::ir_flow_node_kind`
655// directly (the same byte-stable surface that was duplicated in
656// ShimReason::slug — single source of truth).
657//
658// The dispatcher's 45-arm exhaustive match is unchanged: every IR
659// variant routes to its real async handler module (pure_shape /
660// orchestration / parallel / effects_bridge / cognitive /
661// algebraic_handlers / wire_integrations / pix / lambda_tools).
662//
663// Search the codebase: `grep -E "unimplemented|todo!|legacy_shim"
664// axon-rs/src/flow_dispatcher/*.rs` returns ZERO matches post-33.y.l
665// (verified by the `fase33y_l_parity_gate.rs::d7_no_legacy_markers`
666// drift-gate test).
667//
668// Build-time guarantee: `legacy_shim` is gone → compiler enforces
669// that NO future arm in dispatch_node can fall back to a stub. The
670// catalog totality contract D1 is sealed.
671// ────────────────────────────────────────────────────────────────────
672// dispatch_node — the exhaustive entry point
673// ────────────────────────────────────────────────────────────────────
674
675/// Dispatch a single IRFlowNode through the per-variant async
676/// handler stack. Total over the 45-variant closed catalog
677/// (compiler-enforced exhaustive match).
678///
679/// # 45/45 graduation FINAL (33.y.j)
680///
681/// As of Fase 33.y.j, every IRFlowNode variant has a NAMED async
682/// handler. There are NO `_ =>` catch-all arms, NO `legacy_shim`
683/// calls, NO `unimplemented!()` markers. Adding a 46th IRFlowNode
684/// variant fails the Rust build here until a real per-variant
685/// async handler is wired in.
686///
687/// Each arm's handler module:
688///
689/// - **pure_shape** (33.y.c) — Step / Probe / Reason / Validate /
690/// Refine / Weave
691/// - **orchestration** (33.y.d) — Let / Conditional / ForIn /
692/// Break / Continue / Return
693/// - **parallel** (33.y.e) — Par
694/// - **effects_bridge** (33.y.e + D9) — Stream
695/// - **cognitive** (33.y.f) — Remember / Recall / Forge / Focus /
696/// Associate / Aggregate / Explore / Ingest / Navigate /
697/// Corroborate
698/// - **algebraic_handlers** (33.y.g) — ShieldApply / OtsApply /
699/// MandateApply / ComputeApply / Listen / DaemonStep
700/// - **wire_integrations** (33.y.h) — Emit / Publish / Discover /
701/// Persist / Retrieve / Mutate / Purge / Transact / Deliberate /
702/// Consensus
703/// - **pix** (33.y.i) — Hibernate / Drill / Trail
704/// - **lambda_tools** (33.y.j) — LambdaDataApply / UseTool
705///
706/// # Cancellation
707///
708/// Every per-variant handler checks `ctx.cancel.is_cancelled()`
709/// at entry and at every `.await` boundary per the Fase 33.x.e
710/// `cancel_aware` discipline. Cancel propagation is uniform
711/// across the entire 45-variant catalog.
712pub async fn dispatch_node(
713 node: &IRFlowNode,
714 ctx: &mut DispatchCtx,
715) -> Result<NodeOutcome, DispatchError> {
716 // Exhaustive match — compiler enforces every variant has a
717 // named arm. Adding a 46th IRFlowNode variant fails the build
718 // here until the new arm is added. ZERO `_ =>` catch-all.
719 match node {
720 // §Fase 33.y.c — pure-shape variants graduated to real
721 // async handlers. Each delegates to its labeled
722 // `pure_shape::run_*` entry which wraps the shared
723 // `pure_shape::run_pure_shape` async core. The shim is
724 // retired for these 6 variants; subsequent sub-fases retire
725 // it for the remaining 39 variants per the topological
726 // schedule in `docs/fase/fase_33y_algebraic_streaming_dispatcher.md`.
727 IRFlowNode::Step(step) => pure_shape::run_step(step, ctx).await,
728 IRFlowNode::Probe(probe) => pure_shape::run_probe(probe, ctx).await,
729 IRFlowNode::Reason(reason) => pure_shape::run_reason(reason, ctx).await,
730 IRFlowNode::Validate(validate) => pure_shape::run_validate(validate, ctx).await,
731 IRFlowNode::Refine(refine) => pure_shape::run_refine(refine, ctx).await,
732 IRFlowNode::Weave(weave) => pure_shape::run_weave(weave, ctx).await,
733 // §Fase 33.y.j — UseTool graduated.
734 IRFlowNode::UseTool(node) => lambda_tools::run_use_tool(node, ctx).await,
735 // §Fase 33.y.f — cognitive primitives PEM-bound.
736 IRFlowNode::Remember(node) => cognitive::run_remember(node, ctx).await,
737 IRFlowNode::Recall(node) => cognitive::run_recall(node, ctx).await,
738 // §Fase 33.y.d — orchestration variants graduated to real
739 // async handlers. Each composes child handlers via recursive
740 // `dispatch_node` calls + threads sentinel outcomes (Break /
741 // LoopContinue / Return) up through orchestration parents.
742 IRFlowNode::Conditional(cond) => orchestration::run_conditional(cond, ctx).await,
743 IRFlowNode::ForIn(for_in) => orchestration::run_for_in(for_in, ctx).await,
744 IRFlowNode::Let(let_bind) => orchestration::run_let(let_bind, ctx).await,
745 IRFlowNode::Return(ret) => orchestration::run_return(ret, ctx).await,
746 IRFlowNode::Break(brk) => orchestration::run_break(brk, ctx).await,
747 IRFlowNode::Continue(cont) => orchestration::run_continue(cont, ctx).await,
748 // §Fase 33.y.j — LambdaDataApply graduated.
749 IRFlowNode::LambdaDataApply(node) => lambda_tools::run_lambda_data_apply(node, ctx).await,
750 // §Fase 33.y.e — Par graduated to real async handler. The
751 // payload-free `IRParallelBlock` emits the canonical
752 // `step_type: "par"` wire shape; future IR extensions
753 // delegate to `parallel::run_branches_concurrently`.
754 IRFlowNode::Par(par) => parallel::run_par(par, ctx).await,
755 // §Fase 33.y.i — PIX variants graduated.
756 IRFlowNode::Hibernate(node) => pix::run_hibernate(node, ctx).await,
757 // §Fase 33.y.h — multi-agent deliberation blocks.
758 IRFlowNode::Deliberate(node) => wire_integrations::run_deliberate(node, ctx).await,
759 IRFlowNode::Consensus(node) => wire_integrations::run_consensus(node, ctx).await,
760 // §Fase 33.y.f — Forge payload-free wire shape.
761 IRFlowNode::Forge(node) => cognitive::run_forge(node, ctx).await,
762 // §Fase 33.y.f — cognitive framing handlers reuse pure_shape.
763 IRFlowNode::Focus(node) => cognitive::run_focus(node, ctx).await,
764 IRFlowNode::Associate(node) => cognitive::run_associate(node, ctx).await,
765 IRFlowNode::Aggregate(node) => cognitive::run_aggregate(node, ctx).await,
766 IRFlowNode::Explore(node) => cognitive::run_explore(node, ctx).await,
767 IRFlowNode::Ingest(node) => cognitive::run_ingest(node, ctx).await,
768 // §Fase 33.y.g — algebraic-effect handler nodes graduated.
769 IRFlowNode::ShieldApply(node) => algebraic_handlers::run_shield_apply(node, ctx).await,
770 // §Fase 33.y.e — Stream graduated to real async handler.
771 // The payload-free `IRStreamBlock` emits the canonical
772 // `step_type: "stream"` wire shape; future IR extensions
773 // delegate to `effects_bridge::bridge_effect_stream_yield`.
774 IRFlowNode::Stream(stream) => effects_bridge::run_stream(stream, ctx).await,
775 IRFlowNode::Navigate(node) => cognitive::run_navigate(node, ctx).await,
776 IRFlowNode::Drill(node) => pix::run_drill(node, ctx).await,
777 IRFlowNode::Trail(node) => pix::run_trail(node, ctx).await,
778 IRFlowNode::Corroborate(node) => cognitive::run_corroborate(node, ctx).await,
779 IRFlowNode::OtsApply(node) => algebraic_handlers::run_ots_apply(node, ctx).await,
780 IRFlowNode::MandateApply(node) => algebraic_handlers::run_mandate_apply(node, ctx).await,
781 IRFlowNode::ComputeApply(node) => algebraic_handlers::run_compute_apply(node, ctx).await,
782 IRFlowNode::Listen(node) => algebraic_handlers::run_listen(node, ctx).await,
783 IRFlowNode::DaemonStep(node) => algebraic_handlers::run_daemon_step(node, ctx).await,
784 // §Fase 33.y.h — π-calc typed channels (Fase 13).
785 IRFlowNode::Emit(node) => wire_integrations::run_emit(node, ctx).await,
786 IRFlowNode::Publish(node) => wire_integrations::run_publish(node, ctx).await,
787 IRFlowNode::Discover(node) => wire_integrations::run_discover(node, ctx).await,
788 // §Fase 33.y.h — persistence primitives.
789 IRFlowNode::Persist(node) => wire_integrations::run_persist(node, ctx).await,
790 IRFlowNode::Retrieve(node) => wire_integrations::run_retrieve(node, ctx).await,
791 IRFlowNode::Mutate(node) => wire_integrations::run_mutate(node, ctx).await,
792 IRFlowNode::Purge(node) => wire_integrations::run_purge(node, ctx).await,
793 IRFlowNode::Transact(node) => wire_integrations::run_transact(node, ctx).await,
794 }
795}
796
797// ────────────────────────────────────────────────────────────────────
798// Unit tests — drift gate + smoke
799// ────────────────────────────────────────────────────────────────────
800
801#[cfg(test)]
802mod tests {
803 use super::*;
804 use crate::cancel_token::CancellationFlag;
805
806 /// §Fase 33.y.l drift-gate update — the historical
807 /// `shim_reason_cardinality_45_variants` /
808 /// `shim_reason_slugs_are_unique` /
809 /// `shim_reason_slugs_are_well_formed` /
810 /// `legacy_shim_returns_handled_on_happy_path` /
811 /// `legacy_shim_returns_cancel_when_flag_set` /
812 /// `shim_reason_slug_matches_ir_flow_node_kind` tests are
813 /// RETIRED here. The replacement coverage lives in:
814 ///
815 /// - `tests/fase33y_b_dispatcher_skeleton.rs` — IR-variant
816 /// catalog cardinality + slug uniqueness via
817 /// `flow_plan::ir_flow_node_kind` directly (single source
818 /// of truth, no more `ShimReason::slug` duplication).
819 /// - `tests/fase33y_l_parity_gate.rs` — D7 build-time grep
820 /// invariant: zero `unimplemented!` / `todo!` / `legacy_shim`
821 /// symbols in `flow_dispatcher/*.rs`.
822
823 /// 33.y.b branch_path: empty at flow root.
824 #[test]
825 fn dispatch_ctx_branch_path_empty_at_root() {
826 let (tx, _rx) = mpsc::unbounded_channel();
827 let ctx = DispatchCtx::new(
828 "F",
829 "stub",
830 "",
831 CancellationFlag::new(),
832 tx,
833 );
834 assert!(ctx.branch_path.is_empty());
835 assert_eq!(ctx.branch_path_string(), "");
836 assert_eq!(ctx.step_counter, 0);
837 }
838
839 /// 33.y.b branch_path: multi-segment join is wire-stable.
840 #[test]
841 fn dispatch_ctx_branch_path_joins_segments() {
842 let (tx, _rx) = mpsc::unbounded_channel();
843 let mut ctx = DispatchCtx::new(
844 "F",
845 "stub",
846 "",
847 CancellationFlag::new(),
848 tx,
849 );
850 ctx.branch_path.push("par[0]".to_string());
851 ctx.branch_path.push("step[1]".to_string());
852 assert_eq!(ctx.branch_path_string(), "par[0].step[1]");
853 }
854
855 /// 33.y.b DispatchError Display surface produces actionable
856 /// messages for every variant.
857 #[test]
858 fn dispatch_error_display_surface() {
859 let cases: Vec<(DispatchError, &str)> = vec![
860 (
861 DispatchError::BackendError {
862 name: "anthropic".to_string(),
863 message: "rate limited".to_string(),
864 },
865 "backend 'anthropic' stream() failed: rate limited",
866 ),
867 (DispatchError::UpstreamCancelled, "upstream cancelled mid-dispatch"),
868 (
869 DispatchError::MissingDependency { name: "pem_async" },
870 "dispatcher missing dependency: pem_async",
871 ),
872 (DispatchError::ChannelClosed, "channel closed (consumer dropped)"),
873 ];
874 for (err, expected) in cases {
875 assert_eq!(format!("{err}"), expected);
876 }
877 }
878
879 /// 33.y.c smoke: dispatch_node dispatches Step through the
880 /// graduated pure-shape handler (not the shim) and returns
881 /// `NodeOutcome::Completed` with the stub backend's canonical
882 /// "(stub)" 1-token output.
883 #[tokio::test]
884 async fn dispatch_node_step_routes_to_pure_shape_handler() {
885 use crate::ir_nodes::*;
886
887 let step = IRStep {
888 node_type: "step",
889 source_line: 0,
890 source_column: 0,
891 name: "Generate".to_string(),
892 persona_ref: String::new(),
893 given: String::new(),
894 ask: "hi".to_string(),
895 use_tool: None,
896 probe: None,
897 reason: None,
898 weave: None,
899 output_type: String::new(),
900 confidence_floor: None,
901 navigate_ref: String::new(),
902 apply_ref: String::new(),
903 body: Vec::new(),
904 };
905 let node = IRFlowNode::Step(step);
906
907 let (tx, _rx) = mpsc::unbounded_channel();
908 let mut ctx = DispatchCtx::new(
909 "F",
910 "stub",
911 "",
912 CancellationFlag::new(),
913 tx,
914 );
915
916 let outcome = dispatch_node(&node, &mut ctx).await.unwrap();
917 match outcome {
918 NodeOutcome::Completed {
919 output,
920 tokens_emitted,
921 step_index,
922 } => {
923 assert_eq!(output, "(stub)");
924 assert_eq!(tokens_emitted, 1);
925 assert_eq!(step_index, 0);
926 }
927 other => panic!("post-33.y.c: Step routes to pure_shape handler returning Completed; got {other:?}"),
928 }
929 }
930}