net-mesh 0.23.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
//! `RedexIndex<K, V>` — generic tail-driven secondary index.
//!
//! A [`RedexIndex`] spawns a background task that tails a [`RedexFile`],
//! decodes each event as a user-supplied type `T`, and applies the
//! caller's projection to emit [`IndexOp`]s. The projection owns the
//! decision of what to index and how; the index owns the bookkeeping.
//!
//! ## When to use it
//!
//! - You have a RedEX file acting as the source of truth for some
//!   domain.
//! - You want a cheap `(K) → Vec<V>` lookup that reflects the file's
//!   current state, without rolling the bookkeeping yourself.
//! - You can tolerate eventual consistency bounded by "one fold tick"
//!   — the index lags the file by at most one `apply()` call.
//!
//! ## What you give up
//!
//! - **Durability.** The index is **in-memory only**. On restart it
//!   rebuilds from the tail of the RedexFile. If you want durable
//!   indices, persist the snapshot separately and rebuild from seq N.
//! - **Cross-key atomicity.** Each `IndexOp` lands under its own
//!   `DashMap` shard lock; two reads on different keys can observe
//!   the mid-way state.
//! - **Strict read-your-writes.** The index is updated on a background
//!   task, so a thread that appends to the file and immediately reads
//!   from the index may miss its own write by one scheduler hop.
//!
//! ## Wire to snapshot / restore
//!
//! CortEX adapters that own a [`RedexIndex`] rebuild it naturally on
//! restore: the snapshot captures the domain state up to seq N; on
//! `open_from_snapshot(..., last_seq)`, the adapter spawns a new
//! tail at `FromSeq(last_seq + 1)` and the index rebuilds from there.
//! The index never rides in the snapshot itself.

use std::collections::HashSet;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;
use futures::{Stream, StreamExt};
use serde::de::DeserializeOwned;
use tokio::sync::Notify;

use super::error::RedexError;
use super::event::RedexEvent;
use super::file::RedexFile;

/// One mutation a [`RedexIndex`] projection can emit per event.
#[derive(Debug, Clone)]
pub enum IndexOp<K, V> {
    /// Add `value` to the bucket at `key`. No-op if already present.
    Insert(K, V),
    /// Remove `value` from the bucket at `key`. No-op if absent.
    /// When the bucket empties, the key is dropped from the index
    /// entirely so `keys()` and `len()` reflect current contents.
    Remove(K, V),
}

/// Where to start tailing when an index is opened.
///
/// Mirrors the shape CortEX uses (`FromBeginning` / `FromSeq`) —
/// `LiveOnly` is intentionally omitted here because a live-only
/// index is almost never what you want: the bucket would only
/// reflect events received after the index was built, with no
/// history.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexStart {
    /// Tail from seq 0; rebuilds the index over the entire file.
    FromBeginning,
    /// Tail from `seq`; skips everything below it. Use after
    /// `open_from_snapshot` to resume with only post-snapshot
    /// events.
    FromSeq(u64),
}

/// Eventually-consistent secondary index driven from a RedEX file tail.
///
/// Lock-free on the read side ([`DashMap`]); a single background task
/// owns the write side. Reads can observe the index at most one fold
/// tick behind the file.
pub struct RedexIndex<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + 'static,
    V: Hash + Eq + Clone + Send + Sync + 'static,
{
    inner: Arc<DashMap<K, HashSet<V>>>,
    /// Fired on Drop to cancel the tail task. The task races this
    /// notify against the tail stream; whichever wins wakes first
    /// and the task exits.
    shutdown: Arc<Notify>,
    /// Counter incremented every time the tail task clears the
    /// in-memory index because of a `Lagged` event or a
    /// saturation-induced watcher drop. Downstreams that need to
    /// react to a lossy reset (e.g. for an aggregating index that
    /// can't tolerate dropped counts, vs. a state-rebuild index
    /// that re-converges on re-tail) can poll
    /// [`RedexIndex::lag_resets`] and detect a bump rather than
    /// having to diff `len()` over time.
    lag_resets: Arc<AtomicU64>,
}

