Skip to main content

juncture_core/state/
channel.rs

1//! Channel trait and channel types for state field access with checkpoint support
2//!
3//! A Channel wraps a value with specific update and checkpoint semantics.
4//! Different channel types control how values are updated, persisted, and consumed.
5
6use serde::de::DeserializeOwned;
7use serde::ser::SerializeStruct;
8use std::collections::HashSet;
9
/// Merge strategy for a single state field
///
/// Every field of a State can carry its own reducer. The reducer decides how
/// the writes issued during one superstep are folded into the field's
/// current value.
pub trait Reducer<T> {
    /// Fold exactly one write into `current`
    ///
    /// The provided implementation wraps `value` in a one-element `Vec` and
    /// forwards to `reduce`. Implementors can override it to skip that
    /// allocation.
    fn reduce_one(current: &mut T, value: T) {
        let single_write = Vec::from([value]);
        Self::reduce(current, single_write);
    }

    /// Fold every write in `values` into `current`
    ///
    /// Values are provided in the order tasks completed (not task spawn order),
    /// so a reducer whose result depends on the order of `values` observes
    /// completion order.
    fn reduce(current: &mut T, values: Vec<T>);
}
26
27/// Replace reducer: only one writer per superstep (default)
28///
29/// Equivalent to `LangGraph`'s `LastValue` channel.
30/// Panics if multiple nodes write to the same field in one superstep.
31#[derive(Debug)]
32pub struct ReplaceReducer;
33
34impl<T> Reducer<T> for ReplaceReducer {
35    fn reduce(current: &mut T, values: Vec<T>) {
36        assert!(
37            values.len() <= 1,
38            "Replace reducer: multiple writes in same superstep"
39        );
40        if let Some(v) = values.into_iter().next() {
41            *current = v;
42        }
43    }
44}
45
46/// Append reducer: accumulate all writes
47///
48/// Equivalent to `LangGraph`'s `BinaryOperatorAggregate` with operator.add.
49/// All writes are extended in order.
50#[derive(Debug)]
51pub struct AppendReducer;
52
53impl<T> Reducer<Vec<T>> for AppendReducer {
54    fn reduce_one(current: &mut Vec<T>, value: Vec<T>) {
55        current.extend(value);
56    }
57
58    fn reduce(current: &mut Vec<T>, values: Vec<Vec<T>>) {
59        for v in values {
60            current.extend(v);
61        }
62    }
63}
64
65/// `AnyValue` reducer: assumes all values are equal
66///
67/// Similar to `LastValue`, but semantically assumes all writers provide
68/// the same value. Uses the last value if they differ.
69#[derive(Debug)]
70pub struct AnyValueReducer;
71
72impl<T: PartialEq + Clone> Reducer<T> for AnyValueReducer {
73    fn reduce(current: &mut T, values: Vec<T>) {
74        if let Some(last) = values.last() {
75            // Semantic check: all values should be equal
76            if let Some(first) = values.first() {
77                debug_assert!(
78                    values.iter().all(|v| v == first),
79                    "AnyValue reducer: all values should be equal"
80                );
81            }
82            *current = last.clone();
83        }
84    }
85}
86
87/// `LastWriteWins` reducer: allows multiple writers, last one wins
88///
89/// Similar to `ReplaceReducer`, but doesn't panic on multiple writes.
90#[derive(Debug)]
91pub struct LastWriteWinsReducer;
92
93impl<T> Reducer<T> for LastWriteWinsReducer {
94    fn reduce(current: &mut T, values: Vec<T>) {
95        if let Some(v) = values.into_iter().last() {
96            *current = v;
97        }
98    }
99}
100
/// Bypass reducer: overwrite value directly, bypassing normal merge
///
/// When `Overwrite<T>` is used in an update, it bypasses the field's reducer
/// and directly replaces the value. Custom serde uses `{"__overwrite__": value}`
/// wire format for `LangGraph` checkpoint compatibility.
///
/// `Debug` is derived; for a tuple struct the derive prints `Overwrite(<inner>)`,
/// which is exactly what the former hand-written impl produced.
#[derive(Debug)]
pub struct Overwrite<T>(pub T);

impl<T> Overwrite<T> {
    /// Create a new Overwrite wrapper around `value`
    #[must_use]
    pub const fn new(value: T) -> Self {
        Self(value)
    }

    /// Get a reference to the inner value
    #[must_use]
    pub const fn get(&self) -> &T {
        &self.0
    }

    /// Convert into the inner value, consuming the wrapper
    #[must_use]
    pub fn into_inner(self) -> T {
        self.0
    }
}
133
134impl<T: serde::Serialize> serde::Serialize for Overwrite<T> {
135    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
136        let mut s = serializer.serialize_struct("__overwrite__", 1)?;
137        s.serialize_field("__overwrite__", &self.0)?;
138        s.end()
139    }
140}
141
142impl<'de, T: serde::Deserialize<'de>> serde::Deserialize<'de> for Overwrite<T> {
143    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
144        #[derive(serde::Deserialize)]
145        struct Wrapper<T> {
146            __overwrite__: T,
147        }
148        let wrapper = Wrapper::deserialize(deserializer)?;
149        Ok(Self(wrapper.__overwrite__))
150    }
151}
152
153/// Named barrier channel: waits for all registered named sources to write
154///
155/// This channel implements barrier/wait-all semantics for parallel workflows.
156/// The value is only available after ALL required named sources have written
157/// to it. Each source must provide a unique name for tracking.
158///
159/// # Type Parameters
160///
161/// * `T` - The value type stored in the channel
162/// * `R` - The reducer type that defines how multiple writes are merged
163///
164/// # Examples
165///
166/// ```
167/// use juncture_core::state::channel::{NamedBarrierChannel, ReplaceReducer};
168///
169/// let mut channel: NamedBarrierChannel<i32, ReplaceReducer> =
170///     NamedBarrierChannel::new_with_sources(0, ["node_a", "node_b"].into_iter().map(String::from));
171///
172/// // Initially not available
173/// assert!(!channel.is_available());
174///
175/// // After first write, still not available
176/// channel.update("node_a".to_string(), vec![42]).expect("update should succeed");
177/// assert!(!channel.is_available());
178///
179/// // After second write, becomes available
180/// channel.update("node_b".to_string(), vec![100]).expect("update should succeed");
181/// assert!(channel.is_available());
182/// assert_eq!(*channel.get(), 100); // Last write wins
183/// ```
184#[derive(Debug)]
185pub struct NamedBarrierChannel<T, R: Reducer<T>> {
186    value: T,
187    required_sources: HashSet<String>,
188    seen_sources: HashSet<String>,
189    _reducer: std::marker::PhantomData<R>,
190}
191
192impl<T, R: Reducer<T>> NamedBarrierChannel<T, R> {
193    /// Create a new named barrier channel with the given initial value and required sources
194    ///
195    /// # Arguments
196    ///
197    /// * `value` - The initial value for the channel
198    /// * `required_sources` - Iterator of source names that must all write before the barrier completes
199    #[must_use]
200    pub fn new_with_sources(value: T, required_sources: impl IntoIterator<Item = String>) -> Self {
201        let sources: HashSet<String> = required_sources.into_iter().collect();
202        Self {
203            value,
204            required_sources: sources,
205            seen_sources: HashSet::new(),
206            _reducer: std::marker::PhantomData,
207        }
208    }
209
210    /// Create a new named barrier channel with no required sources
211    ///
212    /// This channel will be immediately available. Use this when you plan to
213    /// add required sources later or when the barrier should always be complete.
214    #[must_use]
215    pub fn new(value: T) -> Self {
216        Self {
217            value,
218            required_sources: HashSet::new(),
219            seen_sources: HashSet::new(),
220            _reducer: std::marker::PhantomData,
221        }
222    }
223
224    /// Add a required source to the barrier
225    ///
226    /// If the source has already written, this will immediately mark it as seen.
227    pub fn add_required_source(&mut self, source: String) {
228        self.required_sources.insert(source);
229    }
230
231    /// Check if all required sources have written
232    ///
233    /// Returns `true` only when ALL required sources have written to this channel.
234    #[must_use]
235    pub fn is_available(&self) -> bool {
236        if self.required_sources.is_empty() {
237            return true;
238        }
239        self.required_sources
240            .iter()
241            .all(|source| self.seen_sources.contains(source))
242    }
243
244    /// Get the set of required source names
245    #[must_use]
246    pub const fn required_sources(&self) -> &HashSet<String> {
247        &self.required_sources
248    }
249
250    /// Get the set of source names that have written so far
251    #[must_use]
252    pub const fn seen_sources(&self) -> &HashSet<String> {
253        &self.seen_sources
254    }
255
256    /// Check if a specific source has written
257    #[must_use]
258    pub fn has_written(&self, source: &str) -> bool {
259        self.seen_sources.contains(source)
260    }
261
262    /// Reset the barrier, clearing seen sources while keeping required sources
263    ///
264    /// This is useful for reusing the barrier across multiple supersteps.
265    pub fn reset(&mut self) {
266        self.seen_sources.clear();
267    }
268}
269
270impl<T, R> Channel<T> for NamedBarrierChannel<T, R>
271where
272    T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
273    R: Reducer<T> + Send + Sync + 'static,
274{
275    fn update(&mut self, values: Vec<T>) -> bool {
276        // Channel trait update doesn't support named sources.
277        // When using the generic Channel trait, we apply all values directly
278        // to the channel. This is useful when the caller doesn't care about
279        // named barrier tracking and just wants to update the value.
280        if values.is_empty() {
281            return false;
282        }
283        // Apply the reducer to merge all values
284        R::reduce(&mut self.value, values);
285        // Mark all required sources as seen since we received an update
286        self.seen_sources = self.required_sources.clone();
287        true
288    }
289
290    fn get(&self) -> &T {
291        &self.value
292    }
293
294    fn consume(&mut self) -> bool {
295        false
296    }
297
298    fn checkpoint(&self) -> Option<serde_json::Value> {
299        // Persist the value and seen sources
300        serde_json::to_value(&(self.value.clone(), self.seen_sources.clone())).ok()
301    }
302
303    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
304    where
305        Self: Sized,
306    {
307        let (parsed_value, seen_sources): (T, HashSet<String>) = serde_json::from_value(value)
308            .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
309        Ok(Self {
310            value: parsed_value,
311            required_sources: HashSet::new(),
312            seen_sources,
313            _reducer: std::marker::PhantomData,
314        })
315    }
316}
317
318impl<T, R: Reducer<T>> NamedBarrierChannel<T, R> {
319    /// Update from a named source
320    ///
321    /// This is the primary method for `NamedBarrierChannel`, allowing named sources
322    /// to write to the channel. The barrier completes only after all required sources
323    /// have written.
324    ///
325    /// # Panics
326    ///
327    /// Panics if `source_name` is not in the set of required sources (when sources are configured).
328    pub fn update(&mut self, source_name: String, values: Vec<T>) -> bool {
329        assert!(
330            self.required_sources.is_empty() || self.required_sources.contains(&source_name),
331            "NamedBarrierChannel: source '{source_name}' not in required sources"
332        );
333
334        if values.is_empty() {
335            return false;
336        }
337
338        R::reduce(&mut self.value, values);
339        self.seen_sources.insert(source_name);
340        true
341    }
342}
343
/// Topic channel: accumulates all published values into a list
///
/// This channel implements pub/sub messaging patterns where all writes
/// are accumulated into a list. Each value is appended independently,
/// allowing multiple publishers to send messages to the same topic.
///
/// # Type Parameters
///
/// * `T` - The message type stored in the topic
///
/// # Examples
///
/// ```
/// use juncture_core::state::channel::{Channel, TopicChannel};
///
/// let mut channel: TopicChannel<String> = TopicChannel::new();
///
/// // Publish messages: `update` takes a list of batches, each batch being a
/// // list of messages
/// channel.update(vec![vec!["hello".to_string()]]);
/// channel.update(vec![vec!["world".to_string()]]);
///
/// // Get all accumulated messages
/// let messages = channel.get();
/// assert_eq!(messages.len(), 2);
/// assert_eq!(messages[0], "hello");
/// assert_eq!(messages[1], "world");
///
/// // Reset for next superstep
/// channel.reset();
/// assert!(channel.is_empty());
/// ```
#[derive(Debug, Clone)]
pub struct TopicChannel<T> {
    /// Messages in publication order
    messages: Vec<T>,
}

impl<T> TopicChannel<T> {
    /// Create a new empty topic channel
    #[must_use]
    pub const fn new() -> Self {
        Self {
            messages: Vec::new(),
        }
    }

    /// Get the number of messages in the topic
    #[must_use]
    pub const fn len(&self) -> usize {
        self.messages.len()
    }

    /// Check if the topic is empty
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        self.messages.is_empty()
    }

    /// Reset the topic, clearing all messages
    ///
    /// This is typically called at the start of each superstep to clear
    /// ephemeral message accumulations.
    pub fn reset(&mut self) {
        self.messages.clear();
    }

    /// Get an iterator over the messages, in publication order
    pub fn iter(&self) -> std::slice::Iter<'_, T> {
        self.messages.iter()
    }
}

impl<T> Default for TopicChannel<T> {
    /// Same as [`TopicChannel::new`]: an empty topic
    fn default() -> Self {
        Self::new()
    }
}

/// Lets `for message in &topic` walk the messages by reference
impl<'a, T> IntoIterator for &'a TopicChannel<T> {
    type Item = &'a T;
    type IntoIter = std::slice::Iter<'a, T>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
429
430impl<T> Channel<Vec<T>> for TopicChannel<T>
431where
432    T: Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
433{
434    fn update(&mut self, values: Vec<Vec<T>>) -> bool {
435        if values.is_empty() {
436            return false;
437        }
438        // Extend messages with all new values (flatten the vec of vecs)
439        for batch in values {
440            self.messages.extend(batch);
441        }
442        true
443    }
444
445    fn get(&self) -> &Vec<T> {
446        &self.messages
447    }
448
449    fn consume(&mut self) -> bool {
450        let was_empty = self.messages.is_empty();
451        self.messages.clear();
452        !was_empty
453    }
454
455    fn checkpoint(&self) -> Option<serde_json::Value> {
456        serde_json::to_value(&self.messages).ok()
457    }
458
459    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
460    where
461        Self: Sized,
462    {
463        let messages: Vec<T> = serde_json::from_value(value)
464            .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
465        Ok(Self { messages })
466    }
467}
468
469/// Channel trait for state field access with checkpoint support
470///
471/// A Channel wraps a value with specific update and checkpoint semantics.
472/// Different channel types control how values are updated, persisted, and consumed.
473pub trait Channel<T>: Send + Sync + 'static {
474    /// Update the channel with new values. Returns true if the value changed.
475    fn update(&mut self, values: Vec<T>) -> bool;
476
477    /// Get the current value
478    fn get(&self) -> &T;
479
480    /// Check if the channel has been consumed (for trigger-based activation)
481    fn consume(&mut self) -> bool;
482
483    /// Create a checkpoint of the current value for persistence
484    fn checkpoint(&self) -> Option<serde_json::Value>;
485
486    /// Restore from a checkpoint value
487    ///
488    /// # Errors
489    ///
490    /// Returns an error if the checkpoint value cannot be deserialized into
491    /// the channel's value type.
492    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
493    where
494        Self: Sized;
495}
496
497/// Untracked channel: value is not persisted across checkpoints
498///
499/// Wraps a value with a reducer. Checkpoints return `None` so the value
500/// is never persisted. This is useful for transient computation state
501/// that should not survive a restart.
502#[derive(Debug)]
503pub struct UntrackedChannel<T, R: Reducer<T>> {
504    value: T,
505    _reducer: std::marker::PhantomData<R>,
506}
507
508impl<T, R: Reducer<T>> UntrackedChannel<T, R> {
509    /// Create a new untracked channel with the given initial value
510    #[must_use]
511    pub const fn new(value: T) -> Self {
512        Self {
513            value,
514            _reducer: std::marker::PhantomData,
515        }
516    }
517}
518
519impl<T: Default + Send + Sync + 'static, R: Reducer<T> + Send + Sync + 'static> Channel<T>
520    for UntrackedChannel<T, R>
521{
522    fn update(&mut self, values: Vec<T>) -> bool {
523        if values.is_empty() {
524            return false;
525        }
526        R::reduce(&mut self.value, values);
527        true
528    }
529
530    fn get(&self) -> &T {
531        &self.value
532    }
533
534    fn consume(&mut self) -> bool {
535        false
536    }
537
538    fn checkpoint(&self) -> Option<serde_json::Value> {
539        None
540    }
541
542    fn from_checkpoint(_value: serde_json::Value) -> Result<Self, String> {
543        Ok(Self::new(T::default()))
544    }
545}
546
547/// Ephemeral channel: value is cleared at the start of each superstep
548///
549/// Has a `consumed` flag set by `consume()`. The value resets between
550/// supersteps and is never persisted.
551#[derive(Debug)]
552pub struct EphemeralChannel<T, R: Reducer<T>> {
553    value: T,
554    consumed: bool,
555    _reducer: std::marker::PhantomData<R>,
556}
557
558impl<T, R: Reducer<T>> EphemeralChannel<T, R> {
559    /// Create a new ephemeral channel with the given initial value
560    #[must_use]
561    pub const fn new(value: T) -> Self {
562        Self {
563            value,
564            consumed: false,
565            _reducer: std::marker::PhantomData,
566        }
567    }
568}
569
570impl<T: Default + Send + Sync + 'static, R: Reducer<T> + Send + Sync + 'static> Channel<T>
571    for EphemeralChannel<T, R>
572{
573    fn update(&mut self, values: Vec<T>) -> bool {
574        if values.is_empty() {
575            return false;
576        }
577        self.consumed = false;
578        R::reduce(&mut self.value, values);
579        true
580    }
581
582    fn get(&self) -> &T {
583        &self.value
584    }
585
586    fn consume(&mut self) -> bool {
587        let was_consumed = self.consumed;
588        self.consumed = true;
589        was_consumed
590    }
591
592    fn checkpoint(&self) -> Option<serde_json::Value> {
593        None
594    }
595
596    fn from_checkpoint(_value: serde_json::Value) -> Result<Self, String> {
597        Ok(Self::new(T::default()))
598    }
599}
600
601/// Last-value-after-finish channel: value only available after `finish()` is called
602///
603/// Before `finish()`, `get()` returns the default value. After `finish()`,
604/// the written value becomes available. Checkpoints persist only if finished.
605#[derive(Debug)]
606pub struct LastValueAfterFinishChannel<T, R: Reducer<T>> {
607    value: T,
608    finished_value: Option<T>,
609    is_finished: bool,
610    _reducer: std::marker::PhantomData<R>,
611}
612
613impl<T, R: Reducer<T>> LastValueAfterFinishChannel<T, R> {
614    /// Create a new channel with the given default value
615    #[must_use]
616    pub const fn new(value: T) -> Self {
617        Self {
618            value,
619            finished_value: None,
620            is_finished: false,
621            _reducer: std::marker::PhantomData,
622        }
623    }
624
625    /// Mark the channel as finished, making the value available
626    pub const fn finish(&mut self) {
627        self.is_finished = true;
628    }
629
630    /// Check if the channel has been finished and the value is available
631    #[must_use]
632    pub const fn is_available(&self) -> bool {
633        self.is_finished
634    }
635}
636
637impl<T, R> Channel<T> for LastValueAfterFinishChannel<T, R>
638where
639    T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
640    R: Reducer<T> + Send + Sync + 'static,
641{
642    fn update(&mut self, values: Vec<T>) -> bool {
643        if values.is_empty() {
644            return false;
645        }
646        R::reduce(&mut self.value, values);
647        if self.is_finished {
648            self.finished_value = Some(self.value.clone());
649        }
650        true
651    }
652
653    fn get(&self) -> &T {
654        if self.is_finished {
655            self.finished_value.as_ref().unwrap_or(&self.value)
656        } else {
657            &self.value
658        }
659    }
660
661    fn consume(&mut self) -> bool {
662        false
663    }
664
665    fn checkpoint(&self) -> Option<serde_json::Value> {
666        // Only checkpoint if finished (preserves original semantic)
667        // Save both value and is_finished state for complete restoration
668        if self.is_finished {
669            serde_json::to_value(&(self.value.clone(), self.is_finished)).ok()
670        } else {
671            None
672        }
673    }
674
675    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
676        // Try to parse as (value, is_finished) tuple first (new format)
677        if let Ok((parsed_value, is_finished)) = serde_json::from_value::<(T, bool)>(value.clone())
678        {
679            let finished_value = is_finished.then(|| parsed_value.clone());
680            return Ok(Self {
681                value: parsed_value,
682                finished_value,
683                is_finished,
684                _reducer: std::marker::PhantomData,
685            });
686        }
687
688        // Fallback: try parsing as value only (old format for backward compatibility)
689        let parsed_value: T = serde_json::from_value(value)
690            .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
691        Ok(Self {
692            value: parsed_value,
693            finished_value: None,
694            is_finished: false,
695            _reducer: std::marker::PhantomData,
696        })
697    }
698}
699
700/// Delta channel: append-heavy optimization with periodic snapshots
701///
702/// Tracks updates since the last snapshot and can replay writes for
703/// restoring from a delta-based checkpoint. The `snapshot_frequency`
704/// controls how often a full snapshot is taken instead of just recording
705/// the delta.
706#[derive(Debug)]
707pub struct DeltaChannel<T, R: Reducer<T>> {
708    value: T,
709    /// How many updates between full snapshots (minimum 1)
710    snapshot_frequency: usize,
711    update_count_since_snapshot: usize,
712    _reducer: std::marker::PhantomData<R>,
713}
714
715impl<T, R: Reducer<T>> DeltaChannel<T, R> {
716    /// Create a new delta channel with the given initial value and snapshot frequency
717    ///
718    /// The snapshot frequency is clamped to a minimum of 1.
719    #[must_use]
720    pub fn new(value: T, snapshot_frequency: usize) -> Self {
721        Self {
722            value,
723            snapshot_frequency: snapshot_frequency.max(1),
724            update_count_since_snapshot: 0,
725            _reducer: std::marker::PhantomData,
726        }
727    }
728
729    /// Replay a sequence of writes to restore state from a checkpoint
730    ///
731    /// During checkpoint recovery, finds the last `Overwrite<T>` in the sequence
732    /// and uses it as the baseline, then applies only the writes after it via
733    /// the reducer. This implements the design specification for ancestor replay.
734    pub fn replay_writes(&mut self, values: &[T])
735    where
736        T: Clone + serde::Serialize + DeserializeOwned,
737    {
738        if values.is_empty() {
739            return;
740        }
741
742        // Find last Overwrite as baseline, only replay writes after it
743        // This implements the design spec: "Find the last Overwrite as baseline,
744        // only replay writes after it"
745        let mut base = self.value.clone();
746        let mut start_idx = 0;
747
748        // Try to detect Overwrite wrappers in the sequence
749        // Since we have &[T] not &[Overwrite<T>], we need to check
750        // if any values were deserialized from Overwrite format
751        for (i, v) in values.iter().enumerate() {
752            // Check if this value is an Overwrite by attempting to detect
753            // the special wire format. Since values are already deserialized,
754            // we check if the JSON representation has __overwrite__ key
755            if let Ok(json) = serde_json::to_value(v)
756                && let Some(obj) = json.as_object()
757                && obj.contains_key("__overwrite__")
758            {
759                // This is an Overwrite<T> value
760                if let Ok(inner) = serde_json::from_value::<T>(
761                    obj.get("__overwrite__").cloned().unwrap_or_default(),
762                ) {
763                    base = inner;
764                    start_idx = i + 1;
765                }
766            }
767        }
768
769        // Apply remaining writes to baseline
770        let remaining: Vec<T> = values[start_idx..].to_vec();
771        if !remaining.is_empty() {
772            R::reduce(&mut base, remaining);
773        }
774        self.value = base;
775        self.update_count_since_snapshot = 0;
776    }
777
778    /// Check if a snapshot is due based on the update count
779    #[must_use]
780    pub const fn should_snapshot(&self) -> bool {
781        self.update_count_since_snapshot >= self.snapshot_frequency
782    }
783
784    /// Mark the channel as finished, forcing a snapshot on the next checkpoint call
785    ///
786    /// This ensures the state is persisted when execution completes.
787    pub const fn finish(&mut self) {
788        self.update_count_since_snapshot = self.snapshot_frequency;
789    }
790}
791
792impl<T, R> Channel<T> for DeltaChannel<T, R>
793where
794    T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
795    R: Reducer<T> + Send + Sync + 'static,
796{
797    fn update(&mut self, values: Vec<T>) -> bool {
798        if values.is_empty() {
799            return false;
800        }
801        R::reduce(&mut self.value, values);
802        self.update_count_since_snapshot += 1;
803        true
804    }
805
806    fn get(&self) -> &T {
807        &self.value
808    }
809
810    fn consume(&mut self) -> bool {
811        false
812    }
813
814    fn checkpoint(&self) -> Option<serde_json::Value> {
815        serde_json::to_value(&self.value).ok()
816    }
817
818    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
819        let value: T = serde_json::from_value(value)
820            .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
821        Ok(Self {
822            value,
823            snapshot_frequency: 10,
824            update_count_since_snapshot: 0,
825            _reducer: std::marker::PhantomData,
826        })
827    }
828}
829
/// Delta blob for representing checkpoint state
///
/// A `DeltaBlob` represents the persisted state of a delta channel.
/// `Missing` indicates no checkpoint data is available.
/// `Snapshot` contains a full snapshot of the value.
///
/// The type itself places no bounds on `T`: `Clone` and `Debug` are required
/// only by the derived impls, and serialization bounds belong on the code
/// that actually serializes the payload.
#[derive(Clone, Debug)]
pub enum DeltaBlob<T> {
    /// No checkpoint data available
    Missing,
    /// Full snapshot of the channel value
    Snapshot(T),
}
845
/// Marker naming a message that should be deleted
///
/// Carries the ID of the message to drop from the message list during a
/// state update.
#[derive(Debug, Clone)]
pub struct RemoveMessage {
    /// ID of the message to remove
    pub id: String,
}
855
856#[cfg(test)]
857mod tests {
858    use super::*;
859
860    #[test]
861    fn untracked_channel_update_returns_true_on_change() {
862        let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(0);
863        assert!(!ch.update(vec![]));
864        assert!(ch.update(vec![42]));
865        assert_eq!(*ch.get(), 42);
866    }
867
868    #[test]
869    fn untracked_channel_consume_always_false() {
870        let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(1);
871        assert!(!ch.consume());
872    }
873
874    #[test]
875    fn untracked_channel_checkpoint_is_none() {
876        let ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(5);
877        assert!(ch.checkpoint().is_none());
878    }
879
880    #[test]
881    fn untracked_channel_from_checkpoint_uses_default() {
882        let ch: UntrackedChannel<i32, ReplaceReducer> =
883            UntrackedChannel::from_checkpoint(serde_json::json!(99)).expect("should succeed");
884        assert_eq!(*ch.get(), 0);
885    }
886
887    #[test]
888    fn ephemeral_channel_consume_tracks_state() {
889        let mut ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(0);
890        assert!(!ch.consume()); // first consume returns false (was not consumed)
891        assert!(ch.consume()); // second consume returns true (was consumed)
892    }
893
894    #[test]
895    fn ephemeral_channel_update_resets_consumed() {
896        let mut ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(0);
897        assert!(!ch.consume());
898        assert!(ch.update(vec![7]));
899        assert!(!ch.consume()); // consumed was reset by update
900    }
901
902    #[test]
903    fn ephemeral_channel_checkpoint_is_none() {
904        let ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(3);
905        assert!(ch.checkpoint().is_none());
906    }
907
908    #[test]
909    fn last_value_after_finish_channel_not_available_before_finish() {
910        let ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
911            LastValueAfterFinishChannel::new(0);
912        assert!(!ch.is_available());
913    }
914
915    #[test]
916    fn last_value_after_finish_channel_available_after_finish() {
917        let mut ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
918            LastValueAfterFinishChannel::new(0);
919        ch.finish();
920        assert!(ch.is_available());
921    }
922
923    #[test]
924    fn last_value_after_finish_channel_checkpoint_only_if_finished() {
925        let ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
926            LastValueAfterFinishChannel::new(5);
927        assert!(ch.checkpoint().is_none());
928
929        let mut ch2: LastValueAfterFinishChannel<i32, ReplaceReducer> =
930            LastValueAfterFinishChannel::new(5);
931        ch2.finish();
932        assert!(ch2.checkpoint().is_some());
933    }
934
935    #[test]
936    fn delta_channel_snapshot_frequency_clamped_to_one() {
937        let ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 0);
938        assert_eq!(ch.snapshot_frequency, 1);
939    }
940
941    #[test]
942    fn delta_channel_replay_writes_restores_state() {
943        let mut ch: DeltaChannel<Vec<i32>, AppendReducer> = DeltaChannel::new(vec![], 10);
944        ch.replay_writes(&[vec![1, 2], vec![3, 4]]);
945        assert_eq!(*ch.get(), vec![1, 2, 3, 4]);
946        assert_eq!(ch.update_count_since_snapshot, 0);
947    }
948
949    #[test]
950    fn delta_channel_checkpoint_returns_snapshot() {
951        let ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(42, 5);
952        let cp = ch.checkpoint().expect("should have checkpoint");
953        assert_eq!(cp, serde_json::json!(42));
954    }
955
956    #[test]
957    fn delta_channel_should_snapshot() {
958        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 2);
959        assert!(!ch.should_snapshot());
960        ch.update(vec![1]);
961        assert!(!ch.should_snapshot());
962        ch.update(vec![2]);
963        assert!(ch.should_snapshot());
964    }
965
966    #[test]
967    fn delta_blob_missing_variant_exists() {
968        let blob: DeltaBlob<i32> = DeltaBlob::Missing;
969        assert!(matches!(blob, DeltaBlob::Missing));
970    }
971
972    #[test]
973    fn delta_blob_snapshot_holds_value() {
974        let blob: DeltaBlob<i32> = DeltaBlob::Snapshot(42);
975        assert!(matches!(blob, DeltaBlob::Snapshot(_)));
976    }
977
978    #[test]
979    fn delta_blob_clone() {
980        let blob: DeltaBlob<String> = DeltaBlob::Snapshot("hello".to_string());
981        let cloned = blob.clone();
982        if let DeltaBlob::Snapshot(v) = cloned {
983            assert_eq!(v, "hello");
984        } else {
985            panic!("expected Snapshot variant");
986        }
987        // Use original to prove clone created independent copy
988        if let DeltaBlob::Snapshot(v) = blob {
989            assert_eq!(v, "hello");
990        } else {
991            panic!("expected Snapshot variant");
992        }
993    }
994
995    #[test]
996    fn remove_message_holds_id() {
997        let rm = RemoveMessage {
998            id: "msg-123".to_string(),
999        };
1000        assert_eq!(rm.id, "msg-123");
1001    }
1002
1003    #[test]
1004    fn overwrite_serialize_round_trip() {
1005        let original = Overwrite(42);
1006        let json = serde_json::to_string(&original).expect("should serialize");
1007        assert_eq!(json, r#"{"__overwrite__":42}"#);
1008
1009        let deserialized: Overwrite<i32> = serde_json::from_str(&json).expect("should deserialize");
1010        assert_eq!(deserialized.0, 42);
1011    }
1012
1013    #[test]
1014    fn overwrite_serialize_complex_type() {
1015        let original = Overwrite(vec![1, 2, 3]);
1016        let json = serde_json::to_string(&original).expect("should serialize");
1017        assert_eq!(json, r#"{"__overwrite__":[1,2,3]}"#);
1018
1019        let deserialized: Overwrite<Vec<i32>> =
1020            serde_json::from_str(&json).expect("should deserialize");
1021        assert_eq!(deserialized.0, vec![1, 2, 3]);
1022    }
1023
1024    #[test]
1025    fn overwrite_debug_format() {
1026        let ov = Overwrite(42);
1027        let debug_str = format!("{ov:?}");
1028        assert_eq!(debug_str, "Overwrite(42)");
1029    }
1030
1031    #[test]
1032    fn replace_reducer_single_value_succeeds() {
1033        let mut val = 0;
1034        ReplaceReducer::reduce(&mut val, vec![42]);
1035        assert_eq!(val, 42);
1036    }
1037
1038    #[test]
1039    fn replace_reducer_empty_values_succeeds() {
1040        let mut val = 99;
1041        ReplaceReducer::reduce(&mut val, vec![]);
1042        assert_eq!(val, 99);
1043    }
1044
1045    #[test]
1046    #[should_panic(expected = "Replace reducer: multiple writes in same superstep")]
1047    fn replace_reducer_multiple_values_panics() {
1048        let mut val = 0;
1049        ReplaceReducer::reduce(&mut val, vec![1, 2]);
1050    }
1051
1052    #[test]
1053    #[should_panic(expected = "Replace reducer: multiple writes in same superstep")]
1054    fn untracked_channel_multiple_writes_panics() {
1055        let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(0);
1056        ch.update(vec![1, 2]);
1057    }
1058
1059    // NamedBarrierChannel tests
1060    #[test]
1061    fn named_barrier_channel_not_available_initially() {
1062        let ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new_with_sources(
1063            0,
1064            ["node_a", "node_b"].into_iter().map(String::from),
1065        );
1066        assert!(!ch.is_available());
1067    }
1068
1069    #[test]
1070    fn named_barrier_channel_available_after_all_sources_write() {
1071        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1072            NamedBarrierChannel::new_with_sources(
1073                0,
1074                ["node_a", "node_b"].into_iter().map(String::from),
1075            );
1076        assert!(!ch.is_available());
1077
1078        ch.update("node_a".to_string(), vec![42]);
1079        assert!(!ch.is_available());
1080
1081        ch.update("node_b".to_string(), vec![100]);
1082        assert!(ch.is_available());
1083        assert_eq!(*ch.get(), 100); // Last write wins with ReplaceReducer
1084    }
1085
1086    #[test]
1087    fn named_barrier_channel_empty_required_sources_is_available() {
1088        let ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new(42);
1089        assert!(ch.is_available());
1090    }
1091
1092    #[test]
1093    fn named_barrier_channel_has_written_tracks_sources() {
1094        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1095            NamedBarrierChannel::new_with_sources(
1096                0,
1097                ["node_a", "node_b", "node_c"].into_iter().map(String::from),
1098            );
1099
1100        assert!(!ch.has_written("node_a"));
1101        ch.update("node_a".to_string(), vec![1]);
1102        assert!(ch.has_written("node_a"));
1103        assert!(!ch.has_written("node_b"));
1104    }
1105
1106    #[test]
1107    fn named_barrier_channel_reset_clears_seen_sources() {
1108        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1109            NamedBarrierChannel::new_with_sources(
1110                0,
1111                ["node_a", "node_b"].into_iter().map(String::from),
1112            );
1113
1114        ch.update("node_a".to_string(), vec![1]);
1115        ch.update("node_b".to_string(), vec![2]);
1116        assert!(ch.is_available());
1117
1118        ch.reset();
1119        assert!(!ch.is_available());
1120        assert!(!ch.has_written("node_a"));
1121        assert!(!ch.has_written("node_b"));
1122    }
1123
1124    #[test]
1125    fn named_barrier_channel_add_required_source() {
1126        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new(0);
1127        assert!(ch.is_available());
1128
1129        ch.add_required_source("node_a".to_string());
1130        assert!(!ch.is_available());
1131
1132        ch.update("node_a".to_string(), vec![42]);
1133        assert!(ch.is_available());
1134    }
1135
1136    #[test]
1137    #[should_panic(expected = "NamedBarrierChannel: source")]
1138    fn named_barrier_channel_unknown_source_panics() {
1139        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1140            NamedBarrierChannel::new_with_sources(0, vec!["node_a".to_string()]);
1141
1142        ch.update("unknown_node".to_string(), vec![42]);
1143    }
1144
1145    #[test]
1146    fn named_barrier_channel_checkpoint_persists_state() {
1147        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1148            NamedBarrierChannel::new_with_sources(0, vec!["node_a".to_string()]);
1149
1150        ch.update("node_a".to_string(), vec![42]);
1151
1152        let checkpoint = ch.checkpoint().expect("should have checkpoint");
1153        // Checkpoint is a tuple (value, seen_sources)
1154        assert!(checkpoint.is_array() || checkpoint.is_object());
1155
1156        let restored: NamedBarrierChannel<i32, ReplaceReducer> =
1157            NamedBarrierChannel::from_checkpoint(checkpoint).expect("should restore");
1158        assert_eq!(*restored.get(), 42);
1159        assert!(restored.has_written("node_a"));
1160    }
1161
1162    #[test]
1163    fn named_barrier_channel_generic_update_marks_all_sources_seen() {
1164        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1165            NamedBarrierChannel::new_with_sources(0, ["node_a".to_string(), "node_b".to_string()]);
1166
1167        // Using the generic Channel trait update
1168        Channel::update(&mut ch, vec![42]);
1169        assert!(ch.is_available());
1170        assert!(ch.has_written("node_a"));
1171        assert!(ch.has_written("node_b"));
1172    }
1173
1174    // TopicChannel tests
1175    #[test]
1176    fn topic_channel_new_is_empty() {
1177        let ch: TopicChannel<String> = TopicChannel::new();
1178        assert!(ch.is_empty());
1179        assert_eq!(ch.len(), 0);
1180    }
1181
1182    #[test]
1183    fn topic_channel_default_is_empty() {
1184        let ch: TopicChannel<String> = TopicChannel::default();
1185        assert!(ch.is_empty());
1186    }
1187
1188    #[test]
1189    fn topic_channel_accumulates_messages() {
1190        let mut ch: TopicChannel<String> = TopicChannel::new();
1191
1192        ch.update(vec![vec!["hello".to_string()]]);
1193        assert_eq!(ch.len(), 1);
1194        assert_eq!(ch.get()[0], "hello");
1195
1196        ch.update(vec![vec!["world".to_string()]]);
1197        assert_eq!(ch.len(), 2);
1198        assert_eq!(ch.get()[1], "world");
1199    }
1200
1201    #[test]
1202    fn topic_channel_update_with_multiple_messages() {
1203        let mut ch: TopicChannel<i32> = TopicChannel::new();
1204
1205        ch.update(vec![vec![1, 2, 3]]);
1206        assert_eq!(ch.len(), 3);
1207        assert_eq!(ch.get(), &[1, 2, 3]);
1208    }
1209
1210    #[test]
1211    fn topic_channel_update_with_multiple_batches() {
1212        let mut ch: TopicChannel<i32> = TopicChannel::new();
1213
1214        ch.update(vec![vec![1, 2], vec![3, 4]]);
1215        assert_eq!(ch.len(), 4);
1216        assert_eq!(ch.get(), &[1, 2, 3, 4]);
1217    }
1218
1219    #[test]
1220    fn topic_channel_reset_clears_messages() {
1221        let mut ch: TopicChannel<String> = TopicChannel::new();
1222
1223        ch.update(vec![vec!["test".to_string()]]);
1224        assert_eq!(ch.len(), 1);
1225
1226        ch.reset();
1227        assert!(ch.is_empty());
1228        assert_eq!(ch.len(), 0);
1229    }
1230
1231    #[test]
1232    fn topic_channel_consume_clears_and_returns_status() {
1233        let mut ch: TopicChannel<String> = TopicChannel::new();
1234
1235        let had_content = ch.consume();
1236        assert!(!had_content); // Empty channel returns false
1237
1238        ch.update(vec![vec!["test".to_string()]]);
1239        let had_content_after = ch.consume();
1240        assert!(had_content_after); // Non-empty channel returns true
1241        assert!(ch.is_empty());
1242    }
1243
1244    #[test]
1245    fn topic_channel_iter_messages() {
1246        let mut ch: TopicChannel<i32> = TopicChannel::new();
1247
1248        ch.update(vec![vec![1, 2, 3]]);
1249
1250        let mut iter = ch.iter();
1251        assert_eq!(iter.next(), Some(&1));
1252        assert_eq!(iter.next(), Some(&2));
1253        assert_eq!(iter.next(), Some(&3));
1254        assert_eq!(iter.next(), None);
1255    }
1256
1257    #[test]
1258    fn topic_channel_checkpoint_persists_messages() {
1259        let mut ch: TopicChannel<i32> = TopicChannel::new();
1260
1261        ch.update(vec![vec![1, 2, 3]]);
1262
1263        let checkpoint = ch.checkpoint().expect("should have checkpoint");
1264        assert_eq!(checkpoint, serde_json::json!([1, 2, 3]));
1265
1266        let restored: TopicChannel<i32> =
1267            TopicChannel::from_checkpoint(checkpoint).expect("should restore");
1268        assert_eq!(restored.len(), 3);
1269        assert_eq!(restored.get(), &[1, 2, 3]);
1270    }
1271
1272    #[test]
1273    fn topic_channel_from_checkpoint_empty() {
1274        let ch: TopicChannel<i32> =
1275            TopicChannel::from_checkpoint(serde_json::json!([])).expect("should restore");
1276        assert!(ch.is_empty());
1277    }
1278
1279    // Tests for LastValueAfterFinishChannel checkpoint fix (Task 2)
1280    #[test]
1281    fn last_value_after_finish_checkpoint_saves_is_finished_state() {
1282        let mut ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1283            LastValueAfterFinishChannel::new(10);
1284        ch.update(vec![42]);
1285        ch.finish();
1286
1287        let checkpoint = ch
1288            .checkpoint()
1289            .expect("should have checkpoint when finished");
1290        // Checkpoint should be a tuple (value, is_finished)
1291        assert!(checkpoint.is_array());
1292        let arr = checkpoint.as_array().expect("should be array");
1293        assert_eq!(arr.len(), 2);
1294        assert_eq!(arr[0], serde_json::json!(42)); // value
1295        assert_eq!(arr[1], serde_json::json!(true)); // is_finished
1296    }
1297
1298    #[test]
1299    fn last_value_after_finish_from_checkpoint_restores_is_finished() {
1300        let checkpoint_data = serde_json::json!([99, true]); // (value, is_finished)
1301
1302        let restored: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1303            LastValueAfterFinishChannel::from_checkpoint(checkpoint_data)
1304                .expect("should restore from checkpoint");
1305
1306        assert_eq!(*restored.get(), 99);
1307        assert!(restored.is_available());
1308    }
1309
1310    #[test]
1311    fn last_value_after_finish_from_checkpoint_old_format_backward_compat() {
1312        // Old format: just the value, no is_finished
1313        let checkpoint_data = serde_json::json!(55);
1314
1315        let restored: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1316            LastValueAfterFinishChannel::from_checkpoint(checkpoint_data)
1317                .expect("should restore from old checkpoint format");
1318
1319        assert_eq!(*restored.get(), 55);
1320        assert!(!restored.is_available()); // Should default to not finished
1321    }
1322
1323    #[test]
1324    fn last_value_after_finish_checkpoint_round_trip() {
1325        let mut ch1: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1326            LastValueAfterFinishChannel::new(0);
1327        ch1.update(vec![123]);
1328        ch1.finish();
1329
1330        let checkpoint = ch1.checkpoint().expect("should checkpoint");
1331        let ch2: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1332            LastValueAfterFinishChannel::from_checkpoint(checkpoint).expect("should restore");
1333
1334        assert_eq!(*ch1.get(), *ch2.get());
1335        assert_eq!(ch1.is_available(), ch2.is_available());
1336    }
1337
1338    // Tests for Overwrite helper methods
1339    #[test]
1340    fn overwrite_get_returns_inner_value() {
1341        let ov = Overwrite(42);
1342        assert_eq!(*ov.get(), 42);
1343    }
1344
1345    #[test]
1346    fn overwrite_into_inner_consumes_wrapper() {
1347        let ov = Overwrite(100);
1348        assert_eq!(ov.into_inner(), 100);
1349    }
1350
1351    #[test]
1352    fn overwrite_new_creates_wrapper() {
1353        let ov = Overwrite::new(999);
1354        assert_eq!(*ov.get(), 999);
1355    }
1356
1357    // Tests for DeltaChannel replay_writes with Overwrite detection (Task 1)
1358    #[test]
1359    fn delta_channel_replay_writes_handles_empty_sequence() {
1360        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(5, 10);
1361        ch.replay_writes(&[]);
1362        assert_eq!(*ch.get(), 5); // Value unchanged
1363    }
1364
1365    #[test]
1366    fn delta_channel_replay_writes_single_value() {
1367        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
1368        ch.replay_writes(&[42]);
1369        assert_eq!(*ch.get(), 42);
1370    }
1371
1372    #[test]
1373    fn delta_channel_replay_writes_multiple_values() {
1374        let mut ch: DeltaChannel<Vec<i32>, AppendReducer> = DeltaChannel::new(vec![], 10);
1375        ch.replay_writes(&[vec![1, 2], vec![3, 4]]);
1376        assert_eq!(*ch.get(), vec![1, 2, 3, 4]);
1377    }
1378
1379    #[test]
1380    fn delta_channel_replay_writes_resets_snapshot_counter() {
1381        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
1382        ch.update(vec![1]);
1383        assert_eq!(ch.update_count_since_snapshot, 1);
1384
1385        ch.replay_writes(&[99]);
1386        assert_eq!(ch.update_count_since_snapshot, 0); // Reset after replay
1387    }
1388
1389    #[test]
1390    fn delta_channel_replay_writes_with_replace_reducer() {
1391        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
1392        // ReplaceReducer only allows one value
1393        ch.replay_writes(&[42]);
1394        assert_eq!(*ch.get(), 42);
1395    }
1396
1397    #[test]
1398    fn delta_channel_replay_writes_detects_json_overwrite_format() {
1399        let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
1400            DeltaChannel::new(serde_json::json!(null), 10);
1401
1402        // Create values in Overwrite format (as they would appear in checkpoints)
1403        let overwrite_val = serde_json::json!({"__overwrite__": "baseline"});
1404        let normal_val1 = serde_json::json!("update1");
1405        let normal_val2 = serde_json::json!("update2");
1406
1407        ch.replay_writes(&[normal_val1, overwrite_val, normal_val2.clone()]);
1408
1409        // After detecting the overwrite, baseline should be "baseline",
1410        // then remaining values ["update1", "update2"] applied via LastWriteWinsReducer
1411        // LastWriteWinsReducer takes the last value
1412        assert_eq!(ch.get(), &normal_val2);
1413    }
1414
1415    #[test]
1416    fn delta_channel_replay_writes_overwrite_at_start() {
1417        let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
1418            DeltaChannel::new(serde_json::json!("initial"), 10);
1419
1420        let overwrite_val = serde_json::json!({"__overwrite__": "new_baseline"});
1421        let normal_val = serde_json::json!("update");
1422
1423        ch.replay_writes(&[overwrite_val, normal_val.clone()]);
1424
1425        // Overwrite sets baseline to "new_baseline", then "update" applied
1426        assert_eq!(ch.get(), &normal_val);
1427    }
1428
1429    #[test]
1430    fn delta_channel_replay_writes_overwrite_at_end() {
1431        let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
1432            DeltaChannel::new(serde_json::json!("initial"), 10);
1433
1434        let normal_val = serde_json::json!("update");
1435        let overwrite_val = serde_json::json!({"__overwrite__": "final_baseline"});
1436
1437        ch.replay_writes(&[normal_val, overwrite_val]);
1438
1439        // Overwrite at end sets baseline to "final_baseline", no remaining values
1440        assert_eq!(ch.get(), &serde_json::json!("final_baseline"));
1441    }
1442
1443    #[test]
1444    fn delta_channel_finish_forces_snapshot() {
1445        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
1446        assert!(!ch.should_snapshot());
1447
1448        ch.finish();
1449        // After finish, should_snapshot should return true
1450        assert!(ch.should_snapshot());
1451    }
1452}
1453
1454// Rust guideline compliant 2026-05-20