Skip to main content

entelix_graph/
reducer.rs

1//! `Reducer<T>` — typed state-merge function.
2//!
3//! When two updates collide on the same state slot — typically because
4//! two parallel branches each produced a value, or because a node's
5//! return type only carries the *delta* it computed — a [`Reducer<T>`]
6//! decides how to combine `(current, update)` into the next value.
7//!
8//! Each node returns a full state; reducers are standalone helpers
9//! users call from their node closures, and the building block for
10//! the field-level `Annotated<T, R>` merge convention.
11//!
12//! ## Stock impls
13//!
14//! - [`Replace`]   — last-write-wins (matches the current default).
15//! - [`Append`]    — for `Vec<U>`; concatenates `current` and `update`.
16//! - [`MergeMap`]  — for `HashMap<K, V>`; right-bias union.
17//! - [`Max`]       — for any `T: Ord`; keeps the larger of the two.
18//!
19//! ## State-level composition
20//!
21//! [`StateMerge`] lifts the per-slot reducer pattern up to the
22//! whole `S` shape: each implementor describes how an incoming
23//! update folds into the current state. The companion
24//! `entelix-graph-derive::StateMerge` proc-macro generates the
25//! impl by walking struct fields — `Annotated<T, R>` fields apply
26//! their bundled reducer; plain fields are replaced by the
27//! incoming update.
28
29use std::cmp::Ord;
30use std::collections::HashMap;
31use std::hash::Hash;
32use std::marker::PhantomData;
33
34/// Combines a current value and an incoming update into the next value.
35///
36/// Implementors must be deterministic — the merge function runs
37/// inside `CompiledGraph::execute_loop` and a reducer that depends on
38/// outside state (random RNG, wall-clock time, …) breaks crash-resume
39/// reproducibility.
40pub trait Reducer<T>: Send + Sync + 'static {
41    /// Combine `current` and `update`. Implementations are free to
42    /// consume both; many simply return `update` (replace) or push
43    /// the second into the first (append).
44    fn reduce(&self, current: T, update: T) -> T;
45}
46
47/// Last-write-wins. Matches the current [`StateGraph`](crate::StateGraph)
48/// default — included so users who explicitly opt into reducer
49/// machinery have a no-op option.
50#[derive(Clone, Copy, Debug, Default)]
51pub struct Replace;
52
53impl<T> Reducer<T> for Replace
54where
55    T: Send + Sync + 'static,
56{
57    fn reduce(&self, _current: T, update: T) -> T {
58        update
59    }
60}
61
62/// Append: `current` followed by `update`. Specialised for
63/// `Vec<U>` where `U: Clone + Send + Sync + 'static`.
64#[derive(Clone, Copy, Debug)]
65pub struct Append<U>(PhantomData<fn() -> U>);
66
67impl<U> Default for Append<U> {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl<U> Append<U> {
74    /// Construct.
75    #[must_use]
76    pub const fn new() -> Self {
77        Self(PhantomData)
78    }
79}
80
81impl<U> Reducer<Vec<U>> for Append<U>
82where
83    U: Send + Sync + 'static,
84{
85    fn reduce(&self, mut current: Vec<U>, mut update: Vec<U>) -> Vec<U> {
86        current.append(&mut update);
87        current
88    }
89}
90
91/// Merge two `HashMap<K, V>`s, right-biased — entries from `update`
92/// overwrite collisions in `current`. (Right-bias matches typical
93/// LangGraph / dict-update semantics.)
94#[derive(Clone, Copy, Debug)]
95pub struct MergeMap<K, V>(PhantomData<fn() -> (K, V)>);
96
97impl<K, V> Default for MergeMap<K, V> {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl<K, V> MergeMap<K, V> {
104    /// Construct.
105    #[must_use]
106    pub const fn new() -> Self {
107        Self(PhantomData)
108    }
109}
110
111impl<K, V> Reducer<HashMap<K, V>> for MergeMap<K, V>
112where
113    K: Eq + Hash + Send + Sync + 'static,
114    V: Send + Sync + 'static,
115{
116    fn reduce(&self, mut current: HashMap<K, V>, update: HashMap<K, V>) -> HashMap<K, V> {
117        for (k, v) in update {
118            current.insert(k, v);
119        }
120        current
121    }
122}
123
124/// Keep the larger of `current` / `update` per `T: Ord`. Useful for
125/// "highest score wins" reducers and `usize` step counters merged
126/// across parallel branches.
127#[derive(Clone, Copy, Debug)]
128pub struct Max<T>(PhantomData<fn() -> T>);
129
130impl<T> Default for Max<T> {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136impl<T> Max<T> {
137    /// Construct.
138    #[must_use]
139    pub const fn new() -> Self {
140        Self(PhantomData)
141    }
142}
143
144impl<T> Reducer<T> for Max<T>
145where
146    T: Ord + Send + Sync + 'static,
147{
148    fn reduce(&self, current: T, update: T) -> T {
149        std::cmp::max(current, update)
150    }
151}
152
153/// Newtype that bundles a value `T` with the reducer `R` that should
154/// merge it. The wrapper is a standalone helper users compose into
155/// their state types; it does not hook into `StateGraph::add_node`
156/// directly.
157#[derive(Clone, Debug)]
158pub struct Annotated<T, R>
159where
160    R: Reducer<T>,
161{
162    /// The current value.
163    pub value: T,
164    reducer: R,
165}
166
167impl<T, R> Default for Annotated<T, R>
168where
169    T: Default,
170    R: Reducer<T> + Default,
171{
172    fn default() -> Self {
173        Self {
174            value: T::default(),
175            reducer: R::default(),
176        }
177    }
178}
179
180impl<T, R> Annotated<T, R>
181where
182    R: Reducer<T>,
183{
184    /// Wrap `value` with the supplied reducer.
185    pub const fn new(value: T, reducer: R) -> Self {
186        Self { value, reducer }
187    }
188
189    /// Borrow the inner reducer.
190    pub const fn reducer(&self) -> &R {
191        &self.reducer
192    }
193
194    /// Consume the wrapper and return the inner value.
195    pub fn into_value(self) -> T {
196        self.value
197    }
198
199    /// Apply the bundled reducer to fold `update` into `self.value`.
200    pub fn reduce_in_place(&mut self, update: T)
201    where
202        T: Default,
203    {
204        let current = std::mem::take(&mut self.value);
205        self.value = self.reducer.reduce(current, update);
206    }
207
208    /// Consume `self`, return a new `Annotated<T, R>` with `update`
209    /// folded in.
210    #[must_use]
211    pub fn reduced(self, update: T) -> Self
212    where
213        R: Clone,
214    {
215        let merged = self.reducer.reduce(self.value, update);
216        Self {
217            value: merged,
218            reducer: self.reducer,
219        }
220    }
221
222    /// Merge two annotated values. Both sides must agree on the
223    /// reducer type by construction (they share `R`); the resulting
224    /// `Annotated` keeps `self`'s reducer instance — this matters
225    /// for stateful reducers, where the *current* slot's reducer
226    /// has the right configuration. Stock reducers
227    /// ([`Replace`], [`Append`], [`MergeMap`], [`Max`]) are unit
228    /// structs, so the choice is moot for them.
229    ///
230    /// This is the building block the `StateMerge` derive macro
231    /// emits per `Annotated<T, R>` field.
232    #[must_use]
233    pub fn merge(self, other: Self) -> Self {
234        let merged = self.reducer.reduce(self.value, other.value);
235        Self {
236            value: merged,
237            reducer: self.reducer,
238        }
239    }
240}
241
242/// State-level merge: how an incoming update folds into the current
243/// state. The dispatch-loop counterpart to [`Reducer<T>`], one level
244/// up — implemented on the whole `S` shape rather than a single
245/// slot.
246///
247/// Two merge axes:
248///
249/// - [`Self::merge`] folds two same-shape `S` values (used by
250///   parallel-branch joins via `add_send_edges`, where two
251///   branches each produce a complete `S` and the dispatcher needs
252///   to combine them).
253/// - [`Self::merge_contribution`] folds an `Option`-wrapped
254///   contribution from one node into the current state. The
255///   `Contribution` type — declared via the `type Contribution`
256///   associated item — names which slots the node *actually
257///   wrote*, distinguishing "no contribution" from "contributed
258///   the default value". This is the canonical
259///   [`StateGraph::add_contributing_node`](crate::StateGraph::add_contributing_node)
260///   entry point — closer to LangGraph's TypedDict
261///   partial-return shape than the same-shape merge alone could
262///   express.
263///
264/// The companion `entelix-graph-derive::StateMerge` derive macro
265/// generates both methods plus the `<Name>Contribution` companion
266/// struct. Manual impls are supported when field-by-field shape
267/// doesn't fit (e.g. cross-field invariants enforced at merge time).
268pub trait StateMerge: Sized {
269    /// Companion type carrying an `Option`-wrapped slot per field
270    /// of `Self`. The derive macro generates this struct
271    /// automatically; manual implementors define their own.
272    type Contribution: Default + Send + Sync + 'static;
273
274    /// Fold `update` into `self` and return the merged state.
275    /// Implementations must be deterministic for the same reason
276    /// [`Reducer::reduce`] is — the merge runs inside the dispatch
277    /// loop and a non-deterministic implementation breaks
278    /// crash-resume reproducibility.
279    #[must_use]
280    fn merge(self, update: Self) -> Self;
281
282    /// Fold a [`Self::Contribution`] (an `Option`-wrapped partial
283    /// state) into `self`. Slots the node didn't write
284    /// (`None`) leave the current value untouched; slots it did
285    /// (`Some`) merge through the per-field reducer for
286    /// [`Annotated<T, R>`] fields, or replace for plain fields.
287    #[must_use]
288    fn merge_contribution(self, contribution: Self::Contribution) -> Self;
289}
290
291#[cfg(test)]
292#[allow(clippy::unwrap_used)]
293mod tests {
294    use super::*;
295
296    #[test]
297    fn replace_returns_update() {
298        let r = Replace;
299        assert_eq!(r.reduce(1u32, 7), 7);
300    }
301
302    #[test]
303    fn append_concatenates_vecs() {
304        let r = Append::<u32>::new();
305        assert_eq!(r.reduce(vec![1, 2], vec![3, 4]), vec![1, 2, 3, 4]);
306    }
307
308    #[test]
309    fn append_handles_empty_inputs() {
310        let r = Append::<u32>::new();
311        assert_eq!(r.reduce(Vec::new(), vec![1]), vec![1]);
312        assert_eq!(r.reduce(vec![1], Vec::new()), vec![1]);
313        assert!(r.reduce(Vec::<u32>::new(), Vec::new()).is_empty());
314    }
315
316    #[test]
317    fn merge_map_is_right_biased() {
318        let r = MergeMap::<&str, i32>::new();
319        let mut current = HashMap::new();
320        current.insert("a", 1);
321        current.insert("b", 2);
322        let mut update = HashMap::new();
323        update.insert("b", 20);
324        update.insert("c", 3);
325        let merged = r.reduce(current, update);
326        assert_eq!(merged.get("a"), Some(&1));
327        assert_eq!(merged.get("b"), Some(&20)); // right wins on collision
328        assert_eq!(merged.get("c"), Some(&3));
329    }
330
331    #[test]
332    fn max_keeps_larger() {
333        let r = Max::<i32>::new();
334        assert_eq!(r.reduce(3, 5), 5);
335        assert_eq!(r.reduce(10, 5), 10);
336        assert_eq!(r.reduce(-1, -3), -1);
337    }
338
339    #[test]
340    fn annotated_reduced_returns_merged() {
341        let a = Annotated::new(vec![1, 2], Append::<u32>::new());
342        let b = a.reduced(vec![3]);
343        assert_eq!(b.value, vec![1, 2, 3]);
344    }
345
346    #[test]
347    fn annotated_reduce_in_place_updates_value() {
348        let mut a = Annotated::new(vec![1, 2], Append::<u32>::new());
349        a.reduce_in_place(vec![3, 4]);
350        assert_eq!(a.value, vec![1, 2, 3, 4]);
351    }
352
353    #[test]
354    fn annotated_into_value_unwraps() {
355        let a = Annotated::new(42_i32, Replace);
356        assert_eq!(a.into_value(), 42);
357    }
358
359    #[test]
360    fn annotated_merge_combines_two_annotated_values() {
361        let left = Annotated::new(vec![1u32, 2], Append::<u32>::new());
362        let right = Annotated::new(vec![3u32, 4], Append::<u32>::new());
363        let merged = left.merge(right);
364        assert_eq!(merged.value, vec![1, 2, 3, 4]);
365    }
366
367    #[test]
368    fn annotated_merge_respects_reducer_kind() {
369        let left = Annotated::new(7_i32, Max::<i32>::new());
370        let right = Annotated::new(4_i32, Max::<i32>::new());
371        // Max-reducer keeps the larger of the two regardless of order.
372        assert_eq!(left.merge(right).value, 7);
373    }
374
375    #[test]
376    fn state_merge_can_be_implemented_manually() {
377        // Hand-rolled impl confirms the trait surface is usable
378        // without the derive macro — useful when a state type has
379        // cross-field invariants that need merge-time enforcement.
380        struct WithInvariant {
381            log: Annotated<Vec<u32>, Append<u32>>,
382            tag: String,
383        }
384        #[derive(Default)]
385        struct WithInvariantContribution {
386            log: Option<Annotated<Vec<u32>, Append<u32>>>,
387            tag: Option<String>,
388        }
389        impl StateMerge for WithInvariant {
390            type Contribution = WithInvariantContribution;
391            fn merge(self, update: Self) -> Self {
392                Self {
393                    log: self.log.merge(update.log),
394                    tag: update.tag,
395                }
396            }
397            fn merge_contribution(self, c: Self::Contribution) -> Self {
398                Self {
399                    log: match c.log {
400                        Some(v) => self.log.merge(v),
401                        None => self.log,
402                    },
403                    tag: c.tag.unwrap_or(self.tag),
404                }
405            }
406        }
407        let merged = WithInvariant {
408            log: Annotated::new(vec![1, 2], Append::new()),
409            tag: "old".into(),
410        }
411        .merge(WithInvariant {
412            log: Annotated::new(vec![3], Append::new()),
413            tag: "new".into(),
414        });
415        assert_eq!(merged.log.value, vec![1, 2, 3]);
416        assert_eq!(merged.tag, "new");
417
418        // Contribution path: `tag` slot left None should keep the
419        // current `tag`, `log` slot Some should merge through the
420        // per-field reducer.
421        let merged2 = WithInvariant {
422            log: Annotated::new(vec![10], Append::new()),
423            tag: "keep".into(),
424        }
425        .merge_contribution(WithInvariantContribution {
426            log: Some(Annotated::new(vec![20], Append::new())),
427            tag: None,
428        });
429        assert_eq!(merged2.log.value, vec![10, 20]);
430        assert_eq!(merged2.tag, "keep");
431    }
432
433    #[test]
434    fn reducer_object_is_dyn_safe() {
435        // If this compiles, `Reducer<Vec<i32>>` is dyn-safe — useful
436        // for users who want to swap reducers at runtime.
437        let r: Box<dyn Reducer<Vec<i32>>> = Box::new(Append::<i32>::new());
438        assert_eq!(r.reduce(vec![1], vec![2]), vec![1, 2]);
439    }
440}