Skip to main content

ranvier_runtime/
replay.rs

use crate::persistence::PersistenceStore;
use anyhow::{Context, Result, anyhow};
use ranvier_core::schematic::MigrationRegistry;
use ranvier_core::timeline::{Timeline, TimelineEvent};
5
/// ReplayEngine reconstructs the execution state from a Timeline.
/// It creates a "virtual" cursor that moves through the circuit based on recorded events.
pub struct ReplayEngine {
    /// The timeline whose `events` are stepped through during replay.
    timeline: Timeline,
    /// Index into `timeline.events` of the event the next call to `next_step`
    /// will return; once it reaches the number of events, replay is finished.
    cursor: usize,
}
12
/// One step of a replay, as returned by `ReplayEngine::next_step`: a single
/// recorded event plus the node it refers to.
#[derive(Debug, Clone)]
pub struct ReplayFrame {
    /// ID of the node the event refers to, or `None` for events that are not
    /// tied to a single node (branch-taken events).
    pub current_node_id: Option<String>,
    /// The replayed event, cloned out of the timeline.
    pub event: TimelineEvent,
}
18
19impl ReplayEngine {
20    pub fn new(timeline: Timeline) -> Self {
21        Self {
22            timeline,
23            cursor: 0,
24        }
25    }
26
27    /// Advance the replay by one step.
28    /// Returns the current frame or None if finished.
29    pub fn next_step(&mut self) -> Option<ReplayFrame> {
30        if self.cursor >= self.timeline.events.len() {
31            return None;
32        }
33
34        let event = self.timeline.events[self.cursor].clone();
35        self.cursor += 1;
36
37        let current_node_id = match &event {
38            TimelineEvent::NodeEnter { node_id, .. } => Some(node_id.clone()),
39            TimelineEvent::NodeExit { node_id, .. } => Some(node_id.clone()),
40            TimelineEvent::NodePaused { node_id, .. } => Some(node_id.clone()),
41            TimelineEvent::NodeRetry { node_id, .. } => Some(node_id.clone()),
42            TimelineEvent::DlqExhausted { node_id, .. } => Some(node_id.clone()),
43            TimelineEvent::Branchtaken { .. } => None, // Branches happen "between" nodes conceptually or part of outcome
44        };
45
46        Some(ReplayFrame {
47            current_node_id,
48            event,
49        })
50    }
51
52    /// Reset replay to start
53    pub fn reset(&mut self) {
54        self.cursor = 0;
55    }
56
57    /// Fast-forwards the replay cursor to the end, returning the final known frame.
58    /// This is a O(1) operation since it just jumps to the end of the timeline array.
59    pub fn fast_forward_to_end(&mut self) -> Option<ReplayFrame> {
60        if self.timeline.events.is_empty() {
61            return None;
62        }
63        self.cursor = self.timeline.events.len() - 1;
64        self.next_step()
65    }
66
67    /// Fast-forwards the replay cursor to the last active (entered but not exited) node.
68    /// This is useful for active intervention where we want to resume execution at the exact stalled point.
69    pub fn fast_forward_to_active(&mut self) -> Option<ReplayFrame> {
70        let mut exited_nodes = std::collections::HashSet::new();
71        let mut active_index = None;
72
73        for i in (0..self.timeline.events.len()).rev() {
74            match &self.timeline.events[i] {
75                TimelineEvent::NodeExit { node_id, .. } => {
76                    exited_nodes.insert(node_id.clone());
77                }
78                TimelineEvent::NodeEnter { node_id, .. }
79                | TimelineEvent::NodePaused { node_id, .. } => {
80                    if !exited_nodes.contains(node_id) {
81                        active_index = Some(i);
82                        break;
83                    } else {
84                        exited_nodes.remove(node_id);
85                    }
86                }
87                _ => {}
88            }
89        }
90
91        if let Some(index) = active_index {
92            self.cursor = index;
93            self.next_step()
94        } else {
95            self.fast_forward_to_end()
96        }
97    }
98}
99
100/// Result of an event-sourcing replay operation.
101#[derive(Debug, Clone)]
102pub struct ReplayRecoveryResult {
103    /// The trace that was replayed.
104    pub trace_id: String,
105    /// The original schematic version of the persisted trace.
106    pub original_version: String,
107    /// The target schematic version after migration.
108    pub target_version: String,
109    /// Ordered migration hops applied (from → to).
110    pub migration_hops: Vec<(String, String)>,
111    /// The last known node ID from the persisted events.
112    pub last_node_id: Option<String>,
113    /// The recovered (and potentially mapped) payload.
114    pub recovered_payload: Option<serde_json::Value>,
115    /// The step to resume from.
116    pub resume_from_step: u64,
117}
118
119/// Replays a persisted trace through migration mappers, recovering state
120/// for resumption under a newer schematic version.
121///
122/// This is the core event-sourcing replay function for M172-RQ2. It:
123/// 1. Loads the persisted trace from the store
124/// 2. Finds a migration path (single or multi-hop) using the registry
125/// 3. Applies each migration's payload mapper sequentially
126/// 4. Returns a `ReplayRecoveryResult` with the recovered state
127pub async fn replay_and_recover(
128    store: &dyn PersistenceStore,
129    trace_id: &str,
130    target_version: &str,
131    registry: &MigrationRegistry,
132) -> Result<ReplayRecoveryResult> {
133    let trace = store
134        .load(trace_id)
135        .await?
136        .ok_or_else(|| anyhow!("trace_id {} not found", trace_id))?;
137
138    let original_version = trace.schematic_version.clone();
139    if original_version == target_version {
140        // No migration needed — return current state as-is
141        let last_event = trace.events.last();
142        return Ok(ReplayRecoveryResult {
143            trace_id: trace_id.to_string(),
144            original_version: original_version.clone(),
145            target_version: target_version.to_string(),
146            migration_hops: Vec::new(),
147            last_node_id: last_event.and_then(|e| e.node_id.clone()),
148            recovered_payload: last_event.and_then(|e| e.payload.clone()),
149            resume_from_step: last_event.map(|e| e.step.saturating_add(1)).unwrap_or(0),
150        });
151    }
152
153    let path = registry
154        .find_migration_path(&original_version, target_version)
155        .ok_or_else(|| {
156            anyhow!(
157                "no migration path from {} to {} for circuit {}",
158                original_version,
159                target_version,
160                registry.circuit_id
161            )
162        })?;
163
164    if path.is_empty() {
165        return Err(anyhow!(
166            "empty migration path from {} to {}",
167            original_version,
168            target_version
169        ));
170    }
171
172    // Start with the last persisted payload
173    let last_event = trace.events.last();
174    let mut current_payload = last_event.and_then(|e| e.payload.clone());
175    let last_node_id = last_event.and_then(|e| e.node_id.clone());
176    let resume_step = last_event.map(|e| e.step.saturating_add(1)).unwrap_or(0);
177
178    let mut hops = Vec::with_capacity(path.len());
179
180    // Apply each migration hop's payload mapper sequentially
181    for migration in &path {
182        hops.push((migration.from_version.clone(), migration.to_version.clone()));
183
184        if let (Some(mapper), Some(payload)) = (&migration.payload_mapper, &current_payload) {
185            current_payload = Some(mapper.map_state(payload)?);
186        }
187        // If no mapper, payload passes through unchanged
188    }
189
190    Ok(ReplayRecoveryResult {
191        trace_id: trace_id.to_string(),
192        original_version,
193        target_version: target_version.to_string(),
194        migration_hops: hops,
195        last_node_id,
196        recovered_payload: current_payload,
197        resume_from_step: resume_step,
198    })
199}
200
201/// Validates that a migration path exists and all mappers can transform a
202/// sample payload without error. Useful as a pre-deployment check.
203pub async fn validate_migration_path(
204    store: &dyn PersistenceStore,
205    trace_id: &str,
206    target_version: &str,
207    registry: &MigrationRegistry,
208) -> Result<bool> {
209    match replay_and_recover(store, trace_id, target_version, registry).await {
210        Ok(_) => Ok(true),
211        Err(e) => {
212            tracing::warn!(
213                trace_id = %trace_id,
214                target_version = %target_version,
215                error = %e,
216                "Migration path validation failed"
217            );
218            Ok(false)
219        }
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use ranvier_core::timeline::{Timeline, TimelineEvent};

    /// Builds a `NodeEnter` (when `enter` is true) or `NodeExit` event for
    /// `node_id`, with zeroed timing fields.
    fn test_event(node_id: &str, enter: bool) -> TimelineEvent {
        if enter {
            TimelineEvent::NodeEnter {
                node_id: node_id.to_string(),
                node_label: node_id.to_string(),
                timestamp: 0,
            }
        } else {
            TimelineEvent::NodeExit {
                node_id: node_id.to_string(),
                outcome_type: "Next".to_string(),
                duration_ms: 0,
                timestamp: 0,
            }
        }
    }

    /// Builds a timeline from `(node_id, enter)` pairs, in order.
    fn timeline_of(events: &[(&str, bool)]) -> Timeline {
        let mut timeline = Timeline::new();
        for &(node_id, enter) in events {
            timeline.push(test_event(node_id, enter));
        }
        timeline
    }

    #[test]
    fn test_replay_fast_forward_to_active() {
        let mut timeline = Timeline::new();
        timeline.push(test_event("A", true));
        timeline.push(test_event("A", false));
        timeline.push(test_event("B", true));
        timeline.push(test_event("B", false));
        timeline.push(test_event("C", true)); // Stalled at C

        let mut engine = ReplayEngine::new(timeline);
        let frame = engine.fast_forward_to_active().unwrap();
        assert_eq!(frame.current_node_id, Some("C".to_string()));
    }

    #[test]
    fn test_replay_with_repeated_nodes() {
        let mut timeline = Timeline::new();
        timeline.push(test_event("A", true));
        timeline.push(test_event("A", false));
        timeline.push(test_event("A", true)); // Stalled at second A

        let mut engine = ReplayEngine::new(timeline);
        let frame = engine.fast_forward_to_active().unwrap();
        assert_eq!(frame.current_node_id, Some("A".to_string()));
        // After fast-forwarding to index 2, calling next_step increments cursor to 3.
        assert_eq!(engine.cursor, 3);
    }

    #[test]
    fn test_next_step_walks_events_in_order_then_stops() {
        let mut engine = ReplayEngine::new(timeline_of(&[("A", true), ("A", false)]));

        let first = engine.next_step().unwrap();
        assert!(matches!(first.event, TimelineEvent::NodeEnter { .. }));
        assert_eq!(first.current_node_id.as_deref(), Some("A"));

        let second = engine.next_step().unwrap();
        assert!(matches!(second.event, TimelineEvent::NodeExit { .. }));
        assert_eq!(second.current_node_id.as_deref(), Some("A"));

        assert!(engine.next_step().is_none());
        assert_eq!(engine.cursor, 2);
    }

    #[test]
    fn test_reset_rewinds_to_first_event() {
        let mut engine = ReplayEngine::new(timeline_of(&[("A", true), ("A", false)]));
        assert!(engine.fast_forward_to_end().is_some());
        assert!(engine.next_step().is_none());

        engine.reset();
        assert_eq!(engine.cursor, 0);
        let frame = engine.next_step().unwrap();
        assert!(matches!(frame.event, TimelineEvent::NodeEnter { .. }));
    }

    #[test]
    fn test_empty_timeline_yields_no_frames() {
        let mut engine = ReplayEngine::new(Timeline::new());
        assert!(engine.next_step().is_none());
        assert!(engine.fast_forward_to_end().is_none());
        assert!(engine.fast_forward_to_active().is_none());
        assert_eq!(engine.cursor, 0);
    }

    #[test]
    fn test_fast_forward_to_active_falls_back_to_end_when_all_nodes_exited() {
        let mut engine = ReplayEngine::new(timeline_of(&[
            ("A", true),
            ("A", false),
            ("B", true),
            ("B", false),
        ]));

        let frame = engine.fast_forward_to_active().unwrap();
        assert_eq!(frame.current_node_id.as_deref(), Some("B"));
        assert!(matches!(frame.event, TimelineEvent::NodeExit { .. }));
        assert_eq!(engine.cursor, 4);
    }
}