net-mesh 0.27.6

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
//! `WatermarkingFold<S, F>` — wraps a user fold and piggybacks
//! `app_seq` discovery onto its event-traversal pass.
//!
//! Background. Both `TasksAdapter` and `MemoriesAdapter` keep a
//! per-origin monotonic counter (`app_seq`) that gets stamped on every
//! `EventMeta::seq_or_ts`. After `open_from_snapshot` the counter must
//! satisfy `app_seq > max(seq_or_ts of any in-log event for our
//! origin)` before the first `ingest_typed`, otherwise the next ingest
//! can stamp a duplicate `seq_or_ts` (data corruption — two distinct
//! events with the same per-origin sequence number).
//!
//! The typed adapters install this wrapper around the user fold so
//! discovery piggybacks on the fold task's traversal. The fold task
//! reads each event exactly once; on every successful inner-fold
//! `apply` — and also on a recoverable `Decode` error, because the
//! wire slot is claimed regardless of whether the body decodes (#6)
//! — we parse the leading [`EventMeta`] and, if the event matches
//! our `origin_hash`, advance the shared `Arc<AtomicU64>` via
//! `fetch_max(meta.seq_or_ts + 1)`. The typed constructors then
//! `wait_for_seq(replay_end - 1).await` before returning so callers
//! see a fully-ready adapter — `app_seq` is correct synchronously
//! from the caller's perspective even though it was assembled
//! asynchronously by the fold task.
//!
//! A naïve alternative — a separate synchronous `read_range` walk
//! that re-materializes every event after the inner fold task has
//! already done so — costs N redundant payload reads + N redundant
//! checksum verifications + 2N `Bytes` copies on an N-event log, and
//! is deliberately avoided here.

use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use super::super::redex::{RedexError, RedexEvent, RedexFold};
use super::meta::{EventMeta, EVENT_META_SIZE};

/// Wraps `inner: F` and updates a shared `Arc<AtomicU64>` watermark
/// from each event's [`EventMeta`] header. Only events whose
/// `origin_hash` matches `self.origin_hash` advance the counter;
/// events from other origins are folded but ignored for the
/// watermark.
///
/// `S` is the user state type passed through to the inner fold.
pub(super) struct WatermarkingFold<S, F> {
    /// The wrapped fold. `apply` always invokes it first and returns
    /// its result unchanged.
    inner: F,
    /// Shared watermark. Only ever raised (via `fetch_max`) to
    /// `seq_or_ts + 1` of a matching-origin event; never lowered here.
    app_seq: Arc<AtomicU64>,
    /// Origin an event's `EventMeta::origin_hash` must equal for that
    /// event to be allowed to move `app_seq`.
    origin_hash: u64,
    /// Zero-sized marker that ties the otherwise-unused `S` parameter
    /// to the type; no `S` value is stored in the wrapper.
    _state: PhantomData<fn(&mut S)>,
}

impl<S, F> WatermarkingFold<S, F> {
    pub(super) fn new(inner: F, app_seq: Arc<AtomicU64>, origin_hash: u64) -> Self {
        Self {
            inner,
            app_seq,
            origin_hash,
            _state: PhantomData,
        }
    }
}

