// murk_engine/ring.rs

1//! Fixed-capacity ring buffer of owned snapshots for RealtimeAsync mode.
2//!
3//! [`SnapshotRing`] stores `Arc<OwnedSnapshot>` slots with single-producer
4//! push and multi-consumer read. This is the spike implementation using
5//! `Mutex` per slot; production will replace `Arc` with epoch-based pinning.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex};
9
10use murk_arena::OwnedSnapshot;
11
/// A tagged slot: the `u64` is the monotonic write position when this
/// snapshot was stored, enabling consumers to detect overwrites.
/// `None` means the slot has never been written (ring not yet full).
type Slot = Option<(u64, Arc<OwnedSnapshot>)>;
15
16/// A fixed-capacity ring buffer of `Arc<OwnedSnapshot>`.
17///
18/// Single-producer: only one thread calls `push`. Multi-consumer: any
19/// thread can call `latest` or `get_by_pos` to read snapshots.
20///
21/// The write position is monotonically increasing (never wraps). Slot
22/// index is computed as `pos % capacity`. Each slot stores a position
23/// tag alongside the snapshot so that consumers can verify they are
24/// reading the slot they intended, even under concurrent producer pushes.
pub struct SnapshotRing {
    /// Each slot holds `(position_tag, snapshot)`. The tag is the monotonic
    /// write position at which this snapshot was stored, enabling consumers
    /// to detect when a slot has been overwritten between their bounds check
    /// and their lock acquisition.
    slots: Vec<Mutex<Slot>>,
    /// Monotonic push count. Written only by the single producer
    /// (Release store in `push`); read by consumers with Acquire loads.
    write_pos: AtomicU64,
    /// Number of slots; fixed at construction, always >= 2.
    capacity: usize,
}
34
35// Compile-time assertion: SnapshotRing must be Send + Sync.
36const _: fn() = || {
37    fn assert<T: Send + Sync>() {}
38    assert::<SnapshotRing>();
39};
40
41impl SnapshotRing {
42    /// Create a new ring buffer with the given capacity.
43    ///
44    /// # Panics
45    ///
46    /// Panics if `capacity < 2`. A ring buffer needs at least 2 slots
47    /// to be useful (one being written, one readable).
48    pub fn new(capacity: usize) -> Self {
49        assert!(
50            capacity >= 2,
51            "SnapshotRing capacity must be >= 2, got {capacity}"
52        );
53        let slots = (0..capacity).map(|_| Mutex::new(None)).collect();
54        Self {
55            slots,
56            write_pos: AtomicU64::new(0),
57            capacity,
58        }
59    }
60
61    /// Push a new snapshot into the ring. Single-producer only.
62    ///
63    /// Returns the evicted snapshot (if any) that was displaced by this push.
64    /// The caller can use this for reclamation bookkeeping.
65    pub fn push(&self, snapshot: OwnedSnapshot) -> Option<Arc<OwnedSnapshot>> {
66        let pos = self.write_pos.load(Ordering::Relaxed);
67        let slot_idx = (pos as usize) % self.capacity;
68
69        let arc = Arc::new(snapshot);
70        let evicted = {
71            let mut slot = self.slots[slot_idx].lock().unwrap();
72            let prev = slot.take().map(|(_tag, arc)| arc);
73            *slot = Some((pos, Arc::clone(&arc)));
74            prev
75        };
76
77        // Release-store ensures the snapshot data is visible before
78        // consumers observe the new write_pos.
79        self.write_pos.store(pos + 1, Ordering::Release);
80
81        evicted
82    }
83
84    /// Get the latest (most recently pushed) snapshot.
85    ///
86    /// Returns `None` only if no snapshots have been pushed yet.
87    /// On overwrite races (producer wraps the slot between our
88    /// `write_pos` read and lock acquisition), retries from the
89    /// fresh `write_pos`. If all retries are exhausted (producer
90    /// lapping the consumer), falls back to a full scan of all
91    /// slots, guaranteeing `Some` whenever the ring is non-empty.
92    pub fn latest(&self) -> Option<Arc<OwnedSnapshot>> {
93        // Fast path: bounded retry targeting the most recent snapshot.
94        // Succeeds immediately under normal contention.
95        for _ in 0..self.capacity {
96            let pos = self.write_pos.load(Ordering::Acquire);
97            if pos == 0 {
98                return None;
99            }
100            let target_pos = pos - 1;
101            let slot_idx = (target_pos as usize) % self.capacity;
102            let slot = self.slots[slot_idx].lock().unwrap();
103            match slot.as_ref() {
104                Some((tag, arc)) if *tag == target_pos => return Some(Arc::clone(arc)),
105                // Producer overwrote this slot between our write_pos
106                // read and lock acquisition. Re-read write_pos and
107                // try the new latest slot.
108                _ => continue,
109            }
110        }
111
112        // Fallback: the producer is lapping us faster than we can lock
113        // the latest slot. Scan all slots and return the snapshot with
114        // the highest position tag. The single producer holds at most
115        // one slot's mutex at a time, so at least (capacity - 1) slots
116        // contain valid snapshots — this scan is guaranteed to find one
117        // whenever the ring is non-empty.
118        let mut best: Option<(u64, Arc<OwnedSnapshot>)> = None;
119        for slot_mutex in &self.slots {
120            let slot = slot_mutex.lock().unwrap();
121            if let Some((tag, arc)) = slot.as_ref() {
122                let dominated = best.as_ref().is_some_and(|(best_tag, _)| *best_tag >= *tag);
123                if !dominated {
124                    best = Some((*tag, Arc::clone(arc)));
125                }
126            }
127        }
128        best.map(|(_, arc)| arc)
129    }
130
131    /// Get a snapshot by its monotonic write position.
132    ///
133    /// Returns `None` if the position has been evicted (overwritten) or
134    /// hasn't been written yet.
135    pub fn get_by_pos(&self, pos: u64) -> Option<Arc<OwnedSnapshot>> {
136        let current = self.write_pos.load(Ordering::Acquire);
137
138        // Not yet written.
139        if pos >= current {
140            return None;
141        }
142
143        // Evicted: the position is older than what the ring retains.
144        if current - pos > self.capacity as u64 {
145            return None;
146        }
147
148        let slot_idx = (pos as usize) % self.capacity;
149        let slot = self.slots[slot_idx].lock().unwrap();
150        match slot.as_ref() {
151            Some((tag, arc)) if *tag == pos => Some(Arc::clone(arc)),
152            // The producer overwrote this slot between our bounds check
153            // and lock acquisition — the requested position is gone.
154            _ => None,
155        }
156    }
157
158    /// Number of snapshots currently stored (up to `capacity`).
159    pub fn len(&self) -> usize {
160        let pos = self.write_pos.load(Ordering::Acquire) as usize;
161        pos.min(self.capacity)
162    }
163
164    /// Whether the ring is empty.
165    pub fn is_empty(&self) -> bool {
166        self.write_pos.load(Ordering::Acquire) == 0
167    }
168
169    /// The ring buffer capacity.
170    pub fn capacity(&self) -> usize {
171        self.capacity
172    }
173
174    /// The current monotonic write position.
175    pub fn write_pos(&self) -> u64 {
176        self.write_pos.load(Ordering::Acquire)
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use murk_arena::config::ArenaConfig;
    use murk_arena::pingpong::PingPongArena;
    use murk_arena::static_arena::StaticArena;
    use murk_core::id::{FieldId, ParameterVersion, TickId};
    use murk_core::traits::{FieldWriter as _, SnapshotAccess};
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};

    /// Build a minimal one-field snapshot: a 10-cell arena whose scalar
    /// "energy" field is filled with `tick as f32`, published at
    /// `TickId(tick)`. The tick value doubles as a marker so tests can
    /// identify which push a retrieved snapshot came from.
    fn make_test_snapshot(tick: u64) -> OwnedSnapshot {
        let cell_count = 10u32;
        let config = ArenaConfig::new(cell_count);
        let field_defs = vec![(
            FieldId(0),
            FieldDef {
                name: "energy".into(),
                field_type: FieldType::Scalar,
                mutability: FieldMutability::PerTick,
                units: None,
                bounds: None,
                boundary_behavior: BoundaryBehavior::Clamp,
            },
        )];
        let static_arena = StaticArena::new(&[]).into_shared();
        let mut arena = PingPongArena::new(config, field_defs, static_arena).unwrap();

        {
            // Scoped so the tick guard is dropped before publish.
            let mut guard = arena.begin_tick().unwrap();
            let data = guard.writer.write(FieldId(0)).unwrap();
            data.fill(tick as f32);
        }
        arena.publish(TickId(tick), ParameterVersion(0)).unwrap();
        arena.owned_snapshot()
    }

    #[test]
    fn test_ring_new_empty() {
        let ring = SnapshotRing::new(4);
        assert_eq!(ring.len(), 0);
        assert!(ring.is_empty());
        assert_eq!(ring.capacity(), 4);
        assert_eq!(ring.write_pos(), 0);
        assert!(ring.latest().is_none());
    }

    #[test]
    fn test_ring_push_and_latest() {
        let ring = SnapshotRing::new(4);
        ring.push(make_test_snapshot(1));
        assert_eq!(ring.len(), 1);
        assert!(!ring.is_empty());

        let latest = ring.latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(1));
    }

    #[test]
    fn test_ring_eviction() {
        let ring = SnapshotRing::new(4);

        // Push 4 — fills the ring.
        for i in 1..=4 {
            let evicted = ring.push(make_test_snapshot(i));
            assert!(evicted.is_none());
        }
        assert_eq!(ring.len(), 4);

        // Push 5th — evicts tick 1.
        let evicted = ring.push(make_test_snapshot(5));
        assert!(evicted.is_some());
        assert_eq!(evicted.unwrap().tick_id(), TickId(1));
        assert_eq!(ring.len(), 4);
    }

    #[test]
    fn test_ring_latest_is_newest() {
        let ring = SnapshotRing::new(4);
        for i in 1..=10 {
            ring.push(make_test_snapshot(i));
        }
        let latest = ring.latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(10));
    }

    #[test]
    fn test_ring_get_by_pos() {
        let ring = SnapshotRing::new(4);
        for i in 1..=4 {
            ring.push(make_test_snapshot(i));
        }

        // Positions are 0-indexed (write_pos starts at 0), so position 0
        // holds the snapshot published at tick 1.
        let snap = ring.get_by_pos(0).unwrap();
        assert_eq!(snap.tick_id(), TickId(1));

        let snap = ring.get_by_pos(3).unwrap();
        assert_eq!(snap.tick_id(), TickId(4));

        // Position 4 not yet written.
        assert!(ring.get_by_pos(4).is_none());
    }

    #[test]
    fn test_ring_get_evicted_returns_none() {
        let ring = SnapshotRing::new(4);
        for i in 1..=8 {
            ring.push(make_test_snapshot(i));
        }
        // Positions 0-3 have been evicted (overwritten by positions 4-7).
        assert!(ring.get_by_pos(0).is_none());
        assert!(ring.get_by_pos(3).is_none());

        // Positions 4-7 should still be available.
        let snap = ring.get_by_pos(4).unwrap();
        assert_eq!(snap.tick_id(), TickId(5));
    }

    #[test]
    #[should_panic(expected = "capacity must be >= 2")]
    fn test_ring_capacity_panics_below_2() {
        SnapshotRing::new(1);
    }

    #[test]
    fn test_get_by_pos_returns_none_after_concurrent_overwrite() {
        // Simulates the race: consumer reads write_pos, producer wraps
        // around and overwrites the target slot, consumer locks and
        // must detect the overwrite via position tag. (Single-threaded
        // here — the interleaving is reproduced by pushing past the
        // bounds check's snapshot of the ring state.)
        let ring = SnapshotRing::new(4);

        // Fill positions 0-3.
        for i in 1..=4 {
            ring.push(make_test_snapshot(i));
        }
        assert_eq!(ring.get_by_pos(0).unwrap().tick_id(), TickId(1));

        // Now push 4 more: positions 4-7 overwrite slots 0-3.
        for i in 5..=8 {
            ring.push(make_test_snapshot(i));
        }

        // Position 0 was overwritten by position 4 (same slot index).
        // get_by_pos(0) must return None, not the snapshot at position 4.
        assert!(ring.get_by_pos(0).is_none());
        assert!(ring.get_by_pos(3).is_none());

        // Position 4 should still be accessible.
        let snap = ring.get_by_pos(4).unwrap();
        assert_eq!(snap.tick_id(), TickId(5));

        // Position 7 should be accessible.
        let snap = ring.get_by_pos(7).unwrap();
        assert_eq!(snap.tick_id(), TickId(8));
    }

    #[test]
    fn test_get_by_pos_tag_matches_position() {
        // Verify that get_by_pos returns the exact snapshot for the
        // requested position, not whatever happens to be in the slot.
        let ring = SnapshotRing::new(4);

        for i in 1..=4 {
            ring.push(make_test_snapshot(i * 10));
        }

        // Each position should return its exact snapshot.
        assert_eq!(ring.get_by_pos(0).unwrap().tick_id(), TickId(10));
        assert_eq!(ring.get_by_pos(1).unwrap().tick_id(), TickId(20));
        assert_eq!(ring.get_by_pos(2).unwrap().tick_id(), TickId(30));
        assert_eq!(ring.get_by_pos(3).unwrap().tick_id(), TickId(40));
    }

    // ── Cross-thread integration tests ────────────────────────────

    #[test]
    fn test_producer_consumer_cross_thread() {
        use crate::config::{BackoffConfig, WorldConfig};
        use crate::tick::TickEngine;
        use murk_space::{EdgeBehavior, Line1D};
        use murk_test_utils::ConstPropagator;
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::thread;

        let config = WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![FieldDef {
                name: "energy".into(),
                field_type: FieldType::Scalar,
                mutability: FieldMutability::PerTick,
                units: None,
                bounds: None,
                boundary_behavior: BoundaryBehavior::Clamp,
            }],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: BackoffConfig::default(),
        };
        let mut engine = TickEngine::new(config).unwrap();

        let ring = Arc::new(SnapshotRing::new(8));
        let epoch_counter = Arc::new(crate::epoch::EpochCounter::new());
        let producer_done = Arc::new(AtomicBool::new(false));

        // Producer: run 100 ticks, push snapshots to ring, advance epoch.
        let ring_prod = Arc::clone(&ring);
        let epoch_prod = Arc::clone(&epoch_counter);
        let done_flag = Arc::clone(&producer_done);
        let producer = thread::spawn(move || {
            for _ in 0..100 {
                engine.execute_tick().unwrap();
                let snap = engine.owned_snapshot();
                ring_prod.push(snap);
                epoch_prod.advance();
            }
            done_flag.store(true, Ordering::Release);
        });

        // 4 consumer threads: read latest snapshot until producer is done.
        // Each read pins the worker epoch around field access, mirroring
        // how real consumers would guard against reclamation.
        let consumers: Vec<_> = (0..4)
            .map(|id| {
                let ring_c = Arc::clone(&ring);
                let done_c = Arc::clone(&producer_done);
                let worker = Arc::new(crate::epoch::WorkerEpoch::new(id));
                thread::spawn(move || {
                    let mut reads = 0u64;
                    loop {
                        if let Some(snap) = ring_c.latest() {
                            let epoch = snap.tick_id().0;
                            worker.pin(epoch);
                            let data = snap.read_field(FieldId(0)).unwrap();
                            assert_eq!(data.len(), 10);
                            assert!(data.iter().all(|&v| v == 42.0));
                            worker.unpin();
                            reads += 1;
                        }
                        if done_c.load(Ordering::Acquire) && reads > 0 {
                            break;
                        }
                        thread::yield_now();
                    }
                    reads
                })
            })
            .collect();

        producer.join().unwrap();
        let mut total_reads = 0u64;
        for c in consumers {
            let reads = c.join().unwrap();
            assert!(reads > 0, "consumer should have read at least one snapshot");
            total_reads += reads;
        }

        // Verify final state.
        assert!(ring.len() <= 8);
        assert_eq!(epoch_counter.current(), 100);
        assert!(
            total_reads >= 4,
            "consumers collectively should have many reads"
        );
    }

    #[test]
    fn test_epoch_pin_unpin_cross_thread() {
        use crate::epoch::WorkerEpoch;
        use std::thread;

        let workers: Vec<Arc<WorkerEpoch>> =
            (0..4).map(|i| Arc::new(WorkerEpoch::new(i))).collect();

        let handles: Vec<_> = workers
            .iter()
            .map(|worker| {
                let worker = worker.clone();
                thread::spawn(move || {
                    for epoch in 0..100u64 {
                        worker.pin(epoch);
                        assert!(worker.is_pinned());
                        assert_eq!(worker.pinned_epoch(), epoch);
                        worker.unpin();
                        assert!(!worker.is_pinned());
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // All workers should be unpinned.
        for w in &workers {
            assert!(!w.is_pinned());
            assert!(w.last_quiesce_ns() > 0);
        }
    }

    /// Stress test: with a tiny ring (capacity=2), hammer the producer and
    /// multiple consumers to maximise the chance of the retry-exhaustion
    /// race. After the fallback-scan fix, `latest()` must never return
    /// `None` once the ring is non-empty.
    #[test]
    fn test_latest_never_spurious_none_under_contention() {
        use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
        use std::thread;

        let ring = Arc::new(SnapshotRing::new(2)); // minimum capacity
        let producer_done = Arc::new(AtomicBool::new(false));
        let spurious_nones = Arc::new(AtomicU64::new(0));

        // Producer: push 200 snapshots as fast as possible.
        // (Each snapshot allocates a PingPongArena, so keep count moderate.)
        let ring_p = Arc::clone(&ring);
        let done_flag = Arc::clone(&producer_done);
        let producer = thread::spawn(move || {
            for i in 1..=200u64 {
                ring_p.push(make_test_snapshot(i));
            }
            done_flag.store(true, Ordering::Release);
        });

        // 4 consumers: call latest() in a tight loop, counting spurious Nones.
        let consumers: Vec<_> = (0..4)
            .map(|_| {
                let ring_c = Arc::clone(&ring);
                let done_c = Arc::clone(&producer_done);
                let nones = Arc::clone(&spurious_nones);
                thread::spawn(move || {
                    let mut reads = 0u64;
                    loop {
                        // Once at least one push has happened, latest() must
                        // never return None.
                        if ring_c.write_pos() > 0 && ring_c.latest().is_none() {
                            nones.fetch_add(1, Ordering::Relaxed);
                        }
                        reads += 1;
                        if done_c.load(Ordering::Acquire) {
                            break;
                        }
                        // No yield — keep the loop tight to maximise contention.
                    }
                    reads
                })
            })
            .collect();

        producer.join().unwrap();
        for c in consumers {
            c.join().unwrap();
        }

        assert_eq!(
            spurious_nones.load(Ordering::Relaxed),
            0,
            "latest() must never return None when the ring is non-empty"
        );
    }
}