//! Lock-striped ring buffer of the most recently published `OwnedSnapshot`s,
//! shared between a snapshot producer and concurrent readers via `Arc`.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use murk_arena::OwnedSnapshot;

/// A slot pairs a snapshot with the write position ("tag") it was stored
/// under, letting readers detect slots that have since been overwritten.
type Slot = Option<(u64, Arc<OwnedSnapshot>)>;

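/// Fixed-capacity ring of the most recently published snapshots, indexed
/// by a monotonically increasing write position.
///
/// A minimal usage sketch (the `snapshot` value is hypothetical; the tests
/// below show how one is actually constructed):
///
/// ```ignore
/// let ring = SnapshotRing::new(8);
/// // Producer side: publish; `push` returns whatever it evicted.
/// let _evicted = ring.push(snapshot);
/// // Consumer side: clone out the newest published snapshot.
/// let latest = ring.latest().expect("one snapshot was pushed");
/// ```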
pub struct SnapshotRing {
    /// One mutex per slot, so a push only contends with readers of the
    /// slot it is overwriting.
    slots: Vec<Mutex<Slot>>,
    /// Total number of completed pushes; only ever increases.
    write_pos: AtomicU64,
    capacity: usize,
}

// Compile-time assertion that `SnapshotRing` is `Send + Sync`.
const _: fn() = || {
    fn assert<T: Send + Sync>() {}
    assert::<SnapshotRing>();
};

impl SnapshotRing {
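    /// Creates an empty ring with `capacity` slots.
    ///
    /// Panics if `capacity` is less than 2.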
    pub fn new(capacity: usize) -> Self {
        assert!(
            capacity >= 2,
            "SnapshotRing capacity must be >= 2, got {capacity}"
        );
        let slots = (0..capacity).map(|_| Mutex::new(None)).collect();
        Self {
            slots,
            write_pos: AtomicU64::new(0),
            capacity,
        }
    }

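    /// Publishes a snapshot, returning the evicted snapshot, if any, so the
    /// caller can drop or recycle it outside the slot lock.
    ///
    /// Note: the plain load-then-store of `write_pos` (rather than a
    /// `fetch_add`) is only race-free if a single producer calls `push`.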
    pub fn push(&self, snapshot: OwnedSnapshot) -> Option<Arc<OwnedSnapshot>> {
        let pos = self.write_pos.load(Ordering::Relaxed);
        let slot_idx = (pos as usize) % self.capacity;

        let arc = Arc::new(snapshot);
        let evicted = {
            let mut slot = self.slots[slot_idx].lock().unwrap();
            let prev = slot.take().map(|(_tag, arc)| arc);
            // Tag the slot with its write position so readers can tell
            // whether it still holds the entry they computed an index for.
            *slot = Some((pos, Arc::clone(&arc)));
            prev
        };

        // Publish the new position only after the slot is fully written.
        self.write_pos.store(pos + 1, Ordering::Release);

        evicted
    }

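    /// Returns the most recently published snapshot, or `None` if nothing
    /// has been pushed yet.
    ///
    /// Retries up to `capacity` times: between loading `write_pos` and
    /// locking the slot, the producer may have lapped the ring, which the
    /// tag check detects.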
    pub fn latest(&self) -> Option<Arc<OwnedSnapshot>> {
        for _ in 0..self.capacity {
            let pos = self.write_pos.load(Ordering::Acquire);
            if pos == 0 {
                return None;
            }
            let target_pos = pos - 1;
            let slot_idx = (target_pos as usize) % self.capacity;
            let slot = self.slots[slot_idx].lock().unwrap();
            match slot.as_ref() {
                Some((tag, arc)) if *tag == target_pos => return Some(Arc::clone(arc)),
                // Tag mismatch: the slot was overwritten after we loaded
                // `write_pos`. Reload and try again.
                _ => continue,
            }
        }
        None
    }

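    /// Returns the snapshot published at write position `pos`, or `None`
    /// if that position is unwritten, evicted, or mid-overwrite.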
    pub fn get_by_pos(&self, pos: u64) -> Option<Arc<OwnedSnapshot>> {
        let current = self.write_pos.load(Ordering::Acquire);

        // Not written yet.
        if pos >= current {
            return None;
        }

        // Older than the oldest entry the ring can still hold.
        if current - pos > self.capacity as u64 {
            return None;
        }

        let slot_idx = (pos as usize) % self.capacity;
        let slot = self.slots[slot_idx].lock().unwrap();
        match slot.as_ref() {
            Some((tag, arc)) if *tag == pos => Some(Arc::clone(arc)),
            // Tag mismatch: a concurrent push overwrote this slot.
            _ => None,
        }
    }

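    /// Number of snapshots currently held: the push count, capped at
    /// `capacity`.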
    pub fn len(&self) -> usize {
        let pos = self.write_pos.load(Ordering::Acquire) as usize;
        pos.min(self.capacity)
    }

    /// True if nothing has ever been pushed.
    pub fn is_empty(&self) -> bool {
        self.write_pos.load(Ordering::Acquire) == 0
    }

    /// Number of slots in the ring.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Total number of completed pushes.
    pub fn write_pos(&self) -> u64 {
        self.write_pos.load(Ordering::Acquire)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use murk_arena::config::ArenaConfig;
    use murk_arena::pingpong::PingPongArena;
    use murk_arena::static_arena::StaticArena;
    use murk_core::id::{FieldId, ParameterVersion, TickId};
    use murk_core::traits::{FieldWriter as _, SnapshotAccess};
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};

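    /// Builds a single-field snapshot whose "energy" values are all
    /// `tick as f32`, published at `TickId(tick)`.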
    fn make_test_snapshot(tick: u64) -> OwnedSnapshot {
        let cell_count = 10u32;
        let config = ArenaConfig::new(cell_count);
        let field_defs = vec![(
            FieldId(0),
            FieldDef {
                name: "energy".into(),
                field_type: FieldType::Scalar,
                mutability: FieldMutability::PerTick,
                units: None,
                bounds: None,
                boundary_behavior: BoundaryBehavior::Clamp,
            },
        )];
        let static_arena = StaticArena::new(&[]).into_shared();
        let mut arena = PingPongArena::new(config, field_defs, static_arena).unwrap();

        {
            let mut guard = arena.begin_tick().unwrap();
            let data = guard.writer.write(FieldId(0)).unwrap();
            data.fill(tick as f32);
        }
        arena.publish(TickId(tick), ParameterVersion(0));
        arena.owned_snapshot()
    }

    #[test]
    fn test_ring_new_empty() {
        let ring = SnapshotRing::new(4);
        assert_eq!(ring.len(), 0);
        assert!(ring.is_empty());
        assert_eq!(ring.capacity(), 4);
        assert_eq!(ring.write_pos(), 0);
        assert!(ring.latest().is_none());
    }

    #[test]
    fn test_ring_push_and_latest() {
        let ring = SnapshotRing::new(4);
        ring.push(make_test_snapshot(1));
        assert_eq!(ring.len(), 1);
        assert!(!ring.is_empty());

        let latest = ring.latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(1));
    }

    #[test]
    fn test_ring_eviction() {
        let ring = SnapshotRing::new(4);

        // Filling to capacity evicts nothing.
        for i in 1..=4 {
            let evicted = ring.push(make_test_snapshot(i));
            assert!(evicted.is_none());
        }
        assert_eq!(ring.len(), 4);

        // The fifth push wraps around and evicts the oldest snapshot.
        let evicted = ring.push(make_test_snapshot(5));
        assert!(evicted.is_some());
        assert_eq!(evicted.unwrap().tick_id(), TickId(1));
        assert_eq!(ring.len(), 4);
    }

    #[test]
    fn test_ring_latest_is_newest() {
        let ring = SnapshotRing::new(4);
        for i in 1..=10 {
            ring.push(make_test_snapshot(i));
        }
        let latest = ring.latest().unwrap();
        assert_eq!(latest.tick_id(), TickId(10));
    }

    #[test]
    fn test_ring_get_by_pos() {
        let ring = SnapshotRing::new(4);
        for i in 1..=4 {
            ring.push(make_test_snapshot(i));
        }

        // Position 0 holds the first push...
        let snap = ring.get_by_pos(0).unwrap();
        assert_eq!(snap.tick_id(), TickId(1));

        // ...and position 3 the fourth.
        let snap = ring.get_by_pos(3).unwrap();
        assert_eq!(snap.tick_id(), TickId(4));

        // Position 4 has not been written yet.
        assert!(ring.get_by_pos(4).is_none());
    }

    #[test]
    fn test_ring_get_evicted_returns_none() {
        let ring = SnapshotRing::new(4);
        for i in 1..=8 {
            ring.push(make_test_snapshot(i));
        }
        // Positions 0..=3 were lapped by positions 4..=7.
        assert!(ring.get_by_pos(0).is_none());
        assert!(ring.get_by_pos(3).is_none());

        let snap = ring.get_by_pos(4).unwrap();
        assert_eq!(snap.tick_id(), TickId(5));
    }

    #[test]
    #[should_panic(expected = "capacity must be >= 2")]
    fn test_ring_capacity_panics_below_2() {
        SnapshotRing::new(1);
    }

    #[test]
    fn test_get_by_pos_returns_none_after_concurrent_overwrite() {
        let ring = SnapshotRing::new(4);

        for i in 1..=4 {
            ring.push(make_test_snapshot(i));
        }
        assert_eq!(ring.get_by_pos(0).unwrap().tick_id(), TickId(1));

        // Lap the ring, overwriting every slot.
        for i in 5..=8 {
            ring.push(make_test_snapshot(i));
        }

        // The original positions are gone...
        assert!(ring.get_by_pos(0).is_none());
        assert!(ring.get_by_pos(3).is_none());

        // ...and the slots now answer for the new positions.
        let snap = ring.get_by_pos(4).unwrap();
        assert_eq!(snap.tick_id(), TickId(5));

        let snap = ring.get_by_pos(7).unwrap();
        assert_eq!(snap.tick_id(), TickId(8));
    }

    #[test]
    fn test_get_by_pos_tag_matches_position() {
        let ring = SnapshotRing::new(4);

        // Tick ids are decoupled from positions: position `p` holds tick
        // `(p + 1) * 10`.
        for i in 1..=4 {
            ring.push(make_test_snapshot(i * 10));
        }

        assert_eq!(ring.get_by_pos(0).unwrap().tick_id(), TickId(10));
        assert_eq!(ring.get_by_pos(1).unwrap().tick_id(), TickId(20));
        assert_eq!(ring.get_by_pos(2).unwrap().tick_id(), TickId(30));
        assert_eq!(ring.get_by_pos(3).unwrap().tick_id(), TickId(40));
    }

    #[test]
    fn test_producer_consumer_cross_thread() {
        use crate::config::{BackoffConfig, WorldConfig};
        use crate::tick::TickEngine;
        use murk_space::{EdgeBehavior, Line1D};
        use murk_test_utils::ConstPropagator;
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::thread;

        let config = WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![FieldDef {
                name: "energy".into(),
                field_type: FieldType::Scalar,
                mutability: FieldMutability::PerTick,
                units: None,
                bounds: None,
                boundary_behavior: BoundaryBehavior::Clamp,
            }],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: BackoffConfig::default(),
        };
        let mut engine = TickEngine::new(config).unwrap();

        let ring = Arc::new(SnapshotRing::new(8));
        let epoch_counter = Arc::new(crate::epoch::EpochCounter::new());
        let producer_done = Arc::new(AtomicBool::new(false));

        // Producer: run 100 ticks, publishing each snapshot to the ring.
        let ring_prod = Arc::clone(&ring);
        let epoch_prod = Arc::clone(&epoch_counter);
        let done_flag = Arc::clone(&producer_done);
        let producer = thread::spawn(move || {
            for _ in 0..100 {
                engine.execute_tick().unwrap();
                let snap = engine.owned_snapshot();
                ring_prod.push(snap);
                epoch_prod.advance();
            }
            done_flag.store(true, Ordering::Release);
        });

        // Four consumers repeatedly read and validate the latest snapshot.
        let consumers: Vec<_> = (0..4)
            .map(|id| {
                let ring_c = Arc::clone(&ring);
                let done_c = Arc::clone(&producer_done);
                let worker = Arc::new(crate::epoch::WorkerEpoch::new(id));
                thread::spawn(move || {
                    let mut reads = 0u64;
                    loop {
                        if let Some(snap) = ring_c.latest() {
                            let epoch = snap.tick_id().0;
                            worker.pin(epoch);
                            let data = snap.read_field(FieldId(0)).unwrap();
                            assert_eq!(data.len(), 10);
                            assert!(data.iter().all(|&v| v == 42.0));
                            worker.unpin();
                            reads += 1;
                        }
                        if done_c.load(Ordering::Acquire) && reads > 0 {
                            break;
                        }
                        thread::yield_now();
                    }
                    reads
                })
            })
            .collect();

        producer.join().unwrap();
        let mut total_reads = 0u64;
        for c in consumers {
            let reads = c.join().unwrap();
            assert!(reads > 0, "consumer should have read at least one snapshot");
            total_reads += reads;
        }

        assert!(ring.len() <= 8);
        // 100 ticks were published, so the epoch advanced 100 times.
        assert_eq!(epoch_counter.current(), 100);
        assert!(
            total_reads >= 4,
            "consumers collectively should have many reads"
        );
    }

    #[test]
    fn test_epoch_pin_unpin_cross_thread() {
        use crate::epoch::WorkerEpoch;
        use std::thread;

        let workers: Vec<Arc<WorkerEpoch>> =
            (0..4).map(|i| Arc::new(WorkerEpoch::new(i))).collect();

        let handles: Vec<_> = workers
            .iter()
            .map(|worker| {
                let worker = worker.clone();
                thread::spawn(move || {
                    for epoch in 0..100u64 {
                        worker.pin(epoch);
                        assert!(worker.is_pinned());
                        assert_eq!(worker.pinned_epoch(), epoch);
                        worker.unpin();
                        assert!(!worker.is_pinned());
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // Every worker ended unpinned and recorded a quiesce timestamp.
        for w in &workers {
            assert!(!w.is_pinned());
            assert!(w.last_quiesce_ns() > 0);
        }
    }
}