1use murk_arena::read::Snapshot;
24use murk_core::command::{Command, Receipt};
25use murk_core::id::TickId;
26
27use crate::config::{ConfigError, WorldConfig};
28use crate::metrics::StepMetrics;
29use crate::tick::{TickEngine, TickError};
30
31const _: () = {
35 #[allow(dead_code)]
36 fn assert_send<T: Send>() {}
37 #[allow(dead_code)]
38 fn check() {
39 assert_send::<LockstepWorld>();
40 }
41};
42
43pub struct StepResult<'w> {
47 pub snapshot: Snapshot<'w>,
49 pub receipts: Vec<Receipt>,
56 pub metrics: StepMetrics,
58}
59
60pub struct LockstepWorld {
79 engine: TickEngine,
80 seed: u64,
81}
82
83impl LockstepWorld {
84 pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
90 let seed = config.seed;
91 Ok(Self {
92 engine: TickEngine::new(config)?,
93 seed,
94 })
95 }
96
97 pub fn step_sync(&mut self, commands: Vec<Command>) -> Result<StepResult<'_>, TickError> {
113 let submit_receipts = self.engine.submit_commands(commands);
114
115 let rejected: Vec<Receipt> = submit_receipts
118 .into_iter()
119 .filter(|r| !r.accepted)
120 .collect();
121
122 match self.engine.execute_tick() {
123 Ok(tick_result) => {
124 let mut receipts = rejected;
125 receipts.extend(tick_result.receipts);
126 Ok(StepResult {
127 snapshot: self.engine.snapshot(),
128 receipts,
129 metrics: tick_result.metrics,
130 })
131 }
132 Err(mut tick_error) => {
133 let mut receipts = rejected;
134 receipts.append(&mut tick_error.receipts);
135 Err(TickError {
136 kind: tick_error.kind,
137 receipts,
138 })
139 }
140 }
141 }
142
143 pub fn reset(&mut self, seed: u64) -> Result<Snapshot<'_>, ConfigError> {
152 self.engine.reset()?;
153 self.seed = seed;
154 Ok(self.engine.snapshot())
155 }
156
157 pub fn snapshot(&self) -> Snapshot<'_> {
159 self.engine.snapshot()
160 }
161
162 pub fn current_tick(&self) -> TickId {
164 self.engine.current_tick()
165 }
166
167 pub fn is_tick_disabled(&self) -> bool {
169 self.engine.is_tick_disabled()
170 }
171
172 pub fn last_metrics(&self) -> &StepMetrics {
174 self.engine.last_metrics()
175 }
176
177 pub fn seed(&self) -> u64 {
179 self.seed
180 }
181
182 pub fn consecutive_rollback_count(&self) -> u32 {
184 self.engine.consecutive_rollback_count()
185 }
186
187 pub fn space(&self) -> &dyn murk_space::Space {
189 self.engine.space()
190 }
191}
192
193impl std::fmt::Debug for LockstepWorld {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 f.debug_struct("LockstepWorld")
196 .field("current_tick", &self.engine.current_tick())
197 .field("seed", &self.seed)
198 .field("tick_disabled", &self.engine.is_tick_disabled())
199 .finish()
200 }
201}
202
203#[cfg(test)]
204mod tests {
205 use super::*;
206 use murk_core::command::CommandPayload;
207 use murk_core::id::{FieldId, ParameterKey};
208 use murk_core::traits::{FieldReader, SnapshotAccess};
209 use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
210 use murk_propagator::propagator::WriteMode;
211 use murk_propagator::Propagator;
212 use murk_space::{EdgeBehavior, Line1D, Square4};
213 use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};
214
215 fn scalar_field(name: &str) -> FieldDef {
216 FieldDef {
217 name: name.to_string(),
218 field_type: FieldType::Scalar,
219 mutability: FieldMutability::PerTick,
220 units: None,
221 bounds: None,
222 boundary_behavior: BoundaryBehavior::Clamp,
223 }
224 }
225
226 fn simple_config() -> WorldConfig {
227 WorldConfig {
228 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
229 fields: vec![scalar_field("energy")],
230 propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
231 dt: 0.1,
232 seed: 42,
233 ring_buffer_size: 8,
234 max_ingress_queue: 1024,
235 tick_rate_hz: None,
236 backoff: crate::config::BackoffConfig::default(),
237 }
238 }
239
240 fn two_field_config() -> WorldConfig {
242 WorldConfig {
243 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
244 fields: vec![scalar_field("field0"), scalar_field("field1")],
245 propagators: vec![
246 Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
247 Box::new(IdentityPropagator::new(
248 "copy_f0_to_f1",
249 FieldId(0),
250 FieldId(1),
251 )),
252 ],
253 dt: 0.1,
254 seed: 42,
255 ring_buffer_size: 8,
256 max_ingress_queue: 1024,
257 tick_rate_hz: None,
258 backoff: crate::config::BackoffConfig::default(),
259 }
260 }
261
262 fn square4_config() -> WorldConfig {
264 struct SumPropagator {
269 name: String,
270 input_a: FieldId,
271 input_b: FieldId,
272 output: FieldId,
273 }
274
275 impl SumPropagator {
276 fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
277 Self {
278 name: name.to_string(),
279 input_a: a,
280 input_b: b,
281 output: out,
282 }
283 }
284 }
285
286 impl Propagator for SumPropagator {
287 fn name(&self) -> &str {
288 &self.name
289 }
290 fn reads(&self) -> murk_core::FieldSet {
291 [self.input_a, self.input_b].into_iter().collect()
292 }
293 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
294 vec![(self.output, WriteMode::Full)]
295 }
296 fn step(
297 &self,
298 ctx: &mut murk_propagator::StepContext<'_>,
299 ) -> Result<(), murk_core::PropagatorError> {
300 let a = ctx.reads().read(self.input_a).unwrap().to_vec();
301 let b = ctx.reads().read(self.input_b).unwrap().to_vec();
302 let out = ctx.writes().write(self.output).unwrap();
303 for i in 0..out.len() {
304 out[i] = a[i] + b[i];
305 }
306 Ok(())
307 }
308 }
309
310 WorldConfig {
311 space: Box::new(Square4::new(10, 10, EdgeBehavior::Absorb).unwrap()),
312 fields: vec![
313 scalar_field("field0"),
314 scalar_field("field1"),
315 scalar_field("field2"),
316 ],
317 propagators: vec![
318 Box::new(ConstPropagator::new("write_f0", FieldId(0), 3.0)),
319 Box::new(IdentityPropagator::new(
320 "copy_f0_to_f1",
321 FieldId(0),
322 FieldId(1),
323 )),
324 Box::new(SumPropagator::new(
325 "sum_f0_f1_to_f2",
326 FieldId(0),
327 FieldId(1),
328 FieldId(2),
329 )),
330 ],
331 dt: 0.016,
332 seed: 12345,
333 ring_buffer_size: 8,
334 max_ingress_queue: 1024,
335 tick_rate_hz: None,
336 backoff: crate::config::BackoffConfig::default(),
337 }
338 }
339
340 fn make_cmd(expires: u64) -> Command {
341 Command {
342 payload: CommandPayload::SetParameter {
343 key: ParameterKey(0),
344 value: 0.0,
345 },
346 expires_after_tick: TickId(expires),
347 source_id: None,
348 source_seq: None,
349 priority_class: 1,
350 arrival_seq: 0,
351 }
352 }
353
354 #[test]
357 fn new_creates_world_at_tick_zero() {
358 let world = LockstepWorld::new(simple_config()).unwrap();
359 assert_eq!(world.current_tick(), TickId(0));
360 assert!(!world.is_tick_disabled());
361 assert_eq!(world.seed(), 42);
362 }
363
364 #[test]
365 fn step_sync_advances_tick() {
366 let mut world = LockstepWorld::new(simple_config()).unwrap();
367 let result = world.step_sync(vec![]).unwrap();
368 assert_eq!(result.snapshot.tick_id(), TickId(1));
369 assert_eq!(world.current_tick(), TickId(1));
370 }
371
372 #[test]
373 fn step_sync_returns_correct_snapshot() {
374 let mut world = LockstepWorld::new(simple_config()).unwrap();
375 let result = world.step_sync(vec![]).unwrap();
376 let data = result.snapshot.read(FieldId(0)).unwrap();
377 assert_eq!(data.len(), 10);
378 assert!(data.iter().all(|&v| v == 42.0));
379 }
380
381 #[test]
382 fn step_sync_with_commands_produces_receipts() {
383 let mut world = LockstepWorld::new(simple_config()).unwrap();
384 let result = world.step_sync(vec![make_cmd(100), make_cmd(100)]).unwrap();
385 let applied: Vec<_> = result
386 .receipts
387 .iter()
388 .filter(|r| r.applied_tick_id.is_some())
389 .collect();
390 assert_eq!(applied.len(), 2);
391 assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
392 }
393
394 #[test]
395 fn step_sync_propagator_failure_returns_tick_error() {
396 let config = WorldConfig {
397 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
398 fields: vec![scalar_field("energy")],
399 propagators: vec![Box::new(FailingPropagator::new("fail", FieldId(0), 0))],
400 dt: 0.1,
401 seed: 42,
402 ring_buffer_size: 8,
403 max_ingress_queue: 1024,
404 tick_rate_hz: None,
405 backoff: crate::config::BackoffConfig::default(),
406 };
407 let mut world = LockstepWorld::new(config).unwrap();
408 let result = world.step_sync(vec![]);
409 assert!(result.is_err());
410 }
411
412 #[test]
415 fn step_sync_overlay_visibility() {
416 let mut world = LockstepWorld::new(two_field_config()).unwrap();
417 let result = world.step_sync(vec![]).unwrap();
418 assert_eq!(result.snapshot.read(FieldId(0)).unwrap()[0], 7.0);
420 assert_eq!(result.snapshot.read(FieldId(1)).unwrap()[0], 7.0);
421 }
422
423 #[test]
426 fn reset_returns_to_tick_zero() {
427 let mut world = LockstepWorld::new(simple_config()).unwrap();
428 world.step_sync(vec![]).unwrap();
429 world.step_sync(vec![]).unwrap();
430 assert_eq!(world.current_tick(), TickId(2));
431
432 let snap = world.reset(99).unwrap();
433 assert_eq!(snap.tick_id(), TickId(0));
434 assert_eq!(world.current_tick(), TickId(0));
435 assert_eq!(world.seed(), 99);
436 }
437
438 #[test]
439 fn reset_allows_continued_stepping() {
440 let mut world = LockstepWorld::new(simple_config()).unwrap();
441 world.step_sync(vec![]).unwrap();
442 world.reset(42).unwrap();
443
444 let result = world.step_sync(vec![]).unwrap();
445 assert_eq!(result.snapshot.tick_id(), TickId(1));
446 let data = result.snapshot.read(FieldId(0)).unwrap();
447 assert!(data.iter().all(|&v| v == 42.0));
448 }
449
450 #[test]
453 fn thousand_step_determinism() {
454 let mut world_a = LockstepWorld::new(square4_config()).unwrap();
456 let mut world_b = LockstepWorld::new(square4_config()).unwrap();
457
458 for tick in 1..=1000u64 {
459 let result_a = world_a.step_sync(vec![]).unwrap();
460 let result_b = world_b.step_sync(vec![]).unwrap();
461
462 assert_eq!(
464 result_a.snapshot.tick_id(),
465 result_b.snapshot.tick_id(),
466 "tick ID mismatch at tick {tick}"
467 );
468
469 if tick % 100 == 0 || tick == 1 {
471 for field_idx in 0..3u32 {
472 let field = FieldId(field_idx);
473 let data_a = result_a.snapshot.read(field).unwrap();
474 let data_b = result_b.snapshot.read(field).unwrap();
475 assert_eq!(data_a, data_b, "field {field_idx} mismatch at tick {tick}");
476 }
477 }
478 }
479
480 assert_eq!(world_a.current_tick(), TickId(1000));
481 assert_eq!(world_b.current_tick(), TickId(1000));
482
483 let snap_a = world_a.snapshot();
485 let snap_b = world_b.snapshot();
486 for field_idx in 0..3u32 {
487 let field = FieldId(field_idx);
488 assert_eq!(
489 snap_a.read(field).unwrap(),
490 snap_b.read(field).unwrap(),
491 "final field {field_idx} mismatch"
492 );
493 }
494 }
495
496 #[test]
499 fn memory_bound_tick_1000_approx_tick_10() {
500 let mut world = LockstepWorld::new(square4_config()).unwrap();
501
502 for _ in 0..10 {
504 world.step_sync(vec![]).unwrap();
505 }
506 let mem_10 = world.last_metrics().memory_bytes;
507
508 for _ in 10..1000 {
510 world.step_sync(vec![]).unwrap();
511 }
512 let mem_1000 = world.last_metrics().memory_bytes;
513
514 let ratio = mem_1000 as f64 / mem_10 as f64;
517 assert!(
518 ratio < 1.2,
519 "memory grew {ratio:.2}x from tick 10 ({mem_10}) to tick 1000 ({mem_1000})"
520 );
521 }
522
523 #[test]
526 fn square4_three_propagator_end_to_end() {
527 let mut world = LockstepWorld::new(square4_config()).unwrap();
528 let result = world.step_sync(vec![]).unwrap();
529
530 let snap = &result.snapshot;
531 let f0 = snap.read(FieldId(0)).unwrap();
532 let f1 = snap.read(FieldId(1)).unwrap();
533 let f2 = snap.read(FieldId(2)).unwrap();
534
535 assert_eq!(f0.len(), 100);
537 assert_eq!(f1.len(), 100);
538 assert_eq!(f2.len(), 100);
539
540 assert!(f0.iter().all(|&v| v == 3.0));
542 assert!(f1.iter().all(|&v| v == 3.0));
544 assert!(f2.iter().all(|&v| v == 6.0));
546 }
547
548 #[test]
551 fn debug_impl_doesnt_panic() {
552 let world = LockstepWorld::new(simple_config()).unwrap();
553 let debug = format!("{world:?}");
554 assert!(debug.contains("LockstepWorld"));
555 assert!(debug.contains("current_tick"));
556 }
557
558 #[test]
561 fn snapshot_borrows_from_self() {
562 let mut world = LockstepWorld::new(simple_config()).unwrap();
567 world.step_sync(vec![]).unwrap();
568
569 let snap = world.snapshot();
570 let data = snap.read(FieldId(0)).unwrap();
571 assert_eq!(data[0], 42.0);
572 let _ = snap;
576
577 world.step_sync(vec![]).unwrap();
579 assert_eq!(world.current_tick(), TickId(2));
580 }
581
582 #[test]
585 fn step_sync_surfaces_submission_rejections() {
586 let config = WorldConfig {
588 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
589 fields: vec![scalar_field("energy")],
590 propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
591 dt: 0.1,
592 seed: 42,
593 ring_buffer_size: 8,
594 max_ingress_queue: 2,
595 tick_rate_hz: None,
596 backoff: crate::config::BackoffConfig::default(),
597 };
598 let mut world = LockstepWorld::new(config).unwrap();
599
600 let result = world
602 .step_sync(vec![
603 make_cmd(100),
604 make_cmd(100),
605 make_cmd(100),
606 make_cmd(100),
607 ])
608 .unwrap();
609
610 assert_eq!(result.receipts.len(), 4);
612
613 let rejected: Vec<_> = result
614 .receipts
615 .iter()
616 .filter(|r| r.reason_code == Some(murk_core::error::IngressError::QueueFull))
617 .collect();
618 assert_eq!(rejected.len(), 2, "QueueFull rejections must be surfaced");
619
620 let applied: Vec<_> = result
621 .receipts
622 .iter()
623 .filter(|r| r.applied_tick_id.is_some())
624 .collect();
625 assert_eq!(applied.len(), 2);
626 }
627
628 #[test]
629 fn reset_does_not_update_seed_on_failure() {
630 let mut world = LockstepWorld::new(simple_config()).unwrap();
633 assert_eq!(world.seed(), 42);
634
635 world.reset(99).unwrap();
637 assert_eq!(world.seed(), 99);
638
639 world.reset(7).unwrap();
641 assert_eq!(world.seed(), 7);
642 assert_eq!(world.current_tick(), TickId(0));
643 }
644}