1use serde::de::DeserializeOwned;
7use serde::ser::SerializeStruct;
8use std::collections::HashSet;
9
/// Strategy for folding incoming writes into a channel's current value.
///
/// Implementors only have to provide [`Reducer::reduce`]; a single write can be applied
/// through [`Reducer::reduce_one`], which forwards to `reduce` unless overridden.
pub trait Reducer<T> {
    /// Folds the batch `values` into `current`, mutating it in place.
    fn reduce(current: &mut T, values: Vec<T>);

    /// Applies a single `value` to `current`.
    ///
    /// The default implementation wraps `value` in a one-element batch and delegates to
    /// [`Reducer::reduce`].
    fn reduce_one(current: &mut T, value: T) {
        let batch: Vec<T> = std::iter::once(value).collect();
        Self::reduce(current, batch);
    }
}
26
27#[derive(Debug)]
32pub struct ReplaceReducer;
33
34impl<T> Reducer<T> for ReplaceReducer {
35 fn reduce(current: &mut T, values: Vec<T>) {
36 assert!(
37 values.len() <= 1,
38 "Replace reducer: multiple writes in same superstep"
39 );
40 if let Some(v) = values.into_iter().next() {
41 *current = v;
42 }
43 }
44}
45
46#[derive(Debug)]
51pub struct AppendReducer;
52
53impl<T> Reducer<Vec<T>> for AppendReducer {
54 fn reduce_one(current: &mut Vec<T>, value: Vec<T>) {
55 current.extend(value);
56 }
57
58 fn reduce(current: &mut Vec<T>, values: Vec<Vec<T>>) {
59 for v in values {
60 current.extend(v);
61 }
62 }
63}
64
65#[derive(Debug)]
70pub struct AnyValueReducer;
71
72impl<T: PartialEq + Clone> Reducer<T> for AnyValueReducer {
73 fn reduce(current: &mut T, values: Vec<T>) {
74 if let Some(last) = values.last() {
75 if let Some(first) = values.first() {
77 debug_assert!(
78 values.iter().all(|v| v == first),
79 "AnyValue reducer: all values should be equal"
80 );
81 }
82 *current = last.clone();
83 }
84 }
85}
86
87#[derive(Debug)]
91pub struct LastWriteWinsReducer;
92
93impl<T> Reducer<T> for LastWriteWinsReducer {
94 fn reduce(current: &mut T, values: Vec<T>) {
95 if let Some(v) = values.into_iter().last() {
96 *current = v;
97 }
98 }
99}
100
/// Transparent wrapper around a value of type `T`.
///
/// Its serde implementations in this file encode it as a one-field structure keyed by
/// `__overwrite__`, and `DeltaChannel::replay_writes` treats a write that serializes to that
/// shape as a new baseline.
///
/// `Debug` is derived; for a tuple struct this prints `Overwrite(<inner>)`.
#[derive(Debug)]
pub struct Overwrite<T>(pub T);

impl<T> Overwrite<T> {
    /// Wraps `value`.
    #[must_use]
    pub const fn new(value: T) -> Self {
        Self(value)
    }

    /// Borrows the wrapped value.
    #[must_use]
    pub const fn get(&self) -> &T {
        &self.0
    }

