1use murk_arena::read::Snapshot;
24use murk_core::command::{Command, Receipt};
25use murk_core::id::TickId;
26
27use crate::config::{ConfigError, WorldConfig};
28use crate::metrics::StepMetrics;
29use crate::tick::{TickEngine, TickError};
30
31const _: () = {
35 #[allow(dead_code)]
36 fn assert_send<T: Send>() {}
37 #[allow(dead_code)]
38 fn check() {
39 assert_send::<LockstepWorld>();
40 }
41};
42
43pub struct StepResult<'w> {
47 pub snapshot: Snapshot<'w>,
49 pub receipts: Vec<Receipt>,
55 pub metrics: StepMetrics,
57}
58
59pub struct LockstepWorld {
78 engine: TickEngine,
79 seed: u64,
80}
81
82impl LockstepWorld {
83 pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
89 let seed = config.seed;
90 Ok(Self {
91 engine: TickEngine::new(config)?,
92 seed,
93 })
94 }
95
96 pub fn step_sync(&mut self, commands: Vec<Command>) -> Result<StepResult<'_>, TickError> {
112 let submit_receipts = self.engine.submit_commands(commands);
113
114 let rejected: Vec<Receipt> = submit_receipts
117 .into_iter()
118 .filter(|r| !r.accepted)
119 .collect();
120
121 match self.engine.execute_tick() {
122 Ok(tick_result) => {
123 let mut receipts = rejected;
124 receipts.extend(tick_result.receipts);
125 Ok(StepResult {
126 snapshot: self.engine.snapshot(),
127 receipts,
128 metrics: tick_result.metrics,
129 })
130 }
131 Err(mut tick_error) => {
132 let mut receipts = rejected;
133 receipts.append(&mut tick_error.receipts);
134 Err(TickError {
135 kind: tick_error.kind,
136 receipts,
137 })
138 }
139 }
140 }
141
142 pub fn reset(&mut self, seed: u64) -> Result<Snapshot<'_>, ConfigError> {
151 self.engine.reset()?;
152 self.seed = seed;
153 Ok(self.engine.snapshot())
154 }
155
156 pub fn snapshot(&self) -> Snapshot<'_> {
158 self.engine.snapshot()
159 }
160
161 pub fn current_tick(&self) -> TickId {
163 self.engine.current_tick()
164 }
165
166 pub fn is_tick_disabled(&self) -> bool {
168 self.engine.is_tick_disabled()
169 }
170
171 pub fn last_metrics(&self) -> &StepMetrics {
173 self.engine.last_metrics()
174 }
175
176 pub fn seed(&self) -> u64 {
178 self.seed
179 }
180
181 pub fn consecutive_rollback_count(&self) -> u32 {
183 self.engine.consecutive_rollback_count()
184 }
185
186 pub fn space(&self) -> &dyn murk_space::Space {
188 self.engine.space()
189 }
190}
191
192impl std::fmt::Debug for LockstepWorld {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 f.debug_struct("LockstepWorld")
195 .field("current_tick", &self.engine.current_tick())
196 .field("seed", &self.seed)
197 .field("tick_disabled", &self.engine.is_tick_disabled())
198 .finish()
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use super::*;
205 use murk_core::command::CommandPayload;
206 use murk_core::id::{Coord, FieldId};
207 use murk_core::traits::{FieldReader, SnapshotAccess};
208 use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
209 use murk_propagator::propagator::WriteMode;
210 use murk_propagator::Propagator;
211 use murk_space::{EdgeBehavior, Line1D, Square4};
212 use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};
213
214 fn scalar_field(name: &str) -> FieldDef {
215 FieldDef {
216 name: name.to_string(),
217 field_type: FieldType::Scalar,
218 mutability: FieldMutability::PerTick,
219 units: None,
220 bounds: None,
221 boundary_behavior: BoundaryBehavior::Clamp,
222 }
223 }
224
225 fn simple_config() -> WorldConfig {
226 WorldConfig {
227 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
228 fields: vec![scalar_field("energy")],
229 propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
230 dt: 0.1,
231 seed: 42,
232 ring_buffer_size: 8,
233 max_ingress_queue: 1024,
234 tick_rate_hz: None,
235 backoff: crate::config::BackoffConfig::default(),
236 }
237 }
238
239 fn two_field_config() -> WorldConfig {
241 WorldConfig {
242 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
243 fields: vec![scalar_field("field0"), scalar_field("field1")],
244 propagators: vec![
245 Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
246 Box::new(IdentityPropagator::new(
247 "copy_f0_to_f1",
248 FieldId(0),
249 FieldId(1),
250 )),
251 ],
252 dt: 0.1,
253 seed: 42,
254 ring_buffer_size: 8,
255 max_ingress_queue: 1024,
256 tick_rate_hz: None,
257 backoff: crate::config::BackoffConfig::default(),
258 }
259 }
260
261 fn square4_config() -> WorldConfig {
263 struct SumPropagator {
268 name: String,
269 input_a: FieldId,
270 input_b: FieldId,
271 output: FieldId,
272 }
273
274 impl SumPropagator {
275 fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
276 Self {
277 name: name.to_string(),
278 input_a: a,
279 input_b: b,
280 output: out,
281 }
282 }
283 }
284
285 impl Propagator for SumPropagator {
286 fn name(&self) -> &str {
287 &self.name
288 }
289 fn reads(&self) -> murk_core::FieldSet {
290 [self.input_a, self.input_b].into_iter().collect()
291 }
292 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
293 vec![(self.output, WriteMode::Full)]
294 }
295 fn step(
296 &self,
297 ctx: &mut murk_propagator::StepContext<'_>,
298 ) -> Result<(), murk_core::PropagatorError> {
299 let a = ctx.reads().read(self.input_a).unwrap().to_vec();
300 let b = ctx.reads().read(self.input_b).unwrap().to_vec();
301 let out = ctx.writes().write(self.output).unwrap();
302 for i in 0..out.len() {
303 out[i] = a[i] + b[i];
304 }
305 Ok(())
306 }
307 }
308
309 WorldConfig {
310 space: Box::new(Square4::new(10, 10, EdgeBehavior::Absorb).unwrap()),
311 fields: vec![
312 scalar_field("field0"),
313 scalar_field("field1"),
314 scalar_field("field2"),
315 ],
316 propagators: vec![
317 Box::new(ConstPropagator::new("write_f0", FieldId(0), 3.0)),
318 Box::new(IdentityPropagator::new(
319 "copy_f0_to_f1",
320 FieldId(0),
321 FieldId(1),
322 )),
323 Box::new(SumPropagator::new(
324 "sum_f0_f1_to_f2",
325 FieldId(0),
326 FieldId(1),
327 FieldId(2),
328 )),
329 ],
330 dt: 0.016,
331 seed: 12345,
332 ring_buffer_size: 8,
333 max_ingress_queue: 1024,
334 tick_rate_hz: None,
335 backoff: crate::config::BackoffConfig::default(),
336 }
337 }
338
339 fn make_cmd(expires: u64) -> Command {
340 Command {
341 payload: CommandPayload::SetField {
342 coord: Coord::from_elem(0, 1),
343 field_id: FieldId(0),
344 value: 1.0,
345 },
346 expires_after_tick: TickId(expires),
347 source_id: None,
348 source_seq: None,
349 priority_class: 1,
350 arrival_seq: 0,
351 }
352 }
353
354 #[test]
357 fn new_creates_world_at_tick_zero() {
358 let world = LockstepWorld::new(simple_config()).unwrap();
359 assert_eq!(world.current_tick(), TickId(0));
360 assert!(!world.is_tick_disabled());
361 assert_eq!(world.seed(), 42);
362 }
363
364 #[test]
365 fn step_sync_advances_tick() {
366 let mut world = LockstepWorld::new(simple_config()).unwrap();
367 let result = world.step_sync(vec![]).unwrap();
368 assert_eq!(result.snapshot.tick_id(), TickId(1));
369 assert_eq!(world.current_tick(), TickId(1));
370 }
371
372 #[test]
373 fn step_sync_returns_correct_snapshot() {
374 let mut world = LockstepWorld::new(simple_config()).unwrap();
375 let result = world.step_sync(vec![]).unwrap();
376 let data = result.snapshot.read(FieldId(0)).unwrap();
377 assert_eq!(data.len(), 10);
378 assert!(data.iter().all(|&v| v == 42.0));
379 }
380
381 #[test]
382 fn step_sync_with_commands_produces_receipts() {
383 let mut world = LockstepWorld::new(simple_config()).unwrap();
384 let result = world.step_sync(vec![make_cmd(100), make_cmd(100)]).unwrap();
385 let applied: Vec<_> = result
386 .receipts
387 .iter()
388 .filter(|r| r.applied_tick_id.is_some())
389 .collect();
390 assert_eq!(applied.len(), 2);
391 assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
392 }
393
394 #[test]
395 fn step_sync_propagator_failure_returns_tick_error() {
396 let config = WorldConfig {
397 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
398 fields: vec![scalar_field("energy")],
399 propagators: vec![Box::new(FailingPropagator::new("fail", FieldId(0), 0))],
400 dt: 0.1,
401 seed: 42,
402 ring_buffer_size: 8,
403 max_ingress_queue: 1024,
404 tick_rate_hz: None,
405 backoff: crate::config::BackoffConfig::default(),
406 };
407 let mut world = LockstepWorld::new(config).unwrap();
408 let result = world.step_sync(vec![]);
409 assert!(result.is_err());
410 }
411
412 #[test]
415 fn step_sync_overlay_visibility() {
416 let mut world = LockstepWorld::new(two_field_config()).unwrap();
417 let result = world.step_sync(vec![]).unwrap();
418 assert_eq!(result.snapshot.read(FieldId(0)).unwrap()[0], 7.0);
420 assert_eq!(result.snapshot.read(FieldId(1)).unwrap()[0], 7.0);
421 }
422
423 #[test]
426 fn reset_returns_to_tick_zero() {
427 let mut world = LockstepWorld::new(simple_config()).unwrap();
428 world.step_sync(vec![]).unwrap();
429 world.step_sync(vec![]).unwrap();
430 assert_eq!(world.current_tick(), TickId(2));
431
432 let snap = world.reset(99).unwrap();
433 assert_eq!(snap.tick_id(), TickId(0));
434 assert_eq!(world.current_tick(), TickId(0));
435 assert_eq!(world.seed(), 99);
436 }
437
438 #[test]
439 fn reset_allows_continued_stepping() {
440 let mut world = LockstepWorld::new(simple_config()).unwrap();
441 world.step_sync(vec![]).unwrap();
442 world.reset(42).unwrap();
443
444 let result = world.step_sync(vec![]).unwrap();
445 assert_eq!(result.snapshot.tick_id(), TickId(1));
446 let data = result.snapshot.read(FieldId(0)).unwrap();
447 assert!(data.iter().all(|&v| v == 42.0));
448 }
449
450 #[test]
453 fn thousand_step_determinism() {
454 let mut world_a = LockstepWorld::new(square4_config()).unwrap();
456 let mut world_b = LockstepWorld::new(square4_config()).unwrap();
457
458 for tick in 1..=1000u64 {
459 let result_a = world_a.step_sync(vec![]).unwrap();
460 let result_b = world_b.step_sync(vec![]).unwrap();
461
462 assert_eq!(
464 result_a.snapshot.tick_id(),
465 result_b.snapshot.tick_id(),
466 "tick ID mismatch at tick {tick}"
467 );
468
469 if tick % 100 == 0 || tick == 1 {
471 for field_idx in 0..3u32 {
472 let field = FieldId(field_idx);
473 let data_a = result_a.snapshot.read(field).unwrap();
474 let data_b = result_b.snapshot.read(field).unwrap();
475 assert_eq!(data_a, data_b, "field {field_idx} mismatch at tick {tick}");
476 }
477 }
478 }
479
480 assert_eq!(world_a.current_tick(), TickId(1000));
481 assert_eq!(world_b.current_tick(), TickId(1000));
482
483 let snap_a = world_a.snapshot();
485 let snap_b = world_b.snapshot();
486 for field_idx in 0..3u32 {
487 let field = FieldId(field_idx);
488 assert_eq!(
489 snap_a.read(field).unwrap(),
490 snap_b.read(field).unwrap(),
491 "final field {field_idx} mismatch"
492 );
493 }
494 }
495
496 #[test]
499 fn memory_bound_tick_1000_approx_tick_10() {
500 let mut world = LockstepWorld::new(square4_config()).unwrap();
501
502 for _ in 0..10 {
504 world.step_sync(vec![]).unwrap();
505 }
506 let mem_10 = world.last_metrics().memory_bytes;
507
508 for _ in 10..1000 {
510 world.step_sync(vec![]).unwrap();
511 }
512 let mem_1000 = world.last_metrics().memory_bytes;
513
514 let ratio = mem_1000 as f64 / mem_10 as f64;
517 assert!(
518 ratio < 1.2,
519 "memory grew {ratio:.2}x from tick 10 ({mem_10}) to tick 1000 ({mem_1000})"
520 );
521 }
522
523 #[test]
526 fn square4_three_propagator_end_to_end() {
527 let mut world = LockstepWorld::new(square4_config()).unwrap();
528 let result = world.step_sync(vec![]).unwrap();
529
530 let snap = &result.snapshot;
531 let f0 = snap.read(FieldId(0)).unwrap();
532 let f1 = snap.read(FieldId(1)).unwrap();
533 let f2 = snap.read(FieldId(2)).unwrap();
534
535 assert_eq!(f0.len(), 100);
537 assert_eq!(f1.len(), 100);
538 assert_eq!(f2.len(), 100);
539
540 assert!(f0.iter().all(|&v| v == 3.0));
542 assert!(f1.iter().all(|&v| v == 3.0));
544 assert!(f2.iter().all(|&v| v == 6.0));
546 }
547
548 #[test]
551 fn debug_impl_doesnt_panic() {
552 let world = LockstepWorld::new(simple_config()).unwrap();
553 let debug = format!("{world:?}");
554 assert!(debug.contains("LockstepWorld"));
555 assert!(debug.contains("current_tick"));
556 }
557
558 #[test]
561 fn snapshot_borrows_from_self() {
562 let mut world = LockstepWorld::new(simple_config()).unwrap();
567 world.step_sync(vec![]).unwrap();
568
569 let snap = world.snapshot();
570 let data = snap.read(FieldId(0)).unwrap();
571 assert_eq!(data[0], 42.0);
572 let _ = snap;
576
577 world.step_sync(vec![]).unwrap();
579 assert_eq!(world.current_tick(), TickId(2));
580 }
581
582 #[test]
585 fn step_sync_surfaces_submission_rejections() {
586 let config = WorldConfig {
588 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
589 fields: vec![scalar_field("energy")],
590 propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
591 dt: 0.1,
592 seed: 42,
593 ring_buffer_size: 8,
594 max_ingress_queue: 2,
595 tick_rate_hz: None,
596 backoff: crate::config::BackoffConfig::default(),
597 };
598 let mut world = LockstepWorld::new(config).unwrap();
599
600 let result = world
602 .step_sync(vec![
603 make_cmd(100),
604 make_cmd(100),
605 make_cmd(100),
606 make_cmd(100),
607 ])
608 .unwrap();
609
610 assert_eq!(result.receipts.len(), 4);
612
613 let rejected: Vec<_> = result
614 .receipts
615 .iter()
616 .filter(|r| r.reason_code == Some(murk_core::error::IngressError::QueueFull))
617 .collect();
618 assert_eq!(rejected.len(), 2, "QueueFull rejections must be surfaced");
619
620 let applied: Vec<_> = result
621 .receipts
622 .iter()
623 .filter(|r| r.applied_tick_id.is_some())
624 .collect();
625 assert_eq!(applied.len(), 2);
626 }
627
628 #[test]
629 fn reset_does_not_update_seed_on_failure() {
630 let mut world = LockstepWorld::new(simple_config()).unwrap();
633 assert_eq!(world.seed(), 42);
634
635 world.reset(99).unwrap();
637 assert_eq!(world.seed(), 99);
638
639 world.reset(7).unwrap();
641 assert_eq!(world.seed(), 7);
642 assert_eq!(world.current_tick(), TickId(0));
643 }
644}