//! Fixed-capacity ring buffer of owned snapshots for RealtimeAsync mode.
//!
//! [`SnapshotRing`] stores `Arc<OwnedSnapshot>` slots with single-producer
//! push and multi-consumer read. This is the spike implementation using
//! `Mutex` per slot; production will replace `Arc` with epoch-based pinning.
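//!
//! A minimal usage sketch. `snapshot(n)` is a hypothetical stand-in for
//! arena construction (see the tests in this file for the real setup), so
//! the block is not compiled:
//!
//! ```ignore
//! let ring = std::sync::Arc::new(SnapshotRing::new(8));
//! // Producer thread (exactly one):
//! let _evicted = ring.push(snapshot(0));
//! // Any consumer thread:
//! if let Some(snap) = ring.latest() {
//!     // read fields from `snap`
//! }
//! ```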

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use murk_arena::OwnedSnapshot;

/// A tagged slot: the `u64` is the monotonic write position when this
/// snapshot was stored, enabling consumers to detect overwrites.
type Slot = Option<(u64, Arc<OwnedSnapshot>)>;

/// A fixed-capacity ring buffer of `Arc<OwnedSnapshot>`.
///
/// Single-producer: only one thread calls `push`. Multi-consumer: any
/// thread can call `latest` or `get_by_pos` to read snapshots.
///
/// The write position is monotonically increasing (never wraps). Slot
/// index is computed as `pos % capacity`. Each slot stores a position
/// tag alongside the snapshot so that consumers can verify they are
/// reading the slot they intended, even under concurrent producer pushes.
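///
/// # Example
///
/// A sketch of the position-to-slot mapping with `capacity = 4`;
/// `snapshot(n)` is a hypothetical helper, so the block is not compiled:
///
/// ```ignore
/// let ring = SnapshotRing::new(4);
/// ring.push(snapshot(0)); // position 0 -> slot 0
/// ring.push(snapshot(1)); // position 1 -> slot 1
/// ring.push(snapshot(2)); // position 2 -> slot 2
/// ring.push(snapshot(3)); // position 3 -> slot 3
/// ring.push(snapshot(4)); // position 4 -> slot 0, evicting position 0
/// assert!(ring.get_by_pos(0).is_none()); // evicted by position 4
/// assert!(ring.get_by_pos(4).is_some()); // retained
/// ```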
pub struct SnapshotRing {
    /// Each slot holds `(position_tag, snapshot)`. The tag is the monotonic
    /// write position at which this snapshot was stored, enabling consumers
    /// to detect when a slot has been overwritten between their bounds check
    /// and their lock acquisition.
    slots: Vec<Mutex<Slot>>,
    write_pos: AtomicU64,
    capacity: usize,
}

// Compile-time assertion: SnapshotRing must be Send + Sync.
const _: fn() = || {
    fn assert<T: Send + Sync>() {}
    assert::<SnapshotRing>();
};

impl SnapshotRing {
    /// Create a new ring buffer with the given capacity.
    ///
    /// # Panics
    ///
    /// Panics if `capacity < 2`. A ring buffer needs at least 2 slots
    /// to be useful (one being written, one readable).
    pub fn new(capacity: usize) -> Self {
        assert!(
            capacity >= 2,
            "SnapshotRing capacity must be >= 2, got {capacity}"
        );
        let slots = (0..capacity).map(|_| Mutex::new(None)).collect();
        Self {
            slots,
            write_pos: AtomicU64::new(0),
            capacity,
        }
    }

    /// Push a new snapshot into the ring. Single-producer only.
    ///
    /// Returns the evicted snapshot (if any) that was displaced by this push.
    /// The caller can use this for reclamation bookkeeping.
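    ///
    /// # Example
    ///
    /// Eviction only begins once the ring wraps. `snapshot(n)` is a
    /// hypothetical stand-in for arena construction; not compiled:
    ///
    /// ```ignore
    /// let ring = SnapshotRing::new(2);
    /// assert!(ring.push(snapshot(0)).is_none()); // slot 0 was empty
    /// assert!(ring.push(snapshot(1)).is_none()); // slot 1 was empty
    /// let evicted = ring.push(snapshot(2));      // wraps to slot 0
    /// assert!(evicted.is_some());                // snapshot(0) displaced
    /// ```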
    pub fn push(&self, snapshot: OwnedSnapshot) -> Option<Arc<OwnedSnapshot>> {
        // Relaxed is sufficient here: this thread is the only writer of
        // `write_pos`, so it always sees its own latest value.
        let pos = self.write_pos.load(Ordering::Relaxed);
        let slot_idx = (pos as usize) % self.capacity;

        let arc = Arc::new(snapshot);
        let evicted = {
            let mut slot = self.slots[slot_idx].lock().unwrap();
            let prev = slot.take().map(|(_tag, arc)| arc);
            *slot = Some((pos, Arc::clone(&arc)));
            prev
        };

        // Release-store ensures the snapshot data is visible before
        // consumers observe the new write_pos (pairs with the Acquire
        // loads in `latest` and `get_by_pos`).
        self.write_pos.store(pos + 1, Ordering::Release);

        evicted
    }

    /// Get the latest (most recently pushed) snapshot.
    ///
    /// Returns `None` if no snapshots have been pushed yet. On overwrite
    /// races (producer wraps the slot between our `write_pos` read and
    /// lock acquisition), retries from the fresh `write_pos`. Retries are
    /// bounded by `capacity`, so a producer that laps this reader on every
    /// attempt can, pathologically, still yield `None` on a non-empty ring.
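    ///
    /// # Example
    ///
    /// Sketch only; `snapshot(n)` is a hypothetical helper, not compiled:
    ///
    /// ```ignore
    /// let ring = SnapshotRing::new(4);
    /// ring.push(snapshot(1));
    /// ring.push(snapshot(2));
    /// // `latest` resolves to position `write_pos - 1`, i.e. snapshot(2).
    /// let newest = ring.latest().unwrap();
    /// ```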
    pub fn latest(&self) -> Option<Arc<OwnedSnapshot>> {
        // Bounded retry: at most `capacity` attempts. A well-behaved
        // producer can overwrite at most `capacity` slots per lap, so
        // `capacity` retries guarantee convergence unless the producer
        // is lapping the consumer faster than the consumer can lock a
        // single slot — which would indicate a misconfigured system.
        for _ in 0..self.capacity {
            let pos = self.write_pos.load(Ordering::Acquire);
            if pos == 0 {
                return None;
            }
            let target_pos = pos - 1;
            let slot_idx = (target_pos as usize) % self.capacity;
            let slot = self.slots[slot_idx].lock().unwrap();
            match slot.as_ref() {
                Some((tag, arc)) if *tag == target_pos => return Some(Arc::clone(arc)),
                // Producer overwrote this slot between our write_pos
                // read and lock acquisition. Re-read write_pos and
                // try the new latest slot.
                _ => continue,
            }
        }
        // All retries exhausted — producer is lapping us extremely fast.
        // This should not happen under normal conditions.
        None
    }

    /// Get a snapshot by its monotonic write position.
    ///
    /// Returns `None` if the position has been evicted (overwritten) or
    /// hasn't been written yet.
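    ///
    /// # Example
    ///
    /// With `capacity = 4`, after eight pushes only positions `4..=7`
    /// remain. Sketch only; `snapshot(n)` is a hypothetical helper, so
    /// the block is not compiled:
    ///
    /// ```ignore
    /// let ring = SnapshotRing::new(4);
    /// for i in 0..8 {
    ///     ring.push(snapshot(i));
    /// }
    /// assert!(ring.get_by_pos(3).is_none()); // evicted
    /// assert!(ring.get_by_pos(4).is_some()); // oldest retained
    /// assert!(ring.get_by_pos(8).is_none()); // not yet written
    /// ```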
    pub fn get_by_pos(&self, pos: u64) -> Option<Arc<OwnedSnapshot>> {
        let current = self.write_pos.load(Ordering::Acquire);

        // Not yet written.
        if pos >= current {
            return None;
        }

        // Evicted: the position is older than what the ring retains.
        if current - pos > self.capacity as u64 {
            return None;
        }

        let slot_idx = (pos as usize) % self.capacity;
        let slot = self.slots[slot_idx].lock().unwrap();
        match slot.as_ref() {
            Some((tag, arc)) if *tag == pos => Some(Arc::clone(arc)),
            // The producer overwrote this slot between our bounds check
            // and lock acquisition — the requested position is gone.
            _ => None,
        }
    }

    /// Number of snapshots currently stored (up to `capacity`).
    pub fn len(&self) -> usize {
        let pos = self.write_pos.load(Ordering::Acquire) as usize;
        pos.min(self.capacity)
    }

    /// Whether the ring is empty.
    pub fn is_empty(&self) -> bool {
        self.write_pos.load(Ordering::Acquire) == 0
    }

    /// The ring buffer capacity.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// The current monotonic write position.
    pub fn write_pos(&self) -> u64 {
        self.write_pos.load(Ordering::Acquire)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use murk_arena::config::ArenaConfig;
    use murk_arena::pingpong::PingPongArena;
    use murk_arena::static_arena::StaticArena;
    use murk_core::id::{FieldId, ParameterVersion, TickId};
    use murk_core::traits::{FieldWriter as _, SnapshotAccess};
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};

    fn make_test_snapshot(tick: u64) -> OwnedSnapshot {
        let cell_count = 10u32;
        let config = ArenaConfig::new(cell_count);
        let field_defs = vec![(
            FieldId(0),
            FieldDef {
                name: "energy".into(),
                field_type: FieldType::Scalar,
                mutability: FieldMutability::PerTick,
                units: None,
                bounds: None,
                boundary_behavior: BoundaryBehavior::Clamp,
            },
        )];
        let static_arena = StaticArena::new(&[]).into_shared();
        let mut arena = PingPongArena::new(config, field_defs, static_arena).unwrap();

        {
            let mut guard = arena.begin_tick().unwrap();
            let data = guard.writer.write(FieldId(0)).unwrap();
            data.fill(tick as f32);
        }
        arena.publish(TickId(tick), ParameterVersion(0));
        arena.owned_snapshot()
    }

    #[test]
    fn test_ring_new_empty() {
        let ring = SnapshotRing::new(4);
        assert_eq!(ring.len(), 0);
        assert!(ring.is_empty());
        assert_eq!(ring.capacity(), 4);
        assert_eq!(ring.write_pos(), 0);
        assert!(ring.latest().is_none());
    }

    #[test]
    fn test_ring_push_and_latest() {
        let ring = SnapshotRing::new(4);
        ring.push(make_test_snapshot(1));
        assert_eq!(ring.len(), 1);
        assert!(!ring.is_empty());

        let latest = ring.latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(1));
    }

    #[test]
    fn test_ring_eviction() {
        let ring = SnapshotRing::new(4);

        // Push 4 — fills the ring.
        for i in 1..=4 {
            let evicted = ring.push(make_test_snapshot(i));
            assert!(evicted.is_none());
        }
        assert_eq!(ring.len(), 4);

        // Push 5th — evicts tick 1.
        let evicted = ring.push(make_test_snapshot(5));
        assert!(evicted.is_some());
        assert_eq!(evicted.unwrap().tick_id(), TickId(1));
        assert_eq!(ring.len(), 4);
    }

    #[test]
    fn test_ring_latest_is_newest() {
        let ring = SnapshotRing::new(4);
        for i in 1..=10 {
            ring.push(make_test_snapshot(i));
        }
        let latest = ring.latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(10));
    }

    #[test]
    fn test_ring_get_by_pos() {
        let ring = SnapshotRing::new(4);
        for i in 1..=4 {
            ring.push(make_test_snapshot(i));
        }

        // Positions are 0-indexed (write_pos starts at 0).
        let snap = ring.get_by_pos(0).unwrap();
        assert_eq!(snap.tick_id(), TickId(1));

        let snap = ring.get_by_pos(3).unwrap();
        assert_eq!(snap.tick_id(), TickId(4));

        // Position 4 not yet written.
        assert!(ring.get_by_pos(4).is_none());
    }

    #[test]
    fn test_ring_get_evicted_returns_none() {
        let ring = SnapshotRing::new(4);
        for i in 1..=8 {
            ring.push(make_test_snapshot(i));
        }
        // Positions 0-3 have been evicted (overwritten by positions 4-7).
        assert!(ring.get_by_pos(0).is_none());
        assert!(ring.get_by_pos(3).is_none());

        // Positions 4-7 should still be available.
        let snap = ring.get_by_pos(4).unwrap();
        assert_eq!(snap.tick_id(), TickId(5));
    }

    #[test]
    #[should_panic(expected = "capacity must be >= 2")]
    fn test_ring_capacity_panics_below_2() {
        SnapshotRing::new(1);
    }

    #[test]
    fn test_get_by_pos_returns_none_after_concurrent_overwrite() {
        // Simulates the race: consumer reads write_pos, producer wraps
        // around and overwrites the target slot, consumer locks and
        // must detect the overwrite via position tag.
        let ring = SnapshotRing::new(4);

        // Fill positions 0-3.
        for i in 1..=4 {
            ring.push(make_test_snapshot(i));
        }
        assert_eq!(ring.get_by_pos(0).unwrap().tick_id(), TickId(1));

        // Now push 4 more: positions 4-7 overwrite slots 0-3.
        for i in 5..=8 {
            ring.push(make_test_snapshot(i));
        }

        // Position 0 was overwritten by position 4 (same slot index).
        // get_by_pos(0) must return None, not the snapshot at position 4.
        assert!(ring.get_by_pos(0).is_none());
        assert!(ring.get_by_pos(3).is_none());

        // Position 4 should still be accessible.
        let snap = ring.get_by_pos(4).unwrap();
        assert_eq!(snap.tick_id(), TickId(5));

        // Position 7 should be accessible.
        let snap = ring.get_by_pos(7).unwrap();
        assert_eq!(snap.tick_id(), TickId(8));
    }

    #[test]
    fn test_get_by_pos_tag_matches_position() {
        // Verify that get_by_pos returns the exact snapshot for the
        // requested position, not whatever happens to be in the slot.
        let ring = SnapshotRing::new(4);

        for i in 1..=4 {
            ring.push(make_test_snapshot(i * 10));
        }

        // Each position should return its exact snapshot.
        assert_eq!(ring.get_by_pos(0).unwrap().tick_id(), TickId(10));
        assert_eq!(ring.get_by_pos(1).unwrap().tick_id(), TickId(20));
        assert_eq!(ring.get_by_pos(2).unwrap().tick_id(), TickId(30));
        assert_eq!(ring.get_by_pos(3).unwrap().tick_id(), TickId(40));
    }

    // ── Cross-thread integration tests ────────────────────────────

    #[test]
    fn test_producer_consumer_cross_thread() {
        use crate::config::{BackoffConfig, WorldConfig};
        use crate::tick::TickEngine;
        use murk_space::{EdgeBehavior, Line1D};
        use murk_test_utils::ConstPropagator;
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::thread;

        let config = WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![FieldDef {
                name: "energy".into(),
                field_type: FieldType::Scalar,
                mutability: FieldMutability::PerTick,
                units: None,
                bounds: None,
                boundary_behavior: BoundaryBehavior::Clamp,
            }],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: BackoffConfig::default(),
        };
        let mut engine = TickEngine::new(config).unwrap();

        let ring = Arc::new(SnapshotRing::new(8));
        let epoch_counter = Arc::new(crate::epoch::EpochCounter::new());
        let producer_done = Arc::new(AtomicBool::new(false));

        // Producer: run 100 ticks, push snapshots to ring, advance epoch.
        let ring_prod = Arc::clone(&ring);
        let epoch_prod = Arc::clone(&epoch_counter);
        let done_flag = Arc::clone(&producer_done);
        let producer = thread::spawn(move || {
            for _ in 0..100 {
                engine.execute_tick().unwrap();
                let snap = engine.owned_snapshot();
                ring_prod.push(snap);
                epoch_prod.advance();
            }
            done_flag.store(true, Ordering::Release);
        });

        // 4 consumer threads: read latest snapshot until producer is done.
        let consumers: Vec<_> = (0..4)
            .map(|id| {
                let ring_c = Arc::clone(&ring);
                let done_c = Arc::clone(&producer_done);
                let worker = Arc::new(crate::epoch::WorkerEpoch::new(id));
                thread::spawn(move || {
                    let mut reads = 0u64;
                    loop {
                        if let Some(snap) = ring_c.latest() {
                            let epoch = snap.tick_id().0;
                            worker.pin(epoch);
                            let data = snap.read_field(FieldId(0)).unwrap();
                            assert_eq!(data.len(), 10);
                            assert!(data.iter().all(|&v| v == 42.0));
                            worker.unpin();
                            reads += 1;
                        }
                        if done_c.load(Ordering::Acquire) && reads > 0 {
                            break;
                        }
                        thread::yield_now();
                    }
                    reads
                })
            })
            .collect();

        producer.join().unwrap();
        let mut total_reads = 0u64;
        for c in consumers {
            let reads = c.join().unwrap();
            assert!(reads > 0, "consumer should have read at least one snapshot");
            total_reads += reads;
        }

        // Verify final state.
        assert!(ring.len() <= 8);
        assert_eq!(epoch_counter.current(), 100);
        assert!(
            total_reads >= 4,
            "each of the 4 consumers should have completed at least one read"
        );
    }

    #[test]
    fn test_epoch_pin_unpin_cross_thread() {
        use crate::epoch::WorkerEpoch;
        use std::thread;

        let workers: Vec<Arc<WorkerEpoch>> =
            (0..4).map(|i| Arc::new(WorkerEpoch::new(i))).collect();

        let handles: Vec<_> = workers
            .iter()
            .map(|worker| {
                let worker = worker.clone();
                thread::spawn(move || {
                    for epoch in 0..100u64 {
                        worker.pin(epoch);
                        assert!(worker.is_pinned());
                        assert_eq!(worker.pinned_epoch(), epoch);
                        worker.unpin();
                        assert!(!worker.is_pinned());
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // All workers should be unpinned.
        for w in &workers {
            assert!(!w.is_pinned());
            assert!(w.last_quiesce_ns() > 0);
        }
    }
}