1use crate::persistence::PersistenceStore;
2use anyhow::{Result, anyhow};
3use ranvier_core::schematic::MigrationRegistry;
4use ranvier_core::timeline::{Timeline, TimelineEvent};
5
/// Steps through a recorded [`Timeline`] one event at a time.
pub struct ReplayEngine {
    /// The recorded events being replayed.
    timeline: Timeline,
    /// Index into `timeline.events` of the event the next call to `next_step` returns.
    cursor: usize,
}
12
/// One replayed event together with the node it refers to, as produced by
/// `ReplayEngine::next_step`.
#[derive(Debug, Clone)]
pub struct ReplayFrame {
    /// Node id copied out of `event`; `None` for branch events, which
    /// `next_step` maps to no node.
    pub current_node_id: Option<String>,
    /// A clone of the timeline event at the replayed position.
    pub event: TimelineEvent,
}
18
19impl ReplayEngine {
20 pub fn new(timeline: Timeline) -> Self {
21 Self {
22 timeline,
23 cursor: 0,
24 }
25 }
26
27 pub fn next_step(&mut self) -> Option<ReplayFrame> {
30 if self.cursor >= self.timeline.events.len() {
31 return None;
32 }
33
34 let event = self.timeline.events[self.cursor].clone();
35 self.cursor += 1;
36
37 let current_node_id = match &event {
38 TimelineEvent::NodeEnter { node_id, .. } => Some(node_id.clone()),
39 TimelineEvent::NodeExit { node_id, .. } => Some(node_id.clone()),
40 TimelineEvent::NodePaused { node_id, .. } => Some(node_id.clone()),
41 TimelineEvent::NodeRetry { node_id, .. } => Some(node_id.clone()),
42 TimelineEvent::DlqExhausted { node_id, .. } => Some(node_id.clone()),
43 TimelineEvent::Branchtaken { .. } => None, TimelineEvent::NodeTimeout { node_id, .. } => Some(node_id.clone()),
45 };
46
47 Some(ReplayFrame {
48 current_node_id,
49 event,
50 })
51 }
52
53 pub fn reset(&mut self) {
55 self.cursor = 0;
56 }
57
58 pub fn fast_forward_to_end(&mut self) -> Option<ReplayFrame> {
61 if self.timeline.events.is_empty() {
62 return None;
63 }
64 self.cursor = self.timeline.events.len() - 1;
65 self.next_step()
66 }
67
68 pub fn fast_forward_to_active(&mut self) -> Option<ReplayFrame> {
71 let mut exited_nodes = std::collections::HashSet::new();
72 let mut active_index = None;
73
74 for i in (0..self.timeline.events.len()).rev() {
75 match &self.timeline.events[i] {
76 TimelineEvent::NodeExit { node_id, .. } => {
77 exited_nodes.insert(node_id.clone());
78 }
79 TimelineEvent::NodeEnter { node_id, .. }
80 | TimelineEvent::NodePaused { node_id, .. } => {
81 if !exited_nodes.contains(node_id) {
82 active_index = Some(i);
83 break;
84 } else {
85 exited_nodes.remove(node_id);
86 }
87 }
88 _ => {}
89 }
90 }
91
92 if let Some(index) = active_index {
93 self.cursor = index;
94 self.next_step()
95 } else {
96 self.fast_forward_to_end()
97 }
98 }
99}
100
/// Outcome of `replay_and_recover`: where a persisted trace stopped and the
/// state to resume it with under the target schematic version.
#[derive(Debug, Clone)]
pub struct ReplayRecoveryResult {
    /// The trace id that was passed to `replay_and_recover`.
    pub trace_id: String,
    /// Schematic version recorded on the loaded trace.
    pub original_version: String,
    /// Schematic version the caller asked to recover into.
    pub target_version: String,
    /// `(from_version, to_version)` of each migration on the path, in path
    /// order; empty when the trace is already at the target version.
    pub migration_hops: Vec<(String, String)>,
    /// `node_id` of the trace's last event, if the trace has one.
    pub last_node_id: Option<String>,
    /// Payload of the trace's last event after it has been passed through the
    /// payload mapper of every migration that defines one.
    pub recovered_payload: Option<serde_json::Value>,
    /// Step of the trace's last event plus one (saturating), or `0` when the
    /// trace has no events.
    pub resume_from_step: u64,
}
119
120pub async fn replay_and_recover(
129 store: &dyn PersistenceStore,
130 trace_id: &str,
131 target_version: &str,
132 registry: &MigrationRegistry,
133) -> Result<ReplayRecoveryResult> {
134 let trace = store
135 .load(trace_id)
136 .await?
137 .ok_or_else(|| anyhow!("trace_id {} not found", trace_id))?;
138
139 let original_version = trace.schematic_version.clone();
140 if original_version == target_version {
141 let last_event = trace.events.last();
143 return Ok(ReplayRecoveryResult {
144 trace_id: trace_id.to_string(),
145 original_version: original_version.clone(),
146 target_version: target_version.to_string(),
147 migration_hops: Vec::new(),
148 last_node_id: last_event.and_then(|e| e.node_id.clone()),
149 recovered_payload: last_event.and_then(|e| e.payload.clone()),
150 resume_from_step: last_event.map(|e| e.step.saturating_add(1)).unwrap_or(0),
151 });
152 }
153
154 let path = registry
155 .find_migration_path(&original_version, target_version)
156 .ok_or_else(|| {
157 anyhow!(
158 "no migration path from {} to {} for circuit {}",
159 original_version,
160 target_version,
161 registry.circuit_id
162 )
163 })?;
164
165 if path.is_empty() {
166 return Err(anyhow!(
167 "empty migration path from {} to {}",
168 original_version,
169 target_version
170 ));
171 }
172
173 let last_event = trace.events.last();
175 let mut current_payload = last_event.and_then(|e| e.payload.clone());
176 let last_node_id = last_event.and_then(|e| e.node_id.clone());
177 let resume_step = last_event.map(|e| e.step.saturating_add(1)).unwrap_or(0);
178
179 let mut hops = Vec::with_capacity(path.len());
180
181 for migration in &path {
183 hops.push((migration.from_version.clone(), migration.to_version.clone()));
184
185 if let (Some(mapper), Some(payload)) = (&migration.payload_mapper, ¤t_payload) {
186 current_payload = Some(mapper.map_state(payload)?);
187 }
188 }
190
191 Ok(ReplayRecoveryResult {
192 trace_id: trace_id.to_string(),
193 original_version,
194 target_version: target_version.to_string(),
195 migration_hops: hops,
196 last_node_id,
197 recovered_payload: current_payload,
198 resume_from_step: resume_step,
199 })
200}
201
202pub async fn validate_migration_path(
205 store: &dyn PersistenceStore,
206 trace_id: &str,
207 target_version: &str,
208 registry: &MigrationRegistry,
209) -> Result<bool> {
210 match replay_and_recover(store, trace_id, target_version, registry).await {
211 Ok(_) => Ok(true),
212 Err(e) => {
213 tracing::warn!(
214 trace_id = %trace_id,
215 target_version = %target_version,
216 error = %e,
217 "Migration path validation failed"
218 );
219 Ok(false)
220 }
221 }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use ranvier_core::timeline::{Timeline, TimelineEvent};

    /// Builds a `NodeEnter` event for `node_id` when `enter` is true, otherwise
    /// a `NodeExit` event; all timing fields are zero.
    fn test_event(node_id: &str, enter: bool) -> TimelineEvent {
        let id = node_id.to_string();
        if !enter {
            return TimelineEvent::NodeExit {
                node_id: id,
                outcome_type: "Next".to_string(),
                duration_ms: 0,
                timestamp: 0,
            };
        }
        TimelineEvent::NodeEnter {
            node_label: id.clone(),
            node_id: id,
            timestamp: 0,
        }
    }

    /// Builds an engine over a timeline described as `(node_id, is_enter)` pairs.
    fn engine_for(script: &[(&str, bool)]) -> ReplayEngine {
        let mut timeline = Timeline::new();
        for &(node_id, enter) in script {
            timeline.push(test_event(node_id, enter));
        }
        ReplayEngine::new(timeline)
    }

    #[test]
    fn test_replay_fast_forward_to_active() {
        // A and B have both exited; C was entered and never left.
        let mut engine = engine_for(&[
            ("A", true),
            ("A", false),
            ("B", true),
            ("B", false),
            ("C", true),
        ]);

        let frame = engine.fast_forward_to_active().unwrap();
        assert_eq!(frame.current_node_id.as_deref(), Some("C"));
    }

    #[test]
    fn test_replay_with_repeated_nodes() {
        // A's first visit completed; its second visit is still open.
        let mut engine = engine_for(&[("A", true), ("A", false), ("A", true)]);

        let frame = engine.fast_forward_to_active().unwrap();
        assert_eq!(frame.current_node_id.as_deref(), Some("A"));
        assert_eq!(engine.cursor, 3);
    }
}