Skip to main content

murk_engine/
tick.rs

1//! Tick engine: the single-threaded simulation loop.
2//!
3//! [`TickEngine`] wires together the arena, propagators, ingress queue,
4//! and overlay caches into a deterministic tick execution loop with
5//! rollback atomicity.
6//!
7//! # Lockstep mode only
8//!
9//! This module implements Lockstep mode — a callable struct with no
10//! background threads. RealtimeAsync mode (future WP) wraps this in
11//! a thread with a ring buffer.
12
13use std::fmt;
14use std::time::Instant;
15
16use murk_arena::config::ArenaConfig;
17use murk_arena::pingpong::PingPongArena;
18use murk_arena::read::Snapshot;
19use murk_arena::static_arena::StaticArena;
20use murk_core::command::{Command, CommandPayload, Receipt};
21use murk_core::error::{IngressError, StepError};
22use murk_core::id::{FieldId, ParameterVersion, TickId};
23use murk_core::traits::{FieldReader, FieldWriter};
24use murk_core::FieldMutability;
25use murk_propagator::pipeline::{ReadResolutionPlan, ReadSource};
26use murk_propagator::propagator::Propagator;
27use murk_propagator::scratch::ScratchRegion as PropagatorScratch;
28
29use crate::config::{ConfigError, WorldConfig};
30use crate::ingress::IngressQueue;
31use crate::metrics::StepMetrics;
32use crate::overlay::{BaseFieldCache, BaseFieldSet, OverlayReader, StagedFieldCache};
33
34// ── TickResult ───────────────────────────────────────────────────
35
36/// Result of a successful tick execution.
37#[derive(Debug)]
38pub struct TickResult {
39    /// Receipts for commands submitted before this tick.
40    pub receipts: Vec<Receipt>,
41    /// Performance metrics for this tick.
42    pub metrics: StepMetrics,
43}
44
45// ── TickError ───────────────────────────────────────────────────
46
47/// Error returned from [`TickEngine::execute_tick()`].
48///
49/// Wraps the underlying [`StepError`] and any receipts that were produced
50/// before the failure. On rollback, receipts carry `TickRollback` reason
51/// codes; callers must not discard them.
52#[derive(Debug, PartialEq, Eq)]
53pub struct TickError {
54    /// The underlying error.
55    pub kind: StepError,
56    /// Receipts produced before the failure (may include rollback receipts).
57    pub receipts: Vec<Receipt>,
58}
59
60impl fmt::Display for TickError {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        write!(f, "{}", self.kind)
63    }
64}
65
66impl std::error::Error for TickError {
67    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
68        Some(&self.kind)
69    }
70}
71
72// ── TickEngine ───────────────────────────────────────────────────
73
74/// Single-threaded tick engine for Lockstep mode.
75///
76/// Owns all simulation state and executes ticks synchronously. Each
77/// `execute_tick()` call runs the full propagator pipeline, publishes
78/// a snapshot, and returns receipts for any submitted commands.
79pub struct TickEngine {
80    arena: PingPongArena,
81    propagators: Vec<Box<dyn Propagator>>,
82    plan: ReadResolutionPlan,
83    ingress: IngressQueue,
84    space: Box<dyn murk_space::Space>,
85    dt: f64,
86    current_tick: TickId,
87    param_version: ParameterVersion,
88    consecutive_rollback_count: u32,
89    tick_disabled: bool,
90    max_consecutive_rollbacks: u32,
91    propagator_scratch: PropagatorScratch,
92    base_field_set: BaseFieldSet,
93    base_cache: BaseFieldCache,
94    staged_cache: StagedFieldCache,
95    last_metrics: StepMetrics,
96}
97
98impl TickEngine {
99    /// Construct a new tick engine from a [`WorldConfig`].
100    ///
101    /// Validates the configuration, builds the read resolution plan,
102    /// constructs the arena, and pre-computes the base field set.
103    /// Consumes the `WorldConfig`.
104    pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
105        // Validate and build read resolution plan.
106        config.validate()?;
107        let defined_fields = config.defined_field_set();
108        let plan =
109            murk_propagator::validate_pipeline(&config.propagators, &defined_fields, config.dt)?;
110
111        // Build arena field defs.
112        // Safety: validate() already checked fields.len() fits in u32.
113        let arena_field_defs: Vec<(FieldId, murk_core::FieldDef)> = config
114            .fields
115            .iter()
116            .enumerate()
117            .map(|(i, def)| {
118                (
119                    FieldId(u32::try_from(i).expect("field count validated")),
120                    def.clone(),
121                )
122            })
123            .collect();
124
125        // Safety: validate() already checked cell_count fits in u32.
126        let cell_count = u32::try_from(config.space.cell_count()).expect("cell count validated");
127        let arena_config = ArenaConfig::new(cell_count);
128
129        // Build static arena for any Static fields.
130        let static_fields: Vec<(FieldId, u32)> = arena_field_defs
131            .iter()
132            .filter(|(_, d)| d.mutability == FieldMutability::Static)
133            .map(|(id, d)| (*id, cell_count * d.field_type.components()))
134            .collect();
135        let static_arena = StaticArena::new(&static_fields).into_shared();
136
137        let arena = PingPongArena::new(arena_config, arena_field_defs, static_arena)?;
138
139        // Pre-compute base field set.
140        let base_field_set = BaseFieldSet::from_plan(&plan, &config.propagators);
141
142        // Compute max scratch bytes across all propagators.
143        let max_scratch = config
144            .propagators
145            .iter()
146            .map(|p| p.scratch_bytes())
147            .max()
148            .unwrap_or(0);
149        let propagator_scratch = PropagatorScratch::with_byte_capacity(max_scratch);
150
151        let ingress = IngressQueue::new(config.max_ingress_queue);
152
153        Ok(Self {
154            arena,
155            propagators: config.propagators,
156            plan,
157            ingress,
158            space: config.space,
159            dt: config.dt,
160            current_tick: TickId(0),
161            param_version: ParameterVersion(0),
162            consecutive_rollback_count: 0,
163            tick_disabled: false,
164            max_consecutive_rollbacks: 3,
165            propagator_scratch,
166            base_field_set,
167            base_cache: BaseFieldCache::new(),
168            staged_cache: StagedFieldCache::new(),
169            last_metrics: StepMetrics::default(),
170        })
171    }
172
173    /// Submit commands to be processed in the next tick.
174    ///
175    /// Returns one receipt per command indicating acceptance or rejection.
176    pub fn submit_commands(&mut self, commands: Vec<Command>) -> Vec<Receipt> {
177        self.ingress.submit(commands, self.tick_disabled)
178    }
179
180    /// Execute one tick.
181    ///
182    /// Runs the full propagator pipeline, publishes the snapshot, and
183    /// returns receipts plus metrics. On propagator failure, the tick
184    /// is rolled back atomically (the staging buffer is abandoned).
185    pub fn execute_tick(&mut self) -> Result<TickResult, TickError> {
186        let tick_start = Instant::now();
187
188        // 0. Check if ticking is disabled.
189        if self.tick_disabled {
190            return Err(TickError {
191                kind: StepError::TickDisabled,
192                receipts: Vec::new(),
193            });
194        }
195
196        let next_tick = TickId(self.current_tick.0 + 1);
197
198        // 1. Populate base field cache from snapshot.
199        {
200            let snapshot = self.arena.snapshot();
201            self.base_cache.populate(&snapshot, &self.base_field_set);
202        }
203
204        // 2. Begin tick — if this fails, commands stay in the queue.
205        let mut guard = self.arena.begin_tick().map_err(|_| TickError {
206            kind: StepError::AllocationFailed,
207            receipts: Vec::new(),
208        })?;
209
210        // 3. Drain ingress queue (safe: begin_tick succeeded).
211        let cmd_start = Instant::now();
212        let drain = self.ingress.drain(next_tick);
213        let mut receipts = drain.expired_receipts;
214        let commands = drain.commands;
215        let accepted_receipt_start = receipts.len();
216        for dc in &commands {
217            receipts.push(Receipt {
218                accepted: true,
219                applied_tick_id: None,
220                reason_code: None,
221                command_index: dc.command_index,
222            });
223        }
224        // 3b. Apply commands to the staging writer.
225        for (i, dc) in commands.iter().enumerate() {
226            let receipt = &mut receipts[accepted_receipt_start + i];
227            match &dc.command.payload {
228                CommandPayload::SetField {
229                    ref coord,
230                    field_id,
231                    value,
232                } => {
233                    if let Some(rank) = self.space.canonical_rank(coord) {
234                        if let Some(buf) = guard.writer.write(*field_id) {
235                            if rank < buf.len() {
236                                buf[rank] = *value;
237                            }
238                        }
239                    }
240                }
241                CommandPayload::SetParameter { .. }
242                | CommandPayload::SetParameterBatch { .. }
243                | CommandPayload::Move { .. }
244                | CommandPayload::Spawn { .. }
245                | CommandPayload::Despawn { .. }
246                | CommandPayload::Custom { .. } => {
247                    receipt.accepted = false;
248                    receipt.reason_code = Some(IngressError::UnsupportedCommand);
249                }
250            }
251        }
252        let command_processing_us = cmd_start.elapsed().as_micros() as u64;
253
254        // 4. Run propagator pipeline.
255        let mut propagator_us = Vec::with_capacity(self.propagators.len());
256        for (i, prop) in self.propagators.iter().enumerate() {
257            let prop_start = Instant::now();
258
259            // 4a. Populate staged cache from guard.writer.read() per plan routes.
260            self.staged_cache.clear();
261            if let Some(routes) = self.plan.routes_for(i) {
262                for (&field, &source) in routes {
263                    if let ReadSource::Staged { .. } = source {
264                        if let Some(data) = guard.writer.read(field) {
265                            self.staged_cache.insert(field, data);
266                        }
267                    }
268                }
269            }
270
271            // 4b. Construct OverlayReader.
272            let empty_routes = indexmap::IndexMap::new();
273            let routes = self.plan.routes_for(i).unwrap_or(&empty_routes);
274            let overlay = OverlayReader::new(routes, &self.base_cache, &self.staged_cache);
275
276            // 4c. Seed WriteMode::Incremental buffers from previous generation.
277            for field in self.plan.incremental_fields_for(i) {
278                if let Some(prev_data) = self.base_cache.read(field) {
279                    // Copy through a temp buffer: base_cache borrows &self,
280                    // guard.writer.write() borrows &mut guard.
281                    let prev: Vec<f32> = prev_data.to_vec();
282                    if let Some(write_buf) = guard.writer.write(field) {
283                        let copy_len = prev.len().min(write_buf.len());
284                        write_buf[..copy_len].copy_from_slice(&prev[..copy_len]);
285                    }
286                }
287            }
288
289            // 4d. Reset propagator scratch.
290            self.propagator_scratch.reset();
291
292            // 4e. Construct StepContext and call step().
293            {
294                let mut ctx = murk_propagator::StepContext::new(
295                    &overlay,
296                    &self.base_cache,
297                    &mut guard.writer,
298                    &mut self.propagator_scratch,
299                    self.space.as_ref(),
300                    next_tick,
301                    self.dt,
302                );
303
304                // 4f. Call propagator step.
305                if let Err(reason) = prop.step(&mut ctx) {
306                    // 4g. Rollback on error — guard goes out of scope,
307                    // abandoning the staging buffer (free rollback).
308                    let prop_name = prop.name().to_string();
309                    return self.handle_rollback(
310                        prop_name,
311                        reason,
312                        receipts,
313                        accepted_receipt_start,
314                    );
315                }
316            }
317
318            propagator_us.push((
319                prop.name().to_string(),
320                prop_start.elapsed().as_micros() as u64,
321            ));
322        }
323
324        // 5. guard goes out of scope here (releases staging borrows).
325
326        // 6. Publish.
327        let publish_start = Instant::now();
328        self.arena
329            .publish(next_tick, self.param_version)
330            .map_err(|_| TickError {
331                kind: StepError::AllocationFailed,
332                receipts: vec![],
333            })?;
334        let snapshot_publish_us = publish_start.elapsed().as_micros() as u64;
335
336        // 7. Update state.
337        self.current_tick = next_tick;
338        self.consecutive_rollback_count = 0;
339
340        // 8. Finalize receipts with applied_tick_id (only for actually executed commands).
341        for receipt in &mut receipts[accepted_receipt_start..] {
342            if receipt.accepted {
343                receipt.applied_tick_id = Some(next_tick);
344            }
345        }
346
347        // 9. Build metrics.
348        let total_us = tick_start.elapsed().as_micros() as u64;
349        let metrics = StepMetrics {
350            total_us,
351            command_processing_us,
352            propagator_us,
353            snapshot_publish_us,
354            memory_bytes: self.arena.memory_bytes(),
355            sparse_retired_ranges: self.arena.sparse_retired_range_count() as u32,
356            sparse_pending_retired: self.arena.sparse_pending_retired_count() as u32,
357        };
358        self.last_metrics = metrics.clone();
359
360        Ok(TickResult { receipts, metrics })
361    }
362
363    /// Handle a propagator failure by rolling back the tick.
364    ///
365    /// Takes ownership of `receipts` and returns them inside [`TickError`]
366    /// so the caller can inspect per-command rollback reason codes.
367    fn handle_rollback(
368        &mut self,
369        prop_name: String,
370        reason: murk_core::PropagatorError,
371        mut receipts: Vec<Receipt>,
372        accepted_start: usize,
373    ) -> Result<TickResult, TickError> {
374        // Guard was dropped → staging buffer abandoned (free rollback).
375        self.consecutive_rollback_count += 1;
376        if self.consecutive_rollback_count >= self.max_consecutive_rollbacks {
377            self.tick_disabled = true;
378        }
379
380        // Mark accepted command receipts as rolled back, but preserve
381        // receipts that were already rejected (e.g. unsupported command
382        // types) so callers see the original rejection reason.
383        for receipt in &mut receipts[accepted_start..] {
384            if receipt.accepted {
385                receipt.applied_tick_id = None;
386                receipt.reason_code = Some(IngressError::TickRollback);
387            }
388        }
389
390        Err(TickError {
391            kind: StepError::PropagatorFailed {
392                name: prop_name,
393                reason,
394            },
395            receipts,
396        })
397    }
398
399    /// Reset the engine to its initial state.
400    pub fn reset(&mut self) -> Result<(), ConfigError> {
401        self.arena.reset().map_err(ConfigError::Arena)?;
402        self.ingress.clear();
403        self.current_tick = TickId(0);
404        self.param_version = ParameterVersion(0);
405        self.tick_disabled = false;
406        self.consecutive_rollback_count = 0;
407        self.last_metrics = StepMetrics::default();
408        Ok(())
409    }
410
411    /// Get a read-only snapshot of the current published generation.
412    pub fn snapshot(&self) -> Snapshot<'_> {
413        self.arena.snapshot()
414    }
415
416    /// Get an owned, thread-safe snapshot of the current published generation.
417    ///
418    /// Unlike [`TickEngine::snapshot()`], the returned `OwnedSnapshot` owns
419    /// clones of the segment data and can be sent across thread boundaries.
420    pub fn owned_snapshot(&self) -> murk_arena::OwnedSnapshot {
421        self.arena.owned_snapshot()
422    }
423
424    /// Current tick ID.
425    pub fn current_tick(&self) -> TickId {
426        self.current_tick
427    }
428
429    /// Whether ticking is disabled due to consecutive rollbacks.
430    pub fn is_tick_disabled(&self) -> bool {
431        self.tick_disabled
432    }
433
434    /// Number of consecutive rollbacks since the last successful tick.
435    pub fn consecutive_rollback_count(&self) -> u32 {
436        self.consecutive_rollback_count
437    }
438
439    /// Metrics from the most recent successful tick.
440    pub fn last_metrics(&self) -> &StepMetrics {
441        &self.last_metrics
442    }
443
444    /// The spatial topology for this engine.
445    pub fn space(&self) -> &dyn murk_space::Space {
446        self.space.as_ref()
447    }
448}
449
450#[cfg(test)]
451mod tests {
452    use super::*;
453    use murk_core::command::CommandPayload;
454    use murk_core::id::{Coord, ParameterKey};
455    use murk_core::traits::SnapshotAccess;
456    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
457    use murk_propagator::propagator::WriteMode;
458    use murk_space::{EdgeBehavior, Line1D};
459    use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};
460
461    fn scalar_field(name: &str) -> FieldDef {
462        FieldDef {
463            name: name.to_string(),
464            field_type: FieldType::Scalar,
465            mutability: FieldMutability::PerTick,
466            units: None,
467            bounds: None,
468            boundary_behavior: BoundaryBehavior::Clamp,
469        }
470    }
471
472    fn make_cmd(expires: u64) -> Command {
473        Command {
474            payload: CommandPayload::SetParameter {
475                key: ParameterKey(0),
476                value: 0.0,
477            },
478            expires_after_tick: TickId(expires),
479            source_id: None,
480            source_seq: None,
481            priority_class: 1,
482            arrival_seq: 0,
483        }
484    }
485
486    fn simple_engine() -> TickEngine {
487        let config = WorldConfig {
488            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
489            fields: vec![scalar_field("energy")],
490            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
491            dt: 0.1,
492            seed: 42,
493            ring_buffer_size: 8,
494            max_ingress_queue: 1024,
495            tick_rate_hz: None,
496            backoff: crate::config::BackoffConfig::default(),
497        };
498        TickEngine::new(config).unwrap()
499    }
500
501    fn two_field_engine() -> TickEngine {
502        let config = WorldConfig {
503            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
504            fields: vec![scalar_field("field0"), scalar_field("field1")],
505            propagators: vec![
506                Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
507                Box::new(IdentityPropagator::new(
508                    "copy_f0_to_f1",
509                    FieldId(0),
510                    FieldId(1),
511                )),
512            ],
513            dt: 0.1,
514            seed: 42,
515            ring_buffer_size: 8,
516            max_ingress_queue: 1024,
517            tick_rate_hz: None,
518            backoff: crate::config::BackoffConfig::default(),
519        };
520        TickEngine::new(config).unwrap()
521    }
522
523    fn three_field_engine() -> TickEngine {
524        // PropA writes field0=7.0
525        // PropB reads field0, copies to field1
526        // PropC reads field0+field1, writes sum to field2
527        struct SumPropagator {
528            name: String,
529            input_a: FieldId,
530            input_b: FieldId,
531            output: FieldId,
532        }
533
534        impl SumPropagator {
535            fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
536                Self {
537                    name: name.to_string(),
538                    input_a: a,
539                    input_b: b,
540                    output: out,
541                }
542            }
543        }
544
545        impl Propagator for SumPropagator {
546            fn name(&self) -> &str {
547                &self.name
548            }
549            fn reads(&self) -> murk_core::FieldSet {
550                [self.input_a, self.input_b].into_iter().collect()
551            }
552            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
553                vec![(self.output, WriteMode::Full)]
554            }
555            fn step(
556                &self,
557                ctx: &mut murk_propagator::StepContext<'_>,
558            ) -> Result<(), murk_core::PropagatorError> {
559                let a = ctx.reads().read(self.input_a).unwrap().to_vec();
560                let b = ctx.reads().read(self.input_b).unwrap().to_vec();
561                let out = ctx.writes().write(self.output).unwrap();
562                for i in 0..out.len() {
563                    out[i] = a[i] + b[i];
564                }
565                Ok(())
566            }
567        }
568
569        let config = WorldConfig {
570            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
571            fields: vec![
572                scalar_field("field0"),
573                scalar_field("field1"),
574                scalar_field("field2"),
575            ],
576            propagators: vec![
577                Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
578                Box::new(IdentityPropagator::new(
579                    "copy_f0_to_f1",
580                    FieldId(0),
581                    FieldId(1),
582                )),
583                Box::new(SumPropagator::new(
584                    "sum_f0_f1_to_f2",
585                    FieldId(0),
586                    FieldId(1),
587                    FieldId(2),
588                )),
589            ],
590            dt: 0.1,
591            seed: 42,
592            ring_buffer_size: 8,
593            max_ingress_queue: 1024,
594            tick_rate_hz: None,
595            backoff: crate::config::BackoffConfig::default(),
596        };
597        TickEngine::new(config).unwrap()
598    }
599
600    fn failing_engine(succeed_count: usize) -> TickEngine {
601        let config = WorldConfig {
602            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
603            fields: vec![scalar_field("energy")],
604            propagators: vec![Box::new(FailingPropagator::new(
605                "fail",
606                FieldId(0),
607                succeed_count,
608            ))],
609            dt: 0.1,
610            seed: 42,
611            ring_buffer_size: 8,
612            max_ingress_queue: 1024,
613            tick_rate_hz: None,
614            backoff: crate::config::BackoffConfig::default(),
615        };
616        TickEngine::new(config).unwrap()
617    }
618
619    fn partial_failure_engine() -> TickEngine {
620        // PropA succeeds always, PropB fails immediately.
621        let config = WorldConfig {
622            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
623            fields: vec![scalar_field("field0"), scalar_field("field1")],
624            propagators: vec![
625                Box::new(ConstPropagator::new("ok_prop", FieldId(0), 1.0)),
626                Box::new(FailingPropagator::new("fail_prop", FieldId(1), 0)),
627            ],
628            dt: 0.1,
629            seed: 42,
630            ring_buffer_size: 8,
631            max_ingress_queue: 1024,
632            tick_rate_hz: None,
633            backoff: crate::config::BackoffConfig::default(),
634        };
635        TickEngine::new(config).unwrap()
636    }
637
638    // ── Three-propagator overlay visibility tests ─────────────
639
640    #[test]
641    fn staged_read_sees_prior_propagator_write() {
642        // PropB reads field0 via reads() → should see PropA's value (7.0)
643        let mut engine = two_field_engine();
644        let result = engine.execute_tick().unwrap();
645        let snap = engine.snapshot();
646        // field1 should be a copy of field0
647        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
648        assert!(result.metrics.total_us > 0);
649    }
650
651    #[test]
652    fn reads_previous_sees_base_gen() {
653        // With reads_previous, a propagator should always see the base gen
654        // (tick-start snapshot), not staged writes. We verify by checking
655        // that on tick 1, reads_previous sees zeros (initial state).
656        struct ReadsPrevPropagator;
657        impl Propagator for ReadsPrevPropagator {
658            fn name(&self) -> &str {
659                "reads_prev"
660            }
661            fn reads(&self) -> murk_core::FieldSet {
662                murk_core::FieldSet::empty()
663            }
664            fn reads_previous(&self) -> murk_core::FieldSet {
665                [FieldId(0)].into_iter().collect()
666            }
667            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
668                vec![(FieldId(1), WriteMode::Full)]
669            }
670            fn step(
671                &self,
672                ctx: &mut murk_propagator::StepContext<'_>,
673            ) -> Result<(), murk_core::PropagatorError> {
674                let prev = ctx.reads_previous().read(FieldId(0)).unwrap().to_vec();
675                let out = ctx.writes().write(FieldId(1)).unwrap();
676                out.copy_from_slice(&prev);
677                Ok(())
678            }
679        }
680
681        let config = WorldConfig {
682            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
683            fields: vec![scalar_field("field0"), scalar_field("field1")],
684            propagators: vec![
685                Box::new(ConstPropagator::new("write_f0", FieldId(0), 99.0)),
686                Box::new(ReadsPrevPropagator),
687            ],
688            dt: 0.1,
689            seed: 42,
690            ring_buffer_size: 8,
691            max_ingress_queue: 1024,
692            tick_rate_hz: None,
693            backoff: crate::config::BackoffConfig::default(),
694        };
695        let mut engine = TickEngine::new(config).unwrap();
696
697        // Tick 1: PropA writes 99.0 to field0. ReadsPrev reads field0
698        // via reads_previous → sees base gen (all zeros).
699        engine.execute_tick().unwrap();
700        let snap = engine.snapshot();
701        // field1 should be 0.0 (base gen of field0 on first tick)
702        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 0.0);
703
704        // Tick 2: reads_previous now sees 99.0 (published from tick 1).
705        engine.execute_tick().unwrap();
706        let snap = engine.snapshot();
707        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 99.0);
708    }
709
710    #[test]
711    fn three_propagator_overlay_visibility() {
712        // A writes 7.0 to f0
713        // B reads f0 (staged → 7.0), copies to f1
714        // C reads f0 (staged → 7.0) + f1 (staged → 7.0), writes sum to f2
715        let mut engine = three_field_engine();
716        engine.execute_tick().unwrap();
717        let snap = engine.snapshot();
718
719        assert_eq!(snap.read(FieldId(0)).unwrap()[0], 7.0);
720        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
721        assert_eq!(snap.read(FieldId(2)).unwrap()[0], 14.0); // 7 + 7
722    }
723
724    #[test]
725    fn unwritten_field_reads_from_base_gen() {
726        // On tick 1, fields start as zero (base gen). A propagator
727        // reading a field nobody wrote should see zero.
728        let mut engine = three_field_engine();
729        engine.execute_tick().unwrap();
730        // All fields are written in this pipeline, so let's just
731        // verify the snapshot is consistent.
732        let snap = engine.snapshot();
733        let f2 = snap.read(FieldId(2)).unwrap();
734        assert!(f2.iter().all(|&v| v == 14.0));
735    }
736
737    // ── Tick atomicity tests ─────────────────────────────────
738
739    #[test]
740    fn propagator_failure_no_snapshot_published() {
741        let mut engine = failing_engine(0);
742
743        // Before any tick, snapshot is at tick 0 (initial state).
744        let snap_before = engine.snapshot();
745        let tick_before = snap_before.tick_id();
746
747        // Execute tick → should fail.
748        let result = engine.execute_tick();
749        assert!(result.is_err());
750
751        // Snapshot should be unchanged.
752        let snap_after = engine.snapshot();
753        assert_eq!(snap_after.tick_id(), tick_before);
754    }
755
756    #[test]
757    fn partial_failure_rolls_back_all() {
758        // PropA writes 1.0 to field0 (succeeds), PropB fails.
759        // field0 should NOT show 1.0 in the snapshot after rollback.
760        let mut engine = partial_failure_engine();
761
762        // Snapshot before: field0 should be all zeros (or not yet published).
763        let result = engine.execute_tick();
764        assert!(result.is_err());
765
766        // field0 should still be at initial state (no publish happened).
767        let snap = engine.snapshot();
768        let f0 = snap.read(FieldId(0));
769        // On the very first tick with no prior publish, the snapshot
770        // shows initial state. No writes from PropA should be visible.
771        if let Some(data) = f0 {
772            assert!(
773                data.iter().all(|&v| v == 0.0),
774                "rollback should prevent PropA's writes from being visible"
775            );
776        }
777    }
778
779    #[test]
780    fn rollback_receipts_generated() {
781        let mut engine = failing_engine(0);
782
783        // Submit an accepted command (SetField) before the failing tick.
784        let cmd = Command {
785            payload: CommandPayload::SetField {
786                coord: smallvec::smallvec![0],
787                field_id: FieldId(0),
788                value: 1.0,
789            },
790            expires_after_tick: TickId(100),
791            source_id: None,
792            source_seq: None,
793            priority_class: 1,
794            arrival_seq: 0,
795        };
796        engine.submit_commands(vec![cmd]);
797
798        let result = engine.execute_tick();
799        match result {
800            Err(TickError {
801                kind: StepError::PropagatorFailed { .. },
802                receipts,
803            }) => {
804                // Accepted receipts must be surfaced with TickRollback.
805                assert_eq!(receipts.len(), 1);
806                assert!(receipts[0].accepted);
807                assert_eq!(receipts[0].reason_code, Some(IngressError::TickRollback));
808            }
809            other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
810        }
811    }
812
813    #[test]
814    fn rollback_preserves_rejected_receipts() {
815        // Submit an unsupported command (SetParameter → rejected) alongside
816        // the tick. When the propagator fails and triggers rollback, the
817        // rejected receipt must stay accepted=false, not be overwritten
818        // with TickRollback.
819        let mut engine = failing_engine(0);
820
821        // SetParameter is unsupported → should be rejected (accepted=false).
822        engine.submit_commands(vec![make_cmd(100)]);
823
824        let result = engine.execute_tick();
825        match result {
826            Err(TickError {
827                kind: StepError::PropagatorFailed { .. },
828                receipts,
829            }) => {
830                assert_eq!(receipts.len(), 1);
831                // The receipt must remain rejected, NOT overwritten with
832                // accepted=true + TickRollback.
833                assert!(
834                    !receipts[0].accepted,
835                    "rejected receipt should stay rejected after rollback"
836                );
837                assert_eq!(
838                    receipts[0].reason_code,
839                    Some(IngressError::UnsupportedCommand),
840                    "rejected receipt must preserve UnsupportedCommand reason after rollback"
841                );
842            }
843            other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
844        }
845    }
846
847    // ── Rollback tracking tests ─────────────────────────────
848
849    #[test]
850    fn consecutive_rollbacks_disable_ticking() {
851        let mut engine = failing_engine(0);
852
853        for _ in 0..3 {
854            let _ = engine.execute_tick();
855        }
856
857        assert!(engine.is_tick_disabled());
858        assert_eq!(engine.consecutive_rollback_count(), 3);
859    }
860
861    #[test]
862    fn success_resets_rollback_count() {
863        // Succeeds 2 times, then fails, but the first 2 successes
864        // should keep rollback count at 0.
865        let mut engine = failing_engine(10);
866
867        // Two successful ticks.
868        engine.execute_tick().unwrap();
869        engine.execute_tick().unwrap();
870        assert_eq!(engine.consecutive_rollback_count(), 0);
871        assert_eq!(engine.current_tick(), TickId(2));
872    }
873
874    #[test]
875    fn tick_disabled_rejects_immediately() {
876        let mut engine = failing_engine(0);
877
878        // Cause 3 failures to disable ticking.
879        for _ in 0..3 {
880            let _ = engine.execute_tick();
881        }
882        assert!(engine.is_tick_disabled());
883
884        // Next tick should fail immediately with TickDisabled.
885        match engine.execute_tick() {
886            Err(TickError {
887                kind: StepError::TickDisabled,
888                ..
889            }) => {}
890            other => panic!("expected TickDisabled, got {other:?}"),
891        }
892    }
893
894    #[test]
895    fn reset_clears_tick_disabled() {
896        let mut engine = failing_engine(0);
897
898        for _ in 0..3 {
899            let _ = engine.execute_tick();
900        }
901        assert!(engine.is_tick_disabled());
902
903        engine.reset().unwrap();
904        assert!(!engine.is_tick_disabled());
905        assert_eq!(engine.current_tick(), TickId(0));
906        assert_eq!(engine.consecutive_rollback_count(), 0);
907    }
908
909    // ── Integration tests ────────────────────────────────────
910
911    #[test]
912    fn single_tick_end_to_end() {
913        let mut engine = simple_engine();
914        let result = engine.execute_tick().unwrap();
915
916        let snap = engine.snapshot();
917        let data = snap.read(FieldId(0)).unwrap();
918        assert_eq!(data.len(), 10);
919        assert!(data.iter().all(|&v| v == 42.0));
920        assert_eq!(engine.current_tick(), TickId(1));
921        assert!(!result.receipts.is_empty() || result.receipts.is_empty()); // receipts exist
922    }
923
924    #[test]
925    fn multi_tick_determinism() {
926        let mut engine = simple_engine();
927
928        for _ in 0..10 {
929            engine.execute_tick().unwrap();
930        }
931
932        let snap = engine.snapshot();
933        let data = snap.read(FieldId(0)).unwrap();
934        assert!(data.iter().all(|&v| v == 42.0));
935        assert_eq!(engine.current_tick(), TickId(10));
936    }
937
938    #[test]
939    fn commands_flow_through_to_receipts() {
940        let mut engine = simple_engine();
941
942        // Use SetField commands — the only command type currently executed.
943        let coord: Coord = vec![0i32].into();
944        let cmds = vec![
945            Command {
946                payload: CommandPayload::SetField {
947                    coord: coord.clone(),
948                    field_id: FieldId(0),
949                    value: 1.0,
950                },
951                expires_after_tick: TickId(100),
952                source_id: None,
953                source_seq: None,
954                priority_class: 1,
955                arrival_seq: 0,
956            },
957            Command {
958                payload: CommandPayload::SetField {
959                    coord: coord.clone(),
960                    field_id: FieldId(0),
961                    value: 2.0,
962                },
963                expires_after_tick: TickId(100),
964                source_id: None,
965                source_seq: None,
966                priority_class: 1,
967                arrival_seq: 0,
968            },
969        ];
970        let submit_receipts = engine.submit_commands(cmds);
971        assert_eq!(submit_receipts.len(), 2);
972        assert!(submit_receipts.iter().all(|r| r.accepted));
973
974        let result = engine.execute_tick().unwrap();
975        // Should have receipts for the 2 commands.
976        let applied: Vec<_> = result
977            .receipts
978            .iter()
979            .filter(|r| r.applied_tick_id.is_some())
980            .collect();
981        assert_eq!(applied.len(), 2);
982        assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
983    }
984
985    #[test]
986    fn non_setfield_commands_rejected_honestly() {
987        let mut engine = simple_engine();
988
989        // Submit SetParameter commands — not yet implemented.
990        let submit_receipts = engine.submit_commands(vec![make_cmd(100), make_cmd(100)]);
991        assert_eq!(submit_receipts.len(), 2);
992        assert!(submit_receipts.iter().all(|r| r.accepted));
993
994        let result = engine.execute_tick().unwrap();
995        assert_eq!(result.receipts.len(), 2);
996
997        // Non-SetField commands must NOT report as applied and must carry
998        // UnsupportedCommand reason so callers can distinguish the failure mode.
999        for receipt in &result.receipts {
1000            assert!(
1001                !receipt.accepted,
1002                "unimplemented command type must be rejected"
1003            );
1004            assert_eq!(
1005                receipt.applied_tick_id, None,
1006                "unimplemented command must not have applied_tick_id"
1007            );
1008            assert_eq!(
1009                receipt.reason_code,
1010                Some(IngressError::UnsupportedCommand),
1011                "rejected unsupported command must carry UnsupportedCommand reason"
1012            );
1013        }
1014    }
1015
1016    // ── Metrics tests ────────────────────────────────────────
1017
1018    #[test]
1019    fn timing_fields_populated() {
1020        let mut engine = simple_engine();
1021        let result = engine.execute_tick().unwrap();
1022
1023        // total_us is u64, so it's always >= 0; just verify the struct is populated.
1024        let _ = result.metrics.total_us;
1025        assert_eq!(result.metrics.propagator_us.len(), 1);
1026        assert_eq!(result.metrics.propagator_us[0].0, "const");
1027    }
1028
1029    #[test]
1030    fn memory_bytes_matches_arena() {
1031        let mut engine = simple_engine();
1032        engine.execute_tick().unwrap();
1033
1034        let metrics = engine.last_metrics();
1035        assert!(metrics.memory_bytes > 0);
1036    }
1037
1038    // ── Bug-fix regression tests ─────────────────────────────
1039
1040    #[test]
1041    fn reset_clears_pending_ingress() {
1042        let mut engine = simple_engine();
1043
1044        // Submit commands but don't tick.
1045        engine.submit_commands(vec![make_cmd(1000), make_cmd(1000)]);
1046
1047        // Reset should discard pending commands.
1048        engine.reset().unwrap();
1049
1050        // Tick should produce zero receipts (no pending commands).
1051        let result = engine.execute_tick().unwrap();
1052        assert!(result.receipts.is_empty());
1053    }
1054
1055    #[test]
1056    fn command_index_preserved_after_reordering() {
1057        let mut engine = simple_engine();
1058
1059        // Submit commands with different priorities — they'll be reordered.
1060        // Low priority first (index 0), high priority second (index 1).
1061        let cmds = vec![
1062            Command {
1063                payload: CommandPayload::SetParameter {
1064                    key: ParameterKey(0),
1065                    value: 1.0,
1066                },
1067                expires_after_tick: TickId(100),
1068                source_id: None,
1069                source_seq: None,
1070                priority_class: 2, // low priority
1071                arrival_seq: 0,
1072            },
1073            Command {
1074                payload: CommandPayload::SetParameter {
1075                    key: ParameterKey(0),
1076                    value: 2.0,
1077                },
1078                expires_after_tick: TickId(100),
1079                source_id: None,
1080                source_seq: None,
1081                priority_class: 0, // high priority — sorted first
1082                arrival_seq: 0,
1083            },
1084        ];
1085        engine.submit_commands(cmds);
1086
1087        let result = engine.execute_tick().unwrap();
1088        // After reordering, priority_class=0 (batch index 1) executes first,
1089        // priority_class=2 (batch index 0) executes second.
1090        // command_index must reflect the ORIGINAL batch position.
1091        assert_eq!(result.receipts.len(), 2);
1092        assert_eq!(result.receipts[0].command_index, 1); // was batch[1]
1093        assert_eq!(result.receipts[1].command_index, 0); // was batch[0]
1094    }
1095
1096    #[test]
1097    fn writemode_incremental_seeds_from_previous_gen() {
1098        // Regression test for BUG-015: WriteMode::Incremental buffers must
1099        // be pre-seeded with previous-generation data, not zero-filled.
1100        //
1101        // An incremental propagator writes cell 0 on tick 1 and then does
1102        // nothing on tick 2. Cell 0 must retain its value across ticks.
1103        struct IncrementalOnce {
1104            written: std::cell::Cell<bool>,
1105        }
1106        impl IncrementalOnce {
1107            fn new() -> Self {
1108                Self {
1109                    written: std::cell::Cell::new(false),
1110                }
1111            }
1112        }
1113        impl Propagator for IncrementalOnce {
1114            fn name(&self) -> &str {
1115                "incr_once"
1116            }
1117            fn reads(&self) -> murk_core::FieldSet {
1118                murk_core::FieldSet::empty()
1119            }
1120            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
1121                vec![(FieldId(0), WriteMode::Incremental)]
1122            }
1123            fn step(
1124                &self,
1125                ctx: &mut murk_propagator::StepContext<'_>,
1126            ) -> Result<(), murk_core::PropagatorError> {
1127                let buf = ctx.writes().write(FieldId(0)).unwrap();
1128                if !self.written.get() {
1129                    // First tick: write a distinctive value.
1130                    buf[0] = 42.0;
1131                    buf[1] = 99.0;
1132                    self.written.set(true);
1133                }
1134                // Second tick onward: do nothing — rely on incremental seeding.
1135                Ok(())
1136            }
1137        }
1138
1139        let config = WorldConfig {
1140            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
1141            fields: vec![scalar_field("state")],
1142            propagators: vec![Box::new(IncrementalOnce::new())],
1143            dt: 0.1,
1144            seed: 42,
1145            ring_buffer_size: 8,
1146            max_ingress_queue: 1024,
1147            tick_rate_hz: None,
1148            backoff: crate::config::BackoffConfig::default(),
1149        };
1150        let mut engine = TickEngine::new(config).unwrap();
1151
1152        // Tick 1: propagator writes 42.0 and 99.0.
1153        engine.execute_tick().unwrap();
1154        let snap = engine.snapshot();
1155        assert_eq!(snap.read(FieldId(0)).unwrap()[0], 42.0);
1156        assert_eq!(snap.read(FieldId(0)).unwrap()[1], 99.0);
1157
1158        // Tick 2: propagator does nothing — incremental seeding must preserve values.
1159        engine.execute_tick().unwrap();
1160        let snap = engine.snapshot();
1161        assert_eq!(
1162            snap.read(FieldId(0)).unwrap()[0],
1163            42.0,
1164            "BUG-015: incremental field lost data across ticks"
1165        );
1166        assert_eq!(
1167            snap.read(FieldId(0)).unwrap()[1],
1168            99.0,
1169            "BUG-015: incremental field lost data across ticks"
1170        );
1171        // Unwritten cells should remain zero (seeded from previous gen which was zero).
1172        assert_eq!(snap.read(FieldId(0)).unwrap()[2], 0.0);
1173
1174        // Tick 3: still preserved.
1175        engine.execute_tick().unwrap();
1176        let snap = engine.snapshot();
1177        assert_eq!(snap.read(FieldId(0)).unwrap()[0], 42.0);
1178        assert_eq!(snap.read(FieldId(0)).unwrap()[1], 99.0);
1179    }
1180}