impl<K, V> RedexIndex<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + 'static,
    V: Hash + Eq + Clone + Send + Sync + 'static,
{
    /// Open an index over `file`, decoding each event payload as `T`
    /// (via postcard) and applying the caller's projection to emit
    /// zero or more [`IndexOp`]s per event.
    ///
    /// Spawns a tokio task on the current runtime. Dropping all
    /// clones of the returned handle cancels the task via `Notify`.
    /// Payload-decode errors are logged (`tracing::warn!`) and the
    /// offending event is skipped — a corrupt or wrong-type entry
    /// shouldn't poison the whole index.
    pub fn open<T, F>(file: &RedexFile, start: IndexStart, project: F) -> Self
    where
        T: DeserializeOwned + Send + 'static,
        F: Fn(&T) -> Vec<IndexOp<K, V>> + Send + Sync + 'static,
    {
        let inner: Arc<DashMap<K, HashSet<V>>> = Arc::new(DashMap::new());
        let shutdown = Arc::new(Notify::new());
        let lag_resets = Arc::new(AtomicU64::new(0));

        let from_seq = match start {
            IndexStart::FromBeginning => 0,
            IndexStart::FromSeq(n) => n,
        };
        let task_inner = inner.clone();
        let task_shutdown = shutdown.clone();
        let task_file = file.clone();
        let task_lag_resets = lag_resets.clone();

        tokio::spawn(async move {
            // Use Box::pin (not tokio::pin!) so we can swap in a
            // fresh tail stream after a Lagged recovery without
            // restructuring the task.
            let mut tail: Pin<Box<dyn Stream<Item = Result<RedexEvent, RedexError>> + Send>> =
                Box::pin(task_file.tail(from_seq));

            // Saturation-resume backoff: under sustained burst load
            // with an under-sized `tail_buffer_size`, the recovery
            // path can re-fire instantly (`tail.next() → None →
            // clear → re-tail → None → …`). Without backoff this
            // hot-loops and emits a `tracing::warn!` per cycle —
            // production deployments with under-sized buffers see
            // log floods. We track consecutive resets within a
            // short window and sleep proportionally; one
            // make-progress event resets the counter. The cap
            // (250ms) keeps the worst-case recovery latency
            // bounded.
            let mut consecutive_resets: u32 = 0;

            loop {
                tokio::select! {
                    // Drop-on-cancel: the Notify wins and the task
                    // exits, dropping the tail stream.
                    _ = task_shutdown.notified() => return,
                    next = tail.next() => {
                        match next {
                            Some(Ok(event)) => {
                                apply_event(&task_inner, &project, &event);
                                // Forward progress — clear the
                                // backoff counter so the next
                                // genuine reset starts fresh.
                                consecutive_resets = 0;
                            }
                            Some(Err(RedexError::Lagged)) => {
                                // Pre-fix, the loop just
                                // logged at debug! and continued,
                                // but the underlying watcher had
                                // been dropped from the file —
                                // `tail.next()` returned `None` on
                                // the next poll and the task
                                // exited permanently with an
                                // incomplete index. Keys missing
                                // post-`Lagged` stayed missing
                                // forever.
                                //
                                // Recovery: clear the in-memory
                                // index (any prior state is now
                                // ambiguous — we don't know which
                                // ops we missed) and re-tail from
                                // `next_seq()`, i.e. live-only.
                                // We do NOT replay retained
                                // history because the buffer that
                                // produced the original `Lagged`
                                // is the same buffer we'd have to
                                // squeeze the retained range
                                // through, which would just
                                // signal `Lagged` again. The
                                // index user is responsible for
                                // sizing the tail buffer; this
                                // path is the safe-recovery
                                // fallback when that sizing is
                                // wrong.
                                let resume_seq = task_file.next_seq();
                                task_inner.clear();
                                let total_resets =
                                    task_lag_resets.fetch_add(1, Ordering::Relaxed) + 1;
                                tail = Box::pin(task_file.tail(resume_seq));
                                consecutive_resets = consecutive_resets.saturating_add(1);
                                rate_limited_lag_warn(
                                    "RedexIndex: tail lagged; cleared index, resumed live-only",
                                    resume_seq,
                                    total_resets,
                                    consecutive_resets,
                                );
                                if let Some(d) = backoff_for(consecutive_resets) {
                                    tokio::time::sleep(d).await;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::debug!(error = %e, "RedexIndex tail error; continuing");
                            }
                            None => {
                                // Stream ended.
                                // Two distinct causes share this
                                // branch:
                                //   (a) the file was closed —
                                //       the task should exit.
                                //   (b) `notify_watchers` dropped
                                //       our watcher under buffer
                                //       saturation and the
                                //       best-effort `Err(Lagged)`
                                //       send itself failed (the
                                //       channel was already full),
                                //       so the receiver just sees
                                //       a clean end. Pre-fix the
                                //       task always treated this
                                //       as (a) and exited — same
                                //       failure mode as the
                                //       explicit Lagged case.
                                // Distinguish via `is_closed()`.
                                if task_file.is_closed() {
                                    return;
                                }
                                let resume_seq = task_file.next_seq();
                                task_inner.clear();
                                let total_resets =
                                    task_lag_resets.fetch_add(1, Ordering::Relaxed) + 1;
                                tail = Box::pin(task_file.tail(resume_seq));
                                consecutive_resets = consecutive_resets.saturating_add(1);
                                rate_limited_lag_warn(
                                    "RedexIndex: tail stream ended on a still-open file \
                                     (saturation-induced watcher drop); cleared index, \
                                     resumed live-only",
                                    resume_seq,
                                    total_resets,
                                    consecutive_resets,
                                );
                                if let Some(d) = backoff_for(consecutive_resets) {
                                    tokio::time::sleep(d).await;
                                }
                            }
                        }
                    }
                }
            }
        });

        Self {
            inner,
            shutdown,
            lag_resets,
        }
    }

    /// Total number of times the index has been cleared and
    /// re-tailed because of a `Lagged` event or a saturation-
    /// induced watcher drop, since this index was opened.
    ///
    /// Each increment corresponds to a *lossy* reset — the
    /// previous in-memory contents were discarded and re-tailing
    /// resumed live-only (no historical replay). For a
    /// state-rebuild index over a fully-retained file, this is
    /// recoverable — a subsequent re-open with `IndexStart::
    /// FromBeginning` re-converges. For an aggregating index
    /// (counters, top-N) the dropped events are lost; a
    /// downstream that polls this counter can detect the bump
    /// and trigger an external recompute.
    pub fn lag_resets(&self) -> u64 {
        self.lag_resets.load(Ordering::Relaxed)
    }

    /// Snapshot the values at `key`. Returns `None` when the key
    /// doesn't exist. The returned set is a **copy** so the caller
    /// can hold it freely without blocking the tail task.
    pub fn get(&self, key: &K) -> Option<HashSet<V>> {
        self.inner.get(key).map(|e| e.value().clone())
    }

    /// Whether `value` is present in the bucket at `key`.
    pub fn contains(&self, key: &K, value: &V) -> bool {
        self.inner
            .get(key)
            .is_some_and(|e| e.value().contains(value))
    }

    /// Snapshot of all keys currently indexed. Allocates — don't call
    /// in a hot loop. Returned order is unspecified.
    pub fn keys(&self) -> Vec<K> {
        self.inner.iter().map(|e| e.key().clone()).collect()
    }

    /// Number of distinct keys currently indexed.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// True if no keys are currently indexed.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}

impl<K, V> Drop for RedexIndex<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + 'static,
    V: Hash + Eq + Clone + Send + Sync + 'static,
{
    fn drop(&mut self) {
        // `notify_one` stores a permit if no waiter is currently
        // registered, so a Drop that fires while the tail task is
        // between `tokio::select!` polls still lands — the task
        // consumes the permit on its next `notified()` and exits.
        // `notify_waiters` would be dropped in that window and leak
        // the task.
        self.shutdown.notify_one();
    }
}

impl<K, V> std::fmt::Debug for RedexIndex<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + 'static,
    V: Hash + Eq + Clone + Send + Sync + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RedexIndex")
            .field("keys", &self.inner.len())
            .finish()
    }
}

