Skip to main content

juncture_core/state/
channel.rs

1//! Channel trait and channel types for state field access with checkpoint support
2//!
3//! A Channel wraps a value with specific update and checkpoint semantics.
4//! Different channel types control how values are updated, persisted, and consumed.
5
6use serde::de::DeserializeOwned;
7use serde::ser::SerializeStruct;
8use std::collections::HashSet;
9
10/// Reducer trait defining merge semantics for state fields
11///
12/// Each field in a State can have its own reducer, defining how multiple
13/// writes in the same superstep are combined.
14pub trait Reducer<T> {
15    /// Merge a single value (fast path avoiding Vec allocation)
16    fn reduce_one(current: &mut T, value: T) {
17        Self::reduce(current, vec![value]);
18    }
19
20    /// Merge multiple values into current
21    ///
22    /// Values are provided in the order tasks completed (not task spawn order).
23    /// For deterministic results, use associative reducers like `AppendReducer`.
24    fn reduce(current: &mut T, values: Vec<T>);
25}
26
27/// Replace reducer: only one writer per superstep (default)
28///
29/// Equivalent to `LangGraph`'s `LastValue` channel.
30/// Panics if multiple nodes write to the same field in one superstep.
31#[derive(Debug)]
32pub struct ReplaceReducer;
33
34impl<T> Reducer<T> for ReplaceReducer {
35    fn reduce(current: &mut T, values: Vec<T>) {
36        assert!(
37            values.len() <= 1,
38            "Replace reducer: multiple writes in same superstep"
39        );
40        if let Some(v) = values.into_iter().next() {
41            *current = v;
42        }
43    }
44}
45
46/// Append reducer: accumulate all writes
47///
48/// Equivalent to `LangGraph`'s `BinaryOperatorAggregate` with operator.add.
49/// All writes are extended in order.
50#[derive(Debug)]
51pub struct AppendReducer;
52
53impl<T> Reducer<Vec<T>> for AppendReducer {
54    fn reduce_one(current: &mut Vec<T>, value: Vec<T>) {
55        current.extend(value);
56    }
57
58    fn reduce(current: &mut Vec<T>, values: Vec<Vec<T>>) {
59        for v in values {
60            current.extend(v);
61        }
62    }
63}
64
65/// `AnyValue` reducer: assumes all values are equal
66///
67/// Similar to `LastValue`, but semantically assumes all writers provide
68/// the same value. Uses the last value if they differ.
69#[derive(Debug)]
70pub struct AnyValueReducer;
71
72impl<T: PartialEq + Clone> Reducer<T> for AnyValueReducer {
73    fn reduce(current: &mut T, values: Vec<T>) {
74        if let Some(last) = values.last() {
75            // Semantic check: all values should be equal
76            if let Some(first) = values.first() {
77                debug_assert!(
78                    values.iter().all(|v| v == first),
79                    "AnyValue reducer: all values should be equal"
80                );
81            }
82            *current = last.clone();
83        }
84    }
85}
86
87/// `LastWriteWins` reducer: allows multiple writers, last one wins
88///
89/// Similar to `ReplaceReducer`, but doesn't panic on multiple writes.
90#[derive(Debug)]
91pub struct LastWriteWinsReducer;
92
93impl<T> Reducer<T> for LastWriteWinsReducer {
94    fn reduce(current: &mut T, values: Vec<T>) {
95        if let Some(v) = values.into_iter().last() {
96            *current = v;
97        }
98    }
99}
100
101/// Bypass reducer: overwrite value directly, bypassing normal merge
102///
103/// When `Overwrite<T>` is used in an update, it bypasses the field's reducer
104/// and directly replaces the value. Custom serde uses `{"__overwrite__": value}`
105/// wire format for `LangGraph` checkpoint compatibility.
106pub struct Overwrite<T>(pub T);
107
108impl<T: std::fmt::Debug> std::fmt::Debug for Overwrite<T> {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        f.debug_tuple("Overwrite").field(&self.0).finish()
111    }
112}
113
114impl<T> Overwrite<T> {
115    /// Get a reference to the inner value
116    #[must_use]
117    pub const fn get(&self) -> &T {
118        &self.0
119    }
120
121    /// Convert into the inner value
122    #[must_use]
123    pub fn into_inner(self) -> T {
124        self.0
125    }
126
127    /// Create a new Overwrite wrapper
128    #[must_use]
129    pub const fn new(value: T) -> Self {
130        Self(value)
131    }
132}
133
134impl<T: serde::Serialize> serde::Serialize for Overwrite<T> {
135    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
136        let mut s = serializer.serialize_struct("__overwrite__", 1)?;
137        s.serialize_field("__overwrite__", &self.0)?;
138        s.end()
139    }
140}
141
142impl<'de, T: serde::Deserialize<'de>> serde::Deserialize<'de> for Overwrite<T> {
143    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
144        #[derive(serde::Deserialize)]
145        struct Wrapper<T> {
146            __overwrite__: T,
147        }
148        let wrapper = Wrapper::deserialize(deserializer)?;
149        Ok(Self(wrapper.__overwrite__))
150    }
151}
152
153/// Named barrier channel: waits for all registered named sources to write
154///
155/// This channel implements barrier/wait-all semantics for parallel workflows.
156/// The value is only available after ALL required named sources have written
157/// to it. Each source must provide a unique name for tracking.
158///
159/// # Type Parameters
160///
161/// * `T` - The value type stored in the channel
162/// * `R` - The reducer type that defines how multiple writes are merged
163///
164/// # Examples
165///
166/// ```
167/// use juncture_core::state::channel::{NamedBarrierChannel, ReplaceReducer};
168///
169/// let mut channel: NamedBarrierChannel<i32, ReplaceReducer> =
170///     NamedBarrierChannel::new_with_sources(0, ["node_a", "node_b"].into_iter().map(String::from));
171///
172/// // Initially not available
173/// assert!(!channel.is_available());
174///
175/// // After first write, still not available
176/// channel.update("node_a".to_string(), vec![42]).expect("update should succeed");
177/// assert!(!channel.is_available());
178///
179/// // After second write, becomes available
180/// channel.update("node_b".to_string(), vec![100]).expect("update should succeed");
181/// assert!(channel.is_available());
182/// assert_eq!(*channel.get(), 100); // Last write wins
183/// ```
184#[derive(Debug)]
185pub struct NamedBarrierChannel<T, R: Reducer<T>> {
186    value: T,
187    required_sources: HashSet<String>,
188    seen_sources: HashSet<String>,
189    _reducer: std::marker::PhantomData<R>,
190}
191
192impl<T, R: Reducer<T>> NamedBarrierChannel<T, R> {
193    /// Create a new named barrier channel with the given initial value and required sources
194    ///
195    /// # Arguments
196    ///
197    /// * `value` - The initial value for the channel
198    /// * `required_sources` - Iterator of source names that must all write before the barrier completes
199    #[must_use]
200    pub fn new_with_sources(value: T, required_sources: impl IntoIterator<Item = String>) -> Self {
201        let sources: HashSet<String> = required_sources.into_iter().collect();
202        Self {
203            value,
204            required_sources: sources,
205            seen_sources: HashSet::new(),
206            _reducer: std::marker::PhantomData,
207        }
208    }
209
210    /// Create a new named barrier channel with no required sources
211    ///
212    /// This channel will be immediately available. Use this when you plan to
213    /// add required sources later or when the barrier should always be complete.
214    #[must_use]
215    pub fn new(value: T) -> Self {
216        Self {
217            value,
218            required_sources: HashSet::new(),
219            seen_sources: HashSet::new(),
220            _reducer: std::marker::PhantomData,
221        }
222    }
223
224    /// Add a required source to the barrier
225    ///
226    /// If the source has already written, this will immediately mark it as seen.
227    pub fn add_required_source(&mut self, source: String) {
228        self.required_sources.insert(source);
229    }
230
231    /// Check if all required sources have written
232    ///
233    /// Returns `true` only when ALL required sources have written to this channel.
234    #[must_use]
235    pub fn is_available(&self) -> bool {
236        if self.required_sources.is_empty() {
237            return true;
238        }
239        self.required_sources
240            .iter()
241            .all(|source| self.seen_sources.contains(source))
242    }
243
244    /// Get the set of required source names
245    #[must_use]
246    pub const fn required_sources(&self) -> &HashSet<String> {
247        &self.required_sources
248    }
249
250    /// Get the set of source names that have written so far
251    #[must_use]
252    pub const fn seen_sources(&self) -> &HashSet<String> {
253        &self.seen_sources
254    }
255
256    /// Check if a specific source has written
257    #[must_use]
258    pub fn has_written(&self, source: &str) -> bool {
259        self.seen_sources.contains(source)
260    }
261
262    /// Reset the barrier, clearing seen sources while keeping required sources
263    ///
264    /// This is useful for reusing the barrier across multiple supersteps.
265    pub fn reset(&mut self) {
266        self.seen_sources.clear();
267    }
268}
269
270impl<T, R> Channel<T> for NamedBarrierChannel<T, R>
271where
272    T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
273    R: Reducer<T> + Send + Sync + 'static,
274{
275    fn update(&mut self, values: Vec<T>) -> bool {
276        // Channel trait update doesn't support named sources.
277        // When using the generic Channel trait, we apply all values directly
278        // to the channel. This is useful when the caller doesn't care about
279        // named barrier tracking and just wants to update the value.
280        if values.is_empty() {
281            return false;
282        }
283        // Apply the reducer to merge all values
284        R::reduce(&mut self.value, values);
285        // Mark all required sources as seen since we received an update
286        self.seen_sources = self.required_sources.clone();
287        true
288    }
289
290    fn get(&self) -> &T {
291        &self.value
292    }
293
294    fn consume(&mut self) -> bool {
295        false
296    }
297
298    fn checkpoint(&self) -> Option<serde_json::Value> {
299        // Persist the value and seen sources
300        serde_json::to_value(&(self.value.clone(), self.seen_sources.clone())).ok()
301    }
302
303    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
304    where
305        Self: Sized,
306    {
307        let (parsed_value, seen_sources): (T, HashSet<String>) = serde_json::from_value(value)
308            .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
309        Ok(Self {
310            value: parsed_value,
311            required_sources: HashSet::new(),
312            seen_sources,
313            _reducer: std::marker::PhantomData,
314        })
315    }
316}
317
318impl<T, R: Reducer<T>> NamedBarrierChannel<T, R> {
319    /// Update from a named source
320    ///
321    /// This is the primary method for `NamedBarrierChannel`, allowing named sources
322    /// to write to the channel. The barrier completes only after all required sources
323    /// have written.
324    ///
325    /// # Panics
326    ///
327    /// Panics if `source_name` is not in the set of required sources (when sources are configured).
328    pub fn update(&mut self, source_name: String, values: Vec<T>) -> bool {
329        assert!(
330            self.required_sources.is_empty() || self.required_sources.contains(&source_name),
331            "NamedBarrierChannel: source '{source_name}' not in required sources"
332        );
333
334        if values.is_empty() {
335            return false;
336        }
337
338        R::reduce(&mut self.value, values);
339        self.seen_sources.insert(source_name);
340        true
341    }
342}
343
344/// Topic channel: accumulates all published values into a list
345///
346/// This channel implements pub/sub messaging patterns where all writes
347/// are accumulated into a list. Each value is appended independently,
348/// allowing multiple publishers to send messages to the same topic.
349///
350/// # Type Parameters
351///
352/// * `T` - The message type stored in the topic
353///
354/// # Examples
355///
356/// ```
357/// use juncture_core::state::channel::TopicChannel;
358///
359/// let mut channel: TopicChannel<String> = TopicChannel::new();
360///
361/// // Publish messages
362/// channel.update(vec!["hello".to_string()]);
363/// channel.update(vec!["world".to_string()]);
364///
365/// // Get all accumulated messages
366/// let messages = channel.get();
367/// assert_eq!(messages.len(), 2);
368/// assert_eq!(messages[0], "hello");
369/// assert_eq!(messages[1], "world");
370///
371/// // Reset for next superstep
372/// channel.reset();
373/// assert!(messages.is_empty());
374/// ```
375#[derive(Debug, Clone)]
376pub struct TopicChannel<T> {
377    messages: Vec<T>,
378}
379
380impl<T> TopicChannel<T> {
381    /// Create a new empty topic channel
382    #[must_use]
383    pub const fn new() -> Self {
384        Self {
385            messages: Vec::new(),
386        }
387    }
388
389    /// Get the number of messages in the topic
390    #[must_use]
391    pub const fn len(&self) -> usize {
392        self.messages.len()
393    }
394
395    /// Check if the topic is empty
396    #[must_use]
397    pub const fn is_empty(&self) -> bool {
398        self.messages.is_empty()
399    }
400
401    /// Reset the topic, clearing all messages
402    ///
403    /// This is typically called at the start of each superstep to clear
404    /// ephemeral message accumulations.
405    pub fn reset(&mut self) {
406        self.messages.clear();
407    }
408
409    /// Get an iterator over the messages
410    pub fn iter(&self) -> std::slice::Iter<'_, T> {
411        self.messages.iter()
412    }
413}
414
415impl<T> Default for TopicChannel<T> {
416    fn default() -> Self {
417        Self::new()
418    }
419}
420
421impl<'a, T> IntoIterator for &'a TopicChannel<T> {
422    type Item = &'a T;
423    type IntoIter = std::slice::Iter<'a, T>;
424
425    fn into_iter(self) -> Self::IntoIter {
426        self.iter()
427    }
428}
429
430impl<T> Channel<Vec<T>> for TopicChannel<T>
431where
432    T: Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
433{
434    fn update(&mut self, values: Vec<Vec<T>>) -> bool {
435        if values.is_empty() {
436            return false;
437        }
438        // Extend messages with all new values (flatten the vec of vecs)
439        for batch in values {
440            self.messages.extend(batch);
441        }
442        true
443    }
444
445    fn get(&self) -> &Vec<T> {
446        &self.messages
447    }
448
449    fn consume(&mut self) -> bool {
450        let was_empty = self.messages.is_empty();
451        self.messages.clear();
452        !was_empty
453    }
454
455    fn checkpoint(&self) -> Option<serde_json::Value> {
456        serde_json::to_value(&self.messages).ok()
457    }
458
459    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
460    where
461        Self: Sized,
462    {
463        let messages: Vec<T> = serde_json::from_value(value)
464            .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
465        Ok(Self { messages })
466    }
467}
468
469/// Channel trait for state field access with checkpoint support
470///
471/// A Channel wraps a value with specific update and checkpoint semantics.
472/// Different channel types control how values are updated, persisted, and consumed.
473pub trait Channel<T>: Send + Sync + 'static {
474    /// Update the channel with new values. Returns true if the value changed.
475    fn update(&mut self, values: Vec<T>) -> bool;
476
477    /// Get the current value
478    fn get(&self) -> &T;
479
480    /// Check if the channel has been consumed (for trigger-based activation)
481    fn consume(&mut self) -> bool;
482
483    /// Create a checkpoint of the current value for persistence
484    fn checkpoint(&self) -> Option<serde_json::Value>;
485
486    /// Restore from a checkpoint value
487    ///
488    /// # Errors
489    ///
490    /// Returns an error if the checkpoint value cannot be deserialized into
491    /// the channel's value type.
492    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
493    where
494        Self: Sized;
495}
496
497/// Untracked channel: value is not persisted across checkpoints
498///
499/// Wraps a value with a reducer. Checkpoints return `None` so the value
500/// is never persisted. This is useful for transient computation state
501/// that should not survive a restart.
502#[derive(Debug)]
503pub struct UntrackedChannel<T, R: Reducer<T>> {
504    value: T,
505    _reducer: std::marker::PhantomData<R>,
506}
507
508impl<T, R: Reducer<T>> UntrackedChannel<T, R> {
509    /// Create a new untracked channel with the given initial value
510    #[must_use]
511    pub const fn new(value: T) -> Self {
512        Self {
513            value,
514            _reducer: std::marker::PhantomData,
515        }
516    }
517}
518
519impl<T: Default + Send + Sync + 'static, R: Reducer<T> + Send + Sync + 'static> Channel<T>
520    for UntrackedChannel<T, R>
521{
522    fn update(&mut self, values: Vec<T>) -> bool {
523        if values.is_empty() {
524            return false;
525        }
526        R::reduce(&mut self.value, values);
527        true
528    }
529
530    fn get(&self) -> &T {
531        &self.value
532    }
533
534    fn consume(&mut self) -> bool {
535        false
536    }
537
538    fn checkpoint(&self) -> Option<serde_json::Value> {
539        None
540    }
541
542    fn from_checkpoint(_value: serde_json::Value) -> Result<Self, String> {
543        Ok(Self::new(T::default()))
544    }
545}
546
547/// Ephemeral channel: value is cleared at the start of each superstep
548///
549/// Has a `consumed` flag set by `consume()`. The value resets between
550/// supersteps and is never persisted.
551#[derive(Debug)]
552pub struct EphemeralChannel<T, R: Reducer<T>> {
553    value: T,
554    consumed: bool,
555    _reducer: std::marker::PhantomData<R>,
556}
557
558impl<T, R: Reducer<T>> EphemeralChannel<T, R> {
559    /// Create a new ephemeral channel with the given initial value
560    #[must_use]
561    pub const fn new(value: T) -> Self {
562        Self {
563            value,
564            consumed: false,
565            _reducer: std::marker::PhantomData,
566        }
567    }
568}
569
570impl<T: Default + Send + Sync + 'static, R: Reducer<T> + Send + Sync + 'static> Channel<T>
571    for EphemeralChannel<T, R>
572{
573    fn update(&mut self, values: Vec<T>) -> bool {
574        if values.is_empty() {
575            return false;
576        }
577        self.consumed = false;
578        R::reduce(&mut self.value, values);
579        true
580    }
581
582    fn get(&self) -> &T {
583        &self.value
584    }
585
586    fn consume(&mut self) -> bool {
587        let was_consumed = self.consumed;
588        self.consumed = true;
589        was_consumed
590    }
591
592    fn checkpoint(&self) -> Option<serde_json::Value> {
593        None
594    }
595
596    fn from_checkpoint(_value: serde_json::Value) -> Result<Self, String> {
597        Ok(Self::new(T::default()))
598    }
599}
600
601/// Last-value-after-finish channel: value only available after `finish()` is called
602///
603/// Before `finish()`, `get()` returns the default value. After `finish()`,
604/// the written value becomes available. Checkpoints persist only if finished.
605#[derive(Debug)]
606pub struct LastValueAfterFinishChannel<T, R: Reducer<T>> {
607    value: T,
608    finished_value: Option<T>,
609    is_finished: bool,
610    _reducer: std::marker::PhantomData<R>,
611}
612
613impl<T, R: Reducer<T>> LastValueAfterFinishChannel<T, R> {
614    /// Create a new channel with the given default value
615    #[must_use]
616    pub const fn new(value: T) -> Self {
617        Self {
618            value,
619            finished_value: None,
620            is_finished: false,
621            _reducer: std::marker::PhantomData,
622        }
623    }
624
625    /// Mark the channel as finished, making the value available
626    pub const fn finish(&mut self) {
627        self.is_finished = true;
628    }
629
630    /// Check if the channel has been finished and the value is available
631    #[must_use]
632    pub const fn is_available(&self) -> bool {
633        self.is_finished
634    }
635}
636
637impl<T, R> Channel<T> for LastValueAfterFinishChannel<T, R>
638where
639    T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
640    R: Reducer<T> + Send + Sync + 'static,
641{
642    fn update(&mut self, values: Vec<T>) -> bool {
643        if values.is_empty() {
644            return false;
645        }
646        R::reduce(&mut self.value, values);
647        if self.is_finished {
648            self.finished_value = Some(self.value.clone());
649        }
650        true
651    }
652
653    fn get(&self) -> &T {
654        if self.is_finished {
655            self.finished_value.as_ref().unwrap_or(&self.value)
656        } else {
657            &self.value
658        }
659    }
660
661    fn consume(&mut self) -> bool {
662        false
663    }
664
665    fn checkpoint(&self) -> Option<serde_json::Value> {
666        // Only checkpoint if finished (preserves original semantic)
667        // Save both value and is_finished state for complete restoration
668        if self.is_finished {
669            serde_json::to_value(&(self.value.clone(), self.is_finished)).ok()
670        } else {
671            None
672        }
673    }
674
675    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
676        // Try to parse as (value, is_finished) tuple first (new format)
677        if let Ok((parsed_value, is_finished)) = serde_json::from_value::<(T, bool)>(value.clone())
678        {
679            let finished_value = is_finished.then(|| parsed_value.clone());
680            return Ok(Self {
681                value: parsed_value,
682                finished_value,
683                is_finished,
684                _reducer: std::marker::PhantomData,
685            });
686        }
687
688        // Fallback: try parsing as value only (old format for backward compatibility)
689        let parsed_value: T = serde_json::from_value(value)
690            .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
691        Ok(Self {
692            value: parsed_value,
693            finished_value: None,
694            is_finished: false,
695            _reducer: std::marker::PhantomData,
696        })
697    }
698}
699
700/// Delta channel: append-heavy optimization with periodic snapshots
701///
702/// Tracks updates since the last snapshot and can replay writes for
703/// restoring from a delta-based checkpoint. The `snapshot_frequency`
704/// controls how often a full snapshot is taken instead of just recording
705/// the delta.
706#[derive(Debug)]
707pub struct DeltaChannel<T, R: Reducer<T>> {
708    value: T,
709    /// How many updates between full snapshots (minimum 1)
710    snapshot_frequency: usize,
711    update_count_since_snapshot: usize,
712    _reducer: std::marker::PhantomData<R>,
713}
714
715impl<T, R: Reducer<T>> DeltaChannel<T, R> {
716    /// Create a new delta channel with the given initial value and snapshot frequency
717    ///
718    /// The snapshot frequency is clamped to a minimum of 1.
719    #[must_use]
720    pub fn new(value: T, snapshot_frequency: usize) -> Self {
721        Self {
722            value,
723            snapshot_frequency: snapshot_frequency.max(1),
724            update_count_since_snapshot: 0,
725            _reducer: std::marker::PhantomData,
726        }
727    }
728
729    /// Replay a sequence of writes to restore state from a checkpoint
730    ///
731    /// During checkpoint recovery, finds the last `Overwrite<T>` in the sequence
732    /// and uses it as the baseline, then applies only the writes after it via
733    /// the reducer. This implements the design specification for ancestor replay.
734    pub fn replay_writes(&mut self, values: &[T])
735    where
736        T: Clone + serde::Serialize + DeserializeOwned,
737    {
738        if values.is_empty() {
739            return;
740        }
741
742        // Find last Overwrite as baseline, only replay writes after it
743        // This implements the design spec: "Find the last Overwrite as baseline,
744        // only replay writes after it"
745        let mut base = self.value.clone();
746        let mut start_idx = 0;
747
748        // Try to detect Overwrite wrappers in the sequence
749        // Since we have &[T] not &[Overwrite<T>], we need to check
750        // if any values were deserialized from Overwrite format
751        for (i, v) in values.iter().enumerate() {
752            // Check if this value is an Overwrite by attempting to detect
753            // the special wire format. Since values are already deserialized,
754            // we check if the JSON representation has __overwrite__ key
755            if let Ok(json) = serde_json::to_value(v)
756                && let Some(obj) = json.as_object()
757                && obj.contains_key("__overwrite__")
758            {
759                // This is an Overwrite<T> value
760                if let Ok(inner) = serde_json::from_value::<T>(
761                    obj.get("__overwrite__").cloned().unwrap_or_default(),
762                ) {
763                    base = inner;
764                    start_idx = i + 1;
765                }
766            }
767        }
768
769        // Apply remaining writes to baseline
770        let remaining: Vec<T> = values[start_idx..].to_vec();
771        if !remaining.is_empty() {
772            R::reduce(&mut base, remaining);
773        }
774        self.value = base;
775        self.update_count_since_snapshot = 0;
776    }
777
778    /// Check if a snapshot is due based on the update count
779    #[must_use]
780    pub const fn should_snapshot(&self) -> bool {
781        self.update_count_since_snapshot >= self.snapshot_frequency
782    }
783
784    /// Mark the channel as finished, forcing a snapshot on the next checkpoint call
785    ///
786    /// This ensures the state is persisted when execution completes.
787    pub const fn finish(&mut self) {
788        self.update_count_since_snapshot = self.snapshot_frequency;
789    }
790}
791
792impl<T, R> Channel<T> for DeltaChannel<T, R>
793where
794    T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
795    R: Reducer<T> + Send + Sync + 'static,
796{
797    fn update(&mut self, values: Vec<T>) -> bool {
798        if values.is_empty() {
799            return false;
800        }
801        R::reduce(&mut self.value, values);
802        self.update_count_since_snapshot += 1;
803        true
804    }
805
806    fn get(&self) -> &T {
807        &self.value
808    }
809
810    fn consume(&mut self) -> bool {
811        false
812    }
813
814    fn checkpoint(&self) -> Option<serde_json::Value> {
815        serde_json::to_value(&self.value).ok()
816    }
817
818    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
819        let value: T = serde_json::from_value(value)
820            .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
821        Ok(Self {
822            value,
823            snapshot_frequency: 10,
824            update_count_since_snapshot: 0,
825            _reducer: std::marker::PhantomData,
826        })
827    }
828}
829
830/// Delta blob for representing checkpoint state
831///
832/// A `DeltaBlob` represents the persisted state of a delta channel.
833/// `Missing` indicates no checkpoint data is available.
834/// `Snapshot` contains a full snapshot of the value.
835#[derive(Clone, Debug)]
836pub enum DeltaBlob<T>
837where
838    T: Clone + serde::Serialize + serde::de::DeserializeOwned,
839{
840    /// No checkpoint data available
841    Missing,
842    /// Full snapshot of the channel value
843    Snapshot(T),
844}
845
846/// Ring buffer channel for append-heavy fields with bounded size
847///
848/// Wraps a `Vec<T>` with a maximum capacity. When new values are appended
849/// and the capacity is exceeded, the oldest elements are removed. This
850/// prevents unbounded memory growth for append-heavy fields like messages,
851/// events, and logs.
852///
853/// # Examples
854///
855/// ```ignore
856/// use juncture_core::state::channel::RingBufferChannel;
857///
858/// let mut ch = RingBufferChannel::new(Vec::new(), 100);
859/// ch.update(vec!["msg1".to_string(), "msg2".to_string()]);
860/// assert_eq!(ch.get().len(), 2);
861/// ```
862#[derive(Clone, Debug)]
863pub struct RingBufferChannel<T> {
864    /// Current values in the ring buffer
865    values: Vec<T>,
866    /// Maximum capacity of the ring buffer
867    capacity: usize,
868}
869
870impl<T> RingBufferChannel<T> {
871    /// Create a new ring buffer channel with the given initial values and capacity
872    ///
873    /// The capacity is clamped to a minimum of 1. If `values` exceeds the
874    /// capacity, the oldest elements are trimmed.
875    #[must_use]
876    pub fn new(values: Vec<T>, capacity: usize) -> Self {
877        let mut channel = Self {
878            values,
879            capacity: capacity.max(1),
880        };
881        channel.trim_to_capacity();
882        channel
883    }
884
885    /// Get the current capacity
886    #[must_use]
887    pub const fn capacity(&self) -> usize {
888        self.capacity
889    }
890
891    /// Get the current number of elements
892    #[must_use]
893    pub fn len(&self) -> usize {
894        self.values.len()
895    }
896
897    /// Check if the ring buffer is empty
898    #[must_use]
899    pub fn is_empty(&self) -> bool {
900        self.values.is_empty()
901    }
902
903    /// Trim the values to the configured capacity
904    ///
905    /// Removes the oldest elements (from the front) if the buffer exceeds capacity.
906    fn trim_to_capacity(&mut self) {
907        if self.values.len() > self.capacity {
908            let excess = self.values.len() - self.capacity;
909            self.values.drain(..excess);
910        }
911    }
912}
913
914impl<T: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static> Channel<Vec<T>>
915    for RingBufferChannel<T>
916{
917    fn update(&mut self, values: Vec<Vec<T>>) -> bool {
918        let had_non_empty = values.iter().any(|v| !v.is_empty());
919        for v in values {
920            self.values.extend(v);
921        }
922        self.trim_to_capacity();
923        had_non_empty
924    }
925
926    fn get(&self) -> &Vec<T> {
927        &self.values
928    }
929
930    fn consume(&mut self) -> bool {
931        false
932    }
933
934    fn checkpoint(&self) -> Option<serde_json::Value> {
935        serde_json::to_value(serde_json::json!({
936            "values": &self.values,
937            "capacity": self.capacity,
938        }))
939        .ok()
940    }
941
942    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
943        // Support both legacy format (plain array) and new format (object with capacity)
944        if let Some(obj) = value.as_object() {
945            let values: Vec<T> =
946                serde_json::from_value(obj.get("values").cloned().unwrap_or_default())
947                    .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
948            let capacity = obj
949                .get("capacity")
950                .and_then(serde_json::Value::as_u64)
951                .map_or(1000, |c| usize::try_from(c).unwrap_or(1000))
952                .max(1); // Enforce minimum capacity of 1 (same as new())
953            Ok(Self { values, capacity })
954        } else {
955            // Legacy format: plain array without capacity
956            let values: Vec<T> = serde_json::from_value(value)
957                .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
958            Ok(Self {
959                values,
960                capacity: 1000,
961            })
962        }
963    }
964}
965
966/// Remove-message identifier for message deletion
967///
968/// Used to identify which message should be removed from the message list
969/// during state updates.
970#[derive(Clone, Debug)]
971pub struct RemoveMessage {
972    /// ID of the message to remove
973    pub id: String,
974}
975
976#[cfg(test)]
977mod tests {
978    use super::*;
979
980    #[test]
981    fn untracked_channel_update_returns_true_on_change() {
982        let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(0);
983        assert!(!ch.update(vec![]));
984        assert!(ch.update(vec![42]));
985        assert_eq!(*ch.get(), 42);
986    }
987
988    #[test]
989    fn untracked_channel_consume_always_false() {
990        let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(1);
991        assert!(!ch.consume());
992    }
993
994    #[test]
995    fn untracked_channel_checkpoint_is_none() {
996        let ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(5);
997        assert!(ch.checkpoint().is_none());
998    }
999
1000    #[test]
1001    fn untracked_channel_from_checkpoint_uses_default() {
1002        let ch: UntrackedChannel<i32, ReplaceReducer> =
1003            UntrackedChannel::from_checkpoint(serde_json::json!(99)).expect("should succeed");
1004        assert_eq!(*ch.get(), 0);
1005    }
1006
1007    #[test]
1008    fn ephemeral_channel_consume_tracks_state() {
1009        let mut ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(0);
1010        assert!(!ch.consume()); // first consume returns false (was not consumed)
1011        assert!(ch.consume()); // second consume returns true (was consumed)
1012    }
1013
1014    #[test]
1015    fn ephemeral_channel_update_resets_consumed() {
1016        let mut ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(0);
1017        assert!(!ch.consume());
1018        assert!(ch.update(vec![7]));
1019        assert!(!ch.consume()); // consumed was reset by update
1020    }
1021
1022    #[test]
1023    fn ephemeral_channel_checkpoint_is_none() {
1024        let ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(3);
1025        assert!(ch.checkpoint().is_none());
1026    }
1027
1028    #[test]
1029    fn last_value_after_finish_channel_not_available_before_finish() {
1030        let ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1031            LastValueAfterFinishChannel::new(0);
1032        assert!(!ch.is_available());
1033    }
1034
1035    #[test]
1036    fn last_value_after_finish_channel_available_after_finish() {
1037        let mut ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1038            LastValueAfterFinishChannel::new(0);
1039        ch.finish();
1040        assert!(ch.is_available());
1041    }
1042
1043    #[test]
1044    fn last_value_after_finish_channel_checkpoint_only_if_finished() {
1045        let ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1046            LastValueAfterFinishChannel::new(5);
1047        assert!(ch.checkpoint().is_none());
1048
1049        let mut ch2: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1050            LastValueAfterFinishChannel::new(5);
1051        ch2.finish();
1052        assert!(ch2.checkpoint().is_some());
1053    }
1054
1055    #[test]
1056    fn delta_channel_snapshot_frequency_clamped_to_one() {
1057        let ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 0);
1058        assert_eq!(ch.snapshot_frequency, 1);
1059    }
1060
1061    #[test]
1062    fn delta_channel_replay_writes_restores_state() {
1063        let mut ch: DeltaChannel<Vec<i32>, AppendReducer> = DeltaChannel::new(vec![], 10);
1064        ch.replay_writes(&[vec![1, 2], vec![3, 4]]);
1065        assert_eq!(*ch.get(), vec![1, 2, 3, 4]);
1066        assert_eq!(ch.update_count_since_snapshot, 0);
1067    }
1068
1069    #[test]
1070    fn delta_channel_checkpoint_returns_snapshot() {
1071        let ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(42, 5);
1072        let cp = ch.checkpoint().expect("should have checkpoint");
1073        assert_eq!(cp, serde_json::json!(42));
1074    }
1075
1076    #[test]
1077    fn delta_channel_should_snapshot() {
1078        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 2);
1079        assert!(!ch.should_snapshot());
1080        ch.update(vec![1]);
1081        assert!(!ch.should_snapshot());
1082        ch.update(vec![2]);
1083        assert!(ch.should_snapshot());
1084    }
1085
1086    #[test]
1087    fn delta_blob_missing_variant_exists() {
1088        let blob: DeltaBlob<i32> = DeltaBlob::Missing;
1089        assert!(matches!(blob, DeltaBlob::Missing));
1090    }
1091
1092    #[test]
1093    fn delta_blob_snapshot_holds_value() {
1094        let blob: DeltaBlob<i32> = DeltaBlob::Snapshot(42);
1095        assert!(matches!(blob, DeltaBlob::Snapshot(_)));
1096    }
1097
1098    #[test]
1099    fn delta_blob_clone() {
1100        let blob: DeltaBlob<String> = DeltaBlob::Snapshot("hello".to_string());
1101        let cloned = blob.clone();
1102        if let DeltaBlob::Snapshot(v) = cloned {
1103            assert_eq!(v, "hello");
1104        } else {
1105            panic!("expected Snapshot variant");
1106        }
1107        // Use original to prove clone created independent copy
1108        if let DeltaBlob::Snapshot(v) = blob {
1109            assert_eq!(v, "hello");
1110        } else {
1111            panic!("expected Snapshot variant");
1112        }
1113    }
1114
1115    #[test]
1116    fn remove_message_holds_id() {
1117        let rm = RemoveMessage {
1118            id: "msg-123".to_string(),
1119        };
1120        assert_eq!(rm.id, "msg-123");
1121    }
1122
1123    #[test]
1124    fn overwrite_serialize_round_trip() {
1125        let original = Overwrite(42);
1126        let json = serde_json::to_string(&original).expect("should serialize");
1127        assert_eq!(json, r#"{"__overwrite__":42}"#);
1128
1129        let deserialized: Overwrite<i32> = serde_json::from_str(&json).expect("should deserialize");
1130        assert_eq!(deserialized.0, 42);
1131    }
1132
1133    #[test]
1134    fn overwrite_serialize_complex_type() {
1135        let original = Overwrite(vec![1, 2, 3]);
1136        let json = serde_json::to_string(&original).expect("should serialize");
1137        assert_eq!(json, r#"{"__overwrite__":[1,2,3]}"#);
1138
1139        let deserialized: Overwrite<Vec<i32>> =
1140            serde_json::from_str(&json).expect("should deserialize");
1141        assert_eq!(deserialized.0, vec![1, 2, 3]);
1142    }
1143
1144    #[test]
1145    fn overwrite_debug_format() {
1146        let ov = Overwrite(42);
1147        let debug_str = format!("{ov:?}");
1148        assert_eq!(debug_str, "Overwrite(42)");
1149    }
1150
1151    #[test]
1152    fn replace_reducer_single_value_succeeds() {
1153        let mut val = 0;
1154        ReplaceReducer::reduce(&mut val, vec![42]);
1155        assert_eq!(val, 42);
1156    }
1157
1158    #[test]
1159    fn replace_reducer_empty_values_succeeds() {
1160        let mut val = 99;
1161        ReplaceReducer::reduce(&mut val, vec![]);
1162        assert_eq!(val, 99);
1163    }
1164
1165    #[test]
1166    #[should_panic(expected = "Replace reducer: multiple writes in same superstep")]
1167    fn replace_reducer_multiple_values_panics() {
1168        let mut val = 0;
1169        ReplaceReducer::reduce(&mut val, vec![1, 2]);
1170    }
1171
1172    #[test]
1173    #[should_panic(expected = "Replace reducer: multiple writes in same superstep")]
1174    fn untracked_channel_multiple_writes_panics() {
1175        let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(0);
1176        ch.update(vec![1, 2]);
1177    }
1178
1179    // NamedBarrierChannel tests
1180    #[test]
1181    fn named_barrier_channel_not_available_initially() {
1182        let ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new_with_sources(
1183            0,
1184            ["node_a", "node_b"].into_iter().map(String::from),
1185        );
1186        assert!(!ch.is_available());
1187    }
1188
1189    #[test]
1190    fn named_barrier_channel_available_after_all_sources_write() {
1191        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1192            NamedBarrierChannel::new_with_sources(
1193                0,
1194                ["node_a", "node_b"].into_iter().map(String::from),
1195            );
1196        assert!(!ch.is_available());
1197
1198        ch.update("node_a".to_string(), vec![42]);
1199        assert!(!ch.is_available());
1200
1201        ch.update("node_b".to_string(), vec![100]);
1202        assert!(ch.is_available());
1203        assert_eq!(*ch.get(), 100); // Last write wins with ReplaceReducer
1204    }
1205
1206    #[test]
1207    fn named_barrier_channel_empty_required_sources_is_available() {
1208        let ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new(42);
1209        assert!(ch.is_available());
1210    }
1211
1212    #[test]
1213    fn named_barrier_channel_has_written_tracks_sources() {
1214        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1215            NamedBarrierChannel::new_with_sources(
1216                0,
1217                ["node_a", "node_b", "node_c"].into_iter().map(String::from),
1218            );
1219
1220        assert!(!ch.has_written("node_a"));
1221        ch.update("node_a".to_string(), vec![1]);
1222        assert!(ch.has_written("node_a"));
1223        assert!(!ch.has_written("node_b"));
1224    }
1225
1226    #[test]
1227    fn named_barrier_channel_reset_clears_seen_sources() {
1228        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1229            NamedBarrierChannel::new_with_sources(
1230                0,
1231                ["node_a", "node_b"].into_iter().map(String::from),
1232            );
1233
1234        ch.update("node_a".to_string(), vec![1]);
1235        ch.update("node_b".to_string(), vec![2]);
1236        assert!(ch.is_available());
1237
1238        ch.reset();
1239        assert!(!ch.is_available());
1240        assert!(!ch.has_written("node_a"));
1241        assert!(!ch.has_written("node_b"));
1242    }
1243
1244    #[test]
1245    fn named_barrier_channel_add_required_source() {
1246        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new(0);
1247        assert!(ch.is_available());
1248
1249        ch.add_required_source("node_a".to_string());
1250        assert!(!ch.is_available());
1251
1252        ch.update("node_a".to_string(), vec![42]);
1253        assert!(ch.is_available());
1254    }
1255
1256    #[test]
1257    #[should_panic(expected = "NamedBarrierChannel: source")]
1258    fn named_barrier_channel_unknown_source_panics() {
1259        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1260            NamedBarrierChannel::new_with_sources(0, vec!["node_a".to_string()]);
1261
1262        ch.update("unknown_node".to_string(), vec![42]);
1263    }
1264
1265    #[test]
1266    fn named_barrier_channel_checkpoint_persists_state() {
1267        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1268            NamedBarrierChannel::new_with_sources(0, vec!["node_a".to_string()]);
1269
1270        ch.update("node_a".to_string(), vec![42]);
1271
1272        let checkpoint = ch.checkpoint().expect("should have checkpoint");
1273        // Checkpoint is a tuple (value, seen_sources)
1274        assert!(checkpoint.is_array() || checkpoint.is_object());
1275
1276        let restored: NamedBarrierChannel<i32, ReplaceReducer> =
1277            NamedBarrierChannel::from_checkpoint(checkpoint).expect("should restore");
1278        assert_eq!(*restored.get(), 42);
1279        assert!(restored.has_written("node_a"));
1280    }
1281
1282    #[test]
1283    fn named_barrier_channel_generic_update_marks_all_sources_seen() {
1284        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1285            NamedBarrierChannel::new_with_sources(0, ["node_a".to_string(), "node_b".to_string()]);
1286
1287        // Using the generic Channel trait update
1288        Channel::update(&mut ch, vec![42]);
1289        assert!(ch.is_available());
1290        assert!(ch.has_written("node_a"));
1291        assert!(ch.has_written("node_b"));
1292    }
1293
1294    // TopicChannel tests
1295    #[test]
1296    fn topic_channel_new_is_empty() {
1297        let ch: TopicChannel<String> = TopicChannel::new();
1298        assert!(ch.is_empty());
1299        assert_eq!(ch.len(), 0);
1300    }
1301
1302    #[test]
1303    fn topic_channel_default_is_empty() {
1304        let ch: TopicChannel<String> = TopicChannel::default();
1305        assert!(ch.is_empty());
1306    }
1307
1308    #[test]
1309    fn topic_channel_accumulates_messages() {
1310        let mut ch: TopicChannel<String> = TopicChannel::new();
1311
1312        ch.update(vec![vec!["hello".to_string()]]);
1313        assert_eq!(ch.len(), 1);
1314        assert_eq!(ch.get()[0], "hello");
1315
1316        ch.update(vec![vec!["world".to_string()]]);
1317        assert_eq!(ch.len(), 2);
1318        assert_eq!(ch.get()[1], "world");
1319    }
1320
1321    #[test]
1322    fn topic_channel_update_with_multiple_messages() {
1323        let mut ch: TopicChannel<i32> = TopicChannel::new();
1324
1325        ch.update(vec![vec![1, 2, 3]]);
1326        assert_eq!(ch.len(), 3);
1327        assert_eq!(ch.get(), &[1, 2, 3]);
1328    }
1329
1330    #[test]
1331    fn topic_channel_update_with_multiple_batches() {
1332        let mut ch: TopicChannel<i32> = TopicChannel::new();
1333
1334        ch.update(vec![vec![1, 2], vec![3, 4]]);
1335        assert_eq!(ch.len(), 4);
1336        assert_eq!(ch.get(), &[1, 2, 3, 4]);
1337    }
1338
1339    #[test]
1340    fn topic_channel_reset_clears_messages() {
1341        let mut ch: TopicChannel<String> = TopicChannel::new();
1342
1343        ch.update(vec![vec!["test".to_string()]]);
1344        assert_eq!(ch.len(), 1);
1345
1346        ch.reset();
1347        assert!(ch.is_empty());
1348        assert_eq!(ch.len(), 0);
1349    }
1350
1351    #[test]
1352    fn topic_channel_consume_clears_and_returns_status() {
1353        let mut ch: TopicChannel<String> = TopicChannel::new();
1354
1355        let had_content = ch.consume();
1356        assert!(!had_content); // Empty channel returns false
1357
1358        ch.update(vec![vec!["test".to_string()]]);
1359        let had_content_after = ch.consume();
1360        assert!(had_content_after); // Non-empty channel returns true
1361        assert!(ch.is_empty());
1362    }
1363
1364    #[test]
1365    fn topic_channel_iter_messages() {
1366        let mut ch: TopicChannel<i32> = TopicChannel::new();
1367
1368        ch.update(vec![vec![1, 2, 3]]);
1369
1370        let mut iter = ch.iter();
1371        assert_eq!(iter.next(), Some(&1));
1372        assert_eq!(iter.next(), Some(&2));
1373        assert_eq!(iter.next(), Some(&3));
1374        assert_eq!(iter.next(), None);
1375    }
1376
1377    #[test]
1378    fn topic_channel_checkpoint_persists_messages() {
1379        let mut ch: TopicChannel<i32> = TopicChannel::new();
1380
1381        ch.update(vec![vec![1, 2, 3]]);
1382
1383        let checkpoint = ch.checkpoint().expect("should have checkpoint");
1384        assert_eq!(checkpoint, serde_json::json!([1, 2, 3]));
1385
1386        let restored: TopicChannel<i32> =
1387            TopicChannel::from_checkpoint(checkpoint).expect("should restore");
1388        assert_eq!(restored.len(), 3);
1389        assert_eq!(restored.get(), &[1, 2, 3]);
1390    }
1391
1392    #[test]
1393    fn topic_channel_from_checkpoint_empty() {
1394        let ch: TopicChannel<i32> =
1395            TopicChannel::from_checkpoint(serde_json::json!([])).expect("should restore");
1396        assert!(ch.is_empty());
1397    }
1398
1399    // Tests for LastValueAfterFinishChannel checkpoint fix (Task 2)
1400    #[test]
1401    fn last_value_after_finish_checkpoint_saves_is_finished_state() {
1402        let mut ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1403            LastValueAfterFinishChannel::new(10);
1404        ch.update(vec![42]);
1405        ch.finish();
1406
1407        let checkpoint = ch
1408            .checkpoint()
1409            .expect("should have checkpoint when finished");
1410        // Checkpoint should be a tuple (value, is_finished)
1411        assert!(checkpoint.is_array());
1412        let arr = checkpoint.as_array().expect("should be array");
1413        assert_eq!(arr.len(), 2);
1414        assert_eq!(arr[0], serde_json::json!(42)); // value
1415        assert_eq!(arr[1], serde_json::json!(true)); // is_finished
1416    }
1417
1418    #[test]
1419    fn last_value_after_finish_from_checkpoint_restores_is_finished() {
1420        let checkpoint_data = serde_json::json!([99, true]); // (value, is_finished)
1421
1422        let restored: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1423            LastValueAfterFinishChannel::from_checkpoint(checkpoint_data)
1424                .expect("should restore from checkpoint");
1425
1426        assert_eq!(*restored.get(), 99);
1427        assert!(restored.is_available());
1428    }
1429
1430    #[test]
1431    fn last_value_after_finish_from_checkpoint_old_format_backward_compat() {
1432        // Old format: just the value, no is_finished
1433        let checkpoint_data = serde_json::json!(55);
1434
1435        let restored: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1436            LastValueAfterFinishChannel::from_checkpoint(checkpoint_data)
1437                .expect("should restore from old checkpoint format");
1438
1439        assert_eq!(*restored.get(), 55);
1440        assert!(!restored.is_available()); // Should default to not finished
1441    }
1442
1443    #[test]
1444    fn last_value_after_finish_checkpoint_round_trip() {
1445        let mut ch1: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1446            LastValueAfterFinishChannel::new(0);
1447        ch1.update(vec![123]);
1448        ch1.finish();
1449
1450        let checkpoint = ch1.checkpoint().expect("should checkpoint");
1451        let ch2: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1452            LastValueAfterFinishChannel::from_checkpoint(checkpoint).expect("should restore");
1453
1454        assert_eq!(*ch1.get(), *ch2.get());
1455        assert_eq!(ch1.is_available(), ch2.is_available());
1456    }
1457
1458    // Tests for Overwrite helper methods
1459    #[test]
1460    fn overwrite_get_returns_inner_value() {
1461        let ov = Overwrite(42);
1462        assert_eq!(*ov.get(), 42);
1463    }
1464
1465    #[test]
1466    fn overwrite_into_inner_consumes_wrapper() {
1467        let ov = Overwrite(100);
1468        assert_eq!(ov.into_inner(), 100);
1469    }
1470
1471    #[test]
1472    fn overwrite_new_creates_wrapper() {
1473        let ov = Overwrite::new(999);
1474        assert_eq!(*ov.get(), 999);
1475    }
1476
1477    // Tests for DeltaChannel replay_writes with Overwrite detection (Task 1)
1478    #[test]
1479    fn delta_channel_replay_writes_handles_empty_sequence() {
1480        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(5, 10);
1481        ch.replay_writes(&[]);
1482        assert_eq!(*ch.get(), 5); // Value unchanged
1483    }
1484
1485    #[test]
1486    fn delta_channel_replay_writes_single_value() {
1487        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
1488        ch.replay_writes(&[42]);
1489        assert_eq!(*ch.get(), 42);
1490    }
1491
1492    #[test]
1493    fn delta_channel_replay_writes_multiple_values() {
1494        let mut ch: DeltaChannel<Vec<i32>, AppendReducer> = DeltaChannel::new(vec![], 10);
1495        ch.replay_writes(&[vec![1, 2], vec![3, 4]]);
1496        assert_eq!(*ch.get(), vec![1, 2, 3, 4]);
1497    }
1498
1499    #[test]
1500    fn delta_channel_replay_writes_resets_snapshot_counter() {
1501        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
1502        ch.update(vec![1]);
1503        assert_eq!(ch.update_count_since_snapshot, 1);
1504
1505        ch.replay_writes(&[99]);
1506        assert_eq!(ch.update_count_since_snapshot, 0); // Reset after replay
1507    }
1508
1509    #[test]
1510    fn delta_channel_replay_writes_with_replace_reducer() {
1511        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
1512        // ReplaceReducer only allows one value
1513        ch.replay_writes(&[42]);
1514        assert_eq!(*ch.get(), 42);
1515    }
1516
1517    #[test]
1518    fn delta_channel_replay_writes_detects_json_overwrite_format() {
1519        let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
1520            DeltaChannel::new(serde_json::json!(null), 10);
1521
1522        // Create values in Overwrite format (as they would appear in checkpoints)
1523        let overwrite_val = serde_json::json!({"__overwrite__": "baseline"});
1524        let normal_val1 = serde_json::json!("update1");
1525        let normal_val2 = serde_json::json!("update2");
1526
1527        ch.replay_writes(&[normal_val1, overwrite_val, normal_val2.clone()]);
1528
1529        // After detecting the overwrite, baseline should be "baseline",
1530        // then remaining values ["update1", "update2"] applied via LastWriteWinsReducer
1531        // LastWriteWinsReducer takes the last value
1532        assert_eq!(ch.get(), &normal_val2);
1533    }
1534
1535    #[test]
1536    fn delta_channel_replay_writes_overwrite_at_start() {
1537        let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
1538            DeltaChannel::new(serde_json::json!("initial"), 10);
1539
1540        let overwrite_val = serde_json::json!({"__overwrite__": "new_baseline"});
1541        let normal_val = serde_json::json!("update");
1542
1543        ch.replay_writes(&[overwrite_val, normal_val.clone()]);
1544
1545        // Overwrite sets baseline to "new_baseline", then "update" applied
1546        assert_eq!(ch.get(), &normal_val);
1547    }
1548
1549    #[test]
1550    fn delta_channel_replay_writes_overwrite_at_end() {
1551        let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
1552            DeltaChannel::new(serde_json::json!("initial"), 10);
1553
1554        let normal_val = serde_json::json!("update");
1555        let overwrite_val = serde_json::json!({"__overwrite__": "final_baseline"});
1556
1557        ch.replay_writes(&[normal_val, overwrite_val]);
1558
1559        // Overwrite at end sets baseline to "final_baseline", no remaining values
1560        assert_eq!(ch.get(), &serde_json::json!("final_baseline"));
1561    }
1562
1563    #[test]
1564    fn delta_channel_finish_forces_snapshot() {
1565        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
1566        assert!(!ch.should_snapshot());
1567
1568        ch.finish();
1569        // After finish, should_snapshot should return true
1570        assert!(ch.should_snapshot());
1571    }
1572
1573    // --- RingBufferChannel tests ---
1574
1575    #[test]
1576    fn ring_buffer_channel_new_enforces_capacity() {
1577        let ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2, 3, 4, 5], 3);
1578        assert_eq!(ch.capacity(), 3);
1579        assert_eq!(ch.len(), 3);
1580        assert_eq!(ch.get(), &vec![3, 4, 5]);
1581    }
1582
1583    #[test]
1584    fn ring_buffer_channel_new_clamps_min_capacity() {
1585        let ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2], 0);
1586        assert_eq!(ch.capacity(), 1);
1587    }
1588
1589    #[test]
1590    fn ring_buffer_channel_update_appends_and_trims() {
1591        let mut ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![], 3);
1592        assert!(ch.update(vec![vec![1, 2]]));
1593        assert_eq!(ch.get(), &vec![1, 2]);
1594
1595        assert!(ch.update(vec![vec![3, 4]]));
1596        assert_eq!(ch.get(), &vec![2, 3, 4]);
1597        assert_eq!(ch.len(), 3);
1598    }
1599
1600    #[test]
1601    fn ring_buffer_channel_update_returns_false_for_empty() {
1602        let mut ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2], 5);
1603        assert!(!ch.update(vec![vec![]]));
1604        assert_eq!(ch.get(), &vec![1, 2]);
1605    }
1606
1607    #[test]
1608    fn ring_buffer_channel_update_returns_false_for_empty_outer() {
1609        let mut ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2], 5);
1610        assert!(!ch.update(vec![]));
1611        assert_eq!(ch.get(), &vec![1, 2]);
1612    }
1613
1614    #[test]
1615    fn ring_buffer_channel_consume_always_false() {
1616        let mut ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1], 5);
1617        assert!(!ch.consume());
1618    }
1619
1620    #[test]
1621    fn ring_buffer_channel_checkpoint_roundtrip() {
1622        let ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2, 3], 10);
1623        let checkpoint = ch.checkpoint().unwrap();
1624        let restored = RingBufferChannel::<i32>::from_checkpoint(checkpoint).unwrap();
1625        assert_eq!(restored.get(), &vec![1, 2, 3]);
1626        assert_eq!(restored.capacity(), 10);
1627    }
1628
1629    #[test]
1630    fn ring_buffer_channel_from_checkpoint_legacy_format() {
1631        // Legacy format: plain array without capacity
1632        let legacy = serde_json::json!([1, 2, 3]);
1633        let ch = RingBufferChannel::<i32>::from_checkpoint(legacy).unwrap();
1634        assert_eq!(ch.get(), &vec![1, 2, 3]);
1635        assert_eq!(ch.capacity(), 1000); // default capacity
1636    }
1637
1638    #[test]
1639    fn ring_buffer_channel_from_checkpoint_clamps_capacity() {
1640        // Checkpoint with capacity 0 should be clamped to 1
1641        let checkpoint = serde_json::json!({"values": [1, 2], "capacity": 0});
1642        let ch = RingBufferChannel::<i32>::from_checkpoint(checkpoint).unwrap();
1643        assert_eq!(ch.capacity(), 1);
1644    }
1645
1646    #[test]
1647    fn ring_buffer_channel_is_empty() {
1648        let ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![], 5);
1649        assert!(ch.is_empty());
1650
1651        let ch2: RingBufferChannel<i32> = RingBufferChannel::new(vec![1], 5);
1652        assert!(!ch2.is_empty());
1653    }
1654}
1655
1656// Rust guideline compliant 2026-05-20