1use crate::app::{
13 CuDistributedReplayApplication, CuRecordedReplayApplication, CuSimApplication, Subsystem,
14};
15use crate::config::{MultiCopperConfig, read_configuration_str, read_multi_configuration};
16use crate::copperlist::CopperList;
17use crate::curuntime::{
18 KeyFrame, RuntimeLifecycleConfigSource, RuntimeLifecycleEvent, RuntimeLifecycleRecord,
19 RuntimeLifecycleStackInfo,
20};
21use crate::debug::{
22 SectionIndexEntry, build_read_logger, decode_copperlists, index_log, read_section_at,
23};
24use crate::simulation::recorded_copperlist_timestamp;
25use bincode::config::standard;
26use bincode::decode_from_std_read;
27use bincode::error::DecodeError;
28use cu29_clock::{RobotClock, RobotClockMock};
29use cu29_traits::{CopperListTuple, CuError, CuResult, ErasedCuStampedDataSet, UnifiedLogType};
30use cu29_unifiedlog::memmap::MmapSectionStorage;
31use cu29_unifiedlog::{
32 NoopLogger, NoopSectionStorage, SectionStorage, UnifiedLogWrite, UnifiedLogger,
33 UnifiedLoggerBuilder, UnifiedLoggerIOReader, UnifiedLoggerRead, UnifiedLoggerWrite,
34};
35use std::any::type_name;
36use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
37use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
38use std::fs;
39use std::io::Read;
40use std::path::{Path, PathBuf};
41use std::sync::{Arc, Mutex};
42
/// Metadata recovered from one recorded Copper unified log: where it lives on
/// disk plus the runtime-lifecycle information (stack, effective config,
/// mission) that was written into it at runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedReplayLog {
    /// Base path of the unified log this metadata was read from.
    pub base_path: PathBuf,
    /// Stack info recorded at instantiation (carries instance id, subsystem
    /// code and optional subsystem id — see the accessors on the impl).
    pub stack: RuntimeLifecycleStackInfo,
    /// Recorded origin of the effective configuration.
    pub config_source: RuntimeLifecycleConfigSource,
    /// The effective configuration as recorded, serialized as RON (parsed
    /// again with `read_configuration_str` when building a session).
    pub effective_config_ron: String,
    /// Mission name from the first recorded `MissionStarted` event, if any.
    pub mission: Option<String>,
}
52
53impl DistributedReplayLog {
54 pub fn discover(path: impl AsRef<Path>) -> CuResult<Self> {
57 let requested_path = path.as_ref();
58 let normalized_path = normalize_candidate_log_base(requested_path);
59 match Self::discover_from_base_path(requested_path) {
60 Ok(log) => Ok(log),
61 Err(_) if normalized_path != requested_path => {
62 Self::discover_from_base_path(&normalized_path)
63 }
64 Err(err) => Err(err),
65 }
66 }
67
68 fn discover_from_base_path(base_path: &Path) -> CuResult<Self> {
69 let UnifiedLogger::Read(read_logger) = UnifiedLoggerBuilder::new()
70 .file_base_name(base_path)
71 .build()
72 .map_err(|err| {
73 CuError::new_with_cause(
74 &format!(
75 "Failed to open Copper log '{}' for distributed replay discovery",
76 base_path.display()
77 ),
78 err,
79 )
80 })?
81 else {
82 return Err(CuError::from(
83 "Expected a readable unified logger during distributed replay discovery",
84 ));
85 };
86
87 let mut reader = UnifiedLoggerIOReader::new(read_logger, UnifiedLogType::RuntimeLifecycle);
88 let mut instantiated: Option<(
89 RuntimeLifecycleConfigSource,
90 String,
91 RuntimeLifecycleStackInfo,
92 )> = None;
93 let mut mission = None;
94
95 while let Some(record) =
96 read_next_entry::<RuntimeLifecycleRecord>(&mut reader).map_err(|err| {
97 CuError::from(format!(
98 "Failed to decode runtime lifecycle for '{}': {err}",
99 base_path.display()
100 ))
101 })?
102 {
103 match record.event {
104 RuntimeLifecycleEvent::Instantiated {
105 config_source,
106 effective_config_ron,
107 stack,
108 } if instantiated.is_none() => {
109 instantiated = Some((config_source, effective_config_ron, stack));
110 }
111 RuntimeLifecycleEvent::MissionStarted {
112 mission: started_mission,
113 } if mission.is_none() => {
114 mission = Some(started_mission);
115 }
116 _ => {}
117 }
118
119 if instantiated.is_some() && mission.is_some() {
120 break;
121 }
122 }
123
124 let Some((config_source, effective_config_ron, stack)) = instantiated else {
125 return Err(CuError::from(format!(
126 "Copper log '{}' has no RuntimeLifecycle::Instantiated record",
127 base_path.display()
128 )));
129 };
130
131 Ok(Self {
132 base_path: base_path.to_path_buf(),
133 stack,
134 config_source,
135 effective_config_ron,
136 mission,
137 })
138 }
139
140 #[inline]
141 pub fn instance_id(&self) -> u32 {
142 self.stack.instance_id
143 }
144
145 #[inline]
146 pub fn subsystem_code(&self) -> u16 {
147 self.stack.subsystem_code
148 }
149
150 #[inline]
151 pub fn subsystem_id(&self) -> Option<&str> {
152 self.stack.subsystem_id.as_deref()
153 }
154}
155
/// One candidate base path that failed distributed-replay discovery, paired
/// with the stringified discovery error.
#[derive(Debug, Clone)]
pub struct DistributedReplayDiscoveryFailure {
    /// The candidate path that was probed.
    pub candidate_path: PathBuf,
    /// Human-readable reason discovery failed for this candidate.
    pub error: String,
}
162
163impl Display for DistributedReplayDiscoveryFailure {
164 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
165 write!(
166 f,
167 "{}: {}",
168 self.candidate_path.display(),
169 self.error.as_str()
170 )
171 }
172}
173
/// Outcome of scanning a set of paths for replayable Copper logs: the logs
/// that were successfully discovered plus the per-candidate failures.
#[derive(Debug, Clone, Default)]
pub struct DistributedReplayCatalog {
    /// Successfully discovered logs, sorted deterministically by
    /// (instance id, subsystem code, subsystem id, path).
    pub logs: Vec<DistributedReplayLog>,
    /// Candidates that could not be opened/parsed, sorted by path.
    pub failures: Vec<DistributedReplayDiscoveryFailure>,
}
180
181impl DistributedReplayCatalog {
182 pub fn discover<I, P>(inputs: I) -> CuResult<Self>
187 where
188 I: IntoIterator<Item = P>,
189 P: AsRef<Path>,
190 {
191 let mut candidates = BTreeSet::new();
192 for input in inputs {
193 collect_candidate_base_paths(input.as_ref(), &mut candidates)?;
194 }
195
196 let mut logs = Vec::new();
197 let mut failures = Vec::new();
198
199 for candidate in candidates {
200 match DistributedReplayLog::discover(&candidate) {
201 Ok(log) => logs.push(log),
202 Err(err) => failures.push(DistributedReplayDiscoveryFailure {
203 candidate_path: candidate,
204 error: err.to_string(),
205 }),
206 }
207 }
208
209 logs.sort_by(|left, right| {
210 (
211 left.instance_id(),
212 left.subsystem_code(),
213 left.subsystem_id(),
214 left.base_path.as_os_str(),
215 )
216 .cmp(&(
217 right.instance_id(),
218 right.subsystem_code(),
219 right.subsystem_id(),
220 right.base_path.as_os_str(),
221 ))
222 });
223 failures.sort_by(|left, right| left.candidate_path.cmp(&right.candidate_path));
224
225 Ok(Self { logs, failures })
226 }
227
228 pub fn discover_under(root: impl AsRef<Path>) -> CuResult<Self> {
230 Self::discover([root])
231 }
232}
233
/// Factory fn pointer stored per registration: builds one replay session (and
/// its node descriptors) for an assignment under the given session config.
type DistributedReplaySessionFactory = fn(
    &DistributedReplayAssignment,
    &DistributedReplaySessionConfig,
) -> CuResult<DistributedReplaySessionBuild>;