    /// Consumes the wrapper and returns the wrapped value.
    #[must_use]
    pub fn into_inner(self) -> T {
        self.0
    }
}
133
134impl<T: serde::Serialize> serde::Serialize for Overwrite<T> {
135 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
136 let mut s = serializer.serialize_struct("__overwrite__", 1)?;
137 s.serialize_field("__overwrite__", &self.0)?;
138 s.end()
139 }
140}
141
142impl<'de, T: serde::Deserialize<'de>> serde::Deserialize<'de> for Overwrite<T> {
143 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
144 #[derive(serde::Deserialize)]
145 struct Wrapper<T> {
146 __overwrite__: T,
147 }
148 let wrapper = Wrapper::deserialize(deserializer)?;
149 Ok(Self(wrapper.__overwrite__))
150 }
151}
152
153#[derive(Debug)]
185pub struct NamedBarrierChannel<T, R: Reducer<T>> {
186 value: T,
187 required_sources: HashSet<String>,
188 seen_sources: HashSet<String>,
189 _reducer: std::marker::PhantomData<R>,
190}
191
192impl<T, R: Reducer<T>> NamedBarrierChannel<T, R> {
193 #[must_use]
200 pub fn new_with_sources(value: T, required_sources: impl IntoIterator<Item = String>) -> Self {
201 let sources: HashSet<String> = required_sources.into_iter().collect();
202 Self {
203 value,
204 required_sources: sources,
205 seen_sources: HashSet::new(),
206 _reducer: std::marker::PhantomData,
207 }
208 }
209
210 #[must_use]
215 pub fn new(value: T) -> Self {
216 Self {
217 value,
218 required_sources: HashSet::new(),
219 seen_sources: HashSet::new(),
220 _reducer: std::marker::PhantomData,
221 }
222 }
223
224 pub fn add_required_source(&mut self, source: String) {
228 self.required_sources.insert(source);
229 }
230
231 #[must_use]
235 pub fn is_available(&self) -> bool {
236 if self.required_sources.is_empty() {
237 return true;
238 }
239 self.required_sources
240 .iter()
241 .all(|source| self.seen_sources.contains(source))
242 }
243
244 #[must_use]
246 pub const fn required_sources(&self) -> &HashSet<String> {
247 &self.required_sources
248 }
249
250 #[must_use]
252 pub const fn seen_sources(&self) -> &HashSet<String> {
253 &self.seen_sources
254 }
255
256 #[must_use]
258 pub fn has_written(&self, source: &str) -> bool {
259 self.seen_sources.contains(source)
260 }
261
262 pub fn reset(&mut self) {
266 self.seen_sources.clear();
267 }
268}
269
270impl<T, R> Channel<T> for NamedBarrierChannel<T, R>
271where
272 T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
273 R: Reducer<T> + Send + Sync + 'static,
274{
275 fn update(&mut self, values: Vec<T>) -> bool {
276 if values.is_empty() {
281 return false;
282 }
283 R::reduce(&mut self.value, values);
285 self.seen_sources = self.required_sources.clone();
287 true
288 }
289
290 fn get(&self) -> &T {
291 &self.value
292 }
293
294 fn consume(&mut self) -> bool {
295 false
296 }
297
298 fn checkpoint(&self) -> Option<serde_json::Value> {
299 serde_json::to_value(&(self.value.clone(), self.seen_sources.clone())).ok()
301 }
302
303 fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
304 where
305 Self: Sized,
306 {
307 let (parsed_value, seen_sources): (T, HashSet<String>) = serde_json::from_value(value)
308 .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
309 Ok(Self {
310 value: parsed_value,
311 required_sources: HashSet::new(),
312 seen_sources,
313 _reducer: std::marker::PhantomData,
314 })
315 }
316}
317
318impl<T, R: Reducer<T>> NamedBarrierChannel<T, R> {
319 pub fn update(&mut self, source_name: String, values: Vec<T>) -> bool {
329 assert!(
330 self.required_sources.is_empty() || self.required_sources.contains(&source_name),
331 "NamedBarrierChannel: source '{source_name}' not in required sources"
332 );
333
334 if values.is_empty() {
335 return false;
336 }
337
338 R::reduce(&mut self.value, values);
339 self.seen_sources.insert(source_name);
340 true
341 }
342}
343
/// Channel that accumulates messages in arrival order.
#[derive(Debug, Clone)]
pub struct TopicChannel<T> {
    messages: Vec<T>,
}

impl<T> TopicChannel<T> {
    /// Creates a channel with no messages.
    #[must_use]
    pub const fn new() -> Self {
        let messages = Vec::new();
        Self { messages }
    }

    /// Returns `true` when no messages are stored.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        self.messages.is_empty()
    }

    /// Number of stored messages.
    #[must_use]
    pub const fn len(&self) -> usize {
        self.messages.len()
    }

    /// Iterates over the stored messages in order.
    pub fn iter(&self) -> std::slice::Iter<'_, T> {
        self.messages.as_slice().iter()
    }

    /// Drops every stored message.
    pub fn reset(&mut self) {
        self.messages.truncate(0);
    }
}

impl<T> Default for TopicChannel<T> {
    /// Same as [`TopicChannel::new`]: an empty channel.
    fn default() -> Self {
        Self {
            messages: Vec::new(),
        }
    }
}

impl<'a, T> IntoIterator for &'a TopicChannel<T> {
    type Item = &'a T;
    type IntoIter = std::slice::Iter<'a, T>;