/// Backoff schedule for consecutive saturation-induced resets.
/// First reset returns `None` (no sleep — recover immediately).
/// Subsequent resets sleep for an exponentially-growing window
/// capped at 250ms. The cap keeps worst-case recovery latency
/// bounded; the growth keeps a sustained burst from emitting
/// thousands of resets per second.
fn backoff_for(consecutive_resets: u32) -> Option<Duration> {
    match consecutive_resets {
        0 | 1 => None,
        2 => Some(Duration::from_millis(5)),
        3 => Some(Duration::from_millis(20)),
        4 => Some(Duration::from_millis(60)),
        _ => Some(Duration::from_millis(250)),
    }
}

/// Emit a `tracing::warn!` only on the first reset of a burst
/// and then once per power-of-two cumulative reset (1, 2, 4, 8,
/// 16, …). A sustained burst with an under-sized buffer would
/// otherwise flood the log; one warn per reset times millions of
/// events per second is enough to drown observability tooling.
/// We keep the first hit so single-shot lags are still visible.
fn rate_limited_lag_warn(msg: &'static str, resume_seq: u64, total: u64, consecutive: u32) {
    let log_this = consecutive == 1 || total.is_power_of_two();
    if log_this {
        tracing::warn!(resume_seq, total, consecutive, "{msg}");
    }
}

