Skip to main content

murk_engine/
tick.rs

1//! Tick engine: the single-threaded simulation loop.
2//!
3//! [`TickEngine`] wires together the arena, propagators, ingress queue,
4//! and overlay caches into a deterministic tick execution loop with
5//! rollback atomicity.
6//!
7//! # Lockstep mode only
8//!
9//! This module implements Lockstep mode — a callable struct with no
10//! background threads. RealtimeAsync mode (future WP) wraps this in
11//! a thread with a ring buffer.
12
13use std::fmt;
14use std::time::Instant;
15
16use murk_arena::config::ArenaConfig;
17use murk_arena::pingpong::PingPongArena;
18use murk_arena::read::Snapshot;
19use murk_arena::static_arena::StaticArena;
20use murk_core::command::{Command, CommandPayload, Receipt};
21use murk_core::error::{IngressError, StepError};
22use murk_core::id::{FieldId, ParameterVersion, TickId};
23use murk_core::traits::{FieldReader, FieldWriter};
24use murk_core::FieldMutability;
25use murk_propagator::pipeline::{ReadResolutionPlan, ReadSource};
26use murk_propagator::propagator::Propagator;
27use murk_propagator::scratch::ScratchRegion as PropagatorScratch;
28
29use crate::config::{ConfigError, WorldConfig};
30use crate::ingress::IngressQueue;
31use crate::metrics::StepMetrics;
32use crate::overlay::{BaseFieldCache, BaseFieldSet, OverlayReader, StagedFieldCache};
33
34// ── TickResult ───────────────────────────────────────────────────
35
36/// Result of a successful tick execution.
37#[derive(Debug)]
38pub struct TickResult {
39    /// Receipts for commands submitted before this tick.
40    pub receipts: Vec<Receipt>,
41    /// Performance metrics for this tick.
42    pub metrics: StepMetrics,
43}
44
45// ── TickError ───────────────────────────────────────────────────
46
47/// Error returned from [`TickEngine::execute_tick()`].
48///
49/// Wraps the underlying [`StepError`] and any receipts that were produced
50/// before the failure. On rollback, receipts carry `TickRollback` reason
51/// codes; callers must not discard them.
52#[derive(Debug, PartialEq, Eq)]
53pub struct TickError {
54    /// The underlying error.
55    pub kind: StepError,
56    /// Receipts produced before the failure (may include rollback receipts).
57    pub receipts: Vec<Receipt>,
58}
59
60impl fmt::Display for TickError {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        write!(f, "{}", self.kind)
63    }
64}
65
66impl std::error::Error for TickError {
67    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
68        Some(&self.kind)
69    }
70}
71
72// ── TickEngine ───────────────────────────────────────────────────
73
74/// Single-threaded tick engine for Lockstep mode.
75///
76/// Owns all simulation state and executes ticks synchronously. Each
77/// `execute_tick()` call runs the full propagator pipeline, publishes
78/// a snapshot, and returns receipts for any submitted commands.
79pub struct TickEngine {
80    arena: PingPongArena,
81    propagators: Vec<Box<dyn Propagator>>,
82    plan: ReadResolutionPlan,
83    ingress: IngressQueue,
84    space: Box<dyn murk_space::Space>,
85    dt: f64,
86    current_tick: TickId,
87    param_version: ParameterVersion,
88    consecutive_rollback_count: u32,
89    tick_disabled: bool,
90    max_consecutive_rollbacks: u32,
91    total_queue_full_rejections: u64,
92    total_tick_disabled_rejections: u64,
93    total_rollback_events: u64,
94    total_tick_disabled_transitions: u64,
95    total_worker_stall_events: u64,
96    total_ring_not_available_events: u64,
97    total_ring_eviction_events: u64,
98    total_ring_stale_read_events: u64,
99    total_ring_skew_retry_events: u64,
100    propagator_scratch: PropagatorScratch,
101    base_field_set: BaseFieldSet,
102    base_cache: BaseFieldCache,
103    staged_cache: StagedFieldCache,
104    last_metrics: StepMetrics,
105}
106
107impl TickEngine {
108    /// Construct a new tick engine from a [`WorldConfig`].
109    ///
110    /// Validates the configuration, builds the read resolution plan,
111    /// constructs the arena, and pre-computes the base field set.
112    /// Consumes the `WorldConfig`.
113    pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
114        // Validate and build read resolution plan.
115        config.validate()?;
116        let defined_fields = config.defined_field_set();
117        let plan = murk_propagator::validate_pipeline(
118            &config.propagators,
119            &defined_fields,
120            config.dt,
121            &*config.space,
122        )?;
123
124        // Build arena field defs.
125        // Safety: validate() already checked fields.len() fits in u32.
126        let arena_field_defs: Vec<(FieldId, murk_core::FieldDef)> = config
127            .fields
128            .iter()
129            .enumerate()
130            .map(|(i, def)| {
131                (
132                    FieldId(u32::try_from(i).expect("field count validated")),
133                    def.clone(),
134                )
135            })
136            .collect();
137
138        // Safety: validate() already checked cell_count fits in u32.
139        let cell_count = u32::try_from(config.space.cell_count()).expect("cell count validated");
140        let arena_config = ArenaConfig::new(cell_count);
141
142        // Build static arena for any Static fields.
143        let static_fields: Vec<(FieldId, u32)> = arena_field_defs
144            .iter()
145            .filter(|(_, d)| d.mutability == FieldMutability::Static)
146            .map(|(id, d)| (*id, cell_count * d.field_type.components()))
147            .collect();
148        let static_arena = StaticArena::new(&static_fields).into_shared();
149
150        let arena = PingPongArena::new(arena_config, arena_field_defs, static_arena)?;
151
152        // Pre-compute base field set.
153        let base_field_set = BaseFieldSet::from_plan(&plan, &config.propagators);
154
155        // Compute max scratch bytes across all propagators.
156        let max_scratch = config
157            .propagators
158            .iter()
159            .map(|p| p.scratch_bytes())
160            .max()
161            .unwrap_or(0);
162        let propagator_scratch = PropagatorScratch::with_byte_capacity(max_scratch);
163
164        let ingress = IngressQueue::new(config.max_ingress_queue);
165
166        Ok(Self {
167            arena,
168            propagators: config.propagators,
169            plan,
170            ingress,
171            space: config.space,
172            dt: config.dt,
173            current_tick: TickId(0),
174            param_version: ParameterVersion(0),
175            consecutive_rollback_count: 0,
176            tick_disabled: false,
177            max_consecutive_rollbacks: 3,
178            total_queue_full_rejections: 0,
179            total_tick_disabled_rejections: 0,
180            total_rollback_events: 0,
181            total_tick_disabled_transitions: 0,
182            total_worker_stall_events: 0,
183            total_ring_not_available_events: 0,
184            total_ring_eviction_events: 0,
185            total_ring_stale_read_events: 0,
186            total_ring_skew_retry_events: 0,
187            propagator_scratch,
188            base_field_set,
189            base_cache: BaseFieldCache::new(),
190            staged_cache: StagedFieldCache::new(),
191            last_metrics: StepMetrics::default(),
192        })
193    }
194
195    /// Submit commands to be processed in the next tick.
196    ///
197    /// Returns one receipt per command indicating acceptance or rejection.
198    pub fn submit_commands(&mut self, commands: Vec<Command>) -> Vec<Receipt> {
199        let receipts = self.ingress.submit(commands, self.tick_disabled);
200        for receipt in &receipts {
201            match receipt.reason_code {
202                Some(IngressError::QueueFull) => {
203                    self.total_queue_full_rejections =
204                        self.total_queue_full_rejections.saturating_add(1);
205                }
206                Some(IngressError::TickDisabled) => {
207                    self.total_tick_disabled_rejections =
208                        self.total_tick_disabled_rejections.saturating_add(1);
209                }
210                _ => {}
211            }
212        }
213        self.refresh_counter_metrics();
214        receipts
215    }
216
217    /// Execute one tick.
218    ///
219    /// Runs the full propagator pipeline, publishes the snapshot, and
220    /// returns receipts plus metrics. On propagator failure, the tick
221    /// is rolled back atomically (the staging buffer is abandoned).
222    pub fn execute_tick(&mut self) -> Result<TickResult, TickError> {
223        let tick_start = Instant::now();
224
225        // 0. Check if ticking is disabled.
226        if self.tick_disabled {
227            return Err(TickError {
228                kind: StepError::TickDisabled,
229                receipts: Vec::new(),
230            });
231        }
232
233        let next_tick = TickId(self.current_tick.0 + 1);
234
235        // 1. Populate base field cache from snapshot.
236        {
237            let snapshot = self.arena.snapshot();
238            self.base_cache.populate(&snapshot, &self.base_field_set);
239        }
240
241        // 2. Begin tick — if this fails, commands stay in the queue.
242        let mut guard = self.arena.begin_tick().map_err(|_| TickError {
243            kind: StepError::AllocationFailed,
244            receipts: Vec::new(),
245        })?;
246
247        // 3. Drain ingress queue (safe: begin_tick succeeded).
248        let cmd_start = Instant::now();
249        let drain = self.ingress.drain(next_tick);
250        let mut receipts = drain.expired_receipts;
251        let commands = drain.commands;
252        let accepted_receipt_start = receipts.len();
253        for dc in &commands {
254            receipts.push(Receipt {
255                accepted: true,
256                applied_tick_id: None,
257                reason_code: None,
258                command_index: dc.command_index,
259            });
260        }
261        // 3b. Apply commands to the staging writer.
262        for (i, dc) in commands.iter().enumerate() {
263            let receipt = &mut receipts[accepted_receipt_start + i];
264            match &dc.command.payload {
265                CommandPayload::SetField {
266                    ref coord,
267                    field_id,
268                    value,
269                } => {
270                    if let Some(rank) = self.space.canonical_rank(coord) {
271                        if let Some(buf) = guard.writer.write(*field_id) {
272                            if rank < buf.len() {
273                                buf[rank] = *value;
274                            }
275                        }
276                    }
277                }
278                CommandPayload::SetParameter { .. }
279                | CommandPayload::SetParameterBatch { .. }
280                | CommandPayload::Move { .. }
281                | CommandPayload::Spawn { .. }
282                | CommandPayload::Despawn { .. }
283                | CommandPayload::Custom { .. } => {
284                    receipt.accepted = false;
285                    receipt.reason_code = Some(IngressError::UnsupportedCommand);
286                }
287            }
288        }
289        let command_processing_us = cmd_start.elapsed().as_micros() as u64;
290
291        // 4. Run propagator pipeline.
292        let mut propagator_us = Vec::with_capacity(self.propagators.len());
293        for (i, prop) in self.propagators.iter().enumerate() {
294            let prop_start = Instant::now();
295
296            // 4a. Populate staged cache from guard.writer.read() per plan routes.
297            self.staged_cache.clear();
298            if let Some(routes) = self.plan.routes_for(i) {
299                for (&field, &source) in routes {
300                    if let ReadSource::Staged { .. } = source {
301                        if let Some(data) = guard.writer.read(field) {
302                            self.staged_cache.insert(field, data);
303                        }
304                    }
305                }
306            }
307
308            // 4b. Construct OverlayReader.
309            let empty_routes = indexmap::IndexMap::new();
310            let routes = self.plan.routes_for(i).unwrap_or(&empty_routes);
311            let overlay = OverlayReader::new(routes, &self.base_cache, &self.staged_cache);
312
313            // 4c. Seed WriteMode::Incremental buffers from previous generation.
314            for field in self.plan.incremental_fields_for(i) {
315                if let Some(prev_data) = self.base_cache.read(field) {
316                    // Copy through a temp buffer: base_cache borrows &self,
317                    // guard.writer.write() borrows &mut guard.
318                    let prev: Vec<f32> = prev_data.to_vec();
319                    if let Some(write_buf) = guard.writer.write(field) {
320                        let copy_len = prev.len().min(write_buf.len());
321                        write_buf[..copy_len].copy_from_slice(&prev[..copy_len]);
322                    }
323                }
324            }
325
326            // 4d. Reset propagator scratch.
327            self.propagator_scratch.reset();
328
329            // 4e. Construct StepContext and call step().
330            {
331                let mut ctx = murk_propagator::StepContext::new(
332                    &overlay,
333                    &self.base_cache,
334                    &mut guard.writer,
335                    &mut self.propagator_scratch,
336                    self.space.as_ref(),
337                    next_tick,
338                    self.dt,
339                );
340
341                // 4f. Call propagator step.
342                if let Err(reason) = prop.step(&mut ctx) {
343                    // 4g. Rollback on error — guard goes out of scope,
344                    // abandoning the staging buffer (free rollback).
345                    let prop_name = prop.name().to_string();
346                    return self.handle_rollback(
347                        prop_name,
348                        reason,
349                        receipts,
350                        accepted_receipt_start,
351                    );
352                }
353            }
354
355            propagator_us.push((
356                prop.name().to_string(),
357                prop_start.elapsed().as_micros() as u64,
358            ));
359        }
360
361        // 5. guard goes out of scope here (releases staging borrows).
362
363        // 6. Publish.
364        let publish_start = Instant::now();
365        self.arena
366            .publish(next_tick, self.param_version)
367            .map_err(|_| {
368                self.arena.reset_sparse_reuse_counters();
369                TickError {
370                    kind: StepError::AllocationFailed,
371                    receipts: vec![],
372                }
373            })?;
374        let snapshot_publish_us = publish_start.elapsed().as_micros() as u64;
375
376        // 7. Update state.
377        self.current_tick = next_tick;
378        self.consecutive_rollback_count = 0;
379
380        // 8. Finalize receipts with applied_tick_id (only for actually executed commands).
381        for receipt in &mut receipts[accepted_receipt_start..] {
382            if receipt.accepted {
383                receipt.applied_tick_id = Some(next_tick);
384            }
385        }
386
387        // 9. Build metrics.
388        let total_us = tick_start.elapsed().as_micros() as u64;
389        let metrics = StepMetrics {
390            total_us,
391            command_processing_us,
392            propagator_us,
393            snapshot_publish_us,
394            memory_bytes: self.arena.memory_bytes(),
395            sparse_retired_ranges: self.arena.sparse_retired_range_count() as u32,
396            sparse_pending_retired: self.arena.sparse_pending_retired_count() as u32,
397            sparse_reuse_hits: self.arena.sparse_reuse_hits(),
398            sparse_reuse_misses: self.arena.sparse_reuse_misses(),
399            queue_full_rejections: self.total_queue_full_rejections,
400            tick_disabled_rejections: self.total_tick_disabled_rejections,
401            rollback_events: self.total_rollback_events,
402            tick_disabled_transitions: self.total_tick_disabled_transitions,
403            worker_stall_events: self.total_worker_stall_events,
404            ring_not_available_events: self.total_ring_not_available_events,
405            ring_eviction_events: self.total_ring_eviction_events,
406            ring_stale_read_events: self.total_ring_stale_read_events,
407            ring_skew_retry_events: self.total_ring_skew_retry_events,
408        };
409        self.arena.reset_sparse_reuse_counters();
410        self.last_metrics = metrics.clone();
411
412        Ok(TickResult { receipts, metrics })
413    }
414
415    /// Handle a propagator failure by rolling back the tick.
416    ///
417    /// Takes ownership of `receipts` and returns them inside [`TickError`]
418    /// so the caller can inspect per-command rollback reason codes.
419    fn handle_rollback(
420        &mut self,
421        prop_name: String,
422        reason: murk_core::PropagatorError,
423        mut receipts: Vec<Receipt>,
424        accepted_start: usize,
425    ) -> Result<TickResult, TickError> {
426        // Guard was dropped → staging buffer abandoned (free rollback).
427        self.arena.reset_sparse_reuse_counters();
428        self.total_rollback_events = self.total_rollback_events.saturating_add(1);
429        self.consecutive_rollback_count += 1;
430        if self.consecutive_rollback_count >= self.max_consecutive_rollbacks {
431            if !self.tick_disabled {
432                self.total_tick_disabled_transitions =
433                    self.total_tick_disabled_transitions.saturating_add(1);
434            }
435            self.tick_disabled = true;
436        }
437        self.refresh_counter_metrics();
438
439        // Mark accepted command receipts as rolled back, but preserve
440        // receipts that were already rejected (e.g. unsupported command
441        // types) so callers see the original rejection reason.
442        for receipt in &mut receipts[accepted_start..] {
443            if receipt.accepted {
444                receipt.applied_tick_id = None;
445                receipt.reason_code = Some(IngressError::TickRollback);
446            }
447        }
448
449        Err(TickError {
450            kind: StepError::PropagatorFailed {
451                name: prop_name,
452                reason,
453            },
454            receipts,
455        })
456    }
457
458    /// Reset the engine to its initial state.
459    pub fn reset(&mut self) -> Result<(), ConfigError> {
460        self.arena.reset().map_err(ConfigError::Arena)?;
461        self.ingress.clear();
462        self.current_tick = TickId(0);
463        self.param_version = ParameterVersion(0);
464        self.tick_disabled = false;
465        self.consecutive_rollback_count = 0;
466        self.total_queue_full_rejections = 0;
467        self.total_tick_disabled_rejections = 0;
468        self.total_rollback_events = 0;
469        self.total_tick_disabled_transitions = 0;
470        self.total_worker_stall_events = 0;
471        self.total_ring_not_available_events = 0;
472        self.total_ring_eviction_events = 0;
473        self.total_ring_stale_read_events = 0;
474        self.total_ring_skew_retry_events = 0;
475        self.last_metrics = StepMetrics::default();
476        Ok(())
477    }
478
479    pub(crate) fn record_worker_stall_events(&mut self, count: u64) {
480        self.total_worker_stall_events = self.total_worker_stall_events.saturating_add(count);
481        self.refresh_counter_metrics();
482    }
483
484    pub(crate) fn set_ring_not_available_events(&mut self, total: u64) {
485        self.total_ring_not_available_events = self.total_ring_not_available_events.max(total);
486        self.refresh_counter_metrics();
487    }
488
489    pub(crate) fn set_ring_eviction_events(&mut self, total: u64) {
490        self.total_ring_eviction_events = self.total_ring_eviction_events.max(total);
491        self.refresh_counter_metrics();
492    }
493
494    pub(crate) fn set_ring_stale_read_events(&mut self, total: u64) {
495        self.total_ring_stale_read_events = self.total_ring_stale_read_events.max(total);
496        self.refresh_counter_metrics();
497    }
498
499    pub(crate) fn set_ring_skew_retry_events(&mut self, total: u64) {
500        self.total_ring_skew_retry_events = self.total_ring_skew_retry_events.max(total);
501        self.refresh_counter_metrics();
502    }
503
504    fn refresh_counter_metrics(&mut self) {
505        self.last_metrics.queue_full_rejections = self.total_queue_full_rejections;
506        self.last_metrics.tick_disabled_rejections = self.total_tick_disabled_rejections;
507        self.last_metrics.rollback_events = self.total_rollback_events;
508        self.last_metrics.tick_disabled_transitions = self.total_tick_disabled_transitions;
509        self.last_metrics.worker_stall_events = self.total_worker_stall_events;
510        self.last_metrics.ring_not_available_events = self.total_ring_not_available_events;
511        self.last_metrics.ring_eviction_events = self.total_ring_eviction_events;
512        self.last_metrics.ring_stale_read_events = self.total_ring_stale_read_events;
513        self.last_metrics.ring_skew_retry_events = self.total_ring_skew_retry_events;
514    }
515
516    /// Get a read-only snapshot of the current published generation.
517    pub fn snapshot(&self) -> Snapshot<'_> {
518        self.arena.snapshot()
519    }
520
521    /// Get an owned, thread-safe snapshot of the current published generation.
522    ///
523    /// Unlike [`TickEngine::snapshot()`], the returned `OwnedSnapshot` owns
524    /// clones of the segment data and can be sent across thread boundaries.
525    pub fn owned_snapshot(&self) -> murk_arena::OwnedSnapshot {
526        self.arena.owned_snapshot()
527    }
528
529    /// Current tick ID.
530    pub fn current_tick(&self) -> TickId {
531        self.current_tick
532    }
533
534    /// Whether ticking is disabled due to consecutive rollbacks.
535    pub fn is_tick_disabled(&self) -> bool {
536        self.tick_disabled
537    }
538
539    /// Number of consecutive rollbacks since the last successful tick.
540    pub fn consecutive_rollback_count(&self) -> u32 {
541        self.consecutive_rollback_count
542    }
543
544    /// Metrics from the most recent successful tick.
545    pub fn last_metrics(&self) -> &StepMetrics {
546        &self.last_metrics
547    }
548
549    /// The spatial topology for this engine.
550    pub fn space(&self) -> &dyn murk_space::Space {
551        self.space.as_ref()
552    }
553
554    /// Number of command batches currently buffered in ingress.
555    pub fn ingress_queue_depth(&self) -> usize {
556        self.ingress.len()
557    }
558
559    /// Maximum number of command batches ingress can buffer.
560    pub fn ingress_queue_capacity(&self) -> usize {
561        self.ingress.capacity()
562    }
563}
564
565#[cfg(test)]
566mod tests {
567    use super::*;
568    use murk_core::command::CommandPayload;
569    use murk_core::id::{Coord, ParameterKey};
570    use murk_core::traits::SnapshotAccess;
571    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
572    use murk_propagator::propagator::WriteMode;
573    use murk_space::{EdgeBehavior, Line1D};
574    use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};
575    use std::sync::atomic::{AtomicUsize, Ordering};
576
577    fn scalar_field(name: &str) -> FieldDef {
578        FieldDef {
579            name: name.to_string(),
580            field_type: FieldType::Scalar,
581            mutability: FieldMutability::PerTick,
582            units: None,
583            bounds: None,
584            boundary_behavior: BoundaryBehavior::Clamp,
585        }
586    }
587
588    fn sparse_scalar_field(name: &str) -> FieldDef {
589        FieldDef {
590            name: name.to_string(),
591            field_type: FieldType::Scalar,
592            mutability: FieldMutability::Sparse,
593            units: None,
594            bounds: None,
595            boundary_behavior: BoundaryBehavior::Clamp,
596        }
597    }
598
599    fn make_cmd(expires: u64) -> Command {
600        Command {
601            payload: CommandPayload::SetParameter {
602                key: ParameterKey(0),
603                value: 0.0,
604            },
605            expires_after_tick: TickId(expires),
606            source_id: None,
607            source_seq: None,
608            priority_class: 1,
609            arrival_seq: 0,
610        }
611    }
612
613    fn simple_engine() -> TickEngine {
614        let config = WorldConfig {
615            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
616            fields: vec![scalar_field("energy")],
617            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
618            dt: 0.1,
619            seed: 42,
620            ring_buffer_size: 8,
621            max_ingress_queue: 1024,
622            tick_rate_hz: None,
623            backoff: crate::config::BackoffConfig::default(),
624        };
625        TickEngine::new(config).unwrap()
626    }
627
628    fn two_field_engine() -> TickEngine {
629        let config = WorldConfig {
630            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
631            fields: vec![scalar_field("field0"), scalar_field("field1")],
632            propagators: vec![
633                Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
634                Box::new(IdentityPropagator::new(
635                    "copy_f0_to_f1",
636                    FieldId(0),
637                    FieldId(1),
638                )),
639            ],
640            dt: 0.1,
641            seed: 42,
642            ring_buffer_size: 8,
643            max_ingress_queue: 1024,
644            tick_rate_hz: None,
645            backoff: crate::config::BackoffConfig::default(),
646        };
647        TickEngine::new(config).unwrap()
648    }
649
650    fn three_field_engine() -> TickEngine {
651        // PropA writes field0=7.0
652        // PropB reads field0, copies to field1
653        // PropC reads field0+field1, writes sum to field2
654        struct SumPropagator {
655            name: String,
656            input_a: FieldId,
657            input_b: FieldId,
658            output: FieldId,
659        }
660
661        impl SumPropagator {
662            fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
663                Self {
664                    name: name.to_string(),
665                    input_a: a,
666                    input_b: b,
667                    output: out,
668                }
669            }
670        }
671
672        impl Propagator for SumPropagator {
673            fn name(&self) -> &str {
674                &self.name
675            }
676            fn reads(&self) -> murk_core::FieldSet {
677                [self.input_a, self.input_b].into_iter().collect()
678            }
679            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
680                vec![(self.output, WriteMode::Full)]
681            }
682            fn step(
683                &self,
684                ctx: &mut murk_propagator::StepContext<'_>,
685            ) -> Result<(), murk_core::PropagatorError> {
686                let a = ctx.reads().read(self.input_a).unwrap().to_vec();
687                let b = ctx.reads().read(self.input_b).unwrap().to_vec();
688                let out = ctx.writes().write(self.output).unwrap();
689                for i in 0..out.len() {
690                    out[i] = a[i] + b[i];
691                }
692                Ok(())
693            }
694        }
695
696        let config = WorldConfig {
697            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
698            fields: vec![
699                scalar_field("field0"),
700                scalar_field("field1"),
701                scalar_field("field2"),
702            ],
703            propagators: vec![
704                Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
705                Box::new(IdentityPropagator::new(
706                    "copy_f0_to_f1",
707                    FieldId(0),
708                    FieldId(1),
709                )),
710                Box::new(SumPropagator::new(
711                    "sum_f0_f1_to_f2",
712                    FieldId(0),
713                    FieldId(1),
714                    FieldId(2),
715                )),
716            ],
717            dt: 0.1,
718            seed: 42,
719            ring_buffer_size: 8,
720            max_ingress_queue: 1024,
721            tick_rate_hz: None,
722            backoff: crate::config::BackoffConfig::default(),
723        };
724        TickEngine::new(config).unwrap()
725    }
726
727    fn failing_engine(succeed_count: usize) -> TickEngine {
728        let config = WorldConfig {
729            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
730            fields: vec![scalar_field("energy")],
731            propagators: vec![Box::new(FailingPropagator::new(
732                "fail",
733                FieldId(0),
734                succeed_count,
735            ))],
736            dt: 0.1,
737            seed: 42,
738            ring_buffer_size: 8,
739            max_ingress_queue: 1024,
740            tick_rate_hz: None,
741            backoff: crate::config::BackoffConfig::default(),
742        };
743        TickEngine::new(config).unwrap()
744    }
745
746    fn partial_failure_engine() -> TickEngine {
747        // PropA succeeds always, PropB fails immediately.
748        let config = WorldConfig {
749            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
750            fields: vec![scalar_field("field0"), scalar_field("field1")],
751            propagators: vec![
752                Box::new(ConstPropagator::new("ok_prop", FieldId(0), 1.0)),
753                Box::new(FailingPropagator::new("fail_prop", FieldId(1), 0)),
754            ],
755            dt: 0.1,
756            seed: 42,
757            ring_buffer_size: 8,
758            max_ingress_queue: 1024,
759            tick_rate_hz: None,
760            backoff: crate::config::BackoffConfig::default(),
761        };
762        TickEngine::new(config).unwrap()
763    }
764
765    // ── Three-propagator overlay visibility tests ─────────────
766
767    #[test]
768    fn staged_read_sees_prior_propagator_write() {
769        // PropB reads field0 via reads() → should see PropA's value (7.0)
770        let mut engine = two_field_engine();
771        let result = engine.execute_tick().unwrap();
772        let snap = engine.snapshot();
773        // field1 should be a copy of field0
774        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
775        assert!(result.metrics.total_us > 0);
776    }
777
778    #[test]
779    fn reads_previous_sees_base_gen() {
780        // With reads_previous, a propagator should always see the base gen
781        // (tick-start snapshot), not staged writes. We verify by checking
782        // that on tick 1, reads_previous sees zeros (initial state).
783        struct ReadsPrevPropagator;
784        impl Propagator for ReadsPrevPropagator {
785            fn name(&self) -> &str {
786                "reads_prev"
787            }
788            fn reads(&self) -> murk_core::FieldSet {
789                murk_core::FieldSet::empty()
790            }
791            fn reads_previous(&self) -> murk_core::FieldSet {
792                [FieldId(0)].into_iter().collect()
793            }
794            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
795                vec![(FieldId(1), WriteMode::Full)]
796            }
797            fn step(
798                &self,
799                ctx: &mut murk_propagator::StepContext<'_>,
800            ) -> Result<(), murk_core::PropagatorError> {
801                let prev = ctx.reads_previous().read(FieldId(0)).unwrap().to_vec();
802                let out = ctx.writes().write(FieldId(1)).unwrap();
803                out.copy_from_slice(&prev);
804                Ok(())
805            }
806        }
807
808        let config = WorldConfig {
809            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
810            fields: vec![scalar_field("field0"), scalar_field("field1")],
811            propagators: vec![
812                Box::new(ConstPropagator::new("write_f0", FieldId(0), 99.0)),
813                Box::new(ReadsPrevPropagator),
814            ],
815            dt: 0.1,
816            seed: 42,
817            ring_buffer_size: 8,
818            max_ingress_queue: 1024,
819            tick_rate_hz: None,
820            backoff: crate::config::BackoffConfig::default(),
821        };
822        let mut engine = TickEngine::new(config).unwrap();
823
824        // Tick 1: PropA writes 99.0 to field0. ReadsPrev reads field0
825        // via reads_previous → sees base gen (all zeros).
826        engine.execute_tick().unwrap();
827        let snap = engine.snapshot();
828        // field1 should be 0.0 (base gen of field0 on first tick)
829        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 0.0);
830
831        // Tick 2: reads_previous now sees 99.0 (published from tick 1).
832        engine.execute_tick().unwrap();
833        let snap = engine.snapshot();
834        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 99.0);
835    }
836
837    #[test]
838    fn three_propagator_overlay_visibility() {
839        // A writes 7.0 to f0
840        // B reads f0 (staged → 7.0), copies to f1
841        // C reads f0 (staged → 7.0) + f1 (staged → 7.0), writes sum to f2
842        let mut engine = three_field_engine();
843        engine.execute_tick().unwrap();
844        let snap = engine.snapshot();
845
846        assert_eq!(snap.read(FieldId(0)).unwrap()[0], 7.0);
847        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
848        assert_eq!(snap.read(FieldId(2)).unwrap()[0], 14.0); // 7 + 7
849    }
850
851    #[test]
852    fn unwritten_field_reads_from_base_gen() {
853        // On tick 1, fields start as zero (base gen). A propagator
854        // reading a field nobody wrote should see zero.
855        let mut engine = three_field_engine();
856        engine.execute_tick().unwrap();
857        // All fields are written in this pipeline, so let's just
858        // verify the snapshot is consistent.
859        let snap = engine.snapshot();
860        let f2 = snap.read(FieldId(2)).unwrap();
861        assert!(f2.iter().all(|&v| v == 14.0));
862    }
863
864    // ── Tick atomicity tests ─────────────────────────────────
865
866    #[test]
867    fn propagator_failure_no_snapshot_published() {
868        let mut engine = failing_engine(0);
869
870        // Before any tick, snapshot is at tick 0 (initial state).
871        let snap_before = engine.snapshot();
872        let tick_before = snap_before.tick_id();
873
874        // Execute tick → should fail.
875        let result = engine.execute_tick();
876        assert!(result.is_err());
877
878        // Snapshot should be unchanged.
879        let snap_after = engine.snapshot();
880        assert_eq!(snap_after.tick_id(), tick_before);
881    }
882
883    #[test]
884    fn partial_failure_rolls_back_all() {
885        // PropA writes 1.0 to field0 (succeeds), PropB fails.
886        // field0 should NOT show 1.0 in the snapshot after rollback.
887        let mut engine = partial_failure_engine();
888
889        // Snapshot before: field0 should be all zeros (or not yet published).
890        let result = engine.execute_tick();
891        assert!(result.is_err());
892
893        // field0 should still be at initial state (no publish happened).
894        let snap = engine.snapshot();
895        let f0 = snap.read(FieldId(0));
896        // On the very first tick with no prior publish, the snapshot
897        // shows initial state. No writes from PropA should be visible.
898        if let Some(data) = f0 {
899            assert!(
900                data.iter().all(|&v| v == 0.0),
901                "rollback should prevent PropA's writes from being visible"
902            );
903        }
904    }
905
906    #[test]
907    fn rollback_receipts_generated() {
908        let mut engine = failing_engine(0);
909
910        // Submit an accepted command (SetField) before the failing tick.
911        let cmd = Command {
912            payload: CommandPayload::SetField {
913                coord: smallvec::smallvec![0],
914                field_id: FieldId(0),
915                value: 1.0,
916            },
917            expires_after_tick: TickId(100),
918            source_id: None,
919            source_seq: None,
920            priority_class: 1,
921            arrival_seq: 0,
922        };
923        engine.submit_commands(vec![cmd]);
924
925        let result = engine.execute_tick();
926        match result {
927            Err(TickError {
928                kind: StepError::PropagatorFailed { .. },
929                receipts,
930            }) => {
931                // Accepted receipts must be surfaced with TickRollback.
932                assert_eq!(receipts.len(), 1);
933                assert!(receipts[0].accepted);
934                assert_eq!(receipts[0].reason_code, Some(IngressError::TickRollback));
935            }
936            other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
937        }
938    }
939
940    #[test]
941    fn rollback_preserves_rejected_receipts() {
942        // Submit an unsupported command (SetParameter → rejected) alongside
943        // the tick. When the propagator fails and triggers rollback, the
944        // rejected receipt must stay accepted=false, not be overwritten
945        // with TickRollback.
946        let mut engine = failing_engine(0);
947
948        // SetParameter is unsupported → should be rejected (accepted=false).
949        engine.submit_commands(vec![make_cmd(100)]);
950
951        let result = engine.execute_tick();
952        match result {
953            Err(TickError {
954                kind: StepError::PropagatorFailed { .. },
955                receipts,
956            }) => {
957                assert_eq!(receipts.len(), 1);
958                // The receipt must remain rejected, NOT overwritten with
959                // accepted=true + TickRollback.
960                assert!(
961                    !receipts[0].accepted,
962                    "rejected receipt should stay rejected after rollback"
963                );
964                assert_eq!(
965                    receipts[0].reason_code,
966                    Some(IngressError::UnsupportedCommand),
967                    "rejected receipt must preserve UnsupportedCommand reason after rollback"
968                );
969            }
970            other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
971        }
972    }
973
974    // ── Rollback tracking tests ─────────────────────────────
975
976    #[test]
977    fn consecutive_rollbacks_disable_ticking() {
978        let mut engine = failing_engine(0);
979
980        for _ in 0..3 {
981            let _ = engine.execute_tick();
982        }
983
984        assert!(engine.is_tick_disabled());
985        assert_eq!(engine.consecutive_rollback_count(), 3);
986        assert_eq!(engine.last_metrics().rollback_events, 3);
987        assert_eq!(engine.last_metrics().tick_disabled_transitions, 1);
988    }
989
990    #[test]
991    fn success_resets_rollback_count() {
992        // Succeeds 2 times, then fails, but the first 2 successes
993        // should keep rollback count at 0.
994        let mut engine = failing_engine(10);
995
996        // Two successful ticks.
997        engine.execute_tick().unwrap();
998        engine.execute_tick().unwrap();
999        assert_eq!(engine.consecutive_rollback_count(), 0);
1000        assert_eq!(engine.current_tick(), TickId(2));
1001    }
1002
1003    #[test]
1004    fn tick_disabled_rejects_immediately() {
1005        let mut engine = failing_engine(0);
1006
1007        // Cause 3 failures to disable ticking.
1008        for _ in 0..3 {
1009            let _ = engine.execute_tick();
1010        }
1011        assert!(engine.is_tick_disabled());
1012
1013        // Next tick should fail immediately with TickDisabled.
1014        match engine.execute_tick() {
1015            Err(TickError {
1016                kind: StepError::TickDisabled,
1017                ..
1018            }) => {}
1019            other => panic!("expected TickDisabled, got {other:?}"),
1020        }
1021
1022        let receipts = engine.submit_commands(vec![make_cmd(100), make_cmd(100)]);
1023        assert!(receipts
1024            .iter()
1025            .all(|r| r.reason_code == Some(IngressError::TickDisabled)));
1026        assert_eq!(engine.last_metrics().tick_disabled_rejections, 2);
1027        assert_eq!(engine.last_metrics().rollback_events, 3);
1028        assert_eq!(engine.last_metrics().tick_disabled_transitions, 1);
1029    }
1030
1031    #[test]
1032    fn reset_clears_tick_disabled() {
1033        let mut engine = failing_engine(0);
1034
1035        for _ in 0..3 {
1036            let _ = engine.execute_tick();
1037        }
1038        assert!(engine.is_tick_disabled());
1039
1040        engine.reset().unwrap();
1041        assert!(!engine.is_tick_disabled());
1042        assert_eq!(engine.current_tick(), TickId(0));
1043        assert_eq!(engine.consecutive_rollback_count(), 0);
1044        assert_eq!(engine.last_metrics().queue_full_rejections, 0);
1045        assert_eq!(engine.last_metrics().tick_disabled_rejections, 0);
1046        assert_eq!(engine.last_metrics().rollback_events, 0);
1047        assert_eq!(engine.last_metrics().tick_disabled_transitions, 0);
1048        assert_eq!(engine.last_metrics().worker_stall_events, 0);
1049        assert_eq!(engine.last_metrics().ring_not_available_events, 0);
1050        assert_eq!(engine.last_metrics().ring_eviction_events, 0);
1051        assert_eq!(engine.last_metrics().ring_stale_read_events, 0);
1052        assert_eq!(engine.last_metrics().ring_skew_retry_events, 0);
1053    }
1054
1055    // ── Integration tests ────────────────────────────────────
1056
1057    #[test]
1058    fn single_tick_end_to_end() {
1059        let mut engine = simple_engine();
1060        let result = engine.execute_tick().unwrap();
1061
1062        let snap = engine.snapshot();
1063        let data = snap.read(FieldId(0)).unwrap();
1064        assert_eq!(data.len(), 10);
1065        assert!(data.iter().all(|&v| v == 42.0));
1066        assert_eq!(engine.current_tick(), TickId(1));
1067        assert!(!result.receipts.is_empty() || result.receipts.is_empty()); // receipts exist
1068    }
1069
1070    #[test]
1071    fn multi_tick_determinism() {
1072        let mut engine = simple_engine();
1073
1074        for _ in 0..10 {
1075            engine.execute_tick().unwrap();
1076        }
1077
1078        let snap = engine.snapshot();
1079        let data = snap.read(FieldId(0)).unwrap();
1080        assert!(data.iter().all(|&v| v == 42.0));
1081        assert_eq!(engine.current_tick(), TickId(10));
1082    }
1083
1084    #[test]
1085    fn commands_flow_through_to_receipts() {
1086        let mut engine = simple_engine();
1087
1088        // Use SetField commands — the only command type currently executed.
1089        let coord: Coord = vec![0i32].into();
1090        let cmds = vec![
1091            Command {
1092                payload: CommandPayload::SetField {
1093                    coord: coord.clone(),
1094                    field_id: FieldId(0),
1095                    value: 1.0,
1096                },
1097                expires_after_tick: TickId(100),
1098                source_id: None,
1099                source_seq: None,
1100                priority_class: 1,
1101                arrival_seq: 0,
1102            },
1103            Command {
1104                payload: CommandPayload::SetField {
1105                    coord: coord.clone(),
1106                    field_id: FieldId(0),
1107                    value: 2.0,
1108                },
1109                expires_after_tick: TickId(100),
1110                source_id: None,
1111                source_seq: None,
1112                priority_class: 1,
1113                arrival_seq: 0,
1114            },
1115        ];
1116        let submit_receipts = engine.submit_commands(cmds);
1117        assert_eq!(submit_receipts.len(), 2);
1118        assert!(submit_receipts.iter().all(|r| r.accepted));
1119
1120        let result = engine.execute_tick().unwrap();
1121        // Should have receipts for the 2 commands.
1122        let applied: Vec<_> = result
1123            .receipts
1124            .iter()
1125            .filter(|r| r.applied_tick_id.is_some())
1126            .collect();
1127        assert_eq!(applied.len(), 2);
1128        assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
1129    }
1130
1131    #[test]
1132    fn non_setfield_commands_rejected_honestly() {
1133        let mut engine = simple_engine();
1134
1135        // Submit SetParameter commands — not yet implemented.
1136        let submit_receipts = engine.submit_commands(vec![make_cmd(100), make_cmd(100)]);
1137        assert_eq!(submit_receipts.len(), 2);
1138        assert!(submit_receipts.iter().all(|r| r.accepted));
1139
1140        let result = engine.execute_tick().unwrap();
1141        assert_eq!(result.receipts.len(), 2);
1142
1143        // Non-SetField commands must NOT report as applied and must carry
1144        // UnsupportedCommand reason so callers can distinguish the failure mode.
1145        for receipt in &result.receipts {
1146            assert!(
1147                !receipt.accepted,
1148                "unimplemented command type must be rejected"
1149            );
1150            assert_eq!(
1151                receipt.applied_tick_id, None,
1152                "unimplemented command must not have applied_tick_id"
1153            );
1154            assert_eq!(
1155                receipt.reason_code,
1156                Some(IngressError::UnsupportedCommand),
1157                "rejected unsupported command must carry UnsupportedCommand reason"
1158            );
1159        }
1160    }
1161
1162    // ── Metrics tests ────────────────────────────────────────
1163
1164    #[test]
1165    fn timing_fields_populated() {
1166        let mut engine = simple_engine();
1167        let result = engine.execute_tick().unwrap();
1168
1169        // total_us is u64, so it's always >= 0; just verify the struct is populated.
1170        let _ = result.metrics.total_us;
1171        assert_eq!(result.metrics.propagator_us.len(), 1);
1172        assert_eq!(result.metrics.propagator_us[0].0, "const");
1173        assert_eq!(result.metrics.queue_full_rejections, 0);
1174        assert_eq!(result.metrics.tick_disabled_rejections, 0);
1175        assert_eq!(result.metrics.rollback_events, 0);
1176        assert_eq!(result.metrics.tick_disabled_transitions, 0);
1177        assert_eq!(result.metrics.worker_stall_events, 0);
1178        assert_eq!(result.metrics.ring_not_available_events, 0);
1179        assert_eq!(result.metrics.ring_eviction_events, 0);
1180        assert_eq!(result.metrics.ring_stale_read_events, 0);
1181        assert_eq!(result.metrics.ring_skew_retry_events, 0);
1182    }
1183
1184    #[test]
1185    fn memory_bytes_matches_arena() {
1186        let mut engine = simple_engine();
1187        engine.execute_tick().unwrap();
1188
1189        let metrics = engine.last_metrics();
1190        assert!(metrics.memory_bytes > 0);
1191    }
1192
1193    #[test]
1194    fn queue_full_rejections_are_counted_in_metrics() {
1195        let config = WorldConfig {
1196            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
1197            fields: vec![scalar_field("energy")],
1198            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
1199            dt: 0.1,
1200            seed: 42,
1201            ring_buffer_size: 8,
1202            max_ingress_queue: 1,
1203            tick_rate_hz: None,
1204            backoff: crate::config::BackoffConfig::default(),
1205        };
1206        let mut engine = TickEngine::new(config).unwrap();
1207
1208        let submit_receipts =
1209            engine.submit_commands(vec![make_cmd(100), make_cmd(100), make_cmd(100)]);
1210        let queue_full = submit_receipts
1211            .iter()
1212            .filter(|r| r.reason_code == Some(IngressError::QueueFull))
1213            .count();
1214        assert_eq!(queue_full, 2);
1215
1216        let result = engine.execute_tick().unwrap();
1217        assert_eq!(result.metrics.queue_full_rejections, 2);
1218        assert_eq!(result.metrics.tick_disabled_rejections, 0);
1219    }
1220
1221    #[test]
1222    fn external_realtime_counters_are_reflected_in_metrics() {
1223        let mut engine = simple_engine();
1224        engine.record_worker_stall_events(3);
1225        engine.set_ring_not_available_events(7);
1226        engine.set_ring_eviction_events(5);
1227        engine.set_ring_stale_read_events(2);
1228        engine.set_ring_skew_retry_events(1);
1229
1230        assert_eq!(engine.last_metrics().worker_stall_events, 3);
1231        assert_eq!(engine.last_metrics().ring_not_available_events, 7);
1232        assert_eq!(engine.last_metrics().ring_eviction_events, 5);
1233        assert_eq!(engine.last_metrics().ring_stale_read_events, 2);
1234        assert_eq!(engine.last_metrics().ring_skew_retry_events, 1);
1235
1236        let result = engine.execute_tick().unwrap();
1237        assert_eq!(result.metrics.worker_stall_events, 3);
1238        assert_eq!(result.metrics.ring_not_available_events, 7);
1239        assert_eq!(result.metrics.ring_eviction_events, 5);
1240        assert_eq!(result.metrics.ring_stale_read_events, 2);
1241        assert_eq!(result.metrics.ring_skew_retry_events, 1);
1242    }
1243
1244    #[test]
1245    fn rollback_clears_sparse_reuse_counters_for_next_success() {
1246        struct MaybeFailPropagator {
1247            output: FieldId,
1248            fail_on_call: Option<usize>,
1249            call_count: AtomicUsize,
1250        }
1251
1252        impl MaybeFailPropagator {
1253            fn new(output: FieldId, fail_on_call: Option<usize>) -> Self {
1254                Self {
1255                    output,
1256                    fail_on_call,
1257                    call_count: AtomicUsize::new(0),
1258                }
1259            }
1260        }
1261
1262        impl Propagator for MaybeFailPropagator {
1263            fn name(&self) -> &str {
1264                "maybe_fail"
1265            }
1266
1267            fn reads(&self) -> murk_core::FieldSet {
1268                murk_core::FieldSet::empty()
1269            }
1270
1271            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
1272                vec![(self.output, WriteMode::Full)]
1273            }
1274
1275            fn step(
1276                &self,
1277                ctx: &mut murk_propagator::StepContext<'_>,
1278            ) -> Result<(), murk_core::PropagatorError> {
1279                let n = self.call_count.fetch_add(1, Ordering::Relaxed);
1280                if self.fail_on_call == Some(n) {
1281                    return Err(murk_core::PropagatorError::ExecutionFailed {
1282                        reason: format!("deliberate failure on call {n}"),
1283                    });
1284                }
1285
1286                let output = ctx.writes().write(self.output).ok_or_else(|| {
1287                    murk_core::PropagatorError::ExecutionFailed {
1288                        reason: format!("field {:?} not writable", self.output),
1289                    }
1290                })?;
1291                output.fill(n as f32);
1292                Ok(())
1293            }
1294        }
1295
1296        fn sparse_engine(fail_on_call: Option<usize>) -> TickEngine {
1297            let config = WorldConfig {
1298                space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
1299                fields: vec![sparse_scalar_field("sparse0"), scalar_field("field1")],
1300                propagators: vec![
1301                    Box::new(ConstPropagator::new("write_sparse", FieldId(0), 3.0)),
1302                    Box::new(MaybeFailPropagator::new(FieldId(1), fail_on_call)),
1303                ],
1304                dt: 0.1,
1305                seed: 42,
1306                ring_buffer_size: 8,
1307                max_ingress_queue: 1024,
1308                tick_rate_hz: None,
1309                backoff: crate::config::BackoffConfig::default(),
1310            };
1311            let mut engine = TickEngine::new(config).unwrap();
1312            engine.arena.reset_sparse_reuse_counters();
1313            engine
1314        }
1315
1316        let mut rollback_engine = sparse_engine(Some(1)); // fail on tick 2
1317        rollback_engine.execute_tick().unwrap();
1318
1319        let rollback = rollback_engine.execute_tick();
1320        assert!(rollback.is_err(), "tick 2 should fail and roll back");
1321        assert_eq!(rollback_engine.arena.sparse_reuse_hits(), 0);
1322        assert_eq!(rollback_engine.arena.sparse_reuse_misses(), 0);
1323
1324        // A subsequent tick should still execute successfully after rollback.
1325        rollback_engine.execute_tick().unwrap();
1326    }
1327
1328    // ── Bug-fix regression tests ─────────────────────────────
1329
1330    #[test]
1331    fn reset_clears_pending_ingress() {
1332        let mut engine = simple_engine();
1333
1334        // Submit commands but don't tick.
1335        engine.submit_commands(vec![make_cmd(1000), make_cmd(1000)]);
1336
1337        // Reset should discard pending commands.
1338        engine.reset().unwrap();
1339
1340        // Tick should produce zero receipts (no pending commands).
1341        let result = engine.execute_tick().unwrap();
1342        assert!(result.receipts.is_empty());
1343    }
1344
1345    #[test]
1346    fn command_index_preserved_after_reordering() {
1347        let mut engine = simple_engine();
1348
1349        // Submit commands with different priorities — they'll be reordered.
1350        // Low priority first (index 0), high priority second (index 1).
1351        let cmds = vec![
1352            Command {
1353                payload: CommandPayload::SetParameter {
1354                    key: ParameterKey(0),
1355                    value: 1.0,
1356                },
1357                expires_after_tick: TickId(100),
1358                source_id: None,
1359                source_seq: None,
1360                priority_class: 2, // low priority
1361                arrival_seq: 0,
1362            },
1363            Command {
1364                payload: CommandPayload::SetParameter {
1365                    key: ParameterKey(0),
1366                    value: 2.0,
1367                },
1368                expires_after_tick: TickId(100),
1369                source_id: None,
1370                source_seq: None,
1371                priority_class: 0, // high priority — sorted first
1372                arrival_seq: 0,
1373            },
1374        ];
1375        engine.submit_commands(cmds);
1376
1377        let result = engine.execute_tick().unwrap();
1378        // After reordering, priority_class=0 (batch index 1) executes first,
1379        // priority_class=2 (batch index 0) executes second.
1380        // command_index must reflect the ORIGINAL batch position.
1381        assert_eq!(result.receipts.len(), 2);
1382        assert_eq!(result.receipts[0].command_index, 1); // was batch[1]
1383        assert_eq!(result.receipts[1].command_index, 0); // was batch[0]
1384    }
1385
1386    #[test]
1387    fn writemode_incremental_seeds_from_previous_gen() {
1388        // Regression test for BUG-015: WriteMode::Incremental buffers must
1389        // be pre-seeded with previous-generation data, not zero-filled.
1390        //
1391        // An incremental propagator writes cell 0 on tick 1 and then does
1392        // nothing on tick 2. Cell 0 must retain its value across ticks.
1393        struct IncrementalOnce {
1394            written: std::cell::Cell<bool>,
1395        }
1396        impl IncrementalOnce {
1397            fn new() -> Self {
1398                Self {
1399                    written: std::cell::Cell::new(false),
1400                }
1401            }
1402        }
1403        impl Propagator for IncrementalOnce {
1404            fn name(&self) -> &str {
1405                "incr_once"
1406            }
1407            fn reads(&self) -> murk_core::FieldSet {
1408                murk_core::FieldSet::empty()
1409            }
1410            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
1411                vec![(FieldId(0), WriteMode::Incremental)]
1412            }
1413            fn step(
1414                &self,
1415                ctx: &mut murk_propagator::StepContext<'_>,
1416            ) -> Result<(), murk_core::PropagatorError> {
1417                let buf = ctx.writes().write(FieldId(0)).unwrap();
1418                if !self.written.get() {
1419                    // First tick: write a distinctive value.
1420                    buf[0] = 42.0;
1421                    buf[1] = 99.0;
1422                    self.written.set(true);
1423                }
1424                // Second tick onward: do nothing — rely on incremental seeding.
1425                Ok(())
1426            }
1427        }
1428
1429        let config = WorldConfig {
1430            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
1431            fields: vec![scalar_field("state")],
1432            propagators: vec![Box::new(IncrementalOnce::new())],
1433            dt: 0.1,
1434            seed: 42,
1435            ring_buffer_size: 8,
1436            max_ingress_queue: 1024,
1437            tick_rate_hz: None,
1438            backoff: crate::config::BackoffConfig::default(),
1439        };
1440        let mut engine = TickEngine::new(config).unwrap();
1441
1442        // Tick 1: propagator writes 42.0 and 99.0.
1443        engine.execute_tick().unwrap();
1444        let snap = engine.snapshot();
1445        assert_eq!(snap.read(FieldId(0)).unwrap()[0], 42.0);
1446        assert_eq!(snap.read(FieldId(0)).unwrap()[1], 99.0);
1447
1448        // Tick 2: propagator does nothing — incremental seeding must preserve values.
1449        engine.execute_tick().unwrap();
1450        let snap = engine.snapshot();
1451        assert_eq!(
1452            snap.read(FieldId(0)).unwrap()[0],
1453            42.0,
1454            "BUG-015: incremental field lost data across ticks"
1455        );
1456        assert_eq!(
1457            snap.read(FieldId(0)).unwrap()[1],
1458            99.0,
1459            "BUG-015: incremental field lost data across ticks"
1460        );
1461        // Unwritten cells should remain zero (seeded from previous gen which was zero).
1462        assert_eq!(snap.read(FieldId(0)).unwrap()[2], 0.0);
1463
1464        // Tick 3: still preserved.
1465        engine.execute_tick().unwrap();
1466        let snap = engine.snapshot();
1467        assert_eq!(snap.read(FieldId(0)).unwrap()[0], 42.0);
1468        assert_eq!(snap.read(FieldId(0)).unwrap()[1], 99.0);
1469    }
1470}