// axon/flow_dispatcher/mod.rs

1//! §Fase 33.y.b — Per-IRFlowNode async dispatcher skeleton.
2//!
3//! This module ships the **structural foundation** of the Fase 33.y
4//! universal-algebraic-streaming cycle: a closed-catalog, compiler-
5//! enforced exhaustive dispatch table over the 45-variant
6//! [`crate::ir_nodes::IRFlowNode`] enum. Each arm in
7//! [`dispatch_node`]'s match is named explicitly; adding a 46th
8//! IRFlowNode variant fails the Rust build at this match until the
9//! corresponding dispatcher arm is added (D1 totality invariant).
10//!
11//! # What 33.y.b ships
12//!
13//! - [`DispatchCtx`] — the shared per-flow context every per-variant
14//!   handler reads + writes. Carries the FlowExecutionEvent producer,
15//!   the cancellation flag, the side-channels for enforcement summary
16//!   / step audit / runtime warnings, the orchestration `branch_path`
17//!   (for D6 per-step replay binding under Par/ForIn/Conditional
18//!   nesting), and the per-step counter.
//! - [`NodeOutcome`] — closed catalog of dispatcher outcomes. 33.y.b
//!   shipped only the transitional `LegacyShimHandled` variant;
//!   sub-fases 33.y.c–j added the real outcomes (`Completed`,
//!   `Break`, `LoopContinue`, `Return`) and 33.y.l removed
//!   `LegacyShimHandled` once every variant had its real handler.
//! - [`DispatchError`] — closed catalog of dispatch errors. Four
//!   variants today: BackendError / UpstreamCancelled /
//!   MissingDependency / ChannelClosed (the 33.y.b-era
//!   `LegacyShimFailed` variant no longer exists).
//! - `ShimReason` — per-IRFlowNode-variant tag for the 33.y.b
//!   transitional shim, retired in 33.y.l. Its drift gate
//!   (`tests::shim_reason_covers_full_ir_flow_node_catalog`)
//!   asserted the ShimReason set had 1-to-1 cardinality with the
//!   IRFlowNode set; the slug catalog now comes from
//!   `flow_plan::ir_flow_node_kind`.
//! - [`dispatch_node`] — the dispatcher entry point. Exhaustive
//!   match over 45 arms. In 33.y.b each arm delegated to the
//!   since-retired `legacy_shim`, which returned
//!   `Ok(NodeOutcome::LegacyShimHandled)`: **no node was actually
//!   executed in 33.y.b**, and the module was standalone — not
//!   wired into `server_execute_streaming` — so production behavior
//!   was byte-identical with v1.25.0 (D4).
39//!
40//! # What 33.y.b does NOT ship
41//!
42//! - Real per-variant async logic (lands per sub-fase 33.y.c–j).
43//! - Integration with `server_execute_streaming` (lands incrementally
44//!   per sub-fase as each variant comes online).
45//! - Wire-format extensions (per-step `wire_status`, `branch_path`
46//!   field on `StepAuditRecord`, `axon-W003 partial-streaming-
47//!   activation` warning — all land in 33.y.c–l).
48//!
49//! # D-letter anchors
50//!
51//! - **D1** — Per-IRFlowNode async dispatch is total. The exhaustive
52//!   match below is the compiler-enforced totality witness.
53//! - **D4** — Wire byte-compat preserved. No production code path
54//!   calls `dispatch_node` in 33.y.b; the module exists to lock the
55//!   shape that subsequent sub-fases extend.
56//! - **D7** — Production-grade per-variant handler discipline. The
57//!   shim is INTENTIONALLY a no-op transition — not an
58//!   `unimplemented!()` panic, not a `todo!()`, not a `_ =>` catch-all.
59//!   Each arm is named; each shim invocation tags its IR variant
60//!   precisely. The shim is structural plumbing, not a stub.
61
62use crate::cancel_token::CancellationFlag;
63use crate::flow_execution_event::FlowExecutionEvent;
64use crate::ir_nodes::IRFlowNode;
65use crate::stream_effect::BackpressurePolicy;
66use std::collections::HashMap;
67use std::sync::Arc;
68use tokio::sync::{mpsc, Mutex};
69
70/// §Fase 33.y.c — Pure-shape variant handlers (Step / Probe / Reason /
71/// Validate / Refine / Weave). All 6 variants reduce to "produce a
72/// single LLM response from a prompt + cognitive framing"; the module
73/// houses the shared async core (`run_pure_shape`) + 6 thin
74/// per-variant entry points that build the variant's framing.
75pub mod pure_shape;
76
77/// §Fase 33.y.d — Orchestration variant handlers (Let / Conditional /
78/// ForIn / Break / Continue / Return). 6 variants — control-flow
79/// constructs that compose child handlers via recursive
80/// `dispatch_node` calls + sentinel-driven loop semantics +
81/// `branch_path` segments threading the orchestration tree.
82pub mod orchestration;
83
84/// §Fase 33.y.e — Parallel variant handler (`Par`) + public helper
85/// [`parallel::run_branches_concurrently`] for concurrent dispatch
86/// via `futures::future::join_all` with per-branch DispatchCtx
87/// clones + post-join step_counter merge + Return-sentinel
88/// propagation. `IRParallelBlock` is payload-free in v1.25.0; the
89/// handler emits the `step_type: "par"` wire shape with zero
90/// token events. Future IR extensions wire branches into the
91/// public helper.
92pub mod parallel;
93
94/// §Fase 33.y.e — Stream variant handler (`Stream`) + public bridge
95/// [`effects_bridge::bridge_effect_stream_yield`] integrating the
96/// Fase 23 algebraic-effects runtime: scans the instruction block
97/// for `perform Stream.Yield x` (statically + via trace), runs the
98/// `EffectRuntime`, and emits one `axon.token` per Yield with the
99/// resolved value. `IRStreamBlock` is payload-free in v1.25.0; the
100/// handler emits the `step_type: "stream"` wire shape with zero
101/// token events. Future IR extensions wire instruction blocks into
102/// the public bridge.
103pub mod effects_bridge;
104
105/// §Fase 33.y.f — Cognitive primitives (Fase 11 neuro-symbolic).
106/// 10 variants: `Remember` / `Recall` are PEM-bound (write-through
107/// + read-back via the optional [`DispatchCtx::pem_backend`]);
108/// `Forge` is payload-free wire shape (canonical
109/// `step_type: "forge"`); `Focus` / `Associate` / `Aggregate` /
110/// `Explore` / `Ingest` / `Navigate` / `Corroborate` reuse the
111/// pure-shape async core ([`pure_shape::run_pure_shape`]) with
112/// each variant's cognitive framing addendum reflected in the
113/// system prompt.
114pub mod cognitive;
115
116/// §Fase 33.y.g — Algebraic-effect handler nodes.
117/// 6 variants: `ShieldApply` / `OtsApply` / `MandateApply` — apply
118/// a named capability to a target with structured output binding;
119/// `ComputeApply` — invoke a compute capability with positional
120/// arguments; `Listen` — wait on a Fase 13 typed channel for an
121/// event; `DaemonStep` — invoke a Fase 16 daemon supervisor by
122/// reference. Each handler emits wire shape with the canonical
123/// `step_type` slug + public `apply_*` helpers that enterprise
124/// integrations override (per the OSS/ENTERPRISE/SPLIT charter
125/// — the shield/OTS/mandate scanner registries live in
126/// `axon_enterprise.shield`).
127pub mod algebraic_handlers;
128
129/// §Fase 33.y.h — Wire-integration handler nodes (π-calc +
130/// persistence + multi-agent deliberation). 10 variants:
131/// **Emit / Publish / Discover** (Fase 13 typed channels — π-calc
132/// output prefix + capability extrusion + dual discovery);
133/// **Persist / Retrieve / Mutate / Purge / Transact** (persistence
134/// primitives — snapshot / load / update / delete / transactional
135/// block); **Deliberate / Consensus** (multi-agent payload-free
136/// blocks). Each ships wire shape + public helper that enterprise
137/// integrations (Postgres / Redis / MQ / typed-channel runtime)
138/// override.
139pub mod wire_integrations;
140
141/// §Fase 33.y.i — PIX variants (paper §6 hidden-state primitives).
142/// 3 variants: **Hibernate** (CPS-style event-await with timeout
143/// — Fase 11.e + Fase 16 supervisor); **Drill** (PIX subtree
144/// navigation); **Trail** (breadcrumb walk over a prior
145/// navigation). OSS reference impl uses `__pix_*` /
146/// `__hibernating_*` namespaced let_bindings keys; enterprise R&D
147/// (axon_enterprise.cognitive_states + .supervisor) wires real
148/// continuation-passing semantics + PIX state machines.
149pub mod pix;
150
151/// §Fase 33.y.j — Lambda + UseTool (the final 2 variants).
152/// **LambdaDataApply** — Fase 15 ΛD apply (the sync runner walks
153/// a CPS dispatcher mapping lambda data structures to expressions;
154/// 33.y.j ships the OSS wire shape + helper). **UseTool** —
155/// mid-step tool invocation (Fase 22 backend tools; the
156/// `ChatRequest.tools` cross-cutting plumb-through lands in
157/// 33.y.k D8). Completes the 45-variant total coverage.
158pub mod lambda_tools;
159
160/// §Fase 34.g — Unified stream handler (4-disjunction convergence).
161/// Pre-34.g the four streaming-effect disjunctions (LLM-side
162/// `output: Stream<T>`, `apply: <stream-tool>`, `use_tool` syntax,
163/// `perform Stream.Yield`) had divergent drain paths — disjunct (a)
164/// enforced `BackpressurePolicy` at chunk granularity while (b)/(d)
165/// only captured the policy slug in audit without enforcement. This
166/// module ships [`unified_stream::unified_stream_handler`] — the
167/// single drain loop that ALL `Stream<ToolChunk>`-producing
168/// disjunctions route through; the handler integrates a
169/// [`crate::stream_runtime::Stream<ToolChunk>`] policy primitive +
170/// returns a [`unified_stream::ToolStreamSummary`] with real
171/// `chunks_dropped`/`chunks_degraded` counters. Also ships the
172/// [`unified_stream::chat_chunk_to_tool_chunk`] type-bridge for
173/// disjunct (a) symmetry tests + [`unified_stream::unified_stream_from_chunks`]
174/// adapter for disjunct (d)'s static-scan output.
175pub mod unified_stream;
176
177// ────────────────────────────────────────────────────────────────────
178//  DispatchCtx — shared per-flow async surface
179// ────────────────────────────────────────────────────────────────────
180
181/// Per-flow dispatcher context. Carries the producer-side wire
182/// surface (`tx` for FlowExecutionEvent), cancel-in-body propagation
183/// (`cancel`), the audit/enforcement/warning side-channels (read by
184/// the SSE handler at `axon.complete` time), and the orchestration
185/// `branch_path` for D6 per-step replay binding.
186///
187/// # `branch_path` semantics
188///
189/// Empty at flow root. Parent handlers push a segment when descending
190/// into a child:
191/// - `par[0]`, `par[1]`, `par[2]` for the n-th branch of a Par block.
192/// - `for_in[0]`, `for_in[1]` for the n-th iteration of a ForIn loop.
193/// - `conditional.then`, `conditional.else` for the chosen branch
194///   of an `if`.
195/// - Children inside a branch concat: `par[0].step[0]` for the first
196///   child step of the first Par branch.
197///
198/// The path is observable in `StepAuditRecord.branch_path` (extended
199/// in 33.y.f when the audit row writer gains the field) so regulators
200/// replaying a flow on appeal see the full execution tree, not just a
201/// flattened step sequence.
202///
203/// # `step_counter`
204///
205/// Monotonic per-flow counter. Each Step (or pure-shape variant
206/// promoted to Step) increments. Surface fed into `step_audit` so
207/// the row index is correct under nested orchestration.
208#[derive(Clone)]
209pub struct DispatchCtx {
210    pub flow_name: String,
211    pub backend_name: String,
212    pub system_prompt: String,
213    pub cancel: CancellationFlag,
214    pub tx: mpsc::UnboundedSender<FlowExecutionEvent>,
215    pub enforcement_summaries: Arc<
216        Mutex<HashMap<String, crate::axon_server::EnforcementSummaryWire>>,
217    >,
218    pub step_audit_records: Arc<
219        Mutex<Vec<crate::axonendpoint_replay::StepAuditRecord>>,
220    >,
221    pub runtime_warnings: Arc<
222        Mutex<Vec<crate::runtime_warnings::RuntimeWarning>>,
223    >,
224    pub branch_path: Vec<String>,
225    pub step_counter: usize,
226    /// §Fase 33.y.f — Optional PEM async surface for cognitive
227    /// primitives (Remember / Recall etc.). When `Some(backend)`,
228    /// `run_remember` write-through persists to PEM and `run_recall`
229    /// restores from PEM as a write-back cache layered over
230    /// `let_bindings`. When `None`, both handlers degrade to
231    /// `let_bindings`-only (in-memory) — the canonical adopter
232    /// path for tests + adopters that don't opt into persistent
233    /// cognitive state. Arc-cloned per branch for concurrent
234    /// dispatch (Fase 33.y.e parity).
235    pub pem_backend: Option<std::sync::Arc<dyn crate::pem::PersistenceBackend>>,
236    /// §Fase 33.y.f — Session anchor for PEM persistence. Defaults
237    /// to `flow_name` in [`DispatchCtx::new`]; adopters override
238    /// for multi-session flows.
239    pub session_id: String,
240    /// §Fase 33.y.f — Tenant routing tag for PEM persistence.
241    /// Defaults to empty in [`DispatchCtx::new`]; multi-tenant
242    /// adopters set this before dispatch.
243    pub tenant_id: String,
244    /// §Fase 33.y.d — Let-binding scope. Map from binding name to its
245    /// resolved value. `run_let` inserts; `run_conditional` reads to
246    /// evaluate the condition; `run_for_in` inserts the iteration
247    /// variable per iter. Bindings persist through the flow's
248    /// lifetime — sub-scoping is NOT introduced in 33.y.d (the
249    /// sync runner's let semantics are flow-scoped + monotonic,
250    /// matching this discipline for D10 parity). The `HashMap` is
251    /// cheap to clone for branch isolation when sub-fases 33.y.e
252    /// introduce parallel branches with private scopes (Par block).
253    pub let_bindings: std::collections::HashMap<String, String>,
254    /// §Fase 33.y.c — Per-node declared `<stream:<policy>>` resolved
255    /// by the caller BEFORE invoking `dispatch_node`. The pure-shape
256    /// handlers read + consume this field (set back to `None` on
257    /// entry) so each handler observes the policy intended for ITS
258    /// node, never the previous node's residue. When `None`, the
259    /// handler skips `StreamPolicyEnforcer` wrapping + emits chunks
260    /// directly to the wire.
261    ///
262    /// Subsequent sub-fases 33.y.d-l adopt the same pattern for
263    /// orchestration handlers (`Par` / `ForIn`) when child nodes
264    /// declare effects.
265    pub pending_effect_policy: Option<BackpressurePolicy>,
266    /// §Fase 34.d (v1.29.0) — Tool registry surface for the
267    /// streaming-tool dispatcher branch. When `Some(registry)`,
268    /// `pure_shape::run_step` resolves `step.apply_ref` against
269    /// the registry; if the entry's `is_streaming` flag is true,
270    /// the step bypasses `Backend::stream()` entirely + invokes
271    /// `tool.stream(args, ctx)` via the
272    /// [`crate::tool_dispatch_bridge::resolve_streaming_tool`]
273    /// factory. When `None` (D9 backwards-compat), the legacy
274    /// LLM-side path is taken regardless of source-declared
275    /// `effects: <stream:<policy>>` — adopters who haven't wired
276    /// the registry yet see no behavior change. Arc-shared for
277    /// concurrent dispatch (Fase 33.y.e parity).
278    pub tool_registry: Option<std::sync::Arc<crate::tool_registry::ToolRegistry>>,
279    /// §Fase 35.f (v1.30.0) — axonstore registry for SQL-vs-KV
280    /// dispatch. When `Some(registry)`, `run_persist` / `run_retrieve`
281    /// / `run_mutate` / `run_purge` resolve `store_name` against it: a
282    /// `postgresql`-backed store routes through `PostgresStoreBackend`,
283    /// every other (and every undeclared) store takes the byte-
284    /// identical key-value path. When `None` (the `DispatchCtx::new`
285    /// default), every store op is key-value — the pre-35 behavior,
286    /// unchanged (D3). Arc-shared so concurrent branches share one
287    /// per-DSN pool cache.
288    pub store_registry: Option<std::sync::Arc<crate::store::registry::StoreRegistry>>,
289    /// §Fase 35.j (v1.30.0) — Pillar IV: the capability slugs the
290    /// current request carries (the JWT bearer's `capabilities`
291    /// claim). When `Some`, the store handlers re-check a
292    /// capability-gated store against this set before any access —
293    /// defense-in-depth behind the type-checker's compile-time
294    /// guarantee. When `None` (the `DispatchCtx::new` default), there
295    /// is no capability context at this layer and the runtime
296    /// re-check is a no-op: the compile-time check + the endpoint's
297    /// Fase 32.g `requires:` gate stand.
298    pub held_capabilities: Option<Vec<String>>,
299    /// §Fase 35.h (v1.30.0) — Pillar II: the flow's tamper-evident
300    /// HMAC-Merkle mutation chain. Every `persist`/`mutate`/`purge`
301    /// appends a delta. Shared (`Arc`) across concurrent branches so a
302    /// `Par` block's mutations land in one chain; the Merkle head is a
303    /// verifiable fingerprint of the flow's complete mutation history.
304    pub audit_chain:
305        std::sync::Arc<std::sync::Mutex<crate::store::audit_chain::StoreAuditChain>>,
306    /// §Fase 37.x.j (D2) — Per-flow pinned Postgres connections.
307    /// Populated at stream start by `run_streaming_via_dispatcher`:
308    /// the IR is walked, every postgresql-backed `axonstore` referenced
309    /// by the flow body has ONE `PoolConnection<Postgres>` acquired,
310    /// and the map holds them by axonstore name for the flow's
311    /// lifetime. The map drops at the end of the streaming task,
312    /// returning every conn to the pool via the `after_release
313    /// DEALLOCATE ALL` hook (Fase 38.x.a D2 composing with 37.x.j D1).
314    ///
315    /// Wire-integration store handlers consult this map per op:
316    /// `take` the pin out → run the SQL via `StoreConn::Pinned(&mut pin)`
317    /// → `insert` the pin back. The take/return discipline preserves
318    /// the Arc<Mutex<>> sharing pattern across cloned (par-branched)
319    /// contexts while keeping individual ops borrow-checker friendly.
320    ///
321    /// Empty map ≡ no pinning held (legacy path) → handlers fall back
322    /// to `StoreConn::Pool(backend.pool())`. This is the case for
323    /// callers that haven't eager-acquired (non-streaming RPC paths,
324    /// CLI tests, etc.) — D5 byte-identical backwards-compat.
325    ///
326    /// Per D6.b (sub-fase 37.x.j.6): `par {}` branches that share this
327    /// Arc serialize on its mutex. The D6.a default (per-branch
328    /// sub-pin) replaces this Arc with a fresh empty map at par-branch
329    /// clone time so branches do NOT serialize on the parent's pins.
330    pub pinned_conns: std::sync::Arc<
331        std::sync::Mutex<
332            std::collections::HashMap<
333                String,
334                sqlx::pool::PoolConnection<sqlx::Postgres>,
335            >,
336        >,
337    >,
338}
339
340impl DispatchCtx {
341    /// Construct a fresh context for a new flow. Subsequent sub-fases
342    /// extend this with builder methods as the surface grows (PEM /
343    /// ReplayToken / CognitiveState plumbing in 33.y.f, tool registry
344    /// in 33.y.k, etc.).
345    pub fn new(
346        flow_name: impl Into<String>,
347        backend_name: impl Into<String>,
348        system_prompt: impl Into<String>,
349        cancel: CancellationFlag,
350        tx: mpsc::UnboundedSender<FlowExecutionEvent>,
351    ) -> Self {
352        let flow_name = flow_name.into();
353        let session_id = flow_name.clone();
354        Self {
355            flow_name,
356            backend_name: backend_name.into(),
357            system_prompt: system_prompt.into(),
358            cancel,
359            tx,
360            enforcement_summaries: Arc::new(Mutex::new(HashMap::new())),
361            step_audit_records: Arc::new(Mutex::new(Vec::new())),
362            runtime_warnings: Arc::new(Mutex::new(Vec::new())),
363            branch_path: Vec::new(),
364            step_counter: 0,
365            pem_backend: None,
366            session_id,
367            tenant_id: String::new(),
368            let_bindings: std::collections::HashMap::new(),
369            pending_effect_policy: None,
370            tool_registry: None,
371            store_registry: None,
372            held_capabilities: None,
373            audit_chain: std::sync::Arc::new(std::sync::Mutex::new(
374                crate::store::audit_chain::StoreAuditChain::new(),
375            )),
376            // §Fase 37.x.j (D2) — empty pin map by default; populated
377            // by `run_streaming_via_dispatcher` via `with_pinned_conns`.
378            pinned_conns: std::sync::Arc::new(std::sync::Mutex::new(
379                std::collections::HashMap::new(),
380            )),
381        }
382    }
383
384    /// §Fase 37.x.j (D2) — Builder: attach an Arc-shared pinned
385    /// connection map populated by the caller. `run_streaming_via_dispatcher`
386    /// uses this to install the eagerly-acquired flow-scoped pins
387    /// BEFORE the dispatcher walks any node. Returns `self` so the
388    /// builder pattern chains with `with_store_registry`, `with_pem`,
389    /// `with_tool_registry`, `with_held_capabilities`.
390    pub fn with_pinned_conns(
391        mut self,
392        conns: std::sync::Arc<
393            std::sync::Mutex<
394                std::collections::HashMap<
395                    String,
396                    sqlx::pool::PoolConnection<sqlx::Postgres>,
397                >,
398            >,
399        >,
400    ) -> Self {
401        self.pinned_conns = conns;
402        self
403    }
404
405    /// §Fase 35.f — Builder: attach the `axonstore` registry so the
406    /// wire-integration store handlers route postgresql-backed stores
407    /// to SQL. Without it, every store op stays key-value (D3).
408    /// Returns `self` so builders chain.
409    pub fn with_store_registry(
410        mut self,
411        registry: std::sync::Arc<crate::store::registry::StoreRegistry>,
412    ) -> Self {
413        self.store_registry = Some(registry);
414        self
415    }
416
417    /// §Fase 35.j — Builder: attach the request's held capability
418    /// slugs so the store handlers re-check capability-gated stores
419    /// (Pillar IV). Returns `self` so builders chain.
420    pub fn with_held_capabilities(mut self, capabilities: Vec<String>) -> Self {
421        self.held_capabilities = Some(capabilities);
422        self
423    }
424
425    /// §Fase 34.d — Builder: attach a tool registry so the
426    /// dispatcher's streaming-tool branch can resolve `apply_ref`
427    /// against it. Returns `self` so builders chain.
428    pub fn with_tool_registry(
429        mut self,
430        registry: std::sync::Arc<crate::tool_registry::ToolRegistry>,
431    ) -> Self {
432        self.tool_registry = Some(registry);
433        self
434    }
435
436    /// Builder: attach a PEM persistence backend. Returns `self` so
437    /// callers can chain `DispatchCtx::new(...).with_pem(backend)`.
438    pub fn with_pem(
439        mut self,
440        backend: std::sync::Arc<dyn crate::pem::PersistenceBackend>,
441    ) -> Self {
442        self.pem_backend = Some(backend);
443        self
444    }
445
446    /// Builder: set the session id (defaults to flow_name).
447    pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
448        self.session_id = session_id.into();
449        self
450    }
451
452    /// Builder: set the tenant id (defaults to empty).
453    pub fn with_tenant_id(mut self, tenant_id: impl Into<String>) -> Self {
454        self.tenant_id = tenant_id.into();
455        self
456    }
457
458    /// Builder-style setter for the pending effect policy. Returns
459    /// `self` so callers can chain `ctx.with_effect_policy(policy)`
460    /// before invoking `dispatch_node`. Handlers read + clear the
461    /// field via [`Self::take_pending_effect_policy`].
462    pub fn with_effect_policy(mut self, policy: BackpressurePolicy) -> Self {
463        self.pending_effect_policy = Some(policy);
464        self
465    }
466
467    /// §Fase 33.z.c — Builder: inject external Arc-backed side-channels
468    /// so the dispatcher's per-variant handlers populate the SAME
469    /// Mutexes that `server_execute_streaming` reads from for the SSE
470    /// wire's `enforcement_summary`, `step_audit`, and `runtime_warnings`
471    /// fields.
472    ///
473    /// Without this builder, `DispatchCtx::new` creates FRESH Arcs that
474    /// the dispatcher populates but the production hot path can't read.
475    /// That gap broke `axon.complete.enforcement_summary` wire emission
476    /// on the canonical Step shape when the dispatcher graft (33.z.b)
477    /// activated — the 33.x.d production-path tests detected the
478    /// regression at the `assert_eq!(generate_summary["chunks_pushed"], 1)`
479    /// line because the side-channel the wire reads from stayed empty
480    /// while the dispatcher's fresh Arc carried the counters.
481    ///
482    /// Used exclusively by `streaming_via_dispatcher::run_streaming_via_dispatcher`
483    /// to thread the side-channels the SSE handler constructs into the
484    /// dispatcher. Downstream-crate consumers driving `dispatch_node`
485    /// directly continue to use `DispatchCtx::new` + the fresh internal
486    /// Arcs.
487    pub fn with_external_side_channels(
488        mut self,
489        enforcement_summaries: std::sync::Arc<
490            tokio::sync::Mutex<
491                std::collections::HashMap<String, crate::axon_server::EnforcementSummaryWire>,
492            >,
493        >,
494        step_audit_records: std::sync::Arc<
495            tokio::sync::Mutex<Vec<crate::axonendpoint_replay::StepAuditRecord>>,
496        >,
497        runtime_warnings: std::sync::Arc<
498            tokio::sync::Mutex<Vec<crate::runtime_warnings::RuntimeWarning>>,
499        >,
500    ) -> Self {
501        self.enforcement_summaries = enforcement_summaries;
502        self.step_audit_records = step_audit_records;
503        self.runtime_warnings = runtime_warnings;
504        self
505    }
506
507    /// Read + clear the pending effect policy. Returns `None` when no
508    /// policy was set by the caller. The take-semantics (vs. peek)
509    /// prevents a stale policy from a previous node leaking into the
510    /// next handler's invocation if the caller forgets to clear.
511    pub fn take_pending_effect_policy(&mut self) -> Option<BackpressurePolicy> {
512        self.pending_effect_policy.take()
513    }
514
515    /// Render the current `branch_path` as a wire-stable string. Empty
516    /// path returns `""` (flow root); single segment `"par[0]"`; multi
517    /// `"par[0].step[1]"`. The format is byte-stable across calls.
518    pub fn branch_path_string(&self) -> String {
519        self.branch_path.join(".")
520    }
521}
522
523// ────────────────────────────────────────────────────────────────────
524//  NodeOutcome — closed catalog of dispatcher outcomes
525// ────────────────────────────────────────────────────────────────────
526
/// Closed catalog of dispatcher outcomes.
///
/// Four variants exist today:
///
/// - [`NodeOutcome::Completed`] — handler ran to completion; output
///   captured + tokens forwarded on the wire.
/// - [`NodeOutcome::Break`] — sentinel from an in-loop `break`. The
///   For-In handler short-circuits remaining iterations.
/// - [`NodeOutcome::LoopContinue`] — sentinel from `continue`. Skips
///   to the next iteration.
/// - [`NodeOutcome::Return`] — sentinel from `return`. Flow loop
///   terminates.
///
/// The transitional `LegacyShimHandled` variant that 33.y.b shipped
/// was removed in 33.y.l once every IR variant had its real handler,
/// so it is no longer part of this enum.
///
/// # Why a closed catalog vs `Result<String, _>`
///
/// Sentinel values (Break / LoopContinue / Return) need to flow up
/// through nested handler stacks WITHOUT being mistaken for content.
/// A `Result<String, _>` would force the caller to encode sentinels
/// in-band, which is unsound under serde + adopter-observable output.
/// The closed enum is the only sound algebraic representation.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum NodeOutcome {
    /// §Fase 33.y.c+ — Handler ran to completion. `output` is the
    /// concatenated chunk content captured during streaming;
    /// `tokens_emitted` is the count of non-empty `StepToken` events
    /// fanned to the wire (post-policy enforcement for steps with a
    /// declared `<stream:<policy>>`). The `step_index` is the value
    /// of `ctx.step_counter` at the moment the handler started (i.e.
    /// the index the handler reserved for itself before
    /// incrementing); orchestration handlers in 33.y.d–e use this
    /// to surface per-branch index trails on `branch_path`.
    Completed {
        output: String,
        tokens_emitted: u64,
        step_index: usize,
    },
    /// §Fase 33.y.d sentinel — emitted by the `Break` handler. The
    /// enclosing `ForIn` handler observes this outcome from its
    /// child dispatch + terminates the loop (skips remaining
    /// iterations). Parser scope check guarantees `Break` only
    /// appears inside a `ForIn` body, so non-loop ancestors that
    /// observe this outcome MUST propagate it upward unchanged.
    Break,
    /// §Fase 33.y.d sentinel — emitted by the `Continue` handler.
    /// The enclosing `ForIn` handler observes this + skips to the
    /// next iteration. Same propagation discipline as
    /// [`NodeOutcome::Break`].
    LoopContinue,
    /// §Fase 33.y.d sentinel — emitted by the `Return` handler.
    /// Terminates the flow loop with the carried `value` as the
    /// final flow output. Parents propagate unchanged until the
    /// flow-loop level observes it.
    Return { value: String },
}
585
586// ────────────────────────────────────────────────────────────────────
587//  DispatchError — closed catalog of dispatcher errors
588// ────────────────────────────────────────────────────────────────────
589
/// Closed catalog of dispatcher errors. Adopter-reachable error
/// surfaces are NAMED (D7 mandate: zero `unwrap()` / zero
/// `unimplemented!()` / zero `_ =>` catch-all). Each variant carries
/// adopter-actionable structured data.
///
/// The type implements [`std::fmt::Display`] with one fixed message
/// shape per variant and [`std::error::Error`] with no source, so it
/// can be boxed as `Box<dyn std::error::Error>`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum DispatchError {
    /// `Backend::stream()` failed on a per-variant handler that
    /// invoked it. Carries the backend name + the upstream error
    /// message so the SSE handler can surface a structured
    /// `axon.error` event.
    BackendError { name: String, message: String },

    /// Cancellation flag fired mid-dispatch (client disconnected or
    /// upstream `tokio::select!` raced the cancel branch). Caller
    /// MUST treat this as a clean exit (no `axon.error` event
    /// surfaced — the consumer is already gone).
    UpstreamCancelled,

    /// A per-variant handler needed a dependency that wasn't
    /// available on the DispatchCtx (e.g., PEM async surface for a
    /// `Remember`/`Recall` handler before 33.y.f wires it in). The
    /// `name` field tags which dependency.
    MissingDependency { name: &'static str },

    /// The mpsc sender returned `Err(_)` — consumer dropped. Caller
    /// MUST treat this as a clean exit (same posture as
    /// `UpstreamCancelled`).
    ChannelClosed,
}