/// Maximum number of decoded log sections each session keeps in its LRU cache.
const DEFAULT_SECTION_CACHE_CAP: usize = 8;
/// Fallback preallocation size for replay output logs (64 MiB).
const DEFAULT_REPLAY_LOG_SIZE_BYTES: usize = 64 * 1024 * 1024;
241
/// Internal knobs controlling how replay sessions are built.
#[derive(Debug, Clone, Default)]
struct DistributedReplaySessionConfig {
    /// When set, each session records replay output into a real unified log
    /// under this root; when `None`, a no-op logger is used instead.
    output_root: Option<PathBuf>,
}
246
/// Identity of one recorded CopperList across the whole distributed run:
/// which instance and subsystem produced it, plus its CopperList id.
/// Ordered/hashable so it can key `BTreeSet`s of origins.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct DistributedReplayOriginKey {
    instance_id: u32,
    subsystem_code: u16,
    cl_id: u64,
}
253
/// Public handle pointing at one recorded CopperList in the replay graph.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistributedReplayCursor {
    /// Instance that produced the CopperList.
    pub instance_id: u32,
    /// Subsystem id string the CopperList belongs to.
    pub subsystem_id: String,
    /// Id of the CopperList within its log.
    pub cl_id: u64,
    // Private; exposed read-only through `subsystem_code()`.
    subsystem_code: u16,
}
261
262impl DistributedReplayCursor {
263 #[inline]
264 fn new(instance_id: u32, subsystem_id: String, subsystem_code: u16, cl_id: u64) -> Self {
265 Self {
266 instance_id,
267 subsystem_id,
268 cl_id,
269 subsystem_code,
270 }
271 }
272
273 #[inline]
274 pub fn subsystem_code(&self) -> u16 {
275 self.subsystem_code
276 }
277}
278
/// Per-CopperList description emitted by a session: the cursor identifying
/// the entry, its own origin key, and the origin keys of everything it
/// consumed (as computed by `copperlist_origins`).
#[derive(Debug, Clone)]
struct DistributedReplayNodeDescriptor {
    cursor: DistributedReplayCursor,
    origin_key: DistributedReplayOriginKey,
    /// Origins feeding into this CopperList.
    incoming_origins: BTreeSet<DistributedReplayOriginKey>,
}
285
/// Node of the cross-session dependency graph.
/// NOTE(review): consumed by the replay engine, which is outside this chunk;
/// the counters presumably drive a topological scheduling order — confirm
/// against `DistributedReplayEngine`.
#[derive(Debug, Clone)]
struct DistributedReplayGraphNode {
    cursor: DistributedReplayCursor,
    /// Index of the session that owns this node.
    session_index: usize,
    // Presumably indices of dependent nodes — verify against the engine.
    outgoing: Vec<usize>,
    initial_dependencies: usize,
    remaining_dependencies: usize,
}
294
/// Ordering key for replay-ready nodes. The derived `Ord` compares fields
/// lexicographically in declaration order:
/// (instance_id, subsystem_code, cl_id, node_index).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct DistributedReplayReadyNode {
    instance_id: u32,
    subsystem_code: u16,
    cl_id: u64,
    node_index: usize,
}
302
/// Result of a session factory: the boxed session, the node descriptors
/// extracted from its log, and (when recording) the output log path.
struct DistributedReplaySessionBuild {
    session: Box<dyn DistributedReplaySession>,
    nodes: Vec<DistributedReplayNodeDescriptor>,
    /// Set only when the session records a replay output log.
    output_log_path: Option<PathBuf>,
}
308
/// Object-safe interface the engine drives over heterogeneous sessions.
trait DistributedReplaySession {
    /// Positions the session so that the CopperList with `cl_id` has just
    /// been replayed (may restore a keyframe and replay forward).
    fn goto_cl(&mut self, cl_id: u64) -> CuResult<()>;
    /// Stops the session's tasks if they were started; idempotent.
    fn shutdown(&mut self) -> CuResult<()>;
}
313
/// One decoded log section held in the session's LRU cache.
#[derive(Debug, Clone)]
struct RecordedReplayCachedSection<P: CopperListTuple> {
    /// Decoded CopperLists of the section, in on-disk order.
    entries: Vec<Arc<CopperList<P>>>,
}
318
/// Replay session over one recorded unified log, generic over the generated
/// app type, its recorded dataset `P`, and the output log storage/writer.
struct RecordedReplaySession<App, P, S, L>
where
    App: CuDistributedReplayApplication<S, L>,
    P: CopperListTuple,
    S: SectionStorage,
    L: UnifiedLogWrite<S> + 'static,
{
    /// The (instance, subsystem, log, registration) this session replays.
    assignment: DistributedReplayAssignment,
    /// The instantiated application being driven.
    app: App,
    /// Mock clock; rewound to keyframe timestamps on restore.
    clock_mock: RobotClockMock,
    /// Reader over the recorded unified log.
    log_reader: UnifiedLoggerRead,
    /// Index of the log's CopperList sections.
    sections: Vec<SectionIndexEntry>,
    /// Total number of recorded CopperLists across all sections.
    total_entries: usize,
    /// Keyframes available for state restoration.
    keyframes: Vec<KeyFrame>,
    /// Whether the app's tasks have been started.
    started: bool,
    /// Index of the last replayed entry, if any.
    current_idx: Option<usize>,
    /// CopperList id of the last restored keyframe, if any.
    last_keyframe: Option<u64>,
    /// LRU cache of decoded sections, keyed by section index.
    cache: HashMap<usize, RecordedReplayCachedSection<P>>,
    /// LRU order for `cache`; front is least recently used.
    cache_order: VecDeque<usize>,
    /// Maximum number of cached sections.
    cache_cap: usize,
    // Ties the unused S/L type parameters to the struct.
    phantom: std::marker::PhantomData<(S, L)>,
}
341
342impl<App, P, S, L> RecordedReplaySession<App, P, S, L>
343where
344 App: CuDistributedReplayApplication<S, L>
345 + CuRecordedReplayApplication<S, L, RecordedDataSet = P>,
346 P: CopperListTuple,
347 S: SectionStorage,
348 L: UnifiedLogWrite<S> + 'static,
349{
350 fn from_log(
351 assignment: DistributedReplayAssignment,
352 app: App,
353 clock_mock: RobotClockMock,
354 log_base: &Path,
355 ) -> CuResult<Self> {
356 let (sections, keyframes, total_entries) =
357 index_log::<P, _>(log_base, &recorded_copperlist_timestamp::<P>)?;
358 let log_reader = build_read_logger(log_base)?;
359 Ok(Self {
360 assignment,
361 app,
362 clock_mock,
363 log_reader,
364 sections,
365 total_entries,
366 keyframes,
367 started: false,
368 current_idx: None,
369 last_keyframe: None,
370 cache: HashMap::new(),
371 cache_order: VecDeque::new(),
372 cache_cap: DEFAULT_SECTION_CACHE_CAP,
373 phantom: std::marker::PhantomData,
374 })
375 }
376
377 fn describe_nodes(&mut self) -> CuResult<Vec<DistributedReplayNodeDescriptor>> {
378 let mut nodes = Vec::with_capacity(self.total_entries);
379 for idx in 0..self.total_entries {
380 let (copperlist, _) = self.copperlist_at(idx)?;
381 let cursor = DistributedReplayCursor::new(
382 self.assignment.instance_id,
383 self.assignment.subsystem_id.clone(),
384 self.assignment.log.subsystem_code(),
385 copperlist.id,
386 );
387 nodes.push(DistributedReplayNodeDescriptor {
388 origin_key: DistributedReplayOriginKey {
389 instance_id: cursor.instance_id,
390 subsystem_code: cursor.subsystem_code(),
391 cl_id: cursor.cl_id,
392 },
393 incoming_origins: copperlist_origins(copperlist.as_ref()),
394 cursor,
395 });
396 }
397 Ok(nodes)
398 }
399
400 fn ensure_started(&mut self) -> CuResult<()> {
401 if self.started {
402 return Ok(());
403 }
404 let mut noop = |_step: App::Step<'_>| crate::simulation::SimOverride::ExecuteByRuntime;
405 <App as CuSimApplication<S, L>>::start_all_tasks(&mut self.app, &mut noop)?;
406 self.started = true;
407 Ok(())
408 }
409
410 fn nearest_keyframe(&self, target_cl_id: u64) -> Option<KeyFrame> {
411 self.keyframes
412 .iter()
413 .filter(|keyframe| keyframe.culistid <= target_cl_id)
414 .max_by_key(|keyframe| keyframe.culistid)
415 .cloned()
416 }
417
    /// Restores app state from `keyframe`, rewinds the mock clock to the
    /// keyframe's timestamp, and records which keyframe is now active.
    fn restore_keyframe(&mut self, keyframe: &KeyFrame) -> CuResult<()> {
        <App as CuSimApplication<S, L>>::restore_keyframe(&mut self.app, keyframe)?;
        self.clock_mock.set_value(keyframe.timestamp.as_nanos());
        self.last_keyframe = Some(keyframe.culistid);
        Ok(())
    }
424
425 fn find_section_for_index(&self, idx: usize) -> Option<usize> {
426 self.sections
427 .binary_search_by(|section| {
428 if idx < section.start_idx {
429 std::cmp::Ordering::Greater
430 } else if idx >= section.start_idx + section.len {
431 std::cmp::Ordering::Less
432 } else {
433 std::cmp::Ordering::Equal
434 }
435 })
436 .ok()
437 }
438
439 fn find_section_for_cl_id(&self, cl_id: u64) -> Option<usize> {
440 self.sections
441 .binary_search_by(|section| {
442 if cl_id < section.first_id {
443 std::cmp::Ordering::Greater
444 } else if cl_id > section.last_id {
445 std::cmp::Ordering::Less
446 } else {
447 std::cmp::Ordering::Equal
448 }
449 })
450 .ok()
451 }
452
453 fn touch_cache(&mut self, key: usize) {
454 if let Some(position) = self.cache_order.iter().position(|entry| *entry == key) {
455 self.cache_order.remove(position);
456 }
457 self.cache_order.push_back(key);
458 while self.cache_order.len() > self.cache_cap {
459 if let Some(oldest) = self.cache_order.pop_front() {
460 self.cache.remove(&oldest);
461 }
462 }
463 }
464
465 fn load_section(&mut self, section_idx: usize) -> CuResult<&RecordedReplayCachedSection<P>> {
466 if self.cache.contains_key(§ion_idx) {
467 self.touch_cache(section_idx);
468 return Ok(self.cache.get(§ion_idx).expect("cache entry exists"));
469 }
470
471 let entry = &self.sections[section_idx];
472 let (header, data) = read_section_at(&mut self.log_reader, entry.pos)?;
473 if header.entry_type != UnifiedLogType::CopperList {
474 return Err(CuError::from(
475 "Section type mismatch while loading distributed replay copperlists",
476 ));
477 }
478 let (entries, _) = decode_copperlists::<P, _>(&data, &recorded_copperlist_timestamp::<P>)?;
479 self.cache
480 .insert(section_idx, RecordedReplayCachedSection { entries });
481 self.touch_cache(section_idx);
482 Ok(self.cache.get(§ion_idx).expect("cache entry exists"))
483 }
484
485 fn copperlist_at(&mut self, idx: usize) -> CuResult<(Arc<CopperList<P>>, Option<KeyFrame>)> {
486 let section_idx = self
487 .find_section_for_index(idx)
488 .ok_or_else(|| CuError::from("Distributed replay index is outside the log"))?;
489 let start_idx = self.sections[section_idx].start_idx;
490 let section = self.load_section(section_idx)?;
491 let local_idx = idx - start_idx;
492 let copperlist = section
493 .entries
494 .get(local_idx)
495 .ok_or_else(|| CuError::from("Corrupt distributed replay section index"))?
496 .clone();
497 let keyframe = self
498 .keyframes
499 .iter()
500 .find(|keyframe| keyframe.culistid == copperlist.id)
501 .cloned();
502 Ok((copperlist, keyframe))
503 }
504
505 fn index_for_cl_id(&mut self, cl_id: u64) -> CuResult<usize> {
506 let section_idx = self
507 .find_section_for_cl_id(cl_id)
508 .ok_or_else(|| CuError::from("Requested CopperList id is not present in the log"))?;
509 let start_idx = self.sections[section_idx].start_idx;
510 let section = self.load_section(section_idx)?;
511 for (offset, copperlist) in section.entries.iter().enumerate() {
512 if copperlist.id == cl_id {
513 return Ok(start_idx + offset);
514 }
515 }
516 Err(CuError::from(
517 "Requested CopperList id is missing from its indexed log section",
518 ))
519 }
520
    /// Replays entries `start_idx..=end_idx` (inclusive) into the app,
    /// advancing `current_idx` after each entry.
    ///
    /// `replay_keyframe` is the keyframe that was just restored (if any); it
    /// takes precedence over the per-entry indexed keyframe, and in both
    /// cases a keyframe is only forwarded when its id matches the entry
    /// currently being replayed.
    fn replay_range(
        &mut self,
        start_idx: usize,
        end_idx: usize,
        replay_keyframe: Option<&KeyFrame>,
    ) -> CuResult<()> {
        for idx in start_idx..=end_idx {
            let (copperlist, keyframe) = self.copperlist_at(idx)?;
            // Prefer the restored keyframe, else the indexed one — either way
            // only when it belongs to this exact copperlist id.
            let keyframe = replay_keyframe
                .filter(|candidate| candidate.culistid == copperlist.id)
                .or(keyframe
                    .as_ref()
                    .filter(|candidate| candidate.culistid == copperlist.id));
            <App as CuRecordedReplayApplication<S, L>>::replay_recorded_copperlist(
                &mut self.app,
                &self.clock_mock,
                copperlist.as_ref(),
                keyframe,
            )?;
            self.current_idx = Some(idx);
        }
        Ok(())
    }
544
    /// Positions the session so that the entry at `target_idx` has just been
    /// replayed into the application.
    ///
    /// Strategy:
    /// - already at `target_idx`: nothing to do;
    /// - moving forward: replay the entries between the current position and
    ///   the target;
    /// - moving backward, or positioning for the first time: restore the
    ///   nearest keyframe at or before the target and replay forward from it.
    fn goto_index(&mut self, target_idx: usize) -> CuResult<()> {
        self.ensure_started()?;
        if target_idx >= self.total_entries {
            return Err(CuError::from(
                "Distributed replay target is outside the log",
            ));
        }

        // Resolve the CopperList id at the target index for keyframe lookup.
        let (target_copperlist, _) = self.copperlist_at(target_idx)?;
        let target_cl_id = target_copperlist.id;

        let replay_start_idx;
        let replay_keyframe;

        if let Some(current_idx) = self.current_idx {
            if current_idx == target_idx {
                // Already positioned on the target.
                return Ok(());
            }

            if target_idx > current_idx {
                // Forward motion: continue from the next unread entry.
                replay_start_idx = current_idx + 1;
                replay_keyframe = None;
            } else {
                // Rewind: restore state from the nearest keyframe at or
                // before the target, then replay forward from there.
                let keyframe = self.nearest_keyframe(target_cl_id).ok_or_else(|| {
                    CuError::from("No keyframe is available to rewind distributed replay")
                })?;
                self.restore_keyframe(&keyframe)?;
                replay_start_idx = self.index_for_cl_id(keyframe.culistid)?;
                replay_keyframe = Some(keyframe);
            }
        } else {
            // First positioning: always bootstrap from a keyframe.
            let keyframe = self.nearest_keyframe(target_cl_id).ok_or_else(|| {
                CuError::from("No keyframe is available to initialize distributed replay")
            })?;
            self.restore_keyframe(&keyframe)?;
            replay_start_idx = self.index_for_cl_id(keyframe.culistid)?;
            replay_keyframe = Some(keyframe);
        }

        self.replay_range(replay_start_idx, target_idx, replay_keyframe.as_ref())
    }
586}
587
588impl<App, P, S, L> DistributedReplaySession for RecordedReplaySession<App, P, S, L>
589where
590 App: CuDistributedReplayApplication<S, L>
591 + CuRecordedReplayApplication<S, L, RecordedDataSet = P>,
592 P: CopperListTuple + 'static,
593 S: SectionStorage,
594 L: UnifiedLogWrite<S> + 'static,
595{
596 fn goto_cl(&mut self, cl_id: u64) -> CuResult<()> {
597 let target_idx = self.index_for_cl_id(cl_id)?;
598 self.goto_index(target_idx)
599 }
600
601 fn shutdown(&mut self) -> CuResult<()> {
602 if !self.started {
603 return Ok(());
604 }
605
606 let mut noop = |_step: App::Step<'_>| crate::simulation::SimOverride::ExecuteByRuntime;
607 <App as CuSimApplication<S, L>>::stop_all_tasks(&mut self.app, &mut noop)?;
608 self.started = false;
609 Ok(())
610 }
611}
612
/// A registered application type able to replay one subsystem.
#[derive(Clone)]
pub struct DistributedReplayAppRegistration {
    /// Subsystem declared by the generated application type.
    pub subsystem: Subsystem,
    /// `type_name` of the registered application, used in diagnostics.
    pub app_type_name: &'static str,
    // Builds the actual replay session; excluded from Debug and PartialEq.
    session_factory: DistributedReplaySessionFactory,
}
620
621impl Debug for DistributedReplayAppRegistration {
622 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
623 f.debug_struct("DistributedReplayAppRegistration")
624 .field("subsystem", &self.subsystem)
625 .field("app_type_name", &self.app_type_name)
626 .finish()
627 }
628}
629
630impl PartialEq for DistributedReplayAppRegistration {
631 fn eq(&self, other: &Self) -> bool {
632 self.subsystem == other.subsystem && self.app_type_name == other.app_type_name
633 }
634}
635
636impl Eq for DistributedReplayAppRegistration {}
637
/// Binding of one (instance, subsystem) pair to the discovered log it will
/// replay and the app registration that will replay it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedReplayAssignment {
    pub instance_id: u32,
    pub subsystem_id: String,
    /// The discovered log for this instance/subsystem.
    pub log: DistributedReplayLog,
    /// The registered app type that will replay the log.
    pub registration: DistributedReplayAppRegistration,
}
646
/// Fully validated replay plan produced by [`DistributedReplayBuilder::build`]:
/// the configuration, the discovery results, and one assignment per selected
/// (instance, subsystem) pair.
#[derive(Debug, Clone)]
pub struct DistributedReplayPlan {
    /// Path of the multi-Copper config the plan was built from.
    pub multi_config_path: PathBuf,
    pub multi_config: MultiCopperConfig,
    /// Discovery results the plan was validated against.
    pub catalog: DistributedReplayCatalog,
    /// Instance ids that will be replayed, sorted.
    pub selected_instances: Vec<u32>,
    /// Mission shared by the selected logs, if any recorded one.
    pub mission: Option<String>,
    /// Registered app types, sorted by subsystem id.
    pub registrations: Vec<DistributedReplayAppRegistration>,
    /// One assignment per (instance, subsystem), deterministically sorted.
    pub assignments: Vec<DistributedReplayAssignment>,
}
658
659impl DistributedReplayPlan {
660 #[inline]
661 pub fn builder(multi_config_path: impl AsRef<Path>) -> CuResult<DistributedReplayBuilder> {
662 DistributedReplayBuilder::new(multi_config_path)
663 }
664
665 #[inline]
666 pub fn assignment(
667 &self,
668 instance_id: u32,
669 subsystem_id: &str,
670 ) -> Option<&DistributedReplayAssignment> {
671 self.assignments.iter().find(|assignment| {
672 assignment.instance_id == instance_id && assignment.subsystem_id == subsystem_id
673 })
674 }
675
676 pub fn start(self) -> CuResult<DistributedReplayEngine> {
678 DistributedReplayEngine::new(self, DistributedReplaySessionConfig::default())
679 }
680
681 pub fn start_recording_logs_under(
683 self,
684 output_root: impl AsRef<Path>,
685 ) -> CuResult<DistributedReplayEngine> {
686 DistributedReplayEngine::new(
687 self,
688 DistributedReplaySessionConfig {
689 output_root: Some(output_root.as_ref().to_path_buf()),
690 },
691 )
692 }
693}
694
/// Accumulator for plan-validation problems; all issues are collected and
/// reported together rather than failing on the first one.
#[derive(Debug, Clone, Default)]
pub struct DistributedReplayValidationError {
    /// Human-readable validation issues, in the order they were found.
    pub issues: Vec<String>,
}
700
701impl DistributedReplayValidationError {
702 fn push(&mut self, issue: impl Into<String>) {
703 self.issues.push(issue.into());
704 }
705
706 fn is_empty(&self) -> bool {
707 self.issues.is_empty()
708 }
709}
710
711impl Display for DistributedReplayValidationError {
712 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
713 writeln!(f, "Distributed replay validation failed:")?;
714 for issue in &self.issues {
715 writeln!(f, " - {issue}")?;
716 }
717 Ok(())
718 }
719}
720
/// Builder assembling a [`DistributedReplayPlan`]: loads the multi-Copper
/// config, gathers log discovery inputs/results, app registrations and an
/// optional instance selection, then validates everything in `build()`.
#[derive(Debug, Clone)]
pub struct DistributedReplayBuilder {
    multi_config_path: PathBuf,
    multi_config: MultiCopperConfig,
    /// Paths accumulated by `discover_logs*`, re-discovered as needed.
    discovery_inputs: Vec<PathBuf>,
    /// Explicit or discovered catalog; `None` until discovery runs.
    catalog: Option<DistributedReplayCatalog>,
    /// Registered app types, keyed by subsystem id.
    registrations: BTreeMap<String, DistributedReplayAppRegistration>,
    /// Explicit instance selection; `None` means "all discovered".
    selected_instances: Option<BTreeSet<u32>>,
}
731
732impl DistributedReplayBuilder {
733 pub fn new(multi_config_path: impl AsRef<Path>) -> CuResult<Self> {
735 let multi_config_path = multi_config_path.as_ref().to_path_buf();
736 let multi_config = read_multi_configuration(&multi_config_path.to_string_lossy())?;
737 Ok(Self {
738 multi_config_path,
739 multi_config,
740 discovery_inputs: Vec::new(),
741 catalog: None,
742 registrations: BTreeMap::new(),
743 selected_instances: None,
744 })
745 }
746
747 pub fn with_catalog(mut self, catalog: DistributedReplayCatalog) -> Self {
749 self.catalog = Some(catalog);
750 self
751 }
752
753 pub fn discover_logs<I, P>(mut self, inputs: I) -> CuResult<Self>
757 where
758 I: IntoIterator<Item = P>,
759 P: AsRef<Path>,
760 {
761 self.discovery_inputs
762 .extend(inputs.into_iter().map(|path| path.as_ref().to_path_buf()));
763 self.catalog = Some(DistributedReplayCatalog::discover(
764 self.discovery_inputs.iter().collect::<Vec<_>>(),
765 )?);
766 Ok(self)
767 }
768
769 pub fn discover_logs_under(self, root: impl AsRef<Path>) -> CuResult<Self> {
771 self.discover_logs([root.as_ref().to_path_buf()])
772 }
773
774 pub fn instances<I>(mut self, instances: I) -> Self
776 where
777 I: IntoIterator<Item = u32>,
778 {
779 self.selected_instances = Some(instances.into_iter().collect());
780 self
781 }
782
    /// Registers the generated application type `App` as the replayer for
    /// `subsystem_id`.
    ///
    /// Rejects duplicate registrations, and verifies that the app type's
    /// generated subsystem metadata (id and code) agrees with both the
    /// requested `subsystem_id` and the loaded multi-Copper configuration.
    pub fn register<App>(mut self, subsystem_id: &str) -> CuResult<Self>
    where
        App: CuDistributedReplayApplication<NoopSectionStorage, NoopLogger>
            + CuDistributedReplayApplication<MmapSectionStorage, UnifiedLoggerWrite>
            + 'static,
    {
        if self.registrations.contains_key(subsystem_id) {
            return Err(CuError::from(format!(
                "Subsystem '{}' is already registered for distributed replay",
                subsystem_id
            )));
        }

        // The configuration must define this subsystem.
        let expected_subsystem = self.multi_config.subsystem(subsystem_id).ok_or_else(|| {
            CuError::from(format!(
                "Multi-Copper config '{}' does not define subsystem '{}'",
                self.multi_config_path.display(),
                subsystem_id
            ))
        })?;

        // The app type must have been generated for a subsystem at all...
        let registered_subsystem = App::subsystem();
        let Some(registered_subsystem_id) = registered_subsystem.id() else {
            return Err(CuError::from(format!(
                "App type '{}' was not generated for a multi-Copper subsystem and cannot be registered for distributed replay",
                type_name::<App>()
            )));
        };

        // ...and its declared id must match the registration key.
        if registered_subsystem_id != subsystem_id {
            return Err(CuError::from(format!(
                "App type '{}' declares subsystem '{}' but was registered as '{}'",
                type_name::<App>(),
                registered_subsystem_id,
                subsystem_id
            )));
        }

        // Its declared subsystem code must match the configuration too.
        let registered_subsystem_code = registered_subsystem.code();
        if registered_subsystem_code != expected_subsystem.subsystem_code {
            return Err(CuError::from(format!(
                "App type '{}' declares subsystem code {} for '{}' but multi-Copper config '{}' expects {}",
                type_name::<App>(),
                registered_subsystem_code,
                subsystem_id,
                self.multi_config_path.display(),
                expected_subsystem.subsystem_code
            )));
        }

        self.registrations.insert(
            subsystem_id.to_string(),
            DistributedReplayAppRegistration {
                subsystem: registered_subsystem,
                app_type_name: type_name::<App>(),
                session_factory: build_distributed_replay_session::<App>,
            },
        );
        Ok(self)
    }
844
    /// Cross-checks the discovered logs, the multi-Copper configuration and
    /// the app registrations, then produces the final
    /// [`DistributedReplayPlan`].
    ///
    /// All validation problems are accumulated into a
    /// [`DistributedReplayValidationError`] and returned together as a single
    /// `CuError`, instead of failing on the first issue.
    pub fn build(self) -> CuResult<DistributedReplayPlan> {
        // Prefer an explicitly supplied catalog; otherwise discover from the
        // accumulated inputs (or start empty when there are none).
        let catalog = match self.catalog {
            Some(catalog) => catalog,
            None if self.discovery_inputs.is_empty() => DistributedReplayCatalog::default(),
            None => DistributedReplayCatalog::discover(
                self.discovery_inputs.iter().collect::<Vec<_>>(),
            )?,
        };

        let mut validation = DistributedReplayValidationError::default();

        // Any discovery failure invalidates the plan.
        for failure in &catalog.failures {
            validation.push(format!(
                "discovery failure for '{}': {}",
                failure.candidate_path.display(),
                failure.error
            ));
        }

        let subsystem_map: BTreeMap<_, _> = self
            .multi_config
            .subsystems
            .iter()
            .map(|subsystem| (subsystem.id.clone(), subsystem))
            .collect();

        // Every configured subsystem needs a registered app type.
        for subsystem in subsystem_map.keys() {
            if !self.registrations.contains_key(subsystem) {
                validation.push(format!(
                    "missing app registration for subsystem '{}'",
                    subsystem
                ));
            }
        }

        // Bucket the discovered logs by (instance, subsystem) while checking
        // each log's recorded metadata against the configuration.
        let mut discovered_instances = BTreeSet::new();
        let mut logs_by_target: BTreeMap<(u32, String), Vec<DistributedReplayLog>> =
            BTreeMap::new();

        for log in &catalog.logs {
            let Some(subsystem_id) = log.subsystem_id() else {
                validation.push(format!(
                    "discovered log '{}' is missing subsystem_id runtime metadata",
                    log.base_path.display()
                ));
                continue;
            };

            let Some(expected_subsystem) = subsystem_map.get(subsystem_id) else {
                validation.push(format!(
                    "discovered log '{}' belongs to subsystem '{}' which is not present in multi-Copper config '{}'",
                    log.base_path.display(),
                    subsystem_id,
                    self.multi_config_path.display()
                ));
                continue;
            };

            if log.subsystem_code() != expected_subsystem.subsystem_code {
                validation.push(format!(
                    "discovered log '{}' reports subsystem code {} for '{}' but multi-Copper config '{}' expects {}",
                    log.base_path.display(),
                    log.subsystem_code(),
                    subsystem_id,
                    self.multi_config_path.display(),
                    expected_subsystem.subsystem_code
                ));
            }

            discovered_instances.insert(log.instance_id());
            logs_by_target
                .entry((log.instance_id(), subsystem_id.to_string()))
                .or_default()
                .push(log.clone());
        }

        // A given (instance, subsystem) pair must map to exactly one log.
        for ((instance_id, subsystem_id), logs) in &logs_by_target {
            if logs.len() > 1 {
                validation.push(format!(
                    "found {} logs for instance {} subsystem '{}': {}",
                    logs.len(),
                    instance_id,
                    subsystem_id,
                    join_log_paths(logs)
                ));
            }
        }

        // Selected instances default to everything discovered; explicit
        // selections must actually have logs.
        let selected_instances: Vec<u32> =
            if let Some(selected_instances) = &self.selected_instances {
                let mut selected_instances: Vec<_> = selected_instances.iter().copied().collect();
                selected_instances.sort_unstable();
                for instance_id in &selected_instances {
                    if !discovered_instances.contains(instance_id) {
                        validation.push(format!(
                            "selected instance {} has no discovered logs",
                            instance_id
                        ));
                    }
                }
                selected_instances
            } else {
                discovered_instances.iter().copied().collect()
            };

        if selected_instances.is_empty() {
            validation.push("no instances selected for distributed replay");
        }

        // Each selected instance needs a log for every configured subsystem.
        for instance_id in &selected_instances {
            for subsystem in &self.multi_config.subsystems {
                if !logs_by_target.contains_key(&(*instance_id, subsystem.id.clone())) {
                    validation.push(format!(
                        "missing log for instance {} subsystem '{}'",
                        instance_id, subsystem.id
                    ));
                }
            }
        }

        // All selected logs that recorded a mission must agree on it.
        let mut known_missions = BTreeSet::new();
        for instance_id in &selected_instances {
            for subsystem in &self.multi_config.subsystems {
                if let Some(logs) = logs_by_target.get(&(*instance_id, subsystem.id.clone()))
                    && let Some(log) = logs.first()
                    && let Some(mission) = &log.mission
                {
                    known_missions.insert(mission.clone());
                }
            }
        }
        if known_missions.len() > 1 {
            validation.push(format!(
                "selected logs disagree on mission: {}",
                known_missions.into_iter().collect::<Vec<_>>().join(", ")
            ));
        }

        if !validation.is_empty() {
            return Err(CuError::from(validation.to_string()));
        }

        // First recorded mission in (instance, subsystem) iteration order.
        let mission = selected_instances
            .iter()
            .flat_map(|instance_id| {
                self.multi_config.subsystems.iter().filter_map(|subsystem| {
                    logs_by_target
                        .get(&(*instance_id, subsystem.id.clone()))
                        .and_then(|logs| logs.first())
                        .and_then(|log| log.mission.clone())
                })
            })
            .next();

        let mut registrations: Vec<_> = self.registrations.into_values().collect();
        registrations.sort_by(|left, right| left.subsystem.id().cmp(&right.subsystem.id()));

        // Validation passed, so every (instance, subsystem) pair is
        // guaranteed to have both a log and a registration.
        let mut assignments = Vec::new();
        for instance_id in &selected_instances {
            for subsystem in &self.multi_config.subsystems {
                let log = logs_by_target
                    .get(&(*instance_id, subsystem.id.clone()))
                    .and_then(|logs| logs.first())
                    .expect("validated distributed replay plan is missing a log")
                    .clone();
                let registration = registrations
                    .iter()
                    .find(|registration| registration.subsystem.id() == Some(subsystem.id.as_str()))
                    .expect("validated distributed replay plan is missing a registration")
                    .clone();
                assignments.push(DistributedReplayAssignment {
                    instance_id: *instance_id,
                    subsystem_id: subsystem.id.clone(),
                    log,
                    registration,
                });
            }
        }
        // Deterministic ordering of the final assignments.
        assignments.sort_by(|left, right| {
            (
                left.instance_id,
                left.registration.subsystem.code(),
                left.subsystem_id.as_str(),
            )
                .cmp(&(
                    right.instance_id,
                    right.registration.subsystem.code(),
                    right.subsystem_id.as_str(),
                ))
        });

        Ok(DistributedReplayPlan {
            multi_config_path: self.multi_config_path,
            multi_config: self.multi_config,
            catalog,
            selected_instances,
            mission,
            registrations,
            assignments,
        })
    }
1047}
1048
/// Session factory stored in each registration: instantiates `App` over the
/// assignment's recorded log.
///
/// When the session config carries an `output_root`, the app is built against
/// a real memory-mapped unified logger so its replay output is recorded;
/// otherwise a no-op logger/storage pair is used.
fn build_distributed_replay_session<App>(
    assignment: &DistributedReplayAssignment,
    session_config: &DistributedReplaySessionConfig,
) -> CuResult<DistributedReplaySessionBuild>
where
    App: CuDistributedReplayApplication<NoopSectionStorage, NoopLogger>
        + CuDistributedReplayApplication<MmapSectionStorage, UnifiedLoggerWrite>
        + 'static,
{
    // Re-parse the effective config exactly as it was recorded at runtime.
    let config = read_configuration_str(assignment.log.effective_config_ron.clone(), None)
        .map_err(|err| {
            CuError::from(format!(
                "Failed to parse recorded effective config from '{}': {err}",
                assignment.log.base_path.display()
            ))
        })?;
    let (clock, clock_mock) = RobotClock::mock();

    if let Some(output_root) = &session_config.output_root {
        // Recording variant: mmap-backed unified logger under output_root.
        let output_log_path = replay_output_log_path(output_root, assignment)?;
        let logger = build_replay_output_logger(
            &output_log_path,
            replay_output_log_size_bytes(assignment, &config),
        )?;
        let app = <App as CuDistributedReplayApplication<
            MmapSectionStorage,
            UnifiedLoggerWrite,
        >>::build_distributed_replay(
            clock.clone(), logger, assignment.instance_id, Some(config)
        )?;
        let mut session = RecordedReplaySession::<
            App,
            <App as CuRecordedReplayApplication<
                MmapSectionStorage,
                UnifiedLoggerWrite,
            >>::RecordedDataSet,
            MmapSectionStorage,
            UnifiedLoggerWrite,
        >::from_log(assignment.clone(), app, clock_mock, &assignment.log.base_path)?;
        let nodes = session.describe_nodes()?;
        return Ok(DistributedReplaySessionBuild {
            session: Box::new(session),
            nodes,
            output_log_path: Some(output_log_path),
        });
    }

    // Non-recording variant: no-op logger and storage.
    let logger = Arc::new(Mutex::new(NoopLogger::new()));
    let app = <App as CuDistributedReplayApplication<NoopSectionStorage, NoopLogger>>::build_distributed_replay(
        clock,
        logger,
        assignment.instance_id,
        Some(config),
    )?;
    let mut session = RecordedReplaySession::<
        App,
        <App as CuRecordedReplayApplication<NoopSectionStorage, NoopLogger>>::RecordedDataSet,
        NoopSectionStorage,
        NoopLogger,
    >::from_log(
        assignment.clone(),
        app,
        clock_mock,
        &assignment.log.base_path,
    )?;
    let nodes = session.describe_nodes()?;
    Ok(DistributedReplaySessionBuild {
        session: Box::new(session),
        nodes,
        output_log_path: None,
    })
}
1121
1122fn replay_output_log_path(
1123 output_root: &Path,
1124 assignment: &DistributedReplayAssignment,
1125) -> CuResult<PathBuf> {
1126 let file_name = assignment
1127 .log
1128 .base_path
1129 .file_name()
1130 .ok_or_else(|| {
1131 CuError::from(format!(
1132 "Replay assignment log '{}' has no file name",
1133 assignment.log.base_path.display()
1134 ))
1135 })?
1136 .to_owned();
1137 Ok(output_root.join(file_name))
1138}
1139
1140fn build_replay_output_logger(
1141 path: &Path,
1142 preallocated_size: usize,
1143) -> CuResult<Arc<Mutex<UnifiedLoggerWrite>>> {
1144 if let Some(parent) = path.parent() {
1145 fs::create_dir_all(parent).map_err(|err| {
1146 CuError::new_with_cause(
1147 &format!(
1148 "Failed to create replay log directory '{}'",
1149 parent.display()
1150 ),
1151 err,
1152 )
1153 })?;
1154 }
1155 let UnifiedLogger::Write(writer) = UnifiedLoggerBuilder::new()
1156 .write(true)
1157 .create(true)
1158 .file_base_name(path)
1159 .preallocated_size(preallocated_size)
1160 .build()
1161 .map_err(|err| {
1162 CuError::new_with_cause(
1163 &format!("Failed to create replay log '{}'", path.display()),
1164 err,
1165 )
1166 })?
1167 else {
1168 return Err(CuError::from(format!(
1169 "Expected writable replay logger for '{}'",
1170 path.display()
1171 )));
1172 };
1173 Ok(Arc::new(Mutex::new(writer)))
1174}
1175
1176fn replay_output_log_size_bytes(
1177 assignment: &DistributedReplayAssignment,
1178 config: &crate::config::CuConfig,
1179) -> usize {
1180 if let Some(slab_zero) = slab_zero_path(&assignment.log.base_path)
1181 && let Ok(metadata) = fs::metadata(slab_zero)
1182 && let Ok(size) = usize::try_from(metadata.len())
1183 {
1184 return size.max(DEFAULT_REPLAY_LOG_SIZE_BYTES);
1185 }
1186
1187 config
1188 .logging
1189 .as_ref()
1190 .and_then(|logging| logging.slab_size_mib)
1191 .and_then(|size_mib| usize::try_from(size_mib).ok())
1192 .and_then(|size_mib| size_mib.checked_mul(1024 * 1024))
1193 .unwrap_or(DEFAULT_REPLAY_LOG_SIZE_BYTES)
1194}
1195
1196fn copperlist_origins<P: CopperListTuple>(
1197 copperlist: &CopperList<P>,
1198) -> BTreeSet<DistributedReplayOriginKey> {
1199 <CopperList<P> as ErasedCuStampedDataSet>::cumsgs(copperlist)
1200 .into_iter()
1201 .filter_map(|msg| msg.metadata().origin())
1202 .map(|origin| DistributedReplayOriginKey {
1203 instance_id: origin.instance_id,
1204 subsystem_code: origin.subsystem_code,
1205 cl_id: origin.cl_id,
1206 })
1207 .collect()
1208}
1209
// Intermediate product of `DistributedReplayEngine::build_state`: freshly
// constructed sessions plus the causal-graph bookkeeping that the engine
// copies into itself on construction and on reset.
#[derive(Default)]
struct DistributedReplayEngineState {
    // One replay session per plan assignment, indexed by session index.
    sessions: Vec<Box<dyn DistributedReplaySession>>,
    // Causal graph nodes; indices into this Vec are the node ids used below.
    nodes: Vec<DistributedReplayGraphNode>,
    // (instance_id, subsystem_id, cl_id) -> node index, for targeted lookups.
    node_lookup: BTreeMap<(u32, String, u64), usize>,
    // (instance_id, subsystem_id) -> replay output log path, when configured.
    output_log_paths: BTreeMap<(u32, String), PathBuf>,
    // Nodes whose dependencies are all satisfied and may execute next.
    ready: BTreeSet<DistributedReplayReadyNode>,
    // Last executed cursor per session (None until that session has run).
    frontier: Vec<Option<DistributedReplayCursor>>,
}
1219
pub struct DistributedReplayEngine {
    // Plan and session config are retained so reset() can rebuild all state.
    plan: DistributedReplayPlan,
    session_config: DistributedReplaySessionConfig,
    // Mirrors of DistributedReplayEngineState (see build_state).
    sessions: Vec<Box<dyn DistributedReplaySession>>,
    nodes: Vec<DistributedReplayGraphNode>,
    node_lookup: BTreeMap<(u32, String, u64), usize>,
    output_log_paths: BTreeMap<(u32, String), PathBuf>,
    ready: BTreeSet<DistributedReplayReadyNode>,
    frontier: Vec<Option<DistributedReplayCursor>>,
    // Per-node executed flags plus a running count, driving termination.
    executed: Vec<bool>,
    executed_count: usize,
}
1233
1234impl DistributedReplayEngine {
1235 fn new(
1236 plan: DistributedReplayPlan,
1237 session_config: DistributedReplaySessionConfig,
1238 ) -> CuResult<Self> {
1239 let state = Self::build_state(&plan, &session_config)?;
1240 let executed = vec![false; state.nodes.len()];
1241 Ok(Self {
1242 plan,
1243 session_config,
1244 sessions: state.sessions,
1245 nodes: state.nodes,
1246 node_lookup: state.node_lookup,
1247 output_log_paths: state.output_log_paths,
1248 ready: state.ready,
1249 frontier: state.frontier,
1250 executed,
1251 executed_count: 0,
1252 })
1253 }
1254
1255 fn build_state(
1256 plan: &DistributedReplayPlan,
1257 session_config: &DistributedReplaySessionConfig,
1258 ) -> CuResult<DistributedReplayEngineState> {
1259 let mut sessions = Vec::with_capacity(plan.assignments.len());
1260 let mut pending_nodes = Vec::new();
1261 let mut session_nodes = Vec::with_capacity(plan.assignments.len());
1262 let mut output_log_paths = BTreeMap::new();
1263
1264 for assignment in &plan.assignments {
1265 let build = (assignment.registration.session_factory)(assignment, session_config)?;
1266 let session_index = sessions.len();
1267 let mut node_indices = Vec::with_capacity(build.nodes.len());
1268 for node in build.nodes {
1269 let pending_index = pending_nodes.len();
1270 pending_nodes.push((session_index, node));
1271 node_indices.push(pending_index);
1272 }
1273 if let Some(output_log_path) = build.output_log_path {
1274 let replaced = output_log_paths.insert(
1275 (assignment.instance_id, assignment.subsystem_id.clone()),
1276 output_log_path,
1277 );
1278 if replaced.is_some() {
1279 return Err(CuError::from(format!(
1280 "Duplicate replay output log assignment for instance {} subsystem '{}'",
1281 assignment.instance_id, assignment.subsystem_id
1282 )));
1283 }
1284 }
1285 sessions.push(build.session);
1286 session_nodes.push(node_indices);
1287 }
1288
1289 let mut nodes = Vec::with_capacity(pending_nodes.len());
1290 let mut origin_lookup = BTreeMap::new();
1291 let mut node_lookup = BTreeMap::new();
1292
1293 for (node_index, (session_index, descriptor)) in pending_nodes.iter().enumerate() {
1294 if origin_lookup
1295 .insert(descriptor.origin_key.clone(), node_index)
1296 .is_some()
1297 {
1298 return Err(CuError::from(format!(
1299 "Duplicate replay node detected for instance {} subsystem code {} CopperList {}",
1300 descriptor.origin_key.instance_id,
1301 descriptor.origin_key.subsystem_code,
1302 descriptor.origin_key.cl_id
1303 )));
1304 }
1305
1306 if node_lookup
1307 .insert(
1308 (
1309 descriptor.cursor.instance_id,
1310 descriptor.cursor.subsystem_id.clone(),
1311 descriptor.cursor.cl_id,
1312 ),
1313 node_index,
1314 )
1315 .is_some()
1316 {
1317 return Err(CuError::from(format!(
1318 "Duplicate replay cursor detected for instance {} subsystem '{}' CopperList {}",
1319 descriptor.cursor.instance_id,
1320 descriptor.cursor.subsystem_id,
1321 descriptor.cursor.cl_id
1322 )));
1323 }
1324
1325 nodes.push(DistributedReplayGraphNode {
1326 cursor: descriptor.cursor.clone(),
1327 session_index: *session_index,
1328 outgoing: Vec::new(),
1329 initial_dependencies: 0,
1330 remaining_dependencies: 0,
1331 });
1332 }
1333
1334 let mut edges = BTreeSet::new();
1335
1336 for node_indices in &session_nodes {
1337 for pair in node_indices.windows(2) {
1338 let from = pair[0];
1339 let to = pair[1];
1340 if edges.insert((from, to)) {
1341 nodes[from].outgoing.push(to);
1342 nodes[to].initial_dependencies += 1;
1343 }
1344 }
1345 }
1346
1347 for (target_index, (_, descriptor)) in pending_nodes.iter().enumerate() {
1348 for origin in &descriptor.incoming_origins {
1349 let source_index = origin_lookup.get(origin).copied().ok_or_else(|| {
1350 CuError::from(format!(
1351 "Unresolved recorded provenance edge into instance {} subsystem '{}' CopperList {} from instance {} subsystem code {} CopperList {}",
1352 descriptor.cursor.instance_id,
1353 descriptor.cursor.subsystem_id,
1354 descriptor.cursor.cl_id,
1355 origin.instance_id,
1356 origin.subsystem_code,
1357 origin.cl_id
1358 ))
1359 })?;
1360 if source_index == target_index {
1361 return Err(CuError::from(format!(
1362 "Recorded provenance on instance {} subsystem '{}' CopperList {} points to itself",
1363 descriptor.cursor.instance_id,
1364 descriptor.cursor.subsystem_id,
1365 descriptor.cursor.cl_id
1366 )));
1367 }
1368 if edges.insert((source_index, target_index)) {
1369 nodes[source_index].outgoing.push(target_index);
1370 nodes[target_index].initial_dependencies += 1;
1371 }
1372 }
1373 }
1374
1375 let mut ready = BTreeSet::new();
1376 for (node_index, node) in nodes.iter_mut().enumerate() {
1377 node.remaining_dependencies = node.initial_dependencies;
1378 if node.remaining_dependencies == 0 {
1379 ready.insert(DistributedReplayReadyNode {
1380 instance_id: node.cursor.instance_id,
1381 subsystem_code: node.cursor.subsystem_code(),
1382 cl_id: node.cursor.cl_id,
1383 node_index,
1384 });
1385 }
1386 }
1387
1388 if !nodes.is_empty() && ready.is_empty() {
1389 return Err(CuError::from(
1390 "Distributed replay graph has no causally ready starting point",
1391 ));
1392 }
1393
1394 Ok(DistributedReplayEngineState {
1395 frontier: vec![None; sessions.len()],
1396 sessions,
1397 nodes,
1398 node_lookup,
1399 output_log_paths,
1400 ready,
1401 })
1402 }
1403
1404 fn shutdown_sessions(sessions: &mut Vec<Box<dyn DistributedReplaySession>>) -> CuResult<()> {
1405 for session in sessions.iter_mut() {
1406 session.shutdown()?;
1407 }
1408 Ok(())
1409 }
1410
1411 fn ready_key(&self, node_index: usize) -> DistributedReplayReadyNode {
1412 let node = &self.nodes[node_index];
1413 DistributedReplayReadyNode {
1414 instance_id: node.cursor.instance_id,
1415 subsystem_code: node.cursor.subsystem_code(),
1416 cl_id: node.cursor.cl_id,
1417 node_index,
1418 }
1419 }
1420
1421 pub fn reset(&mut self) -> CuResult<()> {
1423 Self::shutdown_sessions(&mut self.sessions)?;
1424 let state = Self::build_state(&self.plan, &self.session_config)?;
1425 self.sessions = state.sessions;
1426 self.nodes = state.nodes;
1427 self.node_lookup = state.node_lookup;
1428 self.output_log_paths = state.output_log_paths;
1429 self.ready = state.ready;
1430 self.frontier = state.frontier;
1431 self.executed = vec![false; self.nodes.len()];
1432 self.executed_count = 0;
1433 Ok(())
1434 }
1435
1436 pub fn step_causal(&mut self) -> CuResult<Option<DistributedReplayCursor>> {
1438 let Some(next_ready) = self.ready.iter().next().copied() else {
1439 if self.executed_count == self.nodes.len() {
1440 return Ok(None);
1441 }
1442 return Err(CuError::from(
1443 "Distributed replay is deadlocked: no causally ready CopperList remains",
1444 ));
1445 };
1446 self.ready.remove(&next_ready);
1447
1448 let cursor = self.nodes[next_ready.node_index].cursor.clone();
1449 let session_index = self.nodes[next_ready.node_index].session_index;
1450 self.sessions[session_index].goto_cl(cursor.cl_id)?;
1451 self.executed[next_ready.node_index] = true;
1452 self.executed_count += 1;
1453 self.frontier[session_index] = Some(cursor.clone());
1454
1455 let outgoing = self.nodes[next_ready.node_index].outgoing.clone();
1456 for dependent in outgoing {
1457 let node = &mut self.nodes[dependent];
1458 node.remaining_dependencies = node.remaining_dependencies.saturating_sub(1);
1459 if node.remaining_dependencies == 0 {
1460 self.ready.insert(self.ready_key(dependent));
1461 }
1462 }
1463
1464 Ok(Some(cursor))
1465 }
1466
1467 pub fn run_all(&mut self) -> CuResult<()> {
1469 while self.step_causal()?.is_some() {}
1470 Ok(())
1471 }
1472
1473 pub fn goto(&mut self, instance_id: u32, subsystem_id: &str, cl_id: u64) -> CuResult<()> {
1475 let target = self
1476 .node_lookup
1477 .get(&(instance_id, subsystem_id.to_string(), cl_id))
1478 .copied()
1479 .ok_or_else(|| {
1480 CuError::from(format!(
1481 "Distributed replay target instance {} subsystem '{}' CopperList {} does not exist",
1482 instance_id, subsystem_id, cl_id
1483 ))
1484 })?;
1485 self.reset()?;
1486 while !self.executed[target] {
1487 let Some(_) = self.step_causal()? else {
1488 return Err(CuError::from(format!(
1489 "Distributed replay exhausted before reaching instance {} subsystem '{}' CopperList {}",
1490 instance_id, subsystem_id, cl_id
1491 )));
1492 };
1493 }
1494 Ok(())
1495 }
1496
1497 pub fn current_frontier(&self) -> Vec<DistributedReplayCursor> {
1499 self.frontier
1500 .iter()
1501 .filter_map(|cursor| cursor.clone())
1502 .collect()
1503 }
1504
1505 pub fn output_log_path(&self, instance_id: u32, subsystem_id: &str) -> Option<&Path> {
1506 self.output_log_paths
1507 .get(&(instance_id, subsystem_id.to_string()))
1508 .map(PathBuf::as_path)
1509 }
1510
1511 #[inline]
1512 pub fn total_nodes(&self) -> usize {
1513 self.nodes.len()
1514 }
1515
1516 #[inline]
1517 pub fn executed_nodes(&self) -> usize {
1518 self.executed_count
1519 }
1520}
1521
1522fn join_log_paths(logs: &[DistributedReplayLog]) -> String {
1523 logs.iter()
1524 .map(|log| log.base_path.display().to_string())
1525 .collect::<Vec<_>>()
1526 .join(", ")
1527}
1528
1529fn collect_candidate_base_paths(path: &Path, out: &mut BTreeSet<PathBuf>) -> CuResult<()> {
1530 if path.is_dir() {
1531 let mut entries = fs::read_dir(path)
1532 .map_err(|err| {
1533 CuError::new_with_cause(
1534 &format!(
1535 "Failed to read directory '{}' during distributed replay discovery",
1536 path.display()
1537 ),
1538 err,
1539 )
1540 })?
1541 .collect::<Result<Vec<_>, _>>()
1542 .map_err(|err| {
1543 CuError::new_with_cause(
1544 &format!(
1545 "Failed to enumerate directory '{}' during distributed replay discovery",
1546 path.display()
1547 ),
1548 err,
1549 )
1550 })?;
1551 entries.sort_by_key(|entry| entry.path());
1552 for entry in entries {
1553 collect_candidate_base_paths(&entry.path(), out)?;
1554 }
1555 return Ok(());
1556 }
1557
1558 if path
1559 .extension()
1560 .and_then(|ext| ext.to_str())
1561 .is_some_and(|ext| ext == "copper")
1562 {
1563 out.insert(normalize_candidate_log_base(path));
1564 }
1565
1566 Ok(())
1567}
1568
1569fn normalize_candidate_log_base(path: &Path) -> PathBuf {
1570 let Some(extension) = path.extension().and_then(|ext| ext.to_str()) else {
1571 return path.to_path_buf();
1572 };
1573 let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
1574 return path.to_path_buf();
1575 };
1576 let Some((base_stem, slab_suffix)) = stem.rsplit_once('_') else {
1577 return path.to_path_buf();
1578 };
1579
1580 if slab_suffix.is_empty() || !slab_suffix.chars().all(|c| c.is_ascii_digit()) {
1581 return path.to_path_buf();
1582 }
1583
1584 let mut normalized = path.to_path_buf();
1585 normalized.set_file_name(format!("{base_stem}.{extension}"));
1586 if slab_zero_path(&normalized).is_some_and(|slab_zero| slab_zero.exists()) {
1587 normalized
1588 } else {
1589 path.to_path_buf()
1590 }
1591}
1592
/// Returns the path of slab zero (`<stem>_0.<ext>`) for a logical log base
/// path, or `None` when the path lacks a UTF-8 stem or extension.
fn slab_zero_path(base_path: &Path) -> Option<PathBuf> {
    let (stem, extension) = base_path
        .file_stem()
        .zip(base_path.extension())
        .and_then(|(stem, ext)| stem.to_str().zip(ext.to_str()))?;
    let mut slab_zero = base_path.to_path_buf();
    slab_zero.set_file_name(format!("{stem}_0.{extension}"));
    Some(slab_zero)
}
1600
1601fn read_next_entry<T: bincode::Decode<()>>(src: &mut impl Read) -> CuResult<Option<T>> {
1602 match decode_from_std_read::<T, _, _>(src, standard()) {
1603 Ok(entry) => Ok(Some(entry)),
1604 Err(DecodeError::UnexpectedEnd { .. }) => Ok(None),
1605 Err(DecodeError::Io { inner, .. }) if inner.kind() == std::io::ErrorKind::UnexpectedEof => {
1606 Ok(None)
1607 }
1608 Err(err) => Err(CuError::new_with_cause(
1609 "Failed to decode bincode entry during distributed replay discovery",
1610 err,
1611 )),
1612 }
1613}
1614
1615#[cfg(test)]
1616mod tests {
1617 use super::*;
1618 use crate::app::{
1619 CuDistributedReplayApplication, CuRecordedReplayApplication, CuSimApplication,
1620 CuSubsystemMetadata,
1621 };
1622 use crate::config::CuConfig;
1623 use crate::copperlist::CopperList;
1624 use crate::curuntime::KeyFrame;
1625 use crate::simulation::SimOverride;
1626 use bincode::{Decode, Encode};
1627 use cu29_clock::CuTime;
1628 use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks, WriteStream};
1629 use cu29_unifiedlog::memmap::MmapSectionStorage;
1630 use cu29_unifiedlog::stream_write;
1631 use serde::Serialize;
1632 use std::sync::{Arc, Mutex};
1633 use tempfile::TempDir;
1634
    /// Writes a minimal unified log at `base_path` containing a runtime
    /// lifecycle section: one `Instantiated` record carrying `stack`, plus an
    /// optional `MissionStarted` record when `mission` is given.
    fn write_runtime_lifecycle_log(
        base_path: &Path,
        stack: RuntimeLifecycleStackInfo,
        mission: Option<&str>,
    ) -> CuResult<()> {
        if let Some(parent) = base_path.parent() {
            fs::create_dir_all(parent).map_err(|err| {
                CuError::new_with_cause(
                    &format!("Failed to create test log directory '{}'", parent.display()),
                    err,
                )
            })?;
        }

        let UnifiedLogger::Write(writer) = UnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .preallocated_size(256 * 1024)
            .file_base_name(base_path)
            .build()
            .map_err(|err| {
                CuError::new_with_cause(
                    &format!("Failed to create test log '{}'", base_path.display()),
                    err,
                )
            })?
        else {
            return Err(CuError::from("Expected writable unified logger in test"));
        };

        let logger = Arc::new(Mutex::new(writer));
        let mut stream = stream_write::<RuntimeLifecycleRecord, MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::RuntimeLifecycle,
            4096,
        )?;
        stream.log(&RuntimeLifecycleRecord {
            timestamp: CuTime::default(),
            event: RuntimeLifecycleEvent::Instantiated {
                config_source: RuntimeLifecycleConfigSource::ExternalFile,
                effective_config_ron: "(runtime: ())".to_string(),
                stack,
            },
        })?;
        if let Some(mission) = mission {
            stream.log(&RuntimeLifecycleRecord {
                timestamp: CuTime::from_nanos(1),
                event: RuntimeLifecycleEvent::MissionStarted {
                    mission: mission.to_string(),
                },
            })?;
        }
        // Drop order matters: close the stream before its underlying logger.
        drop(stream);
        drop(logger);
        Ok(())
    }
