1use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex};
9
10use murk_arena::OwnedSnapshot;
11
/// One ring slot: `None` until first written, then `(position_tag, snapshot)`.
/// The `u64` tag is the global write position that produced the entry; readers
/// compare it against the position they asked for to detect overwrites.
type Slot = Option<(u64, Arc<OwnedSnapshot>)>;
15
pub struct SnapshotRing {
    /// Fixed array of `capacity` slots; each slot is independently locked so
    /// a reader only contends with a writer targeting the same slot.
    slots: Vec<Mutex<Slot>>,
    /// Monotonically increasing count of completed pushes. The next push goes
    /// to `write_pos % capacity`; the newest snapshot lives at `write_pos - 1`.
    write_pos: AtomicU64,
    /// Times a read found the ring completely empty.
    not_available_events: AtomicU64,
    /// Times a push displaced an existing snapshot from its slot.
    eviction_events: AtomicU64,
    /// Times `get_by_pos` was asked for a position outside the retained window.
    stale_read_events: AtomicU64,
    /// Times a reader observed a slot whose tag didn't match the expected
    /// position (concurrent overwrite) and had to retry or fall back.
    skew_retry_events: AtomicU64,
    /// Number of slots; fixed at construction.
    capacity: usize,
}
38
39const _: fn() = || {
41 fn assert<T: Send + Sync>() {}
42 assert::<SnapshotRing>();
43};
44
impl SnapshotRing {
    /// Creates a ring with `capacity` empty slots and all counters at zero.
    ///
    /// # Panics
    /// Panics if `capacity < 2`: with a single slot the newest entry would
    /// always be the one concurrently being overwritten.
    pub fn new(capacity: usize) -> Self {
        assert!(
            capacity >= 2,
            "SnapshotRing capacity must be >= 2, got {capacity}"
        );
        let slots = (0..capacity).map(|_| Mutex::new(None)).collect();
        Self {
            slots,
            write_pos: AtomicU64::new(0),
            not_available_events: AtomicU64::new(0),
            eviction_events: AtomicU64::new(0),
            stale_read_events: AtomicU64::new(0),
            skew_retry_events: AtomicU64::new(0),
            capacity,
        }
    }

    /// Publishes `snapshot` at the current write position and returns the
    /// snapshot it displaced, if the target slot was occupied.
    ///
    /// NOTE(review): `write_pos` is read with a plain load and stored back as
    /// `pos + 1` (not `fetch_add`), so this is only correct with a single
    /// producer thread — TODO confirm callers uphold that invariant.
    pub fn push(&self, snapshot: OwnedSnapshot) -> Option<Arc<OwnedSnapshot>> {
        let pos = self.write_pos.load(Ordering::Relaxed);
        let slot_idx = (pos as usize) % self.capacity;

        let arc = Arc::new(snapshot);
        let evicted = {
            let mut slot = self.slots[slot_idx].lock().unwrap();
            // Take the previous occupant so its Arc is handed back to the
            // caller instead of being dropped while the slot lock is held.
            let prev = slot.take().map(|(_tag, arc)| arc);
            // Tag the entry with its global position so readers can tell
            // whether the slot still holds the position they expect.
            *slot = Some((pos, Arc::clone(&arc)));
            prev
        };
        if evicted.is_some() {
            self.eviction_events.fetch_add(1, Ordering::Relaxed);
        }

        // Release store: the slot write above happens-before any reader that
        // Acquire-loads the advanced position, so readers never see a
        // position whose slot hasn't been filled yet.
        self.write_pos.store(pos + 1, Ordering::Release);

        evicted
    }

    /// Returns the newest snapshot, recording `not_available` / `skew_retry`
    /// counter events on the corresponding outcomes.
    pub fn latest(&self) -> Option<Arc<OwnedSnapshot>> {
        self.latest_impl(true)
    }

    /// Same lookup as [`latest`], but never mutates the event counters —
    /// suitable for metrics/debug probes that must not perturb stats.
    pub fn peek_latest(&self) -> Option<Arc<OwnedSnapshot>> {
        self.latest_impl(false)
    }