impl std::fmt::Display for DispatchError {
    /// Render the human-readable message for this error. The match is
    /// exhaustive on purpose (no `_ =>` arm) so a new variant fails
    /// the build here until it is given a message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BackendError { name, message } => {
                write!(f, "backend '{name}' stream() failed: {message}")
            }
            // Constant messages need no formatting machinery.
            Self::UpstreamCancelled => f.write_str("upstream cancelled mid-dispatch"),
            Self::MissingDependency { name } => {
                write!(f, "dispatcher missing dependency: {name}")
            }
            Self::ChannelClosed => f.write_str("channel closed (consumer dropped)"),
        }
    }
}

// No `source()` override: every variant carries its upstream detail
// as plain data rather than as a wrapped error.
impl std::error::Error for DispatchError {}
637
638// ────────────────────────────────────────────────────────────────────
639//  §Fase 33.y.l — ShimReason enum + legacy_shim function retired
640// ────────────────────────────────────────────────────────────────────
641//
642// After 33.y.j reached 45/45 IRFlowNode graduation, the
643// transitional `ShimReason` enum + `legacy_shim` function + the
644// `NodeOutcome::LegacyShimHandled` variant became structurally
645// unreachable from `dispatch_node`'s exhaustive match. 33.y.l
646// retires the entire shim infrastructure in this lockstep cleanup:
647//
648//   - ShimReason enum                   — DELETED
649//   - ShimReason::ALL constant          — DELETED
650//   - ShimReason::slug() method         — DELETED
651//   - legacy_shim() async helper        — DELETED
652//   - NodeOutcome::LegacyShimHandled    — DELETED variant
653//
654// Drift-gate slug catalog now uses `flow_plan::ir_flow_node_kind`
655// directly (the same byte-stable surface that was duplicated in
656// ShimReason::slug — single source of truth).
657//
658// The dispatcher's 45-arm exhaustive match is unchanged: every IR
659// variant routes to its real async handler module (pure_shape /
660// orchestration / parallel / effects_bridge / cognitive /
661// algebraic_handlers / wire_integrations / pix / lambda_tools).
662//
// Search the codebase: `grep -E "unimplemented|todo!|legacy_shim"
// axon-rs/src/flow_dispatcher/*.rs` is expected to return ZERO
// matches in executable code post-33.y.l — comments like this one
// still name the retired symbols — as verified by the
// `fase33y_l_parity_gate.rs::d7_no_legacy_markers` drift-gate test.
667//
668// Build-time guarantee: `legacy_shim` is gone → compiler enforces
669// that NO future arm in dispatch_node can fall back to a stub. The
670// catalog totality contract D1 is sealed.
671// ────────────────────────────────────────────────────────────────────
672//  dispatch_node — the exhaustive entry point
673// ────────────────────────────────────────────────────────────────────
674
675/// Dispatch a single IRFlowNode through the per-variant async
676/// handler stack. Total over the 45-variant closed catalog
677/// (compiler-enforced exhaustive match).
678///
679/// # 45/45 graduation FINAL (33.y.j)
680///
681/// As of Fase 33.y.j, every IRFlowNode variant has a NAMED async
682/// handler. There are NO `_ =>` catch-all arms, NO `legacy_shim`
683/// calls, NO `unimplemented!()` markers. Adding a 46th IRFlowNode
684/// variant fails the Rust build here until a real per-variant
685/// async handler is wired in.
686///
687/// Each arm's handler module:
688///
689/// - **pure_shape** (33.y.c) — Step / Probe / Reason / Validate /
690///   Refine / Weave
691/// - **orchestration** (33.y.d) — Let / Conditional / ForIn /
692///   Break / Continue / Return
693/// - **parallel** (33.y.e) — Par
694/// - **effects_bridge** (33.y.e + D9) — Stream
695/// - **cognitive** (33.y.f) — Remember / Recall / Forge / Focus /
696///   Associate / Aggregate / Explore / Ingest / Navigate /
697///   Corroborate
698/// - **algebraic_handlers** (33.y.g) — ShieldApply / OtsApply /
699///   MandateApply / ComputeApply / Listen / DaemonStep
700/// - **wire_integrations** (33.y.h) — Emit / Publish / Discover /
701///   Persist / Retrieve / Mutate / Purge / Transact / Deliberate /
702///   Consensus
703/// - **pix** (33.y.i) — Hibernate / Drill / Trail
704/// - **lambda_tools** (33.y.j) — LambdaDataApply / UseTool
705///
706/// # Cancellation
707///
708/// Every per-variant handler checks `ctx.cancel.is_cancelled()`
709/// at entry and at every `.await` boundary per the Fase 33.x.e
710/// `cancel_aware` discipline. Cancel propagation is uniform
711/// across the entire 45-variant catalog.
712pub async fn dispatch_node(
713    node: &IRFlowNode,
714    ctx: &mut DispatchCtx,
715) -> Result<NodeOutcome, DispatchError> {
716    // Exhaustive match — compiler enforces every variant has a
717    // named arm. Adding a 46th IRFlowNode variant fails the build
718    // here until the new arm is added. ZERO `_ =>` catch-all.
719    match node {
720        // §Fase 33.y.c — pure-shape variants graduated to real
721        // async handlers. Each delegates to its labeled
722        // `pure_shape::run_*` entry which wraps the shared
723        // `pure_shape::run_pure_shape` async core. The shim is
724        // retired for these 6 variants; subsequent sub-fases retire
725        // it for the remaining 39 variants per the topological
726        // schedule in `docs/fase/fase_33y_algebraic_streaming_dispatcher.md`.
727        IRFlowNode::Step(step) => pure_shape::run_step(step, ctx).await,
728        IRFlowNode::Probe(probe) => pure_shape::run_probe(probe, ctx).await,
729        IRFlowNode::Reason(reason) => pure_shape::run_reason(reason, ctx).await,
730        IRFlowNode::Validate(validate) => pure_shape::run_validate(validate, ctx).await,
731        IRFlowNode::Refine(refine) => pure_shape::run_refine(refine, ctx).await,
732        IRFlowNode::Weave(weave) => pure_shape::run_weave(weave, ctx).await,
733        // §Fase 33.y.j — UseTool graduated.
734        IRFlowNode::UseTool(node) => lambda_tools::run_use_tool(node, ctx).await,
735        // §Fase 33.y.f — cognitive primitives PEM-bound.
736        IRFlowNode::Remember(node) => cognitive::run_remember(node, ctx).await,
737        IRFlowNode::Recall(node) => cognitive::run_recall(node, ctx).await,
738        // §Fase 33.y.d — orchestration variants graduated to real
739        // async handlers. Each composes child handlers via recursive
740        // `dispatch_node` calls + threads sentinel outcomes (Break /
741        // LoopContinue / Return) up through orchestration parents.
742        IRFlowNode::Conditional(cond) => orchestration::run_conditional(cond, ctx).await,
743        IRFlowNode::ForIn(for_in) => orchestration::run_for_in(for_in, ctx).await,
744        IRFlowNode::Let(let_bind) => orchestration::run_let(let_bind, ctx).await,
745        IRFlowNode::Return(ret) => orchestration::run_return(ret, ctx).await,
746        IRFlowNode::Break(brk) => orchestration::run_break(brk, ctx).await,
747        IRFlowNode::Continue(cont) => orchestration::run_continue(cont, ctx).await,
748        // §Fase 33.y.j — LambdaDataApply graduated.
749        IRFlowNode::LambdaDataApply(node) => lambda_tools::run_lambda_data_apply(node, ctx).await,
750        // §Fase 33.y.e — Par graduated to real async handler. The
751        // payload-free `IRParallelBlock` emits the canonical
752        // `step_type: "par"` wire shape; future IR extensions
753        // delegate to `parallel::run_branches_concurrently`.
754        IRFlowNode::Par(par) => parallel::run_par(par, ctx).await,
755        // §Fase 33.y.i — PIX variants graduated.
756        IRFlowNode::Hibernate(node) => pix::run_hibernate(node, ctx).await,
757        // §Fase 33.y.h — multi-agent deliberation blocks.
758        IRFlowNode::Deliberate(node) => wire_integrations::run_deliberate(node, ctx).await,
759        IRFlowNode::Consensus(node) => wire_integrations::run_consensus(node, ctx).await,
760        // §Fase 33.y.f — Forge payload-free wire shape.
761        IRFlowNode::Forge(node) => cognitive::run_forge(node, ctx).await,
762        // §Fase 33.y.f — cognitive framing handlers reuse pure_shape.
763        IRFlowNode::Focus(node) => cognitive::run_focus(node, ctx).await,
764        IRFlowNode::Associate(node) => cognitive::run_associate(node, ctx).await,
765        IRFlowNode::Aggregate(node) => cognitive::run_aggregate(node, ctx).await,
766        IRFlowNode::Explore(node) => cognitive::run_explore(node, ctx).await,
767        IRFlowNode::Ingest(node) => cognitive::run_ingest(node, ctx).await,
768        // §Fase 33.y.g — algebraic-effect handler nodes graduated.
769        IRFlowNode::ShieldApply(node) => algebraic_handlers::run_shield_apply(node, ctx).await,
770        // §Fase 33.y.e — Stream graduated to real async handler.
771        // The payload-free `IRStreamBlock` emits the canonical
772        // `step_type: "stream"` wire shape; future IR extensions
773        // delegate to `effects_bridge::bridge_effect_stream_yield`.
774        IRFlowNode::Stream(stream) => effects_bridge::run_stream(stream, ctx).await,
775        IRFlowNode::Navigate(node) => cognitive::run_navigate(node, ctx).await,
776        IRFlowNode::Drill(node) => pix::run_drill(node, ctx).await,
777        IRFlowNode::Trail(node) => pix::run_trail(node, ctx).await,
778        IRFlowNode::Corroborate(node) => cognitive::run_corroborate(node, ctx).await,
779        IRFlowNode::OtsApply(node) => algebraic_handlers::run_ots_apply(node, ctx).await,
780        IRFlowNode::MandateApply(node) => algebraic_handlers::run_mandate_apply(node, ctx).await,
781        IRFlowNode::ComputeApply(node) => algebraic_handlers::run_compute_apply(node, ctx).await,
782        IRFlowNode::Listen(node) => algebraic_handlers::run_listen(node, ctx).await,
783        IRFlowNode::DaemonStep(node) => algebraic_handlers::run_daemon_step(node, ctx).await,
784        // §Fase 33.y.h — π-calc typed channels (Fase 13).
785        IRFlowNode::Emit(node) => wire_integrations::run_emit(node, ctx).await,
786        IRFlowNode::Publish(node) => wire_integrations::run_publish(node, ctx).await,
787        IRFlowNode::Discover(node) => wire_integrations::run_discover(node, ctx).await,
788        // §Fase 33.y.h — persistence primitives.
789        IRFlowNode::Persist(node) => wire_integrations::run_persist(node, ctx).await,
790        IRFlowNode::Retrieve(node) => wire_integrations::run_retrieve(node, ctx).await,
791        IRFlowNode::Mutate(node) => wire_integrations::run_mutate(node, ctx).await,
792        IRFlowNode::Purge(node) => wire_integrations::run_purge(node, ctx).await,
793        IRFlowNode::Transact(node) => wire_integrations::run_transact(node, ctx).await,
794    }
795}
796
797// ────────────────────────────────────────────────────────────────────
798//  Unit tests — drift gate + smoke
799// ────────────────────────────────────────────────────────────────────
800
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cancel_token::CancellationFlag;

    // §Fase 33.y.l drift-gate update — the historical
    // `shim_reason_cardinality_45_variants` /
    // `shim_reason_slugs_are_unique` /
    // `shim_reason_slugs_are_well_formed` /
    // `legacy_shim_returns_handled_on_happy_path` /
    // `legacy_shim_returns_cancel_when_flag_set` /
    // `shim_reason_slug_matches_ir_flow_node_kind` tests are
    // RETIRED here. The replacement coverage lives in:
    //
    //   - `tests/fase33y_b_dispatcher_skeleton.rs` — IR-variant
    //     catalog cardinality + slug uniqueness via
    //     `flow_plan::ir_flow_node_kind` directly (single source
    //     of truth, no more `ShimReason::slug` duplication).
    //   - `tests/fase33y_l_parity_gate.rs` — D7 build-time grep
    //     invariant: zero `unimplemented!` / `todo!` / `legacy_shim`
    //     symbols in `flow_dispatcher/*.rs`.
    //
    // NOTE: this is deliberately a plain `//` comment. As a `///`
    // block followed by a blank line it would attach to the first
    // test below as rustdoc instead of documenting the module.

    /// 33.y.b branch_path: a freshly built context has an empty
    /// branch path, renders it as the empty string, and starts its
    /// step counter at zero.
    #[test]
    fn dispatch_ctx_branch_path_empty_at_root() {
        // `_rx` is kept bound so the channel stays open for the
        // lifetime of the test.
        let (tx, _rx) = mpsc::unbounded_channel();
        let ctx = DispatchCtx::new("F", "stub", "", CancellationFlag::new(), tx);

        assert!(ctx.branch_path.is_empty());
        assert_eq!(ctx.branch_path_string(), "");
        assert_eq!(ctx.step_counter, 0);
    }

    /// 33.y.b branch_path: multiple segments are joined with `.`;
    /// the exact rendering is pinned because it is wire-stable.
    #[test]
    fn dispatch_ctx_branch_path_joins_segments() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let mut ctx = DispatchCtx::new("F", "stub", "", CancellationFlag::new(), tx);

        ctx.branch_path.push("par[0]".to_string());
        ctx.branch_path.push("step[1]".to_string());

        assert_eq!(ctx.branch_path_string(), "par[0].step[1]");
    }

    /// 33.y.b DispatchError Display surface produces actionable
    /// messages for every variant; the exact text is pinned.
    #[test]
    fn dispatch_error_display_surface() {
        let cases: Vec<(DispatchError, &str)> = vec![
            (
                DispatchError::BackendError {
                    name: "anthropic".to_string(),
                    message: "rate limited".to_string(),
                },
                "backend 'anthropic' stream() failed: rate limited",
            ),
            (DispatchError::UpstreamCancelled, "upstream cancelled mid-dispatch"),
            (
                DispatchError::MissingDependency { name: "pem_async" },
                "dispatcher missing dependency: pem_async",
            ),
            (DispatchError::ChannelClosed, "channel closed (consumer dropped)"),
        ];
        for (err, expected) in cases {
            assert_eq!(err.to_string(), expected);
        }
    }

    /// 33.y.c smoke: dispatch_node dispatches Step through the
    /// graduated pure-shape handler and returns
    /// `NodeOutcome::Completed` with the stub backend's canonical
    /// "(stub)" 1-token output at step index 0.
    #[tokio::test]
    async fn dispatch_node_step_routes_to_pure_shape_handler() {
        use crate::ir_nodes::*;

        // Minimal Step: only `name` and `ask` carry content, every
        // other field is left empty / `None`.
        let step = IRStep {
            node_type: "step",
            source_line: 0,
            source_column: 0,
            name: "Generate".to_string(),
            persona_ref: String::new(),
            given: String::new(),
            ask: "hi".to_string(),
            use_tool: None,
            probe: None,
            reason: None,
            weave: None,
            output_type: String::new(),
            confidence_floor: None,
            navigate_ref: String::new(),
            apply_ref: String::new(),
            body: Vec::new(),
        };
        let node = IRFlowNode::Step(step);

        // The receiver must outlive the dispatch call: dropping it
        // would close the channel the context's sender writes to.
        let (tx, _rx) = mpsc::unbounded_channel();
        let mut ctx = DispatchCtx::new("F", "stub", "", CancellationFlag::new(), tx);

        let outcome = dispatch_node(&node, &mut ctx)
            .await
            .expect("dispatching a Step against the stub backend must succeed");
        match outcome {
            NodeOutcome::Completed {
                output,
                tokens_emitted,
                step_index,
            } => {
                assert_eq!(output, "(stub)");
                assert_eq!(tokens_emitted, 1);
                assert_eq!(step_index, 0);
            }
            other => panic!(
                "post-33.y.c: Step routes to pure_shape handler returning Completed; got {other:?}"
            ),
        }
    }
}