1use std::fmt;
14use std::time::Instant;
15
16use murk_arena::config::ArenaConfig;
17use murk_arena::pingpong::PingPongArena;
18use murk_arena::read::Snapshot;
19use murk_arena::static_arena::StaticArena;
20use murk_core::command::{Command, CommandPayload, Receipt};
21use murk_core::error::{IngressError, StepError};
22use murk_core::id::{FieldId, ParameterVersion, TickId};
23use murk_core::traits::FieldWriter;
24use murk_core::FieldMutability;
25use murk_propagator::pipeline::{ReadResolutionPlan, ReadSource};
26use murk_propagator::propagator::Propagator;
27use murk_propagator::scratch::ScratchRegion as PropagatorScratch;
28
29use crate::config::{ConfigError, WorldConfig};
30use crate::ingress::IngressQueue;
31use crate::metrics::StepMetrics;
32use crate::overlay::{BaseFieldCache, BaseFieldSet, OverlayReader, StagedFieldCache};
33
34#[derive(Debug)]
38pub struct TickResult {
39 pub receipts: Vec<Receipt>,
41 pub metrics: StepMetrics,
43}
44
45#[derive(Debug)]
53pub struct TickError {
54 pub kind: StepError,
56 pub receipts: Vec<Receipt>,
58}
59
60impl fmt::Display for TickError {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(f, "{}", self.kind)
63 }
64}
65
66impl std::error::Error for TickError {
67 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
68 Some(&self.kind)
69 }
70}
71
72pub struct TickEngine {
80 arena: PingPongArena,
81 propagators: Vec<Box<dyn Propagator>>,
82 plan: ReadResolutionPlan,
83 ingress: IngressQueue,
84 space: Box<dyn murk_space::Space>,
85 dt: f64,
86 current_tick: TickId,
87 param_version: ParameterVersion,
88 consecutive_rollback_count: u32,
89 tick_disabled: bool,
90 max_consecutive_rollbacks: u32,
91 propagator_scratch: PropagatorScratch,
92 base_field_set: BaseFieldSet,
93 base_cache: BaseFieldCache,
94 staged_cache: StagedFieldCache,
95 last_metrics: StepMetrics,
96}
97
98impl TickEngine {
99 pub fn new(config: WorldConfig) -> Result<Self, ConfigError> {
105 config.validate()?;
107 let defined_fields = config.defined_field_set();
108 let plan =
109 murk_propagator::validate_pipeline(&config.propagators, &defined_fields, config.dt)?;
110
111 let arena_field_defs: Vec<(FieldId, murk_core::FieldDef)> = config
113 .fields
114 .iter()
115 .enumerate()
116 .map(|(i, def)| (FieldId(i as u32), def.clone()))
117 .collect();
118
119 let cell_count = config.space.cell_count() as u32;
120 let arena_config = ArenaConfig::new(cell_count);
121
122 let static_fields: Vec<(FieldId, u32)> = arena_field_defs
124 .iter()
125 .filter(|(_, d)| d.mutability == FieldMutability::Static)
126 .map(|(id, d)| (*id, cell_count * d.field_type.components()))
127 .collect();
128 let static_arena = StaticArena::new(&static_fields).into_shared();
129
130 let arena = PingPongArena::new(arena_config, arena_field_defs, static_arena)?;
131
132 let base_field_set = BaseFieldSet::from_plan(&plan, &config.propagators);
134
135 let max_scratch = config
137 .propagators
138 .iter()
139 .map(|p| p.scratch_bytes())
140 .max()
141 .unwrap_or(0);
142 let propagator_scratch = PropagatorScratch::with_byte_capacity(max_scratch);
143
144 let ingress = IngressQueue::new(config.max_ingress_queue);
145
146 Ok(Self {
147 arena,
148 propagators: config.propagators,
149 plan,
150 ingress,
151 space: config.space,
152 dt: config.dt,
153 current_tick: TickId(0),
154 param_version: ParameterVersion(0),
155 consecutive_rollback_count: 0,
156 tick_disabled: false,
157 max_consecutive_rollbacks: 3,
158 propagator_scratch,
159 base_field_set,
160 base_cache: BaseFieldCache::new(),
161 staged_cache: StagedFieldCache::new(),
162 last_metrics: StepMetrics::default(),
163 })
164 }
165
166 pub fn submit_commands(&mut self, commands: Vec<Command>) -> Vec<Receipt> {
170 self.ingress.submit(commands, self.tick_disabled)
171 }
172
173 pub fn execute_tick(&mut self) -> Result<TickResult, TickError> {
179 let tick_start = Instant::now();
180
181 if self.tick_disabled {
183 return Err(TickError {
184 kind: StepError::TickDisabled,
185 receipts: Vec::new(),
186 });
187 }
188
189 let next_tick = TickId(self.current_tick.0 + 1);
190
191 {
193 let snapshot = self.arena.snapshot();
194 self.base_cache.populate(&snapshot, &self.base_field_set);
195 }
196
197 let mut guard = self.arena.begin_tick().map_err(|_| TickError {
199 kind: StepError::AllocationFailed,
200 receipts: Vec::new(),
201 })?;
202
203 let cmd_start = Instant::now();
205 let drain = self.ingress.drain(next_tick);
206 let mut receipts = drain.expired_receipts;
207 let commands = drain.commands;
208 let accepted_receipt_start = receipts.len();
209 for dc in &commands {
210 receipts.push(Receipt {
211 accepted: true,
212 applied_tick_id: None,
213 reason_code: None,
214 command_index: dc.command_index,
215 });
216 }
217 for dc in &commands {
219 if let CommandPayload::SetField {
220 ref coord,
221 field_id,
222 value,
223 } = dc.command.payload
224 {
225 if let Some(rank) = self.space.canonical_rank(coord) {
226 if let Some(buf) = guard.writer.write(field_id) {
227 if rank < buf.len() {
228 buf[rank] = value;
229 }
230 }
231 }
232 }
233 }
234 let command_processing_us = cmd_start.elapsed().as_micros() as u64;
235
236 let mut propagator_us = Vec::with_capacity(self.propagators.len());
238 for (i, prop) in self.propagators.iter().enumerate() {
239 let prop_start = Instant::now();
240
241 self.staged_cache.clear();
243 if let Some(routes) = self.plan.routes_for(i) {
244 for (&field, &source) in routes {
245 if let ReadSource::Staged { .. } = source {
246 if let Some(data) = guard.writer.read(field) {
247 self.staged_cache.insert(field, data);
248 }
249 }
250 }
251 }
252
253 let empty_routes = indexmap::IndexMap::new();
255 let routes = self.plan.routes_for(i).unwrap_or(&empty_routes);
256 let overlay = OverlayReader::new(routes, &self.base_cache, &self.staged_cache);
257
258 self.propagator_scratch.reset();
260
261 {
263 let mut ctx = murk_propagator::StepContext::new(
264 &overlay,
265 &self.base_cache,
266 &mut guard.writer,
267 &mut self.propagator_scratch,
268 self.space.as_ref(),
269 next_tick,
270 self.dt,
271 );
272
273 if let Err(reason) = prop.step(&mut ctx) {
275 let prop_name = prop.name().to_string();
278 return self.handle_rollback(
279 prop_name,
280 reason,
281 receipts,
282 accepted_receipt_start,
283 );
284 }
285 }
286
287 propagator_us.push((
288 prop.name().to_string(),
289 prop_start.elapsed().as_micros() as u64,
290 ));
291 }
292
293 let publish_start = Instant::now();
297 self.arena.publish(next_tick, self.param_version);
298 let snapshot_publish_us = publish_start.elapsed().as_micros() as u64;
299
300 self.current_tick = next_tick;
302 self.consecutive_rollback_count = 0;
303
304 for receipt in &mut receipts[accepted_receipt_start..] {
306 receipt.applied_tick_id = Some(next_tick);
307 }
308
309 let total_us = tick_start.elapsed().as_micros() as u64;
311 let metrics = StepMetrics {
312 total_us,
313 command_processing_us,
314 propagator_us,
315 snapshot_publish_us,
316 memory_bytes: self.arena.memory_bytes(),
317 };
318 self.last_metrics = metrics.clone();
319
320 Ok(TickResult { receipts, metrics })
321 }
322
323 fn handle_rollback(
328 &mut self,
329 prop_name: String,
330 reason: murk_core::PropagatorError,
331 mut receipts: Vec<Receipt>,
332 accepted_start: usize,
333 ) -> Result<TickResult, TickError> {
334 self.consecutive_rollback_count += 1;
336 if self.consecutive_rollback_count >= self.max_consecutive_rollbacks {
337 self.tick_disabled = true;
338 }
339
340 for receipt in &mut receipts[accepted_start..] {
342 receipt.accepted = true;
343 receipt.applied_tick_id = None;
344 receipt.reason_code = Some(IngressError::TickRollback);
345 }
346
347 Err(TickError {
348 kind: StepError::PropagatorFailed {
349 name: prop_name,
350 reason,
351 },
352 receipts,
353 })
354 }
355
356 pub fn reset(&mut self) -> Result<(), ConfigError> {
358 self.arena.reset().map_err(ConfigError::Arena)?;
359 self.ingress.clear();
360 self.current_tick = TickId(0);
361 self.param_version = ParameterVersion(0);
362 self.tick_disabled = false;
363 self.consecutive_rollback_count = 0;
364 self.last_metrics = StepMetrics::default();
365 Ok(())
366 }
367
368 pub fn snapshot(&self) -> Snapshot<'_> {
370 self.arena.snapshot()
371 }
372
373 pub fn owned_snapshot(&self) -> murk_arena::OwnedSnapshot {
378 self.arena.owned_snapshot()
379 }
380
381 pub fn current_tick(&self) -> TickId {
383 self.current_tick
384 }
385
386 pub fn is_tick_disabled(&self) -> bool {
388 self.tick_disabled
389 }
390
391 pub fn consecutive_rollback_count(&self) -> u32 {
393 self.consecutive_rollback_count
394 }
395
396 pub fn last_metrics(&self) -> &StepMetrics {
398 &self.last_metrics
399 }
400
401 pub fn space(&self) -> &dyn murk_space::Space {
403 self.space.as_ref()
404 }
405}
406
407#[cfg(test)]
408mod tests {
409 use super::*;
410 use murk_core::command::CommandPayload;
411 use murk_core::id::ParameterKey;
412 use murk_core::traits::{FieldReader, SnapshotAccess};
413 use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
414 use murk_propagator::propagator::WriteMode;
415 use murk_space::{EdgeBehavior, Line1D};
416 use murk_test_utils::{ConstPropagator, FailingPropagator, IdentityPropagator};
417
418 fn scalar_field(name: &str) -> FieldDef {
419 FieldDef {
420 name: name.to_string(),
421 field_type: FieldType::Scalar,
422 mutability: FieldMutability::PerTick,
423 units: None,
424 bounds: None,
425 boundary_behavior: BoundaryBehavior::Clamp,
426 }
427 }
428
429 fn make_cmd(expires: u64) -> Command {
430 Command {
431 payload: CommandPayload::SetParameter {
432 key: ParameterKey(0),
433 value: 0.0,
434 },
435 expires_after_tick: TickId(expires),
436 source_id: None,
437 source_seq: None,
438 priority_class: 1,
439 arrival_seq: 0,
440 }
441 }
442
443 fn simple_engine() -> TickEngine {
444 let config = WorldConfig {
445 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
446 fields: vec![scalar_field("energy")],
447 propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
448 dt: 0.1,
449 seed: 42,
450 ring_buffer_size: 8,
451 max_ingress_queue: 1024,
452 tick_rate_hz: None,
453 backoff: crate::config::BackoffConfig::default(),
454 };
455 TickEngine::new(config).unwrap()
456 }
457
458 fn two_field_engine() -> TickEngine {
459 let config = WorldConfig {
460 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
461 fields: vec![scalar_field("field0"), scalar_field("field1")],
462 propagators: vec![
463 Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
464 Box::new(IdentityPropagator::new(
465 "copy_f0_to_f1",
466 FieldId(0),
467 FieldId(1),
468 )),
469 ],
470 dt: 0.1,
471 seed: 42,
472 ring_buffer_size: 8,
473 max_ingress_queue: 1024,
474 tick_rate_hz: None,
475 backoff: crate::config::BackoffConfig::default(),
476 };
477 TickEngine::new(config).unwrap()
478 }
479
480 fn three_field_engine() -> TickEngine {
481 struct SumPropagator {
485 name: String,
486 input_a: FieldId,
487 input_b: FieldId,
488 output: FieldId,
489 }
490
491 impl SumPropagator {
492 fn new(name: &str, a: FieldId, b: FieldId, out: FieldId) -> Self {
493 Self {
494 name: name.to_string(),
495 input_a: a,
496 input_b: b,
497 output: out,
498 }
499 }
500 }
501
502 impl Propagator for SumPropagator {
503 fn name(&self) -> &str {
504 &self.name
505 }
506 fn reads(&self) -> murk_core::FieldSet {
507 [self.input_a, self.input_b].into_iter().collect()
508 }
509 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
510 vec![(self.output, WriteMode::Full)]
511 }
512 fn step(
513 &self,
514 ctx: &mut murk_propagator::StepContext<'_>,
515 ) -> Result<(), murk_core::PropagatorError> {
516 let a = ctx.reads().read(self.input_a).unwrap().to_vec();
517 let b = ctx.reads().read(self.input_b).unwrap().to_vec();
518 let out = ctx.writes().write(self.output).unwrap();
519 for i in 0..out.len() {
520 out[i] = a[i] + b[i];
521 }
522 Ok(())
523 }
524 }
525
526 let config = WorldConfig {
527 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
528 fields: vec![
529 scalar_field("field0"),
530 scalar_field("field1"),
531 scalar_field("field2"),
532 ],
533 propagators: vec![
534 Box::new(ConstPropagator::new("write_f0", FieldId(0), 7.0)),
535 Box::new(IdentityPropagator::new(
536 "copy_f0_to_f1",
537 FieldId(0),
538 FieldId(1),
539 )),
540 Box::new(SumPropagator::new(
541 "sum_f0_f1_to_f2",
542 FieldId(0),
543 FieldId(1),
544 FieldId(2),
545 )),
546 ],
547 dt: 0.1,
548 seed: 42,
549 ring_buffer_size: 8,
550 max_ingress_queue: 1024,
551 tick_rate_hz: None,
552 backoff: crate::config::BackoffConfig::default(),
553 };
554 TickEngine::new(config).unwrap()
555 }
556
557 fn failing_engine(succeed_count: usize) -> TickEngine {
558 let config = WorldConfig {
559 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
560 fields: vec![scalar_field("energy")],
561 propagators: vec![Box::new(FailingPropagator::new(
562 "fail",
563 FieldId(0),
564 succeed_count,
565 ))],
566 dt: 0.1,
567 seed: 42,
568 ring_buffer_size: 8,
569 max_ingress_queue: 1024,
570 tick_rate_hz: None,
571 backoff: crate::config::BackoffConfig::default(),
572 };
573 TickEngine::new(config).unwrap()
574 }
575
576 fn partial_failure_engine() -> TickEngine {
577 let config = WorldConfig {
579 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
580 fields: vec![scalar_field("field0"), scalar_field("field1")],
581 propagators: vec![
582 Box::new(ConstPropagator::new("ok_prop", FieldId(0), 1.0)),
583 Box::new(FailingPropagator::new("fail_prop", FieldId(1), 0)),
584 ],
585 dt: 0.1,
586 seed: 42,
587 ring_buffer_size: 8,
588 max_ingress_queue: 1024,
589 tick_rate_hz: None,
590 backoff: crate::config::BackoffConfig::default(),
591 };
592 TickEngine::new(config).unwrap()
593 }
594
595 #[test]
598 fn staged_read_sees_prior_propagator_write() {
599 let mut engine = two_field_engine();
601 let result = engine.execute_tick().unwrap();
602 let snap = engine.snapshot();
603 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
605 assert!(result.metrics.total_us > 0);
606 }
607
608 #[test]
609 fn reads_previous_sees_base_gen() {
610 struct ReadsPrevPropagator;
614 impl Propagator for ReadsPrevPropagator {
615 fn name(&self) -> &str {
616 "reads_prev"
617 }
618 fn reads(&self) -> murk_core::FieldSet {
619 murk_core::FieldSet::empty()
620 }
621 fn reads_previous(&self) -> murk_core::FieldSet {
622 [FieldId(0)].into_iter().collect()
623 }
624 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
625 vec![(FieldId(1), WriteMode::Full)]
626 }
627 fn step(
628 &self,
629 ctx: &mut murk_propagator::StepContext<'_>,
630 ) -> Result<(), murk_core::PropagatorError> {
631 let prev = ctx.reads_previous().read(FieldId(0)).unwrap().to_vec();
632 let out = ctx.writes().write(FieldId(1)).unwrap();
633 out.copy_from_slice(&prev);
634 Ok(())
635 }
636 }
637
638 let config = WorldConfig {
639 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
640 fields: vec![scalar_field("field0"), scalar_field("field1")],
641 propagators: vec![
642 Box::new(ConstPropagator::new("write_f0", FieldId(0), 99.0)),
643 Box::new(ReadsPrevPropagator),
644 ],
645 dt: 0.1,
646 seed: 42,
647 ring_buffer_size: 8,
648 max_ingress_queue: 1024,
649 tick_rate_hz: None,
650 backoff: crate::config::BackoffConfig::default(),
651 };
652 let mut engine = TickEngine::new(config).unwrap();
653
654 engine.execute_tick().unwrap();
657 let snap = engine.snapshot();
658 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 0.0);
660
661 engine.execute_tick().unwrap();
663 let snap = engine.snapshot();
664 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 99.0);
665 }
666
667 #[test]
668 fn three_propagator_overlay_visibility() {
669 let mut engine = three_field_engine();
673 engine.execute_tick().unwrap();
674 let snap = engine.snapshot();
675
676 assert_eq!(snap.read(FieldId(0)).unwrap()[0], 7.0);
677 assert_eq!(snap.read(FieldId(1)).unwrap()[0], 7.0);
678 assert_eq!(snap.read(FieldId(2)).unwrap()[0], 14.0); }
680
681 #[test]
682 fn unwritten_field_reads_from_base_gen() {
683 let mut engine = three_field_engine();
686 engine.execute_tick().unwrap();
687 let snap = engine.snapshot();
690 let f2 = snap.read(FieldId(2)).unwrap();
691 assert!(f2.iter().all(|&v| v == 14.0));
692 }
693
694 #[test]
697 fn propagator_failure_no_snapshot_published() {
698 let mut engine = failing_engine(0);
699
700 let snap_before = engine.snapshot();
702 let tick_before = snap_before.tick_id();
703
704 let result = engine.execute_tick();
706 assert!(result.is_err());
707
708 let snap_after = engine.snapshot();
710 assert_eq!(snap_after.tick_id(), tick_before);
711 }
712
713 #[test]
714 fn partial_failure_rolls_back_all() {
715 let mut engine = partial_failure_engine();
718
719 let result = engine.execute_tick();
721 assert!(result.is_err());
722
723 let snap = engine.snapshot();
725 let f0 = snap.read(FieldId(0));
726 if let Some(data) = f0 {
729 assert!(
730 data.iter().all(|&v| v == 0.0),
731 "rollback should prevent PropA's writes from being visible"
732 );
733 }
734 }
735
736 #[test]
737 fn rollback_receipts_generated() {
738 let mut engine = failing_engine(0);
739
740 engine.submit_commands(vec![make_cmd(100)]);
742
743 let result = engine.execute_tick();
744 match result {
745 Err(TickError {
746 kind: StepError::PropagatorFailed { .. },
747 receipts,
748 }) => {
749 assert_eq!(receipts.len(), 1);
751 assert_eq!(receipts[0].reason_code, Some(IngressError::TickRollback));
752 }
753 other => panic!("expected PropagatorFailed with receipts, got {other:?}"),
754 }
755 }
756
757 #[test]
760 fn consecutive_rollbacks_disable_ticking() {
761 let mut engine = failing_engine(0);
762
763 for _ in 0..3 {
764 let _ = engine.execute_tick();
765 }
766
767 assert!(engine.is_tick_disabled());
768 assert_eq!(engine.consecutive_rollback_count(), 3);
769 }
770
771 #[test]
772 fn success_resets_rollback_count() {
773 let mut engine = failing_engine(10);
776
777 engine.execute_tick().unwrap();
779 engine.execute_tick().unwrap();
780 assert_eq!(engine.consecutive_rollback_count(), 0);
781 assert_eq!(engine.current_tick(), TickId(2));
782 }
783
784 #[test]
785 fn tick_disabled_rejects_immediately() {
786 let mut engine = failing_engine(0);
787
788 for _ in 0..3 {
790 let _ = engine.execute_tick();
791 }
792 assert!(engine.is_tick_disabled());
793
794 match engine.execute_tick() {
796 Err(TickError {
797 kind: StepError::TickDisabled,
798 ..
799 }) => {}
800 other => panic!("expected TickDisabled, got {other:?}"),
801 }
802 }
803
804 #[test]
805 fn reset_clears_tick_disabled() {
806 let mut engine = failing_engine(0);
807
808 for _ in 0..3 {
809 let _ = engine.execute_tick();
810 }
811 assert!(engine.is_tick_disabled());
812
813 engine.reset().unwrap();
814 assert!(!engine.is_tick_disabled());
815 assert_eq!(engine.current_tick(), TickId(0));
816 assert_eq!(engine.consecutive_rollback_count(), 0);
817 }
818
819 #[test]
822 fn single_tick_end_to_end() {
823 let mut engine = simple_engine();
824 let result = engine.execute_tick().unwrap();
825
826 let snap = engine.snapshot();
827 let data = snap.read(FieldId(0)).unwrap();
828 assert_eq!(data.len(), 10);
829 assert!(data.iter().all(|&v| v == 42.0));
830 assert_eq!(engine.current_tick(), TickId(1));
831 assert!(!result.receipts.is_empty() || result.receipts.is_empty()); }
833
834 #[test]
835 fn multi_tick_determinism() {
836 let mut engine = simple_engine();
837
838 for _ in 0..10 {
839 engine.execute_tick().unwrap();
840 }
841
842 let snap = engine.snapshot();
843 let data = snap.read(FieldId(0)).unwrap();
844 assert!(data.iter().all(|&v| v == 42.0));
845 assert_eq!(engine.current_tick(), TickId(10));
846 }
847
848 #[test]
849 fn commands_flow_through_to_receipts() {
850 let mut engine = simple_engine();
851
852 let submit_receipts = engine.submit_commands(vec![make_cmd(100), make_cmd(100)]);
853 assert_eq!(submit_receipts.len(), 2);
854 assert!(submit_receipts.iter().all(|r| r.accepted));
855
856 let result = engine.execute_tick().unwrap();
857 let applied: Vec<_> = result
859 .receipts
860 .iter()
861 .filter(|r| r.applied_tick_id.is_some())
862 .collect();
863 assert_eq!(applied.len(), 2);
864 assert!(applied.iter().all(|r| r.applied_tick_id == Some(TickId(1))));
865 }
866
867 #[test]
870 fn timing_fields_populated() {
871 let mut engine = simple_engine();
872 let result = engine.execute_tick().unwrap();
873
874 let _ = result.metrics.total_us;
876 assert_eq!(result.metrics.propagator_us.len(), 1);
877 assert_eq!(result.metrics.propagator_us[0].0, "const");
878 }
879
880 #[test]
881 fn memory_bytes_matches_arena() {
882 let mut engine = simple_engine();
883 engine.execute_tick().unwrap();
884
885 let metrics = engine.last_metrics();
886 assert!(metrics.memory_bytes > 0);
887 }
888
889 #[test]
892 fn reset_clears_pending_ingress() {
893 let mut engine = simple_engine();
894
895 engine.submit_commands(vec![make_cmd(1000), make_cmd(1000)]);
897
898 engine.reset().unwrap();
900
901 let result = engine.execute_tick().unwrap();
903 assert!(result.receipts.is_empty());
904 }
905
906 #[test]
907 fn command_index_preserved_after_reordering() {
908 let mut engine = simple_engine();
909
910 let cmds = vec![
913 Command {
914 payload: CommandPayload::SetParameter {
915 key: ParameterKey(0),
916 value: 1.0,
917 },
918 expires_after_tick: TickId(100),
919 source_id: None,
920 source_seq: None,
921 priority_class: 2, arrival_seq: 0,
923 },
924 Command {
925 payload: CommandPayload::SetParameter {
926 key: ParameterKey(0),
927 value: 2.0,
928 },
929 expires_after_tick: TickId(100),
930 source_id: None,
931 source_seq: None,
932 priority_class: 0, arrival_seq: 0,
934 },
935 ];
936 engine.submit_commands(cmds);
937
938 let result = engine.execute_tick().unwrap();
939 assert_eq!(result.receipts.len(), 2);
943 assert_eq!(result.receipts[0].command_index, 1); assert_eq!(result.receipts[1].command_index, 0); }
946}