1use serde::de::DeserializeOwned;
7use serde::ser::SerializeStruct;
8use std::collections::HashSet;
9
10pub trait Reducer<T> {
15 fn reduce_one(current: &mut T, value: T) {
17 Self::reduce(current, vec![value]);
18 }
19
20 fn reduce(current: &mut T, values: Vec<T>);
25}
26
27#[derive(Debug)]
32pub struct ReplaceReducer;
33
34impl<T> Reducer<T> for ReplaceReducer {
35 fn reduce(current: &mut T, values: Vec<T>) {
36 assert!(
37 values.len() <= 1,
38 "Replace reducer: multiple writes in same superstep"
39 );
40 if let Some(v) = values.into_iter().next() {
41 *current = v;
42 }
43 }
44}
45
46#[derive(Debug)]
51pub struct AppendReducer;
52
53impl<T> Reducer<Vec<T>> for AppendReducer {
54 fn reduce_one(current: &mut Vec<T>, value: Vec<T>) {
55 current.extend(value);
56 }
57
58 fn reduce(current: &mut Vec<T>, values: Vec<Vec<T>>) {
59 for v in values {
60 current.extend(v);
61 }
62 }
63}
64
65#[derive(Debug)]
70pub struct AnyValueReducer;
71
72impl<T: PartialEq + Clone> Reducer<T> for AnyValueReducer {
73 fn reduce(current: &mut T, values: Vec<T>) {
74 if let Some(last) = values.last() {
75 if let Some(first) = values.first() {
77 debug_assert!(
78 values.iter().all(|v| v == first),
79 "AnyValue reducer: all values should be equal"
80 );
81 }
82 *current = last.clone();
83 }
84 }
85}
86
87#[derive(Debug)]
91pub struct LastWriteWinsReducer;
92
93impl<T> Reducer<T> for LastWriteWinsReducer {
94 fn reduce(current: &mut T, values: Vec<T>) {
95 if let Some(v) = values.into_iter().last() {
96 *current = v;
97 }
98 }
99}
100
101pub struct Overwrite<T>(pub T);
107
108impl<T: std::fmt::Debug> std::fmt::Debug for Overwrite<T> {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 f.debug_tuple("Overwrite").field(&self.0).finish()
111 }
112}
113
114impl<T> Overwrite<T> {
115 #[must_use]
117 pub const fn get(&self) -> &T {
118 &self.0
119 }
120
121 #[must_use]
123 pub fn into_inner(self) -> T {
124 self.0
125 }
126
127 #[must_use]
129 pub const fn new(value: T) -> Self {
130 Self(value)
131 }
132}
133
134impl<T: serde::Serialize> serde::Serialize for Overwrite<T> {
135 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
136 let mut s = serializer.serialize_struct("__overwrite__", 1)?;
137 s.serialize_field("__overwrite__", &self.0)?;
138 s.end()
139 }
140}
141
142impl<'de, T: serde::Deserialize<'de>> serde::Deserialize<'de> for Overwrite<T> {
143 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
144 #[derive(serde::Deserialize)]
145 struct Wrapper<T> {
146 __overwrite__: T,
147 }
148 let wrapper = Wrapper::deserialize(deserializer)?;
149 Ok(Self(wrapper.__overwrite__))
150 }
151}
152
153#[derive(Debug)]
185pub struct NamedBarrierChannel<T, R: Reducer<T>> {
186 value: T,
187 required_sources: HashSet<String>,
188 seen_sources: HashSet<String>,
189 _reducer: std::marker::PhantomData<R>,
190}
191
192impl<T, R: Reducer<T>> NamedBarrierChannel<T, R> {
193 #[must_use]
200 pub fn new_with_sources(value: T, required_sources: impl IntoIterator<Item = String>) -> Self {
201 let sources: HashSet<String> = required_sources.into_iter().collect();
202 Self {
203 value,
204 required_sources: sources,
205 seen_sources: HashSet::new(),
206 _reducer: std::marker::PhantomData,
207 }
208 }
209
210 #[must_use]
215 pub fn new(value: T) -> Self {
216 Self {
217 value,
218 required_sources: HashSet::new(),
219 seen_sources: HashSet::new(),
220 _reducer: std::marker::PhantomData,
221 }
222 }
223
224 pub fn add_required_source(&mut self, source: String) {
228 self.required_sources.insert(source);
229 }
230
231 #[must_use]
235 pub fn is_available(&self) -> bool {
236 if self.required_sources.is_empty() {
237 return true;
238 }
239 self.required_sources
240 .iter()
241 .all(|source| self.seen_sources.contains(source))
242 }
243
244 #[must_use]
246 pub const fn required_sources(&self) -> &HashSet<String> {
247 &self.required_sources
248 }
249
250 #[must_use]
252 pub const fn seen_sources(&self) -> &HashSet<String> {
253 &self.seen_sources
254 }
255
256 #[must_use]
258 pub fn has_written(&self, source: &str) -> bool {
259 self.seen_sources.contains(source)
260 }
261
262 pub fn reset(&mut self) {
266 self.seen_sources.clear();
267 }
268}
269
270impl<T, R> Channel<T> for NamedBarrierChannel<T, R>
271where
272 T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
273 R: Reducer<T> + Send + Sync + 'static,
274{
275 fn update(&mut self, values: Vec<T>) -> bool {
276 if values.is_empty() {
281 return false;
282 }
283 R::reduce(&mut self.value, values);
285 self.seen_sources = self.required_sources.clone();
287 true
288 }
289
290 fn get(&self) -> &T {
291 &self.value
292 }
293
294 fn consume(&mut self) -> bool {
295 false
296 }
297
298 fn checkpoint(&self) -> Option<serde_json::Value> {
299 serde_json::to_value(&(self.value.clone(), self.seen_sources.clone())).ok()
301 }
302
303 fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
304 where
305 Self: Sized,
306 {
307 let (parsed_value, seen_sources): (T, HashSet<String>) = serde_json::from_value(value)
308 .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
309 Ok(Self {
310 value: parsed_value,
311 required_sources: HashSet::new(),
312 seen_sources,
313 _reducer: std::marker::PhantomData,
314 })
315 }
316}
317
318impl<T, R: Reducer<T>> NamedBarrierChannel<T, R> {
319 pub fn update(&mut self, source_name: String, values: Vec<T>) -> bool {
329 assert!(
330 self.required_sources.is_empty() || self.required_sources.contains(&source_name),
331 "NamedBarrierChannel: source '{source_name}' not in required sources"
332 );
333
334 if values.is_empty() {
335 return false;
336 }
337
338 R::reduce(&mut self.value, values);
339 self.seen_sources.insert(source_name);
340 true
341 }
342}
343
344#[derive(Debug, Clone)]
376pub struct TopicChannel<T> {
377 messages: Vec<T>,
378}
379
380impl<T> TopicChannel<T> {
381 #[must_use]
383 pub const fn new() -> Self {
384 Self {
385 messages: Vec::new(),
386 }
387 }
388
389 #[must_use]
391 pub const fn len(&self) -> usize {
392 self.messages.len()
393 }
394
395 #[must_use]
397 pub const fn is_empty(&self) -> bool {
398 self.messages.is_empty()
399 }
400
401 pub fn reset(&mut self) {
406 self.messages.clear();
407 }
408
409 pub fn iter(&self) -> std::slice::Iter<'_, T> {
411 self.messages.iter()
412 }
413}
414
415impl<T> Default for TopicChannel<T> {
416 fn default() -> Self {
417 Self::new()
418 }
419}
420
421impl<'a, T> IntoIterator for &'a TopicChannel<T> {
422 type Item = &'a T;
423 type IntoIter = std::slice::Iter<'a, T>;
424
425 fn into_iter(self) -> Self::IntoIter {
426 self.iter()
427 }
428}
429
430impl<T> Channel<Vec<T>> for TopicChannel<T>
431where
432 T: Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
433{
434 fn update(&mut self, values: Vec<Vec<T>>) -> bool {
435 if values.is_empty() {
436 return false;
437 }
438 for batch in values {
440 self.messages.extend(batch);
441 }
442 true
443 }
444
445 fn get(&self) -> &Vec<T> {
446 &self.messages
447 }
448
449 fn consume(&mut self) -> bool {
450 let was_empty = self.messages.is_empty();
451 self.messages.clear();
452 !was_empty
453 }
454
455 fn checkpoint(&self) -> Option<serde_json::Value> {
456 serde_json::to_value(&self.messages).ok()
457 }
458
459 fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
460 where
461 Self: Sized,
462 {
463 let messages: Vec<T> = serde_json::from_value(value)
464 .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
465 Ok(Self { messages })
466 }
467}
468
469pub trait Channel<T>: Send + Sync + 'static {
474 fn update(&mut self, values: Vec<T>) -> bool;
476
477 fn get(&self) -> &T;
479
480 fn consume(&mut self) -> bool;
482
483 fn checkpoint(&self) -> Option<serde_json::Value>;
485
486 fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
493 where
494 Self: Sized;
495}
496
497#[derive(Debug)]
503pub struct UntrackedChannel<T, R: Reducer<T>> {
504 value: T,
505 _reducer: std::marker::PhantomData<R>,
506}
507
508impl<T, R: Reducer<T>> UntrackedChannel<T, R> {
509 #[must_use]
511 pub const fn new(value: T) -> Self {
512 Self {
513 value,
514 _reducer: std::marker::PhantomData,
515 }
516 }
517}
518
519impl<T: Default + Send + Sync + 'static, R: Reducer<T> + Send + Sync + 'static> Channel<T>
520 for UntrackedChannel<T, R>
521{
522 fn update(&mut self, values: Vec<T>) -> bool {
523 if values.is_empty() {
524 return false;
525 }
526 R::reduce(&mut self.value, values);
527 true
528 }
529
530 fn get(&self) -> &T {
531 &self.value
532 }
533
534 fn consume(&mut self) -> bool {
535 false
536 }
537
538 fn checkpoint(&self) -> Option<serde_json::Value> {
539 None
540 }
541
542 fn from_checkpoint(_value: serde_json::Value) -> Result<Self, String> {
543 Ok(Self::new(T::default()))
544 }
545}
546
547#[derive(Debug)]
552pub struct EphemeralChannel<T, R: Reducer<T>> {
553 value: T,
554 consumed: bool,
555 _reducer: std::marker::PhantomData<R>,
556}
557
558impl<T, R: Reducer<T>> EphemeralChannel<T, R> {
559 #[must_use]
561 pub const fn new(value: T) -> Self {
562 Self {
563 value,
564 consumed: false,
565 _reducer: std::marker::PhantomData,
566 }
567 }
568}
569
570impl<T: Default + Send + Sync + 'static, R: Reducer<T> + Send + Sync + 'static> Channel<T>
571 for EphemeralChannel<T, R>
572{
573 fn update(&mut self, values: Vec<T>) -> bool {
574 if values.is_empty() {
575 return false;
576 }
577 self.consumed = false;
578 R::reduce(&mut self.value, values);
579 true
580 }
581
582 fn get(&self) -> &T {
583 &self.value
584 }
585
586 fn consume(&mut self) -> bool {
587 let was_consumed = self.consumed;
588 self.consumed = true;
589 was_consumed
590 }
591
592 fn checkpoint(&self) -> Option<serde_json::Value> {
593 None
594 }
595
596 fn from_checkpoint(_value: serde_json::Value) -> Result<Self, String> {
597 Ok(Self::new(T::default()))
598 }
599}
600
601#[derive(Debug)]
606pub struct LastValueAfterFinishChannel<T, R: Reducer<T>> {
607 value: T,
608 finished_value: Option<T>,
609 is_finished: bool,
610 _reducer: std::marker::PhantomData<R>,
611}
612
613impl<T, R: Reducer<T>> LastValueAfterFinishChannel<T, R> {
614 #[must_use]
616 pub const fn new(value: T) -> Self {
617 Self {
618 value,
619 finished_value: None,
620 is_finished: false,
621 _reducer: std::marker::PhantomData,
622 }
623 }
624
625 pub const fn finish(&mut self) {
627 self.is_finished = true;
628 }
629
630 #[must_use]
632 pub const fn is_available(&self) -> bool {
633 self.is_finished
634 }
635}
636
637impl<T, R> Channel<T> for LastValueAfterFinishChannel<T, R>
638where
639 T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
640 R: Reducer<T> + Send + Sync + 'static,
641{
642 fn update(&mut self, values: Vec<T>) -> bool {
643 if values.is_empty() {
644 return false;
645 }
646 R::reduce(&mut self.value, values);
647 if self.is_finished {
648 self.finished_value = Some(self.value.clone());
649 }
650 true
651 }
652
653 fn get(&self) -> &T {
654 if self.is_finished {
655 self.finished_value.as_ref().unwrap_or(&self.value)
656 } else {
657 &self.value
658 }
659 }
660
661 fn consume(&mut self) -> bool {
662 false
663 }
664
665 fn checkpoint(&self) -> Option<serde_json::Value> {
666 if self.is_finished {
669 serde_json::to_value(&(self.value.clone(), self.is_finished)).ok()
670 } else {
671 None
672 }
673 }
674
675 fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
676 if let Ok((parsed_value, is_finished)) = serde_json::from_value::<(T, bool)>(value.clone())
678 {
679 let finished_value = is_finished.then(|| parsed_value.clone());
680 return Ok(Self {
681 value: parsed_value,
682 finished_value,
683 is_finished,
684 _reducer: std::marker::PhantomData,
685 });
686 }
687
688 let parsed_value: T = serde_json::from_value(value)
690 .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
691 Ok(Self {
692 value: parsed_value,
693 finished_value: None,
694 is_finished: false,
695 _reducer: std::marker::PhantomData,
696 })
697 }
698}
699
700#[derive(Debug)]
707pub struct DeltaChannel<T, R: Reducer<T>> {
708 value: T,
709 snapshot_frequency: usize,
711 update_count_since_snapshot: usize,
712 _reducer: std::marker::PhantomData<R>,
713}
714
715impl<T, R: Reducer<T>> DeltaChannel<T, R> {
716 #[must_use]
720 pub fn new(value: T, snapshot_frequency: usize) -> Self {
721 Self {
722 value,
723 snapshot_frequency: snapshot_frequency.max(1),
724 update_count_since_snapshot: 0,
725 _reducer: std::marker::PhantomData,
726 }
727 }
728
729 pub fn replay_writes(&mut self, values: &[T])
735 where
736 T: Clone + serde::Serialize + DeserializeOwned,
737 {
738 if values.is_empty() {
739 return;
740 }
741
742 let mut base = self.value.clone();
746 let mut start_idx = 0;
747
748 for (i, v) in values.iter().enumerate() {
752 if let Ok(json) = serde_json::to_value(v)
756 && let Some(obj) = json.as_object()
757 && obj.contains_key("__overwrite__")
758 {
759 if let Ok(inner) = serde_json::from_value::<T>(
761 obj.get("__overwrite__").cloned().unwrap_or_default(),
762 ) {
763 base = inner;
764 start_idx = i + 1;
765 }
766 }
767 }
768
769 let remaining: Vec<T> = values[start_idx..].to_vec();
771 if !remaining.is_empty() {
772 R::reduce(&mut base, remaining);
773 }
774 self.value = base;
775 self.update_count_since_snapshot = 0;
776 }
777
778 #[must_use]
780 pub const fn should_snapshot(&self) -> bool {
781 self.update_count_since_snapshot >= self.snapshot_frequency
782 }
783
784 pub const fn finish(&mut self) {
788 self.update_count_since_snapshot = self.snapshot_frequency;
789 }
790}
791
792impl<T, R> Channel<T> for DeltaChannel<T, R>
793where
794 T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
795 R: Reducer<T> + Send + Sync + 'static,
796{
797 fn update(&mut self, values: Vec<T>) -> bool {
798 if values.is_empty() {
799 return false;
800 }
801 R::reduce(&mut self.value, values);
802 self.update_count_since_snapshot += 1;
803 true
804 }
805
806 fn get(&self) -> &T {
807 &self.value
808 }
809
810 fn consume(&mut self) -> bool {
811 false
812 }
813
814 fn checkpoint(&self) -> Option<serde_json::Value> {
815 serde_json::to_value(&self.value).ok()
816 }
817
818 fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
819 let value: T = serde_json::from_value(value)
820 .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
821 Ok(Self {
822 value,
823 snapshot_frequency: 10,
824 update_count_since_snapshot: 0,
825 _reducer: std::marker::PhantomData,
826 })
827 }
828}
829
830#[derive(Clone, Debug)]
836pub enum DeltaBlob<T>
837where
838 T: Clone + serde::Serialize + serde::de::DeserializeOwned,
839{
840 Missing,
842 Snapshot(T),
844}
845
846#[derive(Clone, Debug)]
863pub struct RingBufferChannel<T> {
864 values: Vec<T>,
866 capacity: usize,
868}
869
870impl<T> RingBufferChannel<T> {
871 #[must_use]
876 pub fn new(values: Vec<T>, capacity: usize) -> Self {
877 let mut channel = Self {
878 values,
879 capacity: capacity.max(1),
880 };
881 channel.trim_to_capacity();
882 channel
883 }
884
885 #[must_use]
887 pub const fn capacity(&self) -> usize {
888 self.capacity
889 }
890
891 #[must_use]
893 pub fn len(&self) -> usize {
894 self.values.len()
895 }
896
897 #[must_use]
899 pub fn is_empty(&self) -> bool {
900 self.values.is_empty()
901 }
902
903 fn trim_to_capacity(&mut self) {
907 if self.values.len() > self.capacity {
908 let excess = self.values.len() - self.capacity;
909 self.values.drain(..excess);
910 }
911 }
912}
913
914impl<T: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static> Channel<Vec<T>>
915 for RingBufferChannel<T>
916{
917 fn update(&mut self, values: Vec<Vec<T>>) -> bool {
918 let had_non_empty = values.iter().any(|v| !v.is_empty());
919 for v in values {
920 self.values.extend(v);
921 }
922 self.trim_to_capacity();
923 had_non_empty
924 }
925
926 fn get(&self) -> &Vec<T> {
927 &self.values
928 }
929
930 fn consume(&mut self) -> bool {
931 false
932 }
933
934 fn checkpoint(&self) -> Option<serde_json::Value> {
935 serde_json::to_value(serde_json::json!({
936 "values": &self.values,
937 "capacity": self.capacity,
938 }))
939 .ok()
940 }
941
942 fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
943 if let Some(obj) = value.as_object() {
945 let values: Vec<T> =
946 serde_json::from_value(obj.get("values").cloned().unwrap_or_default())
947 .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
948 let capacity = obj
949 .get("capacity")
950 .and_then(serde_json::Value::as_u64)
951 .map_or(1000, |c| usize::try_from(c).unwrap_or(1000))
952 .max(1); Ok(Self { values, capacity })
954 } else {
955 let values: Vec<T> = serde_json::from_value(value)
957 .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
958 Ok(Self {
959 values,
960 capacity: 1000,
961 })
962 }
963 }
964}
965
966#[derive(Clone, Debug)]
971pub struct RemoveMessage {
972 pub id: String,
974}
975
976#[cfg(test)]
977mod tests {
978 use super::*;
979
980 #[test]
981 fn untracked_channel_update_returns_true_on_change() {
982 let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(0);
983 assert!(!ch.update(vec![]));
984 assert!(ch.update(vec![42]));
985 assert_eq!(*ch.get(), 42);
986 }
987
988 #[test]
989 fn untracked_channel_consume_always_false() {
990 let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(1);
991 assert!(!ch.consume());
992 }
993
994 #[test]
995 fn untracked_channel_checkpoint_is_none() {
996 let ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(5);
997 assert!(ch.checkpoint().is_none());
998 }
999
1000 #[test]
1001 fn untracked_channel_from_checkpoint_uses_default() {
1002 let ch: UntrackedChannel<i32, ReplaceReducer> =
1003 UntrackedChannel::from_checkpoint(serde_json::json!(99)).expect("should succeed");
1004 assert_eq!(*ch.get(), 0);
1005 }
1006
1007 #[test]
1008 fn ephemeral_channel_consume_tracks_state() {
1009 let mut ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(0);
1010 assert!(!ch.consume()); assert!(ch.consume()); }
1013
1014 #[test]
1015 fn ephemeral_channel_update_resets_consumed() {
1016 let mut ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(0);
1017 assert!(!ch.consume());
1018 assert!(ch.update(vec![7]));
1019 assert!(!ch.consume()); }
1021
1022 #[test]
1023 fn ephemeral_channel_checkpoint_is_none() {
1024 let ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(3);
1025 assert!(ch.checkpoint().is_none());
1026 }
1027
1028 #[test]
1029 fn last_value_after_finish_channel_not_available_before_finish() {
1030 let ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1031 LastValueAfterFinishChannel::new(0);
1032 assert!(!ch.is_available());
1033 }
1034
1035 #[test]
1036 fn last_value_after_finish_channel_available_after_finish() {
1037 let mut ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1038 LastValueAfterFinishChannel::new(0);
1039 ch.finish();
1040 assert!(ch.is_available());
1041 }
1042
1043 #[test]
1044 fn last_value_after_finish_channel_checkpoint_only_if_finished() {
1045 let ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1046 LastValueAfterFinishChannel::new(5);
1047 assert!(ch.checkpoint().is_none());
1048
1049 let mut ch2: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1050 LastValueAfterFinishChannel::new(5);
1051 ch2.finish();
1052 assert!(ch2.checkpoint().is_some());
1053 }
1054
1055 #[test]
1056 fn delta_channel_snapshot_frequency_clamped_to_one() {
1057 let ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 0);
1058 assert_eq!(ch.snapshot_frequency, 1);
1059 }
1060
1061 #[test]
1062 fn delta_channel_replay_writes_restores_state() {
1063 let mut ch: DeltaChannel<Vec<i32>, AppendReducer> = DeltaChannel::new(vec![], 10);
1064 ch.replay_writes(&[vec![1, 2], vec![3, 4]]);
1065 assert_eq!(*ch.get(), vec![1, 2, 3, 4]);
1066 assert_eq!(ch.update_count_since_snapshot, 0);
1067 }
1068
1069 #[test]
1070 fn delta_channel_checkpoint_returns_snapshot() {
1071 let ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(42, 5);
1072 let cp = ch.checkpoint().expect("should have checkpoint");
1073 assert_eq!(cp, serde_json::json!(42));
1074 }
1075
1076 #[test]
1077 fn delta_channel_should_snapshot() {
1078 let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 2);
1079 assert!(!ch.should_snapshot());
1080 ch.update(vec![1]);
1081 assert!(!ch.should_snapshot());
1082 ch.update(vec![2]);
1083 assert!(ch.should_snapshot());
1084 }
1085
1086 #[test]
1087 fn delta_blob_missing_variant_exists() {
1088 let blob: DeltaBlob<i32> = DeltaBlob::Missing;
1089 assert!(matches!(blob, DeltaBlob::Missing));
1090 }
1091
1092 #[test]
1093 fn delta_blob_snapshot_holds_value() {
1094 let blob: DeltaBlob<i32> = DeltaBlob::Snapshot(42);
1095 assert!(matches!(blob, DeltaBlob::Snapshot(_)));
1096 }
1097
1098 #[test]
1099 fn delta_blob_clone() {
1100 let blob: DeltaBlob<String> = DeltaBlob::Snapshot("hello".to_string());
1101 let cloned = blob.clone();
1102 if let DeltaBlob::Snapshot(v) = cloned {
1103 assert_eq!(v, "hello");
1104 } else {
1105 panic!("expected Snapshot variant");
1106 }
1107 if let DeltaBlob::Snapshot(v) = blob {
1109 assert_eq!(v, "hello");
1110 } else {
1111 panic!("expected Snapshot variant");
1112 }
1113 }
1114
1115 #[test]
1116 fn remove_message_holds_id() {
1117 let rm = RemoveMessage {
1118 id: "msg-123".to_string(),
1119 };
1120 assert_eq!(rm.id, "msg-123");
1121 }
1122
1123 #[test]
1124 fn overwrite_serialize_round_trip() {
1125 let original = Overwrite(42);
1126 let json = serde_json::to_string(&original).expect("should serialize");
1127 assert_eq!(json, r#"{"__overwrite__":42}"#);
1128
1129 let deserialized: Overwrite<i32> = serde_json::from_str(&json).expect("should deserialize");
1130 assert_eq!(deserialized.0, 42);
1131 }
1132
1133 #[test]
1134 fn overwrite_serialize_complex_type() {
1135 let original = Overwrite(vec![1, 2, 3]);
1136 let json = serde_json::to_string(&original).expect("should serialize");
1137 assert_eq!(json, r#"{"__overwrite__":[1,2,3]}"#);
1138
1139 let deserialized: Overwrite<Vec<i32>> =
1140 serde_json::from_str(&json).expect("should deserialize");
1141 assert_eq!(deserialized.0, vec![1, 2, 3]);
1142 }
1143
1144 #[test]
1145 fn overwrite_debug_format() {
1146 let ov = Overwrite(42);
1147 let debug_str = format!("{ov:?}");
1148 assert_eq!(debug_str, "Overwrite(42)");
1149 }
1150
1151 #[test]
1152 fn replace_reducer_single_value_succeeds() {
1153 let mut val = 0;
1154 ReplaceReducer::reduce(&mut val, vec![42]);
1155 assert_eq!(val, 42);
1156 }
1157
1158 #[test]
1159 fn replace_reducer_empty_values_succeeds() {
1160 let mut val = 99;
1161 ReplaceReducer::reduce(&mut val, vec![]);
1162 assert_eq!(val, 99);
1163 }
1164
1165 #[test]
1166 #[should_panic(expected = "Replace reducer: multiple writes in same superstep")]
1167 fn replace_reducer_multiple_values_panics() {
1168 let mut val = 0;
1169 ReplaceReducer::reduce(&mut val, vec![1, 2]);
1170 }
1171
1172 #[test]
1173 #[should_panic(expected = "Replace reducer: multiple writes in same superstep")]
1174 fn untracked_channel_multiple_writes_panics() {
1175 let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(0);
1176 ch.update(vec![1, 2]);
1177 }
1178
1179 #[test]
1181 fn named_barrier_channel_not_available_initially() {
1182 let ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new_with_sources(
1183 0,
1184 ["node_a", "node_b"].into_iter().map(String::from),
1185 );
1186 assert!(!ch.is_available());
1187 }
1188
1189 #[test]
1190 fn named_barrier_channel_available_after_all_sources_write() {
1191 let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1192 NamedBarrierChannel::new_with_sources(
1193 0,
1194 ["node_a", "node_b"].into_iter().map(String::from),
1195 );
1196 assert!(!ch.is_available());
1197
1198 ch.update("node_a".to_string(), vec![42]);
1199 assert!(!ch.is_available());
1200
1201 ch.update("node_b".to_string(), vec![100]);
1202 assert!(ch.is_available());
1203 assert_eq!(*ch.get(), 100); }
1205
1206 #[test]
1207 fn named_barrier_channel_empty_required_sources_is_available() {
1208 let ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new(42);
1209 assert!(ch.is_available());
1210 }
1211
1212 #[test]
1213 fn named_barrier_channel_has_written_tracks_sources() {
1214 let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1215 NamedBarrierChannel::new_with_sources(
1216 0,
1217 ["node_a", "node_b", "node_c"].into_iter().map(String::from),
1218 );
1219
1220 assert!(!ch.has_written("node_a"));
1221 ch.update("node_a".to_string(), vec![1]);
1222 assert!(ch.has_written("node_a"));
1223 assert!(!ch.has_written("node_b"));
1224 }
1225
1226 #[test]
1227 fn named_barrier_channel_reset_clears_seen_sources() {
1228 let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1229 NamedBarrierChannel::new_with_sources(
1230 0,
1231 ["node_a", "node_b"].into_iter().map(String::from),
1232 );
1233
1234 ch.update("node_a".to_string(), vec![1]);
1235 ch.update("node_b".to_string(), vec![2]);
1236 assert!(ch.is_available());
1237
1238 ch.reset();
1239 assert!(!ch.is_available());
1240 assert!(!ch.has_written("node_a"));
1241 assert!(!ch.has_written("node_b"));
1242 }
1243
1244 #[test]
1245 fn named_barrier_channel_add_required_source() {
1246 let mut ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new(0);
1247 assert!(ch.is_available());
1248
1249 ch.add_required_source("node_a".to_string());
1250 assert!(!ch.is_available());
1251
1252 ch.update("node_a".to_string(), vec![42]);
1253 assert!(ch.is_available());
1254 }
1255
1256 #[test]
1257 #[should_panic(expected = "NamedBarrierChannel: source")]
1258 fn named_barrier_channel_unknown_source_panics() {
1259 let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1260 NamedBarrierChannel::new_with_sources(0, vec!["node_a".to_string()]);
1261
1262 ch.update("unknown_node".to_string(), vec![42]);
1263 }
1264
1265 #[test]
1266 fn named_barrier_channel_checkpoint_persists_state() {
1267 let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1268 NamedBarrierChannel::new_with_sources(0, vec!["node_a".to_string()]);
1269
1270 ch.update("node_a".to_string(), vec![42]);
1271
1272 let checkpoint = ch.checkpoint().expect("should have checkpoint");
1273 assert!(checkpoint.is_array() || checkpoint.is_object());
1275
1276 let restored: NamedBarrierChannel<i32, ReplaceReducer> =
1277 NamedBarrierChannel::from_checkpoint(checkpoint).expect("should restore");
1278 assert_eq!(*restored.get(), 42);
1279 assert!(restored.has_written("node_a"));
1280 }
1281
1282 #[test]
1283 fn named_barrier_channel_generic_update_marks_all_sources_seen() {
1284 let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
1285 NamedBarrierChannel::new_with_sources(0, ["node_a".to_string(), "node_b".to_string()]);
1286
1287 Channel::update(&mut ch, vec![42]);
1289 assert!(ch.is_available());
1290 assert!(ch.has_written("node_a"));
1291 assert!(ch.has_written("node_b"));
1292 }
1293
1294 #[test]
1296 fn topic_channel_new_is_empty() {
1297 let ch: TopicChannel<String> = TopicChannel::new();
1298 assert!(ch.is_empty());
1299 assert_eq!(ch.len(), 0);
1300 }
1301
1302 #[test]
1303 fn topic_channel_default_is_empty() {
1304 let ch: TopicChannel<String> = TopicChannel::default();
1305 assert!(ch.is_empty());
1306 }
1307
1308 #[test]
1309 fn topic_channel_accumulates_messages() {
1310 let mut ch: TopicChannel<String> = TopicChannel::new();
1311
1312 ch.update(vec![vec!["hello".to_string()]]);
1313 assert_eq!(ch.len(), 1);
1314 assert_eq!(ch.get()[0], "hello");
1315
1316 ch.update(vec![vec!["world".to_string()]]);
1317 assert_eq!(ch.len(), 2);
1318 assert_eq!(ch.get()[1], "world");
1319 }
1320
1321 #[test]
1322 fn topic_channel_update_with_multiple_messages() {
1323 let mut ch: TopicChannel<i32> = TopicChannel::new();
1324
1325 ch.update(vec![vec![1, 2, 3]]);
1326 assert_eq!(ch.len(), 3);
1327 assert_eq!(ch.get(), &[1, 2, 3]);
1328 }
1329
1330 #[test]
1331 fn topic_channel_update_with_multiple_batches() {
1332 let mut ch: TopicChannel<i32> = TopicChannel::new();
1333
1334 ch.update(vec![vec![1, 2], vec![3, 4]]);
1335 assert_eq!(ch.len(), 4);
1336 assert_eq!(ch.get(), &[1, 2, 3, 4]);
1337 }
1338
1339 #[test]
1340 fn topic_channel_reset_clears_messages() {
1341 let mut ch: TopicChannel<String> = TopicChannel::new();
1342
1343 ch.update(vec![vec!["test".to_string()]]);
1344 assert_eq!(ch.len(), 1);
1345
1346 ch.reset();
1347 assert!(ch.is_empty());
1348 assert_eq!(ch.len(), 0);
1349 }
1350
1351 #[test]
1352 fn topic_channel_consume_clears_and_returns_status() {
1353 let mut ch: TopicChannel<String> = TopicChannel::new();
1354
1355 let had_content = ch.consume();
1356 assert!(!had_content); ch.update(vec![vec!["test".to_string()]]);
1359 let had_content_after = ch.consume();
1360 assert!(had_content_after); assert!(ch.is_empty());
1362 }
1363
1364 #[test]
1365 fn topic_channel_iter_messages() {
1366 let mut ch: TopicChannel<i32> = TopicChannel::new();
1367
1368 ch.update(vec![vec![1, 2, 3]]);
1369
1370 let mut iter = ch.iter();
1371 assert_eq!(iter.next(), Some(&1));
1372 assert_eq!(iter.next(), Some(&2));
1373 assert_eq!(iter.next(), Some(&3));
1374 assert_eq!(iter.next(), None);
1375 }
1376
1377 #[test]
1378 fn topic_channel_checkpoint_persists_messages() {
1379 let mut ch: TopicChannel<i32> = TopicChannel::new();
1380
1381 ch.update(vec![vec![1, 2, 3]]);
1382
1383 let checkpoint = ch.checkpoint().expect("should have checkpoint");
1384 assert_eq!(checkpoint, serde_json::json!([1, 2, 3]));
1385
1386 let restored: TopicChannel<i32> =
1387 TopicChannel::from_checkpoint(checkpoint).expect("should restore");
1388 assert_eq!(restored.len(), 3);
1389 assert_eq!(restored.get(), &[1, 2, 3]);
1390 }
1391
1392 #[test]
1393 fn topic_channel_from_checkpoint_empty() {
1394 let ch: TopicChannel<i32> =
1395 TopicChannel::from_checkpoint(serde_json::json!([])).expect("should restore");
1396 assert!(ch.is_empty());
1397 }
1398
1399 #[test]
1401 fn last_value_after_finish_checkpoint_saves_is_finished_state() {
1402 let mut ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1403 LastValueAfterFinishChannel::new(10);
1404 ch.update(vec![42]);
1405 ch.finish();
1406
1407 let checkpoint = ch
1408 .checkpoint()
1409 .expect("should have checkpoint when finished");
1410 assert!(checkpoint.is_array());
1412 let arr = checkpoint.as_array().expect("should be array");
1413 assert_eq!(arr.len(), 2);
1414 assert_eq!(arr[0], serde_json::json!(42)); assert_eq!(arr[1], serde_json::json!(true)); }
1417
1418 #[test]
1419 fn last_value_after_finish_from_checkpoint_restores_is_finished() {
1420 let checkpoint_data = serde_json::json!([99, true]); let restored: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1423 LastValueAfterFinishChannel::from_checkpoint(checkpoint_data)
1424 .expect("should restore from checkpoint");
1425
1426 assert_eq!(*restored.get(), 99);
1427 assert!(restored.is_available());
1428 }
1429
1430 #[test]
1431 fn last_value_after_finish_from_checkpoint_old_format_backward_compat() {
1432 let checkpoint_data = serde_json::json!(55);
1434
1435 let restored: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1436 LastValueAfterFinishChannel::from_checkpoint(checkpoint_data)
1437 .expect("should restore from old checkpoint format");
1438
1439 assert_eq!(*restored.get(), 55);
1440 assert!(!restored.is_available()); }
1442
1443 #[test]
1444 fn last_value_after_finish_checkpoint_round_trip() {
1445 let mut ch1: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1446 LastValueAfterFinishChannel::new(0);
1447 ch1.update(vec![123]);
1448 ch1.finish();
1449
1450 let checkpoint = ch1.checkpoint().expect("should checkpoint");
1451 let ch2: LastValueAfterFinishChannel<i32, ReplaceReducer> =
1452 LastValueAfterFinishChannel::from_checkpoint(checkpoint).expect("should restore");
1453
1454 assert_eq!(*ch1.get(), *ch2.get());
1455 assert_eq!(ch1.is_available(), ch2.is_available());
1456 }
1457
1458 #[test]
1460 fn overwrite_get_returns_inner_value() {
1461 let ov = Overwrite(42);
1462 assert_eq!(*ov.get(), 42);
1463 }
1464
1465 #[test]
1466 fn overwrite_into_inner_consumes_wrapper() {
1467 let ov = Overwrite(100);
1468 assert_eq!(ov.into_inner(), 100);
1469 }
1470
1471 #[test]
1472 fn overwrite_new_creates_wrapper() {
1473 let ov = Overwrite::new(999);
1474 assert_eq!(*ov.get(), 999);
1475 }
1476
1477 #[test]
1479 fn delta_channel_replay_writes_handles_empty_sequence() {
1480 let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(5, 10);
1481 ch.replay_writes(&[]);
1482 assert_eq!(*ch.get(), 5); }
1484
1485 #[test]
1486 fn delta_channel_replay_writes_single_value() {
1487 let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
1488 ch.replay_writes(&[42]);
1489 assert_eq!(*ch.get(), 42);
1490 }
1491
1492 #[test]
1493 fn delta_channel_replay_writes_multiple_values() {
1494 let mut ch: DeltaChannel<Vec<i32>, AppendReducer> = DeltaChannel::new(vec![], 10);
1495 ch.replay_writes(&[vec![1, 2], vec![3, 4]]);
1496 assert_eq!(*ch.get(), vec![1, 2, 3, 4]);
1497 }
1498
1499 #[test]
1500 fn delta_channel_replay_writes_resets_snapshot_counter() {
1501 let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
1502 ch.update(vec![1]);
1503 assert_eq!(ch.update_count_since_snapshot, 1);
1504
1505 ch.replay_writes(&[99]);
1506 assert_eq!(ch.update_count_since_snapshot, 0); }
1508
1509 #[test]
1510 fn delta_channel_replay_writes_with_replace_reducer() {
1511 let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
1512 ch.replay_writes(&[42]);
1514 assert_eq!(*ch.get(), 42);
1515 }
1516
1517 #[test]
1518 fn delta_channel_replay_writes_detects_json_overwrite_format() {
1519 let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
1520 DeltaChannel::new(serde_json::json!(null), 10);
1521
1522 let overwrite_val = serde_json::json!({"__overwrite__": "baseline"});
1524 let normal_val1 = serde_json::json!("update1");
1525 let normal_val2 = serde_json::json!("update2");
1526
1527 ch.replay_writes(&[normal_val1, overwrite_val, normal_val2.clone()]);
1528
1529 assert_eq!(ch.get(), &normal_val2);
1533 }
1534
1535 #[test]
1536 fn delta_channel_replay_writes_overwrite_at_start() {
1537 let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
1538 DeltaChannel::new(serde_json::json!("initial"), 10);
1539
1540 let overwrite_val = serde_json::json!({"__overwrite__": "new_baseline"});
1541 let normal_val = serde_json::json!("update");
1542
1543 ch.replay_writes(&[overwrite_val, normal_val.clone()]);
1544
1545 assert_eq!(ch.get(), &normal_val);
1547 }
1548
1549 #[test]
1550 fn delta_channel_replay_writes_overwrite_at_end() {
1551 let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
1552 DeltaChannel::new(serde_json::json!("initial"), 10);
1553
1554 let normal_val = serde_json::json!("update");
1555 let overwrite_val = serde_json::json!({"__overwrite__": "final_baseline"});
1556
1557 ch.replay_writes(&[normal_val, overwrite_val]);
1558
1559 assert_eq!(ch.get(), &serde_json::json!("final_baseline"));
1561 }
1562
1563 #[test]
1564 fn delta_channel_finish_forces_snapshot() {
1565 let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
1566 assert!(!ch.should_snapshot());
1567
1568 ch.finish();
1569 assert!(ch.should_snapshot());
1571 }
1572
1573 #[test]
1576 fn ring_buffer_channel_new_enforces_capacity() {
1577 let ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2, 3, 4, 5], 3);
1578 assert_eq!(ch.capacity(), 3);
1579 assert_eq!(ch.len(), 3);
1580 assert_eq!(ch.get(), &vec![3, 4, 5]);
1581 }
1582
1583 #[test]
1584 fn ring_buffer_channel_new_clamps_min_capacity() {
1585 let ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2], 0);
1586 assert_eq!(ch.capacity(), 1);
1587 }
1588
1589 #[test]
1590 fn ring_buffer_channel_update_appends_and_trims() {
1591 let mut ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![], 3);
1592 assert!(ch.update(vec![vec![1, 2]]));
1593 assert_eq!(ch.get(), &vec![1, 2]);
1594
1595 assert!(ch.update(vec![vec![3, 4]]));
1596 assert_eq!(ch.get(), &vec![2, 3, 4]);
1597 assert_eq!(ch.len(), 3);
1598 }
1599
1600 #[test]
1601 fn ring_buffer_channel_update_returns_false_for_empty() {
1602 let mut ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2], 5);
1603 assert!(!ch.update(vec![vec![]]));
1604 assert_eq!(ch.get(), &vec![1, 2]);
1605 }
1606
1607 #[test]
1608 fn ring_buffer_channel_update_returns_false_for_empty_outer() {
1609 let mut ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2], 5);
1610 assert!(!ch.update(vec![]));
1611 assert_eq!(ch.get(), &vec![1, 2]);
1612 }
1613
1614 #[test]
1615 fn ring_buffer_channel_consume_always_false() {
1616 let mut ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1], 5);
1617 assert!(!ch.consume());
1618 }
1619
1620 #[test]
1621 fn ring_buffer_channel_checkpoint_roundtrip() {
1622 let ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2, 3], 10);
1623 let checkpoint = ch.checkpoint().unwrap();
1624 let restored = RingBufferChannel::<i32>::from_checkpoint(checkpoint).unwrap();
1625 assert_eq!(restored.get(), &vec![1, 2, 3]);
1626 assert_eq!(restored.capacity(), 10);
1627 }
1628
1629 #[test]
1630 fn ring_buffer_channel_from_checkpoint_legacy_format() {
1631 let legacy = serde_json::json!([1, 2, 3]);
1633 let ch = RingBufferChannel::<i32>::from_checkpoint(legacy).unwrap();
1634 assert_eq!(ch.get(), &vec![1, 2, 3]);
1635 assert_eq!(ch.capacity(), 1000); }
1637
1638 #[test]
1639 fn ring_buffer_channel_from_checkpoint_clamps_capacity() {
1640 let checkpoint = serde_json::json!({"values": [1, 2], "capacity": 0});
1642 let ch = RingBufferChannel::<i32>::from_checkpoint(checkpoint).unwrap();
1643 assert_eq!(ch.capacity(), 1);
1644 }
1645
1646 #[test]
1647 fn ring_buffer_channel_is_empty() {
1648 let ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![], 5);
1649 assert!(ch.is_empty());
1650
1651 let ch2: RingBufferChannel<i32> = RingBufferChannel::new(vec![1], 5);
1652 assert!(!ch2.is_empty());
1653 }
1654}
1655
1656