fn apply_event<T, K, V, F>(inner: &DashMap<K, HashSet<V>>, project: &F, event: &RedexEvent)
where
    T: DeserializeOwned,
    K: Hash + Eq + Clone,
    V: Hash + Eq + Clone,
    F: Fn(&T) -> Vec<IndexOp<K, V>>,
{
    let decoded: T = match postcard::from_bytes(&event.payload) {
        Ok(t) => t,
        Err(e) => {
            tracing::warn!(
                error = %e,
                seq = event.entry.seq,
                "RedexIndex: failed to decode event; skipping",
            );
            return;
        }
    };
    for op in project(&decoded) {
        match op {
            IndexOp::Insert(k, v) => {
                inner.entry(k).or_default().insert(v);
            }
            IndexOp::Remove(k, v) => {
                // Remove the value; drop the whole bucket if it
                // empties so `len()` and `keys()` reflect current
                // contents.
                let mut drop_key = false;
                if let Some(mut set) = inner.get_mut(&k) {
                    set.remove(&v);
                    drop_key = set.is_empty();
                }
                if drop_key {
                    inner.remove(&k);
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::channel::ChannelName;
    use crate::adapter::net::redex::config::RedexFileConfig;
    use crate::adapter::net::redex::manager::Redex;
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct Tagged {
        id: u64,
        tags: Vec<String>,
    }

    fn open_file(name: &str) -> super::RedexFile {
        let r = Redex::new();
        r.open_file(&ChannelName::new(name).unwrap(), RedexFileConfig::default())
            .unwrap()
    }

    async fn yield_a_few() {
        for _ in 0..10 {
            tokio::task::yield_now().await;
        }
    }

    #[tokio::test]
    async fn test_index_populates_from_existing_entries() {
        let f = open_file("idx/basic");
        for id in 0..5u64 {
            let ev = Tagged {
                id,
                tags: vec!["even".into()],
            };
            f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
        }
        // Only the "even" tag should be indexed; values are the ids.
        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });
        yield_a_few().await;

        let even = idx.get(&"even".to_string()).expect("even bucket populated");
        assert_eq!(even.len(), 5);
        for i in 0..5u64 {
            assert!(even.contains(&i));
        }
        assert_eq!(idx.keys().len(), 1);
    }

    #[tokio::test]
    async fn test_index_insert_remove_symmetry() {
        let f = open_file("idx/insert_remove");
        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                // Tag starting with "-" means "remove", otherwise insert.
                t.tags
                    .iter()
                    .map(|tag| {
                        if let Some(stripped) = tag.strip_prefix('-') {
                            IndexOp::Remove(stripped.to_string(), t.id)
                        } else {
                            IndexOp::Insert(tag.clone(), t.id)
                        }
                    })
                    .collect()
            });

        let add = Tagged {
            id: 1,
            tags: vec!["k".into()],
        };
        let remove = Tagged {
            id: 1,
            tags: vec!["-k".into()],
        };
        f.append(&postcard::to_allocvec(&add).unwrap()).unwrap();
        yield_a_few().await;
        assert!(idx.contains(&"k".to_string(), &1u64));

        f.append(&postcard::to_allocvec(&remove).unwrap()).unwrap();
        yield_a_few().await;
        // Key should be GONE once the bucket empties — not an empty
        // set lying around inflating `keys()`.
        assert!(idx.get(&"k".to_string()).is_none());
        assert_eq!(idx.len(), 0);
    }

    #[tokio::test]
    async fn test_index_multiple_ops_per_event() {
        let f = open_file("idx/multiop");
        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });

        let ev = Tagged {
            id: 42,
            tags: vec!["alpha".into(), "beta".into(), "gamma".into()],
        };
        f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
        yield_a_few().await;

        for tag in ["alpha", "beta", "gamma"] {
            assert!(idx.contains(&tag.to_string(), &42u64));
        }
        assert_eq!(idx.len(), 3);
    }

    #[tokio::test]
    async fn test_index_from_seq_skips_earlier_events() {
        let f = open_file("idx/fromseq");
        for id in 0..4u64 {
            let ev = Tagged {
                id,
                tags: vec!["t".into()],
            };
            f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
        }
        // Resume from seq=2 → should index only ids 2 and 3.
        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromSeq(2), |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });
        yield_a_few().await;

        let bucket = idx.get(&"t".to_string()).unwrap();
        assert_eq!(bucket.len(), 2);
        assert!(bucket.contains(&2));
        assert!(bucket.contains(&3));
        assert!(!bucket.contains(&0));
        assert!(!bucket.contains(&1));
    }

    #[tokio::test]
    async fn test_index_decode_error_skips_entry() {
        // Appending garbage bytes that don't deserialize as `Tagged`
        // must not poison the index — the bad event is skipped,
        // subsequent valid events are applied.
        let f = open_file("idx/decode_err");
        f.append(b"\xFF\xFF\xFF\xFF").unwrap();

        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });

        let good = Tagged {
            id: 7,
            tags: vec!["x".into()],
        };
        f.append(&postcard::to_allocvec(&good).unwrap()).unwrap();
        yield_a_few().await;

        assert!(idx.contains(&"x".to_string(), &7u64));
        assert_eq!(idx.len(), 1);
    }

    /// A `Lagged` from the underlying tail must NOT
    /// permanently halt the index task. Pre-fix the loop logged at
    /// debug! and continued, but the file had already dropped the
    /// watcher — the next `tail.next()` returned `None` and the
    /// task exited with the index frozen at whatever state it had
    /// when the lag fired. Keys appended later never appeared.
    ///
    /// Post-fix: on `Lagged` (or on a stream `None` arriving while
    /// the file is still open — saturation-induced watcher drop),
    /// the task clears the in-memory index and re-tails from
    /// `next_seq()` (live-only). Live events from that point
    /// forward populate the index again.
    ///
    /// The test paces the post-lag appends with a yield between
    /// each so the recovered tail's small buffer doesn't itself
    /// saturate; absent that pacing, the buffer-2 watcher would
    /// drop again and the index would clear a second time, which
    /// is also correct but not what we're trying to verify.
    #[tokio::test]
    async fn index_recovers_from_tail_lag_and_continues_indexing() {
        // Tiny tail buffer so the backfill saturates immediately
        // when we open the index against a file with many events.
        let r = Redex::new();
        let f = r
            .open_file(
                &ChannelName::new("idx/lag-recovery").unwrap(),
                RedexFileConfig::default().with_tail_buffer_size(2),
            )
            .unwrap();

        // Pre-load the file with more events than the tail buffer
        // can hold during backfill — this triggers Lagged at
        // open-time.
        for id in 0..10u64 {
            let ev = Tagged {
                id,
                tags: vec!["pre".into()],
            };
            f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
        }

        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });

        // Let the task observe Lagged and re-tail from live.
        yield_a_few().await;

        // Append new events one at a time with a yield between
        // each so the recovered watcher (also buffer=2) drains
        // before the next event arrives.
        for id in 100..105u64 {
            let ev = Tagged {
                id,
                tags: vec!["post".into()],
            };
            f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
            yield_a_few().await;
        }

        // The index must reflect every post-lag event. Pre-fix the
        // task had already exited and `idx.get` would return
        // `None`.
        let post_keys = idx.get(&"post".to_string()).expect(
            "post-lag bucket missing — pre-fix the index task halted \
                 permanently after Lagged and never observed these events",
        );
        assert_eq!(
            post_keys.len(),
            5,
            "every post-lag event must be indexed; recovered set was {:?}",
            post_keys
        );
        for id in 100..105u64 {
            assert!(
                post_keys.contains(&id),
                "post-lag id {} missing from recovered index",
                id
            );
        }
    }

    /// Closing the file naturally must still
    /// terminate the index task (no infinite re-tail loop).
    #[tokio::test]
    async fn index_terminates_when_file_closes() {
        let r = Redex::new();
        let f = r
            .open_file(
                &ChannelName::new("idx/close-terminates").unwrap(),
                RedexFileConfig::default(),
            )
            .unwrap();

        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });

        let ev = Tagged {
            id: 1,
            tags: vec!["a".into()],
        };
        f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
        yield_a_few().await;
        assert!(idx.contains(&"a".to_string(), &1));

        // Close should propagate as Err(Closed) on the tail; the
        // task must NOT treat it as a saturation event and re-tail.
        f.close().unwrap();
        yield_a_few().await;
        // Hard to assert "task exited" directly without exposing
        // the JoinHandle, but if the task were re-tailing in a
        // loop on Err(Closed) → re-tail → Err(Closed) → ... the
        // CPU would spin and the test would still pass. The real
        // assertion is that we don't deadlock here — close returned,
        // and the file's drop semantics are unchanged.
        assert!(f.is_closed());
    }

    /// `lag_resets()` must increment when the tail task clears
    /// the index because of a saturation-induced reset.
    /// Downstreams use this counter to detect that the index
    /// just lost data — without it, the only signal was polling
    /// `len()` over time. Pre-fix there was no public way to
    /// observe a clear at all.
    #[tokio::test]
    async fn lag_resets_counter_increments_on_saturation_reset() {
        let r = Redex::new();
        let f = r
            .open_file(
                &ChannelName::new("idx/lag-resets-counter").unwrap(),
                RedexFileConfig::default().with_tail_buffer_size(2),
            )
            .unwrap();

        // Pre-load past the tail buffer so the backfill saturates
        // and the index task takes the recovery path.
        for id in 0..10u64 {
            let ev = Tagged {
                id,
                tags: vec!["pre".into()],
            };
            f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
        }

        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });

        // At open time `lag_resets` is zero. After saturation it
        // must be at least 1. We yield a generous handful of times
        // to give the spawned task room to observe Lagged.
        assert_eq!(idx.lag_resets(), 0, "fresh index has not reset yet");
        for _ in 0..50 {
            tokio::task::yield_now().await;
            if idx.lag_resets() > 0 {
                break;
            }
        }
        assert!(
            idx.lag_resets() >= 1,
            "saturation must bump lag_resets, got {}",
            idx.lag_resets()
        );
    }

    /// `backoff_for` schedule pins the worst-case recovery
    /// latency at 250ms while still allowing immediate recovery
    /// for a single transient lag event. Pre-fix the loop had no
    /// backoff at all and a sustained burst with an under-sized
    /// buffer hot-looped.
    #[test]
    fn backoff_schedule_is_monotonic_and_bounded() {
        // First (single transient) reset → no sleep.
        assert_eq!(backoff_for(0), None);
        assert_eq!(backoff_for(1), None);

        // Subsequent consecutive resets must sleep, monotonically
        // non-decreasing.
        let prev = backoff_for(2).unwrap();
        for n in 3..=10 {
            let cur = backoff_for(n).unwrap();
            assert!(
                cur >= prev,
                "backoff must be non-decreasing in consecutive resets; \
                 backoff_for({n}) = {:?} < backoff_for({}) = {:?}",
                cur,
                n - 1,
                prev,
            );
            assert!(
                cur <= Duration::from_millis(250),
                "backoff must be bounded at 250ms; backoff_for({n}) = {:?}",
                cur,
            );
        }

        // The cap must be reached (otherwise growth could continue
        // unbounded under a different schedule).
        assert_eq!(backoff_for(100).unwrap(), Duration::from_millis(250));
        assert_eq!(backoff_for(u32::MAX).unwrap(), Duration::from_millis(250));
    }
}