1use std::fmt;
14use std::time::Instant;
15
16use murk_arena::config::ArenaConfig;
17use murk_arena::pingpong::PingPongArena;
18use murk_arena::read::Snapshot;
19use murk_arena::static_arena::StaticArena;
20use murk_core::command::{Command, CommandPayload, Receipt};
21use murk_core::error::{IngressError, StepError};
22use murk_core::id::{FieldId, ParameterVersion, TickId};
23use murk_core::traits::{FieldReader, FieldWriter};
24use murk_core::FieldMutability;
25use murk_propagator::pipeline::{ReadResolutionPlan, ReadSource};
26use murk_propagator::propagator::Propagator;
27use murk_propagator::scratch::ScratchRegion as PropagatorScratch;
28
29use crate::config::{ConfigError, WorldConfig};
30use crate::ingress::IngressQueue;
31use crate::metrics::StepMetrics;
32use crate::overlay::{BaseFieldCache, BaseFieldSet, OverlayReader, StagedFieldCache};
33
/// Outcome of a tick that completed and was published.
#[derive(Debug)]
pub struct TickResult {
    /// Receipts collected while draining the ingress queue for this tick:
    /// expiry receipts first, followed by one receipt per drained command.
    pub receipts: Vec<Receipt>,
    /// Timing and counter metrics recorded for this tick.
    pub metrics: StepMetrics,
}
44
/// Error returned when a tick could not be completed.
#[derive(Debug, PartialEq, Eq)]
pub struct TickError {
    /// The underlying step failure.
    pub kind: StepError,
    /// Receipts gathered before the failure; may be empty. On a propagator
    /// failure, receipts that were still accepted carry
    /// `IngressError::TickRollback` as their reason code.
    pub receipts: Vec<Receipt>,
}
59
60impl fmt::Display for TickError {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(f, "{}", self.kind)
63 }
64}
65
66impl std::error::Error for TickError {
67 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
68 Some(&self.kind)
69 }
70}
71
/// Running event totals kept by the engine and mirrored into `StepMetrics`.
///
/// All fields start at zero (via `Default`) and are zeroed again by
/// `TickEngine::reset`.
#[derive(Default)]
struct CumulativeCounters {
    /// Submitted commands whose receipt carried `IngressError::QueueFull`.
    queue_full_rejections: u64,
    /// Submitted commands whose receipt carried `IngressError::TickDisabled`.
    tick_disabled_rejections: u64,
    /// Number of times `handle_rollback` ran.
    rollback_events: u64,
    /// Number of times ticking flipped from enabled to disabled.
    tick_disabled_transitions: u64,
    /// Accumulated through `record_worker_stall_events`.
    worker_stall_events: u64,
    /// Highest total reported through `set_ring_not_available_events`.
    ring_not_available_events: u64,
    /// Highest total reported through `set_ring_eviction_events`.
    ring_eviction_events: u64,
    /// Highest total reported through `set_ring_stale_read_events`.
    ring_stale_read_events: u64,
    /// Highest total reported through `set_ring_skew_retry_events`.
    ring_skew_retry_events: u64,
}
90
/// Drives the simulation one tick at a time: drains commands, runs the
/// propagator pipeline against the arena, and publishes the result.
pub struct TickEngine {
    /// Arena used for snapshots, per-tick staging (`begin_tick`) and publishing.
    arena: PingPongArena,
    /// Propagators, stepped in this order on every tick.
    propagators: Vec<Box<dyn Propagator>>,
    /// Read-resolution plan produced by `validate_pipeline`; supplies the
    /// per-propagator read routes and incremental-field lists.
    plan: ReadResolutionPlan,
    /// Queue of submitted commands awaiting the next tick.
    ingress: IngressQueue,
    /// Space used to resolve command coordinates and handed to propagators.
    space: Box<dyn murk_space::Space>,
    /// Time step passed to every propagator's `StepContext`.
    dt: f64,
    /// Id of the last successfully published tick (starts at 0).
    current_tick: TickId,
    /// Parameter version passed to `publish`; only ever set to 0 in this file.
    param_version: ParameterVersion,
    /// Rollbacks since the last successful tick.
    consecutive_rollback_count: u32,
    /// When true, `execute_tick` fails immediately with `StepError::TickDisabled`.
    tick_disabled: bool,
    /// Consecutive-rollback threshold at which ticking is disabled (set to 3 in `new`).
    max_consecutive_rollbacks: u32,
    /// Cumulative event counters mirrored into `last_metrics`.
    counters: CumulativeCounters,
    /// Scratch region shared by all propagators; sized to the largest
    /// `scratch_bytes()` request and reset before each propagator step.
    propagator_scratch: PropagatorScratch,
    /// Set of fields copied into `base_cache` at the start of each tick.
    base_field_set: BaseFieldSet,
    /// Cache populated from the published snapshot before staging begins.
    base_cache: BaseFieldCache,
    /// Cache of staged field data, rebuilt for each propagator.
    staged_cache: StagedFieldCache,
    /// Metrics of the most recent successful tick, with counter fields kept
    /// current by `refresh_counter_metrics`.
    last_metrics: StepMetrics,
}
115
116impl TickEngine {
117 pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
123 config.validate()?;
125 let defined_fields = config.defined_field_set()?;
126 let plan = murk_propagator::validate_pipeline(
127 &config.propagators,
128 &defined_fields,
129 config.dt,
130 &*config.space,
131 )?;
132
133 let arena_field_defs: Vec<(FieldId, murk_core::FieldDef)> = config
135 .fields
136 .iter()
137 .enumerate()
138 .map(|(i, def)| -> Result<_, ConfigError> {
139 let id = u32::try_from(i).map_err(|_| ConfigError::FieldCountOverflow {
140 value: config.fields.len(),
141 })?;
142 Ok((FieldId(id), def.clone()))
143 })
144 .collect::<Result<_, _>>()?;
145
146 let cell_count = u32::try_from(config.space.cell_count()).map_err(|_| {
148 ConfigError::CellCountOverflow {
149 value: config.space.cell_count(),
150 }
151 })?;
152 let arena_config = ArenaConfig::new(cell_count);
153
154 let static_fields: Vec<(FieldId, u32)> = arena_field_defs
156 .iter()
157 .filter(|(_, d)| d.mutability == FieldMutability::Static)
158 .map(|(id, d)| {
159 let len = cell_count.checked_mul(d.field_type.components()).ok_or(
160 ConfigError::CellCountOverflow {
161 value: cell_count as usize,
162 },
163 )?;
164 Ok((*id, len))
165 })
166 .collect::<Result<_, ConfigError>>()?;
167 let static_arena = StaticArena::new(&static_fields).into_shared();
168
169 let arena = PingPongArena::new(arena_config, arena_field_defs, static_arena)?;
170
171 let base_field_set = BaseFieldSet::from_plan(&plan, &config.propagators);
173
174 let max_scratch = config
176 .propagators
177 .iter()
178 .map(|p| p.scratch_bytes())
179 .max()
180 .unwrap_or(0);
181 let propagator_scratch = PropagatorScratch::with_byte_capacity(max_scratch);
182
183 let ingress = IngressQueue::new(config.max_ingress_queue);
184
185 Ok(Self {
186 arena,
187 propagators: config.propagators,
188 plan,
189 ingress,
190 space: config.space,
191 dt: config.dt,
192 current_tick: TickId(0),
193 param_version: ParameterVersion(0),
194 consecutive_rollback_count: 0,
195 tick_disabled: false,
196 max_consecutive_rollbacks: 3,
197 counters: CumulativeCounters::default(),
198 propagator_scratch,
199 base_field_set,
200 base_cache: BaseFieldCache::new(),
201 staged_cache: StagedFieldCache::new(),
202 last_metrics: StepMetrics::default(),
203 })
204 }
205
206 pub fn submit_commands(&mut self, commands: Vec<Command>) -> Vec<Receipt> {
210 let receipts = self.ingress.submit(commands, self.tick_disabled);
211 for receipt in &receipts {
212 match receipt.reason_code {
213 Some(IngressError::QueueFull) => {
214 self.counters.queue_full_rejections =
215 self.counters.queue_full_rejections.saturating_add(1);
216 }
217 Some(IngressError::TickDisabled) => {
218 self.counters.tick_disabled_rejections =
219 self.counters.tick_disabled_rejections.saturating_add(1);
220 }
221 Some(IngressError::Stale)
223 | Some(IngressError::TickRollback)
224 | Some(IngressError::ShuttingDown)
225 | Some(IngressError::UnsupportedCommand)
226 | Some(IngressError::NotApplied)
227 | None => {}
228 }
229 }
230 self.refresh_counter_metrics();
231 receipts
232 }
233
234 pub fn execute_tick(&mut self) -> Result<TickResult, TickError> {
240 let tick_start = Instant::now();
241
242 if self.tick_disabled {
244 return Err(TickError {
245 kind: StepError::TickDisabled,
246 receipts: Vec::new(),
247 });
248 }
249
250 let next_tick = TickId(self.current_tick.0 + 1);
251
252 {
254 let snapshot = self.arena.snapshot();
255 self.base_cache.populate(&snapshot, &self.base_field_set);
256 }
257
258 let mut guard = self.arena.begin_tick().map_err(|_| TickError {
260 kind: StepError::AllocationFailed,
261 receipts: Vec::new(),
262 })?;
263
264 let cmd_start = Instant::now();
266 let drain = self.ingress.drain(next_tick);
267 let mut receipts = drain.expired_receipts;
268 let commands = drain.commands;
269 let accepted_receipt_start = receipts.len();
270 for dc in &commands {
271 receipts.push(Receipt {
272 accepted: true,
273 applied_tick_id: None,
274 reason_code: None,
275 command_index: dc.command_index,
276 });
277 }
278 for (i, dc) in commands.iter().enumerate() {
280 let receipt = &mut receipts[accepted_receipt_start + i];
281 match &dc.command.payload {
282 CommandPayload::SetField {
283 ref coord,
284 field_id,
285 value,
286 } => {
287 let applied = if let Some(rank) = self.space.canonical_rank(coord) {
288 if let Some(buf) = guard.writer.write(*field_id) {
289 if rank < buf.len() {
290 buf[rank] = *value;
291 true
292 } else {
293 false
294 }
295 } else {
296 false
297 }
298 } else {
299 false
300 };
301 if !applied {
302 receipt.accepted = false;
303 receipt.reason_code = Some(IngressError::NotApplied);
304 }
305 }
306 CommandPayload::SetParameter { .. }
307 | CommandPayload::SetParameterBatch { .. }
308 | CommandPayload::Move { .. }
309 | CommandPayload::Spawn { .. }
310 | CommandPayload::Despawn { .. }
311 | CommandPayload::Custom { .. } => {
312 receipt.accepted = false;
313 receipt.reason_code = Some(IngressError::UnsupportedCommand);
314 }
315 }
316 }
317 let command_processing_us = cmd_start.elapsed().as_micros() as u64;
318
319 let mut propagator_us = Vec::with_capacity(self.propagators.len());
321 for (i, prop) in self.propagators.iter().enumerate() {
322 let prop_start = Instant::now();
323
324 self.staged_cache.clear();
326 if let Some(routes) = self.plan.routes_for(i) {
327 for (&field, &source) in routes {
328 if let ReadSource::Staged { .. } = source {
329 if let Some(data) = guard.writer.read(field) {
330 self.staged_cache.insert(field, data);
331 }
332 }
333 }
334 }
335
336 let empty_routes = indexmap::IndexMap::new();
338 let routes = self.plan.routes_for(i).unwrap_or(&empty_routes);
339 let overlay = OverlayReader::new(routes, &self.base_cache, &self.staged_cache);
340
341 for field in self.plan.incremental_fields_for(i) {
343 let prev_data = match self.base_cache.read(field) {
344 Some(data) => data,
345 None => continue,
347 };
348 let prev: Vec<f32> = prev_data.to_vec();
351 let write_buf = match guard.writer.write(field) {
352 Some(buf) => buf,
353 None => {
354 debug_assert!(
355 false,
356 "incremental field {:?} declared in plan but writer returned None \
357 for propagator '{}'",
358 field,
359 prop.name(),
360 );
361 continue;
362 }
363 };
364 let copy_len = prev.len().min(write_buf.len());
365 write_buf[..copy_len].copy_from_slice(&prev[..copy_len]);
366 }
367
368 self.propagator_scratch.reset();
370
371 {
373 let mut ctx = murk_propagator::StepContext::new(
374 &overlay,
375 &self.base_cache,
376 &mut guard.writer,
377 &mut self.propagator_scratch,
378 self.space.as_ref(),
379 next_tick,
380 self.dt,
381 );
382
383 if let Err(reason) = prop.step(&mut ctx) {
385 let prop_name = prop.name().to_string();
388 return self.handle_rollback(
389 prop_name,
390 reason,
391 receipts,
392 accepted_receipt_start,
393 );
394 }
395 }
396
397 propagator_us.push((
398 prop.name().to_string(),
399 prop_start.elapsed().as_micros() as u64,
400 ));
401 }
402
403 let publish_start = Instant::now();
407 self.arena
408 .publish(next_tick, self.param_version)
409 .map_err(|_| {
410 self.arena.reset_sparse_reuse_counters();
411 TickError {
412 kind: StepError::AllocationFailed,
413 receipts: vec![],
414 }
415 })?;
416 let snapshot_publish_us = publish_start.elapsed().as_micros() as u64;
417
418 self.current_tick = next_tick;
420 self.consecutive_rollback_count = 0;
421
422 for receipt in &mut receipts[accepted_receipt_start..] {
424 if receipt.accepted {
425 receipt.applied_tick_id = Some(next_tick);
426 }
427 }
428
429 let total_us = tick_start.elapsed().as_micros() as u64;
431 let metrics = StepMetrics {
432 total_us,
433 command_processing_us,
434 propagator_us,
435 snapshot_publish_us,
436 memory_bytes: self.arena.memory_bytes(),
437 sparse_retired_ranges: u32::try_from(self.arena.sparse_retired_range_count())
438 .unwrap_or(u32::MAX),
439 sparse_pending_retired: u32::try_from(self.arena.sparse_pending_retired_count())
440 .unwrap_or(u32::MAX),
441 sparse_reuse_hits: self.arena.sparse_reuse_hits(),
442 sparse_reuse_misses: self.arena.sparse_reuse_misses(),
443 queue_full_rejections: self.counters.queue_full_rejections,
444 tick_disabled_rejections: self.counters.tick_disabled_rejections,
445 rollback_events: self.counters.rollback_events,
446 tick_disabled_transitions: self.counters.tick_disabled_transitions,
447 worker_stall_events: self.counters.worker_stall_events,
448 ring_not_available_events: self.counters.ring_not_available_events,
449 ring_eviction_events: self.counters.ring_eviction_events,
450 ring_stale_read_events: self.counters.ring_stale_read_events,
451 ring_skew_retry_events: self.counters.ring_skew_retry_events,
452 };
453 self.arena.reset_sparse_reuse_counters();
454 self.last_metrics = metrics.clone();
455
456 Ok(TickResult { receipts, metrics })
457 }
458
459 fn handle_rollback(
464 &mut self,
465 prop_name: String,
466 reason: murk_core::PropagatorError,
467 mut receipts: Vec<Receipt>,
468 accepted_start: usize,
469 ) -> Result<TickResult, TickError> {
470 self.arena.cancel_tick();
473 self.arena.reset_sparse_reuse_counters();
474 self.counters.rollback_events = self.counters.rollback_events.saturating_add(1);
475 self.consecutive_rollback_count = self.consecutive_rollback_count.saturating_add(1);
476 if self.consecutive_rollback_count >= self.max_consecutive_rollbacks {
477 if !self.tick_disabled {
478 self.counters.tick_disabled_transitions =
479 self.counters.tick_disabled_transitions.saturating_add(1);
480 }
481 self.tick_disabled = true;
482 }
483 self.refresh_counter_metrics();
484
485 for receipt in &mut receipts[accepted_start..] {
489 if receipt.accepted {
490 receipt.applied_tick_id = None;
491 receipt.reason_code = Some(IngressError::TickRollback);
492 }
493 }
494
495 Err(TickError {
496 kind: StepError::PropagatorFailed {
497 name: prop_name,
498 reason,
499 },
500 receipts,
501 })
502 }
503
504 pub fn reset(&mut self) -> Result<(), ConfigError> {
506 self.arena.reset().map_err(ConfigError::Arena)?;
507 self.ingress.clear();
508 self.current_tick = TickId(0);
509 self.param_version = ParameterVersion(0);
510 self.tick_disabled = false;
511 self.consecutive_rollback_count = 0;
512 self.counters = CumulativeCounters::default();
513 self.last_metrics = StepMetrics::default();
514 Ok(())
515 }
516
517 pub(crate) fn record_worker_stall_events(&mut self, count: u64) {
518 self.counters.worker_stall_events = self.counters.worker_stall_events.saturating_add(count);
519 self.refresh_counter_metrics();
520 }
521
522 pub(crate) fn set_ring_not_available_events(&mut self, total: u64) {
523 self.counters.ring_not_available_events =
524 self.counters.ring_not_available_events.max(total);
525 self.refresh_counter_metrics();
526 }
527
528 pub(crate) fn set_ring_eviction_events(&mut self, total: u64) {
529 self.counters.ring_eviction_events = self.counters.ring_eviction_events.max(total);
530 self.refresh_counter_metrics();
531 }
532
533 pub(crate) fn set_ring_stale_read_events(&mut self, total: u64) {
534 self.counters.ring_stale_read_events = self.counters.ring_stale_read_events.max(total);
535 self.refresh_counter_metrics();
536 }
537
538 pub(crate) fn set_ring_skew_retry_events(&mut self, total: u64) {
539 self.counters.ring_skew_retry_events = self.counters.ring_skew_retry_events.max(total);
540 self.refresh_counter_metrics();
541 }
542
543 fn refresh_counter_metrics(&mut self) {
544 let c = &self.counters;
545 self.last_metrics.queue_full_rejections = c.queue_full_rejections;
546 self.last_metrics.tick_disabled_rejections = c.tick_disabled_rejections;
547 self.last_metrics.rollback_events = c.rollback_events;
548 self.last_metrics.tick_disabled_transitions = c.tick_disabled_transitions;
549 self.last_metrics.worker_stall_events = c.worker_stall_events;
550 self.last_metrics.ring_not_available_events = c.ring_not_available_events;
551 self.last_metrics.ring_eviction_events = c.ring_eviction_events;
552 self.last_metrics.ring_stale_read_events = c.ring_stale_read_events;
553 self.last_metrics.ring_skew_retry_events = c.ring_skew_retry_events;
554 }
555
    /// Returns the arena's current snapshot, borrowed from the engine.
    pub fn snapshot(&self) -> Snapshot<'_> {
        self.arena.snapshot()
    }
