use crate::persistence::PersistenceStore;
use anyhow::{Result, anyhow};
use ranvier_core::schematic::MigrationRegistry;
use ranvier_core::timeline::{Timeline, TimelineEvent};
pub struct ReplayEngine {
timeline: Timeline,
cursor: usize,
}
#[derive(Debug, Clone)]
pub struct ReplayFrame {
pub current_node_id: Option<String>,
pub event: TimelineEvent,
}
impl ReplayEngine {
pub fn new(timeline: Timeline) -> Self {
Self {
timeline,
cursor: 0,
}
}
pub fn next_step(&mut self) -> Option<ReplayFrame> {
if self.cursor >= self.timeline.events.len() {
return None;
}
let event = self.timeline.events[self.cursor].clone();
self.cursor += 1;
let current_node_id = match &event {
TimelineEvent::NodeEnter { node_id, .. } => Some(node_id.clone()),
TimelineEvent::NodeExit { node_id, .. } => Some(node_id.clone()),
TimelineEvent::NodePaused { node_id, .. } => Some(node_id.clone()),
TimelineEvent::NodeRetry { node_id, .. } => Some(node_id.clone()),
TimelineEvent::DlqExhausted { node_id, .. } => Some(node_id.clone()),
TimelineEvent::Branchtaken { .. } => None, TimelineEvent::NodeTimeout { node_id, .. } => Some(node_id.clone()),
};
Some(ReplayFrame {
current_node_id,
event,
})
}
pub fn reset(&mut self) {
self.cursor = 0;
}
pub fn fast_forward_to_end(&mut self) -> Option<ReplayFrame> {
if self.timeline.events.is_empty() {
return None;
}
self.cursor = self.timeline.events.len() - 1;
self.next_step()
}
pub fn fast_forward_to_active(&mut self) -> Option<ReplayFrame> {
let mut exited_nodes = std::collections::HashSet::new();
let mut active_index = None;
for i in (0..self.timeline.events.len()).rev() {
match &self.timeline.events[i] {
TimelineEvent::NodeExit { node_id, .. } => {
exited_nodes.insert(node_id.clone());
}
TimelineEvent::NodeEnter { node_id, .. }
| TimelineEvent::NodePaused { node_id, .. } => {
if !exited_nodes.contains(node_id) {
active_index = Some(i);
break;
} else {
exited_nodes.remove(node_id);
}
}
_ => {}
}
}
if let Some(index) = active_index {
self.cursor = index;
self.next_step()
} else {
self.fast_forward_to_end()
}
}
}
#[derive(Debug, Clone)]
pub struct ReplayRecoveryResult {
pub trace_id: String,
pub original_version: String,
pub target_version: String,
pub migration_hops: Vec<(String, String)>,
pub last_node_id: Option<String>,
pub recovered_payload: Option<serde_json::Value>,
pub resume_from_step: u64,
}
pub async fn replay_and_recover(
store: &dyn PersistenceStore,
trace_id: &str,
target_version: &str,
registry: &MigrationRegistry,
) -> Result<ReplayRecoveryResult> {
let trace = store
.load(trace_id)
.await?
.ok_or_else(|| anyhow!("trace_id {} not found", trace_id))?;
let original_version = trace.schematic_version.clone();
if original_version == target_version {
let last_event = trace.events.last();
return Ok(ReplayRecoveryResult {
trace_id: trace_id.to_string(),
original_version: original_version.clone(),
target_version: target_version.to_string(),
migration_hops: Vec::new(),
last_node_id: last_event.and_then(|e| e.node_id.clone()),
recovered_payload: last_event.and_then(|e| e.payload.clone()),
resume_from_step: last_event.map(|e| e.step.saturating_add(1)).unwrap_or(0),
});
}
let path = registry
.find_migration_path(&original_version, target_version)
.ok_or_else(|| {
anyhow!(
"no migration path from {} to {} for circuit {}",
original_version,
target_version,
registry.circuit_id
)
})?;
if path.is_empty() {
return Err(anyhow!(
"empty migration path from {} to {}",
original_version,
target_version
));
}
let last_event = trace.events.last();
let mut current_payload = last_event.and_then(|e| e.payload.clone());
let last_node_id = last_event.and_then(|e| e.node_id.clone());
let resume_step = last_event.map(|e| e.step.saturating_add(1)).unwrap_or(0);
let mut hops = Vec::with_capacity(path.len());
for migration in &path {
hops.push((migration.from_version.clone(), migration.to_version.clone()));
if let (Some(mapper), Some(payload)) = (&migration.payload_mapper, ¤t_payload) {
current_payload = Some(mapper.map_state(payload)?);
}
}
Ok(ReplayRecoveryResult {
trace_id: trace_id.to_string(),
original_version,
target_version: target_version.to_string(),
migration_hops: hops,
last_node_id,
recovered_payload: current_payload,
resume_from_step: resume_step,
})
}
pub async fn validate_migration_path(
store: &dyn PersistenceStore,
trace_id: &str,
target_version: &str,
registry: &MigrationRegistry,
) -> Result<bool> {
match replay_and_recover(store, trace_id, target_version, registry).await {
Ok(_) => Ok(true),
Err(e) => {
tracing::warn!(
trace_id = %trace_id,
target_version = %target_version,
error = %e,
"Migration path validation failed"
);
Ok(false)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use ranvier_core::timeline::{Timeline, TimelineEvent};
fn test_event(node_id: &str, enter: bool) -> TimelineEvent {
if enter {
TimelineEvent::NodeEnter {
node_id: node_id.to_string(),
node_label: node_id.to_string(),
timestamp: 0,
}
} else {
TimelineEvent::NodeExit {
node_id: node_id.to_string(),
outcome_type: "Next".to_string(),
duration_ms: 0,
timestamp: 0,
}
}
}
#[test]
fn test_replay_fast_forward_to_active() {
let mut timeline = Timeline::new();
timeline.push(test_event("A", true));
timeline.push(test_event("A", false));
timeline.push(test_event("B", true));
timeline.push(test_event("B", false));
timeline.push(test_event("C", true));
let mut engine = ReplayEngine::new(timeline);
let frame = engine.fast_forward_to_active().unwrap();
assert_eq!(frame.current_node_id, Some("C".to_string()));
}
#[test]
fn test_replay_with_repeated_nodes() {
let mut timeline = Timeline::new();
timeline.push(test_event("A", true));
timeline.push(test_event("A", false));
timeline.push(test_event("A", true));
let mut engine = ReplayEngine::new(timeline);
let frame = engine.fast_forward_to_active().unwrap();
assert_eq!(frame.current_node_id, Some("A".to_string()));
assert_eq!(engine.cursor, 3);
}
}