1use std::fmt;
14use std::time::Instant;
15
16use murk_arena::config::ArenaConfig;
17use murk_arena::pingpong::PingPongArena;
18use murk_arena::read::Snapshot;
19use murk_arena::static_arena::StaticArena;
20use murk_core::command::{Command, CommandPayload, Receipt};
21use murk_core::error::{IngressError, StepError};
22use murk_core::id::{FieldId, ParameterVersion, TickId};
23use murk_core::traits::{FieldReader, FieldWriter};
24use murk_core::FieldMutability;
25use murk_propagator::pipeline::{ReadResolutionPlan, ReadSource};
26use murk_propagator::propagator::Propagator;
27use murk_propagator::scratch::ScratchRegion as PropagatorScratch;
28
29use crate::config::{ConfigError, WorldConfig};
30use crate::ingress::IngressQueue;
31use crate::metrics::StepMetrics;
32use crate::overlay::{BaseFieldCache, BaseFieldSet, OverlayReader, StagedFieldCache};
33
/// Outcome of a tick that ran to completion and was published.
#[derive(Debug)]
pub struct TickResult {
    /// Receipts for the commands handled while executing this tick.
    pub receipts: Vec<Receipt>,
    /// Timing measurements and counters gathered for this tick.
    pub metrics: StepMetrics,
}
44
/// Error returned when a tick could not be completed.
///
/// Pairs the failure kind with whatever receipts the engine hands back for
/// the failed tick.
#[derive(Debug, PartialEq, Eq)]
pub struct TickError {
    /// The underlying step failure.
    pub kind: StepError,
    /// Receipts returned together with the failure; may be empty.
    pub receipts: Vec<Receipt>,
}
59
60impl fmt::Display for TickError {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(f, "{}", self.kind)
63 }
64}
65
66impl std::error::Error for TickError {
67 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
68 Some(&self.kind)
69 }
70}
71
/// Drives the simulation one tick at a time.
///
/// Owns the arena, the propagator pipeline, the ingress queue and the
/// cumulative counters that are mirrored into [`StepMetrics`].
pub struct TickEngine {
    /// Arena used to begin a tick, publish it, and serve snapshots.
    arena: PingPongArena,
    /// Propagators, stepped in this order on every tick.
    propagators: Vec<Box<dyn Propagator>>,
    /// Plan produced by pipeline validation; supplies per-propagator read
    /// routes and incremental fields.
    plan: ReadResolutionPlan,
    /// Queue of submitted commands, drained at the start of each tick.
    ingress: IngressQueue,
    /// Space used to map command coordinates to ranks; also handed to
    /// every propagator step.
    space: Box<dyn murk_space::Space>,
    /// Time step passed to every propagator step.
    dt: f64,
    /// Id of the most recently published tick (0 before the first tick).
    current_tick: TickId,
    /// Parameter version passed to the arena on publish.
    param_version: ParameterVersion,
    /// Number of rollbacks since the last successful tick.
    consecutive_rollback_count: u32,
    /// When set, `execute_tick` refuses to run.
    tick_disabled: bool,
    /// Consecutive rollbacks at which ticking becomes disabled.
    max_consecutive_rollbacks: u32,
    /// Cumulative count of submissions rejected with `QueueFull`.
    total_queue_full_rejections: u64,
    /// Cumulative count of submissions rejected with `TickDisabled`.
    total_tick_disabled_rejections: u64,
    /// Cumulative count of rollbacks.
    total_rollback_events: u64,
    /// Cumulative count of enabled-to-disabled transitions.
    total_tick_disabled_transitions: u64,
    /// Cumulative worker stall events reported by the caller.
    total_worker_stall_events: u64,
    /// Highest ring "not available" total reported by the caller.
    total_ring_not_available_events: u64,
    /// Highest ring eviction total reported by the caller.
    total_ring_eviction_events: u64,
    /// Highest ring stale-read total reported by the caller.
    total_ring_stale_read_events: u64,
    /// Highest ring skew-retry total reported by the caller.
    total_ring_skew_retry_events: u64,
    /// Scratch region shared by all propagators; reset before each step.
    propagator_scratch: PropagatorScratch,
    /// Fields copied into `base_cache` at the start of every tick.
    base_field_set: BaseFieldSet,
    /// Field data captured from the published snapshot for the current tick.
    base_cache: BaseFieldCache,
    /// Staged field data; cleared and refilled before each propagator runs.
    staged_cache: StagedFieldCache,
    /// Metrics of the last successful tick, with counters kept up to date.
    last_metrics: StepMetrics,
}
106
107impl TickEngine {
108 pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
114 config.validate()?;
116 let defined_fields = config.defined_field_set();
117 let plan = murk_propagator::validate_pipeline(
118 &config.propagators,
119 &defined_fields,
120 config.dt,
121 &*config.space,
122 )?;
123
124 let arena_field_defs: Vec<(FieldId, murk_core::FieldDef)> = config
127 .fields
128 .iter()
129 .enumerate()
130 .map(|(i, def)| {
131 (
132 FieldId(u32::try_from(i).expect("field count validated")),
133 def.clone(),
134 )
135 })
136 .collect();
137
138 let cell_count = u32::try_from(config.space.cell_count()).expect("cell count validated");
140 let arena_config = ArenaConfig::new(cell_count);
141
142 let static_fields: Vec<(FieldId, u32)> = arena_field_defs
144 .iter()
145 .filter(|(_, d)| d.mutability == FieldMutability::Static)
146 .map(|(id, d)| (*id, cell_count * d.field_type.components()))
147 .collect();
148 let static_arena = StaticArena::new(&static_fields).into_shared();
149
150 let arena = PingPongArena::new(arena_config, arena_field_defs, static_arena)?;
151
152 let base_field_set = BaseFieldSet::from_plan(&plan, &config.propagators);
154
155 let max_scratch = config
157 .propagators
158 .iter()
159 .map(|p| p.scratch_bytes())
160 .max()
161 .unwrap_or(0);
162 let propagator_scratch = PropagatorScratch::with_byte_capacity(max_scratch);
163
164 let ingress = IngressQueue::new(config.max_ingress_queue);
165
166 Ok(Self {
167 arena,
168 propagators: config.propagators,
169 plan,
170 ingress,
171 space: config.space,
172 dt: config.dt,
173 current_tick: TickId(0),
174 param_version: ParameterVersion(0),
175 consecutive_rollback_count: 0,
176 tick_disabled: false,
177 max_consecutive_rollbacks: 3,
178 total_queue_full_rejections: 0,
179 total_tick_disabled_rejections: 0,
180 total_rollback_events: 0,
181 total_tick_disabled_transitions: 0,
182 total_worker_stall_events: 0,
183 total_ring_not_available_events: 0,
184 total_ring_eviction_events: 0,
185 total_ring_stale_read_events: 0,
186 total_ring_skew_retry_events: 0,
187 propagator_scratch,
188 base_field_set,
189 base_cache: BaseFieldCache::new(),
190 staged_cache: StagedFieldCache::new(),
191 last_metrics: StepMetrics::default(),
192 })
193 }
194
195 pub fn submit_commands(&mut self, commands: Vec<Command>) -> Vec<Receipt> {
199 let receipts = self.ingress.submit(commands, self.tick_disabled);
200 for receipt in &receipts {
201 match receipt.reason_code {
202 Some(IngressError::QueueFull) => {
203 self.total_queue_full_rejections =
204 self.total_queue_full_rejections.saturating_add(1);
205 }
206 Some(IngressError::TickDisabled) => {
207 self.total_tick_disabled_rejections =
208 self.total_tick_disabled_rejections.saturating_add(1);
209 }
210 _ => {}
211 }
212 }
213 self.refresh_counter_metrics();
214 receipts
215 }
216
217 pub fn execute_tick(&mut self) -> Result<TickResult, TickError> {
223 let tick_start = Instant::now();
224
225 if self.tick_disabled {
227 return Err(TickError {
228 kind: StepError::TickDisabled,
229 receipts: Vec::new(),
230 });
231 }
232
233 let next_tick = TickId(self.current_tick.0 + 1);
234
235 {
237 let snapshot = self.arena.snapshot();
238 self.base_cache.populate(&snapshot, &self.base_field_set);
239 }
240
241 let mut guard = self.arena.begin_tick().map_err(|_| TickError {
243 kind: StepError::AllocationFailed,
244 receipts: Vec::new(),
245 })?;
246
247 let cmd_start = Instant::now();
249 let drain = self.ingress.drain(next_tick);
250 let mut receipts = drain.expired_receipts;
251 let commands = drain.commands;
252 let accepted_receipt_start = receipts.len();
253 for dc in &commands {
254 receipts.push(Receipt {
255 accepted: true,
256 applied_tick_id: None,
257 reason_code: None,
258 command_index: dc.command_index,
259 });
260 }
261 for (i, dc) in commands.iter().enumerate() {
263 let receipt = &mut receipts[accepted_receipt_start + i];
264 match &dc.command.payload {
265 CommandPayload::SetField {
266 ref coord,
267 field_id,
268 value,
269 } => {
270 if let Some(rank) = self.space.canonical_rank(coord) {
271 if let Some(buf) = guard.writer.write(*field_id) {
272 if rank < buf.len() {
273 buf[rank] = *value;
274 }
275 }
276 }
277 }
278 CommandPayload::SetParameter { .. }
279 | CommandPayload::SetParameterBatch { .. }
280 | CommandPayload::Move { .. }
281 | CommandPayload::Spawn { .. }
282 | CommandPayload::Despawn { .. }
283 | CommandPayload::Custom { .. } => {
284 receipt.accepted = false;
285 receipt.reason_code = Some(IngressError::UnsupportedCommand);
286 }
287 }
288 }
289 let command_processing_us = cmd_start.elapsed().as_micros() as u64;
290
291 let mut propagator_us = Vec::with_capacity(self.propagators.len());
293 for (i, prop) in self.propagators.iter().enumerate() {
294 let prop_start = Instant::now();
295
296 self.staged_cache.clear();
298 if let Some(routes) = self.plan.routes_for(i) {
299 for (&field, &source) in routes {
300 if let ReadSource::Staged { .. } = source {
301 if let Some(data) = guard.writer.read(field) {
302 self.staged_cache.insert(field, data);
303 }
304 }
305 }
306 }
307
308 let empty_routes = indexmap::IndexMap::new();
310 let routes = self.plan.routes_for(i).unwrap_or(&empty_routes);
311 let overlay = OverlayReader::new(routes, &self.base_cache, &self.staged_cache);
312
313 for field in self.plan.incremental_fields_for(i) {
315 if let Some(prev_data) = self.base_cache.read(field) {
316 let prev: Vec<f32> = prev_data.to_vec();
319 if let Some(write_buf) = guard.writer.write(field) {
320 let copy_len = prev.len().min(write_buf.len());
321 write_buf[..copy_len].copy_from_slice(&prev[..copy_len]);
322 }
323 }
324 }
325
326 self.propagator_scratch.reset();
328
329 {
331 let mut ctx = murk_propagator::StepContext::new(
332 &overlay,
333 &self.base_cache,
334 &mut guard.writer,
335 &mut self.propagator_scratch,
336 self.space.as_ref(),
337 next_tick,
338 self.dt,
339 );
340
341 if let Err(reason) = prop.step(&mut ctx) {
343 let prop_name = prop.name().to_string();
346 return self.handle_rollback(
347 prop_name,
348 reason,
349 receipts,
350 accepted_receipt_start,
351 );
352 }
353 }
354
355 propagator_us.push((
356 prop.name().to_string(),
357 prop_start.elapsed().as_micros() as u64,
358 ));
359 }
360
361 let publish_start = Instant::now();
365 self.arena
366 .publish(next_tick, self.param_version)
367 .map_err(|_| {
368 self.arena.reset_sparse_reuse_counters();
369 TickError {
370 kind: StepError::AllocationFailed,
371 receipts: vec![],
372 }
373 })?;
374 let snapshot_publish_us = publish_start.elapsed().as_micros() as u64;
375
376 self.current_tick = next_tick;
378 self.consecutive_rollback_count = 0;
379
380 for receipt in &mut receipts[accepted_receipt_start..] {
382 if receipt.accepted {
383 receipt.applied_tick_id = Some(next_tick);
384 }
385 }
386
387 let total_us = tick_start.elapsed().as_micros() as u64;
389 let metrics = StepMetrics {
390 total_us,
391 command_processing_us,
392 propagator_us,
393 snapshot_publish_us,
394 memory_bytes: self.arena.memory_bytes(),
395 sparse_retired_ranges: self.arena.sparse_retired_range_count() as u32,
396 sparse_pending_retired: self.arena.sparse_pending_retired_count() as u32,
397 sparse_reuse_hits: self.arena.sparse_reuse_hits(),
398 sparse_reuse_misses: self.arena.sparse_reuse_misses(),
399 queue_full_rejections: self.total_queue_full_rejections,
400 tick_disabled_rejections: self.total_tick_disabled_rejections,
401 rollback_events: self.total_rollback_events,
402 tick_disabled_transitions: self.total_tick_disabled_transitions,
403 worker_stall_events: self.total_worker_stall_events,
404 ring_not_available_events: self.total_ring_not_available_events,
405 ring_eviction_events: self.total_ring_eviction_events,
406 ring_stale_read_events: self.total_ring_stale_read_events,
407 ring_skew_retry_events: self.total_ring_skew_retry_events,
408 };
409 self.arena.reset_sparse_reuse_counters();
410 self.last_metrics = metrics.clone();
411
412 Ok(TickResult { receipts, metrics })
413 }
414
415 fn handle_rollback(
420 &mut self,
421 prop_name: String,
422 reason: murk_core::PropagatorError,
423 mut receipts: Vec<Receipt>,
424 accepted_start: usize,
425 ) -> Result<TickResult, TickError> {
426 self.arena.reset_sparse_reuse_counters();
428 self.total_rollback_events = self.total_rollback_events.saturating_add(1);
429 self.consecutive_rollback_count += 1;
430 if self.consecutive_rollback_count >= self.max_consecutive_rollbacks {
431 if !self.tick_disabled {
432 self.total_tick_disabled_transitions =
433 self.total_tick_disabled_transitions.saturating_add(1);
434 }
435 self.tick_disabled = true;
436 }
437 self.refresh_counter_metrics();
438
439 for receipt in &mut receipts[accepted_start..] {
443 if receipt.accepted {
444 receipt.applied_tick_id = None;
445 receipt.reason_code = Some(IngressError::TickRollback);
446 }
447 }
448
449 Err(TickError {
450 kind: StepError::PropagatorFailed {
451 name: prop_name,
452 reason,
453 },
454 receipts,
455 })
456 }
457
458 pub fn reset(&mut self) -> Result<(), ConfigError> {
460 self.arena.reset().map_err(ConfigError::Arena)?;
461 self.ingress.clear();
462 self.current_tick = TickId(0);
463 self.param_version = ParameterVersion(0);
464 self.tick_disabled = false;
465 self.consecutive_rollback_count = 0;
466 self.total_queue_full_rejections = 0;
467 self.total_tick_disabled_rejections = 0;
468 self.total_rollback_events = 0;
469 self.total_tick_disabled_transitions = 0;
470 self.total_worker_stall_events = 0;
471 self.total_ring_not_available_events = 0;
472 self.total_ring_eviction_events = 0;
473 self.total_ring_stale_read_events = 0;
474 self.total_ring_skew_retry_events = 0;
475 self.last_metrics = StepMetrics::default();
476 Ok(())
477 }
478
479 pub(crate) fn record_worker_stall_events(&mut self, count: u64) {
480 self.total_worker_stall_events = self.total_worker_stall_events.saturating_add(count);
481 self.refresh_counter_metrics();
482 }
483
484 pub(crate) fn set_ring_not_available_events(&mut self, total: u64) {
485 self.total_ring_not_available_events = self.total_ring_not_available_events.max(total);
486 self.refresh_counter_metrics();
487 }
488
489 pub(crate) fn set_ring_eviction_events(&mut self, total: u64) {
490 self.total_ring_eviction_events = self.total_ring_eviction_events.max(total);
491 self.refresh_counter_metrics();
492 }
493
494 pub(crate) fn set_ring_stale_read_events(&mut self, total: u64) {
495 self.total_ring_stale_read_events = self.total_ring_stale_read_events.max(total);
496 self.refresh_counter_metrics();
497 }
498
499 pub(crate) fn set_ring_skew_retry_events(&mut self, total: u64) {
500 self.total_ring_skew_retry_events = self.total_ring_skew_retry_events.max(total);
501 self.refresh_counter_metrics();
502 }
503
504 fn refresh_counter_metrics(&mut self) {
505 self.last_metrics.queue_full_rejections = self.total_queue_full_rejections;
506 self.last_metrics.tick_disabled_rejections = self.total_tick_disabled_rejections;
507 self.last_metrics.rollback_events = self.total_rollback_events;
508 self.last_metrics.tick_disabled_transitions = self.total_tick_disabled_transitions;
509 self.last_metrics.worker_stall_events = self.total_worker_stall_events;
510 self.last_metrics.ring_not_available_events = self.total_ring_not_available_events;
511 self.last_metrics.ring_eviction_events = self.total_ring_eviction_events;
512 self.last_metrics.ring_stale_read_events = self.total_ring_stale_read_events;
513 self.last_metrics.ring_skew_retry_events = self.total_ring_skew_retry_events;
514 }
515
516 pub fn snapshot(&self) -> Snapshot<'_> {
518 self.arena.snapshot()
519 }
520
521 pub fn owned_snapshot(&self) -> murk_arena::OwnedSnapshot {
526 self.arena.owned_snapshot()
527 }
528
529 pub fn current_tick(&self) -> TickId {
531 self.current_tick
532 }
533
534 pub fn is_tick_disabled(&self) -> bool {
536 self.tick_disabled
537 }
538
539 pub fn consecutive_rollback_count(&self) -> u32 {
541 self.consecutive_rollback_count
542 }
543
544 pub fn last_metrics(&self) -> &StepMetrics {
546 &self.last_metrics
547 }
548
549 pub fn space(&self) -> &dyn murk_space::Space {
551 self.space.as_ref()
552 }
553
554 pub fn ingress_queue_depth(&self) -> usize {
556 self.ingress.len()
557 }
558
559 pub fn ingress_queue_capacity(&self) -> usize {
561 self.ingress.capacity()
562 }
563}
564
565#[cfg(test)]
566mod tests {
567 use super::*;
568 use murk_core::command::CommandPayload;
569 use murk_core::id::{Coord, ParameterKey};
570 use murk_core::traits::SnapshotAccess;
571 use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
572 use murk_propagator::propagator::WriteMode;
573 use murk_space::{EdgeBehavior, Line1D};
574 use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};
575 use std::sync::atomic::{AtomicUsize, Ordering};
576
577 fn scalar_field(name: &str) -> FieldDef {
578 FieldDef {
579 name: name.to_string(),
580 field_type: FieldType::Scalar,
581 mutability: FieldMutability::PerTick,
582 units: None,
583 bounds: None,
584 boundary_behavior: BoundaryBehavior::Clamp,
585 }
586 }
587
588 fn sparse_scalar_field(name: &str) -> FieldDef {
589 FieldDef {
590 name: name.to_string(),
591 field_type: FieldType::Scalar,
592 mutability: FieldMutability::Sparse,
593 units: None,
594 bounds: None,
595 boundary_behavior: BoundaryBehavior::Clamp,
596 }
597 }
598
599 fn make_cmd(expires: u64) -> Command {
600 Command {
601 payload: CommandPayload::SetParameter {
602 key: ParameterKey(0),
603 value: 0.0,
604 },
605 expires_after_tick: TickId(expires),
606 source_id: None,
607 source_seq: None,
608 priority_class: 1,
609 arrival_seq: 0,
610 }
611 }
612
613 fn simple_engine() -> TickEngine {
614 let config = WorldConfig {
615 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
616 fields: vec![scalar_field("energy")],
617 propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
618 dt: 0.1,
619 seed: 42,
620 ring_buffer_size: 8,
621 max_ingress_queue: 1024,
622 tick_rate_hz: None,
623 backoff: crate::config::BackoffConfig::default(),
624 };
625 TickEngine::new(config).unwrap()
626 }
627
628 fn two_field_engine() -> TickEngine {
629 let config = WorldConfig {
630 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
631 fields: vec![scalar_field("field0"), scalar_field("field1")],
632 propagators: vec![
633 Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
634 Box::new(IdentityPropagator::new(
635 "copy_f0_to_f1",
636 FieldId(0),
637 FieldId(1),
638 )),
639 ],
640 dt: 0.1,
641 seed: 42,
642 ring_buffer_size: 8,
643 max_ingress_queue: 1024,
644 tick_rate_hz: None,
645 backoff: crate::config::BackoffConfig::default(),
646 };
647 TickEngine::new(config).unwrap()
648 }
649
650 fn three_field_engine() -> TickEngine {
651 struct SumPropagator {
655 name: String,
656 input_a: FieldId,
657 input_b: FieldId,
658 output: FieldId,
659 }
660
661 impl SumPropagator {
662 fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
663 Self {
664 name: name.to_string(),
665 input_a: a,
666 input_b: b,
667 output: out,
668 }
669 }
670 }
671
672 impl Propagator for SumPropagator {
673 fn name(&self) -> &str {
674 &self.name
675 }
676 fn reads(&self) -> murk_core::FieldSet {
677 [self.input_a, self.input_b].into_iter().collect()
678 }
679 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
680 vec![(self.output, WriteMode::Full)]
681 }
682 fn step(
683 &self,
684 ctx: &mut murk_propagator::StepContext<'_>,
685 ) -> Result<(), murk_core::PropagatorError> {
686 let a = ctx.reads().read(self.input_a).unwrap().to_vec();
687 let b = ctx.reads().read(self.input_b).unwrap().to_vec();
688 let out = ctx.writes().write(self.output).unwrap();
689 for i in 0..out.len() {
690 out[i] = a[i] + b[i];
691 }
692 Ok(())
693 }
694 }
695
696 let config = WorldConfig {
697 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
698 fields: vec![
699 scalar_field("field0"),
700 scalar_field("field1"),
701 scalar_field("field2"),
702 ],
703 propagators: vec![
704 Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
705 Box::new(IdentityPropagator::new(
706 "copy_f0_to_f1",
707 FieldId(0),
708 FieldId(1),
709 )),
710 Box::new(SumPropagator::new(
711 "sum_f0_f1_to_f2",
712 FieldId(0),
713 FieldId(1),
714 FieldId(2),
715 )),
716 ],
717 dt: 0.1,
718 seed: 42,
719 ring_buffer_size: 8,
720 max_ingress_queue: 1024,
721 tick_rate_hz: None,
722 backoff: crate::config::BackoffConfig::default(),
723 };
724 TickEngine::new(config).unwrap()
725 }
726
727 fn failing_engine(succeed_count: usize) -> TickEngine {
728 let config = WorldConfig {
729 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
730 fields: vec![scalar_field("energy")],
731 propagators: vec![Box::new(FailingPropagator::new(
732 "fail",
733 FieldId(0),
734 succeed_count,
735 ))],
736 dt: 0.1,
737 seed: 42,
738 ring_buffer_size: 8,
739 max_ingress_queue: 1024,
740 tick_rate_hz: None,
741 backoff: crate::config::BackoffConfig::default(),
742 };
743 TickEngine::new(config).unwrap()
744 }
745
746 fn partial_failure_engine() -> TickEngine {
747 let config = WorldConfig {
749 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
750 fields: vec![scalar_field("field0"), scalar_field("field1")],
751 propagators: vec![
752 Box::new(ConstPropagator::new("ok_prop", FieldId(0), 1.0)),
753 Box::new(FailingPropagator::new("fail_prop", FieldId(1), 0)),
754 ],
755 dt: 0.1,
756 seed: 42,
757 ring_buffer_size: 8,
758 max_ingress_queue: 1024,
759 tick_rate_hz: None,
760 backoff: crate::config::BackoffConfig::default(),
761 };
762 TickEngine::new(config).unwrap()
763 }
764
765 #[test]
768 fn staged_read_sees_prior_propagator_write() {
769 let mut engine = two_field_engine();
771 let result = engine.execute_tick().unwrap();
772 let snap = engine.snapshot();
773 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
775 assert!(result.metrics.total_us > 0);
776 }
777
778 #[test]
779 fn reads_previous_sees_base_gen() {
780 struct ReadsPrevPropagator;
784 impl Propagator for ReadsPrevPropagator {
785 fn name(&self) -> &str {
786 "reads_prev"
787 }
788 fn reads(&self) -> murk_core::FieldSet {
789 murk_core::FieldSet::empty()
790 }
791 fn reads_previous(&self) -> murk_core::FieldSet {
792 [FieldId(0)].into_iter().collect()
793 }
794 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
795 vec![(FieldId(1), WriteMode::Full)]
796 }
797 fn step(
798 &self,
799 ctx: &mut murk_propagator::StepContext<'_>,
800 ) -> Result<(), murk_core::PropagatorError> {
801 let prev = ctx.reads_previous().read(FieldId(0)).unwrap().to_vec();
802 let out = ctx.writes().write(FieldId(1)).unwrap();
803 out.copy_from_slice(&prev);
804 Ok(())
805 }
806 }
807
808 let config = WorldConfig {
809 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
810 fields: vec![scalar_field("field0"), scalar_field("field1")],
811 propagators: vec![
812 Box::new(ConstPropagator::new("write_f0", FieldId(0), 99.0)),
813 Box::new(ReadsPrevPropagator),
814 ],
815 dt: 0.1,
816 seed: 42,
817 ring_buffer_size: 8,
818 max_ingress_queue: 1024,
819 tick_rate_hz: None,
820 backoff: crate::config::BackoffConfig::default(),
821 };
822 let mut engine = TickEngine::new(config).unwrap();
823
824 engine.execute_tick().unwrap();
827 let snap = engine.snapshot();
828 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 0.0);
830
831 engine.execute_tick().unwrap();
833 let snap = engine.snapshot();
834 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 99.0);
835 }
836
837 #[test]
838 fn three_propagator_overlay_visibility() {
839 let mut engine = three_field_engine();
843 engine.execute_tick().unwrap();
844 let snap = engine.snapshot();
845
846 assert_eq!(snap.read(FieldId(0)).unwrap()[0], 7.0);
847 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
848 assert_eq!(snap.read(FieldId(2)).unwrap()[0], 14.0); }
850
851 #[test]
852 fn unwritten_field_reads_from_base_gen() {
853 let mut engine = three_field_engine();
856 engine.execute_tick().unwrap();
857 let snap = engine.snapshot();
860 let f2 = snap.read(FieldId(2)).unwrap();
861 assert!(f2.iter().all(|&v| v == 14.0));
862 }
863
864 #[test]
867 fn propagator_failure_no_snapshot_published() {
868 let mut engine = failing_engine(0);
869
870 let snap_before = engine.snapshot();
872 let tick_before = snap_before.tick_id();
873
874 let result = engine.execute_tick();
876 assert!(result.is_err());
877
878 let snap_after = engine.snapshot();
880 assert_eq!(snap_after.tick_id(), tick_before);
881 }
882
883 #[test]
884 fn partial_failure_rolls_back_all() {
885 let mut engine = partial_failure_engine();
888
889 let result = engine.execute_tick();
891 assert!(result.is_err());
892
893 let snap = engine.snapshot();
895 let f0 = snap.read(FieldId(0));
896 if let Some(data) = f0 {
899 assert!(
900 data.iter().all(|&v| v == 0.0),
901 "rollback should prevent PropA's writes from being visible"
902 );
903 }
904 }
905
906 #[test]
907 fn rollback_receipts_generated() {
908 let mut engine = failing_engine(0);
909
910 let cmd = Command {
912 payload: CommandPayload::SetField {
913 coord: smallvec::smallvec![0],
914 field_id: FieldId(0),
915 value: 1.0,
916 },
917 expires_after_tick: TickId(100),
918 source_id: None,
919 source_seq: None,
920 priority_class: 1,
921 arrival_seq: 0,
922 };
923 engine.submit_commands(vec![cmd]);
924
925 let result = engine.execute_tick();
926 match result {
927 Err(TickError {
928 kind: StepError::PropagatorFailed { .. },
929 receipts,
930 }) => {
931 assert_eq!(receipts.len(), 1);
933 assert!(receipts[0].accepted);
934 assert_eq!(receipts[0].reason_code, Some(IngressError::TickRollback));
935 }
936 other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
937 }
938 }
939
940 #[test]
941 fn rollback_preserves_rejected_receipts() {
942 let mut engine = failing_engine(0);
947
948 engine.submit_commands(vec![make_cmd(100)]);
950
951 let result = engine.execute_tick();
952 match result {
953 Err(TickError {
954 kind: StepError::PropagatorFailed { .. },
955 receipts,
956 }) => {
957 assert_eq!(receipts.len(), 1);
958 assert!(
961 !receipts[0].accepted,
962 "rejected receipt should stay rejected after rollback"
963 );
964 assert_eq!(
965 receipts[0].reason_code,
966 Some(IngressError::UnsupportedCommand),
967 "rejected receipt must preserve UnsupportedCommand reason after rollback"
968 );
969 }
970 other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
971 }
972 }
973
974 #[test]
977 fn consecutive_rollbacks_disable_ticking() {
978 let mut engine = failing_engine(0);
979
980 for _ in 0..3 {
981 let _ = engine.execute_tick();
982 }
983
984 assert!(engine.is_tick_disabled());
985 assert_eq!(engine.consecutive_rollback_count(), 3);
986 assert_eq!(engine.last_metrics().rollback_events, 3);
987 assert_eq!(engine.last_metrics().tick_disabled_transitions, 1);
988 }
989
990 #[test]
991 fn success_resets_rollback_count() {
992 let mut engine = failing_engine(10);
995
996 engine.execute_tick().unwrap();
998 engine.execute_tick().unwrap();
999 assert_eq!(engine.consecutive_rollback_count(), 0);
1000 assert_eq!(engine.current_tick(), TickId(2));
1001 }
1002
1003 #[test]
1004 fn tick_disabled_rejects_immediately() {
1005 let mut engine = failing_engine(0);
1006
1007 for _ in 0..3 {
1009 let _ = engine.execute_tick();
1010 }
1011 assert!(engine.is_tick_disabled());
1012
1013 match engine.execute_tick() {
1015 Err(TickError {
1016 kind: StepError::TickDisabled,
1017 ..
1018 }) => {}
1019 other => panic!("expected TickDisabled, got {other:?}"),
1020 }
1021
1022 let receipts = engine.submit_commands(vec![make_cmd(100), make_cmd(100)]);
1023 assert!(receipts
1024 .iter()
1025 .all(|r| r.reason_code == Some(IngressError::TickDisabled)));
1026 assert_eq!(engine.last_metrics().tick_disabled_rejections, 2);
1027 assert_eq!(engine.last_metrics().rollback_events, 3);
1028 assert_eq!(engine.last_metrics().tick_disabled_transitions, 1);
1029 }
1030
1031 #[test]
1032 fn reset_clears_tick_disabled() {
1033 let mut engine = failing_engine(0);
1034
1035 for _ in 0..3 {
1036 let _ = engine.execute_tick();
1037 }
1038 assert!(engine.is_tick_disabled());
1039
1040 engine.reset().unwrap();
1041 assert!(!engine.is_tick_disabled());
1042 assert_eq!(engine.current_tick(), TickId(0));
1043 assert_eq!(engine.consecutive_rollback_count(), 0);
1044 assert_eq!(engine.last_metrics().queue_full_rejections, 0);
1045 assert_eq!(engine.last_metrics().tick_disabled_rejections, 0);
1046 assert_eq!(engine.last_metrics().rollback_events, 0);
1047 assert_eq!(engine.last_metrics().tick_disabled_transitions, 0);
1048 assert_eq!(engine.last_metrics().worker_stall_events, 0);
1049 assert_eq!(engine.last_metrics().ring_not_available_events, 0);
1050 assert_eq!(engine.last_metrics().ring_eviction_events, 0);
1051 assert_eq!(engine.last_metrics().ring_stale_read_events, 0);
1052 assert_eq!(engine.last_metrics().ring_skew_retry_events, 0);
1053 }
1054
1055 #[test]
1058 fn single_tick_end_to_end() {
1059 let mut engine = simple_engine();
1060 let result = engine.execute_tick().unwrap();
1061
1062 let snap = engine.snapshot();
1063 let data = snap.read(FieldId(0)).unwrap();
1064 assert_eq!(data.len(), 10);
1065 assert!(data.iter().all(|&v| v == 42.0));
1066 assert_eq!(engine.current_tick(), TickId(1));
1067 assert!(!result.receipts.is_empty() || result.receipts.is_empty()); }
1069
1070 #[test]
1071 fn multi_tick_determinism() {
1072 let mut engine = simple_engine();
1073
1074 for _ in 0..10 {
1075 engine.execute_tick().unwrap();
1076 }
1077
1078 let snap = engine.snapshot();
1079 let data = snap.read(FieldId(0)).unwrap();
1080 assert!(data.iter().all(|&v| v == 42.0));
1081 assert_eq!(engine.current_tick(), TickId(10));
1082 }
1083
1084 #[test]
1085 fn commands_flow_through_to_receipts() {
1086 let mut engine = simple_engine();
1087
1088 let coord: Coord = vec![0i32].into();
1090 let cmds = vec![
1091 Command {
1092 payload: CommandPayload::SetField {
1093 coord: coord.clone(),
1094 field_id: FieldId(0),
1095 value: 1.0,
1096 },
1097 expires_after_tick: TickId(100),
1098 source_id: None,
1099 source_seq: None,
1100 priority_class: 1,
1101 arrival_seq: 0,
1102 },
1103 Command {
1104 payload: CommandPayload::SetField {
1105 coord: coord.clone(),
1106 field_id: FieldId(0),
1107 value: 2.0,
1108 },
1109 expires_after_tick: TickId(100),
1110 source_id: None,
1111 source_seq: None,
1112 priority_class: 1,
1113 arrival_seq: 0,
1114 },
1115 ];
1116 let submit_receipts = engine.submit_commands(cmds);
1117 assert_eq!(submit_receipts.len(), 2);
1118 assert!(submit_receipts.iter().all(|r| r.accepted));
1119
1120 let result = engine.execute_tick().unwrap();
1121 let applied: Vec<_> = result
1123 .receipts
1124 .iter()
1125 .filter(|r| r.applied_tick_id.is_some())
1126 .collect();
1127 assert_eq!(applied.len(), 2);
1128 assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
1129 }
1130
1131 #[test]
1132 fn non_setfield_commands_rejected_honestly() {
1133 let mut engine = simple_engine();
1134
1135 let submit_receipts = engine.submit_commands(vec![make_cmd(100), make_cmd(100)]);
1137 assert_eq!(submit_receipts.len(), 2);
1138 assert!(submit_receipts.iter().all(|r| r.accepted));
1139
1140 let result = engine.execute_tick().unwrap();
1141 assert_eq!(result.receipts.len(), 2);
1142
1143 for receipt in &result.receipts {
1146 assert!(
1147 !receipt.accepted,
1148 "unimplemented command type must be rejected"
1149 );
1150 assert_eq!(
1151 receipt.applied_tick_id, None,
1152 "unimplemented command must not have applied_tick_id"
1153 );
1154 assert_eq!(
1155 receipt.reason_code,
1156 Some(IngressError::UnsupportedCommand),
1157 "rejected unsupported command must carry UnsupportedCommand reason"
1158 );
1159 }
1160 }
1161
1162 #[test]
1165 fn timing_fields_populated() {
1166 let mut engine = simple_engine();
1167 let result = engine.execute_tick().unwrap();
1168
1169 let _ = result.metrics.total_us;
1171 assert_eq!(result.metrics.propagator_us.len(), 1);
1172 assert_eq!(result.metrics.propagator_us[0].0, "const");
1173 assert_eq!(result.metrics.queue_full_rejections, 0);
1174 assert_eq!(result.metrics.tick_disabled_rejections, 0);
1175 assert_eq!(result.metrics.rollback_events, 0);
1176 assert_eq!(result.metrics.tick_disabled_transitions, 0);
1177 assert_eq!(result.metrics.worker_stall_events, 0);
1178 assert_eq!(result.metrics.ring_not_available_events, 0);
1179 assert_eq!(result.metrics.ring_eviction_events, 0);
1180 assert_eq!(result.metrics.ring_stale_read_events, 0);
1181 assert_eq!(result.metrics.ring_skew_retry_events, 0);
1182 }
1183
1184 #[test]
1185 fn memory_bytes_matches_arena() {
1186 let mut engine = simple_engine();
1187 engine.execute_tick().unwrap();
1188
1189 let metrics = engine.last_metrics();
1190 assert!(metrics.memory_bytes > 0);
1191 }
1192
1193 #[test]
1194 fn queue_full_rejections_are_counted_in_metrics() {
1195 let config = WorldConfig {
1196 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
1197 fields: vec![scalar_field("energy")],
1198 propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
1199 dt: 0.1,
1200 seed: 42,
1201 ring_buffer_size: 8,
1202 max_ingress_queue: 1,
1203 tick_rate_hz: None,
1204 backoff: crate::config::BackoffConfig::default(),
1205 };
1206 let mut engine = TickEngine::new(config).unwrap();
1207
1208 let submit_receipts =
1209 engine.submit_commands(vec![make_cmd(100), make_cmd(100), make_cmd(100)]);
1210 let queue_full = submit_receipts
1211 .iter()
1212 .filter(|r| r.reason_code == Some(IngressError::QueueFull))
1213 .count();
1214 assert_eq!(queue_full, 2);
1215
1216 let result = engine.execute_tick().unwrap();
1217 assert_eq!(result.metrics.queue_full_rejections, 2);
1218 assert_eq!(result.metrics.tick_disabled_rejections, 0);
1219 }
1220
/// Counters pushed into the engine from outside (worker stalls and the ring
/// counters) must be readable through `last_metrics()` straight away and must
/// also appear in the metrics returned by the next tick.
#[test]
fn external_realtime_counters_are_reflected_in_metrics() {
    let mut engine = simple_engine();
    engine.record_worker_stall_events(3);
    engine.set_ring_not_available_events(7);
    engine.set_ring_eviction_events(5);
    engine.set_ring_stale_read_events(2);
    engine.set_ring_skew_retry_events(1);

    // Before any tick has run: the values are already visible.
    let before_tick = engine.last_metrics();
    assert_eq!(before_tick.worker_stall_events, 3);
    assert_eq!(before_tick.ring_not_available_events, 7);
    assert_eq!(before_tick.ring_eviction_events, 5);
    assert_eq!(before_tick.ring_stale_read_events, 2);
    assert_eq!(before_tick.ring_skew_retry_events, 1);

    // After a tick: the same values are carried in the tick's own metrics.
    let after_tick = engine.execute_tick().unwrap().metrics;
    assert_eq!(after_tick.worker_stall_events, 3);
    assert_eq!(after_tick.ring_not_available_events, 7);
    assert_eq!(after_tick.ring_eviction_events, 5);
    assert_eq!(after_tick.ring_stale_read_events, 2);
    assert_eq!(after_tick.ring_skew_retry_events, 1);
}
1243
/// A failed (rolled-back) tick must leave the arena's sparse-reuse hit/miss
/// counters at zero, and the engine must be able to tick successfully again
/// afterwards.
#[test]
fn rollback_clears_sparse_reuse_counters_for_next_success() {
    /// Test propagator that fully writes `output` on every call, except on
    /// the call whose zero-based index equals `fail_on_call`, where it
    /// returns an error instead.
    struct MaybeFailPropagator {
        output: FieldId,
        fail_on_call: Option<usize>,
        call_count: AtomicUsize,
    }

    impl MaybeFailPropagator {
        fn new(output: FieldId, fail_on_call: Option<usize>) -> Self {
            let call_count = AtomicUsize::new(0);
            Self {
                output,
                fail_on_call,
                call_count,
            }
        }
    }

    impl Propagator for MaybeFailPropagator {
        fn name(&self) -> &str {
            "maybe_fail"
        }

        fn reads(&self) -> murk_core::FieldSet {
            murk_core::FieldSet::empty()
        }

        fn writes(&self) -> Vec<(FieldId, WriteMode)> {
            vec![(self.output, WriteMode::Full)]
        }

        fn step(
            &self,
            ctx: &mut murk_propagator::StepContext<'_>,
        ) -> Result<(), murk_core::PropagatorError> {
            // `fetch_add` yields the count *before* the increment, so `n` is
            // the zero-based index of this call.
            let n = self.call_count.fetch_add(1, Ordering::Relaxed);
            if Some(n) == self.fail_on_call {
                return Err(murk_core::PropagatorError::ExecutionFailed {
                    reason: format!("deliberate failure on call {n}"),
                });
            }

            match ctx.writes().write(self.output) {
                Some(buffer) => {
                    buffer.fill(n as f32);
                    Ok(())
                }
                None => Err(murk_core::PropagatorError::ExecutionFailed {
                    reason: format!("field {:?} not writable", self.output),
                }),
            }
        }
    }

    /// Builds an engine with one sparse field and one regular field, then
    /// zeroes the sparse-reuse counters so the test starts from a clean slate.
    fn sparse_engine(fail_on_call: Option<usize>) -> TickEngine {
        let mut engine = TickEngine::new(WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![sparse_scalar_field("sparse0"), scalar_field("field1")],
            propagators: vec![
                Box::new(ConstPropagator::new("write_sparse", FieldId(0), 3.0)),
                Box::new(MaybeFailPropagator::new(FieldId(1), fail_on_call)),
            ],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: crate::config::BackoffConfig::default(),
        })
        .unwrap();
        engine.arena.reset_sparse_reuse_counters();
        engine
    }

    // First tick: propagator call index 0, expected to succeed.
    let mut engine = sparse_engine(Some(1));
    engine.execute_tick().unwrap();

    // Second tick: propagator call index 1, which is configured to fail.
    let second_tick = engine.execute_tick();
    assert!(second_tick.is_err(), "tick 2 should fail and roll back");
    assert_eq!(engine.arena.sparse_reuse_hits(), 0);
    assert_eq!(engine.arena.sparse_reuse_misses(), 0);

    // Third tick: the engine must recover and tick successfully again.
    engine.execute_tick().unwrap();
}
1327
/// Commands queued before `reset()` must not be processed by the first tick
/// that runs after the reset.
#[test]
fn reset_clears_pending_ingress() {
    let mut engine = simple_engine();

    // Queue two commands, then reset before any tick gets to drain them.
    let pending = vec![make_cmd(1000), make_cmd(1000)];
    engine.submit_commands(pending);
    engine.reset().unwrap();

    // No receipts means nothing from the pre-reset queue was processed.
    let receipts = engine.execute_tick().unwrap().receipts;
    assert!(receipts.is_empty());
}
1344
/// Receipts must report each command's position in the *submitted* batch,
/// even when the engine processes the batch in a different order.
#[test]
fn command_index_preserved_after_reordering() {
    let mut engine = simple_engine();

    // Both commands are identical apart from the parameter value and the
    // priority class, so a small builder keeps the differences visible.
    let set_parameter = |value, priority_class| Command {
        payload: CommandPayload::SetParameter {
            key: ParameterKey(0),
            value,
        },
        expires_after_tick: TickId(100),
        source_id: None,
        source_seq: None,
        priority_class,
        arrival_seq: 0,
    };

    // Batch index 0 has priority class 2; batch index 1 has priority class 0.
    engine.submit_commands(vec![set_parameter(1.0, 2), set_parameter(2.0, 0)]);

    let receipts = engine.execute_tick().unwrap().receipts;
    assert_eq!(receipts.len(), 2);
    // The priority-class-0 command (batch index 1) is expected to be
    // receipted first, followed by the priority-class-2 command (index 0).
    assert_eq!(receipts[0].command_index, 1);
    assert_eq!(receipts[1].command_index, 0);
}
1385
/// Regression test (BUG-015): a field declared `WriteMode::Incremental` must
/// keep the values written in an earlier tick when later ticks leave the
/// write buffer untouched.
#[test]
fn writemode_incremental_seeds_from_previous_gen() {
    /// Propagator that writes two cells of field 0 on its first `step` call
    /// and leaves the buffer unmodified on every later call.
    struct IncrementalOnce {
        written: std::cell::Cell<bool>,
    }

    impl Propagator for IncrementalOnce {
        fn name(&self) -> &str {
            "incr_once"
        }

        fn reads(&self) -> murk_core::FieldSet {
            murk_core::FieldSet::empty()
        }

        fn writes(&self) -> Vec<(FieldId, WriteMode)> {
            vec![(FieldId(0), WriteMode::Incremental)]
        }

        fn step(
            &self,
            ctx: &mut murk_propagator::StepContext<'_>,
        ) -> Result<(), murk_core::PropagatorError> {
            // The write buffer is requested on every call, including the
            // calls that do not modify it.
            let cells = ctx.writes().write(FieldId(0)).unwrap();
            // `replace` returns the previous flag value, so the branch is
            // taken on the first call only.
            let already_written = self.written.replace(true);
            if !already_written {
                cells[0] = 42.0;
                cells[1] = 99.0;
            }
            Ok(())
        }
    }

    let once = IncrementalOnce {
        written: std::cell::Cell::new(false),
    };
    let mut engine = TickEngine::new(WorldConfig {
        space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
        fields: vec![scalar_field("state")],
        propagators: vec![Box::new(once)],
        dt: 0.1,
        seed: 42,
        ring_buffer_size: 8,
        max_ingress_queue: 1024,
        tick_rate_hz: None,
        backoff: crate::config::BackoffConfig::default(),
    })
    .unwrap();

    // Tick 1: the propagator writes cells 0 and 1.
    engine.execute_tick().unwrap();
    {
        let snap = engine.snapshot();
        let state = snap.read(FieldId(0)).unwrap();
        assert_eq!(state[0], 42.0);
        assert_eq!(state[1], 99.0);
    }

    // Tick 2: the propagator writes nothing, yet both values must survive.
    engine.execute_tick().unwrap();
    {
        let snap = engine.snapshot();
        let state = snap.read(FieldId(0)).unwrap();
        for (cell, expected) in [(0, 42.0), (1, 99.0)] {
            assert_eq!(
                state[cell], expected,
                "BUG-015: incremental field lost data across ticks"
            );
        }
        // A cell the propagator never wrote is expected to read as zero.
        assert_eq!(state[2], 0.0);
    }

    // Tick 3: the values must still be there one generation later.
    engine.execute_tick().unwrap();
    {
        let snap = engine.snapshot();
        let state = snap.read(FieldId(0)).unwrap();
        assert_eq!(state[0], 42.0);
        assert_eq!(state[1], 99.0);
    }
}
1470}