pub struct DispatchCtx {
pub flow_name: String,
pub backend_name: String,
pub system_prompt: String,
pub cancel: CancellationFlag,
pub tx: UnboundedSender<FlowExecutionEvent>,
pub enforcement_summaries: Arc<Mutex<HashMap<String, EnforcementSummaryWire>>>,
pub step_audit_records: Arc<Mutex<Vec<StepAuditRecord>>>,
pub runtime_warnings: Arc<Mutex<Vec<RuntimeWarning>>>,
pub branch_path: Vec<String>,
pub step_counter: usize,
pub pem_backend: Option<Arc<dyn PersistenceBackend>>,
pub session_id: String,
pub tenant_id: String,
pub let_bindings: HashMap<String, String>,
pub pending_effect_policy: Option<BackpressurePolicy>,
pub tool_registry: Option<Arc<ToolRegistry>>,
pub store_registry: Option<Arc<StoreRegistry>>,
pub held_capabilities: Option<Vec<String>>,
pub audit_chain: Arc<Mutex<StoreAuditChain>>,
pub pinned_conns: Arc<Mutex<HashMap<String, PoolConnection<Postgres>>>>,
}
Per-flow dispatcher context. Carries the producer-side wire
surface (tx for FlowExecutionEvent), cancel-in-body propagation
(cancel), the audit/enforcement/warning side-channels (read by
the SSE handler at axon.complete time), and the orchestration
branch_path for D6 per-step replay binding.
§branch_path semantics
Empty at flow root. Parent handlers push a segment when descending into a child:
- par[0], par[1], par[2] for the n-th branch of a Par block.
- for_in[0], for_in[1] for the n-th iteration of a ForIn loop.
- conditional.then, conditional.else for the chosen branch of an if.
- Children inside a branch concat: par[0].step[0] for the first child step of the first Par branch.
The path is observable in StepAuditRecord.branch_path (extended
in 33.y.f when the audit row writer gains the field) so regulators
replaying a flow on appeal see the full execution tree, not just a
flattened step sequence.
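The segment rules above can be sketched as follows. This is a minimal illustration, not the crate's implementation: the helper names (descend, par_segment, for_in_segment, conditional_segment, render) are hypothetical, and joining segments with "." is an assumption based on the documented example par[0].step[0].

```rust
/// Hypothetical helper: the path a child context would carry after the
/// parent pushes one segment onto a copy of its own path.
fn descend(parent: &[String], segment: &str) -> Vec<String> {
    let mut child = parent.to_vec();
    child.push(segment.to_string());
    child
}

/// Segment for the n-th branch of a Par block.
fn par_segment(n: usize) -> String {
    format!("par[{n}]")
}

/// Segment for the n-th iteration of a ForIn loop.
fn for_in_segment(n: usize) -> String {
    format!("for_in[{n}]")
}

/// Segment for the chosen branch of an `if`.
fn conditional_segment(took_then: bool) -> &'static str {
    if took_then {
        "conditional.then"
    } else {
        "conditional.else"
    }
}

/// Assumed rendering: segments joined with '.', empty string at the flow root.
fn render(path: &[String]) -> String {
    path.join(".")
}
```

Because each child receives its own copy of the parent's path, sibling branches cannot observe each other's segments.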
§step_counter
Monotonic per-flow counter. Each Step (or pure-shape variant
promoted to Step) increments it. The value feeds step_audit so
the row index stays correct under nested orchestration.
Fields§
§flow_name: String
§backend_name: String
§system_prompt: String
§cancel: CancellationFlag
§tx: UnboundedSender<FlowExecutionEvent>
§enforcement_summaries: Arc<Mutex<HashMap<String, EnforcementSummaryWire>>>
§step_audit_records: Arc<Mutex<Vec<StepAuditRecord>>>
§runtime_warnings: Arc<Mutex<Vec<RuntimeWarning>>>
§branch_path: Vec<String>
§step_counter: usize
§pem_backend: Option<Arc<dyn PersistenceBackend>>
§Fase 33.y.f: Optional PEM async surface for cognitive
primitives (Remember / Recall etc.). When Some(backend),
run_remember persists write-through to PEM and run_recall
restores from PEM as a write-back cache layered over
let_bindings. When None, both handlers degrade to
let_bindings-only (in-memory) operation, the canonical
path for tests and for adopters that don’t opt into persistent
cognitive state. Arc-cloned per branch for concurrent
dispatch (Fase 33.y.e parity).
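The Some/None behavior described for pem_backend can be sketched as below. Everything here is a simplified stand-in: KvPersistence is a hypothetical synchronous trait (the real PersistenceBackend surface is async and its methods are not shown on this page), MiniCtx is a cut-down context, and the lookup order in recall (in-memory first, then the backend) is an assumption.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical, synchronous stand-in for a persistence backend.
trait KvPersistence: Send + Sync {
    fn put(&self, key: &str, value: &str);
    fn get(&self, key: &str) -> Option<String>;
}

/// In-memory test double for the backend.
#[derive(Default)]
struct MemoryPersistence {
    rows: Mutex<HashMap<String, String>>,
}

impl KvPersistence for MemoryPersistence {
    fn put(&self, key: &str, value: &str) {
        self.rows
            .lock()
            .unwrap()
            .insert(key.to_string(), value.to_string());
    }

    fn get(&self, key: &str) -> Option<String> {
        self.rows.lock().unwrap().get(key).cloned()
    }
}

/// Cut-down context carrying only the two fields this sketch needs.
struct MiniCtx {
    pem_backend: Option<Arc<dyn KvPersistence>>,
    let_bindings: HashMap<String, String>,
}

/// Remember: always update the in-memory scope; also persist when a
/// backend is attached (write-through).
fn remember(ctx: &mut MiniCtx, key: &str, value: &str) {
    ctx.let_bindings.insert(key.to_string(), value.to_string());
    if let Some(backend) = &ctx.pem_backend {
        backend.put(key, value);
    }
}

/// Recall: check the in-memory scope, then fall back to the backend and
/// cache a hit back into the scope. With no backend this is in-memory only.
fn recall(ctx: &mut MiniCtx, key: &str) -> Option<String> {
    if let Some(value) = ctx.let_bindings.get(key) {
        return Some(value.clone());
    }
    let restored = ctx.pem_backend.as_ref()?.get(key)?;
    ctx.let_bindings.insert(key.to_string(), restored.clone());
    Some(restored)
}
```

Two contexts that share one backend Arc see each other's remembered values, while a context built with None behaves exactly like a plain map.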
§session_id: String
§Fase 33.y.f: Session anchor for PEM persistence. Defaults
to flow_name in DispatchCtx::new; adopters override
for multi-session flows.
§tenant_id: String
§Fase 33.y.f: Tenant routing tag for PEM persistence.
Defaults to empty in DispatchCtx::new; multi-tenant
adopters set this before dispatch.
§let_bindings: HashMap<String, String>
§Fase 33.y.d: Let-binding scope. Map from binding name to its
resolved value. run_let inserts; run_conditional reads to
evaluate the condition; run_for_in inserts the iteration
variable on each iteration. Bindings persist through the flow’s
lifetime; sub-scoping is NOT introduced in 33.y.d (the
sync runner’s let semantics are flow-scoped + monotonic,
and this field matches that discipline for D10 parity). The
HashMap is cheap to clone for branch isolation when sub-fase 33.y.e
introduces parallel branches with private scopes (Par block).
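A minimal sketch of the flow-scoped binding discipline described above. The Bindings type and the for_each_binding driver are hypothetical; they only mirror what the text says run_let, run_conditional, and run_for_in do with the map.

```rust
use std::collections::HashMap;

/// Flow-scoped binding table: one map for the whole flow, no nested scopes.
#[derive(Clone, Default)]
struct Bindings {
    values: HashMap<String, String>,
}

impl Bindings {
    /// What a `let` handler would do: record the resolved value.
    fn bind(&mut self, name: &str, value: &str) {
        self.values.insert(name.to_string(), value.to_string());
    }

    /// What a conditional handler would do: read a name to evaluate a condition.
    fn lookup(&self, name: &str) -> Option<&str> {
        self.values.get(name).map(String::as_str)
    }
}

/// Hypothetical loop driver: rebinds the iteration variable once per item
/// and records the value the loop body would observe.
fn for_each_binding(scope: &mut Bindings, var: &str, items: &[&str]) -> Vec<String> {
    let mut seen = Vec::new();
    for item in items {
        scope.bind(var, item);
        seen.push(scope.lookup(var).unwrap_or_default().to_string());
    }
    seen
}
```

Since there is no sub-scoping, the iteration variable stays bound to its last value after the loop ends. Cloning the table gives a branch a private copy, so writes in the clone do not reach the parent.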
§pending_effect_policy: Option<BackpressurePolicy>
§Fase 33.y.c: Per-node declared <stream:<policy>> resolved
by the caller BEFORE invoking dispatch_node. The pure-shape
handlers read + consume this field (set back to None on
entry) so each handler observes the policy intended for ITS
node, never the previous node’s residue. When None, the
handler skips StreamPolicyEnforcer wrapping + emits chunks
directly to the wire.
Subsequent sub-fases 33.y.d-l adopt the same pattern for
orchestration handlers (Par / ForIn) when child nodes
declare effects.
§tool_registry: Option<Arc<ToolRegistry>>
§Fase 34.d (v1.29.0): Tool registry surface for the
streaming-tool dispatcher branch. When Some(registry),
pure_shape::run_step resolves step.apply_ref against
the registry; if the entry’s is_streaming flag is true,
the step bypasses Backend::stream() entirely + invokes
tool.stream(args, ctx) via the
crate::tool_dispatch_bridge::resolve_streaming_tool
factory. When None (D9 backwards-compat), the legacy
LLM-side path is taken regardless of source-declared
effects: <stream:<policy>> — adopters who haven’t wired
the registry yet see no behavior change. Arc-shared for
concurrent dispatch (Fase 33.y.e parity).
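The routing decision described for tool_registry might look like the sketch below. MiniToolRegistry, ToolEntry, Route, and route_step are hypothetical names; the real ToolRegistry API is not shown on this page. Sending non-streaming and unregistered entries down the LLM path is an assumption.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical registry entry; only the streaming flag matters here.
struct ToolEntry {
    is_streaming: bool,
}

/// Hypothetical registry keyed by the step's apply reference.
#[derive(Default)]
struct MiniToolRegistry {
    tools: HashMap<String, ToolEntry>,
}

/// Which path a step takes.
#[derive(Debug, PartialEq)]
enum Route {
    StreamingTool,
    LlmBackend,
}

/// With no registry the legacy LLM path is always taken. With a registry,
/// only an entry flagged as streaming bypasses the backend.
fn route_step(registry: Option<&Arc<MiniToolRegistry>>, apply_ref: &str) -> Route {
    match registry.and_then(|r| r.tools.get(apply_ref)) {
        Some(entry) if entry.is_streaming => Route::StreamingTool,
        _ => Route::LlmBackend,
    }
}
```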
§store_registry: Option<Arc<StoreRegistry>>
§Fase 35.f (v1.30.0): axonstore registry for SQL-vs-KV
dispatch. When Some(registry), run_persist / run_retrieve
/ run_mutate / run_purge resolve store_name against it: a
postgresql-backed store routes through PostgresStoreBackend,
every other (and every undeclared) store takes the byte-
identical key-value path. When None (the DispatchCtx::new
default), every store op is key-value — the pre-35 behavior,
unchanged (D3). Arc-shared so concurrent branches share one
per-DSN pool cache.
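As a sketch of the SQL-vs-KV rule above: only a store declared as postgresql-backed goes to SQL, and everything else (other kinds, undeclared names, or no registry at all) stays on the key-value path. StoreKind, MiniStoreRegistry, StorePath, and route_store are hypothetical stand-ins for the real StoreRegistry surface.

```rust
use std::collections::HashMap;

/// Hypothetical backing kinds a declared store can have.
#[derive(Debug, Clone, Copy, PartialEq)]
enum StoreKind {
    Postgresql,
    Other,
}

/// Hypothetical registry of declared stores, keyed by store name.
#[derive(Default)]
struct MiniStoreRegistry {
    declared: HashMap<String, StoreKind>,
}

/// Which path a store op takes.
#[derive(Debug, PartialEq)]
enum StorePath {
    Sql,
    KeyValue,
}

/// Route one store op by name.
fn route_store(registry: Option<&MiniStoreRegistry>, store_name: &str) -> StorePath {
    match registry.and_then(|r| r.declared.get(store_name)) {
        Some(StoreKind::Postgresql) => StorePath::Sql,
        _ => StorePath::KeyValue,
    }
}
```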
§held_capabilities: Option<Vec<String>>
§Fase 35.j (v1.30.0), Pillar IV: the capability slugs the
current request carries (the JWT bearer’s capabilities
claim). When Some, the store handlers re-check a
capability-gated store against this set before any access —
defense-in-depth behind the type-checker’s compile-time
guarantee. When None (the DispatchCtx::new default), there
is no capability context at this layer and the runtime
re-check is a no-op: the compile-time check + the endpoint’s
Fase 32.g requires: gate stand.
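The runtime re-check can be pictured as a small decision function. This is a sketch under assumptions: recheck and Access are hypothetical names, a gated store is modeled as requiring a single capability slug, and the slug strings in the usage are invented for illustration.

```rust
/// Outcome of the runtime re-check.
#[derive(Debug, PartialEq)]
enum Access {
    Allowed,
    Denied,
}

/// `held` mirrors the optional capability set on the context; `required`
/// is the slug a capability-gated store demands (None for an ungated store).
fn recheck(held: Option<&[String]>, required: Option<&str>) -> Access {
    match (held, required) {
        // No capability context at this layer: the re-check is a no-op.
        (None, _) => Access::Allowed,
        // The store is not capability-gated.
        (Some(_), None) => Access::Allowed,
        // Gated store with a capability context: the slug must be held.
        (Some(held), Some(cap)) => {
            if held.iter().any(|h| h == cap) {
                Access::Allowed
            } else {
                Access::Denied
            }
        }
    }
}
```

The (None, _) arm is what keeps the check defense-in-depth rather than a second gate: callers without a capability context rely on the earlier compile-time and endpoint checks alone.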
§audit_chain: Arc<Mutex<StoreAuditChain>>
§Fase 35.h (v1.30.0), Pillar II: the flow’s tamper-evident
HMAC-Merkle mutation chain. Every persist/mutate/purge
appends a delta. Shared (Arc) across concurrent branches so a
Par block’s mutations land in one chain; the Merkle head is a
verifiable fingerprint of the flow’s complete mutation history.
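To illustrate the shape of a shared append-only chain, here is a deliberately simplified sketch. It is NOT the StoreAuditChain implementation and NOT cryptographically sound: it swaps the HMAC-Merkle construction for a plain hash chain built on the standard library's DefaultHasher, purely to show how each delta folds into the head and how concurrent branches append through one Arc<Mutex<..>>. MiniChain and run_branches are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};
use std::thread;

/// Stand-in chain: the head is a non-cryptographic hash of
/// (previous head, delta). A real chain would use a keyed MAC.
#[derive(Debug, Default)]
struct MiniChain {
    head: u64,
    len: usize,
}

impl MiniChain {
    /// Fold one mutation delta into the head.
    fn append(&mut self, delta: &str) {
        let mut hasher = DefaultHasher::new();
        self.head.hash(&mut hasher);
        delta.hash(&mut hasher);
        self.head = hasher.finish();
        self.len += 1;
    }
}

/// Simulate parallel branches that all append to the same shared chain.
fn run_branches(chain: &Arc<Mutex<MiniChain>>, deltas: &[&str]) {
    let handles: Vec<_> = deltas
        .iter()
        .map(|delta| {
            let chain = Arc::clone(chain);
            let delta = delta.to_string();
            thread::spawn(move || {
                chain.lock().unwrap().append(&delta);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```

Replaying the same deltas in the same order reproduces the same head, which is the property a verifier relies on. Note that parallel branches append in whatever order they acquire the lock.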
§pinned_conns: Arc<Mutex<HashMap<String, PoolConnection<Postgres>>>>
§Fase 37.x.j (D2): Per-flow pinned Postgres connections.
Populated at stream start by run_streaming_via_dispatcher:
the IR is walked, every postgresql-backed axonstore referenced
by the flow body has ONE PoolConnection<Postgres> acquired,
and the map holds them by axonstore name for the flow’s
lifetime. The map drops at the end of the streaming task,
returning every conn to the pool via the after_release DEALLOCATE ALL hook (Fase 38.x.a D2 composing with 37.x.j D1).
Wire-integration store handlers consult this map per op:
take the pin out → run the SQL via StoreConn::Pinned(&mut pin)
→ insert the pin back. The take/return discipline preserves
the Arc<Mutex<>> sharing pattern across cloned (par-branched)
contexts while keeping individual ops borrow-checker friendly.
Empty map ≡ no pinning held (legacy path) → handlers fall back
to StoreConn::Pool(backend.pool()). This is the case for
callers that haven’t eager-acquired (non-streaming RPC paths,
CLI tests, etc.) — D5 byte-identical backwards-compat.
Per D6.b (sub-fase 37.x.j.6): par {} branches that share this
Arc serialize on its mutex. The D6.a default (per-branch
sub-pin) replaces this Arc with a fresh empty map at par-branch
clone time so branches do NOT serialize on the parent’s pins.
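The take/return discipline and the empty-map fallback can be sketched without a database. Conn is a hypothetical stand-in for PoolConnection<Postgres>, and Via only records which path an op would take; the real handlers pass StoreConn::Pinned or StoreConn::Pool to the backend.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a pooled connection.
#[derive(Debug)]
struct Conn {
    queries_run: u32,
}

/// Same sharing shape as the pinned-connection map: store name to pin.
type Pins = Arc<Mutex<HashMap<String, Conn>>>;

/// Which connection source an op used.
#[derive(Debug, PartialEq)]
enum Via {
    Pinned,
    Pool,
}

/// One store op: take the pin out, use it, insert it back.
/// A missing pin means the op falls back to the pool.
fn run_op(pins: &Pins, store: &str) -> Via {
    // The lock guard is a temporary, so the mutex is released at the end
    // of this statement and is not held while the "query" runs.
    let taken = pins.lock().unwrap().remove(store);
    match taken {
        Some(mut conn) => {
            conn.queries_run += 1; // stand-in for running SQL on the pin
            pins.lock().unwrap().insert(store.to_string(), conn);
            Via::Pinned
        }
        None => Via::Pool,
    }
}
```

Taking the connection out of the map gives the handler an owned value to borrow mutably, so no lock guard has to live across the query.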
Implementations§
§impl DispatchCtx
pub fn new(
    flow_name: impl Into<String>,
    backend_name: impl Into<String>,
    system_prompt: impl Into<String>,
    cancel: CancellationFlag,
    tx: UnboundedSender<FlowExecutionEvent>,
) -> Self
Construct a fresh context for a new flow. Subsequent sub-fases extend this with builder methods as the surface grows (PEM / ReplayToken / CognitiveState plumbing in 33.y.f, tool registry in 33.y.k, etc.).
pub fn with_pinned_conns(
    self,
    conns: Arc<Mutex<HashMap<String, PoolConnection<Postgres>>>>,
) -> Self
§Fase 37.x.j (D2) — Builder: attach an Arc-shared pinned
connection map populated by the caller. run_streaming_via_dispatcher
uses this to install the eagerly-acquired flow-scoped pins
BEFORE the dispatcher walks any node. Returns self so the
builder pattern chains with with_store_registry, with_pem,
with_tool_registry, with_held_capabilities.
pub fn with_store_registry(self, registry: Arc<StoreRegistry>) -> Self
§Fase 35.f — Builder: attach the axonstore registry so the
wire-integration store handlers route postgresql-backed stores
to SQL. Without it, every store op stays key-value (D3).
Returns self so builders chain.
pub fn with_held_capabilities(self, capabilities: Vec<String>) -> Self
§Fase 35.j — Builder: attach the request’s held capability
slugs so the store handlers re-check capability-gated stores
(Pillar IV). Returns self so builders chain.
pub fn with_tool_registry(self, registry: Arc<ToolRegistry>) -> Self
§Fase 34.d — Builder: attach a tool registry so the
dispatcher’s streaming-tool branch can resolve apply_ref
against it. Returns self so builders chain.
pub fn with_pem(self, backend: Arc<dyn PersistenceBackend>) -> Self
Builder: attach a PEM persistence backend. Returns self so
callers can chain DispatchCtx::new(...).with_pem(backend).
pub fn with_session_id(self, session_id: impl Into<String>) -> Self
Builder: set the session id (defaults to flow_name).
pub fn with_tenant_id(self, tenant_id: impl Into<String>) -> Self
Builder: set the tenant id (defaults to empty).
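The consuming-builder pattern and the documented defaults (session id falls back to the flow name, tenant id starts empty) can be sketched on a cut-down struct. MiniCtx is hypothetical and carries only three of the fields; the real DispatchCtx::new takes more arguments.

```rust
/// Cut-down context with only the fields this sketch needs.
#[derive(Debug, Clone)]
struct MiniCtx {
    flow_name: String,
    session_id: String,
    tenant_id: String,
}

impl MiniCtx {
    /// Defaults: session_id mirrors flow_name, tenant_id is empty.
    fn new(flow_name: impl Into<String>) -> Self {
        let flow_name = flow_name.into();
        Self {
            session_id: flow_name.clone(),
            tenant_id: String::new(),
            flow_name,
        }
    }

    /// Takes `self` by value and returns it, so calls chain.
    fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
        self.session_id = session_id.into();
        self
    }

    /// Same shape as `with_session_id`.
    fn with_tenant_id(mut self, tenant_id: impl Into<String>) -> Self {
        self.tenant_id = tenant_id.into();
        self
    }
}
```

Each builder moves the context through the chain, so a partially configured value cannot be used by accident after a later builder call.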
pub fn with_effect_policy(self, policy: BackpressurePolicy) -> Self
Builder-style setter for the pending effect policy. Returns
self so callers can chain ctx.with_effect_policy(policy)
before invoking dispatch_node. Handlers read + clear the
field via Self::take_pending_effect_policy.
pub fn with_external_side_channels(
    self,
    enforcement_summaries: Arc<Mutex<HashMap<String, EnforcementSummaryWire>>>,
    step_audit_records: Arc<Mutex<Vec<StepAuditRecord>>>,
    runtime_warnings: Arc<Mutex<Vec<RuntimeWarning>>>,
) -> Self
§Fase 33.z.c — Builder: inject external Arc-backed side-channels
so the dispatcher’s per-variant handlers populate the SAME
Mutexes that server_execute_streaming reads from for the SSE
wire’s enforcement_summary, step_audit, and runtime_warnings
fields.
Without this builder, DispatchCtx::new creates FRESH Arcs that
the dispatcher populates but the production hot path can’t read.
That gap broke axon.complete.enforcement_summary wire emission
on the canonical Step shape when the dispatcher graft (33.z.b)
activated — the 33.x.d production-path tests detected the
regression at the assert_eq!(generate_summary["chunks_pushed"], 1)
line because the side-channel the wire reads from stayed empty
while the dispatcher’s fresh Arc carried the counters.
Used exclusively by streaming_via_dispatcher::run_streaming_via_dispatcher
to thread the side-channels the SSE handler constructs into the
dispatcher. Downstream-crate consumers driving dispatch_node
directly continue to use DispatchCtx::new + the fresh internal
Arcs.
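The gap this builder closes comes down to Arc identity: a context that allocates its own Arc writes somewhere the handler never reads. A minimal sketch with one side-channel, using hypothetical names (MiniCtx, Warnings, with_external_warnings, warn) and String in place of RuntimeWarning:

```rust
use std::sync::{Arc, Mutex};

/// One side-channel, simplified to a list of strings.
type Warnings = Arc<Mutex<Vec<String>>>;

struct MiniCtx {
    runtime_warnings: Warnings,
}

impl MiniCtx {
    /// Like a plain constructor: allocates a FRESH Arc nobody else holds.
    fn new() -> Self {
        Self {
            runtime_warnings: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Replace the fresh Arc with one the caller also holds.
    fn with_external_warnings(mut self, external: Warnings) -> Self {
        self.runtime_warnings = external;
        self
    }

    /// What a handler would do when it records a warning.
    fn warn(&self, msg: &str) {
        self.runtime_warnings.lock().unwrap().push(msg.to_string());
    }
}
```

A context built without the builder records warnings that the handler-side Arc never sees; one built with it writes into the same Mutex the handler later drains.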
pub fn take_pending_effect_policy(&mut self) -> Option<BackpressurePolicy>
Read + clear the pending effect policy. Returns None when no
policy was set by the caller. The take-semantics (vs. peek)
prevents a stale policy from a previous node leaking into the
next handler’s invocation if the caller forgets to clear.
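The take-versus-peek distinction maps directly onto Option::take, which returns the current value and leaves None behind. A small sketch, assuming a hypothetical Policy enum (the variant names are illustrative, not BackpressurePolicy's real variants) and a hypothetical handle_node function:

```rust
/// Hypothetical stand-in for the policy enum; variant names are illustrative.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Policy {
    DropOldest,
    Block,
}

/// Cut-down context carrying only the pending policy.
struct MiniCtx {
    pending_effect_policy: Option<Policy>,
}

impl MiniCtx {
    /// Read and clear in one step; `Option::take` leaves `None` behind.
    fn take_pending_effect_policy(&mut self) -> Option<Policy> {
        self.pending_effect_policy.take()
    }
}

/// Hypothetical handler: reports how it would emit chunks for its node.
fn handle_node(ctx: &mut MiniCtx) -> &'static str {
    match ctx.take_pending_effect_policy() {
        Some(_) => "wrapped in enforcer",
        None => "direct to wire",
    }
}
```

Calling the handler twice without setting a new policy shows why take matters: the second call observes None instead of the previous node's policy.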
pub fn branch_path_string(&self) -> String
Render the current branch_path as a wire-stable string. Empty
path returns "" (flow root); single segment "par[0]"; multi
"par[0].step[1]". The format is byte-stable across calls.
Trait Implementations§
§impl Clone for DispatchCtx
fn clone(&self) -> DispatchCtx
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
Auto Trait Implementations§
impl !RefUnwindSafe for DispatchCtx
impl !UnwindSafe for DispatchCtx
impl Freeze for DispatchCtx
impl Send for DispatchCtx
impl Sync for DispatchCtx
impl Unpin for DispatchCtx
impl UnsafeUnpin for DispatchCtx
Blanket Implementations§
§impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
§impl<T> CloneToUninit for T
where
    T: Clone,
§impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
§impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.