Skip to main content

murk_engine/
lockstep.rs

1//! Lockstep (synchronous) simulation world.
2//!
3//! [`LockstepWorld`] is the primary user-facing API for running simulations
4//! in lockstep mode. Each call to [`step_sync()`](LockstepWorld::step_sync)
5//! submits commands, executes one tick of the propagator pipeline, and
6//! returns a snapshot of the resulting world state.
7//!
8//! # Ownership model
9//!
10//! `LockstepWorld` is [`Send`] (can be moved between threads) but not
11//! [`Sync`] (cannot be shared). All mutating methods take `&mut self`,
12//! and [`step_sync()`](LockstepWorld::step_sync) returns a [`Snapshot`]
13//! that borrows from `self`. This means the caller cannot call `step_sync()`
14//! while holding a snapshot reference — the borrow checker enforces
15//! aliasing prevention at compile time.
16//!
17//! # Shutdown
18//!
//! Dropping a `LockstepWorld` reclaims all arena memory. Every [`Snapshot`]
//! borrows from the world, so the borrow checker guarantees that none is
//! still alive when the world is dropped; cleanup is therefore always safe
//! (Decision E). No background threads are involved.
22
23use murk_arena::read::Snapshot;
24use murk_core::command::{Command, Receipt};
25use murk_core::id::TickId;
26
27use crate::config::{ConfigError, WorldConfig};
28use crate::metrics::StepMetrics;
29use crate::tick::{TickEngine, TickError};
30
31// Compile-time assertion: LockstepWorld is Send but NOT Sync.
32// (dyn Propagator is Send + !Sync, which is the design intent.)
33// Fails to compile if any field is !Send.
34const _: () = {
35    #[allow(dead_code)]
36    fn assert_send<T: Send>() {}
37    #[allow(dead_code)]
38    fn check() {
39        assert_send::<LockstepWorld>();
40    }
41};
42
43// ── StepResult ──────────────────────────────────────────────────
44
/// Result of a successful [`LockstepWorld::step_sync()`] call.
///
/// The `'w` lifetime ties the contained snapshot to the world that
/// produced it.
pub struct StepResult<'w> {
    /// Read-only snapshot of world state after this tick.
    pub snapshot: Snapshot<'w>,
    /// Receipts for the commands passed to this `step_sync()` call.
    ///
    /// Submission-rejected receipts (those not accepted by the ingress
    /// queue, e.g. `QueueFull`) come first, followed by the receipts
    /// produced by tick execution (applied, expired, rolled back).
    /// In lockstep mode the queue is drained every tick, so rejection is rare.
    pub receipts: Vec<Receipt>,
    /// Performance metrics for this tick.
    pub metrics: StepMetrics,
}
59
60// ── LockstepWorld ───────────────────────────────────────────────
61
/// Single-threaded simulation world for lockstep (synchronous) execution.
///
/// Created from a [`WorldConfig`] via [`new()`](LockstepWorld::new).
/// Each [`step_sync()`](LockstepWorld::step_sync) call runs one complete
/// tick: submit commands → drain ingress → run propagator pipeline →
/// publish snapshot → return results.
///
/// # Example
///
/// ```ignore
/// // `step_sync()` takes `&mut self`, so the binding must be mutable.
/// let mut world = LockstepWorld::new(config)?;
/// for _ in 0..1000 {
///     // `step_sync()` consumes its command vector, so build one per tick.
///     let commands = Vec::new();
///     let result = world.step_sync(commands)?;
///     let obs = result.snapshot.read(field_id);
/// }
/// ```
pub struct LockstepWorld {
    /// Underlying tick engine; stepping, snapshot and status methods
    /// delegate to it.
    engine: TickEngine,
    /// Seed copied from the config at construction and replaced by `reset()`.
    seed: u64,
}
82
83impl LockstepWorld {
84    /// Create a new lockstep world from a [`WorldConfig`].
85    ///
86    /// Validates the configuration, builds the read resolution plan,
87    /// constructs the arena, and returns a ready-to-step world.
88    /// Consumes the `WorldConfig`.
89    pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
90        let seed = config.seed;
91        Ok(Self {
92            engine: TickEngine::new(config)?,
93            seed,
94        })
95    }
96
97    /// Execute one tick synchronously.
98    ///
99    /// Submits `commands` to the ingress queue, runs the full propagator
100    /// pipeline, publishes the new snapshot, and returns a [`StepResult`]
101    /// containing the snapshot reference, receipts, and metrics.
102    ///
103    /// The returned [`Snapshot`] borrows from `self`, preventing the caller
104    /// from calling `step_sync()` again until the snapshot is dropped.
105    ///
106    /// # Errors
107    ///
108    /// Returns [`TickError`] if a propagator fails (tick is rolled back
109    /// atomically) or if ticking is disabled after consecutive rollbacks.
110    /// On rollback, the error's `receipts` field contains per-command
111    /// rollback receipts plus any submission rejections.
112    pub fn step_sync(&mut self, commands: Vec<Command>) -> Result<StepResult<'_>, TickError> {
113        let submit_receipts = self.engine.submit_commands(commands);
114
115        // Collect submission-rejected receipts (QueueFull, TickDisabled).
116        // Accepted commands get their final receipts from execute_tick.
117        let rejected: Vec<Receipt> = submit_receipts
118            .into_iter()
119            .filter(|r| !r.accepted)
120            .collect();
121
122        match self.engine.execute_tick() {
123            Ok(tick_result) => {
124                let mut receipts = rejected;
125                receipts.extend(tick_result.receipts);
126                Ok(StepResult {
127                    snapshot: self.engine.snapshot(),
128                    receipts,
129                    metrics: tick_result.metrics,
130                })
131            }
132            Err(mut tick_error) => {
133                let mut receipts = rejected;
134                receipts.append(&mut tick_error.receipts);
135                Err(TickError {
136                    kind: tick_error.kind,
137                    receipts,
138                })
139            }
140        }
141    }
142
143    /// Reset the world to tick 0 with a new seed.
144    ///
145    /// Reclaims all arena memory, clears the ingress queue, and resets
146    /// all counters. Returns a snapshot of the initial (zeroed) state.
147    ///
148    /// The `seed` is stored for future use when propagators support
149    /// seeded RNG. Currently all runs are fully deterministic regardless
150    /// of seed.
151    pub fn reset(&mut self, seed: u64) -> Result<Snapshot<'_>, ConfigError> {
152        self.engine.reset()?;
153        self.seed = seed;
154        Ok(self.engine.snapshot())
155    }
156
157    /// Get a read-only snapshot of the current published generation.
158    pub fn snapshot(&self) -> Snapshot<'_> {
159        self.engine.snapshot()
160    }
161
162    /// Current tick ID (0 after construction or reset).
163    pub fn current_tick(&self) -> TickId {
164        self.engine.current_tick()
165    }
166
167    /// Whether ticking is disabled due to consecutive rollbacks.
168    pub fn is_tick_disabled(&self) -> bool {
169        self.engine.is_tick_disabled()
170    }
171
172    /// Metrics from the most recent successful tick.
173    pub fn last_metrics(&self) -> &StepMetrics {
174        self.engine.last_metrics()
175    }
176
177    /// The current simulation seed.
178    pub fn seed(&self) -> u64 {
179        self.seed
180    }
181
182    /// Number of consecutive rollbacks since the last successful tick.
183    pub fn consecutive_rollback_count(&self) -> u32 {
184        self.engine.consecutive_rollback_count()
185    }
186
187    /// The spatial topology for this world.
188    pub fn space(&self) -> &dyn murk_space::Space {
189        self.engine.space()
190    }
191}
192
193impl std::fmt::Debug for LockstepWorld {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        f.debug_struct("LockstepWorld")
196            .field("current_tick", &self.engine.current_tick())
197            .field("seed", &self.seed)
198            .field("tick_disabled", &self.engine.is_tick_disabled())
199            .finish()
200    }
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use murk_core::command::CommandPayload;
    use murk_core::id::{FieldId, ParameterKey};
    use murk_core::traits::{FieldReader, SnapshotAccess};
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
    use murk_propagator::propagator::WriteMode;
    use murk_propagator::Propagator;
    use murk_space::{EdgeBehavior, Line1D, Square4};
    use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};

    /// Per-tick scalar field definition with the given name and no units or bounds.
    fn scalar_field(name: &str) -> FieldDef {
        FieldDef {
            name: name.to_string(),
            field_type: FieldType::Scalar,
            mutability: FieldMutability::PerTick,
            units: None,
            bounds: None,
            boundary_behavior: BoundaryBehavior::Clamp,
        }
    }

    /// One-field config on a 10-cell line: a single propagator writes 42.0 to field 0.
    fn simple_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: crate::config::BackoffConfig::default(),
        }
    }

    /// Two-field pipeline: PropA writes field0=7.0, PropB copies field0→field1.
    fn two_field_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("field0"), scalar_field("field1")],
            propagators: vec![
                Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
                Box::new(IdentityPropagator::new(
                    "copy_f0_to_f1",
                    FieldId(0),
                    FieldId(1),
                )),
            ],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: crate::config::BackoffConfig::default(),
        }
    }

    /// Square4 10x10 config for M0 integration testing.
    fn square4_config() -> WorldConfig {
        // Three-field pipeline on a 10x10 grid:
        //   PropA writes field0 = 3.0 (uniform)
        //   PropB copies field0 → field1
        //   PropC reads field0 + field1, writes sum to field2
        struct SumPropagator {
            name: String,
            input_a: FieldId,
            input_b: FieldId,
            output: FieldId,
        }

        impl SumPropagator {
            fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
                Self {
                    name: name.to_string(),
                    input_a: a,
                    input_b: b,
                    output: out,
                }
            }
        }

        impl Propagator for SumPropagator {
            fn name(&self) -> &str {
                &self.name
            }
            fn reads(&self) -> murk_core::FieldSet {
                [self.input_a, self.input_b].into_iter().collect()
            }
            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
                vec![(self.output, WriteMode::Full)]
            }
            fn step(
                &self,
                ctx: &mut murk_propagator::StepContext<'_>,
            ) -> Result<(), murk_core::PropagatorError> {
                // Inputs are copied into owned vectors before the output
                // buffer is requested (presumably so no read borrow of `ctx`
                // is still held at that point).
                let a = ctx.reads().read(self.input_a).unwrap().to_vec();
                let b = ctx.reads().read(self.input_b).unwrap().to_vec();
                let out = ctx.writes().write(self.output).unwrap();
                // `zip` would silently truncate; keep the strictness of
                // indexed access by rejecting inputs shorter than the output.
                assert!(
                    a.len() >= out.len() && b.len() >= out.len(),
                    "input fields must cover every output cell"
                );
                for (dst, (x, y)) in out.iter_mut().zip(a.iter().zip(b.iter())) {
                    *dst = *x + *y;
                }
                Ok(())
            }
        }

        WorldConfig {
            space: Box::new(Square4::new(10, 10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![
                scalar_field("field0"),
                scalar_field("field1"),
                scalar_field("field2"),
            ],
            propagators: vec![
                Box::new(ConstPropagator::new("write_f0", FieldId(0), 3.0)),
                Box::new(IdentityPropagator::new(
                    "copy_f0_to_f1",
                    FieldId(0),
                    FieldId(1),
                )),
                Box::new(SumPropagator::new(
                    "sum_f0_f1_to_f2",
                    FieldId(0),
                    FieldId(1),
                    FieldId(2),
                )),
            ],
            dt: 0.016,
            seed: 12345,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: crate::config::BackoffConfig::default(),
        }
    }

    /// `SetParameter` command (key 0, value 0.0) that expires after tick `expires`.
    fn make_cmd(expires: u64) -> Command {
        Command {
            payload: CommandPayload::SetParameter {
                key: ParameterKey(0),
                value: 0.0,
            },
            expires_after_tick: TickId(expires),
            source_id: None,
            source_seq: None,
            priority_class: 1,
            arrival_seq: 0,
        }
    }

    // ── Basic lifecycle tests ────────────────────────────────

    #[test]
    fn new_creates_world_at_tick_zero() {
        let world = LockstepWorld::new(simple_config()).unwrap();
        assert_eq!(world.current_tick(), TickId(0));
        assert!(!world.is_tick_disabled());
        assert_eq!(world.seed(), 42);
    }

    #[test]
    fn step_sync_advances_tick() {
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        let result = world.step_sync(vec![]).unwrap();
        assert_eq!(result.snapshot.tick_id(), TickId(1));
        assert_eq!(world.current_tick(), TickId(1));
    }

    #[test]
    fn step_sync_returns_correct_snapshot() {
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        let result = world.step_sync(vec![]).unwrap();
        let data = result.snapshot.read(FieldId(0)).unwrap();
        assert_eq!(data.len(), 10);
        assert!(data.iter().all(|&v| v == 42.0));
    }

    #[test]
    fn step_sync_with_commands_produces_receipts() {
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        let result = world.step_sync(vec![make_cmd(100), make_cmd(100)]).unwrap();
        let applied: Vec<_> = result
            .receipts
            .iter()
            .filter(|r| r.applied_tick_id.is_some())
            .collect();
        assert_eq!(applied.len(), 2);
        assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
    }

    #[test]
    fn step_sync_propagator_failure_returns_tick_error() {
        let config = WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(FailingPropagator::new("fail", FieldId(0), 0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: crate::config::BackoffConfig::default(),
        };
        let mut world = LockstepWorld::new(config).unwrap();
        let result = world.step_sync(vec![]);
        assert!(result.is_err());
    }

    // ── Two-field overlay visibility ─────────────────────────

    #[test]
    fn step_sync_overlay_visibility() {
        let mut world = LockstepWorld::new(two_field_config()).unwrap();
        let result = world.step_sync(vec![]).unwrap();
        // PropB copies field0 (7.0) to field1.
        assert_eq!(result.snapshot.read(FieldId(0)).unwrap()[0], 7.0);
        assert_eq!(result.snapshot.read(FieldId(1)).unwrap()[0], 7.0);
    }

    // ── Reset tests ──────────────────────────────────────────

    #[test]
    fn reset_returns_to_tick_zero() {
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        world.step_sync(vec![]).unwrap();
        world.step_sync(vec![]).unwrap();
        assert_eq!(world.current_tick(), TickId(2));

        let snap = world.reset(99).unwrap();
        assert_eq!(snap.tick_id(), TickId(0));
        assert_eq!(world.current_tick(), TickId(0));
        assert_eq!(world.seed(), 99);
    }

    #[test]
    fn reset_allows_continued_stepping() {
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        world.step_sync(vec![]).unwrap();
        world.reset(42).unwrap();

        let result = world.step_sync(vec![]).unwrap();
        assert_eq!(result.snapshot.tick_id(), TickId(1));
        let data = result.snapshot.read(FieldId(0)).unwrap();
        assert!(data.iter().all(|&v| v == 42.0));
    }

    // ── 1000-step determinism (M0 quality gate) ──────────────

    #[test]
    fn thousand_step_determinism() {
        // Run the same config twice. Tick IDs are compared at every tick;
        // field contents are compared at tick 1, every 100th tick, and once
        // more after the final tick.
        let mut world_a = LockstepWorld::new(square4_config()).unwrap();
        let mut world_b = LockstepWorld::new(square4_config()).unwrap();

        for tick in 1..=1000u64 {
            let result_a = world_a.step_sync(vec![]).unwrap();
            let result_b = world_b.step_sync(vec![]).unwrap();

            // Tick IDs match.
            assert_eq!(
                result_a.snapshot.tick_id(),
                result_b.snapshot.tick_id(),
                "tick ID mismatch at tick {tick}"
            );

            // Full field comparison on the first tick and every 100 ticks.
            if tick % 100 == 0 || tick == 1 {
                for field_idx in 0..3u32 {
                    let field = FieldId(field_idx);
                    let data_a = result_a.snapshot.read(field).unwrap();
                    let data_b = result_b.snapshot.read(field).unwrap();
                    assert_eq!(data_a, data_b, "field {field_idx} mismatch at tick {tick}");
                }
            }
        }

        assert_eq!(world_a.current_tick(), TickId(1000));
        assert_eq!(world_b.current_tick(), TickId(1000));

        // Final full field comparison.
        let snap_a = world_a.snapshot();
        let snap_b = world_b.snapshot();
        for field_idx in 0..3u32 {
            let field = FieldId(field_idx);
            assert_eq!(
                snap_a.read(field).unwrap(),
                snap_b.read(field).unwrap(),
                "final field {field_idx} mismatch"
            );
        }
    }

    // ── Memory bound (M0 quality gate) ───────────────────────

    #[test]
    fn memory_bound_tick_1000_approx_tick_10() {
        let mut world = LockstepWorld::new(square4_config()).unwrap();

        // Run 10 ticks and record memory.
        for _ in 0..10 {
            world.step_sync(vec![]).unwrap();
        }
        let mem_10 = world.last_metrics().memory_bytes;

        // Run to tick 1000.
        for _ in 10..1000 {
            world.step_sync(vec![]).unwrap();
        }
        let mem_1000 = world.last_metrics().memory_bytes;

        // A zero baseline would turn the ratio into NaN/inf; fail with a
        // clear message instead of a confusing "grew NaNx" one.
        let baseline = mem_10 as f64;
        assert!(
            baseline > 0.0,
            "memory metric at tick 10 must be non-zero to compute a growth ratio"
        );

        // Memory at tick 1000 should be approximately equal to tick 10.
        // Allow up to 20% growth for internal bookkeeping (IndexMap resizing, etc).
        let ratio = mem_1000 as f64 / baseline;
        assert!(
            ratio < 1.2,
            "memory grew {ratio:.2}x from tick 10 ({mem_10}) to tick 1000 ({mem_1000})"
        );
    }

    // ── Square4 three-propagator integration ─────────────────

    #[test]
    fn square4_three_propagator_end_to_end() {
        let mut world = LockstepWorld::new(square4_config()).unwrap();
        let result = world.step_sync(vec![]).unwrap();

        let snap = &result.snapshot;
        let f0 = snap.read(FieldId(0)).unwrap();
        let f1 = snap.read(FieldId(1)).unwrap();
        let f2 = snap.read(FieldId(2)).unwrap();

        // 100 cells (10x10 Square4)
        assert_eq!(f0.len(), 100);
        assert_eq!(f1.len(), 100);
        assert_eq!(f2.len(), 100);

        // PropA: field0 = 3.0
        assert!(f0.iter().all(|&v| v == 3.0));
        // PropB: field1 = copy of field0 = 3.0
        assert!(f1.iter().all(|&v| v == 3.0));
        // PropC: field2 = field0 + field1 = 6.0
        assert!(f2.iter().all(|&v| v == 6.0));
    }

    // ── Debug impl ───────────────────────────────────────────

    #[test]
    fn debug_impl_doesnt_panic() {
        let world = LockstepWorld::new(simple_config()).unwrap();
        let debug = format!("{world:?}");
        assert!(debug.contains("LockstepWorld"));
        assert!(debug.contains("current_tick"));
    }

    // ── Snapshot borrowing prevents aliasing ─────────────────

    #[test]
    fn snapshot_borrows_from_self() {
        // This test verifies that snapshot() returns a value tied to
        // &self. The borrow checker prevents calling &mut self methods
        // while a snapshot is live. This is a compile-time guarantee —
        // the test simply exercises the API.
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        world.step_sync(vec![]).unwrap();

        // The inner scope makes the end of the snapshot borrow explicit:
        // both `snap` and `data` are gone before the next `&mut self` call.
        {
            let snap = world.snapshot();
            let data = snap.read(FieldId(0)).unwrap();
            assert_eq!(data[0], 42.0);
        }

        // Now we can step again.
        world.step_sync(vec![]).unwrap();
        assert_eq!(world.current_tick(), TickId(2));
    }

    // ── Bug-fix regression tests ─────────────────────────────

    #[test]
    fn step_sync_surfaces_submission_rejections() {
        // Create a world with a tiny ingress queue (capacity=2).
        let config = WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 2,
            tick_rate_hz: None,
            backoff: crate::config::BackoffConfig::default(),
        };
        let mut world = LockstepWorld::new(config).unwrap();

        // Submit 4 commands — only 2 fit in the queue.
        let result = world
            .step_sync(vec![
                make_cmd(100),
                make_cmd(100),
                make_cmd(100),
                make_cmd(100),
            ])
            .unwrap();

        // Should have 4 receipts total: 2 applied + 2 rejected.
        assert_eq!(result.receipts.len(), 4);

        let rejected: Vec<_> = result
            .receipts
            .iter()
            .filter(|r| r.reason_code == Some(murk_core::error::IngressError::QueueFull))
            .collect();
        assert_eq!(rejected.len(), 2, "QueueFull rejections must be surfaced");

        let applied: Vec<_> = result
            .receipts
            .iter()
            .filter(|r| r.applied_tick_id.is_some())
            .collect();
        assert_eq!(applied.len(), 2);
    }

    #[test]
    fn reset_does_not_update_seed_on_failure() {
        // We can't easily make arena.reset() fail in the current implementation,
        // so the failure path itself is not exercised here. This only checks
        // the success path: each successful reset replaces the stored seed.
        let mut world = LockstepWorld::new(simple_config()).unwrap();
        assert_eq!(world.seed(), 42);

        // Successful reset updates seed.
        world.reset(99).unwrap();
        assert_eq!(world.seed(), 99);

        // Another successful reset.
        world.reset(7).unwrap();
        assert_eq!(world.seed(), 7);
        assert_eq!(world.current_tick(), TickId(0));
    }
}