Skip to main content

murk_engine/
lockstep.rs

1//! Lockstep (synchronous) simulation world.
2//!
3//! [`LockstepWorld`] is the primary user-facing API for running simulations
4//! in lockstep mode. Each call to [`step_sync()`](LockstepWorld::step_sync)
5//! submits commands, executes one tick of the propagator pipeline, and
6//! returns a snapshot of the resulting world state.
7//!
8//! # Ownership model
9//!
10//! `LockstepWorld` is [`Send`] (can be moved between threads) but not
11//! [`Sync`] (cannot be shared). All mutating methods take `&mut self`,
12//! and [`step_sync()`](LockstepWorld::step_sync) returns a [`Snapshot`]
13//! that borrows from `self`. This means the caller cannot call `step_sync()`
14//! while holding a snapshot reference — the borrow checker enforces
15//! aliasing prevention at compile time.
16//!
17//! # Shutdown
18//!
19//! Dropping a `LockstepWorld` reclaims all arena memory. Since `&mut self`
20//! guarantees no outstanding borrows at drop time, cleanup is always safe
21//! (Decision E). No background threads are involved.
22
23use murk_arena::read::Snapshot;
24use murk_core::command::{Command, Receipt};
25use murk_core::id::TickId;
26
27use crate::config::{ConfigError, WorldConfig};
28use crate::metrics::StepMetrics;
29use crate::tick::{TickEngine, TickError};
30
31// Compile-time assertion: LockstepWorld is Send but NOT Sync.
32// (dyn Propagator is Send + !Sync, which is the design intent.)
33// Fails to compile if any field is !Send.
34const _: () = {
35    #[allow(dead_code)]
36    fn assert_send<T: Send>() {}
37    #[allow(dead_code)]
38    fn check() {
39        assert_send::<LockstepWorld>();
40    }
41};
42
43// ── StepResult ──────────────────────────────────────────────────
44
45/// Result of a successful [`LockstepWorld::step_sync()`] call.
46pub struct StepResult<'w> {
47    /// Read-only snapshot of world state after this tick.
48    pub snapshot: Snapshot<'w>,
49    /// Per-command receipts from both submission and tick execution.
50    ///
51    /// Includes submission-rejected receipts (e.g. `QueueFull`, `TickDisabled`)
52    /// followed by tick execution receipts (applied, expired, rolled back).
53    /// In lockstep mode the queue is drained every tick, so rejection is rare.
54    pub receipts: Vec<Receipt>,
55    /// Performance metrics for this tick.
56    pub metrics: StepMetrics,
57}
58
59// ── LockstepWorld ───────────────────────────────────────────────
60
61/// Single-threaded simulation world for lockstep (synchronous) execution.
62///
63/// Created from a [`WorldConfig`] via [`new()`](LockstepWorld::new).
64/// Each [`step_sync()`](LockstepWorld::step_sync) call runs one complete
65/// tick: submit commands → drain ingress → run propagator pipeline →
66/// publish snapshot → return results.
67///
68/// # Example
69///
70/// ```ignore
71/// let world = LockstepWorld::new(config)?;
72/// for _ in 0..1000 {
73///     let result = world.step_sync(commands)?;
74///     let obs = result.snapshot.read(field_id);
75/// }
76/// ```
77pub struct LockstepWorld {
78    engine: TickEngine,
79    seed: u64,
80}
81
82impl LockstepWorld {
83    /// Create a new lockstep world from a [`WorldConfig`].
84    ///
85    /// Validates the configuration, builds the read resolution plan,
86    /// constructs the arena, and returns a ready-to-step world.
87    /// Consumes the `WorldConfig`.
88    pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
89        let seed = config.seed;
90        Ok(Self {
91            engine: TickEngine::new(config)?,
92            seed,
93        })
94    }
95
96    /// Execute one tick synchronously.
97    ///
98    /// Submits `commands` to the ingress queue, runs the full propagator
99    /// pipeline, publishes the new snapshot, and returns a [`StepResult`]
100    /// containing the snapshot reference, receipts, and metrics.
101    ///
102    /// The returned [`Snapshot`] borrows from `self`, preventing the caller
103    /// from calling `step_sync()` again until the snapshot is dropped.
104    ///
105    /// # Errors
106    ///
107    /// Returns [`TickError`] if a propagator fails (tick is rolled back
108    /// atomically) or if ticking is disabled after consecutive rollbacks.
109    /// On rollback, the error's `receipts` field contains per-command
110    /// rollback receipts plus any submission rejections.
111    pub fn step_sync(&mut self, commands: Vec<Command>) -> Result<StepResult<'_>, TickError> {
112        let submit_receipts = self.engine.submit_commands(commands);
113
114        // Collect submission-rejected receipts (QueueFull, TickDisabled).
115        // Accepted commands get their final receipts from execute_tick.
116        let rejected: Vec<Receipt> = submit_receipts
117            .into_iter()
118            .filter(|r| !r.accepted)
119            .collect();
120
121        match self.engine.execute_tick() {
122            Ok(tick_result) => {
123                let mut receipts = rejected;
124                receipts.extend(tick_result.receipts);
125                Ok(StepResult {
126                    snapshot: self.engine.snapshot(),
127                    receipts,
128                    metrics: tick_result.metrics,
129                })
130            }
131            Err(mut tick_error) => {
132                let mut receipts = rejected;
133                receipts.append(&mut tick_error.receipts);
134                Err(TickError {
135                    kind: tick_error.kind,
136                    receipts,
137                })
138            }
139        }
140    }
141
142    /// Reset the world to tick 0 with a new seed.
143    ///
144    /// Reclaims all arena memory, clears the ingress queue, and resets
145    /// all counters. Returns a snapshot of the initial (zeroed) state.
146    ///
147    /// The `seed` is stored for future use when propagators support
148    /// seeded RNG. Currently all runs are fully deterministic regardless
149    /// of seed.
150    pub fn reset(&mut self, seed: u64) -> Result<Snapshot<'_>, ConfigError> {
151        self.engine.reset()?;
152        self.seed = seed;
153        Ok(self.engine.snapshot())
154    }
155
156    /// Get a read-only snapshot of the current published generation.
157    pub fn snapshot(&self) -> Snapshot<'_> {
158        self.engine.snapshot()
159    }
160
161    /// Current tick ID (0 after construction or reset).
162    pub fn current_tick(&self) -> TickId {
163        self.engine.current_tick()
164    }
165
166    /// Whether ticking is disabled due to consecutive rollbacks.
167    pub fn is_tick_disabled(&self) -> bool {
168        self.engine.is_tick_disabled()
169    }
170
171    /// Metrics from the most recent successful tick.
172    pub fn last_metrics(&self) -> &StepMetrics {
173        self.engine.last_metrics()
174    }
175
176    /// The current simulation seed.
177    pub fn seed(&self) -> u64 {
178        self.seed
179    }
180
181    /// Number of consecutive rollbacks since the last successful tick.
182    pub fn consecutive_rollback_count(&self) -> u32 {
183        self.engine.consecutive_rollback_count()
184    }
185
186    /// The spatial topology for this world.
187    pub fn space(&self) -> &dyn murk_space::Space {
188        self.engine.space()
189    }
190}
191
192impl std::fmt::Debug for LockstepWorld {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        f.debug_struct("LockstepWorld")
195            .field("current_tick", &self.engine.current_tick())
196            .field("seed", &self.seed)
197            .field("tick_disabled", &self.engine.is_tick_disabled())
198            .finish()
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205    use murk_core::command::CommandPayload;
206    use murk_core::id::{Coord, FieldId};
207    use murk_core::traits::{FieldReader, SnapshotAccess};
208    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
209    use murk_propagator::propagator::WriteMode;
210    use murk_propagator::Propagator;
211    use murk_space::{EdgeBehavior, Line1D, Square4};
212    use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};
213
214    fn scalar_field(name: &str) -> FieldDef {
215        FieldDef {
216            name: name.to_string(),
217            field_type: FieldType::Scalar,
218            mutability: FieldMutability::PerTick,
219            units: None,
220            bounds: None,
221            boundary_behavior: BoundaryBehavior::Clamp,
222        }
223    }
224
225    fn simple_config() -> WorldConfig {
226        WorldConfig {
227            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
228            fields: vec![scalar_field("energy")],
229            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
230            dt: 0.1,
231            seed: 42,
232            ring_buffer_size: 8,
233            max_ingress_queue: 1024,
234            tick_rate_hz: None,
235            backoff: crate::config::BackoffConfig::default(),
236        }
237    }
238
239    /// Two-field pipeline: PropA writes field0=7.0, PropB copies field0→field1.
240    fn two_field_config() -> WorldConfig {
241        WorldConfig {
242            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
243            fields: vec![scalar_field("field0"), scalar_field("field1")],
244            propagators: vec![
245                Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
246                Box::new(IdentityPropagator::new(
247                    "copy_f0_to_f1",
248                    FieldId(0),
249                    FieldId(1),
250                )),
251            ],
252            dt: 0.1,
253            seed: 42,
254            ring_buffer_size: 8,
255            max_ingress_queue: 1024,
256            tick_rate_hz: None,
257            backoff: crate::config::BackoffConfig::default(),
258        }
259    }
260
261    /// Square4 10x10 config for M0 integration testing.
262    fn square4_config() -> WorldConfig {
263        // Three-field pipeline on a 10x10 grid:
264        //   PropA writes field0 = 3.0 (uniform)
265        //   PropB copies field0 → field1
266        //   PropC reads field0 + field1, writes sum to field2
267        struct SumPropagator {
268            name: String,
269            input_a: FieldId,
270            input_b: FieldId,
271            output: FieldId,
272        }
273
274        impl SumPropagator {
275            fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
276                Self {
277                    name: name.to_string(),
278                    input_a: a,
279                    input_b: b,
280                    output: out,
281                }
282            }
283        }
284
285        impl Propagator for SumPropagator {
286            fn name(&self) -> &str {
287                &self.name
288            }
289            fn reads(&self) -> murk_core::FieldSet {
290                [self.input_a, self.input_b].into_iter().collect()
291            }
292            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
293                vec![(self.output, WriteMode::Full)]
294            }
295            fn step(
296                &self,
297                ctx: &mut murk_propagator::StepContext<'_>,
298            ) -> Result<(), murk_core::PropagatorError> {
299                let a = ctx.reads().read(self.input_a).unwrap().to_vec();
300                let b = ctx.reads().read(self.input_b).unwrap().to_vec();
301                let out = ctx.writes().write(self.output).unwrap();
302                for i in 0..out.len() {
303                    out[i] = a[i] + b[i];
304                }
305                Ok(())
306            }
307        }
308
309        WorldConfig {
310            space: Box::new(Square4::new(10, 10, EdgeBehavior::Absorb).unwrap()),
311            fields: vec![
312                scalar_field("field0"),
313                scalar_field("field1"),
314                scalar_field("field2"),
315            ],
316            propagators: vec![
317                Box::new(ConstPropagator::new("write_f0", FieldId(0), 3.0)),
318                Box::new(IdentityPropagator::new(
319                    "copy_f0_to_f1",
320                    FieldId(0),
321                    FieldId(1),
322                )),
323                Box::new(SumPropagator::new(
324                    "sum_f0_f1_to_f2",
325                    FieldId(0),
326                    FieldId(1),
327                    FieldId(2),
328                )),
329            ],
330            dt: 0.016,
331            seed: 12345,
332            ring_buffer_size: 8,
333            max_ingress_queue: 1024,
334            tick_rate_hz: None,
335            backoff: crate::config::BackoffConfig::default(),
336        }
337    }
338
339    fn make_cmd(expires: u64) -> Command {
340        Command {
341            payload: CommandPayload::SetField {
342                coord: Coord::from_elem(0, 1),
343                field_id: FieldId(0),
344                value: 1.0,
345            },
346            expires_after_tick: TickId(expires),
347            source_id: None,
348            source_seq: None,
349            priority_class: 1,
350            arrival_seq: 0,
351        }
352    }
353
354    // ── Basic lifecycle tests ────────────────────────────────
355
356    #[test]
357    fn new_creates_world_at_tick_zero() {
358        let world = LockstepWorld::new(simple_config()).unwrap();
359        assert_eq!(world.current_tick(), TickId(0));
360        assert!(!world.is_tick_disabled());
361        assert_eq!(world.seed(), 42);
362    }
363
364    #[test]
365    fn step_sync_advances_tick() {
366        let mut world = LockstepWorld::new(simple_config()).unwrap();
367        let result = world.step_sync(vec![]).unwrap();
368        assert_eq!(result.snapshot.tick_id(), TickId(1));
369        assert_eq!(world.current_tick(), TickId(1));
370    }
371
372    #[test]
373    fn step_sync_returns_correct_snapshot() {
374        let mut world = LockstepWorld::new(simple_config()).unwrap();
375        let result = world.step_sync(vec![]).unwrap();
376        let data = result.snapshot.read(FieldId(0)).unwrap();
377        assert_eq!(data.len(), 10);
378        assert!(data.iter().all(|&v| v == 42.0));
379    }
380
381    #[test]
382    fn step_sync_with_commands_produces_receipts() {
383        let mut world = LockstepWorld::new(simple_config()).unwrap();
384        let result = world.step_sync(vec![make_cmd(100), make_cmd(100)]).unwrap();
385        let applied: Vec<_> = result
386            .receipts
387            .iter()
388            .filter(|r| r.applied_tick_id.is_some())
389            .collect();
390        assert_eq!(applied.len(), 2);
391        assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
392    }
393
394    #[test]
395    fn step_sync_propagator_failure_returns_tick_error() {
396        let config = WorldConfig {
397            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
398            fields: vec![scalar_field("energy")],
399            propagators: vec![Box::new(FailingPropagator::new("fail", FieldId(0), 0))],
400            dt: 0.1,
401            seed: 42,
402            ring_buffer_size: 8,
403            max_ingress_queue: 1024,
404            tick_rate_hz: None,
405            backoff: crate::config::BackoffConfig::default(),
406        };
407        let mut world = LockstepWorld::new(config).unwrap();
408        let result = world.step_sync(vec![]);
409        assert!(result.is_err());
410    }
411
412    // ── Two-field overlay visibility ─────────────────────────
413
414    #[test]
415    fn step_sync_overlay_visibility() {
416        let mut world = LockstepWorld::new(two_field_config()).unwrap();
417        let result = world.step_sync(vec![]).unwrap();
418        // PropB copies field0 (7.0) to field1.
419        assert_eq!(result.snapshot.read(FieldId(0)).unwrap()[0], 7.0);
420        assert_eq!(result.snapshot.read(FieldId(1)).unwrap()[0], 7.0);
421    }
422
423    // ── Reset tests ──────────────────────────────────────────
424
425    #[test]
426    fn reset_returns_to_tick_zero() {
427        let mut world = LockstepWorld::new(simple_config()).unwrap();
428        world.step_sync(vec![]).unwrap();
429        world.step_sync(vec![]).unwrap();
430        assert_eq!(world.current_tick(), TickId(2));
431
432        let snap = world.reset(99).unwrap();
433        assert_eq!(snap.tick_id(), TickId(0));
434        assert_eq!(world.current_tick(), TickId(0));
435        assert_eq!(world.seed(), 99);
436    }
437
438    #[test]
439    fn reset_allows_continued_stepping() {
440        let mut world = LockstepWorld::new(simple_config()).unwrap();
441        world.step_sync(vec![]).unwrap();
442        world.reset(42).unwrap();
443
444        let result = world.step_sync(vec![]).unwrap();
445        assert_eq!(result.snapshot.tick_id(), TickId(1));
446        let data = result.snapshot.read(FieldId(0)).unwrap();
447        assert!(data.iter().all(|&v| v == 42.0));
448    }
449
450    // ── 1000-step determinism (M0 quality gate) ──────────────
451
452    #[test]
453    fn thousand_step_determinism() {
454        // Run the same config twice and compare snapshots at every tick.
455        let mut world_a = LockstepWorld::new(square4_config()).unwrap();
456        let mut world_b = LockstepWorld::new(square4_config()).unwrap();
457
458        for tick in 1..=1000u64 {
459            let result_a = world_a.step_sync(vec![]).unwrap();
460            let result_b = world_b.step_sync(vec![]).unwrap();
461
462            // Tick IDs match.
463            assert_eq!(
464                result_a.snapshot.tick_id(),
465                result_b.snapshot.tick_id(),
466                "tick ID mismatch at tick {tick}"
467            );
468
469            // Spot-check fields at each tick (full comparison every 100 ticks).
470            if tick % 100 == 0 || tick == 1 {
471                for field_idx in 0..3u32 {
472                    let field = FieldId(field_idx);
473                    let data_a = result_a.snapshot.read(field).unwrap();
474                    let data_b = result_b.snapshot.read(field).unwrap();
475                    assert_eq!(data_a, data_b, "field {field_idx} mismatch at tick {tick}");
476                }
477            }
478        }
479
480        assert_eq!(world_a.current_tick(), TickId(1000));
481        assert_eq!(world_b.current_tick(), TickId(1000));
482
483        // Final full field comparison.
484        let snap_a = world_a.snapshot();
485        let snap_b = world_b.snapshot();
486        for field_idx in 0..3u32 {
487            let field = FieldId(field_idx);
488            assert_eq!(
489                snap_a.read(field).unwrap(),
490                snap_b.read(field).unwrap(),
491                "final field {field_idx} mismatch"
492            );
493        }
494    }
495
496    // ── Memory bound (M0 quality gate) ───────────────────────
497
498    #[test]
499    fn memory_bound_tick_1000_approx_tick_10() {
500        let mut world = LockstepWorld::new(square4_config()).unwrap();
501
502        // Run 10 ticks and record memory.
503        for _ in 0..10 {
504            world.step_sync(vec![]).unwrap();
505        }
506        let mem_10 = world.last_metrics().memory_bytes;
507
508        // Run to tick 1000.
509        for _ in 10..1000 {
510            world.step_sync(vec![]).unwrap();
511        }
512        let mem_1000 = world.last_metrics().memory_bytes;
513
514        // Memory at tick 1000 should be approximately equal to tick 10.
515        // Allow up to 20% growth for internal bookkeeping (IndexMap resizing, etc).
516        let ratio = mem_1000 as f64 / mem_10 as f64;
517        assert!(
518            ratio < 1.2,
519            "memory grew {ratio:.2}x from tick 10 ({mem_10}) to tick 1000 ({mem_1000})"
520        );
521    }
522
523    // ── Square4 three-propagator integration ─────────────────
524
525    #[test]
526    fn square4_three_propagator_end_to_end() {
527        let mut world = LockstepWorld::new(square4_config()).unwrap();
528        let result = world.step_sync(vec![]).unwrap();
529
530        let snap = &result.snapshot;
531        let f0 = snap.read(FieldId(0)).unwrap();
532        let f1 = snap.read(FieldId(1)).unwrap();
533        let f2 = snap.read(FieldId(2)).unwrap();
534
535        // 100 cells (10x10 Square4)
536        assert_eq!(f0.len(), 100);
537        assert_eq!(f1.len(), 100);
538        assert_eq!(f2.len(), 100);
539
540        // PropA: field0 = 3.0
541        assert!(f0.iter().all(|&v| v == 3.0));
542        // PropB: field1 = copy of field0 = 3.0
543        assert!(f1.iter().all(|&v| v == 3.0));
544        // PropC: field2 = field0 + field1 = 6.0
545        assert!(f2.iter().all(|&v| v == 6.0));
546    }
547
548    // ── Debug impl ───────────────────────────────────────────
549
550    #[test]
551    fn debug_impl_doesnt_panic() {
552        let world = LockstepWorld::new(simple_config()).unwrap();
553        let debug = format!("{world:?}");
554        assert!(debug.contains("LockstepWorld"));
555        assert!(debug.contains("current_tick"));
556    }
557
558    // ── Snapshot borrowing prevents aliasing ─────────────────
559
560    #[test]
561    fn snapshot_borrows_from_self() {
562        // This test verifies that snapshot() returns a reference tied to
563        // &self. The borrow checker prevents calling &mut self methods
564        // while a snapshot is live. This is a compile-time guarantee —
565        // the test simply exercises the API.
566        let mut world = LockstepWorld::new(simple_config()).unwrap();
567        world.step_sync(vec![]).unwrap();
568
569        let snap = world.snapshot();
570        let data = snap.read(FieldId(0)).unwrap();
571        assert_eq!(data[0], 42.0);
572        // snap must go out of scope before calling step_sync again.
573        // (Snapshot does not implement Drop, so explicit drop() is not needed;
574        //  the binding just needs to end here.)
575        let _ = snap;
576
577        // Now we can step again.
578        world.step_sync(vec![]).unwrap();
579        assert_eq!(world.current_tick(), TickId(2));
580    }
581
582    // ── Bug-fix regression tests ─────────────────────────────
583
584    #[test]
585    fn step_sync_surfaces_submission_rejections() {
586        // Create a world with a tiny ingress queue (capacity=2).
587        let config = WorldConfig {
588            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
589            fields: vec![scalar_field("energy")],
590            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
591            dt: 0.1,
592            seed: 42,
593            ring_buffer_size: 8,
594            max_ingress_queue: 2,
595            tick_rate_hz: None,
596            backoff: crate::config::BackoffConfig::default(),
597        };
598        let mut world = LockstepWorld::new(config).unwrap();
599
600        // Submit 4 commands — only 2 fit in the queue.
601        let result = world
602            .step_sync(vec![
603                make_cmd(100),
604                make_cmd(100),
605                make_cmd(100),
606                make_cmd(100),
607            ])
608            .unwrap();
609
610        // Should have 4 receipts total: 2 applied + 2 rejected.
611        assert_eq!(result.receipts.len(), 4);
612
613        let rejected: Vec<_> = result
614            .receipts
615            .iter()
616            .filter(|r| r.reason_code == Some(murk_core::error::IngressError::QueueFull))
617            .collect();
618        assert_eq!(rejected.len(), 2, "QueueFull rejections must be surfaced");
619
620        let applied: Vec<_> = result
621            .receipts
622            .iter()
623            .filter(|r| r.applied_tick_id.is_some())
624            .collect();
625        assert_eq!(applied.len(), 2);
626    }
627
628    #[test]
629    fn reset_does_not_update_seed_on_failure() {
630        // We can't easily make arena.reset() fail in the current implementation,
631        // but we verify the ordering: seed should only change after success.
632        let mut world = LockstepWorld::new(simple_config()).unwrap();
633        assert_eq!(world.seed(), 42);
634
635        // Successful reset updates seed.
636        world.reset(99).unwrap();
637        assert_eq!(world.seed(), 99);
638
639        // Another successful reset.
640        world.reset(7).unwrap();
641        assert_eq!(world.seed(), 7);
642        assert_eq!(world.current_tick(), TickId(0));
643    }
644}