impl<S, F> RedexFold<S> for WatermarkingFold<S, F>
where
    F: RedexFold<S>,
{
    /// Runs the inner fold on `ev`, then — when appropriate — raises the
    /// shared `app_seq` watermark from the event's [`EventMeta`] header.
    ///
    /// The inner fold's result is always returned unchanged; this
    /// wrapper never turns a success into a failure or vice versa.
    ///
    /// The watermark is raised to `seq_or_ts + 1` only when all of the
    /// following hold:
    ///
    /// * the inner fold succeeded, or failed with an error for which
    ///   `is_recoverable_decode()` is true;
    /// * the payload is at least `EVENT_META_SIZE` bytes long and its
    ///   prefix parses as an [`EventMeta`];
    /// * the header's `origin_hash` equals `self.origin_hash`;
    /// * the header's `seq_or_ts` is not `u64::MAX`.
    fn apply(&mut self, ev: &RedexEvent, state: &mut S) -> Result<(), RedexError> {
        // User-visible state changes belong to the inner fold; everything
        // after this call is watermark bookkeeping.
        let outcome = self.inner.apply(ev, state);

        // Any error other than a recoverable decode failure is surfaced
        // as-is with the watermark untouched. (Per the design notes for
        // this module, such errors stop the fold rather than skip one
        // event, so there is no skipped slot to account for.)
        //
        // A recoverable decode failure is different (#6): the header is
        // parsed independently of the body, and the event's `seq_or_ts`
        // was already stamped when it was written. If the watermark did
        // not move past it, a later ingest could stamp the same
        // `seq_or_ts` again — two events sharing one per-origin
        // sequence number, which is exactly what this counter prevents.
        if matches!(&outcome, Err(err) if !err.is_recoverable_decode()) {
            return outcome;
        }

        let ours = self.origin_hash;
        let claimed_slot = if ev.payload.len() < EVENT_META_SIZE {
            // Too short to carry a header. Events written through the
            // typed ingest path always carry one, so this can only be
            // foreign data appended to the same channel; skip it rather
            // than feed the watermark from a bogus parse.
            None
        } else {
            EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
                // Other origins are folded but never move our counter.
                .filter(|meta| meta.origin_hash == ours)
                .map(|meta| meta.seq_or_ts)
                // `u64::MAX` would require 2^64 ingests under one origin
                // to occur legitimately, so treat it as hostile or
                // malformed and ignore it: `seq_or_ts + 1` is not
                // representable, and pinning the counter at the ceiling
                // would make the next ingest's increment overflow and
                // break per-origin monotonicity. Note this only filters
                // the exact value `u64::MAX`; `u64::MAX - 1` still
                // raises the watermark to `u64::MAX`.
                .filter(|&slot| slot != u64::MAX)
        };

        if let Some(slot) = claimed_slot {
            // Events arrive in RedEX-seq order, which is not `seq_or_ts`
            // order (writers sharing a channel can interleave), so the
            // update must be a max, not a store. `saturating_add` cannot
            // actually saturate here because `u64::MAX` was filtered out.
            self.app_seq.fetch_max(slot.saturating_add(1), Ordering::AcqRel);
        }
        outcome
    }
}

#[cfg(test)]
mod tests {
    //! Unit-level coverage for the `WatermarkingFold` wrapper. The
    //! integration tests in `tests/integration_cortex_{tasks,memories}.rs`
    //! exercise the wrapper end-to-end through the typed adapters; these
    //! tests pin its individual behaviors without spinning up a Redex
    //! file or fold task.
    //!
    //! We construct synthetic `RedexEvent`s directly. The entry's
    //! `seq` / `flags_and_checksum` fields don't matter for the
    //! wrapper — it only inspects `ev.payload[..EVENT_META_SIZE]`.
    use super::*;

    use bytes::Bytes;

    use super::super::super::redex::{RedexEntry, RedexEvent};

    /// Inner fold that just records every (seq, dispatch) pair the
    /// wrapper hands it, and optionally fails on a specific seq to
    /// exercise the error-propagation path. `fail_recoverable`
    /// selects whether the forced failure is a recoverable
    /// `Decode` (skip-and-continue) or a non-recoverable error.
    struct MockFold {
        // One `(entry.seq, first payload byte)` pair per event that was
        // applied successfully; a missing first byte is recorded as 0.
        seen: Vec<(u64, u8)>,
        // `entry.seq` at which `apply` returns an error instead of
        // recording the event. `None` means the fold never fails.
        fail_at_seq: Option<u64>,
        // `true`: the forced failure is `RedexError::Decode`;
        // `false`: it is `RedexError::Io`.
        fail_recoverable: bool,
    }

    impl MockFold {
        fn new() -> Self {
            Self {
                seen: Vec::new(),
                fail_at_seq: None,
                fail_recoverable: true,
            }
        }
        /// Fail on `seq` with a recoverable `Decode` error (the
        /// per-event skip-and-continue path).
        fn fail_on(seq: u64) -> Self {
            Self {
                seen: Vec::new(),
                fail_at_seq: Some(seq),
                fail_recoverable: true,
            }
        }
        /// Fail on `seq` with a NON-recoverable error (a
        /// stream-level failure that must not advance the
        /// watermark).
        fn fail_on_nonrecoverable(seq: u64) -> Self {
            Self {
                seen: Vec::new(),
                fail_at_seq: Some(seq),
                fail_recoverable: false,
            }
        }
    }

    impl RedexFold<Vec<(u64, u8)>> for MockFold {
        /// Records `(entry.seq, first payload byte)` both on the mock
        /// and in `state`, unless this is the event configured to fail,
        /// in which case nothing is recorded and the configured error
        /// is returned.
        fn apply(&mut self, ev: &RedexEvent, state: &mut Vec<(u64, u8)>) -> Result<(), RedexError> {
            let seq = ev.entry.seq;
            if self.fail_at_seq == Some(seq) {
                return Err(if self.fail_recoverable {
                    RedexError::Decode("forced failure".into())
                } else {
                    RedexError::Io("forced non-recoverable failure".into())
                });
            }

            // An empty payload has no first byte; record it as 0.
            let record = (seq, ev.payload.first().copied().unwrap_or(0));
            self.seen.push(record);
            state.push(record);
            Ok(())
        }
    }

