use serde::de::DeserializeOwned;
use serde::ser::SerializeStruct;
use std::collections::HashSet;
/// Strategy for folding written values into a channel's current value.
///
/// Both methods are associated functions (no `self` receiver), so an
/// implementor is selected purely through a type parameter.
pub trait Reducer<T> {
    /// Folds a single `value` into `current`.
    ///
    /// The default implementation wraps `value` in a one-element batch and
    /// forwards it to [`Reducer::reduce`].
    fn reduce_one(current: &mut T, value: T) {
        let batch = vec![value];
        Self::reduce(current, batch);
    }

    /// Folds every element of `values` into `current`.
    fn reduce(current: &mut T, values: Vec<T>);
}
/// Reducer that overwrites the current value with the single written value.
#[derive(Debug)]
pub struct ReplaceReducer;

impl<T> Reducer<T> for ReplaceReducer {
    /// Replaces `current` with the only element of `values`, if there is one.
    ///
    /// An empty `values` leaves `current` untouched.
    ///
    /// # Panics
    ///
    /// Panics when `values` contains more than one element.
    fn reduce(current: &mut T, values: Vec<T>) {
        assert!(
            values.len() <= 1,
            "Replace reducer: multiple writes in same superstep"
        );
        // At most one element remains at this point, so popping it yields
        // the sole write (or `None` for an empty batch).
        let mut writes = values;
        if let Some(replacement) = writes.pop() {
            *current = replacement;
        }
    }
}
/// Reducer that concatenates written vectors onto the current vector.
#[derive(Debug)]
pub struct AppendReducer;

impl<T> Reducer<Vec<T>> for AppendReducer {
    /// Appends the elements of `value` to `current`, preserving their order.
    fn reduce_one(current: &mut Vec<T>, value: Vec<T>) {
        let mut tail = value;
        current.append(&mut tail);
    }

    /// Appends every batch in `values` to `current`, batch by batch and in
    /// order.
    fn reduce(current: &mut Vec<T>, values: Vec<Vec<T>>) {
        current.extend(values.into_iter().flatten());
    }
}
/// Reducer for writes that are all expected to carry the same value.
#[derive(Debug)]
pub struct AnyValueReducer;

impl<T: PartialEq + Clone> Reducer<T> for AnyValueReducer {
    /// Sets `current` to a clone of the last element of `values`.
    ///
    /// An empty `values` leaves `current` untouched. In builds with debug
    /// assertions enabled, the call panics if the elements of `values` are
    /// not all equal to the first one.
    fn reduce(current: &mut T, values: Vec<T>) {
        // Both ends exist exactly when the batch is non-empty.
        if let (Some(first), Some(last)) = (values.first(), values.last()) {
            debug_assert!(
                values.iter().all(|candidate| candidate == first),
                "AnyValue reducer: all values should be equal"
            );
            *current = last.clone();
        }
    }
}
/// Reducer that keeps only the final write of a batch.
#[derive(Debug)]
pub struct LastWriteWinsReducer;

impl<T> Reducer<T> for LastWriteWinsReducer {
    /// Replaces `current` with the last element of `values`.
    ///
    /// An empty `values` leaves `current` untouched; any number of writes is
    /// accepted.
    fn reduce(current: &mut T, values: Vec<T>) {
        let mut writes = values;
        if let Some(latest) = writes.pop() {
            *current = latest;
        }
    }
}
/// Newtype wrapper around a single value of type `T`.
///
/// Its `Debug` representation is the tuple-struct form `Overwrite(<inner>)`,
/// available whenever `T: Debug`.
#[derive(Debug)]
pub struct Overwrite<T>(pub T);
impl<T> Overwrite<T> {
#[must_use]
pub const fn get(&self) -> &T {
&self.0
}
#[must_use]
pub fn into_inner(self) -> T {
self.0
}
#[must_use]
pub const fn new(value: T) -> Self {
Self(value)
}
}
impl<T> serde::Serialize for Overwrite<T>
where
    T: serde::Serialize,
{
    /// Serializes the wrapper as a one-field struct named `__overwrite__`
    /// whose single field, also keyed `__overwrite__`, holds the wrapped
    /// value.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let Self(inner) = self;
        let mut state = serializer.serialize_struct("__overwrite__", 1)?;
        state.serialize_field("__overwrite__", inner)?;
        state.end()
    }
}
impl<'de, T: serde::Deserialize<'de>> serde::Deserialize<'de> for Overwrite<T> {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
#[derive(serde::Deserialize)]
struct Wrapper<T> {
__overwrite__: T,
}
let wrapper = Wrapper::deserialize(deserializer)?;
Ok(Self(wrapper.__overwrite__))
}
}
/// Channel that tracks which named sources have written to it.
///
/// The channel holds a value reduced by `R`, the set of source names that are
/// required, and the set of source names seen so far.
#[derive(Debug)]
pub struct NamedBarrierChannel<T, R: Reducer<T>> {
    /// Current reduced value.
    value: T,
    /// Names that must all appear in `seen_sources` for the channel to be
    /// available.
    required_sources: HashSet<String>,
    /// Names recorded as having written since construction or the last
    /// `reset`.
    seen_sources: HashSet<String>,
    /// Binds the reducer type without storing a reducer instance.
    _reducer: std::marker::PhantomData<R>,
}

impl<T, R: Reducer<T>> NamedBarrierChannel<T, R> {
    /// Creates a channel holding `value` that requires every name yielded by
    /// `required_sources` (duplicates collapse, since they go into a set).
    #[must_use]
    pub fn new_with_sources(value: T, required_sources: impl IntoIterator<Item = String>) -> Self {
        Self {
            value,
            required_sources: required_sources.into_iter().collect(),
            seen_sources: HashSet::new(),
            _reducer: std::marker::PhantomData,
        }
    }

    /// Creates a channel holding `value` with no required sources.
    #[must_use]
    pub fn new(value: T) -> Self {
        Self::new_with_sources(value, std::iter::empty())
    }

    /// Adds `source` to the set of required sources.
    pub fn add_required_source(&mut self, source: String) {
        self.required_sources.insert(source);
    }

    /// Returns `true` when every required source has been seen.
    ///
    /// A channel with no required sources is always available.
    #[must_use]
    pub fn is_available(&self) -> bool {
        // An empty required set is a subset of anything, which covers the
        // "no requirements" case as well.
        self.required_sources.is_subset(&self.seen_sources)
    }

    /// Returns the set of required source names.
    #[must_use]
    pub const fn required_sources(&self) -> &HashSet<String> {
        &self.required_sources
    }

