use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use super::super::redex::{RedexError, RedexEvent, RedexFold};
use super::meta::{EventMeta, EVENT_META_SIZE};
/// Fold adapter that wraps an inner [`RedexFold`] and, after each event the
/// inner fold accepts, raises a shared sequence watermark for events whose
/// [`EventMeta`] header carries this fold's origin hash.
pub(super) struct WatermarkingFold<S, F> {
/// The wrapped fold; it is applied to every event before any watermark work.
inner: F,
/// Shared watermark. It is only ever raised (via `fetch_max`) to
/// `seq_or_ts + 1` of a matching event, never lowered.
app_seq: Arc<AtomicU64>,
/// Origin hash an event's meta must carry for it to move the watermark.
origin_hash: u64,
/// Ties the state type `S` to the struct without storing an `S`.
_state: PhantomData<fn(&mut S)>,
}
impl<S, F> WatermarkingFold<S, F> {
pub(super) fn new(inner: F, app_seq: Arc<AtomicU64>, origin_hash: u64) -> Self {
Self {
inner,
app_seq,
origin_hash,
_state: PhantomData,
}
}
}
impl<S, F> WatermarkingFold<S, F> {
    /// Returns the watermark value implied by `payload`, or `None` when the
    /// event must leave the watermark alone: the payload is shorter than an
    /// [`EventMeta`] header, the header does not parse, it belongs to another
    /// origin, or its `seq_or_ts` is `u64::MAX` (so `+ 1` would overflow).
    fn watermark_candidate(&self, payload: &[u8]) -> Option<u64> {
        let header = payload.get(..EVENT_META_SIZE)?;
        let meta = EventMeta::from_bytes(header)?;
        if meta.origin_hash != self.origin_hash {
            return None;
        }
        meta.seq_or_ts.checked_add(1)
    }
}