    /// Synthesizes a `RedexEvent` with entry seq `seq` whose payload is
    /// an encoded `EventMeta` carrying `origin_hash` and `seq_or_ts`,
    /// immediately followed by the bytes of `tail`.
    fn ev_with_meta(seq: u64, origin_hash: u64, seq_or_ts: u64, tail: &[u8]) -> RedexEvent {
        let header = EventMeta::new(0xAB, 0, origin_hash, seq_or_ts, 0);

        let mut bytes = Vec::with_capacity(EVENT_META_SIZE + tail.len());
        bytes.extend_from_slice(&header.to_bytes());
        bytes.extend_from_slice(tail);

        // The entry records the payload length, so build it before the
        // buffer is handed over to `Bytes`.
        let entry = RedexEntry::new_heap(seq, 0, bytes.len() as u32, 0, 0);
        RedexEvent {
            entry,
            payload: Bytes::from(bytes),
        }
    }

    /// Synthesizes a `RedexEvent` with entry seq `seq` and a payload of
    /// `len` zero bytes. Intended for lengths below `EVENT_META_SIZE`,
    /// to exercise the wrapper's defensive length guard.
    fn ev_short(seq: u64, len: usize) -> RedexEvent {
        let entry = RedexEntry::new_heap(seq, 0, len as u32, 0, 0);
        let zeros = vec![0u8; len];
        RedexEvent {
            entry,
            payload: Bytes::from(zeros),
        }
    }

    // Origin hash the wrapper under test is constructed with: events
    // stamped with it are the ones allowed to move the watermark.
    const ORIGIN_US: u64 = 0xAAAA_BBBB;
    // A different origin hash, for events that must be delivered to the
    // inner fold but must leave the watermark alone.
    const ORIGIN_OTHER: u64 = 0xCCCC_DDDD;

    #[test]
    fn matching_origin_advances_app_seq_via_fetch_max() {
        // One same-origin event stamped `seq_or_ts == 5` must leave the
        // shared counter at 5 + 1.
        let watermark = Arc::new(AtomicU64::new(0));
        let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
        let mut state: Vec<(u64, u8)> = Vec::new();

        let event = ev_with_meta(0, ORIGIN_US, 5, b"");
        fold.apply(&event, &mut state).unwrap();

        assert_eq!(watermark.load(Ordering::Acquire), 6);
    }

    #[test]
    fn other_origin_does_not_advance_app_seq() {
        let watermark = Arc::new(AtomicU64::new(0));
        let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
        let mut state: Vec<(u64, u8)> = Vec::new();

        let foreign = ev_with_meta(0, ORIGIN_OTHER, 999, b"");
        fold.apply(&foreign, &mut state).unwrap();

        assert_eq!(
            watermark.load(Ordering::Acquire),
            0,
            "events from another origin must not move our watermark",
        );
        // Only the watermark update is filtered by origin — delivery
        // is not, so the inner fold must have recorded the event.
        assert_eq!(state.len(), 1);
        assert_eq!(state[0].0, 0);
    }

    #[test]
    fn fetch_max_keeps_watermark_monotonic_under_out_of_order_seq_or_ts() {
        // Writers sharing a channel can interleave their per-origin
        // counters, so same-origin events may legitimately show up
        // with non-monotonic `seq_or_ts`. The watermark has to follow
        // the MAX seen so far, not the most recent value.
        let watermark = Arc::new(AtomicU64::new(0));
        let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
        let mut state: Vec<(u64, u8)> = Vec::new();

        let high = ev_with_meta(0, ORIGIN_US, 10, b"");
        fold.apply(&high, &mut state).unwrap();
        assert_eq!(watermark.load(Ordering::Acquire), 11);

        // Later in RedEX order, but carrying a SMALLER `seq_or_ts`:
        // this must not drag the watermark back down.
        let low = ev_with_meta(1, ORIGIN_US, 3, b"");
        fold.apply(&low, &mut state).unwrap();
        assert_eq!(
            watermark.load(Ordering::Acquire),
            11,
            "fetch_max must keep the watermark from regressing",
        );
    }