560
    /// Returns the arena's current snapshot as an owned value that does not
    /// borrow from the engine.
    pub fn owned_snapshot(&self) -> murk_arena::OwnedSnapshot {
        self.arena.owned_snapshot()
    }
568
    /// Id of the last successfully published tick (0 before the first tick
    /// and after `reset`).
    pub fn current_tick(&self) -> TickId {
        self.current_tick
    }
573
    /// Whether ticking is currently disabled; while true, `execute_tick`
    /// fails with `StepError::TickDisabled`.
    pub fn is_tick_disabled(&self) -> bool {
        self.tick_disabled
    }
578
    /// Number of rollbacks since the last successful tick.
    pub fn consecutive_rollback_count(&self) -> u32 {
        self.consecutive_rollback_count
    }
583
    /// Metrics of the most recent successful tick. Counter fields are also
    /// refreshed outside of ticks (on submit, rollback, and external reports).
    pub fn last_metrics(&self) -> &StepMetrics {
        &self.last_metrics
    }
588
589 pub fn space(&self) -> &dyn murk_space::Space {
591 self.space.as_ref()
592 }
593
    /// Current length of the ingress queue, as reported by the queue itself.
    pub fn ingress_queue_depth(&self) -> usize {
        self.ingress.len()
    }
598
    /// Capacity of the ingress queue, as reported by the queue itself.
    pub fn ingress_queue_capacity(&self) -> usize {
        self.ingress.capacity()
    }