impl<S, F> RedexFold<S> for WatermarkingFold<S, F>
where
    F: RedexFold<S>,
{
    /// Applies the inner fold first and propagates its error unchanged; the
    /// watermark is only considered for events the inner fold accepted.
    fn apply(&mut self, ev: &RedexEvent, state: &mut S) -> Result<(), RedexError> {
        self.inner.apply(ev, state)?;
        if let Some(next) = self.watermark_candidate(&ev.payload) {
            // `fetch_max` keeps the shared counter from ever moving backwards.
            self.app_seq.fetch_max(next, Ordering::AcqRel);
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use super::super::super::redex::{RedexEntry, RedexEvent};
/// Test double for the inner fold: records every event it accepts and can be
/// configured to fail on one specific entry sequence number.
struct MockFold {
/// `(entry seq, first payload byte)` for each event applied successfully.
seen: Vec<(u64, u8)>,
/// When `Some(seq)`, `apply` returns an error for the event whose entry
/// sequence equals `seq`.
fail_at_seq: Option<u64>,
}
impl MockFold {
fn new() -> Self {
Self {
seen: Vec::new(),
fail_at_seq: None,
}
}
fn fail_on(seq: u64) -> Self {
Self {
seen: Vec::new(),
fail_at_seq: Some(seq),
}
}
}
impl RedexFold<Vec<(u64, u8)>> for MockFold {
    /// Records `(entry seq, first payload byte or 0)` both on the mock and in
    /// `state`, unless this is the entry sequence the mock was told to fail on.
    fn apply(&mut self, ev: &RedexEvent, state: &mut Vec<(u64, u8)>) -> Result<(), RedexError> {
        let seq = ev.entry.seq;
        if self.fail_at_seq == Some(seq) {
            return Err(RedexError::Decode("forced failure".into()));
        }
        // An empty payload is recorded with a dispatch byte of 0.
        let record = (seq, ev.payload.first().map_or(0, |byte| *byte));
        self.seen.push(record);
        state.push(record);
        Ok(())
    }
}
/// Builds an event whose payload is an encoded [`EventMeta`] header (with the
/// given `origin_hash` and `seq_or_ts`) followed by `tail`.
fn ev_with_meta(seq: u64, origin_hash: u64, seq_or_ts: u64, tail: &[u8]) -> RedexEvent {
    let header = EventMeta::new(0xAB, 0, origin_hash, seq_or_ts, 0).to_bytes();
    let payload: Vec<u8> = [&header[..], tail].concat();
    let entry = RedexEntry::new_heap(seq, 0, payload.len() as u32, 0, 0);
    RedexEvent {
        entry,
        payload: Bytes::from(payload),
    }
}
/// Builds an event whose payload is `len` zero bytes and carries no encoded
/// header.
fn ev_short(seq: u64, len: usize) -> RedexEvent {
    let entry = RedexEntry::new_heap(seq, 0, len as u32, 0, 0);
    let zeros = vec![0u8; len];
    RedexEvent {
        entry,
        payload: Bytes::from(zeros),
    }
}
/// Origin hash the folds under test are constructed with ("our" origin).
const ORIGIN_US: u64 = 0xAAAA_BBBB;
/// A different origin hash, used for events that must not move the watermark.
const ORIGIN_OTHER: u64 = 0xCCCC_DDDD;
/// An event from our own origin with `seq_or_ts = 5` raises the watermark
/// to 6.
#[test]
fn matching_origin_advances_app_seq_via_fetch_max() {
    let watermark = Arc::new(AtomicU64::new(0));
    let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
    let mut folded: Vec<(u64, u8)> = Vec::new();

    let event = ev_with_meta(0, ORIGIN_US, 5, b"");
    fold.apply(&event, &mut folded).unwrap();

    assert_eq!(watermark.load(Ordering::Acquire), 6);
}
/// An event from a foreign origin still reaches the inner fold but leaves
/// the watermark where it was.
#[test]
fn other_origin_does_not_advance_app_seq() {
    let watermark = Arc::new(AtomicU64::new(0));
    let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
    let mut folded: Vec<(u64, u8)> = Vec::new();

    let foreign = ev_with_meta(0, ORIGIN_OTHER, 999, b"");
    fold.apply(&foreign, &mut folded).unwrap();

    let observed = watermark.load(Ordering::Acquire);
    assert_eq!(
        observed, 0,
        "events from another origin must not move our watermark",
    );
    // The inner fold must have recorded exactly this one event.
    assert_eq!(folded.len(), 1);
    assert_eq!(folded[0].0, 0);
}
/// A later event carrying a smaller `seq_or_ts` must not pull the watermark
/// backwards.
#[test]
fn fetch_max_keeps_watermark_monotonic_under_out_of_order_seq_or_ts() {
    let watermark = Arc::new(AtomicU64::new(0));
    let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
    let mut folded: Vec<(u64, u8)> = Vec::new();

    let newer = ev_with_meta(0, ORIGIN_US, 10, b"");
    fold.apply(&newer, &mut folded).unwrap();
    assert_eq!(watermark.load(Ordering::Acquire), 11);

    let older = ev_with_meta(1, ORIGIN_US, 3, b"");
    fold.apply(&older, &mut folded).unwrap();
    let observed = watermark.load(Ordering::Acquire);
    assert_eq!(
        observed, 11,
        "fetch_max must keep the watermark from regressing",
    );
}
/// A payload too short to hold an [`EventMeta`] header must leave the
/// watermark untouched while the inner fold still processes the event.
#[test]
fn short_payload_is_silently_skipped() {
    let app_seq = Arc::new(AtomicU64::new(7));
    let mut wf = WatermarkingFold::new(MockFold::new(), app_seq.clone(), ORIGIN_US);
    let mut state = Vec::new();
    // Derive the length from the header size instead of hard-coding it, so the
    // test keeps exercising the "one byte short" boundary if the header layout
    // ever changes.
    let short_len = EVENT_META_SIZE - 1;
    wf.apply(&ev_short(0, short_len), &mut state).unwrap();
    assert_eq!(
        app_seq.load(Ordering::Acquire),
        7,
        "watermark must be untouched when payload is too short to parse",
    );
    assert_eq!(state.len(), 1, "inner fold must still see the event");
}
/// When the inner fold rejects an event, the error reaches the caller and
/// the watermark stays put even though the event's origin matches.
#[test]
fn inner_fold_error_propagates_and_does_not_advance_watermark() {
    let watermark = Arc::new(AtomicU64::new(0));
    let mut fold = WatermarkingFold::new(MockFold::fail_on(0), Arc::clone(&watermark), ORIGIN_US);
    let mut folded: Vec<(u64, u8)> = Vec::new();

    let rejected = ev_with_meta(0, ORIGIN_US, 42, b"");
    let outcome = fold.apply(&rejected, &mut folded);

    assert!(matches!(outcome, Err(RedexError::Decode(_))));
    let observed = watermark.load(Ordering::Acquire);
    assert_eq!(
        observed, 0,
        "watermark must NOT advance for an event the inner fold rejected",
    );
}
/// A watermark that already sits above `seq_or_ts + 1` is left unchanged.
#[test]
fn watermark_holds_when_pre_set_value_already_exceeds_observed_seq_or_ts() {
    let watermark = Arc::new(AtomicU64::new(100));
    let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
    let mut folded: Vec<(u64, u8)> = Vec::new();

    let stale = ev_with_meta(0, ORIGIN_US, 5, b"");
    fold.apply(&stale, &mut folded).unwrap();

    assert_eq!(watermark.load(Ordering::Acquire), 100);
}
/// In an interleaved stream, only events from our own origin contribute to
/// the watermark, while the inner fold still sees every event.
#[test]
fn mixed_origin_stream_only_advances_for_matching_origin() {
    let watermark = Arc::new(AtomicU64::new(0));
    let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
    let mut folded: Vec<(u64, u8)> = Vec::new();

    // (entry seq, origin hash, seq_or_ts)
    let stream: [(u64, u64, u64); 6] = [
        (0, ORIGIN_OTHER, 100),
        (1, ORIGIN_US, 0),
        (2, ORIGIN_OTHER, 200),
        (3, ORIGIN_US, 1),
        (4, ORIGIN_OTHER, 300),
        (5, ORIGIN_US, 2),
    ];
    for &(entry_seq, origin, stamp) in &stream {
        let event = ev_with_meta(entry_seq, origin, stamp, b"");
        fold.apply(&event, &mut folded).unwrap();
    }

    let observed = watermark.load(Ordering::Acquire);
    assert_eq!(
        observed, 3,
        "watermark must reflect only our origin's max+1 (saw seq_or_ts 0,1,2)",
    );
    assert_eq!(folded.len(), 6);
}
/// `seq_or_ts == u64::MAX` is ignored for watermark purposes, later
/// legitimate values still advance it, and `u64::MAX - 1` is accepted.
#[test]
fn watermark_ignores_seq_or_ts_at_u64_max_to_preserve_monotonicity() {
    // Scenario 1: a u64::MAX event is skipped, then a normal event advances.
    {
        let watermark = Arc::new(AtomicU64::new(42));
        let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
        let mut folded: Vec<(u64, u8)> = Vec::new();

        let saturated = ev_with_meta(0, ORIGIN_US, u64::MAX, b"");
        fold.apply(&saturated, &mut folded).unwrap();
        let after_max = watermark.load(Ordering::Acquire);
        assert_eq!(
            after_max, 42,
            "watermark must NOT advance on a u64::MAX seq_or_ts \
             — a subsequent fetch_add(1) on u64::MAX panics in debug \
             or wraps to 0 in release, breaking per-origin monotonicity"
        );
        assert_eq!(folded.len(), 1, "inner fold must still see the event");

        let regular = ev_with_meta(1, ORIGIN_US, 100, b"");
        fold.apply(&regular, &mut folded).unwrap();
        let after_regular = watermark.load(Ordering::Acquire);
        assert_eq!(
            after_regular, 101,
            "subsequent legitimate seq_or_ts must still advance the watermark"
        );
    }

    // Scenario 2: the largest value that can still be incremented is honoured.
    {
        let watermark = Arc::new(AtomicU64::new(0));
        let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
        let mut folded: Vec<(u64, u8)> = Vec::new();

        let almost_max = ev_with_meta(0, ORIGIN_US, u64::MAX - 1, b"");
        fold.apply(&almost_max, &mut folded).unwrap();
        let observed = watermark.load(Ordering::Acquire);
        assert_eq!(
            observed,
            u64::MAX,
            "seq_or_ts = u64::MAX - 1 is legitimate (saturating_add(1) = u64::MAX)"
        );
    }
}
}