    #[test]
    fn short_payload_is_silently_skipped() {
        // A third-party writer that appended raw bytes (no `EventMeta`
        // prefix) would produce a payload < EVENT_META_SIZE. The
        // wrapper must defensively skip rather than corrupt the
        // watermark with a bogus parse.
        let app_seq = Arc::new(AtomicU64::new(7));
        let mut wf = WatermarkingFold::new(MockFold::new(), app_seq.clone(), ORIGIN_US);
        let mut state = Vec::new();

        // The longest payload that still cannot hold a header. Derived
        // from the constant rather than hard-coded so the test keeps
        // probing the exact boundary of the length guard even if the
        // header size changes.
        let too_short = EVENT_META_SIZE - 1;
        wf.apply(&ev_short(0, too_short), &mut state).unwrap();
        assert_eq!(
            app_seq.load(Ordering::Acquire),
            7,
            "watermark must be untouched when payload is too short to parse",
        );
        // The guard only skips the watermark update; the inner fold
        // must still have been handed the event.
        assert_eq!(state.len(), 1, "inner fold must still see the event");
    }

    #[test]
    fn recoverable_decode_error_still_advances_watermark_for_matching_origin() {
        // #6 fix. (This test used to pin the old, buggy behavior of
        // NOT advancing on a recoverable Decode.)
        //
        // When the inner fold reports a recoverable `Decode` error for
        // a SAME-ORIGIN event, the watermark still moves: the event's
        // `seq_or_ts` was stamped when it was written, and the
        // `EventMeta` header parses independently of the body. Leaving
        // `app_seq` behind would let a later `ingest_typed` stamp the
        // same `seq_or_ts` again — a duplicate per-origin sequence,
        // the very corruption this counter guards against. The error
        // itself must still reach the caller so the fold-error policy
        // can act on it.
        let watermark = Arc::new(AtomicU64::new(0));
        let mut fold = WatermarkingFold::new(MockFold::fail_on(0), Arc::clone(&watermark), ORIGIN_US);
        let mut state: Vec<(u64, u8)> = Vec::new();

        let undecodable = ev_with_meta(0, ORIGIN_US, 42, b"");
        let outcome = fold.apply(&undecodable, &mut state);

        assert!(
            matches!(outcome, Err(RedexError::Decode(_))),
            "the recoverable Decode error must still propagate to the caller",
        );
        assert_eq!(
            watermark.load(Ordering::Acquire),
            43,
            "watermark MUST advance past a claimed-but-dead same-origin slot \
             (seq_or_ts 42 -> app_seq 43) so it can't be re-stamped later",
        );
    }

    #[test]
    fn recoverable_decode_error_does_not_advance_for_other_origin() {
        // Advancing on a recoverable Decode applies to the MATCHING
        // origin only. A decode failure on some other origin's event
        // takes no slot in OUR per-origin counter, so — exactly as on
        // the success path — our watermark must not move.
        let watermark = Arc::new(AtomicU64::new(0));
        let mut fold = WatermarkingFold::new(MockFold::fail_on(0), Arc::clone(&watermark), ORIGIN_US);
        let mut state: Vec<(u64, u8)> = Vec::new();

        let foreign = ev_with_meta(0, ORIGIN_OTHER, 999, b"");
        let outcome = fold.apply(&foreign, &mut state);

        assert!(matches!(outcome, Err(RedexError::Decode(_))));
        assert_eq!(
            watermark.load(Ordering::Acquire),
            0,
            "another origin's claimed slot must not move our watermark",
        );
    }

    #[test]
    fn non_recoverable_inner_error_does_not_advance_watermark() {
        // A NON-recoverable error (stream-level / configuration) stops
        // the fold task instead of skipping one event, so no
        // claimed-but-dead slot is left behind to account for. The
        // watermark must not move and the error must come back
        // verbatim — #6 changed only the recoverable-Decode path.
        let watermark = Arc::new(AtomicU64::new(0));
        let inner = MockFold::fail_on_nonrecoverable(0);
        let mut fold = WatermarkingFold::new(inner, Arc::clone(&watermark), ORIGIN_US);
        let mut state: Vec<(u64, u8)> = Vec::new();

        let event = ev_with_meta(0, ORIGIN_US, 42, b"");
        let outcome = fold.apply(&event, &mut state);

        assert!(
            matches!(outcome, Err(RedexError::Io(_))),
            "non-recoverable error must propagate verbatim",
        );
        assert_eq!(
            watermark.load(Ordering::Acquire),
            0,
            "watermark must NOT advance on a non-recoverable inner error",
        );
    }

