1use murk_arena::read::Snapshot;
24use murk_core::command::{Command, Receipt};
25use murk_core::id::TickId;
26
27use crate::config::{ConfigError, WorldConfig};
28use crate::metrics::StepMetrics;
29use crate::tick::{TickEngine, TickError};
30
31const _: () = {
35 #[allow(dead_code)]
36 fn assert_send<T: Send>() {}
37 #[allow(dead_code)]
38 fn check() {
39 assert_send::<LockstepWorld>();
40 }
41};
42
43pub struct StepResult<'w> {
47 pub snapshot: Snapshot<'w>,
49 pub receipts: Vec<Receipt>,
55 pub metrics: StepMetrics,
57}
58
59pub struct LockstepWorld {
78 engine: TickEngine,
79 seed: u64,
80}
81
82impl LockstepWorld {
83 pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
89 let seed = config.seed;
90 Ok(Self {
91 engine: TickEngine::new(config)?,
92 seed,
93 })
94 }
95
96 pub fn step_sync(&mut self, commands: Vec<Command>) -> Result<StepResult<'_>, TickError> {
112 let submit_receipts = self.engine.submit_commands(commands);
113
114 let rejected: Vec<Receipt> = submit_receipts
117 .into_iter()
118 .filter(|r| !r.accepted)
119 .collect();
120
121 match self.engine.execute_tick() {
122 Ok(tick_result) => {
123 let mut receipts = rejected;
124 receipts.extend(tick_result.receipts);
125 Ok(StepResult {
126 snapshot: self.engine.snapshot(),
127 receipts,
128 metrics: tick_result.metrics,
129 })
130 }
131 Err(mut tick_error) => {
132 let mut receipts = rejected;
133 receipts.append(&mut tick_error.receipts);
134 Err(TickError {
135 kind: tick_error.kind,
136 receipts,
137 })
138 }
139 }
140 }
141
142 pub fn reset(&mut self, seed: u64) -> Result<Snapshot<'_>, ConfigError> {
151 self.engine.reset()?;
152 self.seed = seed;
153 Ok(self.engine.snapshot())
154 }
155
156 pub fn snapshot(&self) -> Snapshot<'_> {
158 self.engine.snapshot()
159 }
160
161 pub fn current_tick(&self) -> TickId {
163 self.engine.current_tick()
164 }
165
166 pub fn is_tick_disabled(&self) -> bool {
168 self.engine.is_tick_disabled()
169 }
170
171 pub fn last_metrics(&self) -> &StepMetrics {
173 self.engine.last_metrics()
174 }
175
176 pub fn seed(&self) -> u64 {
178 self.seed
179 }
180
181 pub fn consecutive_rollback_count(&self) -> u32 {
183 self.engine.consecutive_rollback_count()
184 }
185
186 pub fn space(&self) -> &dyn murk_space::Space {
188 self.engine.space()
189 }
190
191 pub fn ingress_queue_depth(&self) -> usize {
193 self.engine.ingress_queue_depth()
194 }
195
196 pub fn ingress_queue_capacity(&self) -> usize {
198 self.engine.ingress_queue_capacity()
199 }
200}
201
202impl std::fmt::Debug for LockstepWorld {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 f.debug_struct("LockstepWorld")
205 .field("current_tick", &self.engine.current_tick())
206 .field("seed", &self.seed)
207 .field("tick_disabled", &self.engine.is_tick_disabled())
208 .finish()
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use super::*;
215 use murk_core::command::CommandPayload;
216 use murk_core::id::{Coord, FieldId};
217 use murk_core::traits::{FieldReader, SnapshotAccess};
218 use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
219 use murk_propagator::propagator::WriteMode;
220 use murk_propagator::Propagator;
221 use murk_space::{EdgeBehavior, Line1D, Square4};
222 use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};
223
224 fn scalar_field(name: &str) -> FieldDef {
225 FieldDef {
226 name: name.to_string(),
227 field_type: FieldType::Scalar,
228 mutability: FieldMutability::PerTick,
229 units: None,
230 bounds: None,
231 boundary_behavior: BoundaryBehavior::Clamp,
232 }
233 }
234
235 fn simple_config() -> WorldConfig {
236 WorldConfig {
237 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
238 fields: vec![scalar_field("energy")],
239 propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
240 dt: 0.1,
241 seed: 42,
242 ring_buffer_size: 8,
243 max_ingress_queue: 1024,
244 tick_rate_hz: None,
245 backoff: crate::config::BackoffConfig::default(),
246 }
247 }
248
249 fn two_field_config() -> WorldConfig {
251 WorldConfig {
252 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
253 fields: vec![scalar_field("field0"), scalar_field("field1")],
254 propagators: vec![
255 Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
256 Box::new(IdentityPropagator::new(
257 "copy_f0_to_f1",
258 FieldId(0),
259 FieldId(1),
260 )),
261 ],
262 dt: 0.1,
263 seed: 42,
264 ring_buffer_size: 8,
265 max_ingress_queue: 1024,
266 tick_rate_hz: None,
267 backoff: crate::config::BackoffConfig::default(),
268 }
269 }
270
271 fn square4_config() -> WorldConfig {
273 struct SumPropagator {
278 name: String,
279 input_a: FieldId,
280 input_b: FieldId,
281 output: FieldId,
282 }
283
284 impl SumPropagator {
285 fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
286 Self {
287 name: name.to_string(),
288 input_a: a,
289 input_b: b,
290 output: out,
291 }
292 }
293 }
294
295 impl Propagator for SumPropagator {
296 fn name(&self) -> &str {
297 &self.name
298 }
299 fn reads(&self) -> murk_core::FieldSet {
300 [self.input_a, self.input_b].into_iter().collect()
301 }
302 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
303 vec![(self.output, WriteMode::Full)]
304 }
305 fn step(
306 &self,
307 ctx: &mut murk_propagator::StepContext<'_>,
308 ) -> Result<(), murk_core::PropagatorError> {
309 let a = ctx.reads().read(self.input_a).unwrap().to_vec();
310 let b = ctx.reads().read(self.input_b).unwrap().to_vec();
311 let out = ctx.writes().write(self.output).unwrap();
312 for i in 0..out.len() {
313 out[i] = a[i] + b[i];
314 }
315 Ok(())
316 }
317 }
318
319 WorldConfig {
320 space: Box::new(Square4::new(10, 10, EdgeBehavior::Absorb).unwrap()),
321 fields: vec![
322 scalar_field("field0"),
323 scalar_field("field1"),
324 scalar_field("field2"),
325 ],
326 propagators: vec![
327 Box::new(ConstPropagator::new("write_f0", FieldId(0), 3.0)),
328 Box::new(IdentityPropagator::new(
329 "copy_f0_to_f1",
330 FieldId(0),
331 FieldId(1),
332 )),
333 Box::new(SumPropagator::new(
334 "sum_f0_f1_to_f2",
335 FieldId(0),
336 FieldId(1),
337 FieldId(2),
338 )),
339 ],
340 dt: 0.016,
341 seed: 12345,
342 ring_buffer_size: 8,
343 max_ingress_queue: 1024,
344 tick_rate_hz: None,
345 backoff: crate::config::BackoffConfig::default(),
346 }
347 }
348
349 fn make_cmd(expires: u64) -> Command {
350 Command {
351 payload: CommandPayload::SetField {
352 coord: Coord::from_elem(0, 1),
353 field_id: FieldId(0),
354 value: 1.0,
355 },
356 expires_after_tick: TickId(expires),
357 source_id: None,
358 source_seq: None,
359 priority_class: 1,
360 arrival_seq: 0,
361 }
362 }
363
/// A freshly built world sits at tick 0, is not tick-disabled and reports
/// the seed from its config.
#[test]
fn new_creates_world_at_tick_zero() {
    let fresh = LockstepWorld::new(simple_config()).unwrap();

    assert_eq!(fresh.seed(), 42);
    assert_eq!(fresh.current_tick(), TickId(0));
    assert!(!fresh.is_tick_disabled());
}
373
/// One `step_sync` call moves both the returned snapshot and the world to
/// tick 1.
#[test]
fn step_sync_advances_tick() {
    let mut world = LockstepWorld::new(simple_config()).unwrap();

    // Pull the tick id out right away so the step result's borrow ends here.
    let snapshot_tick = world.step_sync(vec![]).unwrap().snapshot.tick_id();

    assert_eq!(snapshot_tick, TickId(1));
    assert_eq!(world.current_tick(), TickId(1));
}
381
/// After one step the snapshot holds 10 cells of field 0, all equal to 42.0.
#[test]
fn step_sync_returns_correct_snapshot() {
    let mut world = LockstepWorld::new(simple_config()).unwrap();
    let stepped = world.step_sync(vec![]).unwrap();

    let cells = stepped.snapshot.read(FieldId(0)).unwrap();
    assert_eq!(cells.len(), 10);
    assert!(cells.iter().all(|&cell| cell == 42.0));
}
390
/// Two submitted commands yield two receipts with an applied tick, and that
/// tick is 1 for both.
#[test]
fn step_sync_with_commands_produces_receipts() {
    let mut world = LockstepWorld::new(simple_config()).unwrap();
    let stepped = world.step_sync(vec![make_cmd(100), make_cmd(100)]).unwrap();

    let applied_count = stepped
        .receipts
        .iter()
        .filter(|receipt| receipt.applied_tick_id.is_some())
        .count();
    assert_eq!(applied_count, 2);

    let all_applied_at_tick_one = stepped
        .receipts
        .iter()
        .filter(|receipt| receipt.applied_tick_id.is_some())
        .all(|receipt| receipt.applied_tick_id == Some(TickId(1)));
    assert!(all_applied_at_tick_one);
}
403
/// With a `FailingPropagator` as the only propagator, `step_sync` is
/// expected to return `Err`.
#[test]
fn step_sync_propagator_failure_returns_tick_error() {
    let fields = vec![scalar_field("energy")];
    let backoff = crate::config::BackoffConfig::default();
    let failing_config = WorldConfig {
        space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
        fields,
        propagators: vec![Box::new(FailingPropagator::new("fail", FieldId(0), 0))],
        dt: 0.1,
        seed: 42,
        ring_buffer_size: 8,
        max_ingress_queue: 1024,
        tick_rate_hz: None,
        backoff,
    };

    let mut world = LockstepWorld::new(failing_config).unwrap();
    let outcome = world.step_sync(vec![]);
    assert!(outcome.is_err());
}
421
/// After one step both field 0 (written directly) and field 1 (copied from
/// field 0 within the same tick) read 7.0 in their first cell.
#[test]
fn step_sync_overlay_visibility() {
    let mut world = LockstepWorld::new(two_field_config()).unwrap();
    let stepped = world.step_sync(vec![]).unwrap();

    for field in [FieldId(0), FieldId(1)] {
        assert_eq!(stepped.snapshot.read(field).unwrap()[0], 7.0);
    }
}
432
/// `reset` brings a stepped world back to tick 0 and stores the new seed.
#[test]
fn reset_returns_to_tick_zero() {
    let mut world = LockstepWorld::new(simple_config()).unwrap();
    for _ in 0..2 {
        world.step_sync(vec![]).unwrap();
    }
    assert_eq!(world.current_tick(), TickId(2));

    // Only the tick id is kept so the snapshot borrow ends immediately.
    let reset_tick = world.reset(99).unwrap().tick_id();
    assert_eq!(reset_tick, TickId(0));
    assert_eq!(world.current_tick(), TickId(0));
    assert_eq!(world.seed(), 99);
}
447
/// A world can be stepped again after `reset`; the first step afterwards
/// lands on tick 1 with every cell of field 0 at 42.0.
#[test]
fn reset_allows_continued_stepping() {
    let mut world = LockstepWorld::new(simple_config()).unwrap();
    world.step_sync(vec![]).unwrap();
    world.reset(42).unwrap();

    let after_reset = world.step_sync(vec![]).unwrap();
    assert_eq!(after_reset.snapshot.tick_id(), TickId(1));

    let cells = after_reset.snapshot.read(FieldId(0)).unwrap();
    assert!(cells.iter().all(|&cell| cell == 42.0));
}
459
/// Two worlds built from the same config stay in agreement over 1000 steps:
/// tick ids are compared every step, field contents at tick 1 and every
/// 100th tick, and once more on the final snapshots.
#[test]
fn thousand_step_determinism() {
    let mut first = LockstepWorld::new(square4_config()).unwrap();
    let mut second = LockstepWorld::new(square4_config()).unwrap();

    for tick in 1..=1000u64 {
        let step_first = first.step_sync(vec![]).unwrap();
        let step_second = second.step_sync(vec![]).unwrap();

        assert_eq!(
            step_first.snapshot.tick_id(),
            step_second.snapshot.tick_id(),
            "tick ID mismatch at tick {tick}"
        );

        // Field contents are only spot-checked on selected ticks.
        let spot_check = tick == 1 || tick % 100 == 0;
        if !spot_check {
            continue;
        }
        for field_idx in 0..3u32 {
            let lhs = step_first.snapshot.read(FieldId(field_idx)).unwrap();
            let rhs = step_second.snapshot.read(FieldId(field_idx)).unwrap();
            assert_eq!(lhs, rhs, "field {field_idx} mismatch at tick {tick}");
        }
    }

    assert_eq!(first.current_tick(), TickId(1000));
    assert_eq!(second.current_tick(), TickId(1000));

    let final_first = first.snapshot();
    let final_second = second.snapshot();
    for field_idx in 0..3u32 {
        assert_eq!(
            final_first.read(FieldId(field_idx)).unwrap(),
            final_second.read(FieldId(field_idx)).unwrap(),
            "final field {field_idx} mismatch"
        );
    }
}
505
/// `memory_bytes` reported after 1000 steps must be less than 1.2x the value
/// reported after 10 steps.
#[test]
fn memory_bound_tick_1000_approx_tick_10() {
    // Steps `world` forward `ticks` times without submitting commands.
    fn advance(world: &mut LockstepWorld, ticks: usize) {
        for _ in 0..ticks {
            world.step_sync(vec![]).unwrap();
        }
    }

    let mut world = LockstepWorld::new(square4_config()).unwrap();

    advance(&mut world, 10);
    let mem_10 = world.last_metrics().memory_bytes;

    // 990 further steps bring the total to 1000.
    advance(&mut world, 990);
    let mem_1000 = world.last_metrics().memory_bytes;

    let ratio = mem_1000 as f64 / mem_10 as f64;
    assert!(
        ratio < 1.2,
        "memory grew {ratio:.2}x from tick 10 ({mem_10}) to tick 1000 ({mem_1000})"
    );
}
532
/// One step of the three-propagator grid config leaves 100 cells per field:
/// 3.0 in fields 0 and 1, and 6.0 in field 2.
#[test]
fn square4_three_propagator_end_to_end() {
    let mut world = LockstepWorld::new(square4_config()).unwrap();
    let stepped = world.step_sync(vec![]).unwrap();
    let snap = &stepped.snapshot;

    let expectations = [(FieldId(0), 3.0), (FieldId(1), 3.0), (FieldId(2), 6.0)];

    // Sizes first, then contents, for every field.
    for (field, _) in expectations {
        assert_eq!(snap.read(field).unwrap().len(), 100);
    }
    for (field, expected) in expectations {
        assert!(snap.read(field).unwrap().iter().all(|&cell| cell == expected));
    }
}
557
/// Formatting a world with `{:?}` succeeds and mentions the type name and
/// the `current_tick` field.
#[test]
fn debug_impl_doesnt_panic() {
    let world = LockstepWorld::new(simple_config()).unwrap();
    let rendered = format!("{world:?}");

    for needle in ["LockstepWorld", "current_tick"] {
        assert!(rendered.contains(needle));
    }
}
567
/// A snapshot can be read between two steps, and the world can be stepped
/// again once the snapshot is no longer used.
#[test]
fn snapshot_borrows_from_self() {
    let mut world = LockstepWorld::new(simple_config()).unwrap();
    world.step_sync(vec![]).unwrap();

    {
        // The shared borrow of `world` is confined to this block.
        let snap = world.snapshot();
        let cells = snap.read(FieldId(0)).unwrap();
        assert_eq!(cells[0], 42.0);
    }

    world.step_sync(vec![]).unwrap();
    assert_eq!(world.current_tick(), TickId(2));
}
591
/// With `max_ingress_queue` set to 2 and four commands submitted, the step
/// result carries four receipts: two `QueueFull` rejections and two applied.
#[test]
fn step_sync_surfaces_submission_rejections() {
    let fields = vec![scalar_field("energy")];
    let backoff = crate::config::BackoffConfig::default();
    let tiny_queue_config = WorldConfig {
        space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
        fields,
        propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
        dt: 0.1,
        seed: 42,
        ring_buffer_size: 8,
        max_ingress_queue: 2,
        tick_rate_hz: None,
        backoff,
    };
    let mut world = LockstepWorld::new(tiny_queue_config).unwrap();

    let commands = (0..4).map(|_| make_cmd(100)).collect();
    let stepped = world.step_sync(commands).unwrap();

    assert_eq!(stepped.receipts.len(), 4);

    let queue_full = Some(murk_core::error::IngressError::QueueFull);
    let rejected_count = stepped
        .receipts
        .iter()
        .filter(|receipt| receipt.reason_code == queue_full)
        .count();
    assert_eq!(rejected_count, 2, "QueueFull rejections must be surfaced");
    assert_eq!(stepped.metrics.queue_full_rejections, 2);

    let applied_count = stepped
        .receipts
        .iter()
        .filter(|receipt| receipt.applied_tick_id.is_some())
        .count();
    assert_eq!(applied_count, 2);
}
638
/// Checks that each successful `reset` replaces the stored seed and leaves
/// the world at tick 0.
///
/// NOTE(review): despite the name, no failing reset is exercised here; both
/// resets below succeed, so only the success path is covered.
#[test]
fn reset_does_not_update_seed_on_failure() {
    let mut world = LockstepWorld::new(simple_config()).unwrap();
    assert_eq!(world.seed(), 42);

    for new_seed in [99, 7] {
        world.reset(new_seed).unwrap();
        assert_eq!(world.seed(), new_seed);
    }

    assert_eq!(world.current_tick(), TickId(0));
}
655}