entelix_graph/reducer.rs
1//! `Reducer<T>` — typed state-merge function.
2//!
3//! When two updates collide on the same state slot — typically because
4//! two parallel branches each produced a value, or because a node's
5//! return type only carries the *delta* it computed — a [`Reducer<T>`]
6//! decides how to combine `(current, update)` into the next value.
7//!
8//! Each node returns a full state; reducers are standalone helpers
9//! users call from their node closures, and the building block for
10//! the field-level `Annotated<T, R>` merge convention.
11//!
12//! ## Stock impls
13//!
14//! - [`Replace`] — last-write-wins (matches the current default).
15//! - [`Append`] — for `Vec<U>`; concatenates `current` and `update`.
16//! - [`MergeMap`] — for `HashMap<K, V>`; right-bias union.
17//! - [`Max`] — for any `T: Ord`; keeps the larger of the two.
18//!
19//! ## State-level composition
20//!
21//! [`StateMerge`] lifts the per-slot reducer pattern up to the
22//! whole `S` shape: each implementor describes how an incoming
23//! update folds into the current state. The companion
24//! `entelix-graph-derive::StateMerge` proc-macro generates the
25//! impl by walking struct fields — `Annotated<T, R>` fields apply
26//! their bundled reducer; plain fields are replaced by the
27//! incoming update.
28
29use std::cmp::Ord;
30use std::collections::HashMap;
31use std::hash::Hash;
32use std::marker::PhantomData;
33
34/// Combines a current value and an incoming update into the next value.
35///
36/// Implementors must be deterministic — the merge function runs
37/// inside `CompiledGraph::execute_loop` and a reducer that depends on
38/// outside state (random RNG, wall-clock time, …) breaks crash-resume
39/// reproducibility.
40pub trait Reducer<T>: Send + Sync + 'static {
41 /// Combine `current` and `update`. Implementations are free to
42 /// consume both; many simply return `update` (replace) or push
43 /// the second into the first (append).
44 fn reduce(&self, current: T, update: T) -> T;
45}
46
47/// Last-write-wins. Matches the current [`StateGraph`](crate::StateGraph)
48/// default — included so users who explicitly opt into reducer
49/// machinery have a no-op option.
50#[derive(Clone, Copy, Debug, Default)]
51pub struct Replace;
52
53impl<T> Reducer<T> for Replace
54where
55 T: Send + Sync + 'static,
56{
57 fn reduce(&self, _current: T, update: T) -> T {
58 update
59 }
60}
61
62/// Append: `current` followed by `update`. Specialised for
63/// `Vec<U>` where `U: Clone + Send + Sync + 'static`.
64#[derive(Clone, Copy, Debug)]
65pub struct Append<U>(PhantomData<fn() -> U>);
66
67impl<U> Default for Append<U> {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl<U> Append<U> {
74 /// Construct.
75 #[must_use]
76 pub const fn new() -> Self {
77 Self(PhantomData)
78 }
79}
80
81impl<U> Reducer<Vec<U>> for Append<U>
82where
83 U: Send + Sync + 'static,
84{
85 fn reduce(&self, mut current: Vec<U>, mut update: Vec<U>) -> Vec<U> {
86 current.append(&mut update);
87 current
88 }
89}
90
91/// Merge two `HashMap<K, V>`s, right-biased — entries from `update`
92/// overwrite collisions in `current`. (Right-bias matches typical
93/// LangGraph / dict-update semantics.)
94#[derive(Clone, Copy, Debug)]
95pub struct MergeMap<K, V>(PhantomData<fn() -> (K, V)>);
96
97impl<K, V> Default for MergeMap<K, V> {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103impl<K, V> MergeMap<K, V> {
104 /// Construct.
105 #[must_use]
106 pub const fn new() -> Self {
107 Self(PhantomData)
108 }
109}
110
111impl<K, V> Reducer<HashMap<K, V>> for MergeMap<K, V>
112where
113 K: Eq + Hash + Send + Sync + 'static,
114 V: Send + Sync + 'static,
115{
116 fn reduce(&self, mut current: HashMap<K, V>, update: HashMap<K, V>) -> HashMap<K, V> {
117 for (k, v) in update {
118 current.insert(k, v);
119 }
120 current
121 }
122}
123
124/// Keep the larger of `current` / `update` per `T: Ord`. Useful for
125/// "highest score wins" reducers and `usize` step counters merged
126/// across parallel branches.
127#[derive(Clone, Copy, Debug)]
128pub struct Max<T>(PhantomData<fn() -> T>);
129
130impl<T> Default for Max<T> {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
136impl<T> Max<T> {
137 /// Construct.
138 #[must_use]
139 pub const fn new() -> Self {
140 Self(PhantomData)
141 }
142}
143
144impl<T> Reducer<T> for Max<T>
145where
146 T: Ord + Send + Sync + 'static,
147{
148 fn reduce(&self, current: T, update: T) -> T {
149 std::cmp::max(current, update)
150 }
151}
152
153/// Newtype that bundles a value `T` with the reducer `R` that should
154/// merge it. The wrapper is a standalone helper users compose into
155/// their state types; it does not hook into `StateGraph::add_node`
156/// directly.
157#[derive(Clone, Debug)]
158pub struct Annotated<T, R>
159where
160 R: Reducer<T>,
161{
162 /// The current value.
163 pub value: T,
164 reducer: R,
165}
166
167impl<T, R> Default for Annotated<T, R>
168where
169 T: Default,
170 R: Reducer<T> + Default,
171{
172 fn default() -> Self {
173 Self {
174 value: T::default(),
175 reducer: R::default(),
176 }
177 }
178}
179
180impl<T, R> Annotated<T, R>
181where
182 R: Reducer<T>,
183{
184 /// Wrap `value` with the supplied reducer.
185 pub const fn new(value: T, reducer: R) -> Self {
186 Self { value, reducer }
187 }
188
189 /// Borrow the inner reducer.
190 pub const fn reducer(&self) -> &R {
191 &self.reducer
192 }
193
194 /// Consume the wrapper and return the inner value.
195 pub fn into_value(self) -> T {
196 self.value
197 }
198
199 /// Apply the bundled reducer to fold `update` into `self.value`.
200 pub fn reduce_in_place(&mut self, update: T)
201 where
202 T: Default,
203 {
204 let current = std::mem::take(&mut self.value);
205 self.value = self.reducer.reduce(current, update);
206 }
207
208 /// Consume `self`, return a new `Annotated<T, R>` with `update`
209 /// folded in.
210 #[must_use]
211 pub fn reduced(self, update: T) -> Self
212 where
213 R: Clone,
214 {
215 let merged = self.reducer.reduce(self.value, update);
216 Self {
217 value: merged,
218 reducer: self.reducer,
219 }
220 }
221
222 /// Merge two annotated values. Both sides must agree on the
223 /// reducer type by construction (they share `R`); the resulting
224 /// `Annotated` keeps `self`'s reducer instance — this matters
225 /// for stateful reducers, where the *current* slot's reducer
226 /// has the right configuration. Stock reducers
227 /// ([`Replace`], [`Append`], [`MergeMap`], [`Max`]) are unit
228 /// structs, so the choice is moot for them.
229 ///
230 /// This is the building block the `StateMerge` derive macro
231 /// emits per `Annotated<T, R>` field.
232 #[must_use]
233 pub fn merge(self, other: Self) -> Self {
234 let merged = self.reducer.reduce(self.value, other.value);
235 Self {
236 value: merged,
237 reducer: self.reducer,
238 }
239 }
240}
241
242/// State-level merge: how an incoming update folds into the current
243/// state. The dispatch-loop counterpart to [`Reducer<T>`], one level
244/// up — implemented on the whole `S` shape rather than a single
245/// slot.
246///
247/// Two merge axes:
248///
249/// - [`Self::merge`] folds two same-shape `S` values (used by
250/// parallel-branch joins via `add_send_edges`, where two
251/// branches each produce a complete `S` and the dispatcher needs
252/// to combine them).
253/// - [`Self::merge_contribution`] folds an `Option`-wrapped
254/// contribution from one node into the current state. The
255/// `Contribution` type — declared via the `type Contribution`
256/// associated item — names which slots the node *actually
257/// wrote*, distinguishing "no contribution" from "contributed
258/// the default value". This is the canonical
259/// [`StateGraph::add_contributing_node`](crate::StateGraph::add_contributing_node)
260/// entry point — closer to LangGraph's TypedDict
261/// partial-return shape than the same-shape merge alone could
262/// express.
263///
264/// The companion `entelix-graph-derive::StateMerge` derive macro
265/// generates both methods plus the `<Name>Contribution` companion
266/// struct. Manual impls are supported when field-by-field shape
267/// doesn't fit (e.g. cross-field invariants enforced at merge time).
268pub trait StateMerge: Sized {
269 /// Companion type carrying an `Option`-wrapped slot per field
270 /// of `Self`. The derive macro generates this struct
271 /// automatically; manual implementors define their own.
272 type Contribution: Default + Send + Sync + 'static;
273
274 /// Fold `update` into `self` and return the merged state.
275 /// Implementations must be deterministic for the same reason
276 /// [`Reducer::reduce`] is — the merge runs inside the dispatch
277 /// loop and a non-deterministic implementation breaks
278 /// crash-resume reproducibility.
279 #[must_use]
280 fn merge(self, update: Self) -> Self;
281
282 /// Fold a [`Self::Contribution`] (an `Option`-wrapped partial
283 /// state) into `self`. Slots the node didn't write
284 /// (`None`) leave the current value untouched; slots it did
285 /// (`Some`) merge through the per-field reducer for
286 /// [`Annotated<T, R>`] fields, or replace for plain fields.
287 #[must_use]
288 fn merge_contribution(self, contribution: Self::Contribution) -> Self;
289}
290
291#[cfg(test)]
292#[allow(clippy::unwrap_used)]
293mod tests {
294 use super::*;
295
296 #[test]
297 fn replace_returns_update() {
298 let r = Replace;
299 assert_eq!(r.reduce(1u32, 7), 7);
300 }
301
302 #[test]
303 fn append_concatenates_vecs() {
304 let r = Append::<u32>::new();
305 assert_eq!(r.reduce(vec![1, 2], vec![3, 4]), vec![1, 2, 3, 4]);
306 }
307
308 #[test]
309 fn append_handles_empty_inputs() {
310 let r = Append::<u32>::new();
311 assert_eq!(r.reduce(Vec::new(), vec![1]), vec![1]);
312 assert_eq!(r.reduce(vec![1], Vec::new()), vec![1]);
313 assert!(r.reduce(Vec::<u32>::new(), Vec::new()).is_empty());
314 }
315
316 #[test]
317 fn merge_map_is_right_biased() {
318 let r = MergeMap::<&str, i32>::new();
319 let mut current = HashMap::new();
320 current.insert("a", 1);
321 current.insert("b", 2);
322 let mut update = HashMap::new();
323 update.insert("b", 20);
324 update.insert("c", 3);
325 let merged = r.reduce(current, update);
326 assert_eq!(merged.get("a"), Some(&1));
327 assert_eq!(merged.get("b"), Some(&20)); // right wins on collision
328 assert_eq!(merged.get("c"), Some(&3));
329 }
330
331 #[test]
332 fn max_keeps_larger() {
333 let r = Max::<i32>::new();
334 assert_eq!(r.reduce(3, 5), 5);
335 assert_eq!(r.reduce(10, 5), 10);
336 assert_eq!(r.reduce(-1, -3), -1);
337 }
338
339 #[test]
340 fn annotated_reduced_returns_merged() {
341 let a = Annotated::new(vec![1, 2], Append::<u32>::new());
342 let b = a.reduced(vec![3]);
343 assert_eq!(b.value, vec![1, 2, 3]);
344 }
345
346 #[test]
347 fn annotated_reduce_in_place_updates_value() {
348 let mut a = Annotated::new(vec![1, 2], Append::<u32>::new());
349 a.reduce_in_place(vec![3, 4]);
350 assert_eq!(a.value, vec![1, 2, 3, 4]);
351 }
352
353 #[test]
354 fn annotated_into_value_unwraps() {
355 let a = Annotated::new(42_i32, Replace);
356 assert_eq!(a.into_value(), 42);
357 }
358
359 #[test]
360 fn annotated_merge_combines_two_annotated_values() {
361 let left = Annotated::new(vec![1u32, 2], Append::<u32>::new());
362 let right = Annotated::new(vec![3u32, 4], Append::<u32>::new());
363 let merged = left.merge(right);
364 assert_eq!(merged.value, vec![1, 2, 3, 4]);
365 }
366
367 #[test]
368 fn annotated_merge_respects_reducer_kind() {
369 let left = Annotated::new(7_i32, Max::<i32>::new());
370 let right = Annotated::new(4_i32, Max::<i32>::new());
371 // Max-reducer keeps the larger of the two regardless of order.
372 assert_eq!(left.merge(right).value, 7);
373 }
374
375 #[test]
376 fn state_merge_can_be_implemented_manually() {
377 // Hand-rolled impl confirms the trait surface is usable
378 // without the derive macro — useful when a state type has
379 // cross-field invariants that need merge-time enforcement.
380 struct WithInvariant {
381 log: Annotated<Vec<u32>, Append<u32>>,
382 tag: String,
383 }
384 #[derive(Default)]
385 struct WithInvariantContribution {
386 log: Option<Annotated<Vec<u32>, Append<u32>>>,
387 tag: Option<String>,
388 }
389 impl StateMerge for WithInvariant {
390 type Contribution = WithInvariantContribution;
391 fn merge(self, update: Self) -> Self {
392 Self {
393 log: self.log.merge(update.log),
394 tag: update.tag,
395 }
396 }
397 fn merge_contribution(self, c: Self::Contribution) -> Self {
398 Self {
399 log: match c.log {
400 Some(v) => self.log.merge(v),
401 None => self.log,
402 },
403 tag: c.tag.unwrap_or(self.tag),
404 }
405 }
406 }
407 let merged = WithInvariant {
408 log: Annotated::new(vec![1, 2], Append::new()),
409 tag: "old".into(),
410 }
411 .merge(WithInvariant {
412 log: Annotated::new(vec![3], Append::new()),
413 tag: "new".into(),
414 });
415 assert_eq!(merged.log.value, vec![1, 2, 3]);
416 assert_eq!(merged.tag, "new");
417
418 // Contribution path: `tag` slot left None should keep the
419 // current `tag`, `log` slot Some should merge through the
420 // per-field reducer.
421 let merged2 = WithInvariant {
422 log: Annotated::new(vec![10], Append::new()),
423 tag: "keep".into(),
424 }
425 .merge_contribution(WithInvariantContribution {
426 log: Some(Annotated::new(vec![20], Append::new())),
427 tag: None,
428 });
429 assert_eq!(merged2.log.value, vec![10, 20]);
430 assert_eq!(merged2.tag, "keep");
431 }
432
433 #[test]
434 fn reducer_object_is_dyn_safe() {
435 // If this compiles, `Reducer<Vec<i32>>` is dyn-safe — useful
436 // for users who want to swap reducers at runtime.
437 let r: Box<dyn Reducer<Vec<i32>>> = Box::new(Append::<i32>::new());
438 assert_eq!(r.reduce(vec![1], vec![2]), vec![1, 2]);
439 }
440}