    #[test]
    fn watermark_holds_when_pre_set_value_already_exceeds_observed_seq_or_ts() {
        // `open_from_snapshot` seeds `app_seq` from the snapshot
        // payload. When that seed already covers every same-origin
        // event in the replay tail, the wrapper's `fetch_max` has
        // nothing to do. Pinning this keeps the counter from being
        // regressed by a replay tail that holds nothing new for our
        // origin.
        let watermark = Arc::new(AtomicU64::new(100));
        let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
        let mut state: Vec<(u64, u8)> = Vec::new();

        let already_covered = ev_with_meta(0, ORIGIN_US, 5, b"");
        fold.apply(&already_covered, &mut state).unwrap();

        assert_eq!(watermark.load(Ordering::Acquire), 100);
    }

    #[test]
    fn mixed_origin_stream_only_advances_for_matching_origin() {
        // Realistic shape: one channel shared between us and another
        // origin, with the two origins' events interleaved in the log.
        // Each row is (RedEX seq, origin, seq_or_ts).
        let stream = [
            (0, ORIGIN_OTHER, 100),
            (1, ORIGIN_US, 0),
            (2, ORIGIN_OTHER, 200),
            (3, ORIGIN_US, 1),
            (4, ORIGIN_OTHER, 300),
            (5, ORIGIN_US, 2),
        ];

        let watermark = Arc::new(AtomicU64::new(0));
        let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
        let mut state: Vec<(u64, u8)> = Vec::new();

        for &(seq, origin, seq_or_ts) in stream.iter() {
            let event = ev_with_meta(seq, origin, seq_or_ts, b"");
            fold.apply(&event, &mut state).unwrap();
        }

        assert_eq!(
            watermark.load(Ordering::Acquire),
            3,
            "watermark must reflect only our origin's max+1 (saw seq_or_ts 0,1,2)",
        );
        // Delivery is not filtered: the inner fold saw all six events.
        assert_eq!(state.len(), 6);
    }

    #[test]
    fn watermark_ignores_seq_or_ts_at_u64_max_to_preserve_monotonicity() {
        // Before the fix, `saturating_add(1)` left `app_seq` pinned at
        // `u64::MAX` as soon as a peer wrote an event with
        // `seq_or_ts == u64::MAX`; the next legitimate ingest's
        // increment then overflowed and broke per-origin
        // monotonicity. One bad event from a hostile or malformed
        // peer was enough to poison the adapter.
        //
        // After the fix, `seq_or_ts == u64::MAX` is treated as
        // malformed and ignored for the watermark. The inner fold
        // still receives the event (delivery is not filtered) and the
        // watermark simply stays where it was.
        let watermark = Arc::new(AtomicU64::new(42));
        let mut fold = WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
        let mut state: Vec<(u64, u8)> = Vec::new();

        // Poisoning attempt: delivered, but must not move the counter.
        let poisoned = ev_with_meta(0, ORIGIN_US, u64::MAX, b"");
        fold.apply(&poisoned, &mut state).unwrap();
        assert_eq!(
            watermark.load(Ordering::Acquire),
            42,
            "watermark must NOT advance on a u64::MAX seq_or_ts \
             — a subsequent fetch_add(1) on u64::MAX panics in debug \
             or wraps to 0 in release, breaking per-origin monotonicity"
        );
        assert_eq!(state.len(), 1, "inner fold must still see the event");

        // The same wrapper keeps working normally afterwards.
        let legitimate = ev_with_meta(1, ORIGIN_US, 100, b"");
        fold.apply(&legitimate, &mut state).unwrap();
        assert_eq!(
            watermark.load(Ordering::Acquire),
            101,
            "subsequent legitimate seq_or_ts must still advance the watermark"
        );

        // Boundary: `u64::MAX - 1` is accepted (legitimate, if
        // astronomical) and raises the watermark to `u64::MAX`, the
        // highest value it can hold. That only matters if a further
        // ingest is then allowed; the invariant pinned here is just
        // that a hostile `u64::MAX` input cannot accelerate
        // exhaustion.
        {
            let watermark = Arc::new(AtomicU64::new(0));
            let mut fold =
                WatermarkingFold::new(MockFold::new(), Arc::clone(&watermark), ORIGIN_US);
            let mut state: Vec<(u64, u8)> = Vec::new();

            let near_ceiling = ev_with_meta(0, ORIGIN_US, u64::MAX - 1, b"");
            fold.apply(&near_ceiling, &mut state).unwrap();
            assert_eq!(
                watermark.load(Ordering::Acquire),
                u64::MAX,
                "seq_or_ts = u64::MAX - 1 is legitimate (saturating_add(1) = u64::MAX)"
            );
        }
    }
}