// murk_engine/ring.rs
//! Fixed-capacity ring buffer of owned snapshots for RealtimeAsync mode.
//!
//! [`SnapshotRing`] stores `Arc<OwnedSnapshot>` slots with single-producer
//! push and multi-consumer read. This is the spike implementation using
//! `Mutex` per slot; production will replace `Arc` with epoch-based pinning.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use murk_arena::OwnedSnapshot;
/// A tagged slot: the `u64` is the monotonic write position when this
/// snapshot was stored, enabling consumers to detect overwrites.
///
/// `None` only before the slot's first write. After that, a slot is only
/// ever replaced (take-then-reassign under its mutex in `push`), never
/// cleared, so a locked slot observed as `None` means "never written".
type Slot = Option<(u64, Arc<OwnedSnapshot>)>;

/// A fixed-capacity ring buffer of `Arc<OwnedSnapshot>`.
///
/// Single-producer: only one thread calls `push`. Multi-consumer: any
/// thread can call `latest` or `get_by_pos` to read snapshots.
///
/// The write position is monotonically increasing (never wraps). Slot
/// index is computed as `pos % capacity`. Each slot stores a position
/// tag alongside the snapshot so that consumers can verify they are
/// reading the slot they intended, even under concurrent producer pushes.
pub struct SnapshotRing {
    /// Each slot holds `(position_tag, snapshot)`. The tag is the monotonic
    /// write position at which this snapshot was stored, enabling consumers
    /// to detect when a slot has been overwritten between their bounds check
    /// and their lock acquisition.
    slots: Vec<Mutex<Slot>>,
    /// Monotonic count of completed pushes (never wraps). Release-stored by
    /// the producer after the slot is written; Acquire-loaded by consumers.
    write_pos: AtomicU64,
    /// Telemetry: reads that found no snapshot available.
    not_available_events: AtomicU64,
    /// Telemetry: pushes that displaced an older retained snapshot.
    eviction_events: AtomicU64,
    /// Telemetry: reads targeting evicted or not-yet-written positions.
    stale_read_events: AtomicU64,
    /// Telemetry: read retries caused by producer/consumer overwrite skew.
    skew_retry_events: AtomicU64,
    /// Number of slots; fixed at construction, asserted `>= 2` in `new`.
    capacity: usize,
}