    /// Returns the set of source names seen so far.
    #[must_use]
    pub const fn seen_sources(&self) -> &HashSet<String> {
        &self.seen_sources
    }

    /// Returns `true` when `source` is recorded as seen.
    #[must_use]
    pub fn has_written(&self, source: &str) -> bool {
        self.seen_sources.contains(source)
    }

    /// Forgets every seen source; the value and the required sources are
    /// kept.
    pub fn reset(&mut self) {
        self.seen_sources.clear();
    }
}
impl<T, R> Channel<T> for NamedBarrierChannel<T, R>
where
    T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
    R: Reducer<T> + Send + Sync + 'static,
{
    /// Reduces `values` into the channel without naming a source.
    ///
    /// Returns `false` and changes nothing when `values` is empty. Otherwise
    /// the values are reduced and every required source is marked as seen.
    fn update(&mut self, values: Vec<T>) -> bool {
        let has_writes = !values.is_empty();
        if has_writes {
            R::reduce(&mut self.value, values);
            // No source name is available here, so all required sources are
            // treated as having written.
            self.seen_sources.clone_from(&self.required_sources);
        }
        has_writes
    }

    /// Returns the current value.
    fn get(&self) -> &T {
        &self.value
    }

    /// Does nothing and always returns `false`.
    fn consume(&mut self) -> bool {
        false
    }

    /// Serializes the pair `(value, seen_sources)`; returns `None` if
    /// serialization fails. The required sources are not included.
    fn checkpoint(&self) -> Option<serde_json::Value> {
        serde_json::to_value((&self.value, &self.seen_sources)).ok()
    }

    /// Restores a channel from a `(value, seen_sources)` pair.
    ///
    /// The restored channel starts with an empty set of required sources.
    ///
    /// # Errors
    ///
    /// Returns a message describing the failure when `value` does not
    /// deserialize into that pair.
    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
    where
        Self: Sized,
    {
        serde_json::from_value::<(T, HashSet<String>)>(value)
            .map(|(parsed_value, seen_sources)| Self {
                value: parsed_value,
                required_sources: HashSet::new(),
                seen_sources,
                _reducer: std::marker::PhantomData,
            })
            .map_err(|e| format!("checkpoint deserialization failed: {e}"))
    }
}
impl<T, R: Reducer<T>> NamedBarrierChannel<T, R> {
    /// Reduces `values` into the channel on behalf of `source_name`.
    ///
    /// Returns `false` and records nothing when `values` is empty. Otherwise
    /// the values are reduced and `source_name` is marked as seen.
    ///
    /// # Panics
    ///
    /// Panics when the channel has required sources and `source_name` is not
    /// one of them. This check runs before the empty-`values` check.
    pub fn update(&mut self, source_name: String, values: Vec<T>) -> bool {
        let unrestricted = self.required_sources.is_empty();
        assert!(
            unrestricted || self.required_sources.contains(&source_name),
            "NamedBarrierChannel: source '{source_name}' not in required sources"
        );
        let has_writes = !values.is_empty();
        if has_writes {
            R::reduce(&mut self.value, values);
            self.seen_sources.insert(source_name);
        }
        has_writes
    }
}
/// Channel that accumulates messages in arrival order.
#[derive(Debug, Clone)]
pub struct TopicChannel<T> {
    /// Messages collected so far, oldest first.
    messages: Vec<T>,
}

impl<T> TopicChannel<T> {
    /// Creates a channel with no messages.
    #[must_use]
    pub const fn new() -> Self {
        let messages = Vec::new();
        Self { messages }
    }

    /// Returns the number of stored messages.
    #[must_use]
    pub const fn len(&self) -> usize {
        self.messages.len()
    }

    /// Returns `true` when no messages are stored.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Removes every stored message.
    pub fn reset(&mut self) {
        self.messages.truncate(0);
    }

    /// Iterates over the stored messages by reference, oldest first.
    pub fn iter(&self) -> std::slice::Iter<'_, T> {
        self.messages.as_slice().iter()
    }
}

impl<T> Default for TopicChannel<T> {
    /// Equivalent to [`TopicChannel::new`].
    fn default() -> Self {
        Self {
            messages: Vec::new(),
        }
    }
}

impl<'a, T> IntoIterator for &'a TopicChannel<T> {
    type Item = &'a T;
    type IntoIter = std::slice::Iter<'a, T>;

    /// Iterates over the stored messages by reference, oldest first.
    fn into_iter(self) -> Self::IntoIter {
        self.messages.iter()
    }
}
impl<T> Channel<Vec<T>> for TopicChannel<T>
where
    T: Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
{
    /// Appends every message of every batch in `values`, in order.
    ///
    /// Returns `true` only when at least one message was appended. An empty
    /// `values`, or one made up solely of empty batches, leaves the channel
    /// unchanged and returns `false`.
    fn update(&mut self, values: Vec<Vec<T>>) -> bool {
        let before = self.messages.len();
        self.messages.extend(values.into_iter().flatten());
        // Report a change only if something was actually stored.
        self.messages.len() != before
    }

    /// Returns the stored messages, oldest first.
    fn get(&self) -> &Vec<T> {
        &self.messages
    }

    /// Clears the stored messages and returns `true` if there were any.
    fn consume(&mut self) -> bool {
        let had_messages = !self.messages.is_empty();
        self.messages.clear();
        had_messages
    }

    /// Serializes the stored messages as a JSON value; returns `None` if
    /// serialization fails.
    fn checkpoint(&self) -> Option<serde_json::Value> {
        serde_json::to_value(&self.messages).ok()
    }

    /// Restores a channel from a serialized list of messages.
    ///
    /// # Errors
    ///
    /// Returns a message describing the failure when `value` does not
    /// deserialize into a `Vec<T>`.
    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
    where
        Self: Sized,
    {
        serde_json::from_value::<Vec<T>>(value)
            .map(|messages| Self { messages })
            .map_err(|e| format!("checkpoint deserialization failed: {e}"))
    }
}
/// Common interface of the channel types in this file: a value of type `T`
/// that can be updated, read, consumed, and checkpointed as JSON.
pub trait Channel<T>: Send + Sync + 'static {
    /// Applies a batch of written `values` and returns a flag describing
    /// whether the channel was updated.
    fn update(&mut self, values: Vec<T>) -> bool;

    /// Returns a reference to the channel's current value.
    fn get(&self) -> &T;

    /// Notifies the channel that its value has been read; the meaning of the
    /// returned flag is defined by each implementation.
    fn consume(&mut self) -> bool;

    /// Returns a JSON snapshot of the channel, or `None` when the
    /// implementation has nothing to persist.
    fn checkpoint(&self) -> Option<serde_json::Value>;

    /// Rebuilds a channel from a JSON snapshot.
    ///
    /// # Errors
    ///
    /// Returns a descriptive message when the snapshot cannot be used.
    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String>
    where
        Self: Sized;
}
/// Channel holding a reduced value that is never written to a checkpoint.
#[derive(Debug)]
pub struct UntrackedChannel<T, R: Reducer<T>> {
    /// Current reduced value.
    value: T,
    /// Binds the reducer type without storing a reducer instance.
    _reducer: std::marker::PhantomData<R>,
}