1691
1692 fn test_stack(
1693 subsystem_id: &str,
1694 subsystem_code: u16,
1695 instance_id: u32,
1696 ) -> RuntimeLifecycleStackInfo {
1697 RuntimeLifecycleStackInfo {
1698 app_name: "demo".to_string(),
1699 app_version: "0.1.0".to_string(),
1700 git_commit: Some("abc123".to_string()),
1701 git_dirty: Some(false),
1702 subsystem_id: Some(subsystem_id.to_string()),
1703 subsystem_code,
1704 instance_id,
1705 }
1706 }
1707
1708 fn write_multi_config_fixture(temp_dir: &TempDir, subsystem_ids: &[&str]) -> CuResult<PathBuf> {
1709 for subsystem_id in subsystem_ids {
1710 let subsystem_config = temp_dir.path().join(format!("{subsystem_id}_config.ron"));
1711 fs::write(&subsystem_config, "(tasks: [], cnx: [])").map_err(|err| {
1712 CuError::new_with_cause(
1713 &format!(
1714 "Failed to write subsystem config '{}'",
1715 subsystem_config.display()
1716 ),
1717 err,
1718 )
1719 })?;
1720 }
1721
1722 let subsystem_entries = subsystem_ids
1723 .iter()
1724 .map(|subsystem_id| {
1725 format!(
1726 r#"(
1727 id: "{subsystem_id}",
1728 config: "{subsystem_id}_config.ron",
1729 )"#
1730 )
1731 })
1732 .collect::<Vec<_>>()
1733 .join(",\n");
1734
1735 let multi_config = format!(
1736 "(\n subsystems: [\n{entries}\n ],\n interconnects: [],\n)\n",
1737 entries = subsystem_entries
1738 );
1739 let multi_config_path = temp_dir.path().join("multi_copper.ron");
1740 fs::write(&multi_config_path, multi_config).map_err(|err| {
1741 CuError::new_with_cause(
1742 &format!(
1743 "Failed to write multi-Copper config '{}'",
1744 multi_config_path.display()
1745 ),
1746 err,
1747 )
1748 })?;
1749 Ok(multi_config_path)
1750 }
1751
    // Empty recorded data set used to satisfy `CuRecordedReplayApplication`
    // in tests that never materialize real messages.
    #[derive(Debug, Default, Encode, Decode, Serialize)]
    struct DummyRecordedDataSet;

    impl ErasedCuStampedDataSet for DummyRecordedDataSet {
        // No messages: provenance extraction over this set sees nothing.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for DummyRecordedDataSet {
        // No task ids are associated with the dummy data set.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }
1766
    /// Implements the full replay application surface (`CuSubsystemMetadata`,
    /// `CuSimApplication`, `CuRecordedReplayApplication`,
    /// `CuDistributedReplayApplication`) as no-ops for a named unit struct,
    /// so tests can register apps without any real runtime behavior.
    macro_rules! impl_registered_test_app {
        ($name:ident, $subsystem_id:expr, $subsystem_code:expr) => {
            struct $name;

            impl CuSubsystemMetadata for $name {
                fn subsystem() -> Subsystem {
                    Subsystem::new(Some($subsystem_id), $subsystem_code)
                }
            }

            impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
                CuSimApplication<S, L> for $name
            {
                type Step<'z> = ();

                fn get_original_config() -> String {
                    "(tasks: [], cnx: [])".to_string()
                }

                fn start_all_tasks(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn run_one_iteration(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn run(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn stop_all_tasks(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn restore_keyframe(&mut self, _freezer: &KeyFrame) -> CuResult<()> {
                    Ok(())
                }
            }

            impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
                CuRecordedReplayApplication<S, L> for $name
            {
                type RecordedDataSet = DummyRecordedDataSet;

                fn replay_recorded_copperlist(
                    &mut self,
                    _clock_mock: &RobotClockMock,
                    _copperlist: &CopperList<Self::RecordedDataSet>,
                    _keyframe: Option<&KeyFrame>,
                ) -> CuResult<()> {
                    Ok(())
                }
            }

            impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
                CuDistributedReplayApplication<S, L> for $name
            {
                fn build_distributed_replay(
                    _clock: RobotClock,
                    _unified_logger: Arc<Mutex<L>>,
                    _instance_id: u32,
                    _config_override: Option<CuConfig>,
                ) -> CuResult<Self> {
                    Ok(Self)
                }
            }
        };
    }

    // Registered test apps: ping (code 0), pong (code 1), and a "ping"
    // variant declaring a mismatched subsystem code (99).
    impl_registered_test_app!(PingRegisteredApp, "ping", 0);
    impl_registered_test_app!(PongRegisteredApp, "pong", 1);
    impl_registered_test_app!(PingWrongCodeApp, "ping", 99);
1852
    // Replay session stub whose operations all succeed without side effects.
    struct FakeReplaySession;

    impl DistributedReplaySession for FakeReplaySession {
        fn goto_cl(&mut self, _cl_id: u64) -> CuResult<()> {
            Ok(())
        }

        fn shutdown(&mut self) -> CuResult<()> {
            Ok(())
        }
    }
1864
1865 fn fake_registration(
1866 subsystem_id: &'static str,
1867 subsystem_code: u16,
1868 session_factory: DistributedReplaySessionFactory,
1869 ) -> DistributedReplayAppRegistration {
1870 DistributedReplayAppRegistration {
1871 subsystem: Subsystem::new(Some(subsystem_id), subsystem_code),
1872 app_type_name: "fake",
1873 session_factory,
1874 }
1875 }
1876
1877 fn fake_assignment(
1878 instance_id: u32,
1879 subsystem_id: &'static str,
1880 subsystem_code: u16,
1881 session_factory: DistributedReplaySessionFactory,
1882 ) -> DistributedReplayAssignment {
1883 DistributedReplayAssignment {
1884 instance_id,
1885 subsystem_id: subsystem_id.to_string(),
1886 log: DistributedReplayLog {
1887 base_path: PathBuf::from(format!("{subsystem_id}_{instance_id}.copper")),
1888 stack: test_stack(subsystem_id, subsystem_code, instance_id),
1889 config_source: RuntimeLifecycleConfigSource::ExternalFile,
1890 effective_config_ron: "(tasks: [], cnx: [])".to_string(),
1891 mission: Some("default".to_string()),
1892 },
1893 registration: fake_registration(subsystem_id, subsystem_code, session_factory),
1894 }
1895 }
1896
1897 fn fake_plan(assignments: Vec<DistributedReplayAssignment>) -> DistributedReplayPlan {
1898 let mut registrations: Vec<_> = assignments
1899 .iter()
1900 .map(|assignment| assignment.registration.clone())
1901 .collect();
1902 registrations.sort_by(|left, right| left.subsystem.id().cmp(&right.subsystem.id()));
1903 let mut selected_instances: Vec<_> = assignments
1904 .iter()
1905 .map(|assignment| assignment.instance_id)
1906 .collect::<BTreeSet<_>>()
1907 .into_iter()
1908 .collect();
1909 selected_instances.sort_unstable();
1910 DistributedReplayPlan {
1911 multi_config_path: PathBuf::from("fake_multi.ron"),
1912 multi_config: MultiCopperConfig {
1913 subsystems: Vec::new(),
1914 interconnects: Vec::new(),
1915 instance_overrides_root: None,
1916 },
1917 catalog: DistributedReplayCatalog::default(),
1918 selected_instances,
1919 mission: Some("default".to_string()),
1920 registrations,
1921 assignments,
1922 }
1923 }
1924
1925 fn fake_ping_session(
1926 assignment: &DistributedReplayAssignment,
1927 _session_config: &DistributedReplaySessionConfig,
1928 ) -> CuResult<DistributedReplaySessionBuild> {
1929 Ok(DistributedReplaySessionBuild {
1930 session: Box::new(FakeReplaySession),
1931 nodes: vec![
1932 DistributedReplayNodeDescriptor {
1933 cursor: DistributedReplayCursor::new(
1934 assignment.instance_id,
1935 assignment.subsystem_id.clone(),
1936 assignment.log.subsystem_code(),
1937 0,
1938 ),
1939 origin_key: DistributedReplayOriginKey {
1940 instance_id: assignment.instance_id,
1941 subsystem_code: assignment.log.subsystem_code(),
1942 cl_id: 0,
1943 },
1944 incoming_origins: BTreeSet::new(),
1945 },
1946 DistributedReplayNodeDescriptor {
1947 cursor: DistributedReplayCursor::new(
1948 assignment.instance_id,
1949 assignment.subsystem_id.clone(),
1950 assignment.log.subsystem_code(),
1951 1,
1952 ),
1953 origin_key: DistributedReplayOriginKey {
1954 instance_id: assignment.instance_id,
1955 subsystem_code: assignment.log.subsystem_code(),
1956 cl_id: 1,
1957 },
1958 incoming_origins: BTreeSet::new(),
1959 },
1960 ],
1961 output_log_path: None,
1962 })
1963 }
1964
1965 fn fake_pong_session(
1966 assignment: &DistributedReplayAssignment,
1967 _session_config: &DistributedReplaySessionConfig,
1968 ) -> CuResult<DistributedReplaySessionBuild> {
1969 Ok(DistributedReplaySessionBuild {
1970 session: Box::new(FakeReplaySession),
1971 nodes: vec![
1972 DistributedReplayNodeDescriptor {
1973 cursor: DistributedReplayCursor::new(
1974 assignment.instance_id,
1975 assignment.subsystem_id.clone(),
1976 assignment.log.subsystem_code(),
1977 0,
1978 ),
1979 origin_key: DistributedReplayOriginKey {
1980 instance_id: assignment.instance_id,
1981 subsystem_code: assignment.log.subsystem_code(),
1982 cl_id: 0,
1983 },
1984 incoming_origins: BTreeSet::from([DistributedReplayOriginKey {
1985 instance_id: assignment.instance_id,
1986 subsystem_code: 0,
1987 cl_id: 0,
1988 }]),
1989 },
1990 DistributedReplayNodeDescriptor {
1991 cursor: DistributedReplayCursor::new(
1992 assignment.instance_id,
1993 assignment.subsystem_id.clone(),
1994 assignment.log.subsystem_code(),
1995 1,
1996 ),
1997 origin_key: DistributedReplayOriginKey {
1998 instance_id: assignment.instance_id,
1999 subsystem_code: assignment.log.subsystem_code(),
2000 cl_id: 1,
2001 },
2002 incoming_origins: BTreeSet::from([DistributedReplayOriginKey {
2003 instance_id: assignment.instance_id,
2004 subsystem_code: 0,
2005 cl_id: 1,
2006 }]),
2007 },
2008 ],
2009 output_log_path: None,
2010 })
2011 }
2012
2013 fn fake_bad_pong_session(
2014 assignment: &DistributedReplayAssignment,
2015 _session_config: &DistributedReplaySessionConfig,
2016 ) -> CuResult<DistributedReplaySessionBuild> {
2017 Ok(DistributedReplaySessionBuild {
2018 session: Box::new(FakeReplaySession),
2019 nodes: vec![DistributedReplayNodeDescriptor {
2020 cursor: DistributedReplayCursor::new(
2021 assignment.instance_id,
2022 assignment.subsystem_id.clone(),
2023 assignment.log.subsystem_code(),
2024 0,
2025 ),
2026 origin_key: DistributedReplayOriginKey {
2027 instance_id: assignment.instance_id,
2028 subsystem_code: assignment.log.subsystem_code(),
2029 cl_id: 0,
2030 },
2031 incoming_origins: BTreeSet::from([DistributedReplayOriginKey {
2032 instance_id: assignment.instance_id,
2033 subsystem_code: 0,
2034 cl_id: 99,
2035 }]),
2036 }],
2037 output_log_path: None,
2038 })
2039 }
2040
    // Synthetic subsystems used by the stress tests, as (id, code) pairs;
    // their dependency topology is defined by `stress_origins_for`.
    const STRESS_SUBSYSTEMS: [(&str, u16); 4] =
        [("sense", 0), ("plan", 1), ("control", 2), ("telemetry", 3)];
2043
2044 fn stress_origins_for(
2045 subsystem_id: &str,
2046 instance_id: u32,
2047 cl_id: u64,
2048 ) -> BTreeSet<DistributedReplayOriginKey> {
2049 match subsystem_id {
2050 "sense" => BTreeSet::new(),
2051 "plan" => BTreeSet::from([DistributedReplayOriginKey {
2052 instance_id,
2053 subsystem_code: 0,
2054 cl_id,
2055 }]),
2056 "control" => BTreeSet::from([DistributedReplayOriginKey {
2057 instance_id,
2058 subsystem_code: 1,
2059 cl_id,
2060 }]),
2061 "telemetry" => BTreeSet::from([
2062 DistributedReplayOriginKey {
2063 instance_id,
2064 subsystem_code: 0,
2065 cl_id,
2066 },
2067 DistributedReplayOriginKey {
2068 instance_id,
2069 subsystem_code: 2,
2070 cl_id,
2071 },
2072 ]),
2073 _ => panic!("unexpected synthetic stress subsystem '{subsystem_id}'"),
2074 }
2075 }
2076
2077 fn build_stress_session(
2078 assignment: &DistributedReplayAssignment,
2079 _session_config: &DistributedReplaySessionConfig,
2080 cl_count: u64,
2081 ) -> CuResult<DistributedReplaySessionBuild> {
2082 let subsystem_code = assignment.log.subsystem_code();
2083 let nodes = (0..cl_count)
2084 .map(|cl_id| DistributedReplayNodeDescriptor {
2085 cursor: DistributedReplayCursor::new(
2086 assignment.instance_id,
2087 assignment.subsystem_id.clone(),
2088 subsystem_code,
2089 cl_id,
2090 ),
2091 origin_key: DistributedReplayOriginKey {
2092 instance_id: assignment.instance_id,
2093 subsystem_code,
2094 cl_id,
2095 },
2096 incoming_origins: stress_origins_for(
2097 &assignment.subsystem_id,
2098 assignment.instance_id,
2099 cl_id,
2100 ),
2101 })
2102 .collect();
2103 Ok(DistributedReplaySessionBuild {
2104 session: Box::new(FakeReplaySession),
2105 nodes,
2106 output_log_path: None,
2107 })
2108 }
2109
    // Stress factory sized for CI: 24 CopperLists per subsystem.
    fn stress_session_ci(
        assignment: &DistributedReplayAssignment,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplaySessionBuild> {
        build_stress_session(assignment, session_config, 24)
    }

    // Stress factory sized for goto exercises: 32 CopperLists per subsystem.
    fn stress_session_goto(
        assignment: &DistributedReplayAssignment,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplaySessionBuild> {
        build_stress_session(assignment, session_config, 32)
    }

    // Heavier stress factory: 96 CopperLists per subsystem.
    fn stress_session_heavy(
        assignment: &DistributedReplayAssignment,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplaySessionBuild> {
        build_stress_session(assignment, session_config, 96)
    }
2130
2131 fn stress_plan(
2132 instance_count: u32,
2133 session_factory: DistributedReplaySessionFactory,
2134 ) -> DistributedReplayPlan {
2135 let assignments = (1..=instance_count)
2136 .flat_map(|instance_id| {
2137 STRESS_SUBSYSTEMS
2138 .into_iter()
2139 .map(move |(subsystem_id, subsystem_code)| {
2140 fake_assignment(instance_id, subsystem_id, subsystem_code, session_factory)
2141 })
2142 })
2143 .collect();
2144 fake_plan(assignments)
2145 }
2146
2147 fn collect_engine_order(
2148 engine: &mut DistributedReplayEngine,
2149 ) -> CuResult<Vec<DistributedReplayCursor>> {
2150 let mut order = Vec::new();
2151 while let Some(cursor) = engine.step_causal()? {
2152 order.push(cursor);
2153 }
2154 Ok(order)
2155 }
2156
    /// Verifies that `order` is a valid topological execution of the
    /// synthetic stress graph: complete, duplicate-free, locally ordered per
    /// subsystem, and respecting sense -> plan -> control -> telemetry
    /// within each CopperList id.
    fn assert_stress_order_is_topological(
        order: &[DistributedReplayCursor],
        instance_count: u32,
        cl_count: u64,
    ) {
        let expected_len = instance_count as usize * STRESS_SUBSYSTEMS.len() * cl_count as usize;
        assert_eq!(order.len(), expected_len);

        // Map each (instance, subsystem, cl) to its position in the order.
        let positions: BTreeMap<_, _> = order
            .iter()
            .enumerate()
            .map(|(idx, cursor)| {
                (
                    (
                        cursor.instance_id,
                        cursor.subsystem_id.clone(),
                        cursor.cl_id,
                    ),
                    idx,
                )
            })
            .collect();
        // Equal lengths imply the order contains no duplicate cursors.
        assert_eq!(positions.len(), expected_len);

        for instance_id in 1..=instance_count {
            // Local order: within one subsystem, cl ids execute in sequence.
            for (subsystem_id, _) in STRESS_SUBSYSTEMS {
                for cl_id in 1..cl_count {
                    let previous = positions
                        .get(&(instance_id, subsystem_id.to_string(), cl_id - 1))
                        .expect("previous local node missing");
                    let current = positions
                        .get(&(instance_id, subsystem_id.to_string(), cl_id))
                        .expect("current local node missing");
                    assert!(
                        previous < current,
                        "local order violated for instance {instance_id} subsystem '{subsystem_id}' cl {cl_id}"
                    );
                }
            }

            // Cross-subsystem provenance ordering within each CopperList id.
            for cl_id in 0..cl_count {
                let sense = positions
                    .get(&(instance_id, "sense".to_string(), cl_id))
                    .expect("sense node missing");
                let plan = positions
                    .get(&(instance_id, "plan".to_string(), cl_id))
                    .expect("plan node missing");
                let control = positions
                    .get(&(instance_id, "control".to_string(), cl_id))
                    .expect("control node missing");
                let telemetry = positions
                    .get(&(instance_id, "telemetry".to_string(), cl_id))
                    .expect("telemetry node missing");
                assert!(sense < plan);
                assert!(plan < control);
                assert!(sense < telemetry);
                assert!(control < telemetry);
            }
        }
    }
2217
2218 #[test]
2219 fn discovers_single_log_identity_from_runtime_lifecycle() -> CuResult<()> {
2220 let temp_dir = TempDir::new()
2221 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2222 let base_path = temp_dir.path().join("logs/ping.copper");
2223 write_runtime_lifecycle_log(&base_path, test_stack("ping", 7, 42), Some("default"))?;
2224
2225 let discovered = DistributedReplayLog::discover(&base_path)?;
2226 assert_eq!(discovered.base_path, base_path);
2227 assert_eq!(discovered.subsystem_id(), Some("ping"));
2228 assert_eq!(discovered.subsystem_code(), 7);
2229 assert_eq!(discovered.instance_id(), 42);
2230 assert_eq!(discovered.mission.as_deref(), Some("default"));
2231 Ok(())
2232 }
2233
2234 #[test]
2235 fn catalog_discovery_normalizes_slab_paths_and_deduplicates_candidates() -> CuResult<()> {
2236 let temp_dir = TempDir::new()
2237 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2238 let base_path = temp_dir.path().join("logs/pong.copper");
2239 let slab_zero_path = temp_dir.path().join("logs/pong_0.copper");
2240 write_runtime_lifecycle_log(&base_path, test_stack("pong", 3, 9), Some("default"))?;
2241
2242 let catalog = DistributedReplayCatalog::discover([base_path.clone(), slab_zero_path])?;
2243 assert!(
2244 catalog.failures.is_empty(),
2245 "unexpected failures: {:?}",
2246 catalog.failures
2247 );
2248 assert_eq!(catalog.logs.len(), 1);
2249 assert_eq!(catalog.logs[0].base_path, base_path);
2250 assert_eq!(catalog.logs[0].subsystem_id(), Some("pong"));
2251 Ok(())
2252 }
2253
2254 #[test]
2255 fn catalog_discovery_walks_directories_using_physical_slab_files() -> CuResult<()> {
2256 let temp_dir = TempDir::new()
2257 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2258 let ping_base = temp_dir.path().join("logs/ping.copper");
2259 let pong_base = temp_dir.path().join("logs/pong.copper");
2260 write_runtime_lifecycle_log(&ping_base, test_stack("ping", 0, 1), Some("alpha"))?;
2261 write_runtime_lifecycle_log(&pong_base, test_stack("pong", 1, 1), Some("alpha"))?;
2262
2263 let catalog = DistributedReplayCatalog::discover_under(temp_dir.path())?;
2264 assert!(
2265 catalog.failures.is_empty(),
2266 "unexpected failures: {:?}",
2267 catalog.failures
2268 );
2269 assert_eq!(catalog.logs.len(), 2);
2270 assert_eq!(catalog.logs[0].subsystem_id(), Some("ping"));
2271 assert_eq!(catalog.logs[1].subsystem_id(), Some("pong"));
2272 assert_eq!(catalog.logs[0].base_path, ping_base);
2273 assert_eq!(catalog.logs[1].base_path, pong_base);
2274 Ok(())
2275 }
2276
2277 #[test]
2278 fn catalog_reports_invalid_logs_without_aborting_scan() -> CuResult<()> {
2279 let temp_dir = TempDir::new()
2280 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2281 let good_base = temp_dir.path().join("logs/good.copper");
2282 write_runtime_lifecycle_log(&good_base, test_stack("good", 2, 5), Some("beta"))?;
2283
2284 let bad_slab = temp_dir.path().join("logs/bad_0.copper");
2285 if let Some(parent) = bad_slab.parent() {
2286 fs::create_dir_all(parent).map_err(|err| {
2287 CuError::new_with_cause(
2288 &format!("Failed to create bad log dir '{}'", parent.display()),
2289 err,
2290 )
2291 })?;
2292 }
2293 fs::write(&bad_slab, b"not a copper log").map_err(|err| {
2294 CuError::new_with_cause(
2295 &format!("Failed to create bad log '{}'", bad_slab.display()),
2296 err,
2297 )
2298 })?;
2299
2300 let catalog = DistributedReplayCatalog::discover_under(temp_dir.path())?;
2301 assert_eq!(catalog.logs.len(), 1);
2302 assert_eq!(catalog.failures.len(), 1);
2303 assert_eq!(catalog.logs[0].subsystem_id(), Some("good"));
2304 assert_eq!(
2305 catalog.failures[0].candidate_path,
2306 temp_dir.path().join("logs/bad.copper")
2307 );
2308 Ok(())
2309 }
2310
2311 #[test]
2312 fn builder_builds_validated_plan_for_selected_instances() -> CuResult<()> {
2313 let temp_dir = TempDir::new()
2314 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2315 let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2316 let logs_root = temp_dir.path().join("logs");
2317
2318 write_runtime_lifecycle_log(
2319 &logs_root.join("instance1_ping.copper"),
2320 test_stack("ping", 0, 1),
2321 Some("default"),
2322 )?;
2323 write_runtime_lifecycle_log(
2324 &logs_root.join("instance1_pong.copper"),
2325 test_stack("pong", 1, 1),
2326 Some("default"),
2327 )?;
2328 write_runtime_lifecycle_log(
2329 &logs_root.join("instance2_ping.copper"),
2330 test_stack("ping", 0, 2),
2331 Some("default"),
2332 )?;
2333 write_runtime_lifecycle_log(
2334 &logs_root.join("instance2_pong.copper"),
2335 test_stack("pong", 1, 2),
2336 Some("default"),
2337 )?;
2338
2339 let plan = DistributedReplayPlan::builder(&multi_config_path)?
2340 .discover_logs_under(&logs_root)?
2341 .register::<PingRegisteredApp>("ping")?
2342 .register::<PongRegisteredApp>("pong")?
2343 .instances([2])
2344 .build()?;
2345
2346 assert_eq!(plan.selected_instances, vec![2]);
2347 assert_eq!(plan.mission.as_deref(), Some("default"));
2348 assert_eq!(plan.assignments.len(), 2);
2349 assert_eq!(
2350 plan.assignment(2, "ping").unwrap().log.base_path,
2351 logs_root.join("instance2_ping.copper")
2352 );
2353 assert_eq!(
2354 plan.assignment(2, "pong").unwrap().log.base_path,
2355 logs_root.join("instance2_pong.copper")
2356 );
2357 Ok(())
2358 }
2359
2360 #[test]
2361 fn register_rejects_subsystem_code_mismatch() -> CuResult<()> {
2362 let temp_dir = TempDir::new()
2363 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2364 let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2365
2366 let err = DistributedReplayPlan::builder(&multi_config_path)?
2367 .register::<PingWrongCodeApp>("ping")
2368 .unwrap_err();
2369 assert!(err.to_string().contains("declares subsystem code 99"));
2370 Ok(())
2371 }
2372
2373 #[test]
2374 fn build_reports_missing_logs_and_missing_registrations() -> CuResult<()> {
2375 let temp_dir = TempDir::new()
2376 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2377 let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2378 let logs_root = temp_dir.path().join("logs");
2379
2380 write_runtime_lifecycle_log(
2381 &logs_root.join("instance1_ping.copper"),
2382 test_stack("ping", 0, 1),
2383 Some("default"),
2384 )?;
2385
2386 let err = DistributedReplayPlan::builder(&multi_config_path)?
2387 .discover_logs_under(&logs_root)?
2388 .register::<PingRegisteredApp>("ping")?
2389 .build()
2390 .unwrap_err();
2391 let err_text = err.to_string();
2392 assert!(err_text.contains("missing app registration for subsystem 'pong'"));
2393 assert!(err_text.contains("missing log for instance 1 subsystem 'pong'"));
2394 Ok(())
2395 }
2396
2397 #[test]
2398 fn build_reports_duplicate_logs_for_one_target() -> CuResult<()> {
2399 let temp_dir = TempDir::new()
2400 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2401 let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2402 let logs_root = temp_dir.path().join("logs");
2403
2404 write_runtime_lifecycle_log(
2405 &logs_root.join("instance1_ping_a.copper"),
2406 test_stack("ping", 0, 1),
2407 Some("default"),
2408 )?;
2409 write_runtime_lifecycle_log(
2410 &logs_root.join("instance1_ping_b.copper"),
2411 test_stack("ping", 0, 1),
2412 Some("default"),
2413 )?;
2414 write_runtime_lifecycle_log(
2415 &logs_root.join("instance1_pong.copper"),
2416 test_stack("pong", 1, 1),
2417 Some("default"),
2418 )?;
2419
2420 let err = DistributedReplayPlan::builder(&multi_config_path)?
2421 .discover_logs_under(&logs_root)?
2422 .register::<PingRegisteredApp>("ping")?
2423 .register::<PongRegisteredApp>("pong")?
2424 .build()
2425 .unwrap_err();
2426 assert!(
2427 err.to_string()
2428 .contains("found 2 logs for instance 1 subsystem 'ping'")
2429 );
2430 Ok(())
2431 }
2432
2433 #[test]
2434 fn build_reports_mission_mismatch_across_selected_logs() -> CuResult<()> {
2435 let temp_dir = TempDir::new()
2436 .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2437 let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2438 let logs_root = temp_dir.path().join("logs");
2439
2440 write_runtime_lifecycle_log(
2441 &logs_root.join("instance1_ping.copper"),
2442 test_stack("ping", 0, 1),
2443 Some("default"),
2444 )?;
2445 write_runtime_lifecycle_log(
2446 &logs_root.join("instance1_pong.copper"),
2447 test_stack("pong", 1, 1),
2448 Some("recovery"),
2449 )?;
2450
2451 let err = DistributedReplayPlan::builder(&multi_config_path)?
2452 .discover_logs_under(&logs_root)?
2453 .register::<PingRegisteredApp>("ping")?
2454 .register::<PongRegisteredApp>("pong")?
2455 .build()
2456 .unwrap_err();
2457 assert!(
2458 err.to_string()
2459 .contains("selected logs disagree on mission: default, recovery")
2460 );
2461 Ok(())
2462 }
2463
2464 #[test]
2465 fn engine_steps_in_stable_causal_order() -> CuResult<()> {
2466 let plan = fake_plan(vec![
2467 fake_assignment(1, "ping", 0, fake_ping_session),
2468 fake_assignment(1, "pong", 1, fake_pong_session),
2469 ]);
2470
2471 let mut engine = plan.start()?;
2472 let mut order = Vec::new();
2473 while let Some(cursor) = engine.step_causal()? {
2474 order.push((cursor.subsystem_id, cursor.cl_id));
2475 }
2476
2477 assert_eq!(
2478 order,
2479 vec![
2480 ("ping".to_string(), 0),
2481 ("ping".to_string(), 1),
2482 ("pong".to_string(), 0),
2483 ("pong".to_string(), 1),
2484 ]
2485 );
2486 assert_eq!(engine.executed_nodes(), 4);
2487 Ok(())
2488 }
2489
2490 #[test]
2491 fn engine_goto_rebuilds_and_replays_to_target() -> CuResult<()> {
2492 let plan = fake_plan(vec![
2493 fake_assignment(1, "ping", 0, fake_ping_session),
2494 fake_assignment(1, "pong", 1, fake_pong_session),
2495 ]);
2496
2497 let mut engine = plan.start()?;
2498 engine.run_all()?;
2499 engine.goto(1, "pong", 0)?;
2500
2501 assert_eq!(engine.executed_nodes(), 3);
2502 let frontier = engine.current_frontier();
2503 assert_eq!(frontier.len(), 2);
2504 assert!(frontier.iter().any(|cursor| {
2505 cursor.instance_id == 1 && cursor.subsystem_id == "ping" && cursor.cl_id == 1
2506 }));
2507 assert!(frontier.iter().any(|cursor| {
2508 cursor.instance_id == 1 && cursor.subsystem_id == "pong" && cursor.cl_id == 0
2509 }));
2510 Ok(())
2511 }
2512
2513 #[test]
2514 fn engine_reports_unresolved_recorded_provenance() -> CuResult<()> {
2515 let plan = fake_plan(vec![
2516 fake_assignment(1, "ping", 0, fake_ping_session),
2517 fake_assignment(1, "pong", 1, fake_bad_pong_session),
2518 ]);
2519
2520 let err = match plan.start() {
2521 Ok(_) => return Err(CuError::from("expected distributed replay startup failure")),
2522 Err(err) => err,
2523 };
2524 assert!(
2525 err.to_string()
2526 .contains("Unresolved recorded provenance edge")
2527 );
2528 Ok(())
2529 }
2530
2531 #[test]
2532 fn engine_run_all_scales_across_many_identical_instances() -> CuResult<()> {
2533 let mut engine = stress_plan(6, stress_session_ci).start()?;
2534 let order = collect_engine_order(&mut engine)?;
2535
2536 assert_stress_order_is_topological(&order, 6, 24);
2537 assert_eq!(engine.executed_nodes(), 6 * STRESS_SUBSYSTEMS.len() * 24);
2538
2539 let frontier = engine.current_frontier();
2540 assert_eq!(frontier.len(), 6 * STRESS_SUBSYSTEMS.len());
2541 for instance_id in 1..=6 {
2542 for (subsystem_id, _) in STRESS_SUBSYSTEMS {
2543 assert!(frontier.iter().any(|cursor| {
2544 cursor.instance_id == instance_id
2545 && cursor.subsystem_id == subsystem_id
2546 && cursor.cl_id == 23
2547 }));
2548 }
2549 }
2550 Ok(())
2551 }
2552
2553 #[test]
2554 fn engine_goto_matches_manual_replay_on_large_graph() -> CuResult<()> {
2555 let plan = stress_plan(5, stress_session_goto);
2556 let mut manual = plan.clone().start()?;
2557
2558 let (expected_steps, expected_frontier) = {
2559 let mut expected_steps = 0usize;
2560 loop {
2561 let Some(cursor) = manual.step_causal()? else {
2562 return Err(CuError::from(
2563 "manual distributed replay exhausted before reaching stress target",
2564 ));
2565 };
2566 expected_steps += 1;
2567 if cursor.instance_id == 4 && cursor.subsystem_id == "control" && cursor.cl_id == 17
2568 {
2569 break (expected_steps, manual.current_frontier());
2570 }
2571 }
2572 };
2573
2574 let mut via_goto = plan.start()?;
2575 via_goto.goto(4, "control", 17)?;
2576
2577 assert_eq!(via_goto.executed_nodes(), expected_steps);
2578 assert_eq!(via_goto.current_frontier(), expected_frontier);
2579 Ok(())
2580 }
2581
2582 #[test]
2583 #[ignore = "stress"]
2584 fn engine_heavy_stress_run_all_completes() -> CuResult<()> {
2585 let mut engine = stress_plan(12, stress_session_heavy).start()?;
2586 engine.run_all()?;
2587
2588 let expected = 12 * STRESS_SUBSYSTEMS.len() * 96;
2589 assert_eq!(engine.executed_nodes(), expected);
2590 assert_eq!(
2591 engine.current_frontier().len(),
2592 12 * STRESS_SUBSYSTEMS.len()
2593 );
2594 Ok(())
2595 }
2596}