39// Compile-time assertion: SnapshotRing must be Send + Sync.
40const _: fn() = || {
41    fn assert<T: Send + Sync>() {}
42    assert::<SnapshotRing>();
43};
44
45impl SnapshotRing {
46    /// Create a new ring buffer with the given capacity.
47    ///
48    /// # Panics
49    ///
50    /// Panics if `capacity < 2`. A ring buffer needs at least 2 slots
51    /// to be useful (one being written, one readable).
52    pub fn new(capacity: usize) -> Self {
53        assert!(
54            capacity >= 2,
55            "SnapshotRing capacity must be >= 2, got {capacity}"
56        );
57        let slots = (0..capacity).map(|_| Mutex::new(None)).collect();
58        Self {
59            slots,
60            write_pos: AtomicU64::new(0),
61            not_available_events: AtomicU64::new(0),
62            eviction_events: AtomicU64::new(0),
63            stale_read_events: AtomicU64::new(0),
64            skew_retry_events: AtomicU64::new(0),
65            capacity,
66        }
67    }
68
69    /// Push a new snapshot into the ring. Single-producer only.
70    ///
71    /// Returns the evicted snapshot (if any) that was displaced by this push.
72    /// The caller can use this for reclamation bookkeeping.
73    pub fn push(&self, snapshot: OwnedSnapshot) -> Option<Arc<OwnedSnapshot>> {
74        let pos = self.write_pos.load(Ordering::Relaxed);
75        let slot_idx = (pos as usize) % self.capacity;
76
77        let arc = Arc::new(snapshot);
78        let evicted = {
79            let mut slot = self.slots[slot_idx].lock().unwrap();
80            let prev = slot.take().map(|(_tag, arc)| arc);
81            *slot = Some((pos, Arc::clone(&arc)));
82            prev
83        };
84        if evicted.is_some() {
85            self.eviction_events.fetch_add(1, Ordering::Relaxed);
86        }
87
88        // Release-store ensures the snapshot data is visible before
89        // consumers observe the new write_pos.
90        self.write_pos.store(pos + 1, Ordering::Release);
91
92        evicted
93    }
94
95    /// Get the latest (most recently pushed) snapshot.
96    ///
97    /// Returns `None` only if no snapshots have been pushed yet.
98    /// On overwrite races (producer wraps the slot between our
99    /// `write_pos` read and lock acquisition), retries from the
100    /// fresh `write_pos`. If all retries are exhausted (producer
101    /// lapping the consumer), falls back to a full scan of all
102    /// slots, guaranteeing `Some` whenever the ring is non-empty.
103    pub fn latest(&self) -> Option<Arc<OwnedSnapshot>> {
104        self.latest_impl(true)
105    }
106
107    /// Peek the latest snapshot without mutating read-side telemetry counters.
108    ///
109    /// This is intended for non-failing readiness/health checks (for example
110    /// `RealtimeAsyncWorld::preflight`) that should not be counted as
111    /// observation read failures or skew retries.
112    pub fn peek_latest(&self) -> Option<Arc<OwnedSnapshot>> {
113        self.latest_impl(false)
114    }
115
116    fn latest_impl(&self, record_events: bool) -> Option<Arc<OwnedSnapshot>> {
117        // Fast path: bounded retry targeting the most recent snapshot.
118        // Succeeds immediately under normal contention.
119        for _ in 0..self.capacity {
120            let pos = self.write_pos.load(Ordering::Acquire);
121            if pos == 0 {
122                if record_events {
123                    self.not_available_events.fetch_add(1, Ordering::Relaxed);
124                }
125                return None;
126            }
127            let target_pos = pos - 1;
128            let slot_idx = (target_pos as usize) % self.capacity;
129            let slot = self.slots[slot_idx].lock().unwrap();
130            match slot.as_ref() {
131                Some((tag, arc)) if *tag == target_pos => return Some(Arc::clone(arc)),
132                // Producer overwrote this slot between our write_pos
133                // read and lock acquisition. Re-read write_pos and
134                // try the new latest slot.
135                _ => {
136                    if record_events {
137                        self.skew_retry_events.fetch_add(1, Ordering::Relaxed);
138                    }
139                    continue;
140                }
141            }
142        }
143
144        // Fallback: the producer is lapping us faster than we can lock
145        // the latest slot. Scan all slots and return the snapshot with
146        // the highest position tag. The single producer holds at most
147        // one slot's mutex at a time, so at least (capacity - 1) slots
148        // contain valid snapshots — this scan is guaranteed to find one
149        // whenever the ring is non-empty.
150        let mut best: Option<(u64, Arc<OwnedSnapshot>)> = None;
151        for slot_mutex in &self.slots {
152            let slot = slot_mutex.lock().unwrap();
153            if let Some((tag, arc)) = slot.as_ref() {
154                let dominated = best.as_ref().is_some_and(|(best_tag, _)| *best_tag >= *tag);
155                if !dominated {
156                    best = Some((*tag, Arc::clone(arc)));
157                }
158            }
159        }
160        if let Some((_, arc)) = best {
161            Some(arc)
162        } else {
163            if record_events {
164                self.not_available_events.fetch_add(1, Ordering::Relaxed);
165            }
166            None
167        }
168    }
169
170    /// Get a snapshot by its monotonic write position.
171    ///
172    /// Returns `None` if the position has been evicted (overwritten) or
173    /// hasn't been written yet.
174    pub fn get_by_pos(&self, pos: u64) -> Option<Arc<OwnedSnapshot>> {
175        let current = self.write_pos.load(Ordering::Acquire);
176
177        // Not yet written.
178        if pos >= current {
179            self.stale_read_events.fetch_add(1, Ordering::Relaxed);
180            return None;
181        }
182
183        // Evicted: the position is older than what the ring retains.
184        if current - pos > self.capacity as u64 {
185            self.stale_read_events.fetch_add(1, Ordering::Relaxed);
186            return None;
187        }
188
189        let slot_idx = (pos as usize) % self.capacity;
190        let slot = self.slots[slot_idx].lock().unwrap();
191        match slot.as_ref() {
192            Some((tag, arc)) if *tag == pos => Some(Arc::clone(arc)),
193            // The producer overwrote this slot between our bounds check
194            // and lock acquisition — the requested position is gone.
195            _ => {
196                self.stale_read_events.fetch_add(1, Ordering::Relaxed);
197                self.skew_retry_events.fetch_add(1, Ordering::Relaxed);
198                None
199            }
200        }
201    }
202
203    /// Number of snapshots currently stored (up to `capacity`).
204    pub fn len(&self) -> usize {
205        let pos = self.write_pos.load(Ordering::Acquire) as usize;
206        pos.min(self.capacity)
207    }
208
209    /// Whether the ring is empty.
210    pub fn is_empty(&self) -> bool {
211        self.write_pos.load(Ordering::Acquire) == 0
212    }
213
214    /// The ring buffer capacity.
215    pub fn capacity(&self) -> usize {
216        self.capacity
217    }
218
219    /// The current monotonic write position.
220    pub fn write_pos(&self) -> u64 {
221        self.write_pos.load(Ordering::Acquire)
222    }
223
224    /// Oldest retained write position currently available in the ring.
225    ///
226    /// Returns `None` when no snapshots have been pushed yet.
227    pub fn oldest_retained_pos(&self) -> Option<u64> {
228        let current = self.write_pos();
229        if current == 0 {
230            return None;
231        }
232        let retained = current.min(self.capacity as u64);
233        Some(current - retained)
234    }
235
236    /// Number of times an observation request found no snapshot available.
237    pub fn not_available_events(&self) -> u64 {
238        self.not_available_events.load(Ordering::Relaxed)
239    }
240
241    /// Number of push operations that evicted an older retained snapshot.
242    pub fn eviction_events(&self) -> u64 {
243        self.eviction_events.load(Ordering::Relaxed)
244    }
245
246    /// Number of read attempts that targeted stale or not-yet-written positions.
247    pub fn stale_read_events(&self) -> u64 {
248        self.stale_read_events.load(Ordering::Relaxed)
249    }
250
251    /// Number of read retries caused by producer/consumer overwrite skew.
252    pub fn skew_retry_events(&self) -> u64 {
253        self.skew_retry_events.load(Ordering::Relaxed)
254    }
255}
256
#[cfg(test)]
mod tests {
    use super::*;
    use murk_arena::config::ArenaConfig;
    use murk_arena::pingpong::PingPongArena;
    use murk_arena::static_arena::StaticArena;
    use murk_core::id::{FieldId, ParameterVersion, TickId};
    use murk_core::traits::{FieldWriter as _, SnapshotAccess};
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};

    /// Build a minimal one-field snapshot: a 10-cell scalar "energy" field
    /// filled with `tick as f32`, published at `TickId(tick)`. Tests use the
    /// tick id to identify which push a retrieved snapshot came from.
    fn make_test_snapshot(tick: u64) -> OwnedSnapshot {
        let cell_count = 10u32;
        let config = ArenaConfig::new(cell_count);
        let field_defs = vec![(
            FieldId(0),
            FieldDef {
                name: "energy".into(),
                field_type: FieldType::Scalar,
                mutability: FieldMutability::PerTick,
                units: None,
                bounds: None,
                boundary_behavior: BoundaryBehavior::Clamp,
            },
        )];
        let static_arena = StaticArena::new(&[]).into_shared();
        let mut arena = PingPongArena::new(config, field_defs, static_arena).unwrap();

        {
            let mut guard = arena.begin_tick().unwrap();
            let data = guard.writer.write(FieldId(0)).unwrap();
            data.fill(tick as f32);
        }
        arena.publish(TickId(tick), ParameterVersion(0)).unwrap();
        arena.owned_snapshot()
    }

    #[test]
    fn test_ring_new_empty() {
        let ring = SnapshotRing::new(4);
        assert_eq!(ring.len(), 0);
        assert!(ring.is_empty());
        assert_eq!(ring.capacity(), 4);
        assert_eq!(ring.write_pos(), 0);
        assert_eq!(ring.not_available_events(), 0);
        assert_eq!(ring.eviction_events(), 0);
        assert_eq!(ring.stale_read_events(), 0);
        assert_eq!(ring.skew_retry_events(), 0);
        // latest() on an empty ring is the counted (telemetry-recording) path.
        assert!(ring.latest().is_none());
        assert_eq!(ring.not_available_events(), 1);
    }

    #[test]
    fn test_ring_push_and_latest() {
        let ring = SnapshotRing::new(4);
        ring.push(make_test_snapshot(1));
        assert_eq!(ring.len(), 1);
        assert!(!ring.is_empty());
        assert_eq!(ring.not_available_events(), 0);

        let latest = ring.latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(1));
        assert_eq!(ring.not_available_events(), 0);
        assert_eq!(ring.oldest_retained_pos(), Some(0));
    }

    #[test]
    fn test_ring_peek_latest_does_not_increment_not_available_on_empty() {
        let ring = SnapshotRing::new(4);
        assert!(ring.peek_latest().is_none());
        assert_eq!(ring.not_available_events(), 0);
        assert_eq!(ring.skew_retry_events(), 0);
    }

    #[test]
    fn test_ring_peek_latest_returns_snapshot_without_counter_mutation() {
        let ring = SnapshotRing::new(4);
        ring.push(make_test_snapshot(5));

        let latest = ring.peek_latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(5));
        assert_eq!(ring.not_available_events(), 0);
        assert_eq!(ring.skew_retry_events(), 0);
    }

    #[test]
    fn test_ring_eviction() {
        let ring = SnapshotRing::new(4);

        // Push 4 — fills the ring.
        for i in 1..=4 {
            let evicted = ring.push(make_test_snapshot(i));
            assert!(evicted.is_none());
        }
        assert_eq!(ring.len(), 4);

        // Push 5th — evicts tick 1.
        let evicted = ring.push(make_test_snapshot(5));
        assert!(evicted.is_some());
        assert_eq!(evicted.unwrap().tick_id(), TickId(1));
        assert_eq!(ring.len(), 4);
        assert_eq!(ring.eviction_events(), 1);
        assert_eq!(ring.oldest_retained_pos(), Some(1));
    }

    #[test]
    fn test_ring_latest_is_newest() {
        let ring = SnapshotRing::new(4);
        for i in 1..=10 {
            ring.push(make_test_snapshot(i));
        }
        let latest = ring.latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(10));
    }

    #[test]
    fn test_ring_get_by_pos() {
        let ring = SnapshotRing::new(4);
        for i in 1..=4 {
            ring.push(make_test_snapshot(i));
        }

        // Positions are 0-indexed (write_pos starts at 0).
        let snap = ring.get_by_pos(0).unwrap();
        assert_eq!(snap.tick_id(), TickId(1));

        let snap = ring.get_by_pos(3).unwrap();
        assert_eq!(snap.tick_id(), TickId(4));

        // Position 4 not yet written.
        assert!(ring.get_by_pos(4).is_none());
        assert_eq!(ring.stale_read_events(), 1);
    }

    #[test]
    fn test_ring_get_evicted_returns_none() {
        let ring = SnapshotRing::new(4);
        for i in 1..=8 {
            ring.push(make_test_snapshot(i));
        }
        // Positions 0-3 have been evicted (overwritten by positions 4-7).
        assert!(ring.get_by_pos(0).is_none());
        assert!(ring.get_by_pos(3).is_none());
        assert_eq!(ring.stale_read_events(), 2);

        // Positions 4-7 should still be available.
        let snap = ring.get_by_pos(4).unwrap();
        assert_eq!(snap.tick_id(), TickId(5));
    }

    #[test]
    #[should_panic(expected = "capacity must be >= 2")]
    fn test_ring_capacity_panics_below_2() {
        SnapshotRing::new(1);
    }

    #[test]
    fn test_get_by_pos_returns_none_after_concurrent_overwrite() {
        // Simulates the race: consumer reads write_pos, producer wraps
        // around and overwrites the target slot, consumer locks and
        // must detect the overwrite via position tag.
        let ring = SnapshotRing::new(4);

        // Fill positions 0-3.
        for i in 1..=4 {
            ring.push(make_test_snapshot(i));
        }
        assert_eq!(ring.get_by_pos(0).unwrap().tick_id(), TickId(1));

        // Now push 4 more: positions 4-7 overwrite slots 0-3.
        for i in 5..=8 {
            ring.push(make_test_snapshot(i));
        }

        // Position 0 was overwritten by position 4 (same slot index).
        // get_by_pos(0) must return None, not the snapshot at position 4.
        assert!(ring.get_by_pos(0).is_none());
        assert!(ring.get_by_pos(3).is_none());

        // Position 4 should still be accessible.
        let snap = ring.get_by_pos(4).unwrap();
        assert_eq!(snap.tick_id(), TickId(5));

        // Position 7 should be accessible.
        let snap = ring.get_by_pos(7).unwrap();
        assert_eq!(snap.tick_id(), TickId(8));
    }

    #[test]
    fn test_get_by_pos_tag_matches_position() {
        // Verify that get_by_pos returns the exact snapshot for the
        // requested position, not whatever happens to be in the slot.
        let ring = SnapshotRing::new(4);

        for i in 1..=4 {
            ring.push(make_test_snapshot(i * 10));
        }

        // Each position should return its exact snapshot.
        assert_eq!(ring.get_by_pos(0).unwrap().tick_id(), TickId(10));
        assert_eq!(ring.get_by_pos(1).unwrap().tick_id(), TickId(20));
        assert_eq!(ring.get_by_pos(2).unwrap().tick_id(), TickId(30));
        assert_eq!(ring.get_by_pos(3).unwrap().tick_id(), TickId(40));
    }

    // ── Cross-thread integration tests ────────────────────────────

    #[test]
    fn test_producer_consumer_cross_thread() {
        use crate::config::{BackoffConfig, WorldConfig};
        use crate::tick::TickEngine;
        use murk_space::{EdgeBehavior, Line1D};
        use murk_test_utils::ConstPropagator;
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::thread;

        let config = WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![FieldDef {
                name: "energy".into(),
                field_type: FieldType::Scalar,
                mutability: FieldMutability::PerTick,
                units: None,
                bounds: None,
                boundary_behavior: BoundaryBehavior::Clamp,
            }],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: BackoffConfig::default(),
        };
        let mut engine = TickEngine::new(config).unwrap();

        let ring = Arc::new(SnapshotRing::new(8));
        let epoch_counter = Arc::new(crate::epoch::EpochCounter::new());
        let producer_done = Arc::new(AtomicBool::new(false));

        // Producer: run 100 ticks, push snapshots to ring, advance epoch.
        let ring_prod = Arc::clone(&ring);
        let epoch_prod = Arc::clone(&epoch_counter);
        let done_flag = Arc::clone(&producer_done);
        let producer = thread::spawn(move || {
            for _ in 0..100 {
                engine.execute_tick().unwrap();
                let snap = engine.owned_snapshot();
                ring_prod.push(snap);
                epoch_prod.advance();
            }
            done_flag.store(true, Ordering::Release);
        });

        // 4 consumer threads: read latest snapshot until producer is done.
        let consumers: Vec<_> = (0..4)
            .map(|id| {
                let ring_c = Arc::clone(&ring);
                let done_c = Arc::clone(&producer_done);
                let worker = Arc::new(crate::epoch::WorkerEpoch::new(id));
                thread::spawn(move || {
                    let mut reads = 0u64;
                    loop {
                        if let Some(snap) = ring_c.latest() {
                            let epoch = snap.tick_id().0;
                            worker.pin(epoch);
                            let data = snap.read_field(FieldId(0)).unwrap();
                            assert_eq!(data.len(), 10);
                            assert!(data.iter().all(|&v| v == 42.0));
                            worker.unpin();
                            reads += 1;
                        }
                        if done_c.load(Ordering::Acquire) && reads > 0 {
                            break;
                        }
                        thread::yield_now();
                    }
                    reads
                })
            })
            .collect();

        producer.join().unwrap();
        let mut total_reads = 0u64;
        for c in consumers {
            let reads = c.join().unwrap();
            assert!(reads > 0, "consumer should have read at least one snapshot");
            total_reads += reads;
        }

        // Verify final state.
        assert!(ring.len() <= 8);
        assert_eq!(epoch_counter.current(), 100);
        assert!(
            total_reads >= 4,
            "consumers collectively should have many reads"
        );
    }

    #[test]
    fn test_epoch_pin_unpin_cross_thread() {
        use crate::epoch::WorkerEpoch;
        use std::thread;

        let workers: Vec<Arc<WorkerEpoch>> =
            (0..4).map(|i| Arc::new(WorkerEpoch::new(i))).collect();

        let handles: Vec<_> = workers
            .iter()
            .map(|worker| {
                let worker = worker.clone();
                thread::spawn(move || {
                    for epoch in 0..100u64 {
                        worker.pin(epoch);
                        assert!(worker.is_pinned());
                        assert_eq!(worker.pinned_epoch(), epoch);
                        worker.unpin();
                        assert!(!worker.is_pinned());
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // All workers should be unpinned.
        for w in &workers {
            assert!(!w.is_pinned());
            assert!(w.last_quiesce_ns() > 0);
        }
    }

    /// Stress test: with a tiny ring (capacity=2), hammer the producer and
    /// multiple consumers to maximise the chance of the retry-exhaustion
    /// race. After the fallback-scan fix, `latest()` must never return
    /// `None` once the ring is non-empty.
    #[test]
    fn test_latest_never_spurious_none_under_contention() {
        use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
        use std::thread;

        let ring = Arc::new(SnapshotRing::new(2)); // minimum capacity
        let producer_done = Arc::new(AtomicBool::new(false));
        let spurious_nones = Arc::new(AtomicU64::new(0));

        // Producer: push 200 snapshots as fast as possible.
        // (Each snapshot allocates a PingPongArena, so keep count moderate.)
        let ring_p = Arc::clone(&ring);
        let done_flag = Arc::clone(&producer_done);
        let producer = thread::spawn(move || {
            for i in 1..=200u64 {
                ring_p.push(make_test_snapshot(i));
            }
            done_flag.store(true, Ordering::Release);
        });

        // 4 consumers: call latest() in a tight loop, counting spurious Nones.
        let consumers: Vec<_> = (0..4)
            .map(|_| {
                let ring_c = Arc::clone(&ring);
                let done_c = Arc::clone(&producer_done);
                let nones = Arc::clone(&spurious_nones);
                thread::spawn(move || {
                    let mut reads = 0u64;
                    loop {
                        // Once at least one push has happened, latest() must
                        // never return None.
                        if ring_c.write_pos() > 0 && ring_c.latest().is_none() {
                            nones.fetch_add(1, Ordering::Relaxed);
                        }
                        reads += 1;
                        if done_c.load(Ordering::Acquire) {
                            break;
                        }
                        // No yield — keep the loop tight to maximise contention.
                    }
                    reads
                })
            })
            .collect();

        producer.join().unwrap();
        for c in consumers {
            c.join().unwrap();
        }

        assert_eq!(
            spurious_nones.load(Ordering::Relaxed),
            0,
            "latest() must never return None when the ring is non-empty"
        );
    }

    #[test]
    fn test_write_pos_monotonic_after_many_pushes() {
        let ring = SnapshotRing::new(4);
        for i in 1..=20u64 {
            ring.push(make_test_snapshot(i));
            assert_eq!(
                ring.write_pos(),
                i,
                "write_pos should be {i} after {i} pushes"
            );
        }
    }

    #[test]
    fn test_position_tag_matches_write_pos() {
        let ring = SnapshotRing::new(4);
        for i in 0..4u64 {
            ring.push(make_test_snapshot(i + 1));
        }
        // Positions 0..4 should all be retrievable with correct ticks.
        for i in 0..4u64 {
            let snap = ring
                .get_by_pos(i)
                .unwrap_or_else(|| panic!("pos {i} should be retained"));
            assert_eq!(snap.tick_id(), TickId(i + 1), "wrong tick at pos {i}");
        }
    }

    #[test]
    fn test_oldest_retained_pos_tracks_eviction_boundary() {
        let ring = SnapshotRing::new(4);
        assert_eq!(ring.oldest_retained_pos(), None);

        ring.push(make_test_snapshot(1));
        assert_eq!(ring.oldest_retained_pos(), Some(0));

        for i in 2..=4 {
            ring.push(make_test_snapshot(i));
        }
        assert_eq!(ring.oldest_retained_pos(), Some(0));

        // Push 5th — evicts pos 0.
        ring.push(make_test_snapshot(5));
        assert_eq!(ring.oldest_retained_pos(), Some(1));

        // Push 6th — evicts pos 1.
        ring.push(make_test_snapshot(6));
        assert_eq!(ring.oldest_retained_pos(), Some(2));
    }

    #[test]
    fn test_counter_monotonicity_after_wraparound() {
        let ring = SnapshotRing::new(3);
        // Push 10 snapshots — wraps multiple times.
        for i in 1..=10u64 {
            ring.push(make_test_snapshot(i));
        }
        // write_pos should be 10, not reset.
        assert_eq!(ring.write_pos(), 10);
        // Old positions (before the retained window) should return None.
        for i in 0..7u64 {
            assert!(
                ring.get_by_pos(i).is_none(),
                "pos {i} should be evicted (retained window is 7..10)"
            );
        }
        // Retained positions should return correct snapshots.
        for i in 7..10u64 {
            let snap = ring
                .get_by_pos(i)
                .unwrap_or_else(|| panic!("pos {i} should be retained"));
            assert_eq!(snap.tick_id(), TickId(i + 1));
        }
    }

    #[test]
    fn test_overwrite_detection_returns_correct_snapshot() {
        let ring = SnapshotRing::new(3);
        // Push ticks 1, 2, 3 — fills ring. Slot indices: 0→tick1, 1→tick2, 2→tick3.
        for i in 1..=3u64 {
            ring.push(make_test_snapshot(i));
        }
        // Push tick 4 — overwrites slot 0. Now slot 0 holds tick 4, not tick 1.
        ring.push(make_test_snapshot(4));

        // get_by_pos(0) should return None (evicted), NOT tick 4.
        assert!(ring.get_by_pos(0).is_none(), "pos 0 should be evicted");

        // get_by_pos(3) should return tick 4 (in slot 0, tag=3).
        let snap = ring.get_by_pos(3).unwrap();
        assert_eq!(snap.tick_id(), TickId(4));

        // get_by_pos(1) should still return tick 2.
        let snap = ring.get_by_pos(1).unwrap();
        assert_eq!(snap.tick_id(), TickId(2));
    }
}