Struct DispatchCtx 

pub struct DispatchCtx {
    pub flow_name: String,
    pub backend_name: String,
    pub system_prompt: String,
    pub cancel: CancellationFlag,
    pub tx: UnboundedSender<FlowExecutionEvent>,
    pub enforcement_summaries: Arc<Mutex<HashMap<String, EnforcementSummaryWire>>>,
    pub step_audit_records: Arc<Mutex<Vec<StepAuditRecord>>>,
    pub runtime_warnings: Arc<Mutex<Vec<RuntimeWarning>>>,
    pub branch_path: Vec<String>,
    pub step_counter: usize,
    pub pem_backend: Option<Arc<dyn PersistenceBackend>>,
    pub session_id: String,
    pub tenant_id: String,
    pub let_bindings: HashMap<String, String>,
    pub pending_effect_policy: Option<BackpressurePolicy>,
    pub tool_registry: Option<Arc<ToolRegistry>>,
    pub store_registry: Option<Arc<StoreRegistry>>,
    pub held_capabilities: Option<Vec<String>>,
    pub audit_chain: Arc<Mutex<StoreAuditChain>>,
    pub pinned_conns: Arc<Mutex<HashMap<String, PoolConnection<Postgres>>>>,
}

Per-flow dispatcher context. Carries the producer-side wire surface (tx for FlowExecutionEvent), cancel-in-body propagation (cancel), the audit/enforcement/warning side-channels (read by the SSE handler at axon.complete time), and the orchestration branch_path for D6 per-step replay binding.

§branch_path semantics

Empty at flow root. Parent handlers push a segment when descending into a child:

  • par[0], par[1], par[2] for the n-th branch of a Par block.
  • for_in[0], for_in[1] for the n-th iteration of a ForIn loop.
  • conditional.then, conditional.else for the chosen branch of an if.
  • Children inside a branch concatenate onto the parent path: par[0].step[0] for the first child step of the first Par branch.

The path is observable in StepAuditRecord.branch_path (extended in 33.y.f when the audit row writer gains the field) so regulators replaying a flow on appeal see the full execution tree, not just a flattened step sequence.
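The descent rule above can be sketched as follows. This is a minimal illustration, assuming each child receives its own copy of the parent's path with one segment appended; `child_path` and `par_branch_paths` are hypothetical helpers, not functions of this crate.

```rust
/// Hypothetical helper: build the path a child sees by appending one
/// segment to a copy of the parent's path. The parent's path is untouched.
fn child_path(parent: &[String], segment: &str) -> Vec<String> {
    let mut path = parent.to_vec();
    path.push(segment.to_string());
    path
}

/// Hypothetical helper: one path per branch of a Par block with `n` branches,
/// using the `par[i]` segment naming listed above.
fn par_branch_paths(parent: &[String], n: usize) -> Vec<Vec<String>> {
    (0..n)
        .map(|i| child_path(parent, &format!("par[{i}]")))
        .collect()
}
```

Copying the path per child keeps sibling branches from seeing each other's segments, which is one plausible reason for storing it as an owned `Vec<String>`.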

§step_counter

Monotonic per-flow counter. Each Step (or pure-shape variant promoted to Step) increments it. The value is fed into step_audit so the row index stays correct under nested orchestration.

Fields§

§flow_name: String

§backend_name: String

§system_prompt: String

§cancel: CancellationFlag

§tx: UnboundedSender<FlowExecutionEvent>

§enforcement_summaries: Arc<Mutex<HashMap<String, EnforcementSummaryWire>>>

§step_audit_records: Arc<Mutex<Vec<StepAuditRecord>>>

§runtime_warnings: Arc<Mutex<Vec<RuntimeWarning>>>

§branch_path: Vec<String>

§step_counter: usize

§pem_backend: Option<Arc<dyn PersistenceBackend>>

§Fase 33.y.f — Optional PEM async surface for cognitive primitives (Remember / Recall etc.). When Some(backend), run_remember write-through persists to PEM and run_recall restores from PEM as a write-back cache layered over let_bindings. When None, both handlers degrade to let_bindings-only (in-memory) — the canonical adopter path for tests + adopters that don’t opt into persistent cognitive state. Arc-cloned per branch for concurrent dispatch (Fase 33.y.e parity).

§session_id: String

§Fase 33.y.f — Session anchor for PEM persistence. Defaults to flow_name in DispatchCtx::new; adopters override for multi-session flows.

§tenant_id: String

§Fase 33.y.f — Tenant routing tag for PEM persistence. Defaults to empty in DispatchCtx::new; multi-tenant adopters set this before dispatch.

§let_bindings: HashMap<String, String>

§Fase 33.y.d: Let-binding scope. Map from binding name to its resolved value. run_let inserts; run_conditional reads to evaluate the condition; run_for_in inserts the iteration variable once per iteration. Bindings persist through the flow’s lifetime: sub-scoping is NOT introduced in 33.y.d (the sync runner’s let semantics are flow-scoped + monotonic, matching this discipline for D10 parity). The HashMap is cheap to clone for branch isolation when sub-fase 33.y.e introduces parallel branches with private scopes (Par block).
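A small sketch of the flow-scoped binding discipline, assuming a plain `HashMap<String, String>`. The functions `bind`, `for_in`, and `isolated_copy` are hypothetical stand-ins for what run_let, run_for_in, and a branch clone might do; they are not the crate's handlers.

```rust
use std::collections::HashMap;

/// Stand-in for the flow-scoped binding map (`let_bindings`).
type Bindings = HashMap<String, String>;

/// Hypothetical analogue of `run_let`: record a resolved value.
fn bind(bindings: &mut Bindings, name: &str, value: &str) {
    bindings.insert(name.to_string(), value.to_string());
}

/// Hypothetical analogue of `run_for_in`: rebind the iteration variable
/// once per item, then run `body` against the shared, flow-scoped map.
fn for_in<F>(bindings: &mut Bindings, var: &str, items: &[&str], mut body: F)
where
    F: FnMut(&Bindings),
{
    for item in items {
        bind(bindings, var, item);
        body(&*bindings);
    }
}

/// Branch isolation by cloning: the copy can diverge without touching
/// the parent's map.
fn isolated_copy(bindings: &Bindings) -> Bindings {
    bindings.clone()
}
```

Because there is no sub-scope in this sketch, the iteration variable stays bound to its last value after the loop ends. That follows from the flow-scoped rule described above, but it is an inference rather than documented behavior.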

§pending_effect_policy: Option<BackpressurePolicy>

§Fase 33.y.c — Per-node declared <stream:<policy>> resolved by the caller BEFORE invoking dispatch_node. The pure-shape handlers read + consume this field (set back to None on entry) so each handler observes the policy intended for ITS node, never the previous node’s residue. When None, the handler skips StreamPolicyEnforcer wrapping + emits chunks directly to the wire.

Subsequent sub-fases 33.y.d-l adopt the same pattern for orchestration handlers (Par / ForIn) when child nodes declare effects.

§tool_registry: Option<Arc<ToolRegistry>>

§Fase 34.d (v1.29.0) — Tool registry surface for the streaming-tool dispatcher branch. When Some(registry), pure_shape::run_step resolves step.apply_ref against the registry; if the entry’s is_streaming flag is true, the step bypasses Backend::stream() entirely + invokes tool.stream(args, ctx) via the crate::tool_dispatch_bridge::resolve_streaming_tool factory. When None (D9 backwards-compat), the legacy LLM-side path is taken regardless of source-declared effects: <stream:<policy>> — adopters who haven’t wired the registry yet see no behavior change. Arc-shared for concurrent dispatch (Fase 33.y.e parity).

§store_registry: Option<Arc<StoreRegistry>>

§Fase 35.f (v1.30.0): axonstore registry for SQL-vs-KV dispatch. When Some(registry), run_persist / run_retrieve / run_mutate / run_purge resolve store_name against it: a postgresql-backed store routes through PostgresStoreBackend, while every other (and every undeclared) store takes the byte-identical key-value path. When None (the DispatchCtx::new default), every store op is key-value, which is the pre-35 behavior, unchanged (D3). Arc-shared so concurrent branches share one per-DSN pool cache.
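The routing rule can be pictured with a small decision function. This is a sketch under stated assumptions: the registry is modelled as a map from store name to a backend-kind string, and `StoreRoute` and `route_store` are hypothetical names, not the `StoreRegistry` API.

```rust
use std::collections::HashMap;

/// Which path a store op takes. Names are illustrative.
#[derive(Debug, PartialEq)]
enum StoreRoute {
    Sql,
    KeyValue,
}

/// Decide the route for `store_name`.
/// `registry` is `None` when no registry was attached to the context.
/// Assumption: the registry maps a declared store name to a backend kind.
fn route_store(registry: Option<&HashMap<String, String>>, store_name: &str) -> StoreRoute {
    match registry.and_then(|r| r.get(store_name)) {
        // Only a postgresql-backed store goes to SQL.
        Some(kind) if kind == "postgresql" => StoreRoute::Sql,
        // Other kinds, undeclared stores, and the no-registry case stay key-value.
        _ => StoreRoute::KeyValue,
    }
}
```

Collapsing the three non-SQL cases into one arm mirrors the description: anything that is not explicitly postgresql-backed behaves as it did before the registry existed.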

§held_capabilities: Option<Vec<String>>

§Fase 35.j (v1.30.0) — Pillar IV: the capability slugs the current request carries (the JWT bearer’s capabilities claim). When Some, the store handlers re-check a capability-gated store against this set before any access — defense-in-depth behind the type-checker’s compile-time guarantee. When None (the DispatchCtx::new default), there is no capability context at this layer and the runtime re-check is a no-op: the compile-time check + the endpoint’s Fase 32.g requires: gate stand.
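A minimal sketch of the runtime re-check, assuming a gated store names a single required capability slug. The `Access` enum, the `recheck` function, and the idea of one slug per store are illustrative assumptions, not the crate's types.

```rust
/// Outcome of the runtime re-check. Names are illustrative.
#[derive(Debug, PartialEq)]
enum Access {
    Allowed,
    Denied,
}

/// `held` mirrors `held_capabilities`; `required` is the slug a gated store
/// demands, or `None` for a store that is not capability-gated (assumption).
fn recheck(held: Option<&[String]>, required: Option<&str>) -> Access {
    match (held, required) {
        // No capability context at this layer: the re-check is a no-op.
        (None, _) => Access::Allowed,
        // The store is not gated, so there is nothing to check.
        (Some(_), None) => Access::Allowed,
        // Gated store with a capability context: the slug must be held.
        (Some(held), Some(slug)) => {
            if held.iter().any(|c| c == slug) {
                Access::Allowed
            } else {
                Access::Denied
            }
        }
    }
}
```

The `None` arm is what makes this a defense-in-depth layer rather than the primary gate: with no context, the earlier compile-time and endpoint checks are the ones that count.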

§audit_chain: Arc<Mutex<StoreAuditChain>>

§Fase 35.h (v1.30.0) — Pillar II: the flow’s tamper-evident HMAC-Merkle mutation chain. Every persist/mutate/purge appends a delta. Shared (Arc) across concurrent branches so a Par block’s mutations land in one chain; the Merkle head is a verifiable fingerprint of the flow’s complete mutation history.
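To illustrate why the chain is shared through an `Arc`, here is a toy append-only chain that several threads write to. It is deliberately NOT the crate's `StoreAuditChain`: it uses the standard library's non-cryptographic hasher instead of HMAC and a linear chain instead of a Merkle tree, and every name in it is hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};
use std::thread;

/// Toy chain: each appended delta is folded into a running head value.
#[derive(Default)]
struct ToyChain {
    deltas: Vec<String>,
    head: u64,
}

/// Combine the previous head with a delta (non-cryptographic stand-in).
fn link(prev_head: u64, delta: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    prev_head.hash(&mut hasher);
    delta.hash(&mut hasher);
    hasher.finish()
}

impl ToyChain {
    fn append(&mut self, delta: &str) {
        self.head = link(self.head, delta);
        self.deltas.push(delta.to_string());
    }

    /// Recompute the head from the recorded deltas and compare.
    fn verify(&self) -> bool {
        self.deltas.iter().fold(0u64, |head, d| link(head, d)) == self.head
    }
}

/// Spawn one thread per delta; all of them append to the same shared chain.
fn append_concurrently(chain: &Arc<Mutex<ToyChain>>, deltas: &[&str]) {
    let handles: Vec<_> = deltas
        .iter()
        .map(|d| {
            let chain = Arc::clone(chain);
            let delta = d.to_string();
            thread::spawn(move || {
                chain.lock().unwrap().append(&delta);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```

The order in which concurrent appends land is not deterministic, but the head always matches the order actually recorded, so the stored history stays verifiable.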

§pinned_conns: Arc<Mutex<HashMap<String, PoolConnection<Postgres>>>>

§Fase 37.x.j (D2) — Per-flow pinned Postgres connections. Populated at stream start by run_streaming_via_dispatcher: the IR is walked, every postgresql-backed axonstore referenced by the flow body has ONE PoolConnection<Postgres> acquired, and the map holds them by axonstore name for the flow’s lifetime. The map drops at the end of the streaming task, returning every conn to the pool via the after_release DEALLOCATE ALL hook (Fase 38.x.a D2 composing with 37.x.j D1).

Wire-integration store handlers consult this map per op: take the pin out → run the SQL via StoreConn::Pinned(&mut pin) → insert the pin back. The take/return discipline preserves the Arc<Mutex<>> sharing pattern across cloned (par-branched) contexts while keeping individual ops borrow-checker friendly.

Empty map ≡ no pinning held (legacy path) → handlers fall back to StoreConn::Pool(backend.pool()). This is the case for callers that haven’t eager-acquired (non-streaming RPC paths, CLI tests, etc.) — D5 byte-identical backwards-compat.
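The take/return discipline and the empty-map fallback can be sketched with stand-in types. `FakeConn`, `PinMap`, `Served`, and `run_store_op` are hypothetical; the real map holds `PoolConnection<Postgres>` values. Releasing the lock while the op runs is a choice made in this sketch and is not confirmed by the documentation.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Stand-in for a pooled connection.
#[derive(Debug)]
struct FakeConn {
    ops_run: u32,
}

/// Stand-in for the pin map held by the context, keyed by store name.
type PinMap = Arc<Mutex<HashMap<String, FakeConn>>>;

/// Which connection source served the op. Mirrors the Pinned / Pool split
/// of `StoreConn` in name only.
#[derive(Debug, PartialEq)]
enum Served {
    Pinned,
    Pool,
}

fn run_store_op(pins: &PinMap, store_name: &str) -> Served {
    // Take the pin out. The guard is dropped at the end of this statement,
    // so the mutex is not held while the op runs (sketch assumption).
    let taken = pins.lock().unwrap().remove(store_name);
    match taken {
        Some(mut pin) => {
            // The SQL would run here against `&mut pin`.
            pin.ops_run += 1;
            // Return the pin so the next op on this store finds it again.
            pins.lock().unwrap().insert(store_name.to_string(), pin);
            Served::Pinned
        }
        // No pin held for this store: fall back to the pool (legacy path).
        None => Served::Pool,
    }
}
```

Owning the connection for the duration of the op, instead of borrowing it through the mutex guard, is what keeps the borrow simple: the handler works on a plain `&mut` to a local value.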

Per D6.b (sub-fase 37.x.j.6): par {} branches that share this Arc serialize on its mutex. The D6.a default (per-branch sub-pin) replaces this Arc with a fresh empty map at par-branch clone time so branches do NOT serialize on the parent’s pins.

Implementations§

impl DispatchCtx

pub fn new( flow_name: impl Into<String>, backend_name: impl Into<String>, system_prompt: impl Into<String>, cancel: CancellationFlag, tx: UnboundedSender<FlowExecutionEvent>, ) -> Self

Construct a fresh context for a new flow. Subsequent sub-fases extend this with builder methods as the surface grows (PEM / ReplayToken / CognitiveState plumbing in 33.y.f, tool registry in 33.y.k, etc.).

pub fn with_pinned_conns( self, conns: Arc<Mutex<HashMap<String, PoolConnection<Postgres>>>>, ) -> Self

§Fase 37.x.j (D2) — Builder: attach an Arc-shared pinned connection map populated by the caller. run_streaming_via_dispatcher uses this to install the eagerly-acquired flow-scoped pins BEFORE the dispatcher walks any node. Returns self so the builder pattern chains with with_store_registry, with_pem, with_tool_registry, with_held_capabilities.

pub fn with_store_registry(self, registry: Arc<StoreRegistry>) -> Self

§Fase 35.f — Builder: attach the axonstore registry so the wire-integration store handlers route postgresql-backed stores to SQL. Without it, every store op stays key-value (D3). Returns self so builders chain.

pub fn with_held_capabilities(self, capabilities: Vec<String>) -> Self

§Fase 35.j — Builder: attach the request’s held capability slugs so the store handlers re-check capability-gated stores (Pillar IV). Returns self so builders chain.

pub fn with_tool_registry(self, registry: Arc<ToolRegistry>) -> Self

§Fase 34.d — Builder: attach a tool registry so the dispatcher’s streaming-tool branch can resolve apply_ref against it. Returns self so builders chain.

pub fn with_pem(self, backend: Arc<dyn PersistenceBackend>) -> Self

Builder: attach a PEM persistence backend. Returns self so callers can chain DispatchCtx::new(...).with_pem(backend).

pub fn with_session_id(self, session_id: impl Into<String>) -> Self

Builder: set the session id (defaults to flow_name).

pub fn with_tenant_id(self, tenant_id: impl Into<String>) -> Self

Builder: set the tenant id (defaults to empty).
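The defaulting and chaining behavior of these builders can be sketched on a reduced stand-in. `SessionCtx` is hypothetical and carries only three fields; it is meant to show the consuming-builder shape (`self` in, `Self` out), not the real constructor's argument list.

```rust
/// Stand-in for the context, reduced to the fields relevant here.
#[derive(Debug, Clone)]
struct SessionCtx {
    flow_name: String,
    session_id: String,
    tenant_id: String,
}

impl SessionCtx {
    fn new(flow_name: impl Into<String>) -> Self {
        let flow_name = flow_name.into();
        Self {
            session_id: flow_name.clone(), // defaults to the flow name
            tenant_id: String::new(),      // defaults to empty
            flow_name,
        }
    }

    /// Consuming builder: takes `self` by value and returns it for chaining.
    fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
        self.session_id = session_id.into();
        self
    }

    fn with_tenant_id(mut self, tenant_id: impl Into<String>) -> Self {
        self.tenant_id = tenant_id.into();
        self
    }
}
```

With this shape a call such as `SessionCtx::new("checkout").with_session_id("sess-42").with_tenant_id("acme")` reads as one expression, and an adopter who skips both builders still gets usable defaults.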

pub fn with_effect_policy(self, policy: BackpressurePolicy) -> Self

Builder-style setter for the pending effect policy. Returns self so callers can chain ctx.with_effect_policy(policy) before invoking dispatch_node. Handlers read + clear the field via Self::take_pending_effect_policy.

pub fn with_external_side_channels( self, enforcement_summaries: Arc<Mutex<HashMap<String, EnforcementSummaryWire>>>, step_audit_records: Arc<Mutex<Vec<StepAuditRecord>>>, runtime_warnings: Arc<Mutex<Vec<RuntimeWarning>>>, ) -> Self

§Fase 33.z.c — Builder: inject external Arc-backed side-channels so the dispatcher’s per-variant handlers populate the SAME Mutexes that server_execute_streaming reads from for the SSE wire’s enforcement_summary, step_audit, and runtime_warnings fields.

Without this builder, DispatchCtx::new creates FRESH Arcs that the dispatcher populates but the production hot path can’t read. That gap broke axon.complete.enforcement_summary wire emission on the canonical Step shape when the dispatcher graft (33.z.b) activated — the 33.x.d production-path tests detected the regression at the assert_eq!(generate_summary["chunks_pushed"], 1) line because the side-channel the wire reads from stayed empty while the dispatcher’s fresh Arc carried the counters.

Used exclusively by streaming_via_dispatcher::run_streaming_via_dispatcher to thread the side-channels the SSE handler constructs into the dispatcher. Downstream-crate consumers driving dispatch_node directly continue to use DispatchCtx::new + the fresh internal Arcs.
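The gap described above comes down to which `Arc` the writer holds. The sketch below reproduces it with one side-channel and stand-in types; `ChannelCtx`, `with_external_warnings`, and `warn` are hypothetical names used only for illustration.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for one side-channel (warnings as plain strings).
type Warnings = Arc<Mutex<Vec<String>>>;

struct ChannelCtx {
    runtime_warnings: Warnings,
}

impl ChannelCtx {
    /// Like a plain constructor: allocates a FRESH channel nobody else holds.
    fn new() -> Self {
        Self {
            runtime_warnings: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Swap in a channel the caller already owns, so both sides share it.
    fn with_external_warnings(mut self, shared: Warnings) -> Self {
        self.runtime_warnings = shared;
        self
    }

    /// What a handler would do when it records a warning.
    fn warn(&self, message: &str) {
        self.runtime_warnings.lock().unwrap().push(message.to_string());
    }
}
```

A context built with `new()` alone writes into a vector the reader never sees. After `with_external_warnings(Arc::clone(&handler_side))`, the writer and the reader hold the same allocation, which is the property the production path depends on.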

pub fn take_pending_effect_policy(&mut self) -> Option<BackpressurePolicy>

Read + clear the pending effect policy. Returns None when no policy was set by the caller. The take-semantics (vs. peek) prevents a stale policy from a previous node leaking into the next handler’s invocation if the caller forgets to clear.
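Take semantics map directly onto `Option::take`. The sketch below uses a stand-in `Policy` enum with made-up variants and a reduced `PolicySlot` struct; only the read-and-clear behavior is meant to match the description.

```rust
/// Stand-in policy type; the variants are hypothetical.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Policy {
    DropOldest,
    Block,
}

/// Reduced stand-in for the context: just the pending policy slot.
#[derive(Default)]
struct PolicySlot {
    pending: Option<Policy>,
}

impl PolicySlot {
    /// Set the policy for the NEXT node to be dispatched.
    fn with_effect_policy(mut self, policy: Policy) -> Self {
        self.pending = Some(policy);
        self
    }

    /// Read and clear in one step: a second call returns `None`.
    fn take_pending_effect_policy(&mut self) -> Option<Policy> {
        self.pending.take()
    }
}
```

With a peek-style getter, a handler that forgot to reset the field would hand its policy to the next node. With `take`, the slot is already empty by the time the next handler looks.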

pub fn branch_path_string(&self) -> String

Render the current branch_path as a wire-stable string. Empty path returns "" (flow root); single segment "par[0]"; multi "par[0].step[1]". The format is byte-stable across calls.
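The three documented outputs are consistent with joining the segments on `.`. The free function below is a hypothetical equivalent written against a plain slice; the separator is inferred from the examples above and is not taken from the source code.

```rust
/// Hypothetical free-function analogue of `branch_path_string`:
/// join the segments with `.`; an empty path renders as the empty string.
fn render_branch_path(path: &[String]) -> String {
    path.join(".")
}
```

Since the result depends only on the segment list, repeated calls on an unchanged path yield identical bytes.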

Trait Implementations§

impl Clone for DispatchCtx

fn clone(&self) -> DispatchCtx

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.