    /// Borrowing iteration over the stored messages, enabling `for m in &channel`.
    fn into_iter(self) -> Self::IntoIter {
        self.messages.iter()
    }
}
429
430impl<T> Channel<Vec<T>> for TopicChannel<T>
431where
432 T: Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
433{
434 fn update(&mut self, values: Vec<Vec<T>>) -> bool {
435 if values.is_empty() {
436 return false;
437 }
438 for batch in values {
440 self.messages.extend(batch);
441 }
442 true
443 }
444
445 fn get(&self) -> &Vec<T> {
446 &self.messages
447 }
448
449 fn consume(&mut self) -> bool {
450 let was_empty = self.messages.is_empty();
451 self.messages.clear();
452 !was_empty
453 }
454
455 fn checkpoint(&self) -> Option<serde_json::Value> {
456 serde_json::to_value(&self.messages).ok()
457 }
458
459 fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
460 where
461 Self: Sized,
462 {
463 let messages: Vec<T> = serde_json::from_value(value)
464 .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
465 Ok(Self { messages })
466 }
467}
468
/// Common interface of the state channels defined in this file.
///
/// `T` is the type handed out by [`Channel::get`] and accepted, in batches, by
/// [`Channel::update`].
pub trait Channel<T>: Send + Sync + 'static {
    /// Applies a batch of writes to the channel.
    ///
    /// The implementations in this file return `false` without touching their state when
    /// `values` is empty, and `true` otherwise.
    fn update(&mut self, values: Vec<T>) -> bool;

    /// Borrows the channel's current value.
    fn get(&self) -> &T;

    /// Signals that the current value has been read.
    ///
    /// The meaning of the returned flag differs between the implementations in this file:
    /// `TopicChannel` clears its messages and reports whether it held any, `EphemeralChannel`
    /// reports whether it had already been consumed, and the remaining channels always return
    /// `false`.
    fn consume(&mut self) -> bool;

    /// Returns the state to persist, or `None` when there is nothing to persist.
    fn checkpoint(&self) -> Option<serde_json::Value>;

    /// Rebuilds a channel from a value previously produced by [`Channel::checkpoint`].
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when `value` cannot be turned back into a channel.
    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
    where
        Self: Sized;
}
496
497#[derive(Debug)]
503pub struct UntrackedChannel<T, R: Reducer<T>> {
504 value: T,
505 _reducer: std::marker::PhantomData<R>,
506}
507
508impl<T, R: Reducer<T>> UntrackedChannel<T, R> {
509 #[must_use]
511 pub const fn new(value: T) -> Self {
512 Self {
513 value,
514 _reducer: std::marker::PhantomData,
515 }
516 }
517}
518
519impl<T: Default + Send + Sync + 'static, R: Reducer<T> + Send + Sync + 'static> Channel<T>
520 for UntrackedChannel<T, R>
521{
522 fn update(&mut self, values: Vec<T>) -> bool {
523 if values.is_empty() {
524 return false;
525 }
526 R::reduce(&mut self.value, values);
527 true
528 }
529
530 fn get(&self) -> &T {
531 &self.value
532 }
533
534 fn consume(&mut self) -> bool {
535 false
536 }
537
538 fn checkpoint(&self) -> Option<serde_json::Value> {
539 None
540 }
541
542 fn from_checkpoint(_value: serde_json::Value) -> Result<Self, String> {
543 Ok(Self::new(T::default()))
544 }
545}
546
547#[derive(Debug)]
552pub struct EphemeralChannel<T, R: Reducer<T>> {
553 value: T,
554 consumed: bool,
555 _reducer: std::marker::PhantomData<R>,
556}
557
558impl<T, R: Reducer<T>> EphemeralChannel<T, R> {
559 #[must_use]
561 pub const fn new(value: T) -> Self {
562 Self {
563 value,
564 consumed: false,
565 _reducer: std::marker::PhantomData,
566 }
567 }
568}
569
570impl<T: Default + Send + Sync + 'static, R: Reducer<T> + Send + Sync + 'static> Channel<T>
571 for EphemeralChannel<T, R>
572{
573 fn update(&mut self, values: Vec<T>) -> bool {
574 if values.is_empty() {
575 return false;
576 }
577 self.consumed = false;
578 R::reduce(&mut self.value, values);
579 true
580 }
581
582 fn get(&self) -> &T {
583 &self.value
584 }
585
586 fn consume(&mut self) -> bool {
587 let was_consumed = self.consumed;
588 self.consumed = true;
589 was_consumed
590 }
591
592 fn checkpoint(&self) -> Option<serde_json::Value> {
593 None
594 }
595
596 fn from_checkpoint(_value: serde_json::Value) -> Result<Self, String> {
597 Ok(Self::new(T::default()))
598 }
599}
600
601#[derive(Debug)]
606pub struct LastValueAfterFinishChannel<T, R: Reducer<T>> {
607 value: T,
608 finished_value: Option<T>,
609 is_finished: bool,
610 _reducer: std::marker::PhantomData<R>,
611}
612
613impl<T, R: Reducer<T>> LastValueAfterFinishChannel<T, R> {
614 #[must_use]
616 pub const fn new(value: T) -> Self {
617 Self {
618 value,
619 finished_value: None,
620 is_finished: false,
621 _reducer: std::marker::PhantomData,
622 }
623 }
624
625 pub const fn finish(&mut self) {
627 self.is_finished = true;
628 }
629
630 #[must_use]
632 pub const fn is_available(&self) -> bool {
633 self.is_finished
634 }
635}
636
637impl<T, R> Channel<T> for LastValueAfterFinishChannel<T, R>
638where
639 T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
640 R: Reducer<T> + Send + Sync + 'static,
641{
642 fn update(&mut self, values: Vec<T>) -> bool {
643 if values.is_empty() {
644 return false;
645 }
646 R::reduce(&mut self.value, values);
647 if self.is_finished {
648 self.finished_value = Some(self.value.clone());
649 }
650 true
651 }
652
653 fn get(&self) -> &T {
654 if self.is_finished {
655 self.finished_value.as_ref().unwrap_or(&self.value)
656 } else {
657 &self.value
658 }
659 }
660
661 fn consume(&mut self) -> bool {
662 false
663 }
664
665 fn checkpoint(&self) -> Option<serde_json::Value> {
666 if self.is_finished {
669 serde_json::to_value(&(self.value.clone(), self.is_finished)).ok()
670 } else {
671 None
672 }
673 }
674
675 fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
676 if let Ok((parsed_value, is_finished)) = serde_json::from_value::<(T, bool)>(value.clone())
678 {
679 let finished_value = is_finished.then(|| parsed_value.clone());
680 return Ok(Self {
681 value: parsed_value,
682 finished_value,
683 is_finished,
684 _reducer: std::marker::PhantomData,
685 });
686 }
687
688 let parsed_value: T = serde_json::from_value(value)
690 .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
691 Ok(Self {
692 value: parsed_value,
693 finished_value: None,
694 is_finished: false,
695 _reducer: std::marker::PhantomData,
696 })
697 }
698}
699
700#[derive(Debug)]
707pub struct DeltaChannel<T, R: Reducer<T>> {
708 value: T,
709 snapshot_frequency: usize,
711 update_count_since_snapshot: usize,
712 _reducer: std::marker::PhantomData<R>,
713}
714
715impl<T, R: Reducer<T>> DeltaChannel<T, R> {
716 #[must_use]
720 pub fn new(value: T, snapshot_frequency: usize) -> Self {
721 Self {
722 value,
723 snapshot_frequency: snapshot_frequency.max(1),
724 update_count_since_snapshot: 0,
725 _reducer: std::marker::PhantomData,
726 }
727 }
728
729 pub fn replay_writes(&mut self, values: &[T])
735 where
736 T: Clone + serde::Serialize + DeserializeOwned,
737 {
738 if values.is_empty() {
739 return;
740 }
741
742 let mut base = self.value.clone();
746 let mut start_idx = 0;
747
748 for (i, v) in values.iter().enumerate() {
752 if let Ok(json) = serde_json::to_value(v)
756 && let Some(obj) = json.as_object()
757 && obj.contains_key("__overwrite__")
758 {
759 if let Ok(inner) = serde_json::from_value::<T>(
761 obj.get("__overwrite__").cloned().unwrap_or_default(),
762 ) {
763 base = inner;
764 start_idx = i + 1;
765 }
766 }
767 }
768
769 let remaining: Vec<T> = values[start_idx..].to_vec();
771 if !remaining.is_empty() {
772 R::reduce(&mut base, remaining);
773 }
774 self.value = base;
775 self.update_count_since_snapshot = 0;
776 }
777
778 #[must_use]
780 pub const fn should_snapshot(&self) -> bool {
781 self.update_count_since_snapshot >= self.snapshot_frequency
782 }
783
784 pub const fn finish(&mut self) {
788 self.update_count_since_snapshot = self.snapshot_frequency;
789 }
790}
791
792impl<T, R> Channel<T> for DeltaChannel<T, R>
793where
794 T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
795 R: Reducer<T> + Send + Sync + 'static,
796{
797 fn update(&mut self, values: Vec<T>) -> bool {
798 if values.is_empty() {
799 return false;
800 }
801 R::reduce(&mut self.value, values);
802 self.update_count_since_snapshot += 1;
803 true
804 }
805
806 fn get(&self) -> &T {
807 &self.value
808 }
809
810 fn consume(&mut self) -> bool {
811 false
812 }
813
814 fn checkpoint(&self) -> Option<serde_json::Value> {
815 serde_json::to_value(&self.value).ok()
816 }
817
818 fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
819 let value: T = serde_json::from_value(value)
820 .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
821 Ok(Self {
822 value,
823 snapshot_frequency: 10,
824 update_count_since_snapshot: 0,
825 _reducer: std::marker::PhantomData,
826 })
827 }
828}
829
/// Either a stored snapshot of type `T` or a marker that none is present.
///
/// NOTE(review): nothing outside the tests in this file constructs or matches on this type, so
/// how callers use it has to be confirmed elsewhere.
#[derive(Clone, Debug)]
pub enum DeltaBlob<T>
where
    T: Clone + serde::Serialize + serde::de::DeserializeOwned,
{
    /// No snapshot payload.
    Missing,
    /// A snapshot payload.
    Snapshot(T),
}
845
/// Plain data carrier holding a single string `id`.
///
/// NOTE(review): presumably a request to remove the message with that id — the code that
/// interprets it is not in this file, so confirm against its consumers.
#[derive(Clone, Debug)]
pub struct RemoveMessage {
    /// Identifier carried by this message.
    pub id: String,
}
855
/// Unit tests for the reducers, the `Overwrite` wrapper and every channel type in this file.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- UntrackedChannel ----

    #[test]
    fn untracked_channel_update_returns_true_on_change() {
        let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(0);
        assert!(!ch.update(vec![]));
        assert!(ch.update(vec![42]));
        assert_eq!(*ch.get(), 42);
    }

    #[test]
    fn untracked_channel_consume_always_false() {
        let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(1);
        assert!(!ch.consume());
    }

    #[test]
    fn untracked_channel_checkpoint_is_none() {
        let ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(5);
        assert!(ch.checkpoint().is_none());
    }

    #[test]
    fn untracked_channel_from_checkpoint_uses_default() {
        // The payload is ignored; the channel restarts from `i32::default()`.
        let ch: UntrackedChannel<i32, ReplaceReducer> =
            UntrackedChannel::from_checkpoint(serde_json::json!(99)).expect("should succeed");
        assert_eq!(*ch.get(), 0);
    }

    // ---- EphemeralChannel ----

    #[test]
    fn ephemeral_channel_consume_tracks_state() {
        let mut ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(0);
        // The first consume reports "not previously consumed".
        assert!(!ch.consume());
        // The second consume reports "already consumed".
        assert!(ch.consume());
    }

    #[test]
    fn ephemeral_channel_update_resets_consumed() {
        let mut ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(0);
        assert!(!ch.consume());
        assert!(ch.update(vec![7]));
        // The update cleared the consumed flag, so this reports false again.
        assert!(!ch.consume());
    }

    #[test]
    fn ephemeral_channel_checkpoint_is_none() {
        let ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(3);
        assert!(ch.checkpoint().is_none());
    }

    // ---- LastValueAfterFinishChannel: availability and checkpoint gating ----

    #[test]
    fn last_value_after_finish_channel_not_available_before_finish() {
        let ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
            LastValueAfterFinishChannel::new(0);
        assert!(!ch.is_available());
    }

    #[test]
    fn last_value_after_finish_channel_available_after_finish() {
        let mut ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
            LastValueAfterFinishChannel::new(0);
        ch.finish();
        assert!(ch.is_available());
    }

    #[test]
    fn last_value_after_finish_channel_checkpoint_only_if_finished() {
        let ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
            LastValueAfterFinishChannel::new(5);
        assert!(ch.checkpoint().is_none());

        let mut ch2: LastValueAfterFinishChannel<i32, ReplaceReducer> =
            LastValueAfterFinishChannel::new(5);
        ch2.finish();
        assert!(ch2.checkpoint().is_some());
    }

    // ---- DeltaChannel: construction, replay, snapshots ----

    #[test]
    fn delta_channel_snapshot_frequency_clamped_to_one() {
        let ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 0);
        assert_eq!(ch.snapshot_frequency, 1);
    }

    #[test]
    fn delta_channel_replay_writes_restores_state() {
        let mut ch: DeltaChannel<Vec<i32>, AppendReducer> = DeltaChannel::new(vec![], 10);
        ch.replay_writes(&[vec![1, 2], vec![3, 4]]);
        assert_eq!(*ch.get(), vec![1, 2, 3, 4]);
        assert_eq!(ch.update_count_since_snapshot, 0);
    }

    #[test]
    fn delta_channel_checkpoint_returns_snapshot() {
        let ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(42, 5);
        let cp = ch.checkpoint().expect("should have checkpoint");
        assert_eq!(cp, serde_json::json!(42));
    }

    #[test]
    fn delta_channel_should_snapshot() {
        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 2);
        assert!(!ch.should_snapshot());
        ch.update(vec![1]);
        assert!(!ch.should_snapshot());
        ch.update(vec![2]);
        assert!(ch.should_snapshot());
    }

    // ---- DeltaBlob and RemoveMessage ----

    #[test]
    fn delta_blob_missing_variant_exists() {
        let blob: DeltaBlob<i32> = DeltaBlob::Missing;
        assert!(matches!(blob, DeltaBlob::Missing));
    }

    #[test]
    fn delta_blob_snapshot_holds_value() {
        let blob: DeltaBlob<i32> = DeltaBlob::Snapshot(42);
        assert!(matches!(blob, DeltaBlob::Snapshot(_)));
    }

    #[test]
    fn delta_blob_clone() {
        let blob: DeltaBlob<String> = DeltaBlob::Snapshot("hello".to_string());
        let cloned = blob.clone();
        if let DeltaBlob::Snapshot(v) = cloned {
            assert_eq!(v, "hello");
        } else {
            panic!("expected Snapshot variant");
        }
        // The original must still be usable after the clone was consumed.
        if let DeltaBlob::Snapshot(v) = blob {
            assert_eq!(v, "hello");
        } else {
            panic!("expected Snapshot variant");
        }
    }

    #[test]
    fn remove_message_holds_id() {
        let rm = RemoveMessage {
            id: "msg-123".to_string(),
        };
        assert_eq!(rm.id, "msg-123");
    }

    // ---- Overwrite: serde shape and Debug output ----

    #[test]
    fn overwrite_serialize_round_trip() {
        let original = Overwrite(42);
        let json = serde_json::to_string(&original).expect("should serialize");
        assert_eq!(json, r#"{"__overwrite__":42}"#);

        let deserialized: Overwrite<i32> = serde_json::from_str(&json).expect("should deserialize");
        assert_eq!(deserialized.0, 42);
    }

    #[test]
    fn overwrite_serialize_complex_type() {
        let original = Overwrite(vec![1, 2, 3]);
        let json = serde_json::to_string(&original).expect("should serialize");
        assert_eq!(json, r#"{"__overwrite__":[1,2,3]}"#);

        let deserialized: Overwrite<Vec<i32>> =
            serde_json::from_str(&json).expect("should deserialize");
        assert_eq!(deserialized.0, vec![1, 2, 3]);
    }

    #[test]
    fn overwrite_debug_format() {
        let ov = Overwrite(42);
        let debug_str = format!("{ov:?}");
        assert_eq!(debug_str, "Overwrite(42)");
    }

    // ---- ReplaceReducer ----

    #[test]
    fn replace_reducer_single_value_succeeds() {
        let mut val = 0;
        ReplaceReducer::reduce(&mut val, vec![42]);
        assert_eq!(val, 42);
    }

    #[test]
    fn replace_reducer_empty_values_succeeds() {
        let mut val = 99;
        ReplaceReducer::reduce(&mut val, vec![]);
        assert_eq!(val, 99);
    }

    #[test]
    #[should_panic(expected = "Replace reducer: multiple writes in same superstep")]
    fn replace_reducer_multiple_values_panics() {
        let mut val = 0;
        ReplaceReducer::reduce(&mut val, vec![1, 2]);
    }

    #[test]
    #[should_panic(expected = "Replace reducer: multiple writes in same superstep")]
    fn untracked_channel_multiple_writes_panics() {
        let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(0);
        ch.update(vec![1, 2]);
    }

    // ---- NamedBarrierChannel ----

    #[test]
    fn named_barrier_channel_not_available_initially() {
        let ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new_with_sources(
            0,
            ["node_a", "node_b"].into_iter().map(String::from),
        );
        assert!(!ch.is_available());
    }

    #[test]
    fn named_barrier_channel_available_after_all_sources_write() {
        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
            NamedBarrierChannel::new_with_sources(
                0,
                ["node_a", "node_b"].into_iter().map(String::from),
            );
        assert!(!ch.is_available());

        ch.update("node_a".to_string(), vec![42]);
        assert!(!ch.is_available());

        ch.update("node_b".to_string(), vec![100]);
        assert!(ch.is_available());
        // Each named update is its own reduce call, so the later write replaced the earlier one.
        assert_eq!(*ch.get(), 100);
    }

    #[test]
    fn named_barrier_channel_empty_required_sources_is_available() {
        let ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new(42);
        assert!(ch.is_available());
    }

    #[test]
    fn named_barrier_channel_has_written_tracks_sources() {
        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
            NamedBarrierChannel::new_with_sources(
                0,
                ["node_a", "node_b", "node_c"].into_iter().map(String::from),
            );

        assert!(!ch.has_written("node_a"));
        ch.update("node_a".to_string(), vec![1]);
        assert!(ch.has_written("node_a"));
        assert!(!ch.has_written("node_b"));
    }

    #[test]
    fn named_barrier_channel_reset_clears_seen_sources() {
        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
            NamedBarrierChannel::new_with_sources(
                0,
                ["node_a", "node_b"].into_iter().map(String::from),
            );

        ch.update("node_a".to_string(), vec![1]);
        ch.update("node_b".to_string(), vec![2]);
        assert!(ch.is_available());

        ch.reset();
        assert!(!ch.is_available());
        assert!(!ch.has_written("node_a"));
        assert!(!ch.has_written("node_b"));
    }

    #[test]
    fn named_barrier_channel_add_required_source() {
        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new(0);
        assert!(ch.is_available());

        ch.add_required_source("node_a".to_string());
        assert!(!ch.is_available());

        ch.update("node_a".to_string(), vec![42]);
        assert!(ch.is_available());
    }

    #[test]
    #[should_panic(expected = "NamedBarrierChannel: source")]
    fn named_barrier_channel_unknown_source_panics() {
        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
            NamedBarrierChannel::new_with_sources(0, vec!["node_a".to_string()]);

        ch.update("unknown_node".to_string(), vec![42]);
    }

    #[test]
    fn named_barrier_channel_checkpoint_persists_state() {
        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
            NamedBarrierChannel::new_with_sources(0, vec!["node_a".to_string()]);

        ch.update("node_a".to_string(), vec![42]);

        let checkpoint = ch.checkpoint().expect("should have checkpoint");
        // Only the coarse JSON shape is pinned here, not the exact layout.
        assert!(checkpoint.is_array() || checkpoint.is_object());

        let restored: NamedBarrierChannel<i32, ReplaceReducer> =
            NamedBarrierChannel::from_checkpoint(checkpoint).expect("should restore");
        assert_eq!(*restored.get(), 42);
        assert!(restored.has_written("node_a"));
    }

    #[test]
    fn named_barrier_channel_generic_update_marks_all_sources_seen() {
        let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
            NamedBarrierChannel::new_with_sources(0, ["node_a".to_string(), "node_b".to_string()]);

        // Call the trait method explicitly; `ch.update(..)` would pick the inherent, named one.
        Channel::update(&mut ch, vec![42]);
        assert!(ch.is_available());
        assert!(ch.has_written("node_a"));
        assert!(ch.has_written("node_b"));
    }

    // ---- TopicChannel ----

    #[test]
    fn topic_channel_new_is_empty() {
        let ch: TopicChannel<String> = TopicChannel::new();
        assert!(ch.is_empty());
        assert_eq!(ch.len(), 0);
    }

    #[test]
    fn topic_channel_default_is_empty() {
        let ch: TopicChannel<String> = TopicChannel::default();
        assert!(ch.is_empty());
    }

    #[test]
    fn topic_channel_accumulates_messages() {
        let mut ch: TopicChannel<String> = TopicChannel::new();

        ch.update(vec![vec!["hello".to_string()]]);
        assert_eq!(ch.len(), 1);
        assert_eq!(ch.get()[0], "hello");

        ch.update(vec![vec!["world".to_string()]]);
        assert_eq!(ch.len(), 2);
        assert_eq!(ch.get()[1], "world");
    }

    #[test]
    fn topic_channel_update_with_multiple_messages() {
        let mut ch: TopicChannel<i32> = TopicChannel::new();

        ch.update(vec![vec![1, 2, 3]]);
        assert_eq!(ch.len(), 3);
        assert_eq!(ch.get(), &[1, 2, 3]);
    }

    #[test]
    fn topic_channel_update_with_multiple_batches() {
        let mut ch: TopicChannel<i32> = TopicChannel::new();

        ch.update(vec![vec![1, 2], vec![3, 4]]);
        assert_eq!(ch.len(), 4);
        assert_eq!(ch.get(), &[1, 2, 3, 4]);
    }

    #[test]
    fn topic_channel_reset_clears_messages() {
        let mut ch: TopicChannel<String> = TopicChannel::new();

        ch.update(vec![vec!["test".to_string()]]);
        assert_eq!(ch.len(), 1);

        ch.reset();
        assert!(ch.is_empty());
        assert_eq!(ch.len(), 0);
    }

    #[test]
    fn topic_channel_consume_clears_and_returns_status() {
        let mut ch: TopicChannel<String> = TopicChannel::new();

        let had_content = ch.consume();
        // Nothing was stored yet, so consume reports false.
        assert!(!had_content);

        ch.update(vec![vec!["test".to_string()]]);
        let had_content_after = ch.consume();
        // A message was stored, so consume reports true and empties the channel.
        assert!(had_content_after);
        assert!(ch.is_empty());
    }

    #[test]
    fn topic_channel_iter_messages() {
        let mut ch: TopicChannel<i32> = TopicChannel::new();

        ch.update(vec![vec![1, 2, 3]]);

        let mut iter = ch.iter();
        assert_eq!(iter.next(), Some(&1));
        assert_eq!(iter.next(), Some(&2));
        assert_eq!(iter.next(), Some(&3));
        assert_eq!(iter.next(), None);
    }

    #[test]
    fn topic_channel_checkpoint_persists_messages() {
        let mut ch: TopicChannel<i32> = TopicChannel::new();

        ch.update(vec![vec![1, 2, 3]]);

        let checkpoint = ch.checkpoint().expect("should have checkpoint");
        assert_eq!(checkpoint, serde_json::json!([1, 2, 3]));

        let restored: TopicChannel<i32> =
            TopicChannel::from_checkpoint(checkpoint).expect("should restore");
        assert_eq!(restored.len(), 3);
        assert_eq!(restored.get(), &[1, 2, 3]);
    }

    #[test]
    fn topic_channel_from_checkpoint_empty() {
        let ch: TopicChannel<i32> =
            TopicChannel::from_checkpoint(serde_json::json!([])).expect("should restore");
        assert!(ch.is_empty());
    }

    // ---- LastValueAfterFinishChannel: checkpoint format ----

    #[test]
    fn last_value_after_finish_checkpoint_saves_is_finished_state() {
        let mut ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
            LastValueAfterFinishChannel::new(10);
        ch.update(vec![42]);
        ch.finish();

        let checkpoint = ch
            .checkpoint()
            .expect("should have checkpoint when finished");
        assert!(checkpoint.is_array());
        let arr = checkpoint.as_array().expect("should be array");
        assert_eq!(arr.len(), 2);
        // Element 0 is the value, element 1 is the finished flag.
        assert_eq!(arr[0], serde_json::json!(42));
        assert_eq!(arr[1], serde_json::json!(true));
    }

    #[test]
    fn last_value_after_finish_from_checkpoint_restores_is_finished() {
        // Current format: `[value, is_finished]`.
        let checkpoint_data = serde_json::json!([99, true]);
        let restored: LastValueAfterFinishChannel<i32, ReplaceReducer> =
            LastValueAfterFinishChannel::from_checkpoint(checkpoint_data)
                .expect("should restore from checkpoint");

        assert_eq!(*restored.get(), 99);
        assert!(restored.is_available());
    }

    #[test]
    fn last_value_after_finish_from_checkpoint_old_format_backward_compat() {
        // Older format: the bare value without a finished flag.
        let checkpoint_data = serde_json::json!(55);

        let restored: LastValueAfterFinishChannel<i32, ReplaceReducer> =
            LastValueAfterFinishChannel::from_checkpoint(checkpoint_data)
                .expect("should restore from old checkpoint format");

        assert_eq!(*restored.get(), 55);
        // The older format restores as not finished.
        assert!(!restored.is_available());
    }

    #[test]
    fn last_value_after_finish_checkpoint_round_trip() {
        let mut ch1: LastValueAfterFinishChannel<i32, ReplaceReducer> =
            LastValueAfterFinishChannel::new(0);
        ch1.update(vec![123]);
        ch1.finish();

        let checkpoint = ch1.checkpoint().expect("should checkpoint");
        let ch2: LastValueAfterFinishChannel<i32, ReplaceReducer> =
            LastValueAfterFinishChannel::from_checkpoint(checkpoint).expect("should restore");

        assert_eq!(*ch1.get(), *ch2.get());
        assert_eq!(ch1.is_available(), ch2.is_available());
    }

    // ---- Overwrite: accessors ----

    #[test]
    fn overwrite_get_returns_inner_value() {
        let ov = Overwrite(42);
        assert_eq!(*ov.get(), 42);
    }

    #[test]
    fn overwrite_into_inner_consumes_wrapper() {
        let ov = Overwrite(100);
        assert_eq!(ov.into_inner(), 100);
    }

    #[test]
    fn overwrite_new_creates_wrapper() {
        let ov = Overwrite::new(999);
        assert_eq!(*ov.get(), 999);
    }

    // ---- DeltaChannel: replay_writes details ----

    #[test]
    fn delta_channel_replay_writes_handles_empty_sequence() {
        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(5, 10);
        ch.replay_writes(&[]);
        // An empty replay leaves the value untouched.
        assert_eq!(*ch.get(), 5);
    }

    #[test]
    fn delta_channel_replay_writes_single_value() {
        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
        ch.replay_writes(&[42]);
        assert_eq!(*ch.get(), 42);
    }

    #[test]
    fn delta_channel_replay_writes_multiple_values() {
        let mut ch: DeltaChannel<Vec<i32>, AppendReducer> = DeltaChannel::new(vec![], 10);
        ch.replay_writes(&[vec![1, 2], vec![3, 4]]);
        assert_eq!(*ch.get(), vec![1, 2, 3, 4]);
    }

    #[test]
    fn delta_channel_replay_writes_resets_snapshot_counter() {
        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
        ch.update(vec![1]);
        assert_eq!(ch.update_count_since_snapshot, 1);

        ch.replay_writes(&[99]);
        // A non-empty replay resets the counter.
        assert_eq!(ch.update_count_since_snapshot, 0);
    }

    #[test]
    fn delta_channel_replay_writes_with_replace_reducer() {
        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
        // Only a single write is replayed: ReplaceReducer rejects batches of more than one.
        ch.replay_writes(&[42]);
        assert_eq!(*ch.get(), 42);
    }

    #[test]
    fn delta_channel_replay_writes_detects_json_overwrite_format() {
        let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
            DeltaChannel::new(serde_json::json!(null), 10);

        let overwrite_val = serde_json::json!({"__overwrite__": "baseline"});
        let normal_val1 = serde_json::json!("update1");
        let normal_val2 = serde_json::json!("update2");

        ch.replay_writes(&[normal_val1, overwrite_val, normal_val2.clone()]);

        // The overwrite resets the baseline; the write after it is then reduced on top.
        assert_eq!(ch.get(), &normal_val2);
    }

    #[test]
    fn delta_channel_replay_writes_overwrite_at_start() {
        let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
            DeltaChannel::new(serde_json::json!("initial"), 10);

        let overwrite_val = serde_json::json!({"__overwrite__": "new_baseline"});
        let normal_val = serde_json::json!("update");

        ch.replay_writes(&[overwrite_val, normal_val.clone()]);

        // The later plain write wins over the overwritten baseline.
        assert_eq!(ch.get(), &normal_val);
    }

    #[test]
    fn delta_channel_replay_writes_overwrite_at_end() {
        let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
            DeltaChannel::new(serde_json::json!("initial"), 10);

        let normal_val = serde_json::json!("update");
        let overwrite_val = serde_json::json!({"__overwrite__": "final_baseline"});

        ch.replay_writes(&[normal_val, overwrite_val]);

        // No writes follow the overwrite, so its payload is the final value.
        assert_eq!(ch.get(), &serde_json::json!("final_baseline"));
    }

    #[test]
    fn delta_channel_finish_forces_snapshot() {
        let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
        assert!(!ch.should_snapshot());

        ch.finish();
        // `finish` raises the counter to the snapshot frequency.
        assert!(ch.should_snapshot());
    }
}
1453
1454