use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use murk_arena::OwnedSnapshot;

/// Each occupied slot stores the write position it was filled at (its tag) together with
/// the snapshot, so readers can detect entries that were overwritten after they loaded
/// `write_pos`.
type Slot = Option<(u64, Arc<OwnedSnapshot>)>;

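/// A fixed-capacity ring of published snapshots, written by a single producer and read by
/// many consumers. Every slot is tagged with the write position it was filled at, so a
/// reader can tell a fresh entry from one the producer has since lapped.
///
/// A minimal usage sketch (illustrative only, not a doctest; `snapshot` stands in for an
/// `OwnedSnapshot` obtained elsewhere, e.g. from the tick engine):
///
/// ```ignore
/// let ring = SnapshotRing::new(8);
/// // Producer side: publish a snapshot; `push` hands back whatever it displaced.
/// let _evicted = ring.push(snapshot);
/// // Consumer side: grab the most recent snapshot, if one has been published.
/// if let Some(latest) = ring.latest() {
///     // read fields from `latest` ...
/// }
/// ```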
pub struct SnapshotRing {
    /// One mutex-guarded slot per ring position.
    slots: Vec<Mutex<Slot>>,
    /// Total number of pushes so far; the next write goes to `write_pos % capacity`.
    write_pos: AtomicU64,
    capacity: usize,
}

// Compile-time check that `SnapshotRing` is `Send + Sync`.
const _: fn() = || {
    fn assert<T: Send + Sync>() {}
    assert::<SnapshotRing>();
};

impl SnapshotRing {
    /// Creates a ring with `capacity` slots. Panics if `capacity` is less than 2.
    pub fn new(capacity: usize) -> Self {
        assert!(
            capacity >= 2,
            "SnapshotRing capacity must be >= 2, got {capacity}"
        );
        let slots = (0..capacity).map(|_| Mutex::new(None)).collect();
        Self {
            slots,
            write_pos: AtomicU64::new(0),
            capacity,
        }
    }

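    /// Publishes `snapshot` into the next slot, returning the snapshot that previously
    /// occupied that slot (if any) so the caller can decide when to drop it.
    ///
    /// The slot is updated under its mutex before `write_pos` is bumped with a `Release`
    /// store, so a reader that `Acquire`-loads `write_pos` observes the new entry. The
    /// plain load/store of `write_pos` (no compare-and-swap) assumes a single producer
    /// thread, which is how the tests below drive the ring.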
    pub fn push(&self, snapshot: OwnedSnapshot) -> Option<Arc<OwnedSnapshot>> {
        let pos = self.write_pos.load(Ordering::Relaxed);
        let slot_idx = (pos as usize) % self.capacity;

        let arc = Arc::new(snapshot);
        let evicted = {
            let mut slot = self.slots[slot_idx].lock().unwrap();
            let prev = slot.take().map(|(_tag, arc)| arc);
            *slot = Some((pos, Arc::clone(&arc)));
            prev
        };

        // Publish the new slot contents: readers Acquire-load `write_pos` before probing slots.
        self.write_pos.store(pos + 1, Ordering::Release);

        evicted
    }

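    /// Returns the most recently published snapshot, or `None` if nothing has been pushed.
    ///
    /// The fast path retries up to `capacity` times, because the producer may lap the
    /// target slot between the `write_pos` load and the slot lock. If every retry loses
    /// that race, the slow path scans all slots and returns the one with the highest tag,
    /// so a non-empty ring never yields a spurious `None`.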
    pub fn latest(&self) -> Option<Arc<OwnedSnapshot>> {
        // Fast path: chase the newest write position, retrying if the producer laps us.
        for _ in 0..self.capacity {
            let pos = self.write_pos.load(Ordering::Acquire);
            if pos == 0 {
                return None;
            }
            let target_pos = pos - 1;
            let slot_idx = (target_pos as usize) % self.capacity;
            let slot = self.slots[slot_idx].lock().unwrap();
            match slot.as_ref() {
                Some((tag, arc)) if *tag == target_pos => return Some(Arc::clone(arc)),
                _ => continue,
            }
        }

        // Slow path: every retry lost the race, so take the newest snapshot still resident.
        let mut best: Option<(u64, Arc<OwnedSnapshot>)> = None;
        for slot_mutex in &self.slots {
            let slot = slot_mutex.lock().unwrap();
            if let Some((tag, arc)) = slot.as_ref() {
                let dominated = best.as_ref().is_some_and(|(best_tag, _)| *best_tag >= *tag);
                if !dominated {
                    best = Some((*tag, Arc::clone(arc)));
                }
            }
        }
        best.map(|(_, arc)| arc)
    }

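    /// Returns the snapshot published at exactly write position `pos`, or `None` if that
    /// position has not been written yet, has already been evicted (more than `capacity`
    /// writes behind the current position), or was overwritten concurrently (the slot tag
    /// no longer matches `pos`).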
    pub fn get_by_pos(&self, pos: u64) -> Option<Arc<OwnedSnapshot>> {
        let current = self.write_pos.load(Ordering::Acquire);

        // Position not written yet.
        if pos >= current {
            return None;
        }

        // Position already evicted: it is more than `capacity` writes behind.
        if current - pos > self.capacity as u64 {
            return None;
        }

        let slot_idx = (pos as usize) % self.capacity;
        let slot = self.slots[slot_idx].lock().unwrap();
        match slot.as_ref() {
            Some((tag, arc)) if *tag == pos => Some(Arc::clone(arc)),
            _ => None,
        }
    }

    /// Number of snapshots currently resident, saturating at `capacity`.
    pub fn len(&self) -> usize {
        let pos = self.write_pos.load(Ordering::Acquire) as usize;
        pos.min(self.capacity)
    }

    /// Returns `true` if nothing has ever been pushed.
    pub fn is_empty(&self) -> bool {
        self.write_pos.load(Ordering::Acquire) == 0
    }

    /// Number of slots in the ring.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Total number of pushes so far (the next write position).
    pub fn write_pos(&self) -> u64 {
        self.write_pos.load(Ordering::Acquire)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use murk_arena::config::ArenaConfig;
    use murk_arena::pingpong::PingPongArena;
    use murk_arena::static_arena::StaticArena;
    use murk_core::id::{FieldId, ParameterVersion, TickId};
    use murk_core::traits::{FieldWriter as _, SnapshotAccess};
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};

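    /// Builds a one-field snapshot whose `energy` values all equal `tick`, so tests can
    /// distinguish snapshots by their tick id and payload.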
    fn make_test_snapshot(tick: u64) -> OwnedSnapshot {
        let cell_count = 10u32;
        let config = ArenaConfig::new(cell_count);
        let field_defs = vec![(
            FieldId(0),
            FieldDef {
                name: "energy".into(),
                field_type: FieldType::Scalar,
                mutability: FieldMutability::PerTick,
                units: None,
                bounds: None,
                boundary_behavior: BoundaryBehavior::Clamp,
            },
        )];
        let static_arena = StaticArena::new(&[]).into_shared();
        let mut arena = PingPongArena::new(config, field_defs, static_arena).unwrap();

        {
            let mut guard = arena.begin_tick().unwrap();
            let data = guard.writer.write(FieldId(0)).unwrap();
            data.fill(tick as f32);
        }
        arena.publish(TickId(tick), ParameterVersion(0)).unwrap();
        arena.owned_snapshot()
    }

    #[test]
    fn test_ring_new_empty() {
        let ring = SnapshotRing::new(4);
        assert_eq!(ring.len(), 0);
        assert!(ring.is_empty());
        assert_eq!(ring.capacity(), 4);
        assert_eq!(ring.write_pos(), 0);
        assert!(ring.latest().is_none());
    }

    #[test]
    fn test_ring_push_and_latest() {
        let ring = SnapshotRing::new(4);
        ring.push(make_test_snapshot(1));
        assert_eq!(ring.len(), 1);
        assert!(!ring.is_empty());

        let latest = ring.latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(1));
    }

    #[test]
    fn test_ring_eviction() {
        let ring = SnapshotRing::new(4);

        // Filling the ring up to capacity evicts nothing.
        for i in 1..=4 {
            let evicted = ring.push(make_test_snapshot(i));
            assert!(evicted.is_none());
        }
        assert_eq!(ring.len(), 4);

        // The next push wraps around and evicts the oldest snapshot.
        let evicted = ring.push(make_test_snapshot(5));
        assert!(evicted.is_some());
        assert_eq!(evicted.unwrap().tick_id(), TickId(1));
        assert_eq!(ring.len(), 4);
    }

    #[test]
    fn test_ring_latest_is_newest() {
        let ring = SnapshotRing::new(4);
        for i in 1..=10 {
            ring.push(make_test_snapshot(i));
        }
        let latest = ring.latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(10));
    }

    #[test]
    fn test_ring_get_by_pos() {
        let ring = SnapshotRing::new(4);
        for i in 1..=4 {
            ring.push(make_test_snapshot(i));
        }

        let snap = ring.get_by_pos(0).unwrap();
        assert_eq!(snap.tick_id(), TickId(1));

        let snap = ring.get_by_pos(3).unwrap();
        assert_eq!(snap.tick_id(), TickId(4));

        // Position 4 has not been written yet.
        assert!(ring.get_by_pos(4).is_none());
    }

    #[test]
    fn test_ring_get_evicted_returns_none() {
        let ring = SnapshotRing::new(4);
        for i in 1..=8 {
            ring.push(make_test_snapshot(i));
        }
        // Positions 0..=3 have been overwritten by the second lap.
        assert!(ring.get_by_pos(0).is_none());
        assert!(ring.get_by_pos(3).is_none());

        let snap = ring.get_by_pos(4).unwrap();
        assert_eq!(snap.tick_id(), TickId(5));
    }

    #[test]
    #[should_panic(expected = "capacity must be >= 2")]
    fn test_ring_capacity_panics_below_2() {
        SnapshotRing::new(1);
    }

    #[test]
    fn test_get_by_pos_returns_none_after_concurrent_overwrite() {
        let ring = SnapshotRing::new(4);

        for i in 1..=4 {
            ring.push(make_test_snapshot(i));
        }
        assert_eq!(ring.get_by_pos(0).unwrap().tick_id(), TickId(1));

        for i in 5..=8 {
            ring.push(make_test_snapshot(i));
        }

        // The first lap's positions are gone once the ring wraps.
        assert!(ring.get_by_pos(0).is_none());
        assert!(ring.get_by_pos(3).is_none());

        let snap = ring.get_by_pos(4).unwrap();
        assert_eq!(snap.tick_id(), TickId(5));

        let snap = ring.get_by_pos(7).unwrap();
        assert_eq!(snap.tick_id(), TickId(8));
    }

    #[test]
    fn test_get_by_pos_tag_matches_position() {
        let ring = SnapshotRing::new(4);

        // Use tick ids distinct from positions so a tag/position mix-up would be caught.
        for i in 1..=4 {
            ring.push(make_test_snapshot(i * 10));
        }

        assert_eq!(ring.get_by_pos(0).unwrap().tick_id(), TickId(10));
        assert_eq!(ring.get_by_pos(1).unwrap().tick_id(), TickId(20));
        assert_eq!(ring.get_by_pos(2).unwrap().tick_id(), TickId(30));
        assert_eq!(ring.get_by_pos(3).unwrap().tick_id(), TickId(40));
    }

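    // End-to-end check of the single-producer / multi-consumer pattern: one thread drives
    // a TickEngine and pushes every snapshot into the ring while advancing an epoch
    // counter; reader threads pin a WorkerEpoch around each read of the latest snapshot.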
    #[test]
    fn test_producer_consumer_cross_thread() {
        use crate::config::{BackoffConfig, WorldConfig};
        use crate::tick::TickEngine;
        use murk_space::{EdgeBehavior, Line1D};
        use murk_test_utils::ConstPropagator;
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::thread;

        let config = WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![FieldDef {
                name: "energy".into(),
                field_type: FieldType::Scalar,
                mutability: FieldMutability::PerTick,
                units: None,
                bounds: None,
                boundary_behavior: BoundaryBehavior::Clamp,
            }],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: BackoffConfig::default(),
        };
        let mut engine = TickEngine::new(config).unwrap();

        let ring = Arc::new(SnapshotRing::new(8));
        let epoch_counter = Arc::new(crate::epoch::EpochCounter::new());
        let producer_done = Arc::new(AtomicBool::new(false));

        let ring_prod = Arc::clone(&ring);
        let epoch_prod = Arc::clone(&epoch_counter);
        let done_flag = Arc::clone(&producer_done);
        let producer = thread::spawn(move || {
            for _ in 0..100 {
                engine.execute_tick().unwrap();
                let snap = engine.owned_snapshot();
                ring_prod.push(snap);
                epoch_prod.advance();
            }
            done_flag.store(true, Ordering::Release);
        });

        let consumers: Vec<_> = (0..4)
            .map(|id| {
                let ring_c = Arc::clone(&ring);
                let done_c = Arc::clone(&producer_done);
                let worker = Arc::new(crate::epoch::WorkerEpoch::new(id));
                thread::spawn(move || {
                    let mut reads = 0u64;
                    loop {
                        if let Some(snap) = ring_c.latest() {
                            let epoch = snap.tick_id().0;
                            worker.pin(epoch);
                            let data = snap.read_field(FieldId(0)).unwrap();
                            assert_eq!(data.len(), 10);
                            assert!(data.iter().all(|&v| v == 42.0));
                            worker.unpin();
                            reads += 1;
                        }
                        if done_c.load(Ordering::Acquire) && reads > 0 {
                            break;
                        }
                        thread::yield_now();
                    }
                    reads
                })
            })
            .collect();

        producer.join().unwrap();
        let mut total_reads = 0u64;
        for c in consumers {
            let reads = c.join().unwrap();
            assert!(reads > 0, "consumer should have read at least one snapshot");
            total_reads += reads;
        }

        assert!(ring.len() <= 8);
        assert_eq!(epoch_counter.current(), 100);
        assert!(
            total_reads >= 4,
            "consumers collectively should have many reads"
        );
    }

446
447 #[test]
448 fn test_epoch_pin_unpin_cross_thread() {
449 use crate::epoch::WorkerEpoch;
450 use std::thread;
451
452 let workers: Vec<Arc<WorkerEpoch>> =
453 (0..4).map(|i| Arc::new(WorkerEpoch::new(i))).collect();
454
455 let handles: Vec<_> = workers
456 .iter()
457 .map(|worker| {
458 let worker = worker.clone();
459 thread::spawn(move || {
460 for epoch in 0..100u64 {
461 worker.pin(epoch);
462 assert!(worker.is_pinned());
463 assert_eq!(worker.pinned_epoch(), epoch);
464 worker.unpin();
465 assert!(!worker.is_pinned());
466 }
467 })
468 })
469 .collect();
470
471 for h in handles {
472 h.join().unwrap();
473 }
474
475 for w in &workers {
477 assert!(!w.is_pinned());
478 assert!(w.last_quiesce_ns() > 0);
479 }
480 }
481
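    // A capacity-2 ring keeps contention high: the producer laps any given slot quickly,
    // which is exactly the race latest()'s retry loop and fallback scan must absorb.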
    #[test]
    fn test_latest_never_spurious_none_under_contention() {
        use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
        use std::thread;

        let ring = Arc::new(SnapshotRing::new(2));
        let producer_done = Arc::new(AtomicBool::new(false));
        let spurious_nones = Arc::new(AtomicU64::new(0));

        let ring_p = Arc::clone(&ring);
        let done_flag = Arc::clone(&producer_done);
        let producer = thread::spawn(move || {
            for i in 1..=200u64 {
                ring_p.push(make_test_snapshot(i));
            }
            done_flag.store(true, Ordering::Release);
        });

        let consumers: Vec<_> = (0..4)
            .map(|_| {
                let ring_c = Arc::clone(&ring);
                let done_c = Arc::clone(&producer_done);
                let nones = Arc::clone(&spurious_nones);
                thread::spawn(move || {
                    let mut reads = 0u64;
                    loop {
                        // Once anything has been pushed, latest() must always find a snapshot.
                        if ring_c.write_pos() > 0 && ring_c.latest().is_none() {
                            nones.fetch_add(1, Ordering::Relaxed);
                        }
                        reads += 1;
                        if done_c.load(Ordering::Acquire) {
                            break;
                        }
                    }
                    reads
                })
            })
            .collect();

        producer.join().unwrap();
        for c in consumers {
            c.join().unwrap();
        }

        assert_eq!(
            spurious_nones.load(Ordering::Relaxed),
            0,
            "latest() must never return None when the ring is non-empty"
        );
    }
}