Skip to main content

cu29_runtime/
distributed_replay.rs

1//! Discovery, validation, planning, and causal execution helpers for
2//! distributed deterministic replay.
3//!
4//! The distributed replay flow is:
5//! - discover Copper logs and recover runtime identity from lifecycle metadata
6//! - validate those logs against a strict multi-Copper topology
7//! - register the generated replayable app type for each subsystem
8//! - build one replay session per `(instance_id, subsystem_id)` assignment
9//! - stitch sessions together through recorded message provenance
10//! - replay the fleet in a stable causal order
11
12use crate::app::{
13    CuDistributedReplayApplication, CuRecordedReplayApplication, CuSimApplication, Subsystem,
14};
15use crate::config::{MultiCopperConfig, read_configuration_str, read_multi_configuration};
16use crate::copperlist::CopperList;
17use crate::curuntime::{
18    KeyFrame, RuntimeLifecycleConfigSource, RuntimeLifecycleEvent, RuntimeLifecycleRecord,
19    RuntimeLifecycleStackInfo,
20};
21use crate::debug::{
22    SectionIndexEntry, build_read_logger, decode_copperlists, index_log, read_section_at,
23};
24use crate::simulation::recorded_copperlist_timestamp;
25use bincode::config::standard;
26use bincode::decode_from_std_read;
27use bincode::error::DecodeError;
28use cu29_clock::{RobotClock, RobotClockMock};
29use cu29_traits::{CopperListTuple, CuError, CuResult, ErasedCuStampedDataSet, UnifiedLogType};
30use cu29_unifiedlog::memmap::MmapSectionStorage;
31use cu29_unifiedlog::{
32    NoopLogger, NoopSectionStorage, SectionStorage, UnifiedLogWrite, UnifiedLogger,
33    UnifiedLoggerBuilder, UnifiedLoggerIOReader, UnifiedLoggerRead, UnifiedLoggerWrite,
34};
35use std::any::type_name;
36use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
37use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
38use std::fs;
39use std::io::Read;
40use std::path::{Path, PathBuf};
41use std::sync::{Arc, Mutex};
42
/// One discovered Copper log that can participate in distributed replay.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedReplayLog {
    /// Base path of the log (e.g. `foo.copper`), never a physical slab path.
    pub base_path: PathBuf,
    /// Runtime identity (instance id / subsystem code / optional subsystem id)
    /// recovered from the log's `Instantiated` lifecycle record.
    pub stack: RuntimeLifecycleStackInfo,
    /// Where the runtime's configuration came from, as recorded at instantiation.
    pub config_source: RuntimeLifecycleConfigSource,
    /// The effective configuration serialized as RON, recorded at instantiation.
    pub effective_config_ron: String,
    /// Mission from the first `MissionStarted` record, if one was logged.
    pub mission: Option<String>,
}
52
impl DistributedReplayLog {
    /// Discover a single Copper log from either its base path (`foo.copper`) or
    /// one of its slab paths (`foo_0.copper`, `foo_1.copper`, ...).
    pub fn discover(path: impl AsRef<Path>) -> CuResult<Self> {
        let requested_path = path.as_ref();
        let normalized_path = normalize_candidate_log_base(requested_path);
        match Self::discover_from_base_path(requested_path) {
            Ok(log) => Ok(log),
            // The caller handed us a slab path: retry with the normalized base.
            // NOTE(review): the first error is discarded in favor of the retry's.
            Err(_) if normalized_path != requested_path => {
                Self::discover_from_base_path(&normalized_path)
            }
            Err(err) => Err(err),
        }
    }

    /// Open `base_path` read-only and scan its RuntimeLifecycle stream for the
    /// first `Instantiated` record (mandatory) and the first `MissionStarted`
    /// record (optional), stopping early once both are seen.
    fn discover_from_base_path(base_path: &Path) -> CuResult<Self> {
        let UnifiedLogger::Read(read_logger) = UnifiedLoggerBuilder::new()
            .file_base_name(base_path)
            .build()
            .map_err(|err| {
                CuError::new_with_cause(
                    &format!(
                        "Failed to open Copper log '{}' for distributed replay discovery",
                        base_path.display()
                    ),
                    err,
                )
            })?
        else {
            // The builder was configured read-only; a writer here is a logic error.
            return Err(CuError::from(
                "Expected a readable unified logger during distributed replay discovery",
            ));
        };

        let mut reader = UnifiedLoggerIOReader::new(read_logger, UnifiedLogType::RuntimeLifecycle);
        // First (config_source, effective_config_ron, stack) triple seen, if any.
        let mut instantiated: Option<(
            RuntimeLifecycleConfigSource,
            String,
            RuntimeLifecycleStackInfo,
        )> = None;
        let mut mission = None;

        while let Some(record) =
            read_next_entry::<RuntimeLifecycleRecord>(&mut reader).map_err(|err| {
                CuError::from(format!(
                    "Failed to decode runtime lifecycle for '{}': {err}",
                    base_path.display()
                ))
            })?
        {
            match record.event {
                // Only the first occurrence of each event kind is kept.
                RuntimeLifecycleEvent::Instantiated {
                    config_source,
                    effective_config_ron,
                    stack,
                } if instantiated.is_none() => {
                    instantiated = Some((config_source, effective_config_ron, stack));
                }
                RuntimeLifecycleEvent::MissionStarted {
                    mission: started_mission,
                } if mission.is_none() => {
                    mission = Some(started_mission);
                }
                _ => {}
            }

            // Stop scanning as soon as both pieces of identity are recovered.
            if instantiated.is_some() && mission.is_some() {
                break;
            }
        }

        let Some((config_source, effective_config_ron, stack)) = instantiated else {
            return Err(CuError::from(format!(
                "Copper log '{}' has no RuntimeLifecycle::Instantiated record",
                base_path.display()
            )));
        };

        Ok(Self {
            base_path: base_path.to_path_buf(),
            stack,
            config_source,
            effective_config_ron,
            mission,
        })
    }

    /// Instance id recorded in the log's runtime identity.
    #[inline]
    pub fn instance_id(&self) -> u32 {
        self.stack.instance_id
    }

    /// Numeric subsystem code recorded in the log's runtime identity.
    #[inline]
    pub fn subsystem_code(&self) -> u16 {
        self.stack.subsystem_code
    }

    /// Subsystem id recorded in the log's runtime identity, if present.
    #[inline]
    pub fn subsystem_id(&self) -> Option<&str> {
        self.stack.subsystem_id.as_deref()
    }
}
155
/// Discovery error recorded for one log candidate.
#[derive(Debug, Clone)]
pub struct DistributedReplayDiscoveryFailure {
    /// The candidate base path that failed discovery.
    pub candidate_path: PathBuf,
    /// The discovery error, stringified so the failure stays `Clone`.
    pub error: String,
}
162
163impl Display for DistributedReplayDiscoveryFailure {
164    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
165        write!(
166            f,
167            "{}: {}",
168            self.candidate_path.display(),
169            self.error.as_str()
170        )
171    }
172}
173
/// Result of scanning one or more paths for distributed replay logs.
#[derive(Debug, Clone, Default)]
pub struct DistributedReplayCatalog {
    /// Successfully discovered logs, sorted by identity then base path.
    pub logs: Vec<DistributedReplayLog>,
    /// Candidates that failed discovery, sorted by candidate path.
    pub failures: Vec<DistributedReplayDiscoveryFailure>,
}
180
181impl DistributedReplayCatalog {
182    /// Discover logs from a list of files and/or directories.
183    ///
184    /// Directories are traversed recursively. Any physical slab file
185    /// (`*_0.copper`, `*_1.copper`, ...) is normalized back to its base log path.
186    pub fn discover<I, P>(inputs: I) -> CuResult<Self>
187    where
188        I: IntoIterator<Item = P>,
189        P: AsRef<Path>,
190    {
191        let mut candidates = BTreeSet::new();
192        for input in inputs {
193            collect_candidate_base_paths(input.as_ref(), &mut candidates)?;
194        }
195
196        let mut logs = Vec::new();
197        let mut failures = Vec::new();
198
199        for candidate in candidates {
200            match DistributedReplayLog::discover(&candidate) {
201                Ok(log) => logs.push(log),
202                Err(err) => failures.push(DistributedReplayDiscoveryFailure {
203                    candidate_path: candidate,
204                    error: err.to_string(),
205                }),
206            }
207        }
208
209        logs.sort_by(|left, right| {
210            (
211                left.instance_id(),
212                left.subsystem_code(),
213                left.subsystem_id(),
214                left.base_path.as_os_str(),
215            )
216                .cmp(&(
217                    right.instance_id(),
218                    right.subsystem_code(),
219                    right.subsystem_id(),
220                    right.base_path.as_os_str(),
221                ))
222        });
223        failures.sort_by(|left, right| left.candidate_path.cmp(&right.candidate_path));
224
225        Ok(Self { logs, failures })
226    }
227
228    /// Convenience wrapper for recursive discovery rooted at one directory.
229    pub fn discover_under(root: impl AsRef<Path>) -> CuResult<Self> {
230        Self::discover([root])
231    }
232}
233
/// Factory signature producing one typed replay session build for a validated
/// assignment plus the session-wide configuration.
type DistributedReplaySessionFactory = fn(
    &DistributedReplayAssignment,
    &DistributedReplaySessionConfig,
) -> CuResult<DistributedReplaySessionBuild>;

