Skip to main content

murk_engine/
tick.rs

1//! Tick engine: the single-threaded simulation loop.
2//!
3//! [`TickEngine`] wires together the arena, propagators, ingress queue,
4//! and overlay caches into a deterministic tick execution loop with
5//! rollback atomicity.
6//!
7//! # Lockstep mode only
8//!
9//! This module implements Lockstep mode — a callable struct with no
10//! background threads. RealtimeAsync mode (future WP) wraps this in
11//! a thread with a ring buffer.
12
13use std::fmt;
14use std::time::Instant;
15
16use murk_arena::config::ArenaConfig;
17use murk_arena::pingpong::PingPongArena;
18use murk_arena::read::Snapshot;
19use murk_arena::static_arena::StaticArena;
20use murk_core::command::{Command, CommandPayload, Receipt};
21use murk_core::error::{IngressError, StepError};
22use murk_core::id::{FieldId, ParameterVersion, TickId};
23use murk_core::traits::{FieldReader, FieldWriter};
24use murk_core::FieldMutability;
25use murk_propagator::pipeline::{ReadResolutionPlan, ReadSource};
26use murk_propagator::propagator::Propagator;
27use murk_propagator::scratch::ScratchRegion as PropagatorScratch;
28
29use crate::config::{ConfigError, WorldConfig};
30use crate::ingress::IngressQueue;
31use crate::metrics::StepMetrics;
32use crate::overlay::{BaseFieldCache, BaseFieldSet, OverlayReader, StagedFieldCache};
33
34// ── TickResult ───────────────────────────────────────────────────
35
36/// Result of a successful tick execution.
37#[derive(Debug)]
38pub struct TickResult {
39    /// Receipts for commands submitted before this tick.
40    pub receipts: Vec<Receipt>,
41    /// Performance metrics for this tick.
42    pub metrics: StepMetrics,
43}
44
45// ── TickError ───────────────────────────────────────────────────
46
47/// Error returned from [`TickEngine::execute_tick()`].
48///
49/// Wraps the underlying [`StepError`] and any receipts that were produced
50/// before the failure. On rollback, receipts carry `TickRollback` reason
51/// codes; callers must not discard them.
52#[derive(Debug, PartialEq, Eq)]
53pub struct TickError {
54    /// The underlying error.
55    pub kind: StepError,
56    /// Receipts produced before the failure (may include rollback receipts).
57    pub receipts: Vec<Receipt>,
58}
59
60impl fmt::Display for TickError {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        write!(f, "{}", self.kind)
63    }
64}
65
66impl std::error::Error for TickError {
67    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
68        Some(&self.kind)
69    }
70}
71
72// ── TickEngine ───────────────────────────────────────────────────
73
74/// Cumulative counters tracked across ticks and reported in metrics.
75///
76/// Extracting these into a `Default`-deriving struct keeps `new()`,
77/// `reset()`, and `refresh_counter_metrics()` in sync automatically.
78#[derive(Default)]
79struct CumulativeCounters {
80    queue_full_rejections: u64,
81    tick_disabled_rejections: u64,
82    rollback_events: u64,
83    tick_disabled_transitions: u64,
84    worker_stall_events: u64,
85    ring_not_available_events: u64,
86    ring_eviction_events: u64,
87    ring_stale_read_events: u64,
88    ring_skew_retry_events: u64,
89}
90
91/// Single-threaded tick engine for Lockstep mode.
92///
93/// Owns all simulation state and executes ticks synchronously. Each
94/// `execute_tick()` call runs the full propagator pipeline, publishes
95/// a snapshot, and returns receipts for any submitted commands.
96pub struct TickEngine {
97    arena: PingPongArena,
98    propagators: Vec<Box<dyn Propagator>>,
99    plan: ReadResolutionPlan,
100    ingress: IngressQueue,
101    space: Box<dyn murk_space::Space>,
102    dt: f64,
103    current_tick: TickId,
104    param_version: ParameterVersion,
105    consecutive_rollback_count: u32,
106    tick_disabled: bool,
107    max_consecutive_rollbacks: u32,
108    counters: CumulativeCounters,
109    propagator_scratch: PropagatorScratch,
110    base_field_set: BaseFieldSet,
111    base_cache: BaseFieldCache,
112    staged_cache: StagedFieldCache,
113    last_metrics: StepMetrics,
114}
115
116impl TickEngine {
117    /// Construct a new tick engine from a [`WorldConfig`].
118    ///
119    /// Validates the configuration, builds the read resolution plan,
120    /// constructs the arena, and pre-computes the base field set.
121    /// Consumes the `WorldConfig`.
122    pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
123        // Validate and build read resolution plan.
124        config.validate()?;
125        let defined_fields = config.defined_field_set()?;
126        let plan = murk_propagator::validate_pipeline(
127            &config.propagators,
128            &defined_fields,
129            config.dt,
130            &*config.space,
131        )?;
132
133        // Build arena field defs.
134        let arena_field_defs: Vec<(FieldId, murk_core::FieldDef)> = config
135            .fields
136            .iter()
137            .enumerate()
138            .map(|(i, def)| -> Result<_, ConfigError> {
139                let id = u32::try_from(i).map_err(|_| ConfigError::FieldCountOverflow {
140                    value: config.fields.len(),
141                })?;
142                Ok((FieldId(id), def.clone()))
143            })
144            .collect::<Result<_, _>>()?;
145
146        // Safety: validate() already checked cell_count fits in u32.
147        let cell_count = u32::try_from(config.space.cell_count()).map_err(|_| {
148            ConfigError::CellCountOverflow {
149                value: config.space.cell_count(),
150            }
151        })?;
152        let arena_config = ArenaConfig::new(cell_count);
153
154        // Build static arena for any Static fields.
155        let static_fields: Vec<(FieldId, u32)> = arena_field_defs
156            .iter()
157            .filter(|(_, d)| d.mutability == FieldMutability::Static)
158            .map(|(id, d)| {
159                let len = cell_count.checked_mul(d.field_type.components()).ok_or(
160                    ConfigError::CellCountOverflow {
161                        value: cell_count as usize,
162                    },
163                )?;
164                Ok((*id, len))
165            })
166            .collect::<Result<_, ConfigError>>()?;
167        let static_arena = StaticArena::new(&static_fields).into_shared();
168
169        let arena = PingPongArena::new(arena_config, arena_field_defs, static_arena)?;
170
171        // Pre-compute base field set.
172        let base_field_set = BaseFieldSet::from_plan(&plan, &config.propagators);
173
174        // Compute max scratch bytes across all propagators.
175        let max_scratch = config
176            .propagators
177            .iter()
178            .map(|p| p.scratch_bytes())
179            .max()
180            .unwrap_or(0);
181        let propagator_scratch = PropagatorScratch::with_byte_capacity(max_scratch);
182
183        let ingress = IngressQueue::new(config.max_ingress_queue);
184
185        Ok(Self {
186            arena,
187            propagators: config.propagators,
188            plan,
189            ingress,
190            space: config.space,
191            dt: config.dt,
192            current_tick: TickId(0),
193            param_version: ParameterVersion(0),
194            consecutive_rollback_count: 0,
195            tick_disabled: false,
196            max_consecutive_rollbacks: 3,
197            counters: CumulativeCounters::default(),
198            propagator_scratch,
199            base_field_set,
200            base_cache: BaseFieldCache::new(),
201            staged_cache: StagedFieldCache::new(),
202            last_metrics: StepMetrics::default(),
203        })
204    }
205
206    /// Submit commands to be processed in the next tick.
207    ///
208    /// Returns one receipt per command indicating acceptance or rejection.
209    pub fn submit_commands(&mut self, commands: Vec<Command>) -> Vec<Receipt> {
210        let receipts = self.ingress.submit(commands, self.tick_disabled);
211        for receipt in &receipts {
212            match receipt.reason_code {
213                Some(IngressError::QueueFull) => {
214                    self.counters.queue_full_rejections =
215                        self.counters.queue_full_rejections.saturating_add(1);
216                }
217                Some(IngressError::TickDisabled) => {
218                    self.counters.tick_disabled_rejections =
219                        self.counters.tick_disabled_rejections.saturating_add(1);
220                }
221                // Accepted commands and other rejection types don't need counting.
222                Some(IngressError::Stale)
223                | Some(IngressError::TickRollback)
224                | Some(IngressError::ShuttingDown)
225                | Some(IngressError::UnsupportedCommand)
226                | Some(IngressError::NotApplied)
227                | None => {}
228            }
229        }
230        self.refresh_counter_metrics();
231        receipts
232    }
233
234    /// Execute one tick.
235    ///
236    /// Runs the full propagator pipeline, publishes the snapshot, and
237    /// returns receipts plus metrics. On propagator failure, the tick
238    /// is rolled back atomically (the staging buffer is abandoned).
239    pub fn execute_tick(&mut self) -> Result<TickResult, TickError> {
240        let tick_start = Instant::now();
241
242        // 0. Check if ticking is disabled.
243        if self.tick_disabled {
244            return Err(TickError {
245                kind: StepError::TickDisabled,
246                receipts: Vec::new(),
247            });
248        }
249
250        let next_tick = TickId(self.current_tick.0 + 1);
251
252        // 1. Populate base field cache from snapshot.
253        {
254            let snapshot = self.arena.snapshot();
255            self.base_cache.populate(&snapshot, &self.base_field_set);
256        }
257
258        // 2. Begin tick — if this fails, commands stay in the queue.
259        let mut guard = self.arena.begin_tick().map_err(|_| TickError {
260            kind: StepError::AllocationFailed,
261            receipts: Vec::new(),
262        })?;
263
264        // 3. Drain ingress queue (safe: begin_tick succeeded).
265        let cmd_start = Instant::now();
266        let drain = self.ingress.drain(next_tick);
267        let mut receipts = drain.expired_receipts;
268        let commands = drain.commands;
269        let accepted_receipt_start = receipts.len();
270        for dc in &commands {
271            receipts.push(Receipt {
272                accepted: true,
273                applied_tick_id: None,
274                reason_code: None,
275                command_index: dc.command_index,
276            });
277        }
278        // 3b. Apply commands to the staging writer.
279        for (i, dc) in commands.iter().enumerate() {
280            let receipt = &mut receipts[accepted_receipt_start + i];
281            match &dc.command.payload {
282                CommandPayload::SetField {
283                    ref coord,
284                    field_id,
285                    value,
286                } => {
287                    let applied = if let Some(rank) = self.space.canonical_rank(coord) {
288                        if let Some(buf) = guard.writer.write(*field_id) {
289                            if rank < buf.len() {
290                                buf[rank] = *value;
291                                true
292                            } else {
293                                false
294                            }
295                        } else {
296                            false
297                        }
298                    } else {
299                        false
300                    };
301                    if !applied {
302                        receipt.accepted = false;
303                        receipt.reason_code = Some(IngressError::NotApplied);
304                    }
305                }
306                CommandPayload::SetParameter { .. }
307                | CommandPayload::SetParameterBatch { .. }
308                | CommandPayload::Move { .. }
309                | CommandPayload::Spawn { .. }
310                | CommandPayload::Despawn { .. }
311                | CommandPayload::Custom { .. } => {
312                    receipt.accepted = false;
313                    receipt.reason_code = Some(IngressError::UnsupportedCommand);
314                }
315            }
316        }
317        let command_processing_us = cmd_start.elapsed().as_micros() as u64;
318
319        // 4. Run propagator pipeline.
320        let mut propagator_us = Vec::with_capacity(self.propagators.len());
321        for (i, prop) in self.propagators.iter().enumerate() {
322            let prop_start = Instant::now();
323
324            // 4a. Populate staged cache from guard.writer.read() per plan routes.
325            self.staged_cache.clear();
326            if let Some(routes) = self.plan.routes_for(i) {
327                for (&field, &source) in routes {
328                    if let ReadSource::Staged { .. } = source {
329                        if let Some(data) = guard.writer.read(field) {
330                            self.staged_cache.insert(field, data);
331                        }
332                    }
333                }
334            }
335
336            // 4b. Construct OverlayReader.
337            let empty_routes = indexmap::IndexMap::new();
338            let routes = self.plan.routes_for(i).unwrap_or(&empty_routes);
339            let overlay = OverlayReader::new(routes, &self.base_cache, &self.staged_cache);
340
341            // 4c. Seed WriteMode::Incremental buffers from previous generation.
342            for field in self.plan.incremental_fields_for(i) {
343                let prev_data = match self.base_cache.read(field) {
344                    Some(data) => data,
345                    // First tick: no previous generation to seed from. Expected.
346                    None => continue,
347                };
348                // Copy through a temp buffer: base_cache borrows &self,
349                // guard.writer.write() borrows &mut guard.
350                let prev: Vec<f32> = prev_data.to_vec();
351                let write_buf = match guard.writer.write(field) {
352                    Some(buf) => buf,
353                    None => {
354                        debug_assert!(
355                            false,
356                            "incremental field {:?} declared in plan but writer returned None \
357                             for propagator '{}'",
358                            field,
359                            prop.name(),
360                        );
361                        continue;
362                    }
363                };
364                let copy_len = prev.len().min(write_buf.len());
365                write_buf[..copy_len].copy_from_slice(&prev[..copy_len]);
366            }
367
368            // 4d. Reset propagator scratch.
369            self.propagator_scratch.reset();
370
371            // 4e. Construct StepContext and call step().
372            {
373                let mut ctx = murk_propagator::StepContext::new(
374                    &overlay,
375                    &self.base_cache,
376                    &mut guard.writer,
377                    &mut self.propagator_scratch,
378                    self.space.as_ref(),
379                    next_tick,
380                    self.dt,
381                );
382
383                // 4f. Call propagator step.
384                if let Err(reason) = prop.step(&mut ctx) {
385                    // 4g. Rollback on error — guard goes out of scope,
386                    // abandoning the staging buffer (free rollback).
387                    let prop_name = prop.name().to_string();
388                    return self.handle_rollback(
389                        prop_name,
390                        reason,
391                        receipts,
392                        accepted_receipt_start,
393                    );
394                }
395            }
396
397            propagator_us.push((
398                prop.name().to_string(),
399                prop_start.elapsed().as_micros() as u64,
400            ));
401        }
402
403        // 5. guard goes out of scope here (releases staging borrows).
404
405        // 6. Publish.
406        let publish_start = Instant::now();
407        self.arena
408            .publish(next_tick, self.param_version)
409            .map_err(|_| {
410                self.arena.reset_sparse_reuse_counters();
411                TickError {
412                    kind: StepError::AllocationFailed,
413                    receipts: vec![],
414                }
415            })?;
416        let snapshot_publish_us = publish_start.elapsed().as_micros() as u64;
417
418        // 7. Update state.
419        self.current_tick = next_tick;
420        self.consecutive_rollback_count = 0;
421
422        // 8. Finalize receipts with applied_tick_id (only for actually executed commands).
423        for receipt in &mut receipts[accepted_receipt_start..] {
424            if receipt.accepted {
425                receipt.applied_tick_id = Some(next_tick);
426            }
427        }
428
429        // 9. Build metrics.
430        let total_us = tick_start.elapsed().as_micros() as u64;
431        let metrics = StepMetrics {
432            total_us,
433            command_processing_us,
434            propagator_us,
435            snapshot_publish_us,
436            memory_bytes: self.arena.memory_bytes(),
437            sparse_retired_ranges: u32::try_from(self.arena.sparse_retired_range_count())
438                .unwrap_or(u32::MAX),
439            sparse_pending_retired: u32::try_from(self.arena.sparse_pending_retired_count())
440                .unwrap_or(u32::MAX),
441            sparse_reuse_hits: self.arena.sparse_reuse_hits(),
442            sparse_reuse_misses: self.arena.sparse_reuse_misses(),
443            queue_full_rejections: self.counters.queue_full_rejections,
444            tick_disabled_rejections: self.counters.tick_disabled_rejections,
445            rollback_events: self.counters.rollback_events,
446            tick_disabled_transitions: self.counters.tick_disabled_transitions,
447            worker_stall_events: self.counters.worker_stall_events,
448            ring_not_available_events: self.counters.ring_not_available_events,
449            ring_eviction_events: self.counters.ring_eviction_events,
450            ring_stale_read_events: self.counters.ring_stale_read_events,
451            ring_skew_retry_events: self.counters.ring_skew_retry_events,
452        };
453        self.arena.reset_sparse_reuse_counters();
454        self.last_metrics = metrics.clone();
455
456        Ok(TickResult { receipts, metrics })
457    }
458
459    /// Handle a propagator failure by rolling back the tick.
460    ///
461    /// Takes ownership of `receipts` and returns them inside [`TickError`]
462    /// so the caller can inspect per-command rollback reason codes.
463    fn handle_rollback(
464        &mut self,
465        prop_name: String,
466        reason: murk_core::PropagatorError,
467        mut receipts: Vec<Receipt>,
468        accepted_start: usize,
469    ) -> Result<TickResult, TickError> {
470        // Guard was dropped → staging buffer abandoned (free rollback).
471        // Cancel the in-progress tick so begin_tick() can be called again.
472        self.arena.cancel_tick();
473        self.arena.reset_sparse_reuse_counters();
474        self.counters.rollback_events = self.counters.rollback_events.saturating_add(1);
475        self.consecutive_rollback_count = self.consecutive_rollback_count.saturating_add(1);
476        if self.consecutive_rollback_count >= self.max_consecutive_rollbacks {
477            if !self.tick_disabled {
478                self.counters.tick_disabled_transitions =
479                    self.counters.tick_disabled_transitions.saturating_add(1);
480            }
481            self.tick_disabled = true;
482        }
483        self.refresh_counter_metrics();
484
485        // Mark accepted command receipts as rolled back, but preserve
486        // receipts that were already rejected (e.g. unsupported command
487        // types) so callers see the original rejection reason.
488        for receipt in &mut receipts[accepted_start..] {
489            if receipt.accepted {
490                receipt.applied_tick_id = None;
491                receipt.reason_code = Some(IngressError::TickRollback);
492            }
493        }
494
495        Err(TickError {
496            kind: StepError::PropagatorFailed {
497                name: prop_name,
498                reason,
499            },
500            receipts,
501        })
502    }
503
504    /// Reset the engine to its initial state.
505    pub fn reset(&mut self) -> Result<(), ConfigError> {
506        self.arena.reset().map_err(ConfigError::Arena)?;
507        self.ingress.clear();
508        self.current_tick = TickId(0);
509        self.param_version = ParameterVersion(0);
510        self.tick_disabled = false;
511        self.consecutive_rollback_count = 0;
512        self.counters = CumulativeCounters::default();
513        self.last_metrics = StepMetrics::default();
514        Ok(())
515    }
516
517    pub(crate) fn record_worker_stall_events(&mut self, count: u64) {
518        self.counters.worker_stall_events = self.counters.worker_stall_events.saturating_add(count);
519        self.refresh_counter_metrics();
520    }
521
522    pub(crate) fn set_ring_not_available_events(&mut self, total: u64) {
523        self.counters.ring_not_available_events =
524            self.counters.ring_not_available_events.max(total);
525        self.refresh_counter_metrics();
526    }
527
528    pub(crate) fn set_ring_eviction_events(&mut self, total: u64) {
529        self.counters.ring_eviction_events = self.counters.ring_eviction_events.max(total);
530        self.refresh_counter_metrics();
531    }
532
533    pub(crate) fn set_ring_stale_read_events(&mut self, total: u64) {
534        self.counters.ring_stale_read_events = self.counters.ring_stale_read_events.max(total);
535        self.refresh_counter_metrics();
536    }
537
538    pub(crate) fn set_ring_skew_retry_events(&mut self, total: u64) {
539        self.counters.ring_skew_retry_events = self.counters.ring_skew_retry_events.max(total);
540        self.refresh_counter_metrics();
541    }
542
543    fn refresh_counter_metrics(&mut self) {
544        let c = &self.counters;
545        self.last_metrics.queue_full_rejections = c.queue_full_rejections;
546        self.last_metrics.tick_disabled_rejections = c.tick_disabled_rejections;
547        self.last_metrics.rollback_events = c.rollback_events;
548        self.last_metrics.tick_disabled_transitions = c.tick_disabled_transitions;
549        self.last_metrics.worker_stall_events = c.worker_stall_events;
550        self.last_metrics.ring_not_available_events = c.ring_not_available_events;
551        self.last_metrics.ring_eviction_events = c.ring_eviction_events;
552        self.last_metrics.ring_stale_read_events = c.ring_stale_read_events;
553        self.last_metrics.ring_skew_retry_events = c.ring_skew_retry_events;
554    }
555
556    /// Get a read-only snapshot of the current published generation.
557    pub fn snapshot(&self) -> Snapshot<'_> {
558        self.arena.snapshot()
559    }
560
561    /// Get an owned, thread-safe snapshot of the current published generation.
562    ///
563    /// Unlike [`TickEngine::snapshot()`], the returned `OwnedSnapshot` owns
564    /// clones of the segment data and can be sent across thread boundaries.
565    pub fn owned_snapshot(&self) -> murk_arena::OwnedSnapshot {
566        self.arena.owned_snapshot()
567    }
568
569    /// Current tick ID.
570    pub fn current_tick(&self) -> TickId {
571        self.current_tick
572    }
573
574    /// Whether ticking is disabled due to consecutive rollbacks.
575    pub fn is_tick_disabled(&self) -> bool {
576        self.tick_disabled
577    }
578
579    /// Number of consecutive rollbacks since the last successful tick.
580    pub fn consecutive_rollback_count(&self) -> u32 {
581        self.consecutive_rollback_count
582    }
583
584    /// Metrics from the most recent successful tick.
585    pub fn last_metrics(&self) -> &StepMetrics {
586        &self.last_metrics
587    }
588
589    /// The spatial topology for this engine.
590    pub fn space(&self) -> &dyn murk_space::Space {
591        self.space.as_ref()
592    }
593
594    /// Number of command batches currently buffered in ingress.
595    pub fn ingress_queue_depth(&self) -> usize {
596        self.ingress.len()
597    }
598
599    /// Maximum number of command batches ingress can buffer.
600    pub fn ingress_queue_capacity(&self) -> usize {
601        self.ingress.capacity()
602    }
603}
604
605#[cfg(test)]
606mod tests {
607    use super::*;
608    use murk_core::command::CommandPayload;
609    use murk_core::id::{Coord, ParameterKey};
610    use murk_core::traits::SnapshotAccess;
611    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
612    use murk_propagator::propagator::WriteMode;
613    use murk_space::{EdgeBehavior, Line1D};
614    use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};
615    use std::sync::atomic::{AtomicUsize, Ordering};
616
617    fn scalar_field(name: &str) -> FieldDef {
618        FieldDef {
619            name: name.to_string(),
620            field_type: FieldType::Scalar,
621            mutability: FieldMutability::PerTick,
622            units: None,
623            bounds: None,
624            boundary_behavior: BoundaryBehavior::Clamp,
625        }
626    }
627
628    fn sparse_scalar_field(name: &str) -> FieldDef {
629        FieldDef {
630            name: name.to_string(),
631            field_type: FieldType::Scalar,
632            mutability: FieldMutability::Sparse,
633            units: None,
634            bounds: None,
635            boundary_behavior: BoundaryBehavior::Clamp,
636        }
637    }
638
639    fn make_cmd(expires: u64) -> Command {
640        Command {
641            payload: CommandPayload::SetParameter {
642                key: ParameterKey(0),
643                value: 0.0,
644            },
645            expires_after_tick: TickId(expires),
646            source_id: None,
647            source_seq: None,
648            priority_class: 1,
649            arrival_seq: 0,
650        }
651    }
652
653    fn simple_engine() -> TickEngine {
654        let config = WorldConfig {
655            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
656            fields: vec![scalar_field("energy")],
657            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
658            dt: 0.1,
659            seed: 42,
660            ring_buffer_size: 8,
661            max_ingress_queue: 1024,
662            tick_rate_hz: None,
663            backoff: crate::config::BackoffConfig::default(),
664        };
665        TickEngine::new(config).unwrap()
666    }
667
668    fn two_field_engine() -> TickEngine {
669        let config = WorldConfig {
670            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
671            fields: vec![scalar_field("field0"), scalar_field("field1")],
672            propagators: vec![
673                Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
674                Box::new(IdentityPropagator::new(
675                    "copy_f0_to_f1",
676                    FieldId(0),
677                    FieldId(1),
678                )),
679            ],
680            dt: 0.1,
681            seed: 42,
682            ring_buffer_size: 8,
683            max_ingress_queue: 1024,
684            tick_rate_hz: None,
685            backoff: crate::config::BackoffConfig::default(),
686        };
687        TickEngine::new(config).unwrap()
688    }
689
690    fn three_field_engine() -> TickEngine {
691        // PropA writes field0=7.0
692        // PropB reads field0, copies to field1
693        // PropC reads field0+field1, writes sum to field2
694        struct SumPropagator {
695            name: String,
696            input_a: FieldId,
697            input_b: FieldId,
698            output: FieldId,
699        }
700
701        impl SumPropagator {
702            fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
703                Self {
704                    name: name.to_string(),
705                    input_a: a,
706                    input_b: b,
707                    output: out,
708                }
709            }
710        }
711
712        impl Propagator for SumPropagator {
713            fn name(&self) -> &str {
714                &self.name
715            }
716            fn reads(&self) -> murk_core::FieldSet {
717                [self.input_a, self.input_b].into_iter().collect()
718            }
719            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
720                vec![(self.output, WriteMode::Full)]
721            }
722            fn step(
723                &self,
724                ctx: &mut murk_propagator::StepContext<'_>,
725            ) -> Result<(), murk_core::PropagatorError> {
726                let a = ctx.reads().read(self.input_a).unwrap().to_vec();
727                let b = ctx.reads().read(self.input_b).unwrap().to_vec();
728                let out = ctx.writes().write(self.output).unwrap();
729                for i in 0..out.len() {
730                    out[i] = a[i] + b[i];
731                }
732                Ok(())
733            }
734        }
735
736        let config = WorldConfig {
737            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
738            fields: vec![
739                scalar_field("field0"),
740                scalar_field("field1"),
741                scalar_field("field2"),
742            ],
743            propagators: vec![
744                Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
745                Box::new(IdentityPropagator::new(
746                    "copy_f0_to_f1",
747                    FieldId(0),
748                    FieldId(1),
749                )),
750                Box::new(SumPropagator::new(
751                    "sum_f0_f1_to_f2",
752                    FieldId(0),
753                    FieldId(1),
754                    FieldId(2),
755                )),
756            ],
757            dt: 0.1,
758            seed: 42,
759            ring_buffer_size: 8,
760            max_ingress_queue: 1024,
761            tick_rate_hz: None,
762            backoff: crate::config::BackoffConfig::default(),
763        };
764        TickEngine::new(config).unwrap()
765    }
766
767    fn failing_engine(succeed_count: usize) -> TickEngine {
768        let config = WorldConfig {
769            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
770            fields: vec![scalar_field("energy")],
771            propagators: vec![Box::new(FailingPropagator::new(
772                "fail",
773                FieldId(0),
774                succeed_count,
775            ))],
776            dt: 0.1,
777            seed: 42,
778            ring_buffer_size: 8,
779            max_ingress_queue: 1024,
780            tick_rate_hz: None,
781            backoff: crate::config::BackoffConfig::default(),
782        };
783        TickEngine::new(config).unwrap()
784    }
785
786    fn partial_failure_engine() -> TickEngine {
787        // PropA succeeds always, PropB fails immediately.
788        let config = WorldConfig {
789            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
790            fields: vec![scalar_field("field0"), scalar_field("field1")],
791            propagators: vec![
792                Box::new(ConstPropagator::new("ok_prop", FieldId(0), 1.0)),
793                Box::new(FailingPropagator::new("fail_prop", FieldId(1), 0)),
794            ],
795            dt: 0.1,
796            seed: 42,
797            ring_buffer_size: 8,
798            max_ingress_queue: 1024,
799            tick_rate_hz: None,
800            backoff: crate::config::BackoffConfig::default(),
801        };
802        TickEngine::new(config).unwrap()
803    }
804
805    // ── Three-propagator overlay visibility tests ─────────────
806
807    #[test]
808    fn staged_read_sees_prior_propagator_write() {
809        // PropB reads field0 via reads() → should see PropA's value (7.0)
810        let mut engine = two_field_engine();
811        let result = engine.execute_tick().unwrap();
812        let snap = engine.snapshot();
813        // field1 should be a copy of field0
814        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
815        assert!(result.metrics.total_us > 0);
816    }
817
818    #[test]
819    fn reads_previous_sees_base_gen() {
820        // With reads_previous, a propagator should always see the base gen
821        // (tick-start snapshot), not staged writes. We verify by checking
822        // that on tick 1, reads_previous sees zeros (initial state).
823        struct ReadsPrevPropagator;
824        impl Propagator for ReadsPrevPropagator {
825            fn name(&self) -> &str {
826                "reads_prev"
827            }
828            fn reads(&self) -> murk_core::FieldSet {
829                murk_core::FieldSet::empty()
830            }
831            fn reads_previous(&self) -> murk_core::FieldSet {
832                [FieldId(0)].into_iter().collect()
833            }
834            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
835                vec![(FieldId(1), WriteMode::Full)]
836            }
837            fn step(
838                &self,
839                ctx: &mut murk_propagator::StepContext<'_>,
840            ) -> Result<(), murk_core::PropagatorError> {
841                let prev = ctx.reads_previous().read(FieldId(0)).unwrap().to_vec();
842                let out = ctx.writes().write(FieldId(1)).unwrap();
843                out.copy_from_slice(&prev);
844                Ok(())
845            }
846        }
847
848        let config = WorldConfig {
849            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
850            fields: vec![scalar_field("field0"), scalar_field("field1")],
851            propagators: vec![
852                Box::new(ConstPropagator::new("write_f0", FieldId(0), 99.0)),
853                Box::new(ReadsPrevPropagator),
854            ],
855            dt: 0.1,
856            seed: 42,
857            ring_buffer_size: 8,
858            max_ingress_queue: 1024,
859            tick_rate_hz: None,
860            backoff: crate::config::BackoffConfig::default(),
861        };
862        let mut engine = TickEngine::new(config).unwrap();
863
864        // Tick 1: PropA writes 99.0 to field0. ReadsPrev reads field0
865        // via reads_previous → sees base gen (all zeros).
866        engine.execute_tick().unwrap();
867        let snap = engine.snapshot();
868        // field1 should be 0.0 (base gen of field0 on first tick)
869        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 0.0);
870
871        // Tick 2: reads_previous now sees 99.0 (published from tick 1).
872        engine.execute_tick().unwrap();
873        let snap = engine.snapshot();
874        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 99.0);
875    }
876
877    #[test]
878    fn three_propagator_overlay_visibility() {
879        // A writes 7.0 to f0
880        // B reads f0 (staged → 7.0), copies to f1
881        // C reads f0 (staged → 7.0) + f1 (staged → 7.0), writes sum to f2
882        let mut engine = three_field_engine();
883        engine.execute_tick().unwrap();
884        let snap = engine.snapshot();
885
886        assert_eq!(snap.read(FieldId(0)).unwrap()[0], 7.0);
887        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
888        assert_eq!(snap.read(FieldId(2)).unwrap()[0], 14.0); // 7 + 7
889    }
890
891    #[test]
892    fn unwritten_field_reads_from_base_gen() {
893        // On tick 1, fields start as zero (base gen). A propagator
894        // reading a field nobody wrote should see zero.
895        let mut engine = three_field_engine();
896        engine.execute_tick().unwrap();
897        // All fields are written in this pipeline, so let's just
898        // verify the snapshot is consistent.
899        let snap = engine.snapshot();
900        let f2 = snap.read(FieldId(2)).unwrap();
901        assert!(f2.iter().all(|&v| v == 14.0));
902    }
903
904    // ── Tick atomicity tests ─────────────────────────────────
905
906    #[test]
907    fn propagator_failure_no_snapshot_published() {
908        let mut engine = failing_engine(0);
909
910        // Before any tick, snapshot is at tick 0 (initial state).
911        let snap_before = engine.snapshot();
912        let tick_before = snap_before.tick_id();
913
914        // Execute tick → should fail.
915        let result = engine.execute_tick();
916        assert!(result.is_err());
917
918        // Snapshot should be unchanged.
919        let snap_after = engine.snapshot();
920        assert_eq!(snap_after.tick_id(), tick_before);
921    }
922
923    #[test]
924    fn partial_failure_rolls_back_all() {
925        // PropA writes 1.0 to field0 (succeeds), PropB fails.
926        // field0 should NOT show 1.0 in the snapshot after rollback.
927        let mut engine = partial_failure_engine();
928
929        // Snapshot before: field0 should be all zeros (or not yet published).
930        let result = engine.execute_tick();
931        assert!(result.is_err());
932
933        // field0 should still be at initial state (no publish happened).
934        let snap = engine.snapshot();
935        let f0 = snap.read(FieldId(0));
936        // On the very first tick with no prior publish, the snapshot
937        // shows initial state. No writes from PropA should be visible.
938        if let Some(data) = f0 {
939            assert!(
940                data.iter().all(|&v| v == 0.0),
941                "rollback should prevent PropA's writes from being visible"
942            );
943        }
944    }
945
946    #[test]
947    fn rollback_receipts_generated() {
948        let mut engine = failing_engine(0);
949
950        // Submit an accepted command (SetField) before the failing tick.
951        let cmd = Command {
952            payload: CommandPayload::SetField {
953                coord: smallvec::smallvec![0],
954                field_id: FieldId(0),
955                value: 1.0,
956            },
957            expires_after_tick: TickId(100),
958            source_id: None,
959            source_seq: None,
960            priority_class: 1,
961            arrival_seq: 0,
962        };
963        engine.submit_commands(vec![cmd]);
964
965        let result = engine.execute_tick();
966        match result {
967            Err(TickError {
968                kind: StepError::PropagatorFailed { .. },
969                receipts,
970            }) => {
971                // Accepted receipts must be surfaced with TickRollback.
972                assert_eq!(receipts.len(), 1);
973                assert!(receipts[0].accepted);
974                assert_eq!(receipts[0].reason_code, Some(IngressError::TickRollback));
975            }
976            other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
977        }
978    }
979
980    #[test]
981    fn rollback_preserves_rejected_receipts() {
982        // Submit an unsupported command (SetParameter → rejected) alongside
983        // the tick. When the propagator fails and triggers rollback, the
984        // rejected receipt must stay accepted=false, not be overwritten
985        // with TickRollback.
986        let mut engine = failing_engine(0);
987
988        // SetParameter is unsupported → should be rejected (accepted=false).
989        engine.submit_commands(vec![make_cmd(100)]);
990
991        let result = engine.execute_tick();
992        match result {
993            Err(TickError {
994                kind: StepError::PropagatorFailed { .. },
995                receipts,
996            }) => {
997                assert_eq!(receipts.len(), 1);
998                // The receipt must remain rejected, NOT overwritten with
999                // accepted=true + TickRollback.
1000                assert!(
1001                    !receipts[0].accepted,
1002                    "rejected receipt should stay rejected after rollback"
1003                );
1004                assert_eq!(
1005                    receipts[0].reason_code,
1006                    Some(IngressError::UnsupportedCommand),
1007                    "rejected receipt must preserve UnsupportedCommand reason after rollback"
1008                );
1009            }
1010            other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
1011        }
1012    }
1013
1014    // ── Rollback tracking tests ─────────────────────────────
1015
1016    #[test]
1017    fn consecutive_rollbacks_disable_ticking() {
1018        let mut engine = failing_engine(0);
1019
1020        for _ in 0..3 {
1021            let _ = engine.execute_tick();
1022        }
1023
1024        assert!(engine.is_tick_disabled());
1025        assert_eq!(engine.consecutive_rollback_count(), 3);
1026        assert_eq!(engine.last_metrics().rollback_events, 3);
1027        assert_eq!(engine.last_metrics().tick_disabled_transitions, 1);
1028    }
1029
1030    #[test]
1031    fn success_resets_rollback_count() {
1032        // Succeeds 2 times, then fails, but the first 2 successes
1033        // should keep rollback count at 0.
1034        let mut engine = failing_engine(10);
1035
1036        // Two successful ticks.
1037        engine.execute_tick().unwrap();
1038        engine.execute_tick().unwrap();
1039        assert_eq!(engine.consecutive_rollback_count(), 0);
1040        assert_eq!(engine.current_tick(), TickId(2));
1041    }
1042
1043    #[test]
1044    fn tick_disabled_rejects_immediately() {
1045        let mut engine = failing_engine(0);
1046
1047        // Cause 3 failures to disable ticking.
1048        for _ in 0..3 {
1049            let _ = engine.execute_tick();
1050        }
1051        assert!(engine.is_tick_disabled());
1052
1053        // Next tick should fail immediately with TickDisabled.
1054        match engine.execute_tick() {
1055            Err(TickError {
1056                kind: StepError::TickDisabled,
1057                ..
1058            }) => {}
1059            other => panic!("expected TickDisabled, got {other:?}"),
1060        }
1061
1062        let receipts = engine.submit_commands(vec![make_cmd(100), make_cmd(100)]);
1063        assert!(receipts
1064            .iter()
1065            .all(|r| r.reason_code == Some(IngressError::TickDisabled)));
1066        assert_eq!(engine.last_metrics().tick_disabled_rejections, 2);
1067        assert_eq!(engine.last_metrics().rollback_events, 3);
1068        assert_eq!(engine.last_metrics().tick_disabled_transitions, 1);
1069    }
1070
1071    #[test]
1072    fn reset_clears_tick_disabled() {
1073        let mut engine = failing_engine(0);
1074
1075        for _ in 0..3 {
1076            let _ = engine.execute_tick();
1077        }
1078        assert!(engine.is_tick_disabled());
1079
1080        engine.reset().unwrap();
1081        assert!(!engine.is_tick_disabled());
1082        assert_eq!(engine.current_tick(), TickId(0));
1083        assert_eq!(engine.consecutive_rollback_count(), 0);
1084        assert_eq!(engine.last_metrics().queue_full_rejections, 0);
1085        assert_eq!(engine.last_metrics().tick_disabled_rejections, 0);
1086        assert_eq!(engine.last_metrics().rollback_events, 0);
1087        assert_eq!(engine.last_metrics().tick_disabled_transitions, 0);
1088        assert_eq!(engine.last_metrics().worker_stall_events, 0);
1089        assert_eq!(engine.last_metrics().ring_not_available_events, 0);
1090        assert_eq!(engine.last_metrics().ring_eviction_events, 0);
1091        assert_eq!(engine.last_metrics().ring_stale_read_events, 0);
1092        assert_eq!(engine.last_metrics().ring_skew_retry_events, 0);
1093    }
1094
1095    // ── Integration tests ────────────────────────────────────
1096
1097    #[test]
1098    fn single_tick_end_to_end() {
1099        let mut engine = simple_engine();
1100        let result = engine.execute_tick().unwrap();
1101
1102        let snap = engine.snapshot();
1103        let data = snap.read(FieldId(0)).unwrap();
1104        assert_eq!(data.len(), 10);
1105        assert!(data.iter().all(|&v| v == 42.0));
1106        assert_eq!(engine.current_tick(), TickId(1));
1107        assert!(!result.receipts.is_empty() || result.receipts.is_empty()); // receipts exist
1108    }
1109
1110    #[test]
1111    fn multi_tick_determinism() {
1112        let mut engine = simple_engine();
1113
1114        for _ in 0..10 {
1115            engine.execute_tick().unwrap();
1116        }
1117
1118        let snap = engine.snapshot();
1119        let data = snap.read(FieldId(0)).unwrap();
1120        assert!(data.iter().all(|&v| v == 42.0));
1121        assert_eq!(engine.current_tick(), TickId(10));
1122    }
1123
1124    #[test]
1125    fn commands_flow_through_to_receipts() {
1126        let mut engine = simple_engine();
1127
1128        // Use SetField commands — the only command type currently executed.
1129        let coord: Coord = vec![0i32].into();
1130        let cmds = vec![
1131            Command {
1132                payload: CommandPayload::SetField {
1133                    coord: coord.clone(),
1134                    field_id: FieldId(0),
1135                    value: 1.0,
1136                },
1137                expires_after_tick: TickId(100),
1138                source_id: None,
1139                source_seq: None,
1140                priority_class: 1,
1141                arrival_seq: 0,
1142            },
1143            Command {
1144                payload: CommandPayload::SetField {
1145                    coord: coord.clone(),
1146                    field_id: FieldId(0),
1147                    value: 2.0,
1148                },
1149                expires_after_tick: TickId(100),
1150                source_id: None,
1151                source_seq: None,
1152                priority_class: 1,
1153                arrival_seq: 0,
1154            },
1155        ];
1156        let submit_receipts = engine.submit_commands(cmds);
1157        assert_eq!(submit_receipts.len(), 2);
1158        assert!(submit_receipts.iter().all(|r| r.accepted));
1159
1160        let result = engine.execute_tick().unwrap();
1161        // Should have receipts for the 2 commands.
1162        let applied: Vec<_> = result
1163            .receipts
1164            .iter()
1165            .filter(|r| r.applied_tick_id.is_some())
1166            .collect();
1167        assert_eq!(applied.len(), 2);
1168        assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
1169    }
1170
1171    #[test]
1172    fn non_setfield_commands_rejected_honestly() {
1173        let mut engine = simple_engine();
1174
1175        // Submit SetParameter commands — not yet implemented.
1176        let submit_receipts = engine.submit_commands(vec![make_cmd(100), make_cmd(100)]);
1177        assert_eq!(submit_receipts.len(), 2);
1178        assert!(submit_receipts.iter().all(|r| r.accepted));
1179
1180        let result = engine.execute_tick().unwrap();
1181        assert_eq!(result.receipts.len(), 2);
1182
1183        // Non-SetField commands must NOT report as applied and must carry
1184        // UnsupportedCommand reason so callers can distinguish the failure mode.
1185        for receipt in &result.receipts {
1186            assert!(
1187                !receipt.accepted,
1188                "unimplemented command type must be rejected"
1189            );
1190            assert_eq!(
1191                receipt.applied_tick_id, None,
1192                "unimplemented command must not have applied_tick_id"
1193            );
1194            assert_eq!(
1195                receipt.reason_code,
1196                Some(IngressError::UnsupportedCommand),
1197                "rejected unsupported command must carry UnsupportedCommand reason"
1198            );
1199        }
1200    }
1201
1202    // ── Metrics tests ────────────────────────────────────────
1203
1204    #[test]
1205    fn timing_fields_populated() {
1206        let mut engine = simple_engine();
1207        let result = engine.execute_tick().unwrap();
1208
1209        // total_us is u64, so it's always >= 0; just verify the struct is populated.
1210        let _ = result.metrics.total_us;
1211        assert_eq!(result.metrics.propagator_us.len(), 1);
1212        assert_eq!(result.metrics.propagator_us[0].0, "const");
1213        assert_eq!(result.metrics.queue_full_rejections, 0);
1214        assert_eq!(result.metrics.tick_disabled_rejections, 0);
1215        assert_eq!(result.metrics.rollback_events, 0);
1216        assert_eq!(result.metrics.tick_disabled_transitions, 0);
1217        assert_eq!(result.metrics.worker_stall_events, 0);
1218        assert_eq!(result.metrics.ring_not_available_events, 0);
1219        assert_eq!(result.metrics.ring_eviction_events, 0);
1220        assert_eq!(result.metrics.ring_stale_read_events, 0);
1221        assert_eq!(result.metrics.ring_skew_retry_events, 0);
1222    }
1223
1224    #[test]
1225    fn memory_bytes_matches_arena() {
1226        let mut engine = simple_engine();
1227        engine.execute_tick().unwrap();
1228
1229        let metrics = engine.last_metrics();
1230        assert!(metrics.memory_bytes > 0);
1231    }
1232
1233    #[test]
1234    fn queue_full_rejections_are_counted_in_metrics() {
1235        let config = WorldConfig {
1236            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
1237            fields: vec![scalar_field("energy")],
1238            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
1239            dt: 0.1,
1240            seed: 42,
1241            ring_buffer_size: 8,
1242            max_ingress_queue: 1,
1243            tick_rate_hz: None,
1244            backoff: crate::config::BackoffConfig::default(),
1245        };
1246        let mut engine = TickEngine::new(config).unwrap();
1247
1248        let submit_receipts =
1249            engine.submit_commands(vec![make_cmd(100), make_cmd(100), make_cmd(100)]);
1250        let queue_full = submit_receipts
1251            .iter()
1252            .filter(|r| r.reason_code == Some(IngressError::QueueFull))
1253            .count();
1254        assert_eq!(queue_full, 2);
1255
1256        let result = engine.execute_tick().unwrap();
1257        assert_eq!(result.metrics.queue_full_rejections, 2);
1258        assert_eq!(result.metrics.tick_disabled_rejections, 0);
1259    }
1260
1261    #[test]
1262    fn external_realtime_counters_are_reflected_in_metrics() {
1263        let mut engine = simple_engine();
1264        engine.record_worker_stall_events(3);
1265        engine.set_ring_not_available_events(7);
1266        engine.set_ring_eviction_events(5);
1267        engine.set_ring_stale_read_events(2);
1268        engine.set_ring_skew_retry_events(1);
1269
1270        assert_eq!(engine.last_metrics().worker_stall_events, 3);
1271        assert_eq!(engine.last_metrics().ring_not_available_events, 7);
1272        assert_eq!(engine.last_metrics().ring_eviction_events, 5);
1273        assert_eq!(engine.last_metrics().ring_stale_read_events, 2);
1274        assert_eq!(engine.last_metrics().ring_skew_retry_events, 1);
1275
1276        let result = engine.execute_tick().unwrap();
1277        assert_eq!(result.metrics.worker_stall_events, 3);
1278        assert_eq!(result.metrics.ring_not_available_events, 7);
1279        assert_eq!(result.metrics.ring_eviction_events, 5);
1280        assert_eq!(result.metrics.ring_stale_read_events, 2);
1281        assert_eq!(result.metrics.ring_skew_retry_events, 1);
1282    }
1283
1284    #[test]
1285    fn rollback_clears_sparse_reuse_counters_for_next_success() {
1286        struct MaybeFailPropagator {
1287            output: FieldId,
1288            fail_on_call: Option<usize>,
1289            call_count: AtomicUsize,
1290        }
1291
1292        impl MaybeFailPropagator {
1293            fn new(output: FieldId, fail_on_call: Option<usize>) -> Self {
1294                Self {
1295                    output,
1296                    fail_on_call,
1297                    call_count: AtomicUsize::new(0),
1298                }
1299            }
1300        }
1301
1302        impl Propagator for MaybeFailPropagator {
1303            fn name(&self) -> &str {
1304                "maybe_fail"
1305            }
1306
1307            fn reads(&self) -> murk_core::FieldSet {
1308                murk_core::FieldSet::empty()
1309            }
1310
1311            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
1312                vec![(self.output, WriteMode::Full)]
1313            }
1314
1315            fn step(
1316                &self,
1317                ctx: &mut murk_propagator::StepContext<'_>,
1318            ) -> Result<(), murk_core::PropagatorError> {
1319                let n = self.call_count.fetch_add(1, Ordering::Relaxed);
1320                if self.fail_on_call == Some(n) {
1321                    return Err(murk_core::PropagatorError::ExecutionFailed {
1322                        reason: format!("deliberate failure on call {n}"),
1323                    });
1324                }
1325
1326                let output = ctx.writes().write(self.output).ok_or_else(|| {
1327                    murk_core::PropagatorError::ExecutionFailed {
1328                        reason: format!("field {:?} not writable", self.output),
1329                    }
1330                })?;
1331                output.fill(n as f32);
1332                Ok(())
1333            }
1334        }
1335
1336        fn sparse_engine(fail_on_call: Option<usize>) -> TickEngine {
1337            let config = WorldConfig {
1338                space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
1339                fields: vec![sparse_scalar_field("sparse0"), scalar_field("field1")],
1340                propagators: vec![
1341                    Box::new(ConstPropagator::new("write_sparse", FieldId(0), 3.0)),
1342                    Box::new(MaybeFailPropagator::new(FieldId(1), fail_on_call)),
1343                ],
1344                dt: 0.1,
1345                seed: 42,
1346                ring_buffer_size: 8,
1347                max_ingress_queue: 1024,
1348                tick_rate_hz: None,
1349                backoff: crate::config::BackoffConfig::default(),
1350            };
1351            let mut engine = TickEngine::new(config).unwrap();
1352            engine.arena.reset_sparse_reuse_counters();
1353            engine
1354        }
1355
1356        let mut rollback_engine = sparse_engine(Some(1)); // fail on tick 2
1357        rollback_engine.execute_tick().unwrap();
1358
1359        let rollback = rollback_engine.execute_tick();
1360        assert!(rollback.is_err(), "tick 2 should fail and roll back");
1361        assert_eq!(rollback_engine.arena.sparse_reuse_hits(), 0);
1362        assert_eq!(rollback_engine.arena.sparse_reuse_misses(), 0);
1363
1364        // A subsequent tick should still execute successfully after rollback.
1365        rollback_engine.execute_tick().unwrap();
1366    }
1367
1368    // ── Bug-fix regression tests ─────────────────────────────
1369
1370    #[test]
1371    fn reset_clears_pending_ingress() {
1372        let mut engine = simple_engine();
1373
1374        // Submit commands but don't tick.
1375        engine.submit_commands(vec![make_cmd(1000), make_cmd(1000)]);
1376
1377        // Reset should discard pending commands.
1378        engine.reset().unwrap();
1379
1380        // Tick should produce zero receipts (no pending commands).
1381        let result = engine.execute_tick().unwrap();
1382        assert!(result.receipts.is_empty());
1383    }
1384
1385    #[test]
1386    fn command_index_preserved_after_reordering() {
1387        let mut engine = simple_engine();
1388
1389        // Submit commands with different priorities — they'll be reordered.
1390        // Low priority first (index 0), high priority second (index 1).
1391        let cmds = vec![
1392            Command {
1393                payload: CommandPayload::SetParameter {
1394                    key: ParameterKey(0),
1395                    value: 1.0,
1396                },
1397                expires_after_tick: TickId(100),
1398                source_id: None,
1399                source_seq: None,
1400                priority_class: 2, // low priority
1401                arrival_seq: 0,
1402            },
1403            Command {
1404                payload: CommandPayload::SetParameter {
1405                    key: ParameterKey(0),
1406                    value: 2.0,
1407                },
1408                expires_after_tick: TickId(100),
1409                source_id: None,
1410                source_seq: None,
1411                priority_class: 0, // high priority — sorted first
1412                arrival_seq: 0,
1413            },
1414        ];
1415        engine.submit_commands(cmds);
1416
1417        let result = engine.execute_tick().unwrap();
1418        // After reordering, priority_class=0 (batch index 1) executes first,
1419        // priority_class=2 (batch index 0) executes second.
1420        // command_index must reflect the ORIGINAL batch position.
1421        assert_eq!(result.receipts.len(), 2);
1422        assert_eq!(result.receipts[0].command_index, 1); // was batch[1]
1423        assert_eq!(result.receipts[1].command_index, 0); // was batch[0]
1424    }
1425
1426    #[test]
1427    fn setfield_oob_receipt_not_applied() {
1428        // BUG-099: SetField with OOB coord was silently skipped but receipt
1429        // stayed accepted=true with applied_tick_id set. After fix, OOB
1430        // SetField must produce accepted=false with NotApplied reason.
1431        let mut engine = simple_engine();
1432
1433        // Coord [999] is out of bounds for a Line1D(10).
1434        let oob_coord: Coord = vec![999i32].into();
1435        let cmd = Command {
1436            payload: CommandPayload::SetField {
1437                coord: oob_coord,
1438                field_id: FieldId(0),
1439                value: 1.0,
1440            },
1441            expires_after_tick: TickId(100),
1442            source_id: None,
1443            source_seq: None,
1444            priority_class: 1,
1445            arrival_seq: 0,
1446        };
1447        engine.submit_commands(vec![cmd]);
1448
1449        let result = engine.execute_tick().unwrap();
1450        assert_eq!(result.receipts.len(), 1);
1451        let receipt = &result.receipts[0];
1452        assert!(
1453            !receipt.accepted,
1454            "OOB SetField must be marked not accepted"
1455        );
1456        assert_eq!(
1457            receipt.applied_tick_id, None,
1458            "OOB SetField must not have applied_tick_id"
1459        );
1460        assert_eq!(
1461            receipt.reason_code,
1462            Some(IngressError::NotApplied),
1463            "OOB SetField must carry NotApplied reason"
1464        );
1465    }
1466
1467    #[test]
1468    fn setfield_unknown_field_receipt_not_applied() {
1469        // SetField targeting a non-existent field should also report NotApplied.
1470        let mut engine = simple_engine();
1471
1472        let coord: Coord = vec![0i32].into();
1473        let cmd = Command {
1474            payload: CommandPayload::SetField {
1475                coord,
1476                field_id: FieldId(99), // no such field
1477                value: 1.0,
1478            },
1479            expires_after_tick: TickId(100),
1480            source_id: None,
1481            source_seq: None,
1482            priority_class: 1,
1483            arrival_seq: 0,
1484        };
1485        engine.submit_commands(vec![cmd]);
1486
1487        let result = engine.execute_tick().unwrap();
1488        assert_eq!(result.receipts.len(), 1);
1489        assert!(!result.receipts[0].accepted);
1490        assert_eq!(
1491            result.receipts[0].reason_code,
1492            Some(IngressError::NotApplied)
1493        );
1494    }
1495
1496    #[test]
1497    fn writemode_incremental_seeds_from_previous_gen() {
1498        // Regression test for BUG-015: WriteMode::Incremental buffers must
1499        // be pre-seeded with previous-generation data, not zero-filled.
1500        //
1501        // An incremental propagator writes cell 0 on tick 1 and then does
1502        // nothing on tick 2. Cell 0 must retain its value across ticks.
1503        struct IncrementalOnce {
1504            written: std::cell::Cell<bool>,
1505        }
1506        impl IncrementalOnce {
1507            fn new() -> Self {
1508                Self {
1509                    written: std::cell::Cell::new(false),
1510                }
1511            }
1512        }
1513        impl Propagator for IncrementalOnce {
1514            fn name(&self) -> &str {
1515                "incr_once"
1516            }
1517            fn reads(&self) -> murk_core::FieldSet {
1518                murk_core::FieldSet::empty()
1519            }
1520            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
1521                vec![(FieldId(0), WriteMode::Incremental)]
1522            }
1523            fn step(
1524                &self,
1525                ctx: &mut murk_propagator::StepContext<'_>,
1526            ) -> Result<(), murk_core::PropagatorError> {
1527                let buf = ctx.writes().write(FieldId(0)).unwrap();
1528                if !self.written.get() {
1529                    // First tick: write a distinctive value.
1530                    buf[0] = 42.0;
1531                    buf[1] = 99.0;
1532                    self.written.set(true);
1533                }
1534                // Second tick onward: do nothing — rely on incremental seeding.
1535                Ok(())
1536            }
1537        }
1538
1539        let config = WorldConfig {
1540            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
1541            fields: vec![scalar_field("state")],
1542            propagators: vec![Box::new(IncrementalOnce::new())],
1543            dt: 0.1,
1544            seed: 42,
1545            ring_buffer_size: 8,
1546            max_ingress_queue: 1024,
1547            tick_rate_hz: None,
1548            backoff: crate::config::BackoffConfig::default(),
1549        };
1550        let mut engine = TickEngine::new(config).unwrap();
1551
1552        // Tick 1: propagator writes 42.0 and 99.0.
1553        engine.execute_tick().unwrap();
1554        let snap = engine.snapshot();
1555        assert_eq!(snap.read(FieldId(0)).unwrap()[0], 42.0);
1556        assert_eq!(snap.read(FieldId(0)).unwrap()[1], 99.0);
1557
1558        // Tick 2: propagator does nothing — incremental seeding must preserve values.
1559        engine.execute_tick().unwrap();
1560        let snap = engine.snapshot();
1561        assert_eq!(
1562            snap.read(FieldId(0)).unwrap()[0],
1563            42.0,
1564            "BUG-015: incremental field lost data across ticks"
1565        );
1566        assert_eq!(
1567            snap.read(FieldId(0)).unwrap()[1],
1568            99.0,
1569            "BUG-015: incremental field lost data across ticks"
1570        );
1571        // Unwritten cells should remain zero (seeded from previous gen which was zero).
1572        assert_eq!(snap.read(FieldId(0)).unwrap()[2], 0.0);
1573
1574        // Tick 3: still preserved.
1575        engine.execute_tick().unwrap();
1576        let snap = engine.snapshot();
1577        assert_eq!(snap.read(FieldId(0)).unwrap()[0], 42.0);
1578        assert_eq!(snap.read(FieldId(0)).unwrap()[1], 99.0);
1579    }
1580
1581    /// BUG-106: Unchecked u32 * u32 for static field length panics on overflow.
1582    #[test]
1583    fn static_field_overflow_returns_error() {
1584        let config = WorldConfig {
1585            space: Box::new(Line1D::new(3, EdgeBehavior::Absorb).unwrap()),
1586            fields: vec![FieldDef {
1587                name: "huge_vec".to_string(),
1588                field_type: FieldType::Vector {
1589                    dims: u32::MAX / 2, // 3 * (u32::MAX/2) overflows u32
1590                },
1591                mutability: FieldMutability::Static,
1592                units: None,
1593                bounds: None,
1594                boundary_behavior: BoundaryBehavior::Clamp,
1595            }],
1596            propagators: vec![Box::new(ConstPropagator::new("c", FieldId(0), 1.0))],
1597            dt: 0.1,
1598            seed: 42,
1599            ring_buffer_size: 8,
1600            max_ingress_queue: 1024,
1601            tick_rate_hz: None,
1602            backoff: crate::config::BackoffConfig::default(),
1603        };
1604        match TickEngine::new(config) {
1605            Err(crate::config::ConfigError::CellCountOverflow { .. }) => {}
1606            Ok(_) => panic!("expected CellCountOverflow, got Ok"),
1607            Err(e) => panic!("expected CellCountOverflow, got {e}"),
1608        }
1609    }
1610}