Skip to main content

murk_engine/
tick.rs

1//! Tick engine: the single-threaded simulation loop.
2//!
3//! [`TickEngine`] wires together the arena, propagators, ingress queue,
4//! and overlay caches into a deterministic tick execution loop with
5//! rollback atomicity.
6//!
7//! # Lockstep mode only
8//!
9//! This module implements Lockstep mode — a callable struct with no
10//! background threads. RealtimeAsync mode (future WP) wraps this in
11//! a thread with a ring buffer.
12
13use std::fmt;
14use std::time::Instant;
15
16use murk_arena::config::ArenaConfig;
17use murk_arena::pingpong::PingPongArena;
18use murk_arena::read::Snapshot;
19use murk_arena::static_arena::StaticArena;
20use murk_core::command::{Command, CommandPayload, Receipt};
21use murk_core::error::{IngressError, StepError};
22use murk_core::id::{FieldId, ParameterVersion, TickId};
23use murk_core::traits::FieldWriter;
24use murk_core::FieldMutability;
25use murk_propagator::pipeline::{ReadResolutionPlan, ReadSource};
26use murk_propagator::propagator::Propagator;
27use murk_propagator::scratch::ScratchRegion as PropagatorScratch;
28
29use crate::config::{ConfigError, WorldConfig};
30use crate::ingress::IngressQueue;
31use crate::metrics::StepMetrics;
32use crate::overlay::{BaseFieldCache, BaseFieldSet, OverlayReader, StagedFieldCache};
33
34// ── TickResult ───────────────────────────────────────────────────
35
/// Result of a successful tick execution.
///
/// Returned in the `Ok` variant of [`TickEngine::execute_tick()`].
#[derive(Debug)]
pub struct TickResult {
    /// Receipts produced while draining the ingress queue for this tick:
    /// first the receipts the drain reported as expired, then one receipt
    /// per drained command (stamped with the tick that applied it).
    pub receipts: Vec<Receipt>,
    /// Performance metrics for this tick.
    pub metrics: StepMetrics,
}
44
45// ── TickError ───────────────────────────────────────────────────
46
/// Error returned from [`TickEngine::execute_tick()`].
///
/// Wraps the underlying [`StepError`] and any receipts that were produced
/// before the failure. On rollback, receipts carry `TickRollback` reason
/// codes; callers must not discard them.
///
/// When the tick is refused up front (ticking disabled, or the arena could
/// not begin a tick) the ingress queue has not been drained yet, so
/// `receipts` is empty.
#[derive(Debug)]
pub struct TickError {
    /// The underlying error.
    pub kind: StepError,
    /// Receipts produced before the failure (may include rollback receipts).
    pub receipts: Vec<Receipt>,
}
59
60impl fmt::Display for TickError {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        write!(f, "{}", self.kind)
63    }
64}
65
66impl std::error::Error for TickError {
67    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
68        Some(&self.kind)
69    }
70}
71
72// ── TickEngine ───────────────────────────────────────────────────
73
/// Single-threaded tick engine for Lockstep mode.
///
/// Owns all simulation state and executes ticks synchronously. Each
/// `execute_tick()` call runs the full propagator pipeline, publishes
/// a snapshot, and returns receipts for any submitted commands.
pub struct TickEngine {
    // Field storage: `begin_tick()` yields the staging writer, `publish()`
    // makes the staged generation the one `snapshot()` returns.
    arena: PingPongArena,
    // Propagator pipeline, stepped in vector order on every tick.
    propagators: Vec<Box<dyn Propagator>>,
    // Per-propagator read routes produced by pipeline validation.
    plan: ReadResolutionPlan,
    // Commands submitted via `submit_commands`, drained at the next tick.
    ingress: IngressQueue,
    // Spatial topology: maps command coordinates to cell ranks and is
    // handed to every propagator step.
    space: Box<dyn murk_space::Space>,
    // Time step passed to each propagator's step context.
    dt: f64,
    // ID of the last successfully published tick (`TickId(0)` initially).
    current_tick: TickId,
    // Parameter version passed to `arena.publish()` with every tick.
    param_version: ParameterVersion,
    // Rollbacks since the last successful tick.
    consecutive_rollback_count: u32,
    // Set when the rollback count reaches `max_consecutive_rollbacks`;
    // only `reset()` clears it.
    tick_disabled: bool,
    // Rollback threshold that disables ticking (set to 3 in `new`).
    max_consecutive_rollbacks: u32,
    // Scratch region sized for the largest propagator request; reset
    // before each propagator step.
    propagator_scratch: PropagatorScratch,
    // Set of fields loaded into `base_cache` at the start of a tick.
    base_field_set: BaseFieldSet,
    // Cache populated from the published snapshot at the start of a tick.
    base_cache: BaseFieldCache,
    // Cache of staged field data, rebuilt before each propagator step.
    staged_cache: StagedFieldCache,
    // Metrics of the most recent successful tick.
    last_metrics: StepMetrics,
}
97
98impl TickEngine {
99    /// Construct a new tick engine from a [`WorldConfig`].
100    ///
101    /// Validates the configuration, builds the read resolution plan,
102    /// constructs the arena, and pre-computes the base field set.
103    /// Consumes the `WorldConfig`.
104    pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
105        // Validate and build read resolution plan.
106        config.validate()?;
107        let defined_fields = config.defined_field_set();
108        let plan =
109            murk_propagator::validate_pipeline(&config.propagators, &defined_fields, config.dt)?;
110
111        // Build arena field defs.
112        let arena_field_defs: Vec<(FieldId, murk_core::FieldDef)> = config
113            .fields
114            .iter()
115            .enumerate()
116            .map(|(i, def)| (FieldId(i as u32), def.clone()))
117            .collect();
118
119        let cell_count = config.space.cell_count() as u32;
120        let arena_config = ArenaConfig::new(cell_count);
121
122        // Build static arena for any Static fields.
123        let static_fields: Vec<(FieldId, u32)> = arena_field_defs
124            .iter()
125            .filter(|(_, d)| d.mutability == FieldMutability::Static)
126            .map(|(id, d)| (*id, cell_count * d.field_type.components()))
127            .collect();
128        let static_arena = StaticArena::new(&static_fields).into_shared();
129
130        let arena = PingPongArena::new(arena_config, arena_field_defs, static_arena)?;
131
132        // Pre-compute base field set.
133        let base_field_set = BaseFieldSet::from_plan(&plan, &config.propagators);
134
135        // Compute max scratch bytes across all propagators.
136        let max_scratch = config
137            .propagators
138            .iter()
139            .map(|p| p.scratch_bytes())
140            .max()
141            .unwrap_or(0);
142        let propagator_scratch = PropagatorScratch::with_byte_capacity(max_scratch);
143
144        let ingress = IngressQueue::new(config.max_ingress_queue);
145
146        Ok(Self {
147            arena,
148            propagators: config.propagators,
149            plan,
150            ingress,
151            space: config.space,
152            dt: config.dt,
153            current_tick: TickId(0),
154            param_version: ParameterVersion(0),
155            consecutive_rollback_count: 0,
156            tick_disabled: false,
157            max_consecutive_rollbacks: 3,
158            propagator_scratch,
159            base_field_set,
160            base_cache: BaseFieldCache::new(),
161            staged_cache: StagedFieldCache::new(),
162            last_metrics: StepMetrics::default(),
163        })
164    }
165
166    /// Submit commands to be processed in the next tick.
167    ///
168    /// Returns one receipt per command indicating acceptance or rejection.
169    pub fn submit_commands(&mut self, commands: Vec<Command>) -> Vec<Receipt> {
170        self.ingress.submit(commands, self.tick_disabled)
171    }
172
173    /// Execute one tick.
174    ///
175    /// Runs the full propagator pipeline, publishes the snapshot, and
176    /// returns receipts plus metrics. On propagator failure, the tick
177    /// is rolled back atomically (the staging buffer is abandoned).
178    pub fn execute_tick(&mut self) -> Result<TickResult, TickError> {
179        let tick_start = Instant::now();
180
181        // 0. Check if ticking is disabled.
182        if self.tick_disabled {
183            return Err(TickError {
184                kind: StepError::TickDisabled,
185                receipts: Vec::new(),
186            });
187        }
188
189        let next_tick = TickId(self.current_tick.0 + 1);
190
191        // 1. Populate base field cache from snapshot.
192        {
193            let snapshot = self.arena.snapshot();
194            self.base_cache.populate(&snapshot, &self.base_field_set);
195        }
196
197        // 2. Begin tick — if this fails, commands stay in the queue.
198        let mut guard = self.arena.begin_tick().map_err(|_| TickError {
199            kind: StepError::AllocationFailed,
200            receipts: Vec::new(),
201        })?;
202
203        // 3. Drain ingress queue (safe: begin_tick succeeded).
204        let cmd_start = Instant::now();
205        let drain = self.ingress.drain(next_tick);
206        let mut receipts = drain.expired_receipts;
207        let commands = drain.commands;
208        let accepted_receipt_start = receipts.len();
209        for dc in &commands {
210            receipts.push(Receipt {
211                accepted: true,
212                applied_tick_id: None,
213                reason_code: None,
214                command_index: dc.command_index,
215            });
216        }
217        // 3b. Apply SetField commands to the staging writer.
218        for dc in &commands {
219            if let CommandPayload::SetField {
220                ref coord,
221                field_id,
222                value,
223            } = dc.command.payload
224            {
225                if let Some(rank) = self.space.canonical_rank(coord) {
226                    if let Some(buf) = guard.writer.write(field_id) {
227                        if rank < buf.len() {
228                            buf[rank] = value;
229                        }
230                    }
231                }
232            }
233        }
234        let command_processing_us = cmd_start.elapsed().as_micros() as u64;
235
236        // 4. Run propagator pipeline.
237        let mut propagator_us = Vec::with_capacity(self.propagators.len());
238        for (i, prop) in self.propagators.iter().enumerate() {
239            let prop_start = Instant::now();
240
241            // 4a. Populate staged cache from guard.writer.read() per plan routes.
242            self.staged_cache.clear();
243            if let Some(routes) = self.plan.routes_for(i) {
244                for (&field, &source) in routes {
245                    if let ReadSource::Staged { .. } = source {
246                        if let Some(data) = guard.writer.read(field) {
247                            self.staged_cache.insert(field, data);
248                        }
249                    }
250                }
251            }
252
253            // 4b. Construct OverlayReader.
254            let empty_routes = indexmap::IndexMap::new();
255            let routes = self.plan.routes_for(i).unwrap_or(&empty_routes);
256            let overlay = OverlayReader::new(routes, &self.base_cache, &self.staged_cache);
257
258            // 4c. Reset propagator scratch.
259            self.propagator_scratch.reset();
260
261            // 4d. Construct StepContext and call step().
262            {
263                let mut ctx = murk_propagator::StepContext::new(
264                    &overlay,
265                    &self.base_cache,
266                    &mut guard.writer,
267                    &mut self.propagator_scratch,
268                    self.space.as_ref(),
269                    next_tick,
270                    self.dt,
271                );
272
273                // 4e. Call propagator step.
274                if let Err(reason) = prop.step(&mut ctx) {
275                    // 4g. Rollback on error — guard goes out of scope,
276                    // abandoning the staging buffer (free rollback).
277                    let prop_name = prop.name().to_string();
278                    return self.handle_rollback(
279                        prop_name,
280                        reason,
281                        receipts,
282                        accepted_receipt_start,
283                    );
284                }
285            }
286
287            propagator_us.push((
288                prop.name().to_string(),
289                prop_start.elapsed().as_micros() as u64,
290            ));
291        }
292
293        // 5. guard goes out of scope here (releases staging borrows).
294
295        // 6. Publish.
296        let publish_start = Instant::now();
297        self.arena.publish(next_tick, self.param_version);
298        let snapshot_publish_us = publish_start.elapsed().as_micros() as u64;
299
300        // 7. Update state.
301        self.current_tick = next_tick;
302        self.consecutive_rollback_count = 0;
303
304        // 8. Finalize receipts with applied_tick_id.
305        for receipt in &mut receipts[accepted_receipt_start..] {
306            receipt.applied_tick_id = Some(next_tick);
307        }
308
309        // 9. Build metrics.
310        let total_us = tick_start.elapsed().as_micros() as u64;
311        let metrics = StepMetrics {
312            total_us,
313            command_processing_us,
314            propagator_us,
315            snapshot_publish_us,
316            memory_bytes: self.arena.memory_bytes(),
317        };
318        self.last_metrics = metrics.clone();
319
320        Ok(TickResult { receipts, metrics })
321    }
322
323    /// Handle a propagator failure by rolling back the tick.
324    ///
325    /// Takes ownership of `receipts` and returns them inside [`TickError`]
326    /// so the caller can inspect per-command rollback reason codes.
327    fn handle_rollback(
328        &mut self,
329        prop_name: String,
330        reason: murk_core::PropagatorError,
331        mut receipts: Vec<Receipt>,
332        accepted_start: usize,
333    ) -> Result<TickResult, TickError> {
334        // Guard was dropped → staging buffer abandoned (free rollback).
335        self.consecutive_rollback_count += 1;
336        if self.consecutive_rollback_count >= self.max_consecutive_rollbacks {
337            self.tick_disabled = true;
338        }
339
340        // Mark accepted command receipts as rolled back.
341        for receipt in &mut receipts[accepted_start..] {
342            receipt.accepted = true;
343            receipt.applied_tick_id = None;
344            receipt.reason_code = Some(IngressError::TickRollback);
345        }
346
347        Err(TickError {
348            kind: StepError::PropagatorFailed {
349                name: prop_name,
350                reason,
351            },
352            receipts,
353        })
354    }
355
356    /// Reset the engine to its initial state.
357    pub fn reset(&mut self) -> Result<(), ConfigError> {
358        self.arena.reset().map_err(ConfigError::Arena)?;
359        self.ingress.clear();
360        self.current_tick = TickId(0);
361        self.param_version = ParameterVersion(0);
362        self.tick_disabled = false;
363        self.consecutive_rollback_count = 0;
364        self.last_metrics = StepMetrics::default();
365        Ok(())
366    }
367
368    /// Get a read-only snapshot of the current published generation.
369    pub fn snapshot(&self) -> Snapshot<'_> {
370        self.arena.snapshot()
371    }
372
373    /// Get an owned, thread-safe snapshot of the current published generation.
374    ///
375    /// Unlike [`TickEngine::snapshot()`], the returned `OwnedSnapshot` owns
376    /// clones of the segment data and can be sent across thread boundaries.
377    pub fn owned_snapshot(&self) -> murk_arena::OwnedSnapshot {
378        self.arena.owned_snapshot()
379    }
380
381    /// Current tick ID.
382    pub fn current_tick(&self) -> TickId {
383        self.current_tick
384    }
385
386    /// Whether ticking is disabled due to consecutive rollbacks.
387    pub fn is_tick_disabled(&self) -> bool {
388        self.tick_disabled
389    }
390
391    /// Number of consecutive rollbacks since the last successful tick.
392    pub fn consecutive_rollback_count(&self) -> u32 {
393        self.consecutive_rollback_count
394    }
395
396    /// Metrics from the most recent successful tick.
397    pub fn last_metrics(&self) -> &StepMetrics {
398        &self.last_metrics
399    }
400
401    /// The spatial topology for this engine.
402    pub fn space(&self) -> &dyn murk_space::Space {
403        self.space.as_ref()
404    }
405}
406
407#[cfg(test)]
408mod tests {
409    use super::*;
410    use murk_core::command::CommandPayload;
411    use murk_core::id::ParameterKey;
412    use murk_core::traits::{FieldReader, SnapshotAccess};
413    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
414    use murk_propagator::propagator::WriteMode;
415    use murk_space::{EdgeBehavior, Line1D};
416    use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};
417
418    fn scalar_field(name: &str) -> FieldDef {
419        FieldDef {
420            name: name.to_string(),
421            field_type: FieldType::Scalar,
422            mutability: FieldMutability::PerTick,
423            units: None,
424            bounds: None,
425            boundary_behavior: BoundaryBehavior::Clamp,
426        }
427    }
428
429    fn make_cmd(expires: u64) -> Command {
430        Command {
431            payload: CommandPayload::SetParameter {
432                key: ParameterKey(0),
433                value: 0.0,
434            },
435            expires_after_tick: TickId(expires),
436            source_id: None,
437            source_seq: None,
438            priority_class: 1,
439            arrival_seq: 0,
440        }
441    }
442
443    fn simple_engine() -> TickEngine {
444        let config = WorldConfig {
445            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
446            fields: vec![scalar_field("energy")],
447            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
448            dt: 0.1,
449            seed: 42,
450            ring_buffer_size: 8,
451            max_ingress_queue: 1024,
452            tick_rate_hz: None,
453            backoff: crate::config::BackoffConfig::default(),
454        };
455        TickEngine::new(config).unwrap()
456    }
457
458    fn two_field_engine() -> TickEngine {
459        let config = WorldConfig {
460            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
461            fields: vec![scalar_field("field0"), scalar_field("field1")],
462            propagators: vec![
463                Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
464                Box::new(IdentityPropagator::new(
465                    "copy_f0_to_f1",
466                    FieldId(0),
467                    FieldId(1),
468                )),
469            ],
470            dt: 0.1,
471            seed: 42,
472            ring_buffer_size: 8,
473            max_ingress_queue: 1024,
474            tick_rate_hz: None,
475            backoff: crate::config::BackoffConfig::default(),
476        };
477        TickEngine::new(config).unwrap()
478    }
479
480    fn three_field_engine() -> TickEngine {
481        // PropA writes field0=7.0
482        // PropB reads field0, copies to field1
483        // PropC reads field0+field1, writes sum to field2
484        struct SumPropagator {
485            name: String,
486            input_a: FieldId,
487            input_b: FieldId,
488            output: FieldId,
489        }
490
491        impl SumPropagator {
492            fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
493                Self {
494                    name: name.to_string(),
495                    input_a: a,
496                    input_b: b,
497                    output: out,
498                }
499            }
500        }
501
502        impl Propagator for SumPropagator {
503            fn name(&self) -> &str {
504                &self.name
505            }
506            fn reads(&self) -> murk_core::FieldSet {
507                [self.input_a, self.input_b].into_iter().collect()
508            }
509            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
510                vec![(self.output, WriteMode::Full)]
511            }
512            fn step(
513                &self,
514                ctx: &mut murk_propagator::StepContext<'_>,
515            ) -> Result<(), murk_core::PropagatorError> {
516                let a = ctx.reads().read(self.input_a).unwrap().to_vec();
517                let b = ctx.reads().read(self.input_b).unwrap().to_vec();
518                let out = ctx.writes().write(self.output).unwrap();
519                for i in 0..out.len() {
520                    out[i] = a[i] + b[i];
521                }
522                Ok(())
523            }
524        }
525
526        let config = WorldConfig {
527            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
528            fields: vec![
529                scalar_field("field0"),
530                scalar_field("field1"),
531                scalar_field("field2"),
532            ],
533            propagators: vec![
534                Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
535                Box::new(IdentityPropagator::new(
536                    "copy_f0_to_f1",
537                    FieldId(0),
538                    FieldId(1),
539                )),
540                Box::new(SumPropagator::new(
541                    "sum_f0_f1_to_f2",
542                    FieldId(0),
543                    FieldId(1),
544                    FieldId(2),
545                )),
546            ],
547            dt: 0.1,
548            seed: 42,
549            ring_buffer_size: 8,
550            max_ingress_queue: 1024,
551            tick_rate_hz: None,
552            backoff: crate::config::BackoffConfig::default(),
553        };
554        TickEngine::new(config).unwrap()
555    }
556
557    fn failing_engine(succeed_count: usize) -> TickEngine {
558        let config = WorldConfig {
559            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
560            fields: vec![scalar_field("energy")],
561            propagators: vec![Box::new(FailingPropagator::new(
562                "fail",
563                FieldId(0),
564                succeed_count,
565            ))],
566            dt: 0.1,
567            seed: 42,
568            ring_buffer_size: 8,
569            max_ingress_queue: 1024,
570            tick_rate_hz: None,
571            backoff: crate::config::BackoffConfig::default(),
572        };
573        TickEngine::new(config).unwrap()
574    }
575
576    fn partial_failure_engine() -> TickEngine {
577        // PropA succeeds always, PropB fails immediately.
578        let config = WorldConfig {
579            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
580            fields: vec![scalar_field("field0"), scalar_field("field1")],
581            propagators: vec![
582                Box::new(ConstPropagator::new("ok_prop", FieldId(0), 1.0)),
583                Box::new(FailingPropagator::new("fail_prop", FieldId(1), 0)),
584            ],
585            dt: 0.1,
586            seed: 42,
587            ring_buffer_size: 8,
588            max_ingress_queue: 1024,
589            tick_rate_hz: None,
590            backoff: crate::config::BackoffConfig::default(),
591        };
592        TickEngine::new(config).unwrap()
593    }
594
595    // ── Three-propagator overlay visibility tests ─────────────
596
597    #[test]
598    fn staged_read_sees_prior_propagator_write() {
599        // PropB reads field0 via reads() → should see PropA's value (7.0)
600        let mut engine = two_field_engine();
601        let result = engine.execute_tick().unwrap();
602        let snap = engine.snapshot();
603        // field1 should be a copy of field0
604        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
605        assert!(result.metrics.total_us > 0);
606    }
607
608    #[test]
609    fn reads_previous_sees_base_gen() {
610        // With reads_previous, a propagator should always see the base gen
611        // (tick-start snapshot), not staged writes. We verify by checking
612        // that on tick 1, reads_previous sees zeros (initial state).
613        struct ReadsPrevPropagator;
614        impl Propagator for ReadsPrevPropagator {
615            fn name(&self) -> &str {
616                "reads_prev"
617            }
618            fn reads(&self) -> murk_core::FieldSet {
619                murk_core::FieldSet::empty()
620            }
621            fn reads_previous(&self) -> murk_core::FieldSet {
622                [FieldId(0)].into_iter().collect()
623            }
624            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
625                vec![(FieldId(1), WriteMode::Full)]
626            }
627            fn step(
628                &self,
629                ctx: &mut murk_propagator::StepContext<'_>,
630            ) -> Result<(), murk_core::PropagatorError> {
631                let prev = ctx.reads_previous().read(FieldId(0)).unwrap().to_vec();
632                let out = ctx.writes().write(FieldId(1)).unwrap();
633                out.copy_from_slice(&prev);
634                Ok(())
635            }
636        }
637
638        let config = WorldConfig {
639            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
640            fields: vec![scalar_field("field0"), scalar_field("field1")],
641            propagators: vec![
642                Box::new(ConstPropagator::new("write_f0", FieldId(0), 99.0)),
643                Box::new(ReadsPrevPropagator),
644            ],
645            dt: 0.1,
646            seed: 42,
647            ring_buffer_size: 8,
648            max_ingress_queue: 1024,
649            tick_rate_hz: None,
650            backoff: crate::config::BackoffConfig::default(),
651        };
652        let mut engine = TickEngine::new(config).unwrap();
653
654        // Tick 1: PropA writes 99.0 to field0. ReadsPrev reads field0
655        // via reads_previous → sees base gen (all zeros).
656        engine.execute_tick().unwrap();
657        let snap = engine.snapshot();
658        // field1 should be 0.0 (base gen of field0 on first tick)
659        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 0.0);
660
661        // Tick 2: reads_previous now sees 99.0 (published from tick 1).
662        engine.execute_tick().unwrap();
663        let snap = engine.snapshot();
664        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 99.0);
665    }
666
667    #[test]
668    fn three_propagator_overlay_visibility() {
669        // A writes 7.0 to f0
670        // B reads f0 (staged → 7.0), copies to f1
671        // C reads f0 (staged → 7.0) + f1 (staged → 7.0), writes sum to f2
672        let mut engine = three_field_engine();
673        engine.execute_tick().unwrap();
674        let snap = engine.snapshot();
675
676        assert_eq!(snap.read(FieldId(0)).unwrap()[0], 7.0);
677        assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
678        assert_eq!(snap.read(FieldId(2)).unwrap()[0], 14.0); // 7 + 7
679    }
680
681    #[test]
682    fn unwritten_field_reads_from_base_gen() {
683        // On tick 1, fields start as zero (base gen). A propagator
684        // reading a field nobody wrote should see zero.
685        let mut engine = three_field_engine();
686        engine.execute_tick().unwrap();
687        // All fields are written in this pipeline, so let's just
688        // verify the snapshot is consistent.
689        let snap = engine.snapshot();
690        let f2 = snap.read(FieldId(2)).unwrap();
691        assert!(f2.iter().all(|&v| v == 14.0));
692    }
693
694    // ── Tick atomicity tests ─────────────────────────────────
695
696    #[test]
697    fn propagator_failure_no_snapshot_published() {
698        let mut engine = failing_engine(0);
699
700        // Before any tick, snapshot is at tick 0 (initial state).
701        let snap_before = engine.snapshot();
702        let tick_before = snap_before.tick_id();
703
704        // Execute tick → should fail.
705        let result = engine.execute_tick();
706        assert!(result.is_err());
707
708        // Snapshot should be unchanged.
709        let snap_after = engine.snapshot();
710        assert_eq!(snap_after.tick_id(), tick_before);
711    }
712
713    #[test]
714    fn partial_failure_rolls_back_all() {
715        // PropA writes 1.0 to field0 (succeeds), PropB fails.
716        // field0 should NOT show 1.0 in the snapshot after rollback.
717        let mut engine = partial_failure_engine();
718
719        // Snapshot before: field0 should be all zeros (or not yet published).
720        let result = engine.execute_tick();
721        assert!(result.is_err());
722
723        // field0 should still be at initial state (no publish happened).
724        let snap = engine.snapshot();
725        let f0 = snap.read(FieldId(0));
726        // On the very first tick with no prior publish, the snapshot
727        // shows initial state. No writes from PropA should be visible.
728        if let Some(data) = f0 {
729            assert!(
730                data.iter().all(|&v| v == 0.0),
731                "rollback should prevent PropA's writes from being visible"
732            );
733        }
734    }
735
736    #[test]
737    fn rollback_receipts_generated() {
738        let mut engine = failing_engine(0);
739
740        // Submit commands before the failing tick.
741        engine.submit_commands(vec![make_cmd(100)]);
742
743        let result = engine.execute_tick();
744        match result {
745            Err(TickError {
746                kind: StepError::PropagatorFailed { .. },
747                receipts,
748            }) => {
749                // Receipts must be surfaced, not silently dropped.
750                assert_eq!(receipts.len(), 1);
751                assert_eq!(receipts[0].reason_code, Some(IngressError::TickRollback));
752            }
753            other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
754        }
755    }
756
757    // ── Rollback tracking tests ─────────────────────────────
758
759    #[test]
760    fn consecutive_rollbacks_disable_ticking() {
761        let mut engine = failing_engine(0);
762
763        for _ in 0..3 {
764            let _ = engine.execute_tick();
765        }
766
767        assert!(engine.is_tick_disabled());
768        assert_eq!(engine.consecutive_rollback_count(), 3);
769    }
770
771    #[test]
772    fn success_resets_rollback_count() {
773        // Succeeds 2 times, then fails, but the first 2 successes
774        // should keep rollback count at 0.
775        let mut engine = failing_engine(10);
776
777        // Two successful ticks.
778        engine.execute_tick().unwrap();
779        engine.execute_tick().unwrap();
780        assert_eq!(engine.consecutive_rollback_count(), 0);
781        assert_eq!(engine.current_tick(), TickId(2));
782    }
783
784    #[test]
785    fn tick_disabled_rejects_immediately() {
786        let mut engine = failing_engine(0);
787
788        // Cause 3 failures to disable ticking.
789        for _ in 0..3 {
790            let _ = engine.execute_tick();
791        }
792        assert!(engine.is_tick_disabled());
793
794        // Next tick should fail immediately with TickDisabled.
795        match engine.execute_tick() {
796            Err(TickError {
797                kind: StepError::TickDisabled,
798                ..
799            }) => {}
800            other => panic!("expected TickDisabled, got {other:?}"),
801        }
802    }
803
804    #[test]
805    fn reset_clears_tick_disabled() {
806        let mut engine = failing_engine(0);
807
808        for _ in 0..3 {
809            let _ = engine.execute_tick();
810        }
811        assert!(engine.is_tick_disabled());
812
813        engine.reset().unwrap();
814        assert!(!engine.is_tick_disabled());
815        assert_eq!(engine.current_tick(), TickId(0));
816        assert_eq!(engine.consecutive_rollback_count(), 0);
817    }
818
819    // ── Integration tests ────────────────────────────────────
820
821    #[test]
822    fn single_tick_end_to_end() {
823        let mut engine = simple_engine();
824        let result = engine.execute_tick().unwrap();
825
826        let snap = engine.snapshot();
827        let data = snap.read(FieldId(0)).unwrap();
828        assert_eq!(data.len(), 10);
829        assert!(data.iter().all(|&v| v == 42.0));
830        assert_eq!(engine.current_tick(), TickId(1));
831        assert!(!result.receipts.is_empty() || result.receipts.is_empty()); // receipts exist
832    }
833
834    #[test]
835    fn multi_tick_determinism() {
836        let mut engine = simple_engine();
837
838        for _ in 0..10 {
839            engine.execute_tick().unwrap();
840        }
841
842        let snap = engine.snapshot();
843        let data = snap.read(FieldId(0)).unwrap();
844        assert!(data.iter().all(|&v| v == 42.0));
845        assert_eq!(engine.current_tick(), TickId(10));
846    }
847
848    #[test]
849    fn commands_flow_through_to_receipts() {
850        let mut engine = simple_engine();
851
852        let submit_receipts = engine.submit_commands(vec![make_cmd(100), make_cmd(100)]);
853        assert_eq!(submit_receipts.len(), 2);
854        assert!(submit_receipts.iter().all(|r| r.accepted));
855
856        let result = engine.execute_tick().unwrap();
857        // Should have receipts for the 2 commands.
858        let applied: Vec<_> = result
859            .receipts
860            .iter()
861            .filter(|r| r.applied_tick_id.is_some())
862            .collect();
863        assert_eq!(applied.len(), 2);
864        assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
865    }
866
867    // ── Metrics tests ────────────────────────────────────────
868
869    #[test]
870    fn timing_fields_populated() {
871        let mut engine = simple_engine();
872        let result = engine.execute_tick().unwrap();
873
874        // total_us is u64, so it's always >= 0; just verify the struct is populated.
875        let _ = result.metrics.total_us;
876        assert_eq!(result.metrics.propagator_us.len(), 1);
877        assert_eq!(result.metrics.propagator_us[0].0, "const");
878    }
879
880    #[test]
881    fn memory_bytes_matches_arena() {
882        let mut engine = simple_engine();
883        engine.execute_tick().unwrap();
884
885        let metrics = engine.last_metrics();
886        assert!(metrics.memory_bytes > 0);
887    }
888
889    // ── Bug-fix regression tests ─────────────────────────────
890
891    #[test]
892    fn reset_clears_pending_ingress() {
893        let mut engine = simple_engine();
894
895        // Submit commands but don't tick.
896        engine.submit_commands(vec![make_cmd(1000), make_cmd(1000)]);
897
898        // Reset should discard pending commands.
899        engine.reset().unwrap();
900
901        // Tick should produce zero receipts (no pending commands).
902        let result = engine.execute_tick().unwrap();
903        assert!(result.receipts.is_empty());
904    }
905
906    #[test]
907    fn command_index_preserved_after_reordering() {
908        let mut engine = simple_engine();
909
910        // Submit commands with different priorities — they'll be reordered.
911        // Low priority first (index 0), high priority second (index 1).
912        let cmds = vec![
913            Command {
914                payload: CommandPayload::SetParameter {
915                    key: ParameterKey(0),
916                    value: 1.0,
917                },
918                expires_after_tick: TickId(100),
919                source_id: None,
920                source_seq: None,
921                priority_class: 2, // low priority
922                arrival_seq: 0,
923            },
924            Command {
925                payload: CommandPayload::SetParameter {
926                    key: ParameterKey(0),
927                    value: 2.0,
928                },
929                expires_after_tick: TickId(100),
930                source_id: None,
931                source_seq: None,
932                priority_class: 0, // high priority — sorted first
933                arrival_seq: 0,
934            },
935        ];
936        engine.submit_commands(cmds);
937
938        let result = engine.execute_tick().unwrap();
939        // After reordering, priority_class=0 (batch index 1) executes first,
940        // priority_class=2 (batch index 0) executes second.
941        // command_index must reflect the ORIGINAL batch position.
942        assert_eq!(result.receipts.len(), 2);
943        assert_eq!(result.receipts[0].command_index, 1); // was batch[1]
944        assert_eq!(result.receipts[1].command_index, 0); // was batch[0]
945    }
946}