1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
//! Channel-per-field state types.
//!
//! This module defines the *channel* state model described in
//! `docs/modules/graph/state-channels.md`. It is an **additive** alternative to
//! the monolithic `State` + [`crate::graph::StateReducer`] path: instead of one
//! whole-state value with one reducer, state is split into independently-named
//! *channels*, each owning its own current value and its own binary merge rule.
//!
//! The model is built from four pieces:
//!
//! - [`Channel`]: the per-key merge policy (overwrite, aggregate, append, …).
//! - [`ChannelSet`]: a named map of channels plus their current values.
//! - [`ChannelUpdate`]: a batch of `(channel_name, value)` writes a node returns.
//! - [`ChannelState`]: a concrete graph `State` wrapping a [`ChannelSet`] that
//! implements [`crate::graph::StateReducer<ChannelState, ChannelUpdate>`], so a
//! channel graph is just `GraphBuilder<ChannelState, ChannelUpdate>` running on
//! the unchanged executor.
//!
//! Values are [`serde_json::Value`]-backed for generality so channels compose
//! with checkpointing, export, and language-defined nodes without per-graph type
//! parameters.
use ;
use Value;
use crateResult;
/// A single named state channel: it owns the merge rule that folds an incoming
/// update value into the channel's current value at a superstep boundary.
///
/// Channels are object-safe and value-typed ([`serde_json::Value`]) so a
/// [`ChannelSet`] can hold a heterogeneous map of `Box<dyn Channel>`. Each
/// channel decides:
///
/// - [`Channel::merge`] — how an incoming write combines with the current value.
/// - [`Channel::allows_concurrent`] — whether two branches in the *same* step may
/// both write this channel (aggregates: yes; last-value: no, see
/// [`crate::TinyAgentsError::InvalidConcurrentUpdate`]).
/// - [`Channel::is_ephemeral`] — whether the value is cleared at the start of the
/// next step (one-shot channels).
/// - [`Channel::is_tracked`] — whether the value appears in [`ChannelSet::snapshot`]
/// and is considered part of durable state.
/// - [`Channel::is_ready`] — barrier readiness (defaults to always ready).
/// Overwrite channel: each write replaces the value (last-value semantics).
///
/// Rejects concurrent same-step writes from multiple branches with
/// [`crate::TinyAgentsError::InvalidConcurrentUpdate`].
;
/// Append channel: accumulates writes into a JSON array across steps.
///
/// A scalar write is pushed as one element; an array write extends the list.
/// Allows concurrent same-step writes (the order follows deterministic
/// active-set index order).
;
/// Numeric accumulator: each write is added to the running total.
///
/// Integer writes stay integers; any float write promotes the total to a float.
/// Allows concurrent same-step writes.
;
/// Message-merge channel: maintains a JSON array of message objects deduplicated
/// by their `id` field. An incoming message whose `id` matches an existing entry
/// replaces it in place; otherwise it is appended. Allows concurrent same-step
/// writes.
;
/// One-shot overwrite channel whose value is cleared at the start of the next
/// step (see [`ChannelUpdate::at_step`] for how step boundaries are detected).
;
/// Overwrite channel excluded from [`ChannelSet::snapshot`] and durable-state
/// views. Useful for scratch values that should not be checkpointed.
;
/// Count-based barrier: accumulates writes into a JSON array and is *ready* only
/// once it has collected at least `expected` arrivals. Allows concurrent
/// same-step writes (fan-in is the whole point).
/// Name-based barrier: accumulates writes into a JSON object keyed by arrival
/// name and is *ready* only once every name in `expected` has arrived. Each
/// incoming write is a JSON object whose keys are merged into the accumulator.
/// Allows concurrent same-step writes.
/// Binary-aggregate channel: folds writes through a user-supplied binary
/// closure (append, add, min, max, custom). The first write becomes the value
/// directly; subsequent writes are `fold(current, incoming)`. Allows concurrent
/// same-step writes.
/// A named map of channels plus their current [`serde_json::Value`]s.
///
/// The channel definitions (merge rules) live alongside the values, so the set
/// is self-describing: merging an update only needs the set itself. Construct
/// one with [`ChannelSet::new`] and register channels with
/// [`ChannelSet::with_channel`]; feed writes through
/// [`ChannelSet::apply_update`]; read the durable view with
/// [`ChannelSet::snapshot`].
/// A batch of `(channel_name, value)` writes returned by a node.
///
/// Build one with [`ChannelUpdate::new`] and chain [`ChannelUpdate::set`]. Tag
/// it with [`ChannelUpdate::at_step`] (passing `ctx.step`) to opt into
/// same-step concurrent-write conflict detection and ephemeral clearing — see
/// the module docs and [`ChannelState`].
/// A concrete graph `State` wrapping a [`ChannelSet`].
///
/// `ChannelState` implements [`crate::graph::StateReducer<ChannelState,
/// ChannelUpdate>`] for itself, so a channel-based graph is built directly with
/// `GraphBuilder<ChannelState, ChannelUpdate>` and runs on the unchanged
/// executor: each superstep the executor folds every branch's
/// [`ChannelUpdate`] into the committed `ChannelState` through this reducer,
/// dispatching each write to its channel's merge rule.
///
/// The reducer's `&self` receiver is unused — the merge rules travel inside the
/// running state's [`ChannelSet`] — so any `ChannelState` value (for example
/// [`ChannelState::default`]) can be passed to `set_reducer`.