1use crate::persistence::PersistenceStore;
2use anyhow::{Result, anyhow};
3use ranvier_core::schematic::MigrationRegistry;
4use ranvier_core::timeline::{Timeline, TimelineEvent};
5
6pub struct ReplayEngine {
9 timeline: Timeline,
10 cursor: usize,
11}
12
13#[derive(Debug, Clone)]
14pub struct ReplayFrame {
15 pub current_node_id: Option<String>,
16 pub event: TimelineEvent,
17}
18
19impl ReplayEngine {
20 pub fn new(timeline: Timeline) -> Self {
21 Self {
22 timeline,
23 cursor: 0,
24 }
25 }
26
27 pub fn next_step(&mut self) -> Option<ReplayFrame> {
30 if self.cursor >= self.timeline.events.len() {
31 return None;
32 }
33
34 let event = self.timeline.events[self.cursor].clone();
35 self.cursor += 1;
36
37 let current_node_id = match &event {
38 TimelineEvent::NodeEnter { node_id, .. } => Some(node_id.clone()),
39 TimelineEvent::NodeExit { node_id, .. } => Some(node_id.clone()),
40 TimelineEvent::NodePaused { node_id, .. } => Some(node_id.clone()),
41 TimelineEvent::NodeRetry { node_id, .. } => Some(node_id.clone()),
42 TimelineEvent::DlqExhausted { node_id, .. } => Some(node_id.clone()),
43 TimelineEvent::Branchtaken { .. } => None, };
45
46 Some(ReplayFrame {
47 current_node_id,
48 event,
49 })
50 }
51
52 pub fn reset(&mut self) {
54 self.cursor = 0;
55 }
56
57 pub fn fast_forward_to_end(&mut self) -> Option<ReplayFrame> {
60 if self.timeline.events.is_empty() {
61 return None;
62 }
63 self.cursor = self.timeline.events.len() - 1;
64 self.next_step()
65 }
66
67 pub fn fast_forward_to_active(&mut self) -> Option<ReplayFrame> {
70 let mut exited_nodes = std::collections::HashSet::new();
71 let mut active_index = None;
72
73 for i in (0..self.timeline.events.len()).rev() {
74 match &self.timeline.events[i] {
75 TimelineEvent::NodeExit { node_id, .. } => {
76 exited_nodes.insert(node_id.clone());
77 }
78 TimelineEvent::NodeEnter { node_id, .. }
79 | TimelineEvent::NodePaused { node_id, .. } => {
80 if !exited_nodes.contains(node_id) {
81 active_index = Some(i);
82 break;
83 } else {
84 exited_nodes.remove(node_id);
85 }
86 }
87 _ => {}
88 }
89 }
90
91 if let Some(index) = active_index {
92 self.cursor = index;
93 self.next_step()
94 } else {
95 self.fast_forward_to_end()
96 }
97 }
98}
99
100#[derive(Debug, Clone)]
102pub struct ReplayRecoveryResult {
103 pub trace_id: String,
105 pub original_version: String,
107 pub target_version: String,
109 pub migration_hops: Vec<(String, String)>,
111 pub last_node_id: Option<String>,
113 pub recovered_payload: Option<serde_json::Value>,
115 pub resume_from_step: u64,
117}
118
119pub async fn replay_and_recover(
128 store: &dyn PersistenceStore,
129 trace_id: &str,
130 target_version: &str,
131 registry: &MigrationRegistry,
132) -> Result<ReplayRecoveryResult> {
133 let trace = store
134 .load(trace_id)
135 .await?
136 .ok_or_else(|| anyhow!("trace_id {} not found", trace_id))?;
137
138 let original_version = trace.schematic_version.clone();
139 if original_version == target_version {
140 let last_event = trace.events.last();
142 return Ok(ReplayRecoveryResult {
143 trace_id: trace_id.to_string(),
144 original_version: original_version.clone(),
145 target_version: target_version.to_string(),
146 migration_hops: Vec::new(),
147 last_node_id: last_event.and_then(|e| e.node_id.clone()),
148 recovered_payload: last_event.and_then(|e| e.payload.clone()),
149 resume_from_step: last_event.map(|e| e.step.saturating_add(1)).unwrap_or(0),
150 });
151 }
152
153 let path = registry
154 .find_migration_path(&original_version, target_version)
155 .ok_or_else(|| {
156 anyhow!(
157 "no migration path from {} to {} for circuit {}",
158 original_version,
159 target_version,
160 registry.circuit_id
161 )
162 })?;
163
164 if path.is_empty() {
165 return Err(anyhow!(
166 "empty migration path from {} to {}",
167 original_version,
168 target_version
169 ));
170 }
171
172 let last_event = trace.events.last();
174 let mut current_payload = last_event.and_then(|e| e.payload.clone());
175 let last_node_id = last_event.and_then(|e| e.node_id.clone());
176 let resume_step = last_event.map(|e| e.step.saturating_add(1)).unwrap_or(0);
177
178 let mut hops = Vec::with_capacity(path.len());
179
180 for migration in &path {
182 hops.push((migration.from_version.clone(), migration.to_version.clone()));
183
184 if let (Some(mapper), Some(payload)) = (&migration.payload_mapper, ¤t_payload) {
185 current_payload = Some(mapper.map_state(payload)?);
186 }
187 }
189
190 Ok(ReplayRecoveryResult {
191 trace_id: trace_id.to_string(),
192 original_version,
193 target_version: target_version.to_string(),
194 migration_hops: hops,
195 last_node_id,
196 recovered_payload: current_payload,
197 resume_from_step: resume_step,
198 })
199}
200
201pub async fn validate_migration_path(
204 store: &dyn PersistenceStore,
205 trace_id: &str,
206 target_version: &str,
207 registry: &MigrationRegistry,
208) -> Result<bool> {
209 match replay_and_recover(store, trace_id, target_version, registry).await {
210 Ok(_) => Ok(true),
211 Err(e) => {
212 tracing::warn!(
213 trace_id = %trace_id,
214 target_version = %target_version,
215 error = %e,
216 "Migration path validation failed"
217 );
218 Ok(false)
219 }
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use ranvier_core::timeline::{Timeline, TimelineEvent};

    /// Builds a `NodeEnter` event (when `enter` is true) or a `NodeExit`
    /// event (otherwise) for `node_id`, with zeroed timing fields.
    fn test_event(node_id: &str, enter: bool) -> TimelineEvent {
        let node_id = node_id.to_string();
        if !enter {
            return TimelineEvent::NodeExit {
                node_id,
                outcome_type: "Next".to_string(),
                duration_ms: 0,
                timestamp: 0,
            };
        }
        TimelineEvent::NodeEnter {
            node_label: node_id.clone(),
            node_id,
            timestamp: 0,
        }
    }

    /// Builds an engine over a timeline made of `(node_id, enter)` steps.
    fn engine_for(steps: &[(&str, bool)]) -> ReplayEngine {
        let mut timeline = Timeline::new();
        for &(node_id, enter) in steps {
            timeline.push(test_event(node_id, enter));
        }
        ReplayEngine::new(timeline)
    }

    #[test]
    fn test_replay_fast_forward_to_active() {
        // A and B both exited; C was entered but never exited.
        let mut engine = engine_for(&[
            ("A", true),
            ("A", false),
            ("B", true),
            ("B", false),
            ("C", true),
        ]);

        let frame = engine.fast_forward_to_active().unwrap();
        assert_eq!(frame.current_node_id, Some("C".to_string()));
    }

    #[test]
    fn test_replay_with_repeated_nodes() {
        // A is entered a second time after exiting; the second entry is active.
        let mut engine = engine_for(&[("A", true), ("A", false), ("A", true)]);

        let frame = engine.fast_forward_to_active().unwrap();
        assert_eq!(frame.current_node_id, Some("A".to_string()));
        // The cursor ends one past the returned (third) event.
        assert_eq!(engine.cursor, 3);
    }
}