impl<T, R: Reducer<T>> UntrackedChannel<T, R> {
    /// Creates a channel whose initial value is `value`.
    #[must_use]
    pub const fn new(value: T) -> Self {
        let _reducer = std::marker::PhantomData;
        Self { value, _reducer }
    }
}
impl<T, R> Channel<T> for UntrackedChannel<T, R>
where
    T: Default + Send + Sync + 'static,
    R: Reducer<T> + Send + Sync + 'static,
{
    /// Reduces `values` into the current value.
    ///
    /// Returns `false` without touching the value when `values` is empty,
    /// `true` otherwise.
    fn update(&mut self, values: Vec<T>) -> bool {
        let has_writes = !values.is_empty();
        if has_writes {
            R::reduce(&mut self.value, values);
        }
        has_writes
    }

    /// Returns the current value.
    fn get(&self) -> &T {
        &self.value
    }

    /// Does nothing and always returns `false`.
    fn consume(&mut self) -> bool {
        false
    }

    /// Always returns `None`: this channel is not persisted.
    fn checkpoint(&self) -> Option<serde_json::Value> {
        None
    }

    /// Ignores the snapshot and returns a channel holding `T::default()`.
    fn from_checkpoint(_value: serde_json::Value) -> Result<Self, String> {
        Ok(Self {
            value: T::default(),
            _reducer: std::marker::PhantomData,
        })
    }
}
/// Channel holding a reduced value together with a "consumed" flag; it is
/// never written to a checkpoint.
#[derive(Debug)]
pub struct EphemeralChannel<T, R: Reducer<T>> {
    /// Current reduced value.
    value: T,
    /// Whether `consume` has been called since construction or the last
    /// non-empty update.
    consumed: bool,
    /// Binds the reducer type without storing a reducer instance.
    _reducer: std::marker::PhantomData<R>,
}

impl<T, R: Reducer<T>> EphemeralChannel<T, R> {
    /// Creates an unconsumed channel whose initial value is `value`.
    #[must_use]
    pub const fn new(value: T) -> Self {
        let _reducer = std::marker::PhantomData;
        Self {
            value,
            consumed: false,
            _reducer,
        }
    }
}
impl<T, R> Channel<T> for EphemeralChannel<T, R>
where
    T: Default + Send + Sync + 'static,
    R: Reducer<T> + Send + Sync + 'static,
{
    /// Reduces `values` into the current value and clears the consumed flag.
    ///
    /// Returns `false` without changing anything when `values` is empty,
    /// `true` otherwise.
    fn update(&mut self, values: Vec<T>) -> bool {
        let has_writes = !values.is_empty();
        if has_writes {
            // The flag is cleared before reducing, matching the write order
            // the channel has always used.
            self.consumed = false;
            R::reduce(&mut self.value, values);
        }
        has_writes
    }

    /// Returns the current value.
    fn get(&self) -> &T {
        &self.value
    }

    /// Marks the channel as consumed and returns whether it had already been
    /// consumed before this call. The stored value is left untouched.
    fn consume(&mut self) -> bool {
        std::mem::replace(&mut self.consumed, true)
    }

    /// Always returns `None`: this channel is not persisted.
    fn checkpoint(&self) -> Option<serde_json::Value> {
        None
    }

    /// Ignores the snapshot and returns an unconsumed channel holding
    /// `T::default()`.
    fn from_checkpoint(_value: serde_json::Value) -> Result<Self, String> {
        Ok(Self {
            value: T::default(),
            consumed: false,
            _reducer: std::marker::PhantomData,
        })
    }
}
/// Channel holding a reduced value plus a "finished" flag; it reports itself
/// available only once `finish` has been called.
#[derive(Debug)]
pub struct LastValueAfterFinishChannel<T, R: Reducer<T>> {
    /// Current reduced value.
    value: T,
    /// Copy of the value recorded by updates that happen while finished.
    finished_value: Option<T>,
    /// Whether `finish` has been called.
    is_finished: bool,
    /// Binds the reducer type without storing a reducer instance.
    _reducer: std::marker::PhantomData<R>,
}

impl<T, R: Reducer<T>> LastValueAfterFinishChannel<T, R> {
    /// Creates an unfinished channel whose initial value is `value`.
    #[must_use]
    pub const fn new(value: T) -> Self {
        let _reducer = std::marker::PhantomData;
        Self {
            value,
            finished_value: None,
            is_finished: false,
            _reducer,
        }
    }

    /// Marks the channel as finished. Only the flag changes.
    pub const fn finish(&mut self) {
        self.is_finished = true;
    }

    /// Returns `true` once the channel has been marked as finished.
    #[must_use]
    pub const fn is_available(&self) -> bool {
        self.is_finished
    }
}
impl<T, R> Channel<T> for LastValueAfterFinishChannel<T, R>
where
    T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
    R: Reducer<T> + Send + Sync + 'static,
{
    /// Reduces `values` into the current value.
    ///
    /// Returns `false` without changing anything when `values` is empty. When
    /// the channel is already finished, the freshly reduced value is also
    /// copied into the finished slot.
    fn update(&mut self, values: Vec<T>) -> bool {
        let has_writes = !values.is_empty();
        if has_writes {
            R::reduce(&mut self.value, values);
            if self.is_finished {
                self.finished_value = Some(self.value.clone());
            }
        }
        has_writes
    }

    /// Returns the finished copy when the channel is finished and one has
    /// been recorded; otherwise returns the current value.
    fn get(&self) -> &T {
        match (&self.finished_value, self.is_finished) {
            (Some(frozen), true) => frozen,
            _ => &self.value,
        }
    }

    /// Does nothing and always returns `false`.
    fn consume(&mut self) -> bool {
        false
    }

    /// Serializes the pair `(value, is_finished)` once the channel is
    /// finished; returns `None` before that or if serialization fails.
    fn checkpoint(&self) -> Option<serde_json::Value> {
        if !self.is_finished {
            return None;
        }
        serde_json::to_value((&self.value, self.is_finished)).ok()
    }

    /// Restores a channel from a snapshot.
    ///
    /// The snapshot is first read as a `(value, is_finished)` pair; if that
    /// fails it is read as a bare value, producing an unfinished channel.
    ///
    /// # Errors
    ///
    /// Returns a message describing the failure when neither shape matches.
    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
        match serde_json::from_value::<(T, bool)>(value.clone()) {
            Ok((parsed_value, is_finished)) => {
                let finished_value = if is_finished {
                    Some(parsed_value.clone())
                } else {
                    None
                };
                Ok(Self {
                    value: parsed_value,
                    finished_value,
                    is_finished,
                    _reducer: std::marker::PhantomData,
                })
            }
            // Not a pair: fall back to the bare-value shape.
            Err(_) => serde_json::from_value::<T>(value)
                .map(|parsed_value| Self {
                    value: parsed_value,
                    finished_value: None,
                    is_finished: false,
                    _reducer: std::marker::PhantomData,
                })
                .map_err(|e| format!("checkpoint deserialization failed: {e}")),
        }
    }
}
/// Channel holding a reduced value plus a counter of updates since the last
/// snapshot, compared against a configurable snapshot frequency.
#[derive(Debug)]
pub struct DeltaChannel<T, R: Reducer<T>> {
    /// Current reduced value.
    value: T,
    /// Number of updates after which `should_snapshot` reports `true`;
    /// `new` clamps it to at least 1.
    snapshot_frequency: usize,
    /// Updates applied since the counter was last reset.
    update_count_since_snapshot: usize,
    /// Binds the reducer type without storing a reducer instance.
    _reducer: std::marker::PhantomData<R>,
}

impl<T, R: Reducer<T>> DeltaChannel<T, R> {
    /// Creates a channel holding `value`; a `snapshot_frequency` of 0 is
    /// raised to 1.
    #[must_use]
    pub fn new(value: T, snapshot_frequency: usize) -> Self {
        let snapshot_frequency = std::cmp::max(snapshot_frequency, 1);
        Self {
            value,
            snapshot_frequency,
            update_count_since_snapshot: 0,
            _reducer: std::marker::PhantomData,
        }
    }

    /// Rebuilds the value from a sequence of recorded writes and resets the
    /// update counter. An empty `values` is a no-op (the counter is left
    /// alone too).
    ///
    /// Each write is serialized to JSON; if the result is an object with an
    /// `__overwrite__` key whose content deserializes as `T`, that content
    /// becomes the new base and every write up to and including it is
    /// skipped. The last such write wins. The writes after it are handed to
    /// the reducer as a single batch.
    ///
    /// NOTE(review): because the remaining writes form one batch, a reducer
    /// that rejects multi-element batches (such as `ReplaceReducer`) panics
    /// when more than one write remains — confirm this is intended.
    pub fn replay_writes(&mut self, values: &[T])
    where
        T: Clone + serde::Serialize + DeserializeOwned,
    {
        if values.is_empty() {
            return;
        }

        let mut base = self.value.clone();
        let mut start_idx = 0;
        for (position, write) in values.iter().enumerate() {
            // Writes that fail to serialize, or that are not marker objects,
            // are ordinary writes and do not move the base.
            let Ok(encoded) = serde_json::to_value(write) else {
                continue;
            };
            let Some(payload) = encoded
                .as_object()
                .and_then(|fields| fields.get("__overwrite__"))
            else {
                continue;
            };
            if let Ok(replacement) = serde_json::from_value::<T>(payload.clone()) {
                base = replacement;
                start_idx = position + 1;
            }
        }

        let tail = &values[start_idx..];
        if !tail.is_empty() {
            R::reduce(&mut base, tail.to_vec());
        }
        self.value = base;
        self.update_count_since_snapshot = 0;
    }

    /// Returns `true` when the update counter has reached the snapshot
    /// frequency.
    #[must_use]
    pub const fn should_snapshot(&self) -> bool {
        self.update_count_since_snapshot >= self.snapshot_frequency
    }

    /// Raises the update counter to the snapshot frequency so that
    /// `should_snapshot` reports `true`.
    pub const fn finish(&mut self) {
        self.update_count_since_snapshot = self.snapshot_frequency;
    }
}
impl<T, R> Channel<T> for DeltaChannel<T, R>
where
    T: Default + Clone + Send + Sync + serde::Serialize + DeserializeOwned + 'static,
    R: Reducer<T> + Send + Sync + 'static,
{
    /// Reduces `values` into the current value and counts one update.
    ///
    /// Returns `false` without changing anything when `values` is empty. The
    /// counter advances by one per call, regardless of how many values the
    /// batch contains.
    fn update(&mut self, values: Vec<T>) -> bool {
        let has_writes = !values.is_empty();
        if has_writes {
            R::reduce(&mut self.value, values);
            self.update_count_since_snapshot += 1;
        }
        has_writes
    }

    /// Returns the current value.
    fn get(&self) -> &T {
        &self.value
    }

    /// Does nothing and always returns `false`.
    fn consume(&mut self) -> bool {
        false
    }

    /// Serializes the current value; returns `None` if serialization fails.
    /// The snapshot frequency and update counter are not included.
    fn checkpoint(&self) -> Option<serde_json::Value> {
        serde_json::to_value(&self.value).ok()
    }

    /// Restores a channel from a serialized value, using a snapshot
    /// frequency of 10 and a zeroed update counter.
    ///
    /// # Errors
    ///
    /// Returns a message describing the failure when `value` does not
    /// deserialize into `T`.
    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
        serde_json::from_value::<T>(value)
            .map(|restored| Self {
                value: restored,
                snapshot_frequency: 10,
                update_count_since_snapshot: 0,
                _reducer: std::marker::PhantomData,
            })
            .map_err(|e| format!("checkpoint deserialization failed: {e}"))
    }
}
#[derive(Clone, Debug)]
pub enum DeltaBlob<T>
where
T: Clone + serde::Serialize + serde::de::DeserializeOwned,
{
Missing,
Snapshot(T),
}
/// Channel that keeps only the most recent `capacity` values.
#[derive(Clone, Debug)]
pub struct RingBufferChannel<T> {
    /// Retained values, oldest first.
    values: Vec<T>,
    /// Maximum number of retained values; `new` clamps it to at least 1.
    capacity: usize,
}

impl<T> RingBufferChannel<T> {
    /// Creates a channel from `values`, keeping at most `capacity` of the
    /// newest ones. A `capacity` of 0 is raised to 1.
    #[must_use]
    pub fn new(values: Vec<T>, capacity: usize) -> Self {
        let capacity = std::cmp::max(capacity, 1);
        let mut channel = Self { values, capacity };
        channel.trim_to_capacity();
        channel
    }

    /// Returns the maximum number of retained values.
    #[must_use]
    pub const fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the number of retained values.
    #[must_use]
    pub fn len(&self) -> usize {
        self.values.len()
    }

    /// Returns `true` when no values are retained.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.values.is_empty()
    }

    /// Drops the oldest values until at most `capacity` remain.
    fn trim_to_capacity(&mut self) {
        let excess = self.values.len().saturating_sub(self.capacity);
        if excess > 0 {
            self.values.drain(..excess);
        }
    }
}
impl<T: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static> Channel<Vec<T>>
    for RingBufferChannel<T>
{
    /// Appends every value of every batch in `values`, then drops the oldest
    /// values beyond the capacity.
    ///
    /// Returns `true` when at least one batch was non-empty.
    fn update(&mut self, values: Vec<Vec<T>>) -> bool {
        let had_non_empty = values.iter().any(|batch| !batch.is_empty());
        self.values.extend(values.into_iter().flatten());
        self.trim_to_capacity();
        had_non_empty
    }

    /// Returns the retained values, oldest first.
    fn get(&self) -> &Vec<T> {
        &self.values
    }

    /// Does nothing and always returns `false`.
    fn consume(&mut self) -> bool {
        false
    }

    /// Serializes the channel as `{"values": [...], "capacity": n}`.
    ///
    /// Returns `None` when the values fail to serialize.
    fn checkpoint(&self) -> Option<serde_json::Value> {
        // Serialize the values explicitly: interpolating them directly in
        // `json!` would panic on a serialization error instead of letting
        // this method report `None`.
        let values = serde_json::to_value(&self.values).ok()?;
        Some(serde_json::json!({
            "values": values,
            "capacity": self.capacity,
        }))
    }

    /// Restores a channel from a snapshot.
    ///
    /// Two shapes are accepted: an object with `values` and an optional
    /// `capacity`, or a bare array of values. A missing or unusable capacity
    /// falls back to 1000, and a capacity of 0 is raised to 1. The restored
    /// values are trimmed to the resulting capacity, keeping the newest ones.
    ///
    /// # Errors
    ///
    /// Returns a message describing the failure when the values do not
    /// deserialize into a `Vec<T>`.
    fn from_checkpoint(value: serde_json::Value) -> Result<Self, String> {
        const DEFAULT_CAPACITY: usize = 1000;

        let (raw_values, capacity) = match value {
            serde_json::Value::Object(mut fields) => {
                let capacity = fields
                    .get("capacity")
                    .and_then(serde_json::Value::as_u64)
                    .map_or(DEFAULT_CAPACITY, |c| {
                        usize::try_from(c).unwrap_or(DEFAULT_CAPACITY)
                    });
                // A missing `values` key becomes JSON null, which then fails
                // to deserialize below.
                let raw_values = fields.remove("values").unwrap_or_default();
                (raw_values, capacity)
            }
            other => (other, DEFAULT_CAPACITY),
        };

        let values: Vec<T> = serde_json::from_value(raw_values)
            .map_err(|e| format!("checkpoint deserialization failed: {e}"))?;
        // Going through `new` clamps the capacity and trims the values, so a
        // restored channel never holds more than `capacity` entries.
        Ok(Self::new(values, capacity))
    }
}
/// Value carrying the identifier of a message.
#[derive(Clone, Debug)]
pub struct RemoveMessage {
    /// Identifier string carried by this value.
    pub id: String,
}
// Unit tests for the reducers and channel types defined in this file.
// Each test builds a channel (or calls a reducer) directly and checks the
// observable result of a small sequence of operations.
#[cfg(test)]
mod tests {
use super::*;
// --- UntrackedChannel ---
#[test]
fn untracked_channel_update_returns_true_on_change() {
let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(0);
assert!(!ch.update(vec![]));
assert!(ch.update(vec![42]));
assert_eq!(*ch.get(), 42);
}
#[test]
fn untracked_channel_consume_always_false() {
let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(1);
assert!(!ch.consume());
}
#[test]
fn untracked_channel_checkpoint_is_none() {
let ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(5);
assert!(ch.checkpoint().is_none());
}
#[test]
fn untracked_channel_from_checkpoint_uses_default() {
let ch: UntrackedChannel<i32, ReplaceReducer> =
UntrackedChannel::from_checkpoint(serde_json::json!(99)).expect("should succeed");
assert_eq!(*ch.get(), 0);
}
// --- EphemeralChannel ---
// `consume` returns the previous state of the consumed flag, so the first
// call yields false and the next one true.
#[test]
fn ephemeral_channel_consume_tracks_state() {
let mut ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(0);
assert!(!ch.consume()); assert!(ch.consume()); }
#[test]
fn ephemeral_channel_update_resets_consumed() {
let mut ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(0);
assert!(!ch.consume());
assert!(ch.update(vec![7]));
assert!(!ch.consume()); }
#[test]
fn ephemeral_channel_checkpoint_is_none() {
let ch: EphemeralChannel<i32, ReplaceReducer> = EphemeralChannel::new(3);
assert!(ch.checkpoint().is_none());
}
// --- LastValueAfterFinishChannel: availability and checkpoint gating ---
#[test]
fn last_value_after_finish_channel_not_available_before_finish() {
let ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
LastValueAfterFinishChannel::new(0);
assert!(!ch.is_available());
}
#[test]
fn last_value_after_finish_channel_available_after_finish() {
let mut ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
LastValueAfterFinishChannel::new(0);
ch.finish();
assert!(ch.is_available());
}
#[test]
fn last_value_after_finish_channel_checkpoint_only_if_finished() {
let ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
LastValueAfterFinishChannel::new(5);
assert!(ch.checkpoint().is_none());
let mut ch2: LastValueAfterFinishChannel<i32, ReplaceReducer> =
LastValueAfterFinishChannel::new(5);
ch2.finish();
assert!(ch2.checkpoint().is_some());
}
// --- DeltaChannel: construction, replay, checkpoint, snapshot counter ---
#[test]
fn delta_channel_snapshot_frequency_clamped_to_one() {
let ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 0);
assert_eq!(ch.snapshot_frequency, 1);
}
#[test]
fn delta_channel_replay_writes_restores_state() {
let mut ch: DeltaChannel<Vec<i32>, AppendReducer> = DeltaChannel::new(vec![], 10);
ch.replay_writes(&[vec![1, 2], vec![3, 4]]);
assert_eq!(*ch.get(), vec![1, 2, 3, 4]);
assert_eq!(ch.update_count_since_snapshot, 0);
}
#[test]
fn delta_channel_checkpoint_returns_snapshot() {
let ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(42, 5);
let cp = ch.checkpoint().expect("should have checkpoint");
assert_eq!(cp, serde_json::json!(42));
}
#[test]
fn delta_channel_should_snapshot() {
let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 2);
assert!(!ch.should_snapshot());
ch.update(vec![1]);
assert!(!ch.should_snapshot());
ch.update(vec![2]);
assert!(ch.should_snapshot());
}
// --- DeltaBlob ---
#[test]
fn delta_blob_missing_variant_exists() {
let blob: DeltaBlob<i32> = DeltaBlob::Missing;
assert!(matches!(blob, DeltaBlob::Missing));
}
#[test]
fn delta_blob_snapshot_holds_value() {
let blob: DeltaBlob<i32> = DeltaBlob::Snapshot(42);
assert!(matches!(blob, DeltaBlob::Snapshot(_)));
}
#[test]
fn delta_blob_clone() {
let blob: DeltaBlob<String> = DeltaBlob::Snapshot("hello".to_string());
let cloned = blob.clone();
if let DeltaBlob::Snapshot(v) = cloned {
assert_eq!(v, "hello");
} else {
panic!("expected Snapshot variant");
}
if let DeltaBlob::Snapshot(v) = blob {
assert_eq!(v, "hello");
} else {
panic!("expected Snapshot variant");
}
}
// --- RemoveMessage ---
#[test]
fn remove_message_holds_id() {
let rm = RemoveMessage {
id: "msg-123".to_string(),
};
assert_eq!(rm.id, "msg-123");
}
// --- Overwrite: serde round trips and Debug output ---
#[test]
fn overwrite_serialize_round_trip() {
let original = Overwrite(42);
let json = serde_json::to_string(&original).expect("should serialize");
assert_eq!(json, r#"{"__overwrite__":42}"#);
let deserialized: Overwrite<i32> = serde_json::from_str(&json).expect("should deserialize");
assert_eq!(deserialized.0, 42);
}
#[test]
fn overwrite_serialize_complex_type() {
let original = Overwrite(vec![1, 2, 3]);
let json = serde_json::to_string(&original).expect("should serialize");
assert_eq!(json, r#"{"__overwrite__":[1,2,3]}"#);
let deserialized: Overwrite<Vec<i32>> =
serde_json::from_str(&json).expect("should deserialize");
assert_eq!(deserialized.0, vec![1, 2, 3]);
}
#[test]
fn overwrite_debug_format() {
let ov = Overwrite(42);
let debug_str = format!("{ov:?}");
assert_eq!(debug_str, "Overwrite(42)");
}
// --- ReplaceReducer: zero or one write is accepted, more panics ---
#[test]
fn replace_reducer_single_value_succeeds() {
let mut val = 0;
ReplaceReducer::reduce(&mut val, vec![42]);
assert_eq!(val, 42);
}
#[test]
fn replace_reducer_empty_values_succeeds() {
let mut val = 99;
ReplaceReducer::reduce(&mut val, vec![]);
assert_eq!(val, 99);
}
#[test]
#[should_panic(expected = "Replace reducer: multiple writes in same superstep")]
fn replace_reducer_multiple_values_panics() {
let mut val = 0;
ReplaceReducer::reduce(&mut val, vec![1, 2]);
}
#[test]
#[should_panic(expected = "Replace reducer: multiple writes in same superstep")]
fn untracked_channel_multiple_writes_panics() {
let mut ch: UntrackedChannel<i32, ReplaceReducer> = UntrackedChannel::new(0);
ch.update(vec![1, 2]);
}
// --- NamedBarrierChannel ---
// Calls of the form `ch.update(name, values)` use the inherent, source-aware
// method; `Channel::update(&mut ch, values)` uses the trait method.
#[test]
fn named_barrier_channel_not_available_initially() {
let ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new_with_sources(
0,
["node_a", "node_b"].into_iter().map(String::from),
);
assert!(!ch.is_available());
}
#[test]
fn named_barrier_channel_available_after_all_sources_write() {
let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
NamedBarrierChannel::new_with_sources(
0,
["node_a", "node_b"].into_iter().map(String::from),
);
assert!(!ch.is_available());
ch.update("node_a".to_string(), vec![42]);
assert!(!ch.is_available());
ch.update("node_b".to_string(), vec![100]);
assert!(ch.is_available());
assert_eq!(*ch.get(), 100); }
#[test]
fn named_barrier_channel_empty_required_sources_is_available() {
let ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new(42);
assert!(ch.is_available());
}
#[test]
fn named_barrier_channel_has_written_tracks_sources() {
let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
NamedBarrierChannel::new_with_sources(
0,
["node_a", "node_b", "node_c"].into_iter().map(String::from),
);
assert!(!ch.has_written("node_a"));
ch.update("node_a".to_string(), vec![1]);
assert!(ch.has_written("node_a"));
assert!(!ch.has_written("node_b"));
}
#[test]
fn named_barrier_channel_reset_clears_seen_sources() {
let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
NamedBarrierChannel::new_with_sources(
0,
["node_a", "node_b"].into_iter().map(String::from),
);
ch.update("node_a".to_string(), vec![1]);
ch.update("node_b".to_string(), vec![2]);
assert!(ch.is_available());
ch.reset();
assert!(!ch.is_available());
assert!(!ch.has_written("node_a"));
assert!(!ch.has_written("node_b"));
}
#[test]
fn named_barrier_channel_add_required_source() {
let mut ch: NamedBarrierChannel<i32, ReplaceReducer> = NamedBarrierChannel::new(0);
assert!(ch.is_available());
ch.add_required_source("node_a".to_string());
assert!(!ch.is_available());
ch.update("node_a".to_string(), vec![42]);
assert!(ch.is_available());
}
#[test]
#[should_panic(expected = "NamedBarrierChannel: source")]
fn named_barrier_channel_unknown_source_panics() {
let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
NamedBarrierChannel::new_with_sources(0, vec!["node_a".to_string()]);
ch.update("unknown_node".to_string(), vec![42]);
}
#[test]
fn named_barrier_channel_checkpoint_persists_state() {
let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
NamedBarrierChannel::new_with_sources(0, vec!["node_a".to_string()]);
ch.update("node_a".to_string(), vec![42]);
let checkpoint = ch.checkpoint().expect("should have checkpoint");
assert!(checkpoint.is_array() || checkpoint.is_object());
let restored: NamedBarrierChannel<i32, ReplaceReducer> =
NamedBarrierChannel::from_checkpoint(checkpoint).expect("should restore");
assert_eq!(*restored.get(), 42);
assert!(restored.has_written("node_a"));
}
#[test]
fn named_barrier_channel_generic_update_marks_all_sources_seen() {
let mut ch: NamedBarrierChannel<i32, ReplaceReducer> =
NamedBarrierChannel::new_with_sources(0, ["node_a".to_string(), "node_b".to_string()]);
Channel::update(&mut ch, vec![42]);
assert!(ch.is_available());
assert!(ch.has_written("node_a"));
assert!(ch.has_written("node_b"));
}
// --- TopicChannel ---
#[test]
fn topic_channel_new_is_empty() {
let ch: TopicChannel<String> = TopicChannel::new();
assert!(ch.is_empty());
assert_eq!(ch.len(), 0);
}
#[test]
fn topic_channel_default_is_empty() {
let ch: TopicChannel<String> = TopicChannel::default();
assert!(ch.is_empty());
}
#[test]
fn topic_channel_accumulates_messages() {
let mut ch: TopicChannel<String> = TopicChannel::new();
ch.update(vec![vec!["hello".to_string()]]);
assert_eq!(ch.len(), 1);
assert_eq!(ch.get()[0], "hello");
ch.update(vec![vec!["world".to_string()]]);
assert_eq!(ch.len(), 2);
assert_eq!(ch.get()[1], "world");
}
#[test]
fn topic_channel_update_with_multiple_messages() {
let mut ch: TopicChannel<i32> = TopicChannel::new();
ch.update(vec![vec![1, 2, 3]]);
assert_eq!(ch.len(), 3);
assert_eq!(ch.get(), &[1, 2, 3]);
}
#[test]
fn topic_channel_update_with_multiple_batches() {
let mut ch: TopicChannel<i32> = TopicChannel::new();
ch.update(vec![vec![1, 2], vec![3, 4]]);
assert_eq!(ch.len(), 4);
assert_eq!(ch.get(), &[1, 2, 3, 4]);
}
#[test]
fn topic_channel_reset_clears_messages() {
let mut ch: TopicChannel<String> = TopicChannel::new();
ch.update(vec![vec!["test".to_string()]]);
assert_eq!(ch.len(), 1);
ch.reset();
assert!(ch.is_empty());
assert_eq!(ch.len(), 0);
}
#[test]
fn topic_channel_consume_clears_and_returns_status() {
let mut ch: TopicChannel<String> = TopicChannel::new();
let had_content = ch.consume();
assert!(!had_content);
ch.update(vec![vec!["test".to_string()]]);
let had_content_after = ch.consume();
assert!(had_content_after); assert!(ch.is_empty());
}
#[test]
fn topic_channel_iter_messages() {
let mut ch: TopicChannel<i32> = TopicChannel::new();
ch.update(vec![vec![1, 2, 3]]);
let mut iter = ch.iter();
assert_eq!(iter.next(), Some(&1));
assert_eq!(iter.next(), Some(&2));
assert_eq!(iter.next(), Some(&3));
assert_eq!(iter.next(), None);
}
#[test]
fn topic_channel_checkpoint_persists_messages() {
let mut ch: TopicChannel<i32> = TopicChannel::new();
ch.update(vec![vec![1, 2, 3]]);
let checkpoint = ch.checkpoint().expect("should have checkpoint");
assert_eq!(checkpoint, serde_json::json!([1, 2, 3]));
let restored: TopicChannel<i32> =
TopicChannel::from_checkpoint(checkpoint).expect("should restore");
assert_eq!(restored.len(), 3);
assert_eq!(restored.get(), &[1, 2, 3]);
}
#[test]
fn topic_channel_from_checkpoint_empty() {
let ch: TopicChannel<i32> =
TopicChannel::from_checkpoint(serde_json::json!([])).expect("should restore");
assert!(ch.is_empty());
}
// --- LastValueAfterFinishChannel: checkpoint format ---
// The checkpoint is a two-element array `[value, is_finished]`; a bare value
// is also accepted on restore and yields an unfinished channel.
#[test]
fn last_value_after_finish_checkpoint_saves_is_finished_state() {
let mut ch: LastValueAfterFinishChannel<i32, ReplaceReducer> =
LastValueAfterFinishChannel::new(10);
ch.update(vec![42]);
ch.finish();
let checkpoint = ch
.checkpoint()
.expect("should have checkpoint when finished");
assert!(checkpoint.is_array());
let arr = checkpoint.as_array().expect("should be array");
assert_eq!(arr.len(), 2);
assert_eq!(arr[0], serde_json::json!(42)); assert_eq!(arr[1], serde_json::json!(true)); }
#[test]
fn last_value_after_finish_from_checkpoint_restores_is_finished() {
let checkpoint_data = serde_json::json!([99, true]);
let restored: LastValueAfterFinishChannel<i32, ReplaceReducer> =
LastValueAfterFinishChannel::from_checkpoint(checkpoint_data)
.expect("should restore from checkpoint");
assert_eq!(*restored.get(), 99);
assert!(restored.is_available());
}
#[test]
fn last_value_after_finish_from_checkpoint_old_format_backward_compat() {
let checkpoint_data = serde_json::json!(55);
let restored: LastValueAfterFinishChannel<i32, ReplaceReducer> =
LastValueAfterFinishChannel::from_checkpoint(checkpoint_data)
.expect("should restore from old checkpoint format");
assert_eq!(*restored.get(), 55);
assert!(!restored.is_available()); }
#[test]
fn last_value_after_finish_checkpoint_round_trip() {
let mut ch1: LastValueAfterFinishChannel<i32, ReplaceReducer> =
LastValueAfterFinishChannel::new(0);
ch1.update(vec![123]);
ch1.finish();
let checkpoint = ch1.checkpoint().expect("should checkpoint");
let ch2: LastValueAfterFinishChannel<i32, ReplaceReducer> =
LastValueAfterFinishChannel::from_checkpoint(checkpoint).expect("should restore");
assert_eq!(*ch1.get(), *ch2.get());
assert_eq!(ch1.is_available(), ch2.is_available());
}
// --- Overwrite: accessors ---
#[test]
fn overwrite_get_returns_inner_value() {
let ov = Overwrite(42);
assert_eq!(*ov.get(), 42);
}
#[test]
fn overwrite_into_inner_consumes_wrapper() {
let ov = Overwrite(100);
assert_eq!(ov.into_inner(), 100);
}
#[test]
fn overwrite_new_creates_wrapper() {
let ov = Overwrite::new(999);
assert_eq!(*ov.get(), 999);
}
// --- DeltaChannel::replay_writes ---
// Writes whose JSON form is an object with an `__overwrite__` key replace
// the base value; only the writes after the last such marker are reduced.
#[test]
fn delta_channel_replay_writes_handles_empty_sequence() {
let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(5, 10);
ch.replay_writes(&[]);
assert_eq!(*ch.get(), 5); }
#[test]
fn delta_channel_replay_writes_single_value() {
let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
ch.replay_writes(&[42]);
assert_eq!(*ch.get(), 42);
}
#[test]
fn delta_channel_replay_writes_multiple_values() {
let mut ch: DeltaChannel<Vec<i32>, AppendReducer> = DeltaChannel::new(vec![], 10);
ch.replay_writes(&[vec![1, 2], vec![3, 4]]);
assert_eq!(*ch.get(), vec![1, 2, 3, 4]);
}
#[test]
fn delta_channel_replay_writes_resets_snapshot_counter() {
let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
ch.update(vec![1]);
assert_eq!(ch.update_count_since_snapshot, 1);
ch.replay_writes(&[99]);
assert_eq!(ch.update_count_since_snapshot, 0); }
#[test]
fn delta_channel_replay_writes_with_replace_reducer() {
let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
ch.replay_writes(&[42]);
assert_eq!(*ch.get(), 42);
}
#[test]
fn delta_channel_replay_writes_detects_json_overwrite_format() {
let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
DeltaChannel::new(serde_json::json!(null), 10);
let overwrite_val = serde_json::json!({"__overwrite__": "baseline"});
let normal_val1 = serde_json::json!("update1");
let normal_val2 = serde_json::json!("update2");
ch.replay_writes(&[normal_val1, overwrite_val, normal_val2.clone()]);
assert_eq!(ch.get(), &normal_val2);
}
#[test]
fn delta_channel_replay_writes_overwrite_at_start() {
let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
DeltaChannel::new(serde_json::json!("initial"), 10);
let overwrite_val = serde_json::json!({"__overwrite__": "new_baseline"});
let normal_val = serde_json::json!("update");
ch.replay_writes(&[overwrite_val, normal_val.clone()]);
assert_eq!(ch.get(), &normal_val);
}
#[test]
fn delta_channel_replay_writes_overwrite_at_end() {
let mut ch: DeltaChannel<serde_json::Value, LastWriteWinsReducer> =
DeltaChannel::new(serde_json::json!("initial"), 10);
let normal_val = serde_json::json!("update");
let overwrite_val = serde_json::json!({"__overwrite__": "final_baseline"});
ch.replay_writes(&[normal_val, overwrite_val]);
assert_eq!(ch.get(), &serde_json::json!("final_baseline"));
}
#[test]
fn delta_channel_finish_forces_snapshot() {
let mut ch: DeltaChannel<i32, ReplaceReducer> = DeltaChannel::new(0, 10);
assert!(!ch.should_snapshot());
ch.finish();
assert!(ch.should_snapshot());
}
// --- RingBufferChannel ---
#[test]
fn ring_buffer_channel_new_enforces_capacity() {
let ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2, 3, 4, 5], 3);
assert_eq!(ch.capacity(), 3);
assert_eq!(ch.len(), 3);
assert_eq!(ch.get(), &vec![3, 4, 5]);
}
#[test]
fn ring_buffer_channel_new_clamps_min_capacity() {
let ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2], 0);
assert_eq!(ch.capacity(), 1);
}
#[test]
fn ring_buffer_channel_update_appends_and_trims() {
let mut ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![], 3);
assert!(ch.update(vec![vec![1, 2]]));
assert_eq!(ch.get(), &vec![1, 2]);
assert!(ch.update(vec![vec![3, 4]]));
assert_eq!(ch.get(), &vec![2, 3, 4]);
assert_eq!(ch.len(), 3);
}
#[test]
fn ring_buffer_channel_update_returns_false_for_empty() {
let mut ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2], 5);
assert!(!ch.update(vec![vec![]]));
assert_eq!(ch.get(), &vec![1, 2]);
}
#[test]
fn ring_buffer_channel_update_returns_false_for_empty_outer() {
let mut ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2], 5);
assert!(!ch.update(vec![]));
assert_eq!(ch.get(), &vec![1, 2]);
}
#[test]
fn ring_buffer_channel_consume_always_false() {
let mut ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1], 5);
assert!(!ch.consume());
}
#[test]
fn ring_buffer_channel_checkpoint_roundtrip() {
let ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![1, 2, 3], 10);
let checkpoint = ch.checkpoint().unwrap();
let restored = RingBufferChannel::<i32>::from_checkpoint(checkpoint).unwrap();
assert_eq!(restored.get(), &vec![1, 2, 3]);
assert_eq!(restored.capacity(), 10);
}
// A bare array is accepted as a checkpoint and gets the default capacity.
#[test]
fn ring_buffer_channel_from_checkpoint_legacy_format() {
let legacy = serde_json::json!([1, 2, 3]);
let ch = RingBufferChannel::<i32>::from_checkpoint(legacy).unwrap();
assert_eq!(ch.get(), &vec![1, 2, 3]);
assert_eq!(ch.capacity(), 1000); }
#[test]
fn ring_buffer_channel_from_checkpoint_clamps_capacity() {
let checkpoint = serde_json::json!({"values": [1, 2], "capacity": 0});
let ch = RingBufferChannel::<i32>::from_checkpoint(checkpoint).unwrap();
assert_eq!(ch.capacity(), 1);
}
#[test]
fn ring_buffer_channel_is_empty() {
let ch: RingBufferChannel<i32> = RingBufferChannel::new(vec![], 5);
assert!(ch.is_empty());
let ch2: RingBufferChannel<i32> = RingBufferChannel::new(vec![1], 5);
assert!(!ch2.is_empty());
}
}