Module channel


Channel-per-field state model (additive).

See [types] for the type definitions and the high-level model. This file supplies the concrete Channel merge rules, the ChannelSet map operations, and the ChannelState StateReducer bridge that lets a channel graph run on the existing executor.

§How a channel graph runs on the unchanged executor

The executor folds a superstep’s branch results one at a time: state = reducer.apply(state, update) for each branch’s ChannelUpdate. ChannelState is its own reducer, so each apply dispatches every write in the update to the owning channel’s Channel::merge.
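The fold described above can be sketched with toy types. This is a minimal illustration, not the crate's implementation: `Rule`, `Update`, `State`, and `run_superstep` are hypothetical stand-ins for `Channel`, `ChannelUpdate`, `ChannelState`, and the executor loop, and values are plain `i64` rather than `serde_json::Value`. Falling back to last-value semantics for an unregistered channel is also an assumption.

```rust
use std::collections::BTreeMap;

/// Toy stand-in for a channel's merge rule (hypothetical; the real trait is `Channel`).
#[derive(Clone, Copy)]
enum Rule {
    /// Each write replaces the value.
    LastValue,
    /// Each write is added to the running total.
    Delta,
}

/// Toy stand-in for `ChannelUpdate`: a batch of (channel_name, value) writes.
type Update = Vec<(&'static str, i64)>;

/// Toy stand-in for `ChannelState`: merge rules plus current values.
#[derive(Default)]
struct State {
    rules: BTreeMap<&'static str, Rule>,
    values: BTreeMap<&'static str, i64>,
}

impl State {
    /// Mirrors `state = reducer.apply(state, update)`: every write in the
    /// update is dispatched to the merge rule of the channel that owns it.
    fn apply(mut self, update: &Update) -> State {
        for &(name, incoming) in update {
            // Assumption: unregistered channels behave like last-value channels.
            let rule = self.rules.get(name).copied().unwrap_or(Rule::LastValue);
            let merged = match (rule, self.values.get(name)) {
                (Rule::Delta, Some(current)) => current + incoming,
                _ => incoming,
            };
            self.values.insert(name, merged);
        }
        self
    }
}

/// Folds one superstep's branch results into the state, one at a time.
fn run_superstep(state: State, branch_updates: &[Update]) -> State {
    branch_updates.iter().fold(state, |s, u| s.apply(u))
}
```

Because the state is its own reducer in this sketch, the executor loop needs nothing beyond the fold: all merge behaviour lives behind `apply`.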

§Concurrent-write conflict detection

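When two fan-out branches write the same channel in one superstep, the merge must decide whether that is legal. The struct summaries below note which channel types allow concurrent same-step writes (Barrier, BinaryAggregate, Messages, and NamedBarrier).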

Because the executor applies a step’s updates as a contiguous batch, “same step” is tracked by stamping each ChannelUpdate with the node’s ctx.step via ChannelUpdate::at_step. When updates are stamped, the reducer resets its per-step bookkeeping (and clears Ephemeral channels) whenever the step number advances. Each unstamped update is treated as its own step: last-value writes always win, with no conflict detection and no ephemeral clearing. Existing whole-state habits therefore keep working, and conflict detection is strictly opt-in.
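The step-stamping rule can be sketched as follows. This is a simplified model under stated assumptions: `Update` and `Reducer` are hypothetical toy types, only last-value channels are modelled, and reporting a conflict as `Err(String)` is an illustrative choice, since the way the crate actually surfaces conflicts is not described here.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Toy stand-in for a stamped or unstamped `ChannelUpdate`.
struct Update {
    /// `Some(n)` models an update stamped via `ChannelUpdate::at_step`.
    step: Option<u64>,
    writes: Vec<(&'static str, i64)>,
}

/// Toy reducer that models last-value channels only.
#[derive(Default)]
struct Reducer {
    values: BTreeMap<&'static str, i64>,
    current_step: Option<u64>,
    /// Channels already written during `current_step`.
    written: BTreeSet<&'static str>,
}

impl Reducer {
    fn apply(&mut self, update: &Update) -> Result<(), String> {
        match update.step {
            Some(step) => {
                // Stamped: reset the per-step bookkeeping when the step changes.
                if self.current_step != Some(step) {
                    self.current_step = Some(step);
                    self.written.clear();
                }
                for &(name, value) in &update.writes {
                    // A second write to the same channel in one step is a conflict.
                    if !self.written.insert(name) {
                        return Err(format!("conflict on `{}` in step {}", name, step));
                    }
                    self.values.insert(name, value);
                }
            }
            None => {
                // Unstamped: treated as its own step, so the last write wins.
                for &(name, value) in &update.writes {
                    self.values.insert(name, value);
                }
            }
        }
        Ok(())
    }
}
```

In this sketch, two updates stamped with the same step that both write `x` produce an error on the second, while the same pair left unstamped simply leaves the second value in place.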

Structs§

Barrier
Count-based barrier: accumulates writes into a JSON array and is ready only once it has collected at least expected arrivals. Allows concurrent same-step writes (fan-in is the whole point).
BinaryAggregate
Binary-aggregate channel: folds writes through a user-supplied binary closure (append, add, min, max, custom). The first write becomes the value directly; subsequent writes are fold(current, incoming). Allows concurrent same-step writes.
ChannelSet
A named map of channels plus their current serde_json::Values.
ChannelState
A concrete graph State wrapping a ChannelSet.
ChannelUpdate
A batch of (channel_name, value) writes returned by a node.
Delta
Numeric accumulator: each write is added to the running total.
Ephemeral
One-shot overwrite channel whose value is cleared at the start of the next step (see ChannelUpdate::at_step for how step boundaries are detected).
LastValue
Overwrite channel: each write replaces the value (last-value semantics).
Messages
Message-merge channel: maintains a JSON array of message objects deduplicated by their id field. An incoming message whose id matches an existing entry replaces it in place; otherwise it is appended. Allows concurrent same-step writes.
NamedBarrier
Name-based barrier: accumulates writes into a JSON object keyed by arrival name and is ready only once every name in expected has arrived. Each incoming write is a JSON object whose keys are merged into the accumulator. Allows concurrent same-step writes.
Topic
Append channel: accumulates writes into a JSON array across steps.
Untracked
Overwrite channel excluded from ChannelSet::snapshot and durable-state views. Useful for scratch values that should not be checkpointed.
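Three of the merge rules summarised above (BinaryAggregate, Messages, and Barrier) can be illustrated with small standalone functions. These are sketches only: `aggregate`, `Message`, `merge_message`, and the toy `Barrier` struct with its `write` and `is_ready` methods are hypothetical names, and typed Rust values stand in for the JSON values the real channels store.

```rust
/// Binary-aggregate rule: the first write becomes the value directly;
/// later writes are `fold(current, incoming)`.
fn aggregate<T>(current: Option<T>, incoming: T, fold: impl Fn(T, T) -> T) -> T {
    match current {
        None => incoming,
        Some(cur) => fold(cur, incoming),
    }
}

/// Toy message with an `id` field (the real channel stores JSON objects).
#[derive(Clone, Debug, PartialEq)]
struct Message {
    id: String,
    text: String,
}

/// Message-merge rule: a matching `id` replaces the entry in place,
/// otherwise the message is appended.
fn merge_message(messages: &mut Vec<Message>, incoming: Message) {
    if let Some(pos) = messages.iter().position(|m| m.id == incoming.id) {
        messages[pos] = incoming;
    } else {
        messages.push(incoming);
    }
}

/// Toy count-based barrier: ready once at least `expected` writes have arrived.
struct Barrier {
    expected: usize,
    arrivals: Vec<String>,
}

impl Barrier {
    /// Accumulates one arrival; concurrent same-step writes are all kept.
    fn write(&mut self, value: &str) {
        self.arrivals.push(value.to_string());
    }

    fn is_ready(&self) -> bool {
        self.arrivals.len() >= self.expected
    }
}
```

All three rules accept several writes in one step without losing any of them, which is consistent with these channel types being documented as allowing concurrent same-step writes.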

Traits§

Channel
A single named state channel: it owns the merge rule that folds an incoming update value into the channel’s current value at a superstep boundary.
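To make the idea of a channel owning its merge rule concrete, here is a minimal sketch. The trait shape is an assumption: the real `Channel::merge` signature is not shown on this page and may differ. The `Value` enum is a toy stand-in for `serde_json::Value`, and the `LastValue` and `Topic` types below are illustrative re-implementations of the documented behaviour, not the crate's own structs.

```rust
/// Toy value type standing in for `serde_json::Value`.
#[derive(Clone, Debug, PartialEq)]
enum Value {
    Num(i64),
    List(Vec<Value>),
}

/// Hypothetical shape of the trait; the real `Channel::merge` may differ.
trait Channel {
    /// Folds an incoming update value into the channel's current value.
    fn merge(&self, current: Option<Value>, incoming: Value) -> Value;
}

/// Overwrite channel: each write replaces the value.
struct LastValue;

/// Append channel: accumulates writes into an array across steps.
struct Topic;

impl Channel for LastValue {
    fn merge(&self, _current: Option<Value>, incoming: Value) -> Value {
        incoming
    }
}

impl Channel for Topic {
    fn merge(&self, current: Option<Value>, incoming: Value) -> Value {
        // Start from the existing array, or wrap a non-array value in one.
        let mut items = match current {
            Some(Value::List(items)) => items,
            Some(other) => vec![other],
            None => Vec::new(),
        };
        items.push(incoming);
        Value::List(items)
    }
}
```

Keeping the rule behind a trait means the reducer only needs to look up the owning channel by name and call `merge`; adding a new channel type does not change the dispatch code.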