Skip to main content

murk_engine/
lockstep.rs

1//! Lockstep (synchronous) simulation world.
2//!
3//! [`LockstepWorld`] is the primary user-facing API for running simulations
4//! in lockstep mode. Each call to [`step_sync()`](LockstepWorld::step_sync)
5//! submits commands, executes one tick of the propagator pipeline, and
6//! returns a snapshot of the resulting world state.
7//!
8//! # Ownership model
9//!
10//! `LockstepWorld` is [`Send`] (can be moved between threads) but not
11//! [`Sync`] (cannot be shared). All mutating methods take `&mut self`,
12//! and [`step_sync()`](LockstepWorld::step_sync) returns a [`Snapshot`]
13//! that borrows from `self`. This means the caller cannot call `step_sync()`
14//! while holding a snapshot reference — the borrow checker enforces
15//! aliasing prevention at compile time.
16//!
17//! # Shutdown
18//!
19//! Dropping a `LockstepWorld` reclaims all arena memory. Since `&mut self`
20//! guarantees no outstanding borrows at drop time, cleanup is always safe
21//! (Decision E). No background threads are involved.
22
23use murk_arena::read::Snapshot;
24use murk_core::command::{Command, Receipt};
25use murk_core::id::TickId;
26
27use crate::config::{ConfigError, WorldConfig};
28use crate::metrics::StepMetrics;
29use crate::tick::{TickEngine, TickError};
30
31// Compile-time assertion: LockstepWorld is Send but NOT Sync.
32// (dyn Propagator is Send + !Sync, which is the design intent.)
33// Fails to compile if any field is !Send.
34const _: () = {
35    #[allow(dead_code)]
36    fn assert_send<T: Send>() {}
37    #[allow(dead_code)]
38    fn check() {
39        assert_send::<LockstepWorld>();
40    }
41};
42
43// ── StepResult ──────────────────────────────────────────────────
44
45/// Result of a successful [`LockstepWorld::step_sync()`] call.
46pub struct StepResult<'w> {
47    /// Read-only snapshot of world state after this tick.
48    pub snapshot: Snapshot<'w>,
49    /// Per-command receipts from both submission and tick execution.
50    ///
51    /// Includes submission-rejected receipts (e.g. `QueueFull`, `TickDisabled`)
52    /// followed by tick execution receipts (applied, expired, rolled back).
53    /// In lockstep mode the queue is drained every tick, so rejection is rare.
54    pub receipts: Vec<Receipt>,
55    /// Performance metrics for this tick.
56    pub metrics: StepMetrics,
57}
58
59// ── LockstepWorld ───────────────────────────────────────────────
60
61/// Single-threaded simulation world for lockstep (synchronous) execution.
62///
63/// Created from a [`WorldConfig`] via [`new()`](LockstepWorld::new).
64/// Each [`step_sync()`](LockstepWorld::step_sync) call runs one complete
65/// tick: submit commands → drain ingress → run propagator pipeline →
66/// publish snapshot → return results.
67///
68/// # Example
69///
70/// ```ignore
71/// let world = LockstepWorld::new(config)?;
72/// for _ in 0..1000 {
73///     let result = world.step_sync(commands)?;
74///     let obs = result.snapshot.read(field_id);
75/// }
76/// ```
77pub struct LockstepWorld {
78    engine: TickEngine,
79    seed: u64,
80}
81
82impl LockstepWorld {
83    /// Create a new lockstep world from a [`WorldConfig`].
84    ///
85    /// Validates the configuration, builds the read resolution plan,
86    /// constructs the arena, and returns a ready-to-step world.
87    /// Consumes the `WorldConfig`.
88    pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
89        let seed = config.seed;
90        Ok(Self {
91            engine: TickEngine::new(config)?,
92            seed,
93        })
94    }
95
96    /// Execute one tick synchronously.
97    ///
98    /// Submits `commands` to the ingress queue, runs the full propagator
99    /// pipeline, publishes the new snapshot, and returns a [`StepResult`]
100    /// containing the snapshot reference, receipts, and metrics.
101    ///
102    /// The returned [`Snapshot`] borrows from `self`, preventing the caller
103    /// from calling `step_sync()` again until the snapshot is dropped.
104    ///
105    /// # Errors
106    ///
107    /// Returns [`TickError`] if a propagator fails (tick is rolled back
108    /// atomically) or if ticking is disabled after consecutive rollbacks.
109    /// On rollback, the error's `receipts` field contains per-command
110    /// rollback receipts plus any submission rejections.
111    pub fn step_sync(&mut self, commands: Vec<Command>) -> Result<StepResult<'_>, TickError> {
112        let submit_receipts = self.engine.submit_commands(commands);
113
114        // Collect submission-rejected receipts (QueueFull, TickDisabled).
115        // Accepted commands get their final receipts from execute_tick.
116        let rejected: Vec<Receipt> = submit_receipts
117            .into_iter()
118            .filter(|r| !r.accepted)
119            .collect();
120
121        match self.engine.execute_tick() {
122            Ok(tick_result) => {
123                let mut receipts = rejected;
124                receipts.extend(tick_result.receipts);
125                Ok(StepResult {
126                    snapshot: self.engine.snapshot(),
127                    receipts,
128                    metrics: tick_result.metrics,
129                })
130            }
131            Err(mut tick_error) => {
132                let mut receipts = rejected;
133                receipts.append(&mut tick_error.receipts);
134                Err(TickError {
135                    kind: tick_error.kind,
136                    receipts,
137                })
138            }
139        }
140    }
141
142    /// Reset the world to tick 0 with a new seed.
143    ///
144    /// Reclaims all arena memory, clears the ingress queue, and resets
145    /// all counters. Returns a snapshot of the initial (zeroed) state.
146    ///
147    /// The `seed` is stored for future use when propagators support
148    /// seeded RNG. Currently all runs are fully deterministic regardless
149    /// of seed.
150    pub fn reset(&mut self, seed: u64) -> Result<Snapshot<'_>, ConfigError> {
151        self.engine.reset()?;
152        self.seed = seed;
153        Ok(self.engine.snapshot())
154    }
155
156    /// Get a read-only snapshot of the current published generation.
157    pub fn snapshot(&self) -> Snapshot<'_> {
158        self.engine.snapshot()
159    }
160
161    /// Current tick ID (0 after construction or reset).
162    pub fn current_tick(&self) -> TickId {
163        self.engine.current_tick()
164    }
165
166    /// Whether ticking is disabled due to consecutive rollbacks.
167    pub fn is_tick_disabled(&self) -> bool {
168        self.engine.is_tick_disabled()
169    }
170
171    /// Metrics from the most recent successful tick.
172    pub fn last_metrics(&self) -> &StepMetrics {
173        self.engine.last_metrics()
174    }
175
176    /// The current simulation seed.
177    pub fn seed(&self) -> u64 {
178        self.seed
179    }
180
181    /// Number of consecutive rollbacks since the last successful tick.
182    pub fn consecutive_rollback_count(&self) -> u32 {
183        self.engine.consecutive_rollback_count()
184    }
185
186    /// The spatial topology for this world.
187    pub fn space(&self) -> &dyn murk_space::Space {
188        self.engine.space()
189    }
190
191    /// Number of command batches currently queued for the next tick.
192    pub fn ingress_queue_depth(&self) -> usize {
193        self.engine.ingress_queue_depth()
194    }
195
196    /// Maximum number of command batches the ingress queue can hold.
197    pub fn ingress_queue_capacity(&self) -> usize {
198        self.engine.ingress_queue_capacity()
199    }
200}
201
202impl std::fmt::Debug for LockstepWorld {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        f.debug_struct("LockstepWorld")
205            .field("current_tick", &self.engine.current_tick())
206            .field("seed", &self.seed)
207            .field("tick_disabled", &self.engine.is_tick_disabled())
208            .finish()
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use murk_core::command::CommandPayload;
    use murk_core::id::{Coord, FieldId};
    use murk_core::traits::{FieldReader, SnapshotAccess};
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
    use murk_propagator::propagator::WriteMode;
    use murk_propagator::Propagator;
    use murk_space::{EdgeBehavior, Line1D, Square4};
    use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};

    fn scalar_field(name: &str) -> FieldDef {
        FieldDef {
            name: name.to_string(),
            field_type: FieldType::Scalar,
            mutability: FieldMutability::PerTick,
            units: None,
            bounds: None,
            boundary_behavior: BoundaryBehavior::Clamp,
        }
    }

    fn simple_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: crate::config::BackoffConfig::default(),
        }
    }

    /// Two-field pipeline: PropA writes field0=7.0, PropB copies field0→field1.
    fn two_field_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("field0"), scalar_field("field1")],
            propagators: vec![
                Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
                Box::new(IdentityPropagator::new(
                    "copy_f0_to_f1",
                    FieldId(0),
                    FieldId(1),
                )),
            ],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: crate::config::BackoffConfig::default(),
        }
    }

    /// Square4 10x10 config for M0 integration testing.
    fn square4_config() -> WorldConfig {
        // Three-field pipeline on a 10x10 grid:
        //   PropA writes field0 = 3.0 (uniform)
        //   PropB copies field0 → field1
        //   PropC reads field0 + field1, writes sum to field2
        struct SumPropagator {
            name: String,
            input_a: FieldId,
            input_b: FieldId,
            output: FieldId,
        }

        impl SumPropagator {
            fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
                Self {
                    name: name.to_string(),
                    input_a: a,
                    input_b: b,
                    output: out,
                }
            }
        }

        impl Propagator for SumPropagator {
            fn name(&self) -> &str {
                &self.name
            }
            fn reads(&self) -> murk_core::FieldSet {
                [self.input_a, self.input_b].into_iter().collect()
            }
            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
                vec![(self.output, WriteMode::Full)]
            }
            fn step(
                &self,
                ctx: &mut murk_propagator::StepContext<'_>,
            ) -> Result<(), murk_core::PropagatorError> {
                // Copy the inputs out so no borrow from `ctx.reads()` is held
                // across the `ctx.writes()` call below.
                let a = ctx.reads().read(self.input_a).unwrap().to_vec();
                let b = ctx.reads().read(self.input_b).unwrap().to_vec();
                let out = ctx.writes().write(self.output).unwrap();
                // All three fields share one space; fail loudly on a length
                // mismatch instead of letting `zip` silently truncate.
                assert_eq!(a.len(), out.len(), "input_a length mismatch");
                assert_eq!(b.len(), out.len(), "input_b length mismatch");
                for ((dst, x), y) in out.iter_mut().zip(&a).zip(&b) {
                    *dst = x + y;
                }
                Ok(())
            }
        }

        WorldConfig {
            space: Box::new(Square4::new(10, 10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![
                scalar_field("field0"),
                scalar_field("field1"),
                scalar_field("field2"),
            ],
            propagators: vec![
                Box::new(ConstPropagator::new("write_f0", FieldId(0), 3.0)),
                Box::new(IdentityPropagator::new(
                    "copy_f0_to_f1",
                    FieldId(0),
                    FieldId(1),
                )),
                Box::new(SumPropagator::new(
                    "sum_f0_f1_to_f2",
                    FieldId(0),
                    FieldId(1),
                    FieldId(2),
                )),
            ],
            dt: 0.016,
            seed: 12345,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: crate::config::BackoffConfig::default(),
        }
    }

    fn make_cmd(expires: u64) -> Command {
        Command {
            payload: CommandPayload::SetField {
                coord: Coord::from_elem(0, 1),
                field_id: FieldId(0),
                value: 1.0,
            },
            expires_after_tick: TickId(expires),
            source_id: None,
            source_seq: None,
            priority_class: 1,
            arrival_seq: 0,
        }
    }

    // ── Basic lifecycle tests ────────────────────────────────

    #[test]
    fn new_creates_world_at_tick_zero() {
        let world = LockstepWorld::new(simple_config()).unwrap();
        assert_eq!(world.current_tick(), TickId(0));
        assert!(!world.is_tick_disabled());
        assert_eq!(world.seed(), 42);
    }

    #[test]
    fn step_sync_advances_tick() {
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        let result = world.step_sync(vec![]).unwrap();
        assert_eq!(result.snapshot.tick_id(), TickId(1));
        assert_eq!(world.current_tick(), TickId(1));
    }

    #[test]
    fn step_sync_returns_correct_snapshot() {
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        let result = world.step_sync(vec![]).unwrap();
        let data = result.snapshot.read(FieldId(0)).unwrap();
        assert_eq!(data.len(), 10);
        assert!(data.iter().all(|&v| v == 42.0));
    }

    #[test]
    fn step_sync_with_commands_produces_receipts() {
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        let result = world.step_sync(vec![make_cmd(100), make_cmd(100)]).unwrap();
        let applied: Vec<_> = result
            .receipts
            .iter()
            .filter(|r| r.applied_tick_id.is_some())
            .collect();
        assert_eq!(applied.len(), 2);
        assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
    }

    #[test]
    fn step_sync_propagator_failure_returns_tick_error() {
        let config = WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(FailingPropagator::new("fail", FieldId(0), 0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: crate::config::BackoffConfig::default(),
        };
        let mut world = LockstepWorld::new(config).unwrap();
        let result = world.step_sync(vec![]);
        assert!(result.is_err());
    }

    // ── Two-field overlay visibility ─────────────────────────

    #[test]
    fn step_sync_overlay_visibility() {
        let mut world = LockstepWorld::new(two_field_config()).unwrap();
        let result = world.step_sync(vec![]).unwrap();
        // PropB copies field0 (7.0) to field1.
        assert_eq!(result.snapshot.read(FieldId(0)).unwrap()[0], 7.0);
        assert_eq!(result.snapshot.read(FieldId(1)).unwrap()[0], 7.0);
    }

    // ── Reset tests ──────────────────────────────────────────

    #[test]
    fn reset_returns_to_tick_zero() {
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        world.step_sync(vec![]).unwrap();
        world.step_sync(vec![]).unwrap();
        assert_eq!(world.current_tick(), TickId(2));

        let snap = world.reset(99).unwrap();
        assert_eq!(snap.tick_id(), TickId(0));
        assert_eq!(world.current_tick(), TickId(0));
        assert_eq!(world.seed(), 99);
    }

    #[test]
    fn reset_allows_continued_stepping() {
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        world.step_sync(vec![]).unwrap();
        world.reset(42).unwrap();

        let result = world.step_sync(vec![]).unwrap();
        assert_eq!(result.snapshot.tick_id(), TickId(1));
        let data = result.snapshot.read(FieldId(0)).unwrap();
        assert!(data.iter().all(|&v| v == 42.0));
    }

    // ── 1000-step determinism (M0 quality gate) ──────────────

    #[test]
    fn thousand_step_determinism() {
        // Run the same config twice and compare snapshots as the runs advance.
        let mut world_a = LockstepWorld::new(square4_config()).unwrap();
        let mut world_b = LockstepWorld::new(square4_config()).unwrap();

        for tick in 1..=1000u64 {
            let result_a = world_a.step_sync(vec![]).unwrap();
            let result_b = world_b.step_sync(vec![]).unwrap();

            // Tick IDs are compared on every tick.
            assert_eq!(
                result_a.snapshot.tick_id(),
                result_b.snapshot.tick_id(),
                "tick ID mismatch at tick {tick}"
            );

            // Full field contents are compared on tick 1 and every 100th tick.
            if tick % 100 == 0 || tick == 1 {
                for field_idx in 0..3u32 {
                    let field = FieldId(field_idx);
                    let data_a = result_a.snapshot.read(field).unwrap();
                    let data_b = result_b.snapshot.read(field).unwrap();
                    assert_eq!(data_a, data_b, "field {field_idx} mismatch at tick {tick}");
                }
            }
        }

        assert_eq!(world_a.current_tick(), TickId(1000));
        assert_eq!(world_b.current_tick(), TickId(1000));

        // Final full field comparison.
        let snap_a = world_a.snapshot();
        let snap_b = world_b.snapshot();
        for field_idx in 0..3u32 {
            let field = FieldId(field_idx);
            assert_eq!(
                snap_a.read(field).unwrap(),
                snap_b.read(field).unwrap(),
                "final field {field_idx} mismatch"
            );
        }
    }

    // ── Memory bound (M0 quality gate) ───────────────────────

    #[test]
    fn memory_bound_tick_1000_approx_tick_10() {
        let mut world = LockstepWorld::new(square4_config()).unwrap();

        // Run 10 ticks and record memory.
        for _ in 0..10 {
            world.step_sync(vec![]).unwrap();
        }
        let mem_10 = world.last_metrics().memory_bytes;
        // A zero baseline would make the ratio below NaN/inf; report it clearly.
        assert!(mem_10 > 0, "memory metric at tick 10 is zero");

        // Run to tick 1000.
        for _ in 10..1000 {
            world.step_sync(vec![]).unwrap();
        }
        let mem_1000 = world.last_metrics().memory_bytes;

        // Memory at tick 1000 should be approximately equal to tick 10.
        // Allow up to 20% growth for internal bookkeeping (IndexMap resizing, etc).
        let ratio = mem_1000 as f64 / mem_10 as f64;
        assert!(
            ratio < 1.2,
            "memory grew {ratio:.2}x from tick 10 ({mem_10}) to tick 1000 ({mem_1000})"
        );
    }

    // ── Square4 three-propagator integration ─────────────────

    #[test]
    fn square4_three_propagator_end_to_end() {
        let mut world = LockstepWorld::new(square4_config()).unwrap();
        let result = world.step_sync(vec![]).unwrap();

        let snap = &result.snapshot;
        let f0 = snap.read(FieldId(0)).unwrap();
        let f1 = snap.read(FieldId(1)).unwrap();
        let f2 = snap.read(FieldId(2)).unwrap();

        // 100 cells (10x10 Square4)
        assert_eq!(f0.len(), 100);
        assert_eq!(f1.len(), 100);
        assert_eq!(f2.len(), 100);

        // PropA: field0 = 3.0
        assert!(f0.iter().all(|&v| v == 3.0));
        // PropB: field1 = copy of field0 = 3.0
        assert!(f1.iter().all(|&v| v == 3.0));
        // PropC: field2 = field0 + field1 = 6.0
        assert!(f2.iter().all(|&v| v == 6.0));
    }

    // ── Debug impl ───────────────────────────────────────────

    #[test]
    fn debug_impl_doesnt_panic() {
        let world = LockstepWorld::new(simple_config()).unwrap();
        let debug = format!("{world:?}");
        assert!(debug.contains("LockstepWorld"));
        assert!(debug.contains("current_tick"));
    }

    // ── Snapshot borrowing prevents aliasing ─────────────────

    #[test]
    fn snapshot_borrows_from_self() {
        // This test verifies that snapshot() returns a reference tied to
        // &self. The borrow checker prevents calling &mut self methods
        // while a snapshot is live. This is a compile-time guarantee —
        // the test simply exercises the API.
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        world.step_sync(vec![]).unwrap();

        let snap = world.snapshot();
        let data = snap.read(FieldId(0)).unwrap();
        assert_eq!(data[0], 42.0);
        // `snap` and `data` are not used past this point, so with non-lexical
        // lifetimes the shared borrow of `world` ends here. No explicit
        // statement is needed (and `let _ = snap;` would not move or drop it).

        // Now we can step again.
        world.step_sync(vec![]).unwrap();
        assert_eq!(world.current_tick(), TickId(2));
    }

    // ── Bug-fix regression tests ─────────────────────────────

    #[test]
    fn step_sync_surfaces_submission_rejections() {
        // Create a world with a tiny ingress queue (capacity=2).
        let config = WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 2,
            tick_rate_hz: None,
            backoff: crate::config::BackoffConfig::default(),
        };
        let mut world = LockstepWorld::new(config).unwrap();

        // Submit 4 commands — only 2 fit in the queue.
        let result = world
            .step_sync(vec![
                make_cmd(100),
                make_cmd(100),
                make_cmd(100),
                make_cmd(100),
            ])
            .unwrap();

        // Should have 4 receipts total: 2 applied + 2 rejected.
        assert_eq!(result.receipts.len(), 4);

        let rejected: Vec<_> = result
            .receipts
            .iter()
            .filter(|r| r.reason_code == Some(murk_core::error::IngressError::QueueFull))
            .collect();
        assert_eq!(rejected.len(), 2, "QueueFull rejections must be surfaced");
        assert_eq!(result.metrics.queue_full_rejections, 2);

        let applied: Vec<_> = result
            .receipts
            .iter()
            .filter(|r| r.applied_tick_id.is_some())
            .collect();
        assert_eq!(applied.len(), 2);
    }

    #[test]
    fn reset_does_not_update_seed_on_failure() {
        // NOTE: only the success path is exercised here — an engine reset
        // failure cannot be provoked through the public API in this setup.
        // The "seed unchanged on failure" guarantee rests on `reset()`
        // resetting the engine (with `?`) before assigning the new seed.
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        assert_eq!(world.seed(), 42);

        // Successful reset updates seed.
        world.reset(99).unwrap();
        assert_eq!(world.seed(), 99);

        // Another successful reset.
        world.reset(7).unwrap();
        assert_eq!(world.seed(), 7);
        assert_eq!(world.current_tick(), TickId(0));
    }
}