603}
604
605#[cfg(test)]
606mod tests {
607 use super::*;
608 use murk_core::command::CommandPayload;
609 use murk_core::id::{Coord, ParameterKey};
610 use murk_core::traits::SnapshotAccess;
611 use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
612 use murk_propagator::propagator::WriteMode;
613 use murk_space::{EdgeBehavior, Line1D};
614 use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};
615 use std::sync::atomic::{AtomicUsize, Ordering};
616
617 fn scalar_field(name: &str) -> FieldDef {
618 FieldDef {
619 name: name.to_string(),
620 field_type: FieldType::Scalar,
621 mutability: FieldMutability::PerTick,
622 units: None,
623 bounds: None,
624 boundary_behavior: BoundaryBehavior::Clamp,
625 }
626 }
627
628 fn sparse_scalar_field(name: &str) -> FieldDef {
629 FieldDef {
630 name: name.to_string(),
631 field_type: FieldType::Scalar,
632 mutability: FieldMutability::Sparse,
633 units: None,
634 bounds: None,
635 boundary_behavior: BoundaryBehavior::Clamp,
636 }
637 }
638
639 fn make_cmd(expires: u64) -> Command {
640 Command {
641 payload: CommandPayload::SetParameter {
642 key: ParameterKey(0),
643 value: 0.0,
644 },
645 expires_after_tick: TickId(expires),
646 source_id: None,
647 source_seq: None,
648 priority_class: 1,
649 arrival_seq: 0,
650 }
651 }
652
653 fn simple_engine() -> TickEngine {
654 let config = WorldConfig {
655 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
656 fields: vec![scalar_field("energy")],
657 propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
658 dt: 0.1,
659 seed: 42,
660 ring_buffer_size: 8,
661 max_ingress_queue: 1024,
662 tick_rate_hz: None,
663 backoff: crate::config::BackoffConfig::default(),
664 };
665 TickEngine::new(config).unwrap()
666 }
667
668 fn two_field_engine() -> TickEngine {
669 let config = WorldConfig {
670 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
671 fields: vec![scalar_field("field0"), scalar_field("field1")],
672 propagators: vec![
673 Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
674 Box::new(IdentityPropagator::new(
675 "copy_f0_to_f1",
676 FieldId(0),
677 FieldId(1),
678 )),
679 ],
680 dt: 0.1,
681 seed: 42,
682 ring_buffer_size: 8,
683 max_ingress_queue: 1024,
684 tick_rate_hz: None,
685 backoff: crate::config::BackoffConfig::default(),
686 };
687 TickEngine::new(config).unwrap()
688 }
689
690 fn three_field_engine() -> TickEngine {
691 struct SumPropagator {
695 name: String,
696 input_a: FieldId,
697 input_b: FieldId,
698 output: FieldId,
699 }
700
701 impl SumPropagator {
702 fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
703 Self {
704 name: name.to_string(),
705 input_a: a,
706 input_b: b,
707 output: out,
708 }
709 }
710 }
711
712 impl Propagator for SumPropagator {
713 fn name(&self) -> &str {
714 &self.name
715 }
716 fn reads(&self) -> murk_core::FieldSet {
717 [self.input_a, self.input_b].into_iter().collect()
718 }
719 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
720 vec![(self.output, WriteMode::Full)]
721 }
722 fn step(
723 &self,
724 ctx: &mut murk_propagator::StepContext<'_>,
725 ) -> Result<(), murk_core::PropagatorError> {
726 let a = ctx.reads().read(self.input_a).unwrap().to_vec();
727 let b = ctx.reads().read(self.input_b).unwrap().to_vec();
728 let out = ctx.writes().write(self.output).unwrap();
729 for i in 0..out.len() {
730 out[i] = a[i] + b[i];
731 }
732 Ok(())
733 }
734 }
735
736 let config = WorldConfig {
737 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
738 fields: vec![
739 scalar_field("field0"),
740 scalar_field("field1"),
741 scalar_field("field2"),
742 ],
743 propagators: vec![
744 Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
745 Box::new(IdentityPropagator::new(
746 "copy_f0_to_f1",
747 FieldId(0),
748 FieldId(1),
749 )),
750 Box::new(SumPropagator::new(
751 "sum_f0_f1_to_f2",
752 FieldId(0),
753 FieldId(1),
754 FieldId(2),
755 )),
756 ],
757 dt: 0.1,
758 seed: 42,
759 ring_buffer_size: 8,
760 max_ingress_queue: 1024,
761 tick_rate_hz: None,
762 backoff: crate::config::BackoffConfig::default(),
763 };
764 TickEngine::new(config).unwrap()
765 }
766
767 fn failing_engine(succeed_count: usize) -> TickEngine {
768 let config = WorldConfig {
769 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
770 fields: vec![scalar_field("energy")],
771 propagators: vec![Box::new(FailingPropagator::new(
772 "fail",
773 FieldId(0),
774 succeed_count,
775 ))],
776 dt: 0.1,
777 seed: 42,
778 ring_buffer_size: 8,
779 max_ingress_queue: 1024,
780 tick_rate_hz: None,
781 backoff: crate::config::BackoffConfig::default(),
782 };
783 TickEngine::new(config).unwrap()
784 }
785
786 fn partial_failure_engine() -> TickEngine {
787 let config = WorldConfig {
789 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
790 fields: vec![scalar_field("field0"), scalar_field("field1")],
791 propagators: vec![
792 Box::new(ConstPropagator::new("ok_prop", FieldId(0), 1.0)),
793 Box::new(FailingPropagator::new("fail_prop", FieldId(1), 0)),
794 ],
795 dt: 0.1,
796 seed: 42,
797 ring_buffer_size: 8,
798 max_ingress_queue: 1024,
799 tick_rate_hz: None,
800 backoff: crate::config::BackoffConfig::default(),
801 };
802 TickEngine::new(config).unwrap()
803 }
804
805 #[test]
808 fn staged_read_sees_prior_propagator_write() {
809 let mut engine = two_field_engine();
811 let result = engine.execute_tick().unwrap();
812 let snap = engine.snapshot();
813 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
815 assert!(result.metrics.total_us > 0);
816 }
817
818 #[test]
819 fn reads_previous_sees_base_gen() {
820 struct ReadsPrevPropagator;
824 impl Propagator for ReadsPrevPropagator {
825 fn name(&self) -> &str {
826 "reads_prev"
827 }
828 fn reads(&self) -> murk_core::FieldSet {
829 murk_core::FieldSet::empty()
830 }
831 fn reads_previous(&self) -> murk_core::FieldSet {
832 [FieldId(0)].into_iter().collect()
833 }
834 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
835 vec![(FieldId(1), WriteMode::Full)]
836 }
837 fn step(
838 &self,
839 ctx: &mut murk_propagator::StepContext<'_>,
840 ) -> Result<(), murk_core::PropagatorError> {
841 let prev = ctx.reads_previous().read(FieldId(0)).unwrap().to_vec();
842 let out = ctx.writes().write(FieldId(1)).unwrap();
843 out.copy_from_slice(&prev);
844 Ok(())
845 }
846 }
847
848 let config = WorldConfig {
849 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
850 fields: vec![scalar_field("field0"), scalar_field("field1")],
851 propagators: vec![
852 Box::new(ConstPropagator::new("write_f0", FieldId(0), 99.0)),
853 Box::new(ReadsPrevPropagator),
854 ],
855 dt: 0.1,
856 seed: 42,
857 ring_buffer_size: 8,
858 max_ingress_queue: 1024,
859 tick_rate_hz: None,
860 backoff: crate::config::BackoffConfig::default(),
861 };
862 let mut engine = TickEngine::new(config).unwrap();
863
864 engine.execute_tick().unwrap();
867 let snap = engine.snapshot();
868 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 0.0);
870
871 engine.execute_tick().unwrap();
873 let snap = engine.snapshot();
874 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 99.0);
875 }
876
877 #[test]
878 fn three_propagator_overlay_visibility() {
879 let mut engine = three_field_engine();
883 engine.execute_tick().unwrap();
884 let snap = engine.snapshot();
885
886 assert_eq!(snap.read(FieldId(0)).unwrap()[0], 7.0);
887 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
888 assert_eq!(snap.read(FieldId(2)).unwrap()[0], 14.0); }
890
891 #[test]
892 fn unwritten_field_reads_from_base_gen() {
893 let mut engine = three_field_engine();
896 engine.execute_tick().unwrap();
897 let snap = engine.snapshot();
900 let f2 = snap.read(FieldId(2)).unwrap();
901 assert!(f2.iter().all(|&v| v == 14.0));
902 }
903
904 #[test]
907 fn propagator_failure_no_snapshot_published() {
908 let mut engine = failing_engine(0);
909
910 let snap_before = engine.snapshot();
912 let tick_before = snap_before.tick_id();
913
914 let result = engine.execute_tick();
916 assert!(result.is_err());
917
918 let snap_after = engine.snapshot();
920 assert_eq!(snap_after.tick_id(), tick_before);
921 }
922
923 #[test]
924 fn partial_failure_rolls_back_all() {
925 let mut engine = partial_failure_engine();
928
929 let result = engine.execute_tick();
931 assert!(result.is_err());
932
933 let snap = engine.snapshot();
935 let f0 = snap.read(FieldId(0));
936 if let Some(data) = f0 {
939 assert!(
940 data.iter().all(|&v| v == 0.0),
941 "rollback should prevent PropA's writes from being visible"
942 );
943 }
944 }
945
946 #[test]
947 fn rollback_receipts_generated() {
948 let mut engine = failing_engine(0);
949
950 let cmd = Command {
952 payload: CommandPayload::SetField {
953 coord: smallvec::smallvec![0],
954 field_id: FieldId(0),
955 value: 1.0,
956 },
957 expires_after_tick: TickId(100),
958 source_id: None,
959 source_seq: None,
960 priority_class: 1,
961 arrival_seq: 0,
962 };
963 engine.submit_commands(vec![cmd]);
964
965 let result = engine.execute_tick();
966 match result {
967 Err(TickError {
968 kind: StepError::PropagatorFailed { .. },
969 receipts,
970 }) => {
971 assert_eq!(receipts.len(), 1);
973 assert!(receipts[0].accepted);
974 assert_eq!(receipts[0].reason_code, Some(IngressError::TickRollback));
975 }
976 other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
977 }
978 }
979
980 #[test]
981 fn rollback_preserves_rejected_receipts() {
982 let mut engine = failing_engine(0);
987
988 engine.submit_commands(vec![make_cmd(100)]);
990
991 let result = engine.execute_tick();
992 match result {
993 Err(TickError {
994 kind: StepError::PropagatorFailed { .. },
995 receipts,
996 }) => {
997 assert_eq!(receipts.len(), 1);
998 assert!(
1001 !receipts[0].accepted,
1002 "rejected receipt should stay rejected after rollback"
1003 );
1004 assert_eq!(
1005 receipts[0].reason_code,
1006 Some(IngressError::UnsupportedCommand),
1007 "rejected receipt must preserve UnsupportedCommand reason after rollback"
1008 );
1009 }
1010 other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
1011 }
1012 }
1013
1014 #[test]
1017 fn consecutive_rollbacks_disable_ticking() {
1018 let mut engine = failing_engine(0);
1019
1020 for _ in 0..3 {
1021 let _ = engine.execute_tick();
1022 }
1023
1024 assert!(engine.is_tick_disabled());
1025 assert_eq!(engine.consecutive_rollback_count(), 3);
1026 assert_eq!(engine.last_metrics().rollback_events, 3);
1027 assert_eq!(engine.last_metrics().tick_disabled_transitions, 1);
1028 }
1029
1030 #[test]
1031 fn success_resets_rollback_count() {
1032 let mut engine = failing_engine(10);
1035
1036 engine.execute_tick().unwrap();
1038 engine.execute_tick().unwrap();
1039 assert_eq!(engine.consecutive_rollback_count(), 0);
1040 assert_eq!(engine.current_tick(), TickId(2));
1041 }
1042
1043 #[test]
1044 fn tick_disabled_rejects_immediately() {
1045 let mut engine = failing_engine(0);
1046
1047 for _ in 0..3 {
1049 let _ = engine.execute_tick();
1050 }
1051 assert!(engine.is_tick_disabled());
1052
1053 match engine.execute_tick() {
1055 Err(TickError {
1056 kind: StepError::TickDisabled,
1057 ..
1058 }) => {}
1059 other => panic!("expected TickDisabled, got {other:?}"),
1060 }
1061
1062 let receipts = engine.submit_commands(vec![make_cmd(100), make_cmd(100)]);
1063 assert!(receipts
1064 .iter()
1065 .all(|r| r.reason_code == Some(IngressError::TickDisabled)));
1066 assert_eq!(engine.last_metrics().tick_disabled_rejections, 2);
1067 assert_eq!(engine.last_metrics().rollback_events, 3);
1068 assert_eq!(engine.last_metrics().tick_disabled_transitions, 1);
1069 }
1070
1071 #[test]
1072 fn reset_clears_tick_disabled() {
1073 let mut engine = failing_engine(0);
1074
1075 for _ in 0..3 {
1076 let _ = engine.execute_tick();
1077 }
1078 assert!(engine.is_tick_disabled());
1079
1080 engine.reset().unwrap();
1081 assert!(!engine.is_tick_disabled());
1082 assert_eq!(engine.current_tick(), TickId(0));
1083 assert_eq!(engine.consecutive_rollback_count(), 0);
1084 assert_eq!(engine.last_metrics().queue_full_rejections, 0);
1085 assert_eq!(engine.last_metrics().tick_disabled_rejections, 0);
1086 assert_eq!(engine.last_metrics().rollback_events, 0);
1087 assert_eq!(engine.last_metrics().tick_disabled_transitions, 0);
1088 assert_eq!(engine.last_metrics().worker_stall_events, 0);
1089 assert_eq!(engine.last_metrics().ring_not_available_events, 0);
1090 assert_eq!(engine.last_metrics().ring_eviction_events, 0);
1091 assert_eq!(engine.last_metrics().ring_stale_read_events, 0);
1092 assert_eq!(engine.last_metrics().ring_skew_retry_events, 0);
1093 }
1094
1095 #[test]
1098 fn single_tick_end_to_end() {
1099 let mut engine = simple_engine();
1100 let result = engine.execute_tick().unwrap();
1101
1102 let snap = engine.snapshot();
1103 let data = snap.read(FieldId(0)).unwrap();
1104 assert_eq!(data.len(), 10);
1105 assert!(data.iter().all(|&v| v == 42.0));
1106 assert_eq!(engine.current_tick(), TickId(1));
1107 assert!(!result.receipts.is_empty() || result.receipts.is_empty()); }
1109
1110 #[test]
1111 fn multi_tick_determinism() {
1112 let mut engine = simple_engine();
1113
1114 for _ in 0..10 {
1115 engine.execute_tick().unwrap();
1116 }
1117
1118 let snap = engine.snapshot();
1119 let data = snap.read(FieldId(0)).unwrap();
1120 assert!(data.iter().all(|&v| v == 42.0));
1121 assert_eq!(engine.current_tick(), TickId(10));
1122 }
1123
1124 #[test]
1125 fn commands_flow_through_to_receipts() {
1126 let mut engine = simple_engine();
1127
1128 let coord: Coord = vec![0i32].into();
1130 let cmds = vec![
1131 Command {
1132 payload: CommandPayload::SetField {
1133 coord: coord.clone(),
1134 field_id: FieldId(0),
1135 value: 1.0,
1136 },
1137 expires_after_tick: TickId(100),
1138 source_id: None,
1139 source_seq: None,
1140 priority_class: 1,
1141 arrival_seq: 0,
1142 },
1143 Command {
1144 payload: CommandPayload::SetField {
1145 coord: coord.clone(),
1146 field_id: FieldId(0),
1147 value: 2.0,
1148 },
1149 expires_after_tick: TickId(100),
1150 source_id: None,
1151 source_seq: None,
1152 priority_class: 1,
1153 arrival_seq: 0,
1154 },
1155 ];
1156 let submit_receipts = engine.submit_commands(cmds);
1157 assert_eq!(submit_receipts.len(), 2);
1158 assert!(submit_receipts.iter().all(|r| r.accepted));
1159
1160 let result = engine.execute_tick().unwrap();
1161 let applied: Vec<_> = result
1163 .receipts
1164 .iter()
1165 .filter(|r| r.applied_tick_id.is_some())
1166 .collect();
1167 assert_eq!(applied.len(), 2);
1168 assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
1169 }
1170
1171 #[test]
1172 fn non_setfield_commands_rejected_honestly() {
1173 let mut engine = simple_engine();
1174
1175 let submit_receipts = engine.submit_commands(vec![make_cmd(100), make_cmd(100)]);
1177 assert_eq!(submit_receipts.len(), 2);
1178 assert!(submit_receipts.iter().all(|r| r.accepted));
1179
1180 let result = engine.execute_tick().unwrap();
1181 assert_eq!(result.receipts.len(), 2);
1182
1183 for receipt in &result.receipts {
1186 assert!(
1187 !receipt.accepted,
1188 "unimplemented command type must be rejected"
1189 );
1190 assert_eq!(
1191 receipt.applied_tick_id, None,
1192 "unimplemented command must not have applied_tick_id"
1193 );
1194 assert_eq!(
1195 receipt.reason_code,
1196 Some(IngressError::UnsupportedCommand),
1197 "rejected unsupported command must carry UnsupportedCommand reason"
1198 );
1199 }
1200 }
1201
1202 #[test]
1205 fn timing_fields_populated() {
1206 let mut engine = simple_engine();
1207 let result = engine.execute_tick().unwrap();
1208
1209 let _ = result.metrics.total_us;
1211 assert_eq!(result.metrics.propagator_us.len(), 1);
1212 assert_eq!(result.metrics.propagator_us[0].0, "const");
1213 assert_eq!(result.metrics.queue_full_rejections, 0);
1214 assert_eq!(result.metrics.tick_disabled_rejections, 0);
1215 assert_eq!(result.metrics.rollback_events, 0);
1216 assert_eq!(result.metrics.tick_disabled_transitions, 0);
1217 assert_eq!(result.metrics.worker_stall_events, 0);
1218 assert_eq!(result.metrics.ring_not_available_events, 0);
1219 assert_eq!(result.metrics.ring_eviction_events, 0);
1220 assert_eq!(result.metrics.ring_stale_read_events, 0);
1221 assert_eq!(result.metrics.ring_skew_retry_events, 0);
1222 }
1223
1224 #[test]
1225 fn memory_bytes_matches_arena() {
1226 let mut engine = simple_engine();
1227 engine.execute_tick().unwrap();
1228
1229 let metrics = engine.last_metrics();
1230 assert!(metrics.memory_bytes > 0);
1231 }
1232
/// With the ingress queue capped at a single entry, submitting three commands
/// must reject two of them with `QueueFull`, and the metrics of the next tick
/// must report those two rejections.
#[test]
fn queue_full_rejections_are_counted_in_metrics() {
    let mut engine = TickEngine::new(WorldConfig {
        space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
        fields: vec![scalar_field("energy")],
        propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
        dt: 0.1,
        seed: 42,
        ring_buffer_size: 8,
        // Room for exactly one queued command.
        max_ingress_queue: 1,
        tick_rate_hz: None,
        backoff: crate::config::BackoffConfig::default(),
    })
    .unwrap();

    // Three commands against a queue of one.
    let batch: Vec<_> = (0..3).map(|_| make_cmd(100)).collect();
    let submit_receipts = engine.submit_commands(batch);

    let mut queue_full = 0usize;
    for receipt in submit_receipts.iter() {
        if receipt.reason_code == Some(IngressError::QueueFull) {
            queue_full += 1;
        }
    }
    assert_eq!(queue_full, 2);

    // The rejections recorded at submit time show up in the tick's metrics.
    let ticked = engine.execute_tick().unwrap();
    assert_eq!(ticked.metrics.queue_full_rejections, 2);
    assert_eq!(ticked.metrics.tick_disabled_rejections, 0);
}
1260
/// Counters pushed into the engine from outside must be visible through
/// `last_metrics()` immediately, and must be carried into the metrics
/// returned by the next tick.
#[test]
fn external_realtime_counters_are_reflected_in_metrics() {
    let mut engine = simple_engine();
    engine.record_worker_stall_events(3);
    engine.set_ring_not_available_events(7);
    engine.set_ring_eviction_events(5);
    engine.set_ring_stale_read_events(2);
    engine.set_ring_skew_retry_events(1);

    // Visible before any tick has run.
    let before_tick = engine.last_metrics();
    assert_eq!(before_tick.worker_stall_events, 3);
    assert_eq!(before_tick.ring_not_available_events, 7);
    assert_eq!(before_tick.ring_eviction_events, 5);
    assert_eq!(before_tick.ring_stale_read_events, 2);
    assert_eq!(before_tick.ring_skew_retry_events, 1);

    // Same values reported by the tick itself.
    let ticked = engine.execute_tick().unwrap();
    let after_tick = &ticked.metrics;
    assert_eq!(after_tick.worker_stall_events, 3);
    assert_eq!(after_tick.ring_not_available_events, 7);
    assert_eq!(after_tick.ring_eviction_events, 5);
    assert_eq!(after_tick.ring_stale_read_events, 2);
    assert_eq!(after_tick.ring_skew_retry_events, 1);
}
1283
/// A failing tick must leave the arena's sparse-reuse hit/miss counters at
/// zero, and the engine must still be able to tick successfully afterwards.
#[test]
fn rollback_clears_sparse_reuse_counters_for_next_success() {
    /// Propagator that fills its output field with its own call index and can
    /// be told to fail on one specific (0-based) call.
    struct MaybeFailPropagator {
        output: FieldId,
        fail_on_call: Option<usize>,
        call_count: AtomicUsize,
    }

    impl MaybeFailPropagator {
        fn new(output: FieldId, fail_on_call: Option<usize>) -> Self {
            let call_count = AtomicUsize::new(0);
            Self {
                output,
                fail_on_call,
                call_count,
            }
        }
    }

    impl Propagator for MaybeFailPropagator {
        fn name(&self) -> &str {
            "maybe_fail"
        }

        fn reads(&self) -> murk_core::FieldSet {
            murk_core::FieldSet::empty()
        }

        fn writes(&self) -> Vec<(FieldId, WriteMode)> {
            vec![(self.output, WriteMode::Full)]
        }

        fn step(
            &self,
            ctx: &mut murk_propagator::StepContext<'_>,
        ) -> Result<(), murk_core::PropagatorError> {
            // `fetch_add` yields the value before the increment, so `n` is the
            // 0-based index of this call.
            let n = self.call_count.fetch_add(1, Ordering::Relaxed);
            if matches!(self.fail_on_call, Some(doomed) if doomed == n) {
                return Err(murk_core::PropagatorError::ExecutionFailed {
                    reason: format!("deliberate failure on call {n}"),
                });
            }

            let buf = match ctx.writes().write(self.output) {
                Some(buf) => buf,
                None => {
                    return Err(murk_core::PropagatorError::ExecutionFailed {
                        reason: format!("field {:?} not writable", self.output),
                    });
                }
            };
            buf.fill(n as f32);
            Ok(())
        }
    }

    /// Builds an engine with one sparse field written by a constant
    /// propagator and a second field written by `MaybeFailPropagator`, then
    /// resets the arena's sparse-reuse counters.
    fn sparse_engine(fail_on_call: Option<usize>) -> TickEngine {
        let mut engine = TickEngine::new(WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![sparse_scalar_field("sparse0"), scalar_field("field1")],
            propagators: vec![
                Box::new(ConstPropagator::new("write_sparse", FieldId(0), 3.0)),
                Box::new(MaybeFailPropagator::new(FieldId(1), fail_on_call)),
            ],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: crate::config::BackoffConfig::default(),
        })
        .unwrap();
        engine.arena.reset_sparse_reuse_counters();
        engine
    }

    // `Some(1)` fails the propagator's second `step` call; the first tick is
    // expected to succeed.
    let mut engine = sparse_engine(Some(1));
    engine.execute_tick().unwrap();

    let second_tick = engine.execute_tick();
    assert!(second_tick.is_err(), "tick 2 should fail and roll back");
    assert_eq!(engine.arena.sparse_reuse_hits(), 0);
    assert_eq!(engine.arena.sparse_reuse_misses(), 0);

    // The engine must recover: the following tick succeeds.
    engine.execute_tick().unwrap();
}
1367
/// Commands queued before `reset()` must not produce receipts on the first
/// tick executed after the reset.
#[test]
fn reset_clears_pending_ingress() {
    let mut engine = simple_engine();

    // Queue two commands, then reset before any tick consumes them.
    let pending = vec![make_cmd(1000), make_cmd(1000)];
    engine.submit_commands(pending);
    engine.reset().unwrap();

    let post_reset = engine.execute_tick().unwrap();
    assert!(post_reset.receipts.is_empty());
}
1384
/// Receipts must carry each command's position in the submitted batch even
/// when the engine processes the batch in a different order: the command
/// submitted second (priority class 0) is receipted first here.
#[test]
fn command_index_preserved_after_reordering() {
    let mut engine = simple_engine();

    // Both commands are identical apart from the parameter value and the
    // priority class.
    let set_param = |value, priority_class| Command {
        payload: CommandPayload::SetParameter {
            key: ParameterKey(0),
            value,
        },
        expires_after_tick: TickId(100),
        source_id: None,
        source_seq: None,
        priority_class,
        arrival_seq: 0,
    };

    // Batch index 0 has priority class 2, batch index 1 has priority class 0.
    engine.submit_commands(vec![set_param(1.0, 2), set_param(2.0, 0)]);

    let result = engine.execute_tick().unwrap();
    assert_eq!(result.receipts.len(), 2);

    let receipt_order: Vec<_> = result
        .receipts
        .iter()
        .map(|receipt| receipt.command_index)
        .collect();
    assert_eq!(receipt_order, [1, 0]);
}
1425
/// A `SetField` aimed at coordinate 999 must come back as a rejected receipt:
/// not accepted, no applied tick, and reason `NotApplied`.
#[test]
fn setfield_oob_receipt_not_applied() {
    let mut engine = simple_engine();

    // Presumably far outside the space built by `simple_engine()` — the test
    // name and assertion messages treat it as out of bounds.
    let far_outside: Coord = vec![999i32].into();
    let payload = CommandPayload::SetField {
        coord: far_outside,
        field_id: FieldId(0),
        value: 1.0,
    };
    engine.submit_commands(vec![Command {
        payload,
        expires_after_tick: TickId(100),
        source_id: None,
        source_seq: None,
        priority_class: 1,
        arrival_seq: 0,
    }]);

    let ticked = engine.execute_tick().unwrap();
    assert_eq!(ticked.receipts.len(), 1);

    let receipt = &ticked.receipts[0];
    assert!(
        !receipt.accepted,
        "OOB SetField must be marked not accepted"
    );
    assert_eq!(
        receipt.applied_tick_id, None,
        "OOB SetField must not have applied_tick_id"
    );
    assert_eq!(
        receipt.reason_code,
        Some(IngressError::NotApplied),
        "OOB SetField must carry NotApplied reason"
    );
}
1466
/// A `SetField` that targets `FieldId(99)` must be receipted as not accepted
/// with reason `NotApplied`.
#[test]
fn setfield_unknown_field_receipt_not_applied() {
    let mut engine = simple_engine();

    let coord: Coord = vec![0i32].into();
    let payload = CommandPayload::SetField {
        coord,
        // Presumably not a field defined by `simple_engine()` — the test name
        // treats it as unknown.
        field_id: FieldId(99),
        value: 1.0,
    };
    engine.submit_commands(vec![Command {
        payload,
        expires_after_tick: TickId(100),
        source_id: None,
        source_seq: None,
        priority_class: 1,
        arrival_seq: 0,
    }]);

    let ticked = engine.execute_tick().unwrap();
    assert_eq!(ticked.receipts.len(), 1);

    let receipt = &ticked.receipts[0];
    assert!(!receipt.accepted);
    assert_eq!(receipt.reason_code, Some(IngressError::NotApplied));
}
1495
/// Values written once to a `WriteMode::Incremental` field must still be
/// readable from the snapshot on later ticks, even though the propagator does
/// not write them again.
#[test]
fn writemode_incremental_seeds_from_previous_gen() {
    /// Propagator that writes two cells on its first `step` call and leaves
    /// the buffer untouched on every later call.
    struct IncrementalOnce {
        written: std::cell::Cell<bool>,
    }

    impl Propagator for IncrementalOnce {
        fn name(&self) -> &str {
            "incr_once"
        }

        fn reads(&self) -> murk_core::FieldSet {
            murk_core::FieldSet::empty()
        }

        fn writes(&self) -> Vec<(FieldId, WriteMode)> {
            vec![(FieldId(0), WriteMode::Incremental)]
        }

        fn step(
            &self,
            ctx: &mut murk_propagator::StepContext<'_>,
        ) -> Result<(), murk_core::PropagatorError> {
            let buf = ctx.writes().write(FieldId(0)).unwrap();
            // `replace` marks the propagator as written and hands back the
            // previous flag, so only the very first call stores values.
            let already_written = self.written.replace(true);
            if !already_written {
                buf[0] = 42.0;
                buf[1] = 99.0;
            }
            Ok(())
        }
    }

    let mut engine = TickEngine::new(WorldConfig {
        space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
        fields: vec![scalar_field("state")],
        propagators: vec![Box::new(IncrementalOnce {
            written: std::cell::Cell::new(false),
        })],
        dt: 0.1,
        seed: 42,
        ring_buffer_size: 8,
        max_ingress_queue: 1024,
        tick_rate_hz: None,
        backoff: crate::config::BackoffConfig::default(),
    })
    .unwrap();

    // Tick 1: the propagator writes its two cells.
    engine.execute_tick().unwrap();
    {
        let snap = engine.snapshot();
        let state = snap.read(FieldId(0)).unwrap();
        assert_eq!(state[0], 42.0);
        assert_eq!(state[1], 99.0);
    }

    // Tick 2: nothing is written, yet the earlier values must still be there
    // and an untouched cell must read as zero.
    engine.execute_tick().unwrap();
    {
        let snap = engine.snapshot();
        let state = snap.read(FieldId(0)).unwrap();
        assert_eq!(
            state[0],
            42.0,
            "BUG-015: incremental field lost data across ticks"
        );
        assert_eq!(
            state[1],
            99.0,
            "BUG-015: incremental field lost data across ticks"
        );
        assert_eq!(state[2], 0.0);
    }

    // Tick 3: the values keep persisting.
    engine.execute_tick().unwrap();
    {
        let snap = engine.snapshot();
        let state = snap.read(FieldId(0)).unwrap();
        assert_eq!(state[0], 42.0);
        assert_eq!(state[1], 99.0);
    }
}
1580
/// Constructing an engine with a static vector field of `u32::MAX / 2`
/// dimensions must fail with `ConfigError::CellCountOverflow`; it must not
/// succeed or fail with any other error.
#[test]
fn static_field_overflow_returns_error() {
    let huge_static_field = FieldDef {
        name: "huge_vec".to_string(),
        field_type: FieldType::Vector {
            dims: u32::MAX / 2,
        },
        mutability: FieldMutability::Static,
        units: None,
        bounds: None,
        boundary_behavior: BoundaryBehavior::Clamp,
    };
    let config = WorldConfig {
        space: Box::new(Line1D::new(3, EdgeBehavior::Absorb).unwrap()),
        fields: vec![huge_static_field],
        propagators: vec![Box::new(ConstPropagator::new("c", FieldId(0), 1.0))],
        dt: 0.1,
        seed: 42,
        ring_buffer_size: 8,
        max_ingress_queue: 1024,
        tick_rate_hz: None,
        backoff: crate::config::BackoffConfig::default(),
    };

    // Only the overflow error is acceptable.
    match TickEngine::new(config).err() {
        Some(crate::config::ConfigError::CellCountOverflow { .. }) => {}
        Some(e) => panic!("expected CellCountOverflow, got {e}"),
        None => panic!("expected CellCountOverflow, got Ok"),
    }
}
1610}