    /// Core of `latest`/`peek_latest`.
    ///
    /// Fast path: up to `capacity` attempts to read the slot at
    /// `write_pos - 1`, retrying when a concurrent push has re-tagged the
    /// slot (skew). Slow path: lock every slot and return the entry with the
    /// highest tag, which always exists once anything was pushed — so a
    /// non-empty ring can never spuriously return `None`.
    fn latest_impl(&self, record_events: bool) -> Option<Arc<OwnedSnapshot>> {
        for _ in 0..self.capacity {
            // Acquire pairs with the Release store in `push`.
            let pos = self.write_pos.load(Ordering::Acquire);
            if pos == 0 {
                if record_events {
                    self.not_available_events.fetch_add(1, Ordering::Relaxed);
                }
                return None;
            }
            let target_pos = pos - 1;
            let slot_idx = (target_pos as usize) % self.capacity;
            let slot = self.slots[slot_idx].lock().unwrap();
            match slot.as_ref() {
                Some((tag, arc)) if *tag == target_pos => return Some(Arc::clone(arc)),
                _ => {
                    // Tag mismatch: the producer lapped us between reading
                    // `write_pos` and locking the slot. Re-read and retry.
                    if record_events {
                        self.skew_retry_events.fetch_add(1, Ordering::Relaxed);
                    }
                    continue;
                }
            }
        }

        // Fallback: full scan for the highest-tagged entry.
        let mut best: Option<(u64, Arc<OwnedSnapshot>)> = None;
        for slot_mutex in &self.slots {
            let slot = slot_mutex.lock().unwrap();
            if let Some((tag, arc)) = slot.as_ref() {
                let dominated = best.as_ref().is_some_and(|(best_tag, _)| *best_tag >= *tag);
                if !dominated {
                    best = Some((*tag, Arc::clone(arc)));
                }
            }
        }
        if let Some((_, arc)) = best {
            Some(arc)
        } else {
            if record_events {
                self.not_available_events.fetch_add(1, Ordering::Relaxed);
            }
            None
        }
    }

    /// Returns the snapshot published at absolute position `pos`, or `None`
    /// (recording a stale-read event) when `pos` is not yet written, already
    /// evicted, or was overwritten concurrently.
    pub fn get_by_pos(&self, pos: u64) -> Option<Arc<OwnedSnapshot>> {
        let current = self.write_pos.load(Ordering::Acquire);

        // Position not published yet.
        if pos >= current {
            self.stale_read_events.fetch_add(1, Ordering::Relaxed);
            return None;
        }

        // Outside the retained window [current - capacity, current).
        // `current - pos == capacity` is still retained: positions
        // current-capacity .. current-1 occupy all slots distinctly.
        if current - pos > self.capacity as u64 {
            self.stale_read_events.fetch_add(1, Ordering::Relaxed);
            return None;
        }

        let slot_idx = (pos as usize) % self.capacity;
        let slot = self.slots[slot_idx].lock().unwrap();
        match slot.as_ref() {
            Some((tag, arc)) if *tag == pos => Some(Arc::clone(arc)),
            _ => {
                // Slot was re-tagged by a push after we sampled `write_pos`:
                // counts as both a stale read and a detected skew.
                self.stale_read_events.fetch_add(1, Ordering::Relaxed);
                self.skew_retry_events.fetch_add(1, Ordering::Relaxed);
                None
            }
        }
    }

    /// Number of snapshots currently retained (saturates at `capacity`).
    pub fn len(&self) -> usize {
        let pos = self.write_pos.load(Ordering::Acquire) as usize;
        pos.min(self.capacity)
    }

    /// True until the first push completes.
    pub fn is_empty(&self) -> bool {
        self.write_pos.load(Ordering::Acquire) == 0
    }

    /// Fixed slot count chosen at construction.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Total number of completed pushes (also the next position to write).
    pub fn write_pos(&self) -> u64 {
        self.write_pos.load(Ordering::Acquire)
    }

    /// Smallest absolute position still retained, or `None` if empty.
    /// Equals `write_pos - min(write_pos, capacity)`.
    pub fn oldest_retained_pos(&self) -> Option<u64> {
        let current = self.write_pos();
        if current == 0 {
            return None;
        }
        let retained = current.min(self.capacity as u64);
        Some(current - retained)
    }

    /// Count of reads that found the ring empty.
    pub fn not_available_events(&self) -> u64 {
        self.not_available_events.load(Ordering::Relaxed)
    }

