1use std::fmt;
14use std::time::Instant;
15
16use murk_arena::config::ArenaConfig;
17use murk_arena::pingpong::PingPongArena;
18use murk_arena::read::Snapshot;
19use murk_arena::static_arena::StaticArena;
20use murk_core::command::{Command, CommandPayload, Receipt};
21use murk_core::error::{IngressError, StepError};
22use murk_core::id::{FieldId, ParameterVersion, TickId};
23use murk_core::traits::{FieldReader, FieldWriter};
24use murk_core::FieldMutability;
25use murk_propagator::pipeline::{ReadResolutionPlan, ReadSource};
26use murk_propagator::propagator::Propagator;
27use murk_propagator::scratch::ScratchRegion as PropagatorScratch;
28
29use crate::config::{ConfigError, WorldConfig};
30use crate::ingress::IngressQueue;
31use crate::metrics::StepMetrics;
32use crate::overlay::{BaseFieldCache, BaseFieldSet, OverlayReader, StagedFieldCache};
33
34#[derive(Debug)]
38pub struct TickResult {
39 pub receipts: Vec<Receipt>,
41 pub metrics: StepMetrics,
43}
44
45#[derive(Debug, PartialEq, Eq)]
53pub struct TickError {
54 pub kind: StepError,
56 pub receipts: Vec<Receipt>,
58}
59
60impl fmt::Display for TickError {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(f, "{}", self.kind)
63 }
64}
65
66impl std::error::Error for TickError {
67 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
68 Some(&self.kind)
69 }
70}
71
72pub struct TickEngine {
80 arena: PingPongArena,
81 propagators: Vec<Box<dyn Propagator>>,
82 plan: ReadResolutionPlan,
83 ingress: IngressQueue,
84 space: Box<dyn murk_space::Space>,
85 dt: f64,
86 current_tick: TickId,
87 param_version: ParameterVersion,
88 consecutive_rollback_count: u32,
89 tick_disabled: bool,
90 max_consecutive_rollbacks: u32,
91 propagator_scratch: PropagatorScratch,
92 base_field_set: BaseFieldSet,
93 base_cache: BaseFieldCache,
94 staged_cache: StagedFieldCache,
95 last_metrics: StepMetrics,
96}
97
98impl TickEngine {
99 pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
105 config.validate()?;
107 let defined_fields = config.defined_field_set();
108 let plan =
109 murk_propagator::validate_pipeline(&config.propagators, &defined_fields, config.dt)?;
110
111 let arena_field_defs: Vec<(FieldId, murk_core::FieldDef)> = config
114 .fields
115 .iter()
116 .enumerate()
117 .map(|(i, def)| {
118 (
119 FieldId(u32::try_from(i).expect("field count validated")),
120 def.clone(),
121 )
122 })
123 .collect();
124
125 let cell_count = u32::try_from(config.space.cell_count()).expect("cell count validated");
127 let arena_config = ArenaConfig::new(cell_count);
128
129 let static_fields: Vec<(FieldId, u32)> = arena_field_defs
131 .iter()
132 .filter(|(_, d)| d.mutability == FieldMutability::Static)
133 .map(|(id, d)| (*id, cell_count * d.field_type.components()))
134 .collect();
135 let static_arena = StaticArena::new(&static_fields).into_shared();
136
137 let arena = PingPongArena::new(arena_config, arena_field_defs, static_arena)?;
138
139 let base_field_set = BaseFieldSet::from_plan(&plan, &config.propagators);
141
142 let max_scratch = config
144 .propagators
145 .iter()
146 .map(|p| p.scratch_bytes())
147 .max()
148 .unwrap_or(0);
149 let propagator_scratch = PropagatorScratch::with_byte_capacity(max_scratch);
150
151 let ingress = IngressQueue::new(config.max_ingress_queue);
152
153 Ok(Self {
154 arena,
155 propagators: config.propagators,
156 plan,
157 ingress,
158 space: config.space,
159 dt: config.dt,
160 current_tick: TickId(0),
161 param_version: ParameterVersion(0),
162 consecutive_rollback_count: 0,
163 tick_disabled: false,
164 max_consecutive_rollbacks: 3,
165 propagator_scratch,
166 base_field_set,
167 base_cache: BaseFieldCache::new(),
168 staged_cache: StagedFieldCache::new(),
169 last_metrics: StepMetrics::default(),
170 })
171 }
172
173 pub fn submit_commands(&mut self, commands: Vec<Command>) -> Vec<Receipt> {
177 self.ingress.submit(commands, self.tick_disabled)
178 }
179
180 pub fn execute_tick(&mut self) -> Result<TickResult, TickError> {
186 let tick_start = Instant::now();
187
188 if self.tick_disabled {
190 return Err(TickError {
191 kind: StepError::TickDisabled,
192 receipts: Vec::new(),
193 });
194 }
195
196 let next_tick = TickId(self.current_tick.0 + 1);
197
198 {
200 let snapshot = self.arena.snapshot();
201 self.base_cache.populate(&snapshot, &self.base_field_set);
202 }
203
204 let mut guard = self.arena.begin_tick().map_err(|_| TickError {
206 kind: StepError::AllocationFailed,
207 receipts: Vec::new(),
208 })?;
209
210 let cmd_start = Instant::now();
212 let drain = self.ingress.drain(next_tick);
213 let mut receipts = drain.expired_receipts;
214 let commands = drain.commands;
215 let accepted_receipt_start = receipts.len();
216 for dc in &commands {
217 receipts.push(Receipt {
218 accepted: true,
219 applied_tick_id: None,
220 reason_code: None,
221 command_index: dc.command_index,
222 });
223 }
224 for (i, dc) in commands.iter().enumerate() {
226 let receipt = &mut receipts[accepted_receipt_start + i];
227 match &dc.command.payload {
228 CommandPayload::SetField {
229 ref coord,
230 field_id,
231 value,
232 } => {
233 if let Some(rank) = self.space.canonical_rank(coord) {
234 if let Some(buf) = guard.writer.write(*field_id) {
235 if rank < buf.len() {
236 buf[rank] = *value;
237 }
238 }
239 }
240 }
241 CommandPayload::SetParameter { .. }
242 | CommandPayload::SetParameterBatch { .. }
243 | CommandPayload::Move { .. }
244 | CommandPayload::Spawn { .. }
245 | CommandPayload::Despawn { .. }
246 | CommandPayload::Custom { .. } => {
247 receipt.accepted = false;
248 receipt.reason_code = Some(IngressError::UnsupportedCommand);
249 }
250 }
251 }
252 let command_processing_us = cmd_start.elapsed().as_micros() as u64;
253
254 let mut propagator_us = Vec::with_capacity(self.propagators.len());
256 for (i, prop) in self.propagators.iter().enumerate() {
257 let prop_start = Instant::now();
258
259 self.staged_cache.clear();
261 if let Some(routes) = self.plan.routes_for(i) {
262 for (&field, &source) in routes {
263 if let ReadSource::Staged { .. } = source {
264 if let Some(data) = guard.writer.read(field) {
265 self.staged_cache.insert(field, data);
266 }
267 }
268 }
269 }
270
271 let empty_routes = indexmap::IndexMap::new();
273 let routes = self.plan.routes_for(i).unwrap_or(&empty_routes);
274 let overlay = OverlayReader::new(routes, &self.base_cache, &self.staged_cache);
275
276 for field in self.plan.incremental_fields_for(i) {
278 if let Some(prev_data) = self.base_cache.read(field) {
279 let prev: Vec<f32> = prev_data.to_vec();
282 if let Some(write_buf) = guard.writer.write(field) {
283 let copy_len = prev.len().min(write_buf.len());
284 write_buf[..copy_len].copy_from_slice(&prev[..copy_len]);
285 }
286 }
287 }
288
289 self.propagator_scratch.reset();
291
292 {
294 let mut ctx = murk_propagator::StepContext::new(
295 &overlay,
296 &self.base_cache,
297 &mut guard.writer,
298 &mut self.propagator_scratch,
299 self.space.as_ref(),
300 next_tick,
301 self.dt,
302 );
303
304 if let Err(reason) = prop.step(&mut ctx) {
306 let prop_name = prop.name().to_string();
309 return self.handle_rollback(
310 prop_name,
311 reason,
312 receipts,
313 accepted_receipt_start,
314 );
315 }
316 }
317
318 propagator_us.push((
319 prop.name().to_string(),
320 prop_start.elapsed().as_micros() as u64,
321 ));
322 }
323
324 let publish_start = Instant::now();
328 self.arena
329 .publish(next_tick, self.param_version)
330 .map_err(|_| TickError {
331 kind: StepError::AllocationFailed,
332 receipts: vec![],
333 })?;
334 let snapshot_publish_us = publish_start.elapsed().as_micros() as u64;
335
336 self.current_tick = next_tick;
338 self.consecutive_rollback_count = 0;
339
340 for receipt in &mut receipts[accepted_receipt_start..] {
342 if receipt.accepted {
343 receipt.applied_tick_id = Some(next_tick);
344 }
345 }
346
347 let total_us = tick_start.elapsed().as_micros() as u64;
349 let metrics = StepMetrics {
350 total_us,
351 command_processing_us,
352 propagator_us,
353 snapshot_publish_us,
354 memory_bytes: self.arena.memory_bytes(),
355 sparse_retired_ranges: self.arena.sparse_retired_range_count() as u32,
356 sparse_pending_retired: self.arena.sparse_pending_retired_count() as u32,
357 };
358 self.last_metrics = metrics.clone();
359
360 Ok(TickResult { receipts, metrics })
361 }
362
363 fn handle_rollback(
368 &mut self,
369 prop_name: String,
370 reason: murk_core::PropagatorError,
371 mut receipts: Vec<Receipt>,
372 accepted_start: usize,
373 ) -> Result<TickResult, TickError> {
374 self.consecutive_rollback_count += 1;
376 if self.consecutive_rollback_count >= self.max_consecutive_rollbacks {
377 self.tick_disabled = true;
378 }
379
380 for receipt in &mut receipts[accepted_start..] {
384 if receipt.accepted {
385 receipt.applied_tick_id = None;
386 receipt.reason_code = Some(IngressError::TickRollback);
387 }
388 }
389
390 Err(TickError {
391 kind: StepError::PropagatorFailed {
392 name: prop_name,
393 reason,
394 },
395 receipts,
396 })
397 }
398
399 pub fn reset(&mut self) -> Result<(), ConfigError> {
401 self.arena.reset().map_err(ConfigError::Arena)?;
402 self.ingress.clear();
403 self.current_tick = TickId(0);
404 self.param_version = ParameterVersion(0);
405 self.tick_disabled = false;
406 self.consecutive_rollback_count = 0;
407 self.last_metrics = StepMetrics::default();
408 Ok(())
409 }
410
411 pub fn snapshot(&self) -> Snapshot<'_> {
413 self.arena.snapshot()
414 }
415
416 pub fn owned_snapshot(&self) -> murk_arena::OwnedSnapshot {
421 self.arena.owned_snapshot()
422 }
423
424 pub fn current_tick(&self) -> TickId {
426 self.current_tick
427 }
428
429 pub fn is_tick_disabled(&self) -> bool {
431 self.tick_disabled
432 }
433
434 pub fn consecutive_rollback_count(&self) -> u32 {
436 self.consecutive_rollback_count
437 }
438
439 pub fn last_metrics(&self) -> &StepMetrics {
441 &self.last_metrics
442 }
443
444 pub fn space(&self) -> &dyn murk_space::Space {
446 self.space.as_ref()
447 }
448}
449
450#[cfg(test)]
451mod tests {
452 use super::*;
453 use murk_core::command::CommandPayload;
454 use murk_core::id::{Coord, ParameterKey};
455 use murk_core::traits::SnapshotAccess;
456 use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
457 use murk_propagator::propagator::WriteMode;
458 use murk_space::{EdgeBehavior, Line1D};
459 use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};
460
461 fn scalar_field(name: &str) -> FieldDef {
462 FieldDef {
463 name: name.to_string(),
464 field_type: FieldType::Scalar,
465 mutability: FieldMutability::PerTick,
466 units: None,
467 bounds: None,
468 boundary_behavior: BoundaryBehavior::Clamp,
469 }
470 }
471
472 fn make_cmd(expires: u64) -> Command {
473 Command {
474 payload: CommandPayload::SetParameter {
475 key: ParameterKey(0),
476 value: 0.0,
477 },
478 expires_after_tick: TickId(expires),
479 source_id: None,
480 source_seq: None,
481 priority_class: 1,
482 arrival_seq: 0,
483 }
484 }
485
486 fn simple_engine() -> TickEngine {
487 let config = WorldConfig {
488 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
489 fields: vec![scalar_field("energy")],
490 propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
491 dt: 0.1,
492 seed: 42,
493 ring_buffer_size: 8,
494 max_ingress_queue: 1024,
495 tick_rate_hz: None,
496 backoff: crate::config::BackoffConfig::default(),
497 };
498 TickEngine::new(config).unwrap()
499 }
500
501 fn two_field_engine() -> TickEngine {
502 let config = WorldConfig {
503 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
504 fields: vec![scalar_field("field0"), scalar_field("field1")],
505 propagators: vec![
506 Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
507 Box::new(IdentityPropagator::new(
508 "copy_f0_to_f1",
509 FieldId(0),
510 FieldId(1),
511 )),
512 ],
513 dt: 0.1,
514 seed: 42,
515 ring_buffer_size: 8,
516 max_ingress_queue: 1024,
517 tick_rate_hz: None,
518 backoff: crate::config::BackoffConfig::default(),
519 };
520 TickEngine::new(config).unwrap()
521 }
522
523 fn three_field_engine() -> TickEngine {
524 struct SumPropagator {
528 name: String,
529 input_a: FieldId,
530 input_b: FieldId,
531 output: FieldId,
532 }
533
534 impl SumPropagator {
535 fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
536 Self {
537 name: name.to_string(),
538 input_a: a,
539 input_b: b,
540 output: out,
541 }
542 }
543 }
544
545 impl Propagator for SumPropagator {
546 fn name(&self) -> &str {
547 &self.name
548 }
549 fn reads(&self) -> murk_core::FieldSet {
550 [self.input_a, self.input_b].into_iter().collect()
551 }
552 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
553 vec![(self.output, WriteMode::Full)]
554 }
555 fn step(
556 &self,
557 ctx: &mut murk_propagator::StepContext<'_>,
558 ) -> Result<(), murk_core::PropagatorError> {
559 let a = ctx.reads().read(self.input_a).unwrap().to_vec();
560 let b = ctx.reads().read(self.input_b).unwrap().to_vec();
561 let out = ctx.writes().write(self.output).unwrap();
562 for i in 0..out.len() {
563 out[i] = a[i] + b[i];
564 }
565 Ok(())
566 }
567 }
568
569 let config = WorldConfig {
570 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
571 fields: vec![
572 scalar_field("field0"),
573 scalar_field("field1"),
574 scalar_field("field2"),
575 ],
576 propagators: vec![
577 Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
578 Box::new(IdentityPropagator::new(
579 "copy_f0_to_f1",
580 FieldId(0),
581 FieldId(1),
582 )),
583 Box::new(SumPropagator::new(
584 "sum_f0_f1_to_f2",
585 FieldId(0),
586 FieldId(1),
587 FieldId(2),
588 )),
589 ],
590 dt: 0.1,
591 seed: 42,
592 ring_buffer_size: 8,
593 max_ingress_queue: 1024,
594 tick_rate_hz: None,
595 backoff: crate::config::BackoffConfig::default(),
596 };
597 TickEngine::new(config).unwrap()
598 }
599
600 fn failing_engine(succeed_count: usize) -> TickEngine {
601 let config = WorldConfig {
602 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
603 fields: vec![scalar_field("energy")],
604 propagators: vec![Box::new(FailingPropagator::new(
605 "fail",
606 FieldId(0),
607 succeed_count,
608 ))],
609 dt: 0.1,
610 seed: 42,
611 ring_buffer_size: 8,
612 max_ingress_queue: 1024,
613 tick_rate_hz: None,
614 backoff: crate::config::BackoffConfig::default(),
615 };
616 TickEngine::new(config).unwrap()
617 }
618
619 fn partial_failure_engine() -> TickEngine {
620 let config = WorldConfig {
622 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
623 fields: vec![scalar_field("field0"), scalar_field("field1")],
624 propagators: vec![
625 Box::new(ConstPropagator::new("ok_prop", FieldId(0), 1.0)),
626 Box::new(FailingPropagator::new("fail_prop", FieldId(1), 0)),
627 ],
628 dt: 0.1,
629 seed: 42,
630 ring_buffer_size: 8,
631 max_ingress_queue: 1024,
632 tick_rate_hz: None,
633 backoff: crate::config::BackoffConfig::default(),
634 };
635 TickEngine::new(config).unwrap()
636 }
637
638 #[test]
641 fn staged_read_sees_prior_propagator_write() {
642 let mut engine = two_field_engine();
644 let result = engine.execute_tick().unwrap();
645 let snap = engine.snapshot();
646 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
648 assert!(result.metrics.total_us > 0);
649 }
650
651 #[test]
652 fn reads_previous_sees_base_gen() {
653 struct ReadsPrevPropagator;
657 impl Propagator for ReadsPrevPropagator {
658 fn name(&self) -> &str {
659 "reads_prev"
660 }
661 fn reads(&self) -> murk_core::FieldSet {
662 murk_core::FieldSet::empty()
663 }
664 fn reads_previous(&self) -> murk_core::FieldSet {
665 [FieldId(0)].into_iter().collect()
666 }
667 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
668 vec![(FieldId(1), WriteMode::Full)]
669 }
670 fn step(
671 &self,
672 ctx: &mut murk_propagator::StepContext<'_>,
673 ) -> Result<(), murk_core::PropagatorError> {
674 let prev = ctx.reads_previous().read(FieldId(0)).unwrap().to_vec();
675 let out = ctx.writes().write(FieldId(1)).unwrap();
676 out.copy_from_slice(&prev);
677 Ok(())
678 }
679 }
680
681 let config = WorldConfig {
682 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
683 fields: vec![scalar_field("field0"), scalar_field("field1")],
684 propagators: vec![
685 Box::new(ConstPropagator::new("write_f0", FieldId(0), 99.0)),
686 Box::new(ReadsPrevPropagator),
687 ],
688 dt: 0.1,
689 seed: 42,
690 ring_buffer_size: 8,
691 max_ingress_queue: 1024,
692 tick_rate_hz: None,
693 backoff: crate::config::BackoffConfig::default(),
694 };
695 let mut engine = TickEngine::new(config).unwrap();
696
697 engine.execute_tick().unwrap();
700 let snap = engine.snapshot();
701 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 0.0);
703
704 engine.execute_tick().unwrap();
706 let snap = engine.snapshot();
707 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 99.0);
708 }
709
710 #[test]
711 fn three_propagator_overlay_visibility() {
712 let mut engine = three_field_engine();
716 engine.execute_tick().unwrap();
717 let snap = engine.snapshot();
718
719 assert_eq!(snap.read(FieldId(0)).unwrap()[0], 7.0);
720 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
721 assert_eq!(snap.read(FieldId(2)).unwrap()[0], 14.0); }
723
724 #[test]
725 fn unwritten_field_reads_from_base_gen() {
726 let mut engine = three_field_engine();
729 engine.execute_tick().unwrap();
730 let snap = engine.snapshot();
733 let f2 = snap.read(FieldId(2)).unwrap();
734 assert!(f2.iter().all(|&v| v == 14.0));
735 }
736
737 #[test]
740 fn propagator_failure_no_snapshot_published() {
741 let mut engine = failing_engine(0);
742
743 let snap_before = engine.snapshot();
745 let tick_before = snap_before.tick_id();
746
747 let result = engine.execute_tick();
749 assert!(result.is_err());
750
751 let snap_after = engine.snapshot();
753 assert_eq!(snap_after.tick_id(), tick_before);
754 }
755
756 #[test]
757 fn partial_failure_rolls_back_all() {
758 let mut engine = partial_failure_engine();
761
762 let result = engine.execute_tick();
764 assert!(result.is_err());
765
766 let snap = engine.snapshot();
768 let f0 = snap.read(FieldId(0));
769 if let Some(data) = f0 {
772 assert!(
773 data.iter().all(|&v| v == 0.0),
774 "rollback should prevent PropA's writes from being visible"
775 );
776 }
777 }
778
779 #[test]
780 fn rollback_receipts_generated() {
781 let mut engine = failing_engine(0);
782
783 let cmd = Command {
785 payload: CommandPayload::SetField {
786 coord: smallvec::smallvec![0],
787 field_id: FieldId(0),
788 value: 1.0,
789 },
790 expires_after_tick: TickId(100),
791 source_id: None,
792 source_seq: None,
793 priority_class: 1,
794 arrival_seq: 0,
795 };
796 engine.submit_commands(vec![cmd]);
797
798 let result = engine.execute_tick();
799 match result {
800 Err(TickError {
801 kind: StepError::PropagatorFailed { .. },
802 receipts,
803 }) => {
804 assert_eq!(receipts.len(), 1);
806 assert!(receipts[0].accepted);
807 assert_eq!(receipts[0].reason_code, Some(IngressError::TickRollback));
808 }
809 other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
810 }
811 }
812
813 #[test]
814 fn rollback_preserves_rejected_receipts() {
815 let mut engine = failing_engine(0);
820
821 engine.submit_commands(vec![make_cmd(100)]);
823
824 let result = engine.execute_tick();
825 match result {
826 Err(TickError {
827 kind: StepError::PropagatorFailed { .. },
828 receipts,
829 }) => {
830 assert_eq!(receipts.len(), 1);
831 assert!(
834 !receipts[0].accepted,
835 "rejected receipt should stay rejected after rollback"
836 );
837 assert_eq!(
838 receipts[0].reason_code,
839 Some(IngressError::UnsupportedCommand),
840 "rejected receipt must preserve UnsupportedCommand reason after rollback"
841 );
842 }
843 other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
844 }
845 }
846
847 #[test]
850 fn consecutive_rollbacks_disable_ticking() {
851 let mut engine = failing_engine(0);
852
853 for _ in 0..3 {
854 let _ = engine.execute_tick();
855 }
856
857 assert!(engine.is_tick_disabled());
858 assert_eq!(engine.consecutive_rollback_count(), 3);
859 }
860
861 #[test]
862 fn success_resets_rollback_count() {
863 let mut engine = failing_engine(10);
866
867 engine.execute_tick().unwrap();
869 engine.execute_tick().unwrap();
870 assert_eq!(engine.consecutive_rollback_count(), 0);
871 assert_eq!(engine.current_tick(), TickId(2));
872 }
873
874 #[test]
875 fn tick_disabled_rejects_immediately() {
876 let mut engine = failing_engine(0);
877
878 for _ in 0..3 {
880 let _ = engine.execute_tick();
881 }
882 assert!(engine.is_tick_disabled());
883
884 match engine.execute_tick() {
886 Err(TickError {
887 kind: StepError::TickDisabled,
888 ..
889 }) => {}
890 other => panic!("expected TickDisabled, got {other:?}"),
891 }
892 }
893
894 #[test]
895 fn reset_clears_tick_disabled() {
896 let mut engine = failing_engine(0);
897
898 for _ in 0..3 {
899 let _ = engine.execute_tick();
900 }
901 assert!(engine.is_tick_disabled());
902
903 engine.reset().unwrap();
904 assert!(!engine.is_tick_disabled());
905 assert_eq!(engine.current_tick(), TickId(0));
906 assert_eq!(engine.consecutive_rollback_count(), 0);
907 }
908
909 #[test]
912 fn single_tick_end_to_end() {
913 let mut engine = simple_engine();
914 let result = engine.execute_tick().unwrap();
915
916 let snap = engine.snapshot();
917 let data = snap.read(FieldId(0)).unwrap();
918 assert_eq!(data.len(), 10);
919 assert!(data.iter().all(|&v| v == 42.0));
920 assert_eq!(engine.current_tick(), TickId(1));
921 assert!(!result.receipts.is_empty() || result.receipts.is_empty()); }
923
924 #[test]
925 fn multi_tick_determinism() {
926 let mut engine = simple_engine();
927
928 for _ in 0..10 {
929 engine.execute_tick().unwrap();
930 }
931
932 let snap = engine.snapshot();
933 let data = snap.read(FieldId(0)).unwrap();
934 assert!(data.iter().all(|&v| v == 42.0));
935 assert_eq!(engine.current_tick(), TickId(10));
936 }
937
938 #[test]
939 fn commands_flow_through_to_receipts() {
940 let mut engine = simple_engine();
941
942 let coord: Coord = vec![0i32].into();
944 let cmds = vec![
945 Command {
946 payload: CommandPayload::SetField {
947 coord: coord.clone(),
948 field_id: FieldId(0),
949 value: 1.0,
950 },
951 expires_after_tick: TickId(100),
952 source_id: None,
953 source_seq: None,
954 priority_class: 1,
955 arrival_seq: 0,
956 },
957 Command {
958 payload: CommandPayload::SetField {
959 coord: coord.clone(),
960 field_id: FieldId(0),
961 value: 2.0,
962 },
963 expires_after_tick: TickId(100),
964 source_id: None,
965 source_seq: None,
966 priority_class: 1,
967 arrival_seq: 0,
968 },
969 ];
970 let submit_receipts = engine.submit_commands(cmds);
971 assert_eq!(submit_receipts.len(), 2);
972 assert!(submit_receipts.iter().all(|r| r.accepted));
973
974 let result = engine.execute_tick().unwrap();
975 let applied: Vec<_> = result
977 .receipts
978 .iter()
979 .filter(|r| r.applied_tick_id.is_some())
980 .collect();
981 assert_eq!(applied.len(), 2);
982 assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
983 }
984
985 #[test]
986 fn non_setfield_commands_rejected_honestly() {
987 let mut engine = simple_engine();
988
989 let submit_receipts = engine.submit_commands(vec![make_cmd(100), make_cmd(100)]);
991 assert_eq!(submit_receipts.len(), 2);
992 assert!(submit_receipts.iter().all(|r| r.accepted));
993
994 let result = engine.execute_tick().unwrap();
995 assert_eq!(result.receipts.len(), 2);
996
997 for receipt in &result.receipts {
1000 assert!(
1001 !receipt.accepted,
1002 "unimplemented command type must be rejected"
1003 );
1004 assert_eq!(
1005 receipt.applied_tick_id, None,
1006 "unimplemented command must not have applied_tick_id"
1007 );
1008 assert_eq!(
1009 receipt.reason_code,
1010 Some(IngressError::UnsupportedCommand),
1011 "rejected unsupported command must carry UnsupportedCommand reason"
1012 );
1013 }
1014 }
1015
1016 #[test]
1019 fn timing_fields_populated() {
1020 let mut engine = simple_engine();
1021 let result = engine.execute_tick().unwrap();
1022
1023 let _ = result.metrics.total_us;
1025 assert_eq!(result.metrics.propagator_us.len(), 1);
1026 assert_eq!(result.metrics.propagator_us[0].0, "const");
1027 }
1028
1029 #[test]
1030 fn memory_bytes_matches_arena() {
1031 let mut engine = simple_engine();
1032 engine.execute_tick().unwrap();
1033
1034 let metrics = engine.last_metrics();
1035 assert!(metrics.memory_bytes > 0);
1036 }
1037
1038 #[test]
1041 fn reset_clears_pending_ingress() {
1042 let mut engine = simple_engine();
1043
1044 engine.submit_commands(vec![make_cmd(1000), make_cmd(1000)]);
1046
1047 engine.reset().unwrap();
1049
1050 let result = engine.execute_tick().unwrap();
1052 assert!(result.receipts.is_empty());
1053 }
1054
1055 #[test]
1056 fn command_index_preserved_after_reordering() {
1057 let mut engine = simple_engine();
1058
1059 let cmds = vec![
1062 Command {
1063 payload: CommandPayload::SetParameter {
1064 key: ParameterKey(0),
1065 value: 1.0,
1066 },
1067 expires_after_tick: TickId(100),
1068 source_id: None,
1069 source_seq: None,
1070 priority_class: 2, arrival_seq: 0,
1072 },
1073 Command {
1074 payload: CommandPayload::SetParameter {
1075 key: ParameterKey(0),
1076 value: 2.0,
1077 },
1078 expires_after_tick: TickId(100),
1079 source_id: None,
1080 source_seq: None,
1081 priority_class: 0, arrival_seq: 0,
1083 },
1084 ];
1085 engine.submit_commands(cmds);
1086
1087 let result = engine.execute_tick().unwrap();
1088 assert_eq!(result.receipts.len(), 2);
1092 assert_eq!(result.receipts[0].command_index, 1); assert_eq!(result.receipts[1].command_index, 0); }
1095
1096 #[test]
1097 fn writemode_incremental_seeds_from_previous_gen() {
1098 struct IncrementalOnce {
1104 written: std::cell::Cell<bool>,
1105 }
1106 impl IncrementalOnce {
1107 fn new() -> Self {
1108 Self {
1109 written: std::cell::Cell::new(false),
1110 }
1111 }
1112 }
1113 impl Propagator for IncrementalOnce {
1114 fn name(&self) -> &str {
1115 "incr_once"
1116 }
1117 fn reads(&self) -> murk_core::FieldSet {
1118 murk_core::FieldSet::empty()
1119 }
1120 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
1121 vec![(FieldId(0), WriteMode::Incremental)]
1122 }
1123 fn step(
1124 &self,
1125 ctx: &mut murk_propagator::StepContext<'_>,
1126 ) -> Result<(), murk_core::PropagatorError> {
1127 let buf = ctx.writes().write(FieldId(0)).unwrap();
1128 if !self.written.get() {
1129 buf[0] = 42.0;
1131 buf[1] = 99.0;
1132 self.written.set(true);
1133 }
1134 Ok(())
1136 }
1137 }
1138
1139 let config = WorldConfig {
1140 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
1141 fields: vec![scalar_field("state")],
1142 propagators: vec![Box::new(IncrementalOnce::new())],
1143 dt: 0.1,
1144 seed: 42,
1145 ring_buffer_size: 8,
1146 max_ingress_queue: 1024,
1147 tick_rate_hz: None,
1148 backoff: crate::config::BackoffConfig::default(),
1149 };
1150 let mut engine = TickEngine::new(config).unwrap();
1151
1152 engine.execute_tick().unwrap();
1154 let snap = engine.snapshot();
1155 assert_eq!(snap.read(FieldId(0)).unwrap()[0], 42.0);
1156 assert_eq!(snap.read(FieldId(0)).unwrap()[1], 99.0);
1157
1158 engine.execute_tick().unwrap();
1160 let snap = engine.snapshot();
1161 assert_eq!(
1162 snap.read(FieldId(0)).unwrap()[0],
1163 42.0,
1164 "BUG-015: incremental field lost data across ticks"
1165 );
1166 assert_eq!(
1167 snap.read(FieldId(0)).unwrap()[1],
1168 99.0,
1169 "BUG-015: incremental field lost data across ticks"
1170 );
1171 assert_eq!(snap.read(FieldId(0)).unwrap()[2], 0.0);
1173
1174 engine.execute_tick().unwrap();
1176 let snap = engine.snapshot();
1177 assert_eq!(snap.read(FieldId(0)).unwrap()[0], 42.0);
1178 assert_eq!(snap.read(FieldId(0)).unwrap()[1], 99.0);
1179 }
1180}