Skip to main content

ranvier_runtime/
replay.rs

1use crate::persistence::PersistenceStore;
2use anyhow::{Result, anyhow};
3use ranvier_core::schematic::MigrationRegistry;
4use ranvier_core::timeline::{Timeline, TimelineEvent};
5
6/// ReplayEngine reconstructs the execution state from a Timeline.
7/// It creates a "virtual" cursor that moves through the circuit based on recorded events.
8pub struct ReplayEngine {
9    timeline: Timeline,
10    cursor: usize,
11}
12
13#[derive(Debug, Clone)]
14pub struct ReplayFrame {
15    pub current_node_id: Option<String>,
16    pub event: TimelineEvent,
17}
18
19impl ReplayEngine {
20    pub fn new(timeline: Timeline) -> Self {
21        Self {
22            timeline,
23            cursor: 0,
24        }
25    }
26
27    /// Advance the replay by one step.
28    /// Returns the current frame or None if finished.
29    pub fn next_step(&mut self) -> Option<ReplayFrame> {
30        if self.cursor >= self.timeline.events.len() {
31            return None;
32        }
33
34        let event = self.timeline.events[self.cursor].clone();
35        self.cursor += 1;
36
37        let current_node_id = match &event {
38            TimelineEvent::NodeEnter { node_id, .. } => Some(node_id.clone()),
39            TimelineEvent::NodeExit { node_id, .. } => Some(node_id.clone()),
40            TimelineEvent::NodePaused { node_id, .. } => Some(node_id.clone()),
41            TimelineEvent::NodeRetry { node_id, .. } => Some(node_id.clone()),
42            TimelineEvent::DlqExhausted { node_id, .. } => Some(node_id.clone()),
43            TimelineEvent::Branchtaken { .. } => None, // Branches happen "between" nodes conceptually or part of outcome
44            TimelineEvent::NodeTimeout { node_id, .. } => Some(node_id.clone()),
45        };
46
47        Some(ReplayFrame {
48            current_node_id,
49            event,
50        })
51    }
52
53    /// Reset replay to start
54    pub fn reset(&mut self) {
55        self.cursor = 0;
56    }
57
58    /// Fast-forwards the replay cursor to the end, returning the final known frame.
59    /// This is a O(1) operation since it just jumps to the end of the timeline array.
60    pub fn fast_forward_to_end(&mut self) -> Option<ReplayFrame> {
61        if self.timeline.events.is_empty() {
62            return None;
63        }
64        self.cursor = self.timeline.events.len() - 1;
65        self.next_step()
66    }
67
68    /// Fast-forwards the replay cursor to the last active (entered but not exited) node.
69    /// This is useful for active intervention where we want to resume execution at the exact stalled point.
70    pub fn fast_forward_to_active(&mut self) -> Option<ReplayFrame> {
71        let mut exited_nodes = std::collections::HashSet::new();
72        let mut active_index = None;
73
74        for i in (0..self.timeline.events.len()).rev() {
75            match &self.timeline.events[i] {
76                TimelineEvent::NodeExit { node_id, .. } => {
77                    exited_nodes.insert(node_id.clone());
78                }
79                TimelineEvent::NodeEnter { node_id, .. }
80                | TimelineEvent::NodePaused { node_id, .. } => {
81                    if !exited_nodes.contains(node_id) {
82                        active_index = Some(i);
83                        break;
84                    } else {
85                        exited_nodes.remove(node_id);
86                    }
87                }
88                _ => {}
89            }
90        }
91
92        if let Some(index) = active_index {
93            self.cursor = index;
94            self.next_step()
95        } else {
96            self.fast_forward_to_end()
97        }
98    }
99}
100
101/// Result of an event-sourcing replay operation.
102#[derive(Debug, Clone)]
103pub struct ReplayRecoveryResult {
104    /// The trace that was replayed.
105    pub trace_id: String,
106    /// The original schematic version of the persisted trace.
107    pub original_version: String,
108    /// The target schematic version after migration.
109    pub target_version: String,
110    /// Ordered migration hops applied (from → to).
111    pub migration_hops: Vec<(String, String)>,
112    /// The last known node ID from the persisted events.
113    pub last_node_id: Option<String>,
114    /// The recovered (and potentially mapped) payload.
115    pub recovered_payload: Option<serde_json::Value>,
116    /// The step to resume from.
117    pub resume_from_step: u64,
118}
119
120/// Replays a persisted trace through migration mappers, recovering state
121/// for resumption under a newer schematic version.
122///
123/// This is the core event-sourcing replay function for M172-RQ2. It:
124/// 1. Loads the persisted trace from the store
125/// 2. Finds a migration path (single or multi-hop) using the registry
126/// 3. Applies each migration's payload mapper sequentially
127/// 4. Returns a `ReplayRecoveryResult` with the recovered state
128pub async fn replay_and_recover(
129    store: &dyn PersistenceStore,
130    trace_id: &str,
131    target_version: &str,
132    registry: &MigrationRegistry,
133) -> Result<ReplayRecoveryResult> {
134    let trace = store
135        .load(trace_id)
136        .await?
137        .ok_or_else(|| anyhow!("trace_id {} not found", trace_id))?;
138
139    let original_version = trace.schematic_version.clone();
140    if original_version == target_version {
141        // No migration needed — return current state as-is
142        let last_event = trace.events.last();
143        return Ok(ReplayRecoveryResult {
144            trace_id: trace_id.to_string(),
145            original_version: original_version.clone(),
146            target_version: target_version.to_string(),
147            migration_hops: Vec::new(),
148            last_node_id: last_event.and_then(|e| e.node_id.clone()),
149            recovered_payload: last_event.and_then(|e| e.payload.clone()),
150            resume_from_step: last_event.map(|e| e.step.saturating_add(1)).unwrap_or(0),
151        });
152    }
153
154    let path = registry
155        .find_migration_path(&original_version, target_version)
156        .ok_or_else(|| {
157            anyhow!(
158                "no migration path from {} to {} for circuit {}",
159                original_version,
160                target_version,
161                registry.circuit_id
162            )
163        })?;
164
165    if path.is_empty() {
166        return Err(anyhow!(
167            "empty migration path from {} to {}",
168            original_version,
169            target_version
170        ));
171    }
172
173    // Start with the last persisted payload
174    let last_event = trace.events.last();
175    let mut current_payload = last_event.and_then(|e| e.payload.clone());
176    let last_node_id = last_event.and_then(|e| e.node_id.clone());
177    let resume_step = last_event.map(|e| e.step.saturating_add(1)).unwrap_or(0);
178
179    let mut hops = Vec::with_capacity(path.len());
180
181    // Apply each migration hop's payload mapper sequentially
182    for migration in &path {
183        hops.push((migration.from_version.clone(), migration.to_version.clone()));
184
185        if let (Some(mapper), Some(payload)) = (&migration.payload_mapper, &current_payload) {
186            current_payload = Some(mapper.map_state(payload)?);
187        }
188        // If no mapper, payload passes through unchanged
189    }
190
191    Ok(ReplayRecoveryResult {
192        trace_id: trace_id.to_string(),
193        original_version,
194        target_version: target_version.to_string(),
195        migration_hops: hops,
196        last_node_id,
197        recovered_payload: current_payload,
198        resume_from_step: resume_step,
199    })
200}
201
202/// Validates that a migration path exists and all mappers can transform a
203/// sample payload without error. Useful as a pre-deployment check.
204pub async fn validate_migration_path(
205    store: &dyn PersistenceStore,
206    trace_id: &str,
207    target_version: &str,
208    registry: &MigrationRegistry,
209) -> Result<bool> {
210    match replay_and_recover(store, trace_id, target_version, registry).await {
211        Ok(_) => Ok(true),
212        Err(e) => {
213            tracing::warn!(
214                trace_id = %trace_id,
215                target_version = %target_version,
216                error = %e,
217                "Migration path validation failed"
218            );
219            Ok(false)
220        }
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use ranvier_core::timeline::{Timeline, TimelineEvent};

    /// `NodeEnter` for `node_id`, with the label mirroring the id and timestamp 0.
    fn entered(node_id: &str) -> TimelineEvent {
        TimelineEvent::NodeEnter {
            node_id: node_id.to_string(),
            node_label: node_id.to_string(),
            timestamp: 0,
        }
    }

    /// `NodeExit` for `node_id` with a "Next" outcome, zero duration and timestamp 0.
    fn exited(node_id: &str) -> TimelineEvent {
        TimelineEvent::NodeExit {
            node_id: node_id.to_string(),
            outcome_type: "Next".to_string(),
            duration_ms: 0,
            timestamp: 0,
        }
    }

    /// Builds a timeline holding `events` in the given order.
    fn timeline_of(events: Vec<TimelineEvent>) -> Timeline {
        let mut timeline = Timeline::new();
        for event in events {
            timeline.push(event);
        }
        timeline
    }

    #[test]
    fn test_replay_fast_forward_to_active() {
        let timeline = timeline_of(vec![
            entered("A"),
            exited("A"),
            entered("B"),
            exited("B"),
            entered("C"), // Stalled at C
        ]);

        let mut engine = ReplayEngine::new(timeline);
        let frame = engine
            .fast_forward_to_active()
            .expect("timeline has an active node");
        assert_eq!(frame.current_node_id, Some("C".to_string()));
    }

    #[test]
    fn test_replay_with_repeated_nodes() {
        let timeline = timeline_of(vec![
            entered("A"),
            exited("A"),
            entered("A"), // Stalled at second A
        ]);

        let mut engine = ReplayEngine::new(timeline);
        let frame = engine
            .fast_forward_to_active()
            .expect("timeline has an active node");
        assert_eq!(frame.current_node_id, Some("A".to_string()));
        // Landing on index 2 and stepping over it leaves the cursor at 3.
        assert_eq!(engine.cursor, 3);
    }
}