    /// Count of pushes that displaced an existing snapshot.
    pub fn eviction_events(&self) -> u64 {
        self.eviction_events.load(Ordering::Relaxed)
    }

    /// Count of `get_by_pos` calls that missed the retained window.
    pub fn stale_read_events(&self) -> u64 {
        self.stale_read_events.load(Ordering::Relaxed)
    }

    /// Count of detected slot-tag skews (concurrent overwrites).
    pub fn skew_retry_events(&self) -> u64 {
        self.skew_retry_events.load(Ordering::Relaxed)
    }
}
256
/// Unit and cross-thread tests for `SnapshotRing`, including exact assertions
/// on the diagnostic counters (`not_available`, `eviction`, `stale_read`,
/// `skew_retry`) — the counter increments are part of the tested contract.
#[cfg(test)]
mod tests {
    use super::*;
    use murk_arena::config::ArenaConfig;
    use murk_arena::pingpong::PingPongArena;
    use murk_arena::static_arena::StaticArena;
    use murk_core::id::{FieldId, ParameterVersion, TickId};
    use murk_core::traits::{FieldWriter as _, SnapshotAccess};
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};

    /// Builds a minimal one-field snapshot at tick `tick`, with the scalar
    /// field filled with `tick as f32`.
    fn make_test_snapshot(tick: u64) -> OwnedSnapshot {
        let cell_count = 10u32;
        let config = ArenaConfig::new(cell_count);
        let field_defs = vec![(
            FieldId(0),
            FieldDef {
                name: "energy".into(),
                field_type: FieldType::Scalar,
                mutability: FieldMutability::PerTick,
                units: None,
                bounds: None,
                boundary_behavior: BoundaryBehavior::Clamp,
            },
        )];
        let static_arena = StaticArena::new(&[]).into_shared();
        let mut arena = PingPongArena::new(config, field_defs, static_arena).unwrap();

        // Write one tick's worth of data, then publish and snapshot it.
        {
            let mut guard = arena.begin_tick().unwrap();
            let data = guard.writer.write(FieldId(0)).unwrap();
            data.fill(tick as f32);
        }
        arena.publish(TickId(tick), ParameterVersion(0)).unwrap();
        arena.owned_snapshot()
    }

    /// Fresh ring: empty, zeroed counters; `latest()` on empty bumps
    /// `not_available_events` exactly once.
    #[test]
    fn test_ring_new_empty() {
        let ring = SnapshotRing::new(4);
        assert_eq!(ring.len(), 0);
        assert!(ring.is_empty());
        assert_eq!(ring.capacity(), 4);
        assert_eq!(ring.write_pos(), 0);
        assert_eq!(ring.not_available_events(), 0);
        assert_eq!(ring.eviction_events(), 0);
        assert_eq!(ring.stale_read_events(), 0);
        assert_eq!(ring.skew_retry_events(), 0);
        assert!(ring.latest().is_none());
        assert_eq!(ring.not_available_events(), 1);
    }

    /// One push: ring non-empty, `latest()` succeeds with no counter events.
    #[test]
    fn test_ring_push_and_latest() {
        let ring = SnapshotRing::new(4);
        ring.push(make_test_snapshot(1));
        assert_eq!(ring.len(), 1);
        assert!(!ring.is_empty());
        assert_eq!(ring.not_available_events(), 0);

        let latest = ring.latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(1));
        assert_eq!(ring.not_available_events(), 0);
        assert_eq!(ring.oldest_retained_pos(), Some(0));
    }

    /// `peek_latest()` must not record a not-available event on empty.
    #[test]
    fn test_ring_peek_latest_does_not_increment_not_available_on_empty() {
        let ring = SnapshotRing::new(4);
        assert!(ring.peek_latest().is_none());
        assert_eq!(ring.not_available_events(), 0);
        assert_eq!(ring.skew_retry_events(), 0);
    }

    /// `peek_latest()` returns the snapshot without touching any counter.
    #[test]
    fn test_ring_peek_latest_returns_snapshot_without_counter_mutation() {
        let ring = SnapshotRing::new(4);
        ring.push(make_test_snapshot(5));

        let latest = ring.peek_latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(5));
        assert_eq!(ring.not_available_events(), 0);
        assert_eq!(ring.skew_retry_events(), 0);
    }

    /// Filling to capacity evicts nothing; the next push evicts the oldest
    /// and bumps `eviction_events`.
    #[test]
    fn test_ring_eviction() {
        let ring = SnapshotRing::new(4);

        for i in 1..=4 {
            let evicted = ring.push(make_test_snapshot(i));
            assert!(evicted.is_none());
        }
        assert_eq!(ring.len(), 4);

        let evicted = ring.push(make_test_snapshot(5));
        assert!(evicted.is_some());
        assert_eq!(evicted.unwrap().tick_id(), TickId(1));
        assert_eq!(ring.len(), 4);
        assert_eq!(ring.eviction_events(), 1);
        assert_eq!(ring.oldest_retained_pos(), Some(1));
    }

    /// After wraparound, `latest()` still yields the most recent push.
    #[test]
    fn test_ring_latest_is_newest() {
        let ring = SnapshotRing::new(4);
        for i in 1..=10 {
            ring.push(make_test_snapshot(i));
        }
        let latest = ring.latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(10));
    }

    /// Positional reads hit within the retained window; an unwritten
    /// position is a stale read.
    #[test]
    fn test_ring_get_by_pos() {
        let ring = SnapshotRing::new(4);
        for i in 1..=4 {
            ring.push(make_test_snapshot(i));
        }

        let snap = ring.get_by_pos(0).unwrap();
        assert_eq!(snap.tick_id(), TickId(1));

        let snap = ring.get_by_pos(3).unwrap();
        assert_eq!(snap.tick_id(), TickId(4));

        // pos 4 hasn't been written yet (write_pos == 4).
        assert!(ring.get_by_pos(4).is_none());
        assert_eq!(ring.stale_read_events(), 1);
    }

    /// Evicted positions return None and count as stale reads.
    #[test]
    fn test_ring_get_evicted_returns_none() {
        let ring = SnapshotRing::new(4);
        for i in 1..=8 {
            ring.push(make_test_snapshot(i));
        }
        assert!(ring.get_by_pos(0).is_none());
        assert!(ring.get_by_pos(3).is_none());
        assert_eq!(ring.stale_read_events(), 2);

        // pos 4 is the oldest still retained (window 4..8).
        let snap = ring.get_by_pos(4).unwrap();
        assert_eq!(snap.tick_id(), TickId(5));
    }

    /// Constructor precondition from `SnapshotRing::new`.
    #[test]
    #[should_panic(expected = "capacity must be >= 2")]
    fn test_ring_capacity_panics_below_2() {
        SnapshotRing::new(1);
    }

    /// Overwritten positions become unreadable; the new window reads back
    /// the correct ticks.
    #[test]
    fn test_get_by_pos_returns_none_after_concurrent_overwrite() {
        let ring = SnapshotRing::new(4);

        for i in 1..=4 {
            ring.push(make_test_snapshot(i));
        }
        assert_eq!(ring.get_by_pos(0).unwrap().tick_id(), TickId(1));

        // Lap the ring once; positions 0..=3 are overwritten.
        for i in 5..=8 {
            ring.push(make_test_snapshot(i));
        }

        assert!(ring.get_by_pos(0).is_none());
        assert!(ring.get_by_pos(3).is_none());

        let snap = ring.get_by_pos(4).unwrap();
        assert_eq!(snap.tick_id(), TickId(5));

        let snap = ring.get_by_pos(7).unwrap();
        assert_eq!(snap.tick_id(), TickId(8));
    }

    /// Slot tags correspond to absolute positions, not tick ids.
    #[test]
    fn test_get_by_pos_tag_matches_position() {
        let ring = SnapshotRing::new(4);

        // Ticks are 10,20,30,40 while positions are 0..=3.
        for i in 1..=4 {
            ring.push(make_test_snapshot(i * 10));
        }

        assert_eq!(ring.get_by_pos(0).unwrap().tick_id(), TickId(10));
        assert_eq!(ring.get_by_pos(1).unwrap().tick_id(), TickId(20));
        assert_eq!(ring.get_by_pos(2).unwrap().tick_id(), TickId(30));
        assert_eq!(ring.get_by_pos(3).unwrap().tick_id(), TickId(40));
    }

    /// One producer ticking an engine, four consumers reading `latest()`;
    /// validates cross-thread visibility of field data and epoch counting.
    #[test]
    fn test_producer_consumer_cross_thread() {
        use crate::config::{BackoffConfig, WorldConfig};
        use crate::tick::TickEngine;
        use murk_space::{EdgeBehavior, Line1D};
        use murk_test_utils::ConstPropagator;
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::thread;

        let config = WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![FieldDef {
                name: "energy".into(),
                field_type: FieldType::Scalar,
                mutability: FieldMutability::PerTick,
                units: None,
                bounds: None,
                boundary_behavior: BoundaryBehavior::Clamp,
            }],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: BackoffConfig::default(),
        };
        let mut engine = TickEngine::new(config).unwrap();

        let ring = Arc::new(SnapshotRing::new(8));
        let epoch_counter = Arc::new(crate::epoch::EpochCounter::new());
        let producer_done = Arc::new(AtomicBool::new(false));

        // Producer: 100 ticks, each pushed into the ring + epoch advanced.
        let ring_prod = Arc::clone(&ring);
        let epoch_prod = Arc::clone(&epoch_counter);
        let done_flag = Arc::clone(&producer_done);
        let producer = thread::spawn(move || {
            for _ in 0..100 {
                engine.execute_tick().unwrap();
                let snap = engine.owned_snapshot();
                ring_prod.push(snap);
                epoch_prod.advance();
            }
            done_flag.store(true, Ordering::Release);
        });

        // Consumers: pin an epoch, verify the field contents, unpin.
        let consumers: Vec<_> = (0..4)
            .map(|id| {
                let ring_c = Arc::clone(&ring);
                let done_c = Arc::clone(&producer_done);
                let worker = Arc::new(crate::epoch::WorkerEpoch::new(id));
                thread::spawn(move || {
                    let mut reads = 0u64;
                    loop {
                        if let Some(snap) = ring_c.latest() {
                            let epoch = snap.tick_id().0;
                            worker.pin(epoch);
                            let data = snap.read_field(FieldId(0)).unwrap();
                            assert_eq!(data.len(), 10);
                            assert!(data.iter().all(|&v| v == 42.0));
                            worker.unpin();
                            reads += 1;
                        }
                        if done_c.load(Ordering::Acquire) && reads > 0 {
                            break;
                        }
                        thread::yield_now();
                    }
                    reads
                })
            })
            .collect();

        producer.join().unwrap();
        let mut total_reads = 0u64;
        for c in consumers {
            let reads = c.join().unwrap();
            assert!(reads > 0, "consumer should have read at least one snapshot");
            total_reads += reads;
        }

        assert!(ring.len() <= 8);
        assert_eq!(epoch_counter.current(), 100);
        assert!(
            total_reads >= 4,
            "consumers collectively should have many reads"
        );
    }

    /// Pin/unpin on worker epochs from multiple threads leaves every worker
    /// unpinned with a recorded quiesce time.
    #[test]
    fn test_epoch_pin_unpin_cross_thread() {
        use crate::epoch::WorkerEpoch;
        use std::thread;

        let workers: Vec<Arc<WorkerEpoch>> =
            (0..4).map(|i| Arc::new(WorkerEpoch::new(i))).collect();

        let handles: Vec<_> = workers
            .iter()
            .map(|worker| {
                let worker = worker.clone();
                thread::spawn(move || {
                    for epoch in 0..100u64 {
                        worker.pin(epoch);
                        assert!(worker.is_pinned());
                        assert_eq!(worker.pinned_epoch(), epoch);
                        worker.unpin();
                        assert!(!worker.is_pinned());
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        for w in &workers {
            assert!(!w.is_pinned());
            assert!(w.last_quiesce_ns() > 0);
        }
    }

    /// Stress test of the `latest_impl` fallback scan: with the minimum
    /// capacity (2) under contention, a non-empty ring must never return
    /// None from `latest()`.
    #[test]
    fn test_latest_never_spurious_none_under_contention() {
        use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
        use std::thread;

        let ring = Arc::new(SnapshotRing::new(2)); let producer_done = Arc::new(AtomicBool::new(false));
        let spurious_nones = Arc::new(AtomicU64::new(0));

        let ring_p = Arc::clone(&ring);
        let done_flag = Arc::clone(&producer_done);
        let producer = thread::spawn(move || {
            for i in 1..=200u64 {
                ring_p.push(make_test_snapshot(i));
            }
            done_flag.store(true, Ordering::Release);
        });

        let consumers: Vec<_> = (0..4)
            .map(|_| {
                let ring_c = Arc::clone(&ring);
                let done_c = Arc::clone(&producer_done);
                let nones = Arc::clone(&spurious_nones);
                thread::spawn(move || {
                    let mut reads = 0u64;
                    loop {
                        // write_pos > 0 means at least one push completed,
                        // so latest() returning None would be a bug.
                        if ring_c.write_pos() > 0 && ring_c.latest().is_none() {
                            nones.fetch_add(1, Ordering::Relaxed);
                        }
                        reads += 1;
                        if done_c.load(Ordering::Acquire) {
                            break;
                        }
                    }
                    reads
                })
            })
            .collect();

        producer.join().unwrap();
        for c in consumers {
            c.join().unwrap();
        }

        assert_eq!(
            spurious_nones.load(Ordering::Relaxed),
            0,
            "latest() must never return None when the ring is non-empty"
        );
    }

    /// write_pos counts pushes exactly, beyond one lap of the ring.
    #[test]
    fn test_write_pos_monotonic_after_many_pushes() {
        let ring = SnapshotRing::new(4);
        for i in 1..=20u64 {
            ring.push(make_test_snapshot(i));
            assert_eq!(
                ring.write_pos(),
                i,
                "write_pos should be {i} after {i} pushes"
            );
        }
    }

    /// Every retained position reads back the tick pushed at that position.
    #[test]
    fn test_position_tag_matches_write_pos() {
        let ring = SnapshotRing::new(4);
        for i in 0..4u64 {
            ring.push(make_test_snapshot(i + 1));
        }
        for i in 0..4u64 {
            let snap = ring
                .get_by_pos(i)
                .unwrap_or_else(|| panic!("pos {i} should be retained"));
            assert_eq!(snap.tick_id(), TickId(i + 1), "wrong tick at pos {i}");
        }
    }

    /// oldest_retained_pos starts at None, stays 0 until the ring is full,
    /// then advances by one per eviction.
    #[test]
    fn test_oldest_retained_pos_tracks_eviction_boundary() {
        let ring = SnapshotRing::new(4);
        assert_eq!(ring.oldest_retained_pos(), None);

        ring.push(make_test_snapshot(1));
        assert_eq!(ring.oldest_retained_pos(), Some(0));

        for i in 2..=4 {
            ring.push(make_test_snapshot(i));
        }
        assert_eq!(ring.oldest_retained_pos(), Some(0));

        ring.push(make_test_snapshot(5));
        assert_eq!(ring.oldest_retained_pos(), Some(1));

        ring.push(make_test_snapshot(6));
        assert_eq!(ring.oldest_retained_pos(), Some(2));
    }

    /// After multiple wraparounds the retained window is exactly the last
    /// `capacity` positions.
    #[test]
    fn test_counter_monotonicity_after_wraparound() {
        let ring = SnapshotRing::new(3);
        for i in 1..=10u64 {
            ring.push(make_test_snapshot(i));
        }
        assert_eq!(ring.write_pos(), 10);
        for i in 0..7u64 {
            assert!(
                ring.get_by_pos(i).is_none(),
                "pos {i} should be evicted (retained window is 7..10)"
            );
        }
        for i in 7..10u64 {
            let snap = ring
                .get_by_pos(i)
                .unwrap_or_else(|| panic!("pos {i} should be retained"));
            assert_eq!(snap.tick_id(), TickId(i + 1));
        }
    }

    /// A single overwrite evicts exactly the lapped position; neighbors in
    /// the same window still read back correctly.
    #[test]
    fn test_overwrite_detection_returns_correct_snapshot() {
        let ring = SnapshotRing::new(3);
        for i in 1..=3u64 {
            ring.push(make_test_snapshot(i));
        }
        ring.push(make_test_snapshot(4));

        assert!(ring.get_by_pos(0).is_none(), "pos 0 should be evicted");

        let snap = ring.get_by_pos(3).unwrap();
        assert_eq!(snap.tick_id(), TickId(4));

        let snap = ring.get_by_pos(1).unwrap();
        assert_eq!(snap.tick_id(), TickId(2));
    }
}