/// Maximum number of decoded log sections kept per session in the LRU cache.
const DEFAULT_SECTION_CACHE_CAP: usize = 8;
/// 64 MiB default size for replay output logs.
// NOTE(review): not referenced in this chunk; presumably consumed by the
// session builder further down the file — confirm before removing.
const DEFAULT_REPLAY_LOG_SIZE_BYTES: usize = 64 * 1024 * 1024;
241
/// Session-wide options applied when building every replay session.
#[derive(Debug, Clone, Default)]
struct DistributedReplaySessionConfig {
    // When set, replayed logs are persisted under this directory; `None`
    // (the default) replays without writing output logs.
    output_root: Option<PathBuf>,
}
246
/// Globally unique key for one recorded copperlist: which runtime produced it
/// (instance + subsystem code) and which copperlist it was.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct DistributedReplayOriginKey {
    instance_id: u32,
    subsystem_code: u16,
    cl_id: u64,
}
253
/// Public handle identifying one replayable copperlist in the fleet.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistributedReplayCursor {
    pub instance_id: u32,
    pub subsystem_id: String,
    pub cl_id: u64,
    // Kept private; exposed read-only through `subsystem_code()`.
    subsystem_code: u16,
}
261
262impl DistributedReplayCursor {
263    #[inline]
264    fn new(instance_id: u32, subsystem_id: String, subsystem_code: u16, cl_id: u64) -> Self {
265        Self {
266            instance_id,
267            subsystem_id,
268            cl_id,
269            subsystem_code,
270        }
271    }
272
273    #[inline]
274    pub fn subsystem_code(&self) -> u16 {
275        self.subsystem_code
276    }
277}
278
/// Raw per-copperlist description emitted by a session before graph wiring:
/// the node's identity plus the origins of the messages it consumed.
#[derive(Debug, Clone)]
struct DistributedReplayNodeDescriptor {
    cursor: DistributedReplayCursor,
    origin_key: DistributedReplayOriginKey,
    incoming_origins: BTreeSet<DistributedReplayOriginKey>,
}
285
/// One node of the stitched causal graph, with dependency counters used to
/// drive topological (causal) execution.
#[derive(Debug, Clone)]
struct DistributedReplayGraphNode {
    cursor: DistributedReplayCursor,
    // Index of the owning session in the engine's session list.
    session_index: usize,
    // Indices of nodes that depend on this one.
    outgoing: Vec<usize>,
    // Dependency count at graph-build time (kept for resets/diagnostics).
    initial_dependencies: usize,
    // Dependencies not yet satisfied during execution.
    remaining_dependencies: usize,
}
294
/// Ready-queue entry; the derived `Ord` (field order: instance, subsystem code,
/// copperlist id, node index) gives the stable causal tie-breaking order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct DistributedReplayReadyNode {
    instance_id: u32,
    subsystem_code: u16,
    cl_id: u64,
    node_index: usize,
}
302
/// Output of a session factory: the type-erased session, its node descriptors,
/// and the path of the replayed output log when one is being recorded.
struct DistributedReplaySessionBuild {
    session: Box<dyn DistributedReplaySession>,
    nodes: Vec<DistributedReplayNodeDescriptor>,
    output_log_path: Option<PathBuf>,
}
308
/// Type-erased replay session, letting the engine drive heterogeneous
/// generated app types uniformly.
trait DistributedReplaySession {
    /// Bring the session's app to the state just after copperlist `cl_id`.
    fn goto_cl(&mut self, cl_id: u64) -> CuResult<()>;
    /// Stop the session's tasks; callable even if the session never started.
    fn shutdown(&mut self) -> CuResult<()>;
}
313
/// One decoded log section held in a session's LRU cache.
#[derive(Debug, Clone)]
struct RecordedReplayCachedSection<P: CopperListTuple> {
    // Decoded copperlists of the section, in recorded order.
    entries: Vec<Arc<CopperList<P>>>,
}
318
/// A replay session driving one generated app from its recorded log.
struct RecordedReplaySession<App, P, S, L>
where
    App: CuDistributedReplayApplication<S, L>,
    P: CopperListTuple,
    S: SectionStorage,
    L: UnifiedLogWrite<S> + 'static,
{
    // The validated (instance, subsystem) assignment this session replays.
    assignment: DistributedReplayAssignment,
    // The generated application under replay.
    app: App,
    // Mocked clock, set from keyframe timestamps on restore.
    clock_mock: RobotClockMock,
    // Reader over the recorded unified log.
    log_reader: UnifiedLoggerRead,
    // Index of copperlist sections in the log, in order.
    sections: Vec<SectionIndexEntry>,
    // Total recorded copperlists across all sections.
    total_entries: usize,
    // Keyframes available for state restoration.
    keyframes: Vec<KeyFrame>,
    // Whether tasks have been started (see `ensure_started`).
    started: bool,
    // Global index of the last replayed copperlist, if any.
    current_idx: Option<usize>,
    // Copperlist id of the last restored keyframe, if any.
    last_keyframe: Option<u64>,
    // Decoded sections keyed by section index (bounded by `cache_cap`).
    cache: HashMap<usize, RecordedReplayCachedSection<P>>,
    // LRU order for `cache`: front = oldest, back = most recently used.
    cache_order: VecDeque<usize>,
    // Maximum number of cached sections.
    cache_cap: usize,
    // Carries `S`/`L`, which appear only in trait bounds.
    phantom: std::marker::PhantomData<(S, L)>,
}
341
impl<App, P, S, L> RecordedReplaySession<App, P, S, L>
where
    App: CuDistributedReplayApplication<S, L>
        + CuRecordedReplayApplication<S, L, RecordedDataSet = P>,
    P: CopperListTuple,
    S: SectionStorage,
    L: UnifiedLogWrite<S> + 'static,
{
    /// Index the recorded log, open a reader, and assemble a session that has
    /// not yet started its tasks (empty cache, no current position).
    fn from_log(
        assignment: DistributedReplayAssignment,
        app: App,
        clock_mock: RobotClockMock,
        log_base: &Path,
    ) -> CuResult<Self> {
        let (sections, keyframes, total_entries) =
            index_log::<P, _>(log_base, &recorded_copperlist_timestamp::<P>)?;
        let log_reader = build_read_logger(log_base)?;
        Ok(Self {
            assignment,
            app,
            clock_mock,
            log_reader,
            sections,
            total_entries,
            keyframes,
            started: false,
            current_idx: None,
            last_keyframe: None,
            cache: HashMap::new(),
            cache_order: VecDeque::new(),
            cache_cap: DEFAULT_SECTION_CACHE_CAP,
            phantom: std::marker::PhantomData,
        })
    }

    /// Produce one node descriptor per recorded copperlist: its own
    /// cursor/origin key plus the origin keys of the messages it consumed.
    /// Walks the entire log (through the section cache).
    fn describe_nodes(&mut self) -> CuResult<Vec<DistributedReplayNodeDescriptor>> {
        let mut nodes = Vec::with_capacity(self.total_entries);
        for idx in 0..self.total_entries {
            let (copperlist, _) = self.copperlist_at(idx)?;
            let cursor = DistributedReplayCursor::new(
                self.assignment.instance_id,
                self.assignment.subsystem_id.clone(),
                self.assignment.log.subsystem_code(),
                copperlist.id,
            );
            nodes.push(DistributedReplayNodeDescriptor {
                origin_key: DistributedReplayOriginKey {
                    instance_id: cursor.instance_id,
                    subsystem_code: cursor.subsystem_code(),
                    cl_id: cursor.cl_id,
                },
                incoming_origins: copperlist_origins(copperlist.as_ref()),
                cursor,
            });
        }
        Ok(nodes)
    }

    /// Start all tasks exactly once; subsequent calls are no-ops.
    fn ensure_started(&mut self) -> CuResult<()> {
        if self.started {
            return Ok(());
        }
        // The override hook defers every step to the runtime; nothing is simulated.
        let mut noop = |_step: App::Step<'_>| crate::simulation::SimOverride::ExecuteByRuntime;
        <App as CuSimApplication<S, L>>::start_all_tasks(&mut self.app, &mut noop)?;
        self.started = true;
        Ok(())
    }

    /// Latest keyframe at or before `target_cl_id`, if any (linear scan).
    fn nearest_keyframe(&self, target_cl_id: u64) -> Option<KeyFrame> {
        self.keyframes
            .iter()
            .filter(|keyframe| keyframe.culistid <= target_cl_id)
            .max_by_key(|keyframe| keyframe.culistid)
            .cloned()
    }

    /// Restore task state and the mocked clock from `keyframe`.
    fn restore_keyframe(&mut self, keyframe: &KeyFrame) -> CuResult<()> {
        <App as CuSimApplication<S, L>>::restore_keyframe(&mut self.app, keyframe)?;
        self.clock_mock.set_value(keyframe.timestamp.as_nanos());
        self.last_keyframe = Some(keyframe.culistid);
        Ok(())
    }

    /// Binary search for the section containing global entry index `idx`.
    /// Assumes `sections` is sorted by `start_idx` with non-overlapping ranges.
    fn find_section_for_index(&self, idx: usize) -> Option<usize> {
        self.sections
            .binary_search_by(|section| {
                if idx < section.start_idx {
                    std::cmp::Ordering::Greater
                } else if idx >= section.start_idx + section.len {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .ok()
    }

    /// Binary search for the section whose `[first_id, last_id]` range contains
    /// `cl_id`. Assumes sections are sorted by copperlist id.
    fn find_section_for_cl_id(&self, cl_id: u64) -> Option<usize> {
        self.sections
            .binary_search_by(|section| {
                if cl_id < section.first_id {
                    std::cmp::Ordering::Greater
                } else if cl_id > section.last_id {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .ok()
    }

    /// LRU bookkeeping: mark `key` most-recently-used, then evict the oldest
    /// cached sections until the cache is back within `cache_cap`.
    fn touch_cache(&mut self, key: usize) {
        if let Some(position) = self.cache_order.iter().position(|entry| *entry == key) {
            self.cache_order.remove(position);
        }
        self.cache_order.push_back(key);
        while self.cache_order.len() > self.cache_cap {
            if let Some(oldest) = self.cache_order.pop_front() {
                self.cache.remove(&oldest);
            }
        }
    }

    /// Return the decoded copperlists of section `section_idx`, reading and
    /// decoding from the log on a cache miss.
    // NOTE(review): the contains_key/get double lookup sidesteps borrowing
    // `self.cache` across the `touch_cache(&mut self)` call; behavior is correct.
    fn load_section(&mut self, section_idx: usize) -> CuResult<&RecordedReplayCachedSection<P>> {
        if self.cache.contains_key(&section_idx) {
            self.touch_cache(section_idx);
            return Ok(self.cache.get(&section_idx).expect("cache entry exists"));
        }

        let entry = &self.sections[section_idx];
        let (header, data) = read_section_at(&mut self.log_reader, entry.pos)?;
        if header.entry_type != UnifiedLogType::CopperList {
            return Err(CuError::from(
                "Section type mismatch while loading distributed replay copperlists",
            ));
        }
        let (entries, _) = decode_copperlists::<P, _>(&data, &recorded_copperlist_timestamp::<P>)?;
        self.cache
            .insert(section_idx, RecordedReplayCachedSection { entries });
        self.touch_cache(section_idx);
        Ok(self.cache.get(&section_idx).expect("cache entry exists"))
    }

    /// Fetch the copperlist at global index `idx`, plus the keyframe recorded
    /// for exactly that copperlist id, if any.
    fn copperlist_at(&mut self, idx: usize) -> CuResult<(Arc<CopperList<P>>, Option<KeyFrame>)> {
        let section_idx = self
            .find_section_for_index(idx)
            .ok_or_else(|| CuError::from("Distributed replay index is outside the log"))?;
        let start_idx = self.sections[section_idx].start_idx;
        let section = self.load_section(section_idx)?;
        let local_idx = idx - start_idx;
        let copperlist = section
            .entries
            .get(local_idx)
            .ok_or_else(|| CuError::from("Corrupt distributed replay section index"))?
            .clone();
        let keyframe = self
            .keyframes
            .iter()
            .find(|keyframe| keyframe.culistid == copperlist.id)
            .cloned();
        Ok((copperlist, keyframe))
    }

    /// Translate a copperlist id into its global entry index by locating its
    /// section, then scanning that section linearly.
    fn index_for_cl_id(&mut self, cl_id: u64) -> CuResult<usize> {
        let section_idx = self
            .find_section_for_cl_id(cl_id)
            .ok_or_else(|| CuError::from("Requested CopperList id is not present in the log"))?;
        let start_idx = self.sections[section_idx].start_idx;
        let section = self.load_section(section_idx)?;
        for (offset, copperlist) in section.entries.iter().enumerate() {
            if copperlist.id == cl_id {
                return Ok(start_idx + offset);
            }
        }
        // The section's id range covered cl_id but no entry matched: index corruption.
        Err(CuError::from(
            "Requested CopperList id is missing from its indexed log section",
        ))
    }

    /// Replay entries `start_idx..=end_idx` in order, updating `current_idx`
    /// after each one. A keyframe is passed through only when its id matches
    /// the copperlist being replayed (`replay_keyframe` takes precedence).
    fn replay_range(
        &mut self,
        start_idx: usize,
        end_idx: usize,
        replay_keyframe: Option<&KeyFrame>,
    ) -> CuResult<()> {
        for idx in start_idx..=end_idx {
            let (copperlist, keyframe) = self.copperlist_at(idx)?;
            let keyframe = replay_keyframe
                .filter(|candidate| candidate.culistid == copperlist.id)
                .or(keyframe
                    .as_ref()
                    .filter(|candidate| candidate.culistid == copperlist.id));
            <App as CuRecordedReplayApplication<S, L>>::replay_recorded_copperlist(
                &mut self.app,
                &self.clock_mock,
                copperlist.as_ref(),
                keyframe,
            )?;
            self.current_idx = Some(idx);
        }
        Ok(())
    }

    /// Move the session to `target_idx`: replay forward from the current
    /// position, or rewind via the nearest keyframe when moving backward or
    /// starting cold.
    fn goto_index(&mut self, target_idx: usize) -> CuResult<()> {
        self.ensure_started()?;
        if target_idx >= self.total_entries {
            return Err(CuError::from(
                "Distributed replay target is outside the log",
            ));
        }

        let (target_copperlist, _) = self.copperlist_at(target_idx)?;
        let target_cl_id = target_copperlist.id;

        let replay_start_idx;
        let replay_keyframe;

        if let Some(current_idx) = self.current_idx {
            // Already at the target: nothing to do.
            if current_idx == target_idx {
                return Ok(());
            }

            if target_idx > current_idx {
                // Forward motion: continue from the next entry, no restore needed.
                replay_start_idx = current_idx + 1;
                replay_keyframe = None;
            } else {
                // Backward motion: restore the nearest keyframe at/before the
                // target and replay forward from it.
                let keyframe = self.nearest_keyframe(target_cl_id).ok_or_else(|| {
                    CuError::from("No keyframe is available to rewind distributed replay")
                })?;
                self.restore_keyframe(&keyframe)?;
                replay_start_idx = self.index_for_cl_id(keyframe.culistid)?;
                replay_keyframe = Some(keyframe);
            }
        } else {
            // Cold start: initialize from the nearest keyframe at/before the target.
            let keyframe = self.nearest_keyframe(target_cl_id).ok_or_else(|| {
                CuError::from("No keyframe is available to initialize distributed replay")
            })?;
            self.restore_keyframe(&keyframe)?;
            replay_start_idx = self.index_for_cl_id(keyframe.culistid)?;
            replay_keyframe = Some(keyframe);
        }

        self.replay_range(replay_start_idx, target_idx, replay_keyframe.as_ref())
    }
}
587
588impl<App, P, S, L> DistributedReplaySession for RecordedReplaySession<App, P, S, L>
589where
590    App: CuDistributedReplayApplication<S, L>
591        + CuRecordedReplayApplication<S, L, RecordedDataSet = P>,
592    P: CopperListTuple + 'static,
593    S: SectionStorage,
594    L: UnifiedLogWrite<S> + 'static,
595{
596    fn goto_cl(&mut self, cl_id: u64) -> CuResult<()> {
597        let target_idx = self.index_for_cl_id(cl_id)?;
598        self.goto_index(target_idx)
599    }
600
601    fn shutdown(&mut self) -> CuResult<()> {
602        if !self.started {
603            return Ok(());
604        }
605
606        let mut noop = |_step: App::Step<'_>| crate::simulation::SimOverride::ExecuteByRuntime;
607        <App as CuSimApplication<S, L>>::stop_all_tasks(&mut self.app, &mut noop)?;
608        self.started = false;
609        Ok(())
610    }
611}
612
/// One typed subsystem registration provided to the distributed replay builder.
#[derive(Clone)]
pub struct DistributedReplayAppRegistration {
    /// The subsystem the registered app type was generated for.
    pub subsystem: Subsystem,
    /// `type_name` of the registered app, used in diagnostics and equality.
    pub app_type_name: &'static str,
    // Monomorphized factory producing a session for this app type; excluded
    // from Debug/PartialEq below.
    session_factory: DistributedReplaySessionFactory,
}
620
621impl Debug for DistributedReplayAppRegistration {
622    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
623        f.debug_struct("DistributedReplayAppRegistration")
624            .field("subsystem", &self.subsystem)
625            .field("app_type_name", &self.app_type_name)
626            .finish()
627    }
628}
629
630impl PartialEq for DistributedReplayAppRegistration {
631    fn eq(&self, other: &Self) -> bool {
632        self.subsystem == other.subsystem && self.app_type_name == other.app_type_name
633    }
634}
635
636impl Eq for DistributedReplayAppRegistration {}
637
/// One validated log assignment for a subsystem instance.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedReplayAssignment {
    /// Instance id the log belongs to.
    pub instance_id: u32,
    /// Subsystem id the log belongs to.
    pub subsystem_id: String,
    /// The discovered log assigned to this `(instance, subsystem)` pair.
    pub log: DistributedReplayLog,
    /// The typed registration that will build this assignment's session.
    pub registration: DistributedReplayAppRegistration,
}
646
/// Validated replay plan produced by [`DistributedReplayBuilder`].
#[derive(Debug, Clone)]
pub struct DistributedReplayPlan {
    /// Path of the multi-Copper config the plan was validated against.
    pub multi_config_path: PathBuf,
    /// The parsed multi-Copper config.
    pub multi_config: MultiCopperConfig,
    /// Discovery results (logs and failures) behind this plan.
    pub catalog: DistributedReplayCatalog,
    /// Instance ids retained in the plan.
    pub selected_instances: Vec<u32>,
    /// Mission shared by the logs, if one was recorded.
    pub mission: Option<String>,
    /// All typed registrations supplied to the builder.
    pub registrations: Vec<DistributedReplayAppRegistration>,
    /// One validated assignment per `(instance_id, subsystem_id)`.
    pub assignments: Vec<DistributedReplayAssignment>,
}
658
659impl DistributedReplayPlan {
660    #[inline]
661    pub fn builder(multi_config_path: impl AsRef<Path>) -> CuResult<DistributedReplayBuilder> {
662        DistributedReplayBuilder::new(multi_config_path)
663    }
664
665    #[inline]
666    pub fn assignment(
667        &self,
668        instance_id: u32,
669        subsystem_id: &str,
670    ) -> Option<&DistributedReplayAssignment> {
671        self.assignments.iter().find(|assignment| {
672            assignment.instance_id == instance_id && assignment.subsystem_id == subsystem_id
673        })
674    }
675
676    /// Build a causal distributed replay engine from this validated plan.
677    pub fn start(self) -> CuResult<DistributedReplayEngine> {
678        DistributedReplayEngine::new(self, DistributedReplaySessionConfig::default())
679    }
680
681    /// Build a causal distributed replay engine and persist replayed logs under `output_root`.
682    pub fn start_recording_logs_under(
683        self,
684        output_root: impl AsRef<Path>,
685    ) -> CuResult<DistributedReplayEngine> {
686        DistributedReplayEngine::new(
687            self,
688            DistributedReplaySessionConfig {
689                output_root: Some(output_root.as_ref().to_path_buf()),
690            },
691        )
692    }
693}
694
/// Aggregated validation diagnostics emitted while constructing a distributed replay plan.
#[derive(Debug, Clone, Default)]
pub struct DistributedReplayValidationError {
    /// Human-readable descriptions of every validation problem found.
    pub issues: Vec<String>,
}
700
701impl DistributedReplayValidationError {
702    fn push(&mut self, issue: impl Into<String>) {
703        self.issues.push(issue.into());
704    }
705
706    fn is_empty(&self) -> bool {
707        self.issues.is_empty()
708    }
709}
710
711impl Display for DistributedReplayValidationError {
712    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
713        writeln!(f, "Distributed replay validation failed:")?;
714        for issue in &self.issues {
715            writeln!(f, " - {issue}")?;
716        }
717        Ok(())
718    }
719}
720
/// Builder for a validated distributed replay plan.
#[derive(Debug, Clone)]
pub struct DistributedReplayBuilder {
    // Path the multi-Copper config was loaded from.
    multi_config_path: PathBuf,
    // Parsed multi-Copper config used for all validation.
    multi_config: MultiCopperConfig,
    // Accumulated file/directory inputs for discovery.
    discovery_inputs: Vec<PathBuf>,
    // Explicit or discovered catalog; `None` until discovery runs.
    catalog: Option<DistributedReplayCatalog>,
    // Registrations keyed by subsystem id.
    registrations: BTreeMap<String, DistributedReplayAppRegistration>,
    // When set, restrict the plan to these instance ids.
    selected_instances: Option<BTreeSet<u32>>,
}
731
732impl DistributedReplayBuilder {
    /// Load a strict multi-Copper config and start building a distributed replay plan.
    ///
    /// # Errors
    /// Propagates any failure from [`read_multi_configuration`].
    pub fn new(multi_config_path: impl AsRef<Path>) -> CuResult<Self> {
        let multi_config_path = multi_config_path.as_ref().to_path_buf();
        // NOTE(review): `to_string_lossy` silently mangles non-UTF-8 paths before
        // the read; confirm `read_multi_configuration` cannot accept a `&Path`.
        let multi_config = read_multi_configuration(&multi_config_path.to_string_lossy())?;
        Ok(Self {
            multi_config_path,
            multi_config,
            discovery_inputs: Vec::new(),
            catalog: None,
            registrations: BTreeMap::new(),
            selected_instances: None,
        })
    }
746
747    /// Replace the discovered catalog explicitly.
748    pub fn with_catalog(mut self, catalog: DistributedReplayCatalog) -> Self {
749        self.catalog = Some(catalog);
750        self
751    }
752
753    /// Discover logs from files and/or directories.
754    ///
755    /// Directories are walked recursively by [`DistributedReplayCatalog`].
756    pub fn discover_logs<I, P>(mut self, inputs: I) -> CuResult<Self>
757    where
758        I: IntoIterator<Item = P>,
759        P: AsRef<Path>,
760    {
761        self.discovery_inputs
762            .extend(inputs.into_iter().map(|path| path.as_ref().to_path_buf()));
763        self.catalog = Some(DistributedReplayCatalog::discover(
764            self.discovery_inputs.iter().collect::<Vec<_>>(),
765        )?);
766        Ok(self)
767    }
768
769    /// Convenience wrapper for recursive discovery under one root directory.
770    pub fn discover_logs_under(self, root: impl AsRef<Path>) -> CuResult<Self> {
771        self.discover_logs([root.as_ref().to_path_buf()])
772    }
773
774    /// Restrict plan construction to a subset of instance ids.
775    pub fn instances<I>(mut self, instances: I) -> Self
776    where
777        I: IntoIterator<Item = u32>,
778    {
779        self.selected_instances = Some(instances.into_iter().collect());
780        self
781    }
782
    /// Register the generated app type expected for one subsystem.
    ///
    /// Validates that `App` was generated for `subsystem_id` and that its
    /// subsystem code matches the multi-Copper config, then stores a typed
    /// session factory for later plan construction.
    ///
    /// # Errors
    /// Fails on double registration, an unknown subsystem id, an app type not
    /// generated for a multi-Copper subsystem, or any id/code mismatch.
    pub fn register<App>(mut self, subsystem_id: &str) -> CuResult<Self>
    where
        App: CuDistributedReplayApplication<NoopSectionStorage, NoopLogger>
            + CuDistributedReplayApplication<MmapSectionStorage, UnifiedLoggerWrite>
            + 'static,
    {
        // Reject a second registration for the same subsystem.
        if self.registrations.contains_key(subsystem_id) {
            return Err(CuError::from(format!(
                "Subsystem '{}' is already registered for distributed replay",
                subsystem_id
            )));
        }

        // The subsystem must exist in the strict multi-Copper config.
        let expected_subsystem = self.multi_config.subsystem(subsystem_id).ok_or_else(|| {
            CuError::from(format!(
                "Multi-Copper config '{}' does not define subsystem '{}'",
                self.multi_config_path.display(),
                subsystem_id
            ))
        })?;

        // The generated app must carry a subsystem identity...
        let registered_subsystem = App::subsystem();
        let Some(registered_subsystem_id) = registered_subsystem.id() else {
            return Err(CuError::from(format!(
                "App type '{}' was not generated for a multi-Copper subsystem and cannot be registered for distributed replay",
                type_name::<App>()
            )));
        };

        // ...whose id matches the registration key...
        if registered_subsystem_id != subsystem_id {
            return Err(CuError::from(format!(
                "App type '{}' declares subsystem '{}' but was registered as '{}'",
                type_name::<App>(),
                registered_subsystem_id,
                subsystem_id
            )));
        }

        // ...and whose numeric code matches the config's expectation.
        let registered_subsystem_code = registered_subsystem.code();
        if registered_subsystem_code != expected_subsystem.subsystem_code {
            return Err(CuError::from(format!(
                "App type '{}' declares subsystem code {} for '{}' but multi-Copper config '{}' expects {}",
                type_name::<App>(),
                registered_subsystem_code,
                subsystem_id,
                self.multi_config_path.display(),
                expected_subsystem.subsystem_code
            )));
        }

        // Store the monomorphized session factory for this app type.
        self.registrations.insert(
            subsystem_id.to_string(),
            DistributedReplayAppRegistration {
                subsystem: registered_subsystem,
                app_type_name: type_name::<App>(),
                session_factory: build_distributed_replay_session::<App>,
            },
        );
        Ok(self)
    }
844
    /// Validate discovery + registrations and prepare a typed replay plan.
    ///
    /// Every problem found (discovery failures, missing registrations, missing
    /// or duplicated logs, unknown selected instances, mission disagreement)
    /// is accumulated into a single validation error so the caller sees the
    /// full list at once instead of fixing issues one by one.
    pub fn build(self) -> CuResult<DistributedReplayPlan> {
        // Use the explicitly supplied catalog when present; otherwise discover
        // one from the registered inputs (no inputs -> empty catalog).
        let catalog = match self.catalog {
            Some(catalog) => catalog,
            None if self.discovery_inputs.is_empty() => DistributedReplayCatalog::default(),
            None => DistributedReplayCatalog::discover(
                self.discovery_inputs.iter().collect::<Vec<_>>(),
            )?,
        };

        let mut validation = DistributedReplayValidationError::default();

        // Surface every discovery failure as a validation entry.
        for failure in &catalog.failures {
            validation.push(format!(
                "discovery failure for '{}': {}",
                failure.candidate_path.display(),
                failure.error
            ));
        }

        // Index the configured subsystems by id for the checks below.
        let subsystem_map: BTreeMap<_, _> = self
            .multi_config
            .subsystems
            .iter()
            .map(|subsystem| (subsystem.id.clone(), subsystem))
            .collect();

        // Every configured subsystem must have a registered replay app type.
        for subsystem in subsystem_map.keys() {
            if !self.registrations.contains_key(subsystem) {
                validation.push(format!(
                    "missing app registration for subsystem '{}'",
                    subsystem
                ));
            }
        }

        // Bucket discovered logs by (instance_id, subsystem_id) while checking
        // each log's recorded identity against the multi-Copper config.
        let mut discovered_instances = BTreeSet::new();
        let mut logs_by_target: BTreeMap<(u32, String), Vec<DistributedReplayLog>> =
            BTreeMap::new();

        for log in &catalog.logs {
            let Some(subsystem_id) = log.subsystem_id() else {
                validation.push(format!(
                    "discovered log '{}' is missing subsystem_id runtime metadata",
                    log.base_path.display()
                ));
                continue;
            };

            let Some(expected_subsystem) = subsystem_map.get(subsystem_id) else {
                validation.push(format!(
                    "discovered log '{}' belongs to subsystem '{}' which is not present in multi-Copper config '{}'",
                    log.base_path.display(),
                    subsystem_id,
                    self.multi_config_path.display()
                ));
                continue;
            };

            // The recorded subsystem code must agree with the configuration.
            if log.subsystem_code() != expected_subsystem.subsystem_code {
                validation.push(format!(
                    "discovered log '{}' reports subsystem code {} for '{}' but multi-Copper config '{}' expects {}",
                    log.base_path.display(),
                    log.subsystem_code(),
                    subsystem_id,
                    self.multi_config_path.display(),
                    expected_subsystem.subsystem_code
                ));
            }

            discovered_instances.insert(log.instance_id());
            logs_by_target
                .entry((log.instance_id(), subsystem_id.to_string()))
                .or_default()
                .push(log.clone());
        }

        // Exactly one log is allowed per (instance, subsystem) target.
        for ((instance_id, subsystem_id), logs) in &logs_by_target {
            if logs.len() > 1 {
                validation.push(format!(
                    "found {} logs for instance {} subsystem '{}': {}",
                    logs.len(),
                    instance_id,
                    subsystem_id,
                    join_log_paths(logs)
                ));
            }
        }

        // Resolve the instance selection: an explicit selection is sorted and
        // each entry must have discovered logs; otherwise replay everything
        // that was discovered.
        let selected_instances: Vec<u32> =
            if let Some(selected_instances) = &self.selected_instances {
                let mut selected_instances: Vec<_> = selected_instances.iter().copied().collect();
                selected_instances.sort_unstable();
                for instance_id in &selected_instances {
                    if !discovered_instances.contains(instance_id) {
                        validation.push(format!(
                            "selected instance {} has no discovered logs",
                            instance_id
                        ));
                    }
                }
                selected_instances
            } else {
                discovered_instances.iter().copied().collect()
            };

        if selected_instances.is_empty() {
            validation.push("no instances selected for distributed replay");
        }

        // Every selected instance needs a log for every configured subsystem.
        for instance_id in &selected_instances {
            for subsystem in &self.multi_config.subsystems {
                if !logs_by_target.contains_key(&(*instance_id, subsystem.id.clone())) {
                    validation.push(format!(
                        "missing log for instance {} subsystem '{}'",
                        instance_id, subsystem.id
                    ));
                }
            }
        }

        // All selected logs that recorded a mission must agree on it.
        let mut known_missions = BTreeSet::new();
        for instance_id in &selected_instances {
            for subsystem in &self.multi_config.subsystems {
                if let Some(logs) = logs_by_target.get(&(*instance_id, subsystem.id.clone()))
                    && let Some(log) = logs.first()
                    && let Some(mission) = &log.mission
                {
                    known_missions.insert(mission.clone());
                }
            }
        }
        if known_missions.len() > 1 {
            validation.push(format!(
                "selected logs disagree on mission: {}",
                known_missions.into_iter().collect::<Vec<_>>().join(", ")
            ));
        }

        if !validation.is_empty() {
            return Err(CuError::from(validation.to_string()));
        }

        // Past validation at most one mission remains; take the first seen.
        let mission = selected_instances
            .iter()
            .flat_map(|instance_id| {
                self.multi_config.subsystems.iter().filter_map(|subsystem| {
                    logs_by_target
                        .get(&(*instance_id, subsystem.id.clone()))
                        .and_then(|logs| logs.first())
                        .and_then(|log| log.mission.clone())
                })
            })
            .next();

        // Stable registration order: sorted by declared subsystem id.
        let mut registrations: Vec<_> = self.registrations.into_values().collect();
        registrations.sort_by(|left, right| left.subsystem.id().cmp(&right.subsystem.id()));

        // Pair every (instance, subsystem) with its single validated log and
        // its registration. The expect() calls are upheld by the validation
        // above: a missing log or registration would have errored already.
        let mut assignments = Vec::new();
        for instance_id in &selected_instances {
            for subsystem in &self.multi_config.subsystems {
                let log = logs_by_target
                    .get(&(*instance_id, subsystem.id.clone()))
                    .and_then(|logs| logs.first())
                    .expect("validated distributed replay plan is missing a log")
                    .clone();
                let registration = registrations
                    .iter()
                    .find(|registration| registration.subsystem.id() == Some(subsystem.id.as_str()))
                    .expect("validated distributed replay plan is missing a registration")
                    .clone();
                assignments.push(DistributedReplayAssignment {
                    instance_id: *instance_id,
                    subsystem_id: subsystem.id.clone(),
                    log,
                    registration,
                });
            }
        }
        // Deterministic replay order: instance, then subsystem code, then id.
        assignments.sort_by(|left, right| {
            (
                left.instance_id,
                left.registration.subsystem.code(),
                left.subsystem_id.as_str(),
            )
                .cmp(&(
                    right.instance_id,
                    right.registration.subsystem.code(),
                    right.subsystem_id.as_str(),
                ))
        });

        Ok(DistributedReplayPlan {
            multi_config_path: self.multi_config_path,
            multi_config: self.multi_config,
            catalog,
            selected_instances,
            mission,
            registrations,
            assignments,
        })
    }
1047}
1048
/// Build one replay session for `assignment` from the effective configuration
/// that was recorded inside the log itself.
///
/// Two logger flavours are supported:
/// - when `session_config.output_root` is set, the replayed run is re-logged
///   into a new unified log under that root (`MmapSectionStorage` +
///   `UnifiedLoggerWrite`);
/// - otherwise the replay runs against a no-op logger and produces no output
///   log (`NoopSectionStorage` + `NoopLogger`).
fn build_distributed_replay_session<App>(
    assignment: &DistributedReplayAssignment,
    session_config: &DistributedReplaySessionConfig,
) -> CuResult<DistributedReplaySessionBuild>
where
    App: CuDistributedReplayApplication<NoopSectionStorage, NoopLogger>
        + CuDistributedReplayApplication<MmapSectionStorage, UnifiedLoggerWrite>
        + 'static,
{
    // Re-parse the RON config exactly as it was captured at record time.
    let config = read_configuration_str(assignment.log.effective_config_ron.clone(), None)
        .map_err(|err| {
            CuError::from(format!(
                "Failed to parse recorded effective config from '{}': {err}",
                assignment.log.base_path.display()
            ))
        })?;
    // Mocked clock: the session drives time from the recorded data.
    let (clock, clock_mock) = RobotClock::mock();

    if let Some(output_root) = &session_config.output_root {
        // Re-logging path: create an output log sized from the recorded one.
        let output_log_path = replay_output_log_path(output_root, assignment)?;
        let logger = build_replay_output_logger(
            &output_log_path,
            replay_output_log_size_bytes(assignment, &config),
        )?;
        let app = <App as CuDistributedReplayApplication<
            MmapSectionStorage,
            UnifiedLoggerWrite,
        >>::build_distributed_replay(
            clock.clone(), logger, assignment.instance_id, Some(config)
        )?;
        let mut session = RecordedReplaySession::<
            App,
            <App as CuRecordedReplayApplication<
                MmapSectionStorage,
                UnifiedLoggerWrite,
            >>::RecordedDataSet,
            MmapSectionStorage,
            UnifiedLoggerWrite,
        >::from_log(assignment.clone(), app, clock_mock, &assignment.log.base_path)?;
        let nodes = session.describe_nodes()?;
        return Ok(DistributedReplaySessionBuild {
            session: Box::new(session),
            nodes,
            output_log_path: Some(output_log_path),
        });
    }

    // No-output path: identical construction with the no-op logger types.
    let logger = Arc::new(Mutex::new(NoopLogger::new()));
    let app = <App as CuDistributedReplayApplication<NoopSectionStorage, NoopLogger>>::build_distributed_replay(
        clock,
        logger,
        assignment.instance_id,
        Some(config),
    )?;
    let mut session = RecordedReplaySession::<
        App,
        <App as CuRecordedReplayApplication<NoopSectionStorage, NoopLogger>>::RecordedDataSet,
        NoopSectionStorage,
        NoopLogger,
    >::from_log(
        assignment.clone(),
        app,
        clock_mock,
        &assignment.log.base_path,
    )?;
    let nodes = session.describe_nodes()?;
    Ok(DistributedReplaySessionBuild {
        session: Box::new(session),
        nodes,
        output_log_path: None,
    })
}
1121
1122fn replay_output_log_path(
1123    output_root: &Path,
1124    assignment: &DistributedReplayAssignment,
1125) -> CuResult<PathBuf> {
1126    let file_name = assignment
1127        .log
1128        .base_path
1129        .file_name()
1130        .ok_or_else(|| {
1131            CuError::from(format!(
1132                "Replay assignment log '{}' has no file name",
1133                assignment.log.base_path.display()
1134            ))
1135        })?
1136        .to_owned();
1137    Ok(output_root.join(file_name))
1138}
1139
1140fn build_replay_output_logger(
1141    path: &Path,
1142    preallocated_size: usize,
1143) -> CuResult<Arc<Mutex<UnifiedLoggerWrite>>> {
1144    if let Some(parent) = path.parent() {
1145        fs::create_dir_all(parent).map_err(|err| {
1146            CuError::new_with_cause(
1147                &format!(
1148                    "Failed to create replay log directory '{}'",
1149                    parent.display()
1150                ),
1151                err,
1152            )
1153        })?;
1154    }
1155    let UnifiedLogger::Write(writer) = UnifiedLoggerBuilder::new()
1156        .write(true)
1157        .create(true)
1158        .file_base_name(path)
1159        .preallocated_size(preallocated_size)
1160        .build()
1161        .map_err(|err| {
1162            CuError::new_with_cause(
1163                &format!("Failed to create replay log '{}'", path.display()),
1164                err,
1165            )
1166        })?
1167    else {
1168        return Err(CuError::from(format!(
1169            "Expected writable replay logger for '{}'",
1170            path.display()
1171        )));
1172    };
1173    Ok(Arc::new(Mutex::new(writer)))
1174}
1175
1176fn replay_output_log_size_bytes(
1177    assignment: &DistributedReplayAssignment,
1178    config: &crate::config::CuConfig,
1179) -> usize {
1180    if let Some(slab_zero) = slab_zero_path(&assignment.log.base_path)
1181        && let Ok(metadata) = fs::metadata(slab_zero)
1182        && let Ok(size) = usize::try_from(metadata.len())
1183    {
1184        return size.max(DEFAULT_REPLAY_LOG_SIZE_BYTES);
1185    }
1186
1187    config
1188        .logging
1189        .as_ref()
1190        .and_then(|logging| logging.slab_size_mib)
1191        .and_then(|size_mib| usize::try_from(size_mib).ok())
1192        .and_then(|size_mib| size_mib.checked_mul(1024 * 1024))
1193        .unwrap_or(DEFAULT_REPLAY_LOG_SIZE_BYTES)
1194}
1195
1196fn copperlist_origins<P: CopperListTuple>(
1197    copperlist: &CopperList<P>,
1198) -> BTreeSet<DistributedReplayOriginKey> {
1199    <CopperList<P> as ErasedCuStampedDataSet>::cumsgs(copperlist)
1200        .into_iter()
1201        .filter_map(|msg| msg.metadata().origin())
1202        .map(|origin| DistributedReplayOriginKey {
1203            instance_id: origin.instance_id,
1204            subsystem_code: origin.subsystem_code,
1205            cl_id: origin.cl_id,
1206        })
1207        .collect()
1208}
1209
/// Transient bundle produced by `DistributedReplayEngine::build_state`:
/// everything the engine needs except the per-node execution bookkeeping.
#[derive(Default)]
struct DistributedReplayEngineState {
    /// One session per plan assignment, in assignment order.
    sessions: Vec<Box<dyn DistributedReplaySession>>,
    /// Causal graph nodes, one per recorded CopperList.
    nodes: Vec<DistributedReplayGraphNode>,
    /// `(instance_id, subsystem_id, cl_id)` -> index into `nodes`.
    node_lookup: BTreeMap<(u32, String, u64), usize>,
    /// `(instance_id, subsystem_id)` -> path of the re-logged replay output.
    output_log_paths: BTreeMap<(u32, String), PathBuf>,
    /// Nodes whose dependencies are all satisfied and may execute next.
    ready: BTreeSet<DistributedReplayReadyNode>,
    /// Latest executed cursor per session (`None` until that session ran).
    frontier: Vec<Option<DistributedReplayCursor>>,
}
1219
/// One causal distributed replay engine built from a validated plan.
pub struct DistributedReplayEngine {
    /// Validated plan this engine was built from; kept so `reset` can rebuild.
    plan: DistributedReplayPlan,
    /// Session options (e.g. output root); kept so `reset` can rebuild.
    session_config: DistributedReplaySessionConfig,
    /// One session per plan assignment, in assignment order.
    sessions: Vec<Box<dyn DistributedReplaySession>>,
    /// Causal graph nodes, one per recorded CopperList.
    nodes: Vec<DistributedReplayGraphNode>,
    /// `(instance_id, subsystem_id, cl_id)` -> index into `nodes`.
    node_lookup: BTreeMap<(u32, String, u64), usize>,
    /// `(instance_id, subsystem_id)` -> path of the re-logged replay output.
    output_log_paths: BTreeMap<(u32, String), PathBuf>,
    /// Nodes whose dependencies are all satisfied and may execute next.
    ready: BTreeSet<DistributedReplayReadyNode>,
    /// Latest executed cursor per session (`None` until that session ran).
    frontier: Vec<Option<DistributedReplayCursor>>,
    /// Per-node "has been replayed" flags, parallel to `nodes`.
    executed: Vec<bool>,
    /// Number of `true` entries in `executed`.
    executed_count: usize,
}
1233
1234impl DistributedReplayEngine {
1235    fn new(
1236        plan: DistributedReplayPlan,
1237        session_config: DistributedReplaySessionConfig,
1238    ) -> CuResult<Self> {
1239        let state = Self::build_state(&plan, &session_config)?;
1240        let executed = vec![false; state.nodes.len()];
1241        Ok(Self {
1242            plan,
1243            session_config,
1244            sessions: state.sessions,
1245            nodes: state.nodes,
1246            node_lookup: state.node_lookup,
1247            output_log_paths: state.output_log_paths,
1248            ready: state.ready,
1249            frontier: state.frontier,
1250            executed,
1251            executed_count: 0,
1252        })
1253    }
1254
1255    fn build_state(
1256        plan: &DistributedReplayPlan,
1257        session_config: &DistributedReplaySessionConfig,
1258    ) -> CuResult<DistributedReplayEngineState> {
1259        let mut sessions = Vec::with_capacity(plan.assignments.len());
1260        let mut pending_nodes = Vec::new();
1261        let mut session_nodes = Vec::with_capacity(plan.assignments.len());
1262        let mut output_log_paths = BTreeMap::new();
1263
1264        for assignment in &plan.assignments {
1265            let build = (assignment.registration.session_factory)(assignment, session_config)?;
1266            let session_index = sessions.len();
1267            let mut node_indices = Vec::with_capacity(build.nodes.len());
1268            for node in build.nodes {
1269                let pending_index = pending_nodes.len();
1270                pending_nodes.push((session_index, node));
1271                node_indices.push(pending_index);
1272            }
1273            if let Some(output_log_path) = build.output_log_path {
1274                let replaced = output_log_paths.insert(
1275                    (assignment.instance_id, assignment.subsystem_id.clone()),
1276                    output_log_path,
1277                );
1278                if replaced.is_some() {
1279                    return Err(CuError::from(format!(
1280                        "Duplicate replay output log assignment for instance {} subsystem '{}'",
1281                        assignment.instance_id, assignment.subsystem_id
1282                    )));
1283                }
1284            }
1285            sessions.push(build.session);
1286            session_nodes.push(node_indices);
1287        }
1288
1289        let mut nodes = Vec::with_capacity(pending_nodes.len());
1290        let mut origin_lookup = BTreeMap::new();
1291        let mut node_lookup = BTreeMap::new();
1292
1293        for (node_index, (session_index, descriptor)) in pending_nodes.iter().enumerate() {
1294            if origin_lookup
1295                .insert(descriptor.origin_key.clone(), node_index)
1296                .is_some()
1297            {
1298                return Err(CuError::from(format!(
1299                    "Duplicate replay node detected for instance {} subsystem code {} CopperList {}",
1300                    descriptor.origin_key.instance_id,
1301                    descriptor.origin_key.subsystem_code,
1302                    descriptor.origin_key.cl_id
1303                )));
1304            }
1305
1306            if node_lookup
1307                .insert(
1308                    (
1309                        descriptor.cursor.instance_id,
1310                        descriptor.cursor.subsystem_id.clone(),
1311                        descriptor.cursor.cl_id,
1312                    ),
1313                    node_index,
1314                )
1315                .is_some()
1316            {
1317                return Err(CuError::from(format!(
1318                    "Duplicate replay cursor detected for instance {} subsystem '{}' CopperList {}",
1319                    descriptor.cursor.instance_id,
1320                    descriptor.cursor.subsystem_id,
1321                    descriptor.cursor.cl_id
1322                )));
1323            }
1324
1325            nodes.push(DistributedReplayGraphNode {
1326                cursor: descriptor.cursor.clone(),
1327                session_index: *session_index,
1328                outgoing: Vec::new(),
1329                initial_dependencies: 0,
1330                remaining_dependencies: 0,
1331            });
1332        }
1333
1334        let mut edges = BTreeSet::new();
1335
1336        for node_indices in &session_nodes {
1337            for pair in node_indices.windows(2) {
1338                let from = pair[0];
1339                let to = pair[1];
1340                if edges.insert((from, to)) {
1341                    nodes[from].outgoing.push(to);
1342                    nodes[to].initial_dependencies += 1;
1343                }
1344            }
1345        }
1346
1347        for (target_index, (_, descriptor)) in pending_nodes.iter().enumerate() {
1348            for origin in &descriptor.incoming_origins {
1349                let source_index = origin_lookup.get(origin).copied().ok_or_else(|| {
1350                    CuError::from(format!(
1351                        "Unresolved recorded provenance edge into instance {} subsystem '{}' CopperList {} from instance {} subsystem code {} CopperList {}",
1352                        descriptor.cursor.instance_id,
1353                        descriptor.cursor.subsystem_id,
1354                        descriptor.cursor.cl_id,
1355                        origin.instance_id,
1356                        origin.subsystem_code,
1357                        origin.cl_id
1358                    ))
1359                })?;
1360                if source_index == target_index {
1361                    return Err(CuError::from(format!(
1362                        "Recorded provenance on instance {} subsystem '{}' CopperList {} points to itself",
1363                        descriptor.cursor.instance_id,
1364                        descriptor.cursor.subsystem_id,
1365                        descriptor.cursor.cl_id
1366                    )));
1367                }
1368                if edges.insert((source_index, target_index)) {
1369                    nodes[source_index].outgoing.push(target_index);
1370                    nodes[target_index].initial_dependencies += 1;
1371                }
1372            }
1373        }
1374
1375        let mut ready = BTreeSet::new();
1376        for (node_index, node) in nodes.iter_mut().enumerate() {
1377            node.remaining_dependencies = node.initial_dependencies;
1378            if node.remaining_dependencies == 0 {
1379                ready.insert(DistributedReplayReadyNode {
1380                    instance_id: node.cursor.instance_id,
1381                    subsystem_code: node.cursor.subsystem_code(),
1382                    cl_id: node.cursor.cl_id,
1383                    node_index,
1384                });
1385            }
1386        }
1387
1388        if !nodes.is_empty() && ready.is_empty() {
1389            return Err(CuError::from(
1390                "Distributed replay graph has no causally ready starting point",
1391            ));
1392        }
1393
1394        Ok(DistributedReplayEngineState {
1395            frontier: vec![None; sessions.len()],
1396            sessions,
1397            nodes,
1398            node_lookup,
1399            output_log_paths,
1400            ready,
1401        })
1402    }
1403
1404    fn shutdown_sessions(sessions: &mut Vec<Box<dyn DistributedReplaySession>>) -> CuResult<()> {
1405        for session in sessions.iter_mut() {
1406            session.shutdown()?;
1407        }
1408        Ok(())
1409    }
1410
1411    fn ready_key(&self, node_index: usize) -> DistributedReplayReadyNode {
1412        let node = &self.nodes[node_index];
1413        DistributedReplayReadyNode {
1414            instance_id: node.cursor.instance_id,
1415            subsystem_code: node.cursor.subsystem_code(),
1416            cl_id: node.cursor.cl_id,
1417            node_index,
1418        }
1419    }
1420
1421    /// Reset all replay sessions and graph execution state back to the beginning.
1422    pub fn reset(&mut self) -> CuResult<()> {
1423        Self::shutdown_sessions(&mut self.sessions)?;
1424        let state = Self::build_state(&self.plan, &self.session_config)?;
1425        self.sessions = state.sessions;
1426        self.nodes = state.nodes;
1427        self.node_lookup = state.node_lookup;
1428        self.output_log_paths = state.output_log_paths;
1429        self.ready = state.ready;
1430        self.frontier = state.frontier;
1431        self.executed = vec![false; self.nodes.len()];
1432        self.executed_count = 0;
1433        Ok(())
1434    }
1435
1436    /// Replay the next causally ready CopperList, if any.
1437    pub fn step_causal(&mut self) -> CuResult<Option<DistributedReplayCursor>> {
1438        let Some(next_ready) = self.ready.iter().next().copied() else {
1439            if self.executed_count == self.nodes.len() {
1440                return Ok(None);
1441            }
1442            return Err(CuError::from(
1443                "Distributed replay is deadlocked: no causally ready CopperList remains",
1444            ));
1445        };
1446        self.ready.remove(&next_ready);
1447
1448        let cursor = self.nodes[next_ready.node_index].cursor.clone();
1449        let session_index = self.nodes[next_ready.node_index].session_index;
1450        self.sessions[session_index].goto_cl(cursor.cl_id)?;
1451        self.executed[next_ready.node_index] = true;
1452        self.executed_count += 1;
1453        self.frontier[session_index] = Some(cursor.clone());
1454
1455        let outgoing = self.nodes[next_ready.node_index].outgoing.clone();
1456        for dependent in outgoing {
1457            let node = &mut self.nodes[dependent];
1458            node.remaining_dependencies = node.remaining_dependencies.saturating_sub(1);
1459            if node.remaining_dependencies == 0 {
1460                self.ready.insert(self.ready_key(dependent));
1461            }
1462        }
1463
1464        Ok(Some(cursor))
1465    }
1466
1467    /// Replay the entire selected fleet to completion.
1468    pub fn run_all(&mut self) -> CuResult<()> {
1469        while self.step_causal()?.is_some() {}
1470        Ok(())
1471    }
1472
1473    /// Rebuild the replay from scratch and advance until the target CopperList is reached.
1474    pub fn goto(&mut self, instance_id: u32, subsystem_id: &str, cl_id: u64) -> CuResult<()> {
1475        let target = self
1476            .node_lookup
1477            .get(&(instance_id, subsystem_id.to_string(), cl_id))
1478            .copied()
1479            .ok_or_else(|| {
1480                CuError::from(format!(
1481                    "Distributed replay target instance {} subsystem '{}' CopperList {} does not exist",
1482                    instance_id, subsystem_id, cl_id
1483                ))
1484            })?;
1485        self.reset()?;
1486        while !self.executed[target] {
1487            let Some(_) = self.step_causal()? else {
1488                return Err(CuError::from(format!(
1489                    "Distributed replay exhausted before reaching instance {} subsystem '{}' CopperList {}",
1490                    instance_id, subsystem_id, cl_id
1491                )));
1492            };
1493        }
1494        Ok(())
1495    }
1496
1497    /// Return the latest executed CopperList cursor for each replay session.
1498    pub fn current_frontier(&self) -> Vec<DistributedReplayCursor> {
1499        self.frontier
1500            .iter()
1501            .filter_map(|cursor| cursor.clone())
1502            .collect()
1503    }
1504
1505    pub fn output_log_path(&self, instance_id: u32, subsystem_id: &str) -> Option<&Path> {
1506        self.output_log_paths
1507            .get(&(instance_id, subsystem_id.to_string()))
1508            .map(PathBuf::as_path)
1509    }
1510
1511    #[inline]
1512    pub fn total_nodes(&self) -> usize {
1513        self.nodes.len()
1514    }
1515
1516    #[inline]
1517    pub fn executed_nodes(&self) -> usize {
1518        self.executed_count
1519    }
1520}
1521
1522fn join_log_paths(logs: &[DistributedReplayLog]) -> String {
1523    logs.iter()
1524        .map(|log| log.base_path.display().to_string())
1525        .collect::<Vec<_>>()
1526        .join(", ")
1527}
1528
1529fn collect_candidate_base_paths(path: &Path, out: &mut BTreeSet<PathBuf>) -> CuResult<()> {
1530    if path.is_dir() {
1531        let mut entries = fs::read_dir(path)
1532            .map_err(|err| {
1533                CuError::new_with_cause(
1534                    &format!(
1535                        "Failed to read directory '{}' during distributed replay discovery",
1536                        path.display()
1537                    ),
1538                    err,
1539                )
1540            })?
1541            .collect::<Result<Vec<_>, _>>()
1542            .map_err(|err| {
1543                CuError::new_with_cause(
1544                    &format!(
1545                        "Failed to enumerate directory '{}' during distributed replay discovery",
1546                        path.display()
1547                    ),
1548                    err,
1549                )
1550            })?;
1551        entries.sort_by_key(|entry| entry.path());
1552        for entry in entries {
1553            collect_candidate_base_paths(&entry.path(), out)?;
1554        }
1555        return Ok(());
1556    }
1557
1558    if path
1559        .extension()
1560        .and_then(|ext| ext.to_str())
1561        .is_some_and(|ext| ext == "copper")
1562    {
1563        out.insert(normalize_candidate_log_base(path));
1564    }
1565
1566    Ok(())
1567}
1568
1569fn normalize_candidate_log_base(path: &Path) -> PathBuf {
1570    let Some(extension) = path.extension().and_then(|ext| ext.to_str()) else {
1571        return path.to_path_buf();
1572    };
1573    let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else {
1574        return path.to_path_buf();
1575    };
1576    let Some((base_stem, slab_suffix)) = stem.rsplit_once('_') else {
1577        return path.to_path_buf();
1578    };
1579
1580    if slab_suffix.is_empty() || !slab_suffix.chars().all(|c| c.is_ascii_digit()) {
1581        return path.to_path_buf();
1582    }
1583
1584    let mut normalized = path.to_path_buf();
1585    normalized.set_file_name(format!("{base_stem}.{extension}"));
1586    if slab_zero_path(&normalized).is_some_and(|slab_zero| slab_zero.exists()) {
1587        normalized
1588    } else {
1589        path.to_path_buf()
1590    }
1591}
1592
/// Derive the slab-zero path for a base log path: `dir/name.ext` becomes
/// `dir/name_0.ext`. Returns `None` when the stem or extension is missing or
/// not valid UTF-8.
fn slab_zero_path(base_path: &Path) -> Option<PathBuf> {
    let extension = base_path.extension().and_then(|ext| ext.to_str())?;
    let stem = base_path.file_stem().and_then(|stem| stem.to_str())?;
    let mut candidate = base_path.to_path_buf();
    candidate.set_file_name(format!("{stem}_0.{extension}"));
    Some(candidate)
}
1600
1601fn read_next_entry<T: bincode::Decode<()>>(src: &mut impl Read) -> CuResult<Option<T>> {
1602    match decode_from_std_read::<T, _, _>(src, standard()) {
1603        Ok(entry) => Ok(Some(entry)),
1604        Err(DecodeError::UnexpectedEnd { .. }) => Ok(None),
1605        Err(DecodeError::Io { inner, .. }) if inner.kind() == std::io::ErrorKind::UnexpectedEof => {
1606            Ok(None)
1607        }
1608        Err(err) => Err(CuError::new_with_cause(
1609            "Failed to decode bincode entry during distributed replay discovery",
1610            err,
1611        )),
1612    }
1613}
1614
1615#[cfg(test)]
1616mod tests {
1617    use super::*;
1618    use crate::app::{
1619        CuDistributedReplayApplication, CuRecordedReplayApplication, CuSimApplication,
1620        CuSubsystemMetadata,
1621    };
1622    use crate::config::CuConfig;
1623    use crate::copperlist::CopperList;
1624    use crate::curuntime::KeyFrame;
1625    use crate::simulation::SimOverride;
1626    use bincode::{Decode, Encode};
1627    use cu29_clock::CuTime;
1628    use cu29_traits::{ErasedCuStampedData, ErasedCuStampedDataSet, MatchingTasks, WriteStream};
1629    use cu29_unifiedlog::memmap::MmapSectionStorage;
1630    use cu29_unifiedlog::stream_write;
1631    use serde::Serialize;
1632    use std::sync::{Arc, Mutex};
1633    use tempfile::TempDir;
1634
1635    fn write_runtime_lifecycle_log(
1636        base_path: &Path,
1637        stack: RuntimeLifecycleStackInfo,
1638        mission: Option<&str>,
1639    ) -> CuResult<()> {
1640        if let Some(parent) = base_path.parent() {
1641            fs::create_dir_all(parent).map_err(|err| {
1642                CuError::new_with_cause(
1643                    &format!("Failed to create test log directory '{}'", parent.display()),
1644                    err,
1645                )
1646            })?;
1647        }
1648
1649        let UnifiedLogger::Write(writer) = UnifiedLoggerBuilder::new()
1650            .write(true)
1651            .create(true)
1652            .preallocated_size(256 * 1024)
1653            .file_base_name(base_path)
1654            .build()
1655            .map_err(|err| {
1656                CuError::new_with_cause(
1657                    &format!("Failed to create test log '{}'", base_path.display()),
1658                    err,
1659                )
1660            })?
1661        else {
1662            return Err(CuError::from("Expected writable unified logger in test"));
1663        };
1664
1665        let logger = Arc::new(Mutex::new(writer));
1666        let mut stream = stream_write::<RuntimeLifecycleRecord, MmapSectionStorage>(
1667            logger.clone(),
1668            UnifiedLogType::RuntimeLifecycle,
1669            4096,
1670        )?;
1671        stream.log(&RuntimeLifecycleRecord {
1672            timestamp: CuTime::default(),
1673            event: RuntimeLifecycleEvent::Instantiated {
1674                config_source: RuntimeLifecycleConfigSource::ExternalFile,
1675                effective_config_ron: "(runtime: ())".to_string(),
1676                stack,
1677            },
1678        })?;
1679        if let Some(mission) = mission {
1680            stream.log(&RuntimeLifecycleRecord {
1681                timestamp: CuTime::from_nanos(1),
1682                event: RuntimeLifecycleEvent::MissionStarted {
1683                    mission: mission.to_string(),
1684                },
1685            })?;
1686        }
1687        drop(stream);
1688        drop(logger);
1689        Ok(())
1690    }
1691
1692    fn test_stack(
1693        subsystem_id: &str,
1694        subsystem_code: u16,
1695        instance_id: u32,
1696    ) -> RuntimeLifecycleStackInfo {
1697        RuntimeLifecycleStackInfo {
1698            app_name: "demo".to_string(),
1699            app_version: "0.1.0".to_string(),
1700            git_commit: Some("abc123".to_string()),
1701            git_dirty: Some(false),
1702            subsystem_id: Some(subsystem_id.to_string()),
1703            subsystem_code,
1704            instance_id,
1705        }
1706    }
1707
1708    fn write_multi_config_fixture(temp_dir: &TempDir, subsystem_ids: &[&str]) -> CuResult<PathBuf> {
1709        for subsystem_id in subsystem_ids {
1710            let subsystem_config = temp_dir.path().join(format!("{subsystem_id}_config.ron"));
1711            fs::write(&subsystem_config, "(tasks: [], cnx: [])").map_err(|err| {
1712                CuError::new_with_cause(
1713                    &format!(
1714                        "Failed to write subsystem config '{}'",
1715                        subsystem_config.display()
1716                    ),
1717                    err,
1718                )
1719            })?;
1720        }
1721
1722        let subsystem_entries = subsystem_ids
1723            .iter()
1724            .map(|subsystem_id| {
1725                format!(
1726                    r#"(
1727            id: "{subsystem_id}",
1728            config: "{subsystem_id}_config.ron",
1729        )"#
1730                )
1731            })
1732            .collect::<Vec<_>>()
1733            .join(",\n");
1734
1735        let multi_config = format!(
1736            "(\n    subsystems: [\n{entries}\n    ],\n    interconnects: [],\n)\n",
1737            entries = subsystem_entries
1738        );
1739        let multi_config_path = temp_dir.path().join("multi_copper.ron");
1740        fs::write(&multi_config_path, multi_config).map_err(|err| {
1741            CuError::new_with_cause(
1742                &format!(
1743                    "Failed to write multi-Copper config '{}'",
1744                    multi_config_path.display()
1745                ),
1746                err,
1747            )
1748        })?;
1749        Ok(multi_config_path)
1750    }
1751
    // Empty recorded-data-set payload: carries no messages and declares no task
    // ids. The test apps generated below use it as their `RecordedDataSet`.
    #[derive(Debug, Default, Encode, Decode, Serialize)]
    struct DummyRecordedDataSet;

    impl ErasedCuStampedDataSet for DummyRecordedDataSet {
        // No stamped messages to expose.
        fn cumsgs(&self) -> Vec<&dyn ErasedCuStampedData> {
            Vec::new()
        }
    }

    impl MatchingTasks for DummyRecordedDataSet {
        // No task ids are associated with this dataset.
        fn get_all_task_ids() -> &'static [&'static str] {
            &[]
        }
    }
1766
    // Generates a unit-struct test application type `$name` whose subsystem
    // identity is (`$subsystem_id`, `$subsystem_code`), with no-op
    // implementations of the simulation, recorded-replay, and
    // distributed-replay application traits.
    macro_rules! impl_registered_test_app {
        ($name:ident, $subsystem_id:expr, $subsystem_code:expr) => {
            struct $name;

            // Identity reported to the replay plan builder during registration.
            impl CuSubsystemMetadata for $name {
                fn subsystem() -> Subsystem {
                    Subsystem::new(Some($subsystem_id), $subsystem_code)
                }
            }

            // Simulation app surface: every lifecycle hook succeeds without work.
            impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
                CuSimApplication<S, L> for $name
            {
                type Step<'z> = ();

                fn get_original_config() -> String {
                    "(tasks: [], cnx: [])".to_string()
                }

                fn start_all_tasks(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn run_one_iteration(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn run(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn stop_all_tasks(
                    &mut self,
                    _sim_callback: &mut impl for<'z> FnMut(Self::Step<'z>) -> SimOverride,
                ) -> CuResult<()> {
                    Ok(())
                }

                fn restore_keyframe(&mut self, _freezer: &KeyFrame) -> CuResult<()> {
                    Ok(())
                }
            }

            // Recorded replay: accepts any copperlist and does nothing with it.
            impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
                CuRecordedReplayApplication<S, L> for $name
            {
                type RecordedDataSet = DummyRecordedDataSet;

                fn replay_recorded_copperlist(
                    &mut self,
                    _clock_mock: &RobotClockMock,
                    _copperlist: &CopperList<Self::RecordedDataSet>,
                    _keyframe: Option<&KeyFrame>,
                ) -> CuResult<()> {
                    Ok(())
                }
            }

            // Distributed replay: construction always succeeds with the unit value.
            impl<S: SectionStorage + 'static, L: UnifiedLogWrite<S> + 'static>
                CuDistributedReplayApplication<S, L> for $name
            {
                fn build_distributed_replay(
                    _clock: RobotClock,
                    _unified_logger: Arc<Mutex<L>>,
                    _instance_id: u32,
                    _config_override: Option<CuConfig>,
                ) -> CuResult<Self> {
                    Ok(Self)
                }
            }
        };
    }
1848
    // Concrete registered apps: ping/pong match the fixture logs; the
    // wrong-code variant declares code 99 to exercise registration validation.
    impl_registered_test_app!(PingRegisteredApp, "ping", 0);
    impl_registered_test_app!(PongRegisteredApp, "pong", 1);
    impl_registered_test_app!(PingWrongCodeApp, "ping", 99);
1852
    // Replay session stub whose operations always succeed; used by the fake
    // session factories below.
    struct FakeReplaySession;

    impl DistributedReplaySession for FakeReplaySession {
        fn goto_cl(&mut self, _cl_id: u64) -> CuResult<()> {
            Ok(())
        }

        fn shutdown(&mut self) -> CuResult<()> {
            Ok(())
        }
    }
1864
1865    fn fake_registration(
1866        subsystem_id: &'static str,
1867        subsystem_code: u16,
1868        session_factory: DistributedReplaySessionFactory,
1869    ) -> DistributedReplayAppRegistration {
1870        DistributedReplayAppRegistration {
1871            subsystem: Subsystem::new(Some(subsystem_id), subsystem_code),
1872            app_type_name: "fake",
1873            session_factory,
1874        }
1875    }
1876
1877    fn fake_assignment(
1878        instance_id: u32,
1879        subsystem_id: &'static str,
1880        subsystem_code: u16,
1881        session_factory: DistributedReplaySessionFactory,
1882    ) -> DistributedReplayAssignment {
1883        DistributedReplayAssignment {
1884            instance_id,
1885            subsystem_id: subsystem_id.to_string(),
1886            log: DistributedReplayLog {
1887                base_path: PathBuf::from(format!("{subsystem_id}_{instance_id}.copper")),
1888                stack: test_stack(subsystem_id, subsystem_code, instance_id),
1889                config_source: RuntimeLifecycleConfigSource::ExternalFile,
1890                effective_config_ron: "(tasks: [], cnx: [])".to_string(),
1891                mission: Some("default".to_string()),
1892            },
1893            registration: fake_registration(subsystem_id, subsystem_code, session_factory),
1894        }
1895    }
1896
1897    fn fake_plan(assignments: Vec<DistributedReplayAssignment>) -> DistributedReplayPlan {
1898        let mut registrations: Vec<_> = assignments
1899            .iter()
1900            .map(|assignment| assignment.registration.clone())
1901            .collect();
1902        registrations.sort_by(|left, right| left.subsystem.id().cmp(&right.subsystem.id()));
1903        let mut selected_instances: Vec<_> = assignments
1904            .iter()
1905            .map(|assignment| assignment.instance_id)
1906            .collect::<BTreeSet<_>>()
1907            .into_iter()
1908            .collect();
1909        selected_instances.sort_unstable();
1910        DistributedReplayPlan {
1911            multi_config_path: PathBuf::from("fake_multi.ron"),
1912            multi_config: MultiCopperConfig {
1913                subsystems: Vec::new(),
1914                interconnects: Vec::new(),
1915                instance_overrides_root: None,
1916            },
1917            catalog: DistributedReplayCatalog::default(),
1918            selected_instances,
1919            mission: Some("default".to_string()),
1920            registrations,
1921            assignments,
1922        }
1923    }
1924
1925    fn fake_ping_session(
1926        assignment: &DistributedReplayAssignment,
1927        _session_config: &DistributedReplaySessionConfig,
1928    ) -> CuResult<DistributedReplaySessionBuild> {
1929        Ok(DistributedReplaySessionBuild {
1930            session: Box::new(FakeReplaySession),
1931            nodes: vec![
1932                DistributedReplayNodeDescriptor {
1933                    cursor: DistributedReplayCursor::new(
1934                        assignment.instance_id,
1935                        assignment.subsystem_id.clone(),
1936                        assignment.log.subsystem_code(),
1937                        0,
1938                    ),
1939                    origin_key: DistributedReplayOriginKey {
1940                        instance_id: assignment.instance_id,
1941                        subsystem_code: assignment.log.subsystem_code(),
1942                        cl_id: 0,
1943                    },
1944                    incoming_origins: BTreeSet::new(),
1945                },
1946                DistributedReplayNodeDescriptor {
1947                    cursor: DistributedReplayCursor::new(
1948                        assignment.instance_id,
1949                        assignment.subsystem_id.clone(),
1950                        assignment.log.subsystem_code(),
1951                        1,
1952                    ),
1953                    origin_key: DistributedReplayOriginKey {
1954                        instance_id: assignment.instance_id,
1955                        subsystem_code: assignment.log.subsystem_code(),
1956                        cl_id: 1,
1957                    },
1958                    incoming_origins: BTreeSet::new(),
1959                },
1960            ],
1961            output_log_path: None,
1962        })
1963    }
1964
1965    fn fake_pong_session(
1966        assignment: &DistributedReplayAssignment,
1967        _session_config: &DistributedReplaySessionConfig,
1968    ) -> CuResult<DistributedReplaySessionBuild> {
1969        Ok(DistributedReplaySessionBuild {
1970            session: Box::new(FakeReplaySession),
1971            nodes: vec![
1972                DistributedReplayNodeDescriptor {
1973                    cursor: DistributedReplayCursor::new(
1974                        assignment.instance_id,
1975                        assignment.subsystem_id.clone(),
1976                        assignment.log.subsystem_code(),
1977                        0,
1978                    ),
1979                    origin_key: DistributedReplayOriginKey {
1980                        instance_id: assignment.instance_id,
1981                        subsystem_code: assignment.log.subsystem_code(),
1982                        cl_id: 0,
1983                    },
1984                    incoming_origins: BTreeSet::from([DistributedReplayOriginKey {
1985                        instance_id: assignment.instance_id,
1986                        subsystem_code: 0,
1987                        cl_id: 0,
1988                    }]),
1989                },
1990                DistributedReplayNodeDescriptor {
1991                    cursor: DistributedReplayCursor::new(
1992                        assignment.instance_id,
1993                        assignment.subsystem_id.clone(),
1994                        assignment.log.subsystem_code(),
1995                        1,
1996                    ),
1997                    origin_key: DistributedReplayOriginKey {
1998                        instance_id: assignment.instance_id,
1999                        subsystem_code: assignment.log.subsystem_code(),
2000                        cl_id: 1,
2001                    },
2002                    incoming_origins: BTreeSet::from([DistributedReplayOriginKey {
2003                        instance_id: assignment.instance_id,
2004                        subsystem_code: 0,
2005                        cl_id: 1,
2006                    }]),
2007                },
2008            ],
2009            output_log_path: None,
2010        })
2011    }
2012
2013    fn fake_bad_pong_session(
2014        assignment: &DistributedReplayAssignment,
2015        _session_config: &DistributedReplaySessionConfig,
2016    ) -> CuResult<DistributedReplaySessionBuild> {
2017        Ok(DistributedReplaySessionBuild {
2018            session: Box::new(FakeReplaySession),
2019            nodes: vec![DistributedReplayNodeDescriptor {
2020                cursor: DistributedReplayCursor::new(
2021                    assignment.instance_id,
2022                    assignment.subsystem_id.clone(),
2023                    assignment.log.subsystem_code(),
2024                    0,
2025                ),
2026                origin_key: DistributedReplayOriginKey {
2027                    instance_id: assignment.instance_id,
2028                    subsystem_code: assignment.log.subsystem_code(),
2029                    cl_id: 0,
2030                },
2031                incoming_origins: BTreeSet::from([DistributedReplayOriginKey {
2032                    instance_id: assignment.instance_id,
2033                    subsystem_code: 0,
2034                    cl_id: 99,
2035                }]),
2036            }],
2037            output_log_path: None,
2038        })
2039    }
2040
    // Synthetic four-stage pipeline used by the stress fixtures. Dependency
    // edges are defined in `stress_origins_for`: plan consumes sense, control
    // consumes plan, telemetry consumes sense and control.
    const STRESS_SUBSYSTEMS: [(&str, u16); 4] =
        [("sense", 0), ("plan", 1), ("control", 2), ("telemetry", 3)];
2043
2044    fn stress_origins_for(
2045        subsystem_id: &str,
2046        instance_id: u32,
2047        cl_id: u64,
2048    ) -> BTreeSet<DistributedReplayOriginKey> {
2049        match subsystem_id {
2050            "sense" => BTreeSet::new(),
2051            "plan" => BTreeSet::from([DistributedReplayOriginKey {
2052                instance_id,
2053                subsystem_code: 0,
2054                cl_id,
2055            }]),
2056            "control" => BTreeSet::from([DistributedReplayOriginKey {
2057                instance_id,
2058                subsystem_code: 1,
2059                cl_id,
2060            }]),
2061            "telemetry" => BTreeSet::from([
2062                DistributedReplayOriginKey {
2063                    instance_id,
2064                    subsystem_code: 0,
2065                    cl_id,
2066                },
2067                DistributedReplayOriginKey {
2068                    instance_id,
2069                    subsystem_code: 2,
2070                    cl_id,
2071                },
2072            ]),
2073            _ => panic!("unexpected synthetic stress subsystem '{subsystem_id}'"),
2074        }
2075    }
2076
2077    fn build_stress_session(
2078        assignment: &DistributedReplayAssignment,
2079        _session_config: &DistributedReplaySessionConfig,
2080        cl_count: u64,
2081    ) -> CuResult<DistributedReplaySessionBuild> {
2082        let subsystem_code = assignment.log.subsystem_code();
2083        let nodes = (0..cl_count)
2084            .map(|cl_id| DistributedReplayNodeDescriptor {
2085                cursor: DistributedReplayCursor::new(
2086                    assignment.instance_id,
2087                    assignment.subsystem_id.clone(),
2088                    subsystem_code,
2089                    cl_id,
2090                ),
2091                origin_key: DistributedReplayOriginKey {
2092                    instance_id: assignment.instance_id,
2093                    subsystem_code,
2094                    cl_id,
2095                },
2096                incoming_origins: stress_origins_for(
2097                    &assignment.subsystem_id,
2098                    assignment.instance_id,
2099                    cl_id,
2100                ),
2101            })
2102            .collect();
2103        Ok(DistributedReplaySessionBuild {
2104            session: Box::new(FakeReplaySession),
2105            nodes,
2106            output_log_path: None,
2107        })
2108    }
2109
    // Fixed-size variants of `build_stress_session` (24 / 32 / 96 copperlists),
    // written as plain functions so they match the factory function signature.
    fn stress_session_ci(
        assignment: &DistributedReplayAssignment,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplaySessionBuild> {
        build_stress_session(assignment, session_config, 24)
    }

    fn stress_session_goto(
        assignment: &DistributedReplayAssignment,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplaySessionBuild> {
        build_stress_session(assignment, session_config, 32)
    }

    fn stress_session_heavy(
        assignment: &DistributedReplayAssignment,
        session_config: &DistributedReplaySessionConfig,
    ) -> CuResult<DistributedReplaySessionBuild> {
        build_stress_session(assignment, session_config, 96)
    }
2130
2131    fn stress_plan(
2132        instance_count: u32,
2133        session_factory: DistributedReplaySessionFactory,
2134    ) -> DistributedReplayPlan {
2135        let assignments = (1..=instance_count)
2136            .flat_map(|instance_id| {
2137                STRESS_SUBSYSTEMS
2138                    .into_iter()
2139                    .map(move |(subsystem_id, subsystem_code)| {
2140                        fake_assignment(instance_id, subsystem_id, subsystem_code, session_factory)
2141                    })
2142            })
2143            .collect();
2144        fake_plan(assignments)
2145    }
2146
2147    fn collect_engine_order(
2148        engine: &mut DistributedReplayEngine,
2149    ) -> CuResult<Vec<DistributedReplayCursor>> {
2150        let mut order = Vec::new();
2151        while let Some(cursor) = engine.step_causal()? {
2152            order.push(cursor);
2153        }
2154        Ok(order)
2155    }
2156
2157    fn assert_stress_order_is_topological(
2158        order: &[DistributedReplayCursor],
2159        instance_count: u32,
2160        cl_count: u64,
2161    ) {
2162        let expected_len = instance_count as usize * STRESS_SUBSYSTEMS.len() * cl_count as usize;
2163        assert_eq!(order.len(), expected_len);
2164
2165        let positions: BTreeMap<_, _> = order
2166            .iter()
2167            .enumerate()
2168            .map(|(idx, cursor)| {
2169                (
2170                    (
2171                        cursor.instance_id,
2172                        cursor.subsystem_id.clone(),
2173                        cursor.cl_id,
2174                    ),
2175                    idx,
2176                )
2177            })
2178            .collect();
2179        assert_eq!(positions.len(), expected_len);
2180
2181        for instance_id in 1..=instance_count {
2182            for (subsystem_id, _) in STRESS_SUBSYSTEMS {
2183                for cl_id in 1..cl_count {
2184                    let previous = positions
2185                        .get(&(instance_id, subsystem_id.to_string(), cl_id - 1))
2186                        .expect("previous local node missing");
2187                    let current = positions
2188                        .get(&(instance_id, subsystem_id.to_string(), cl_id))
2189                        .expect("current local node missing");
2190                    assert!(
2191                        previous < current,
2192                        "local order violated for instance {instance_id} subsystem '{subsystem_id}' cl {cl_id}"
2193                    );
2194                }
2195            }
2196
2197            for cl_id in 0..cl_count {
2198                let sense = positions
2199                    .get(&(instance_id, "sense".to_string(), cl_id))
2200                    .expect("sense node missing");
2201                let plan = positions
2202                    .get(&(instance_id, "plan".to_string(), cl_id))
2203                    .expect("plan node missing");
2204                let control = positions
2205                    .get(&(instance_id, "control".to_string(), cl_id))
2206                    .expect("control node missing");
2207                let telemetry = positions
2208                    .get(&(instance_id, "telemetry".to_string(), cl_id))
2209                    .expect("telemetry node missing");
2210                assert!(sense < plan);
2211                assert!(plan < control);
2212                assert!(sense < telemetry);
2213                assert!(control < telemetry);
2214            }
2215        }
2216    }
2217
2218    #[test]
2219    fn discovers_single_log_identity_from_runtime_lifecycle() -> CuResult<()> {
2220        let temp_dir = TempDir::new()
2221            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2222        let base_path = temp_dir.path().join("logs/ping.copper");
2223        write_runtime_lifecycle_log(&base_path, test_stack("ping", 7, 42), Some("default"))?;
2224
2225        let discovered = DistributedReplayLog::discover(&base_path)?;
2226        assert_eq!(discovered.base_path, base_path);
2227        assert_eq!(discovered.subsystem_id(), Some("ping"));
2228        assert_eq!(discovered.subsystem_code(), 7);
2229        assert_eq!(discovered.instance_id(), 42);
2230        assert_eq!(discovered.mission.as_deref(), Some("default"));
2231        Ok(())
2232    }
2233
2234    #[test]
2235    fn catalog_discovery_normalizes_slab_paths_and_deduplicates_candidates() -> CuResult<()> {
2236        let temp_dir = TempDir::new()
2237            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2238        let base_path = temp_dir.path().join("logs/pong.copper");
2239        let slab_zero_path = temp_dir.path().join("logs/pong_0.copper");
2240        write_runtime_lifecycle_log(&base_path, test_stack("pong", 3, 9), Some("default"))?;
2241
2242        let catalog = DistributedReplayCatalog::discover([base_path.clone(), slab_zero_path])?;
2243        assert!(
2244            catalog.failures.is_empty(),
2245            "unexpected failures: {:?}",
2246            catalog.failures
2247        );
2248        assert_eq!(catalog.logs.len(), 1);
2249        assert_eq!(catalog.logs[0].base_path, base_path);
2250        assert_eq!(catalog.logs[0].subsystem_id(), Some("pong"));
2251        Ok(())
2252    }
2253
2254    #[test]
2255    fn catalog_discovery_walks_directories_using_physical_slab_files() -> CuResult<()> {
2256        let temp_dir = TempDir::new()
2257            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2258        let ping_base = temp_dir.path().join("logs/ping.copper");
2259        let pong_base = temp_dir.path().join("logs/pong.copper");
2260        write_runtime_lifecycle_log(&ping_base, test_stack("ping", 0, 1), Some("alpha"))?;
2261        write_runtime_lifecycle_log(&pong_base, test_stack("pong", 1, 1), Some("alpha"))?;
2262
2263        let catalog = DistributedReplayCatalog::discover_under(temp_dir.path())?;
2264        assert!(
2265            catalog.failures.is_empty(),
2266            "unexpected failures: {:?}",
2267            catalog.failures
2268        );
2269        assert_eq!(catalog.logs.len(), 2);
2270        assert_eq!(catalog.logs[0].subsystem_id(), Some("ping"));
2271        assert_eq!(catalog.logs[1].subsystem_id(), Some("pong"));
2272        assert_eq!(catalog.logs[0].base_path, ping_base);
2273        assert_eq!(catalog.logs[1].base_path, pong_base);
2274        Ok(())
2275    }
2276
2277    #[test]
2278    fn catalog_reports_invalid_logs_without_aborting_scan() -> CuResult<()> {
2279        let temp_dir = TempDir::new()
2280            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2281        let good_base = temp_dir.path().join("logs/good.copper");
2282        write_runtime_lifecycle_log(&good_base, test_stack("good", 2, 5), Some("beta"))?;
2283
2284        let bad_slab = temp_dir.path().join("logs/bad_0.copper");
2285        if let Some(parent) = bad_slab.parent() {
2286            fs::create_dir_all(parent).map_err(|err| {
2287                CuError::new_with_cause(
2288                    &format!("Failed to create bad log dir '{}'", parent.display()),
2289                    err,
2290                )
2291            })?;
2292        }
2293        fs::write(&bad_slab, b"not a copper log").map_err(|err| {
2294            CuError::new_with_cause(
2295                &format!("Failed to create bad log '{}'", bad_slab.display()),
2296                err,
2297            )
2298        })?;
2299
2300        let catalog = DistributedReplayCatalog::discover_under(temp_dir.path())?;
2301        assert_eq!(catalog.logs.len(), 1);
2302        assert_eq!(catalog.failures.len(), 1);
2303        assert_eq!(catalog.logs[0].subsystem_id(), Some("good"));
2304        assert_eq!(
2305            catalog.failures[0].candidate_path,
2306            temp_dir.path().join("logs/bad.copper")
2307        );
2308        Ok(())
2309    }
2310
2311    #[test]
2312    fn builder_builds_validated_plan_for_selected_instances() -> CuResult<()> {
2313        let temp_dir = TempDir::new()
2314            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2315        let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2316        let logs_root = temp_dir.path().join("logs");
2317
2318        write_runtime_lifecycle_log(
2319            &logs_root.join("instance1_ping.copper"),
2320            test_stack("ping", 0, 1),
2321            Some("default"),
2322        )?;
2323        write_runtime_lifecycle_log(
2324            &logs_root.join("instance1_pong.copper"),
2325            test_stack("pong", 1, 1),
2326            Some("default"),
2327        )?;
2328        write_runtime_lifecycle_log(
2329            &logs_root.join("instance2_ping.copper"),
2330            test_stack("ping", 0, 2),
2331            Some("default"),
2332        )?;
2333        write_runtime_lifecycle_log(
2334            &logs_root.join("instance2_pong.copper"),
2335            test_stack("pong", 1, 2),
2336            Some("default"),
2337        )?;
2338
2339        let plan = DistributedReplayPlan::builder(&multi_config_path)?
2340            .discover_logs_under(&logs_root)?
2341            .register::<PingRegisteredApp>("ping")?
2342            .register::<PongRegisteredApp>("pong")?
2343            .instances([2])
2344            .build()?;
2345
2346        assert_eq!(plan.selected_instances, vec![2]);
2347        assert_eq!(plan.mission.as_deref(), Some("default"));
2348        assert_eq!(plan.assignments.len(), 2);
2349        assert_eq!(
2350            plan.assignment(2, "ping").unwrap().log.base_path,
2351            logs_root.join("instance2_ping.copper")
2352        );
2353        assert_eq!(
2354            plan.assignment(2, "pong").unwrap().log.base_path,
2355            logs_root.join("instance2_pong.copper")
2356        );
2357        Ok(())
2358    }
2359
2360    #[test]
2361    fn register_rejects_subsystem_code_mismatch() -> CuResult<()> {
2362        let temp_dir = TempDir::new()
2363            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2364        let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2365
2366        let err = DistributedReplayPlan::builder(&multi_config_path)?
2367            .register::<PingWrongCodeApp>("ping")
2368            .unwrap_err();
2369        assert!(err.to_string().contains("declares subsystem code 99"));
2370        Ok(())
2371    }
2372
2373    #[test]
2374    fn build_reports_missing_logs_and_missing_registrations() -> CuResult<()> {
2375        let temp_dir = TempDir::new()
2376            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2377        let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2378        let logs_root = temp_dir.path().join("logs");
2379
2380        write_runtime_lifecycle_log(
2381            &logs_root.join("instance1_ping.copper"),
2382            test_stack("ping", 0, 1),
2383            Some("default"),
2384        )?;
2385
2386        let err = DistributedReplayPlan::builder(&multi_config_path)?
2387            .discover_logs_under(&logs_root)?
2388            .register::<PingRegisteredApp>("ping")?
2389            .build()
2390            .unwrap_err();
2391        let err_text = err.to_string();
2392        assert!(err_text.contains("missing app registration for subsystem 'pong'"));
2393        assert!(err_text.contains("missing log for instance 1 subsystem 'pong'"));
2394        Ok(())
2395    }
2396
2397    #[test]
2398    fn build_reports_duplicate_logs_for_one_target() -> CuResult<()> {
2399        let temp_dir = TempDir::new()
2400            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2401        let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2402        let logs_root = temp_dir.path().join("logs");
2403
2404        write_runtime_lifecycle_log(
2405            &logs_root.join("instance1_ping_a.copper"),
2406            test_stack("ping", 0, 1),
2407            Some("default"),
2408        )?;
2409        write_runtime_lifecycle_log(
2410            &logs_root.join("instance1_ping_b.copper"),
2411            test_stack("ping", 0, 1),
2412            Some("default"),
2413        )?;
2414        write_runtime_lifecycle_log(
2415            &logs_root.join("instance1_pong.copper"),
2416            test_stack("pong", 1, 1),
2417            Some("default"),
2418        )?;
2419
2420        let err = DistributedReplayPlan::builder(&multi_config_path)?
2421            .discover_logs_under(&logs_root)?
2422            .register::<PingRegisteredApp>("ping")?
2423            .register::<PongRegisteredApp>("pong")?
2424            .build()
2425            .unwrap_err();
2426        assert!(
2427            err.to_string()
2428                .contains("found 2 logs for instance 1 subsystem 'ping'")
2429        );
2430        Ok(())
2431    }
2432
2433    #[test]
2434    fn build_reports_mission_mismatch_across_selected_logs() -> CuResult<()> {
2435        let temp_dir = TempDir::new()
2436            .map_err(|err| CuError::new_with_cause("Failed to create temp dir", err))?;
2437        let multi_config_path = write_multi_config_fixture(&temp_dir, &["ping", "pong"])?;
2438        let logs_root = temp_dir.path().join("logs");
2439
2440        write_runtime_lifecycle_log(
2441            &logs_root.join("instance1_ping.copper"),
2442            test_stack("ping", 0, 1),
2443            Some("default"),
2444        )?;
2445        write_runtime_lifecycle_log(
2446            &logs_root.join("instance1_pong.copper"),
2447            test_stack("pong", 1, 1),
2448            Some("recovery"),
2449        )?;
2450
2451        let err = DistributedReplayPlan::builder(&multi_config_path)?
2452            .discover_logs_under(&logs_root)?
2453            .register::<PingRegisteredApp>("ping")?
2454            .register::<PongRegisteredApp>("pong")?
2455            .build()
2456            .unwrap_err();
2457        assert!(
2458            err.to_string()
2459                .contains("selected logs disagree on mission: default, recovery")
2460        );
2461        Ok(())
2462    }
2463
2464    #[test]
2465    fn engine_steps_in_stable_causal_order() -> CuResult<()> {
2466        let plan = fake_plan(vec![
2467            fake_assignment(1, "ping", 0, fake_ping_session),
2468            fake_assignment(1, "pong", 1, fake_pong_session),
2469        ]);
2470
2471        let mut engine = plan.start()?;
2472        let mut order = Vec::new();
2473        while let Some(cursor) = engine.step_causal()? {
2474            order.push((cursor.subsystem_id, cursor.cl_id));
2475        }
2476
2477        assert_eq!(
2478            order,
2479            vec![
2480                ("ping".to_string(), 0),
2481                ("ping".to_string(), 1),
2482                ("pong".to_string(), 0),
2483                ("pong".to_string(), 1),
2484            ]
2485        );
2486        assert_eq!(engine.executed_nodes(), 4);
2487        Ok(())
2488    }
2489
2490    #[test]
2491    fn engine_goto_rebuilds_and_replays_to_target() -> CuResult<()> {
2492        let plan = fake_plan(vec![
2493            fake_assignment(1, "ping", 0, fake_ping_session),
2494            fake_assignment(1, "pong", 1, fake_pong_session),
2495        ]);
2496
2497        let mut engine = plan.start()?;
2498        engine.run_all()?;
2499        engine.goto(1, "pong", 0)?;
2500
2501        assert_eq!(engine.executed_nodes(), 3);
2502        let frontier = engine.current_frontier();
2503        assert_eq!(frontier.len(), 2);
2504        assert!(frontier.iter().any(|cursor| {
2505            cursor.instance_id == 1 && cursor.subsystem_id == "ping" && cursor.cl_id == 1
2506        }));
2507        assert!(frontier.iter().any(|cursor| {
2508            cursor.instance_id == 1 && cursor.subsystem_id == "pong" && cursor.cl_id == 0
2509        }));
2510        Ok(())
2511    }
2512
2513    #[test]
2514    fn engine_reports_unresolved_recorded_provenance() -> CuResult<()> {
2515        let plan = fake_plan(vec![
2516            fake_assignment(1, "ping", 0, fake_ping_session),
2517            fake_assignment(1, "pong", 1, fake_bad_pong_session),
2518        ]);
2519
2520        let err = match plan.start() {
2521            Ok(_) => return Err(CuError::from("expected distributed replay startup failure")),
2522            Err(err) => err,
2523        };
2524        assert!(
2525            err.to_string()
2526                .contains("Unresolved recorded provenance edge")
2527        );
2528        Ok(())
2529    }
2530
2531    #[test]
2532    fn engine_run_all_scales_across_many_identical_instances() -> CuResult<()> {
2533        let mut engine = stress_plan(6, stress_session_ci).start()?;
2534        let order = collect_engine_order(&mut engine)?;
2535
2536        assert_stress_order_is_topological(&order, 6, 24);
2537        assert_eq!(engine.executed_nodes(), 6 * STRESS_SUBSYSTEMS.len() * 24);
2538
2539        let frontier = engine.current_frontier();
2540        assert_eq!(frontier.len(), 6 * STRESS_SUBSYSTEMS.len());
2541        for instance_id in 1..=6 {
2542            for (subsystem_id, _) in STRESS_SUBSYSTEMS {
2543                assert!(frontier.iter().any(|cursor| {
2544                    cursor.instance_id == instance_id
2545                        && cursor.subsystem_id == subsystem_id
2546                        && cursor.cl_id == 23
2547                }));
2548            }
2549        }
2550        Ok(())
2551    }
2552
2553    #[test]
2554    fn engine_goto_matches_manual_replay_on_large_graph() -> CuResult<()> {
2555        let plan = stress_plan(5, stress_session_goto);
2556        let mut manual = plan.clone().start()?;
2557
2558        let (expected_steps, expected_frontier) = {
2559            let mut expected_steps = 0usize;
2560            loop {
2561                let Some(cursor) = manual.step_causal()? else {
2562                    return Err(CuError::from(
2563                        "manual distributed replay exhausted before reaching stress target",
2564                    ));
2565                };
2566                expected_steps += 1;
2567                if cursor.instance_id == 4 && cursor.subsystem_id == "control" && cursor.cl_id == 17
2568                {
2569                    break (expected_steps, manual.current_frontier());
2570                }
2571            }
2572        };
2573
2574        let mut via_goto = plan.start()?;
2575        via_goto.goto(4, "control", 17)?;
2576
2577        assert_eq!(via_goto.executed_nodes(), expected_steps);
2578        assert_eq!(via_goto.current_frontier(), expected_frontier);
2579        Ok(())
2580    }
2581
2582    #[test]
2583    #[ignore = "stress"]
2584    fn engine_heavy_stress_run_all_completes() -> CuResult<()> {
2585        let mut engine = stress_plan(12, stress_session_heavy).start()?;
2586        engine.run_all()?;
2587
2588        let expected = 12 * STRESS_SUBSYSTEMS.len() * 96;
2589        assert_eq!(engine.executed_nodes(), expected);
2590        assert_eq!(
2591            engine.current_frontier().len(),
2592            12 * STRESS_SUBSYSTEMS.len()
2593        );
2594        Ok(())
2595    }
2596}