use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::event::StreamEvent;
/// Selects which slice of the event store a replay session covers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplayMode {
    /// Replay events whose timestamp is at or after the given instant.
    FromTime(DateTime<Utc>),
    /// Replay from the given offset through the highest stored offset.
    FromOffset(u64),
    /// Replay events whose timestamps fall between `start` and `end` (inclusive).
    TimeRange {
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    },
    /// Replay the inclusive offset range `start..=end`.
    OffsetRange { start: u64, end: u64 },
    /// Replay every stored event.
    All,
    /// Replay with a named filter expression, optionally starting at a time.
    /// NOTE(review): the `filter` string is not evaluated anywhere in this
    /// file — only `from` affects offset selection. Confirm intended use.
    Filtered {
        filter: String,
        from: Option<DateTime<Utc>>,
    },
}
/// Pacing applied between replayed events (see `apply_speed_control`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplaySpeed {
    /// Fixed 1ms delay per event (approximation, not original inter-event gaps).
    RealTime,
    /// Delay of (10 / multiplier) ms per event; larger values replay faster.
    Custom(f64),
    /// No delay between events.
    MaxSpeed,
    /// Delay of (100 * factor) ms per event; larger values replay slower.
    SlowMotion(f64),
}
/// Predicate applied to each event before it is replayed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayFilter {
    /// If set, only events whose type name is in this list pass (see `get_event_type`).
    pub event_types: Option<Vec<String>>,
    /// If set, only events whose metadata source is in this list pass.
    pub sources: Option<Vec<String>>,
    /// NOTE(review): currently NOT enforced by `apply_filter`.
    pub min_priority: Option<u8>,
    /// NOTE(review): currently NOT enforced by `apply_filter`.
    pub custom_predicate: Option<String>,
}
/// A named transformation applied to events during replay.
/// NOTE(review): `apply_transformation` is currently a pass-through stub,
/// so these are recorded but have no effect on the event payload yet.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayTransformation {
    pub name: String,
    pub transform_type: TransformationType,
    /// Free-form parameters interpreted by the transformation implementation.
    pub parameters: HashMap<String, String>,
}
/// Category of a replay transformation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TransformationType {
    FieldMapping,
    Enrichment,
    Aggregation,
    Splitting,
    Custom,
}
/// Full configuration for a replay session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayConfig {
    /// Which events to replay.
    pub mode: ReplayMode,
    /// Pacing between events.
    pub speed: ReplaySpeed,
    /// Optional per-event filter.
    pub filter: Option<ReplayFilter>,
    /// Transformations applied in order to each event.
    pub transformations: Vec<ReplayTransformation>,
    /// NOTE(review): not read by the replay loop in this file.
    pub batch_size: usize,
    /// When true, a snapshot is recorded every `snapshot_interval`.
    pub enable_snapshots: bool,
    pub snapshot_interval: Duration,
    /// NOTE(review): `enable_parallel`/`parallel_workers` are not read by the
    /// replay loop in this file.
    pub enable_parallel: bool,
    pub parallel_workers: usize,
    /// A checkpoint is recorded every `checkpoint_interval` of wall time.
    pub checkpoint_interval: Duration,
}
impl Default for ReplayConfig {
fn default() -> Self {
Self {
mode: ReplayMode::All,
speed: ReplaySpeed::MaxSpeed,
filter: None,
transformations: Vec::new(),
batch_size: 1000,
enable_snapshots: true,
snapshot_interval: Duration::from_secs(60),
enable_parallel: false,
parallel_workers: 4,
checkpoint_interval: Duration::from_secs(30),
}
}
}
/// A point-in-time capture of replay state, tagged with the offset reached.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateSnapshot {
    pub snapshot_id: String,
    pub timestamp: DateTime<Utc>,
    /// Offset of the last event processed before this snapshot was taken.
    pub event_offset: u64,
    /// Serialized state payload. NOTE(review): `create_snapshot` currently
    /// stores an empty Vec here — no real state is captured yet.
    pub state_data: Vec<u8>,
    pub metadata: HashMap<String, String>,
}
/// Periodic progress marker recorded by the replay loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayCheckpoint {
    pub checkpoint_id: String,
    pub timestamp: DateTime<Utc>,
    /// Offset of the event being processed when the checkpoint was taken.
    pub last_offset: u64,
    /// Events successfully sent to the receiver so far.
    pub events_processed: u64,
    pub status: ReplayStatus,
}
/// Lifecycle state of a replay session.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplayStatus {
    NotStarted,
    InProgress,
    Paused,
    Completed,
    /// Terminal failure state carrying the error message.
    Failed { reason: String },
}
/// Aggregate counters across all replay sessions of a manager.
#[derive(Debug, Clone, Default)]
pub struct ReplayStats {
    pub events_replayed: u64,
    /// Events rejected by the configured `ReplayFilter`.
    pub events_filtered: u64,
    pub events_transformed: u64,
    pub total_replay_time_ms: u64,
    /// NOTE(review): overwritten at the end of each session with that
    /// session's time/count, while the other fields accumulate — so this is
    /// the average of the most recent session only.
    pub avg_processing_time_ms: f64,
    pub snapshots_created: u64,
    pub checkpoints_created: u64,
    /// NOTE(review): never incremented anywhere in this file.
    pub errors_encountered: u64,
}
/// Stores events keyed by offset and replays them to subscribers on demand.
/// All shared state is behind `Arc` so a spawned replay task can own clones.
pub struct StreamReplayManager {
    config: ReplayConfig,
    /// Offset -> event map; offsets are assigned by callers of `store_event`.
    event_store: Arc<DashMap<u64, StreamEvent>>,
    snapshots: Arc<RwLock<Vec<StateSnapshot>>>,
    checkpoints: Arc<RwLock<Vec<ReplayCheckpoint>>>,
    stats: Arc<RwLock<ReplayStats>>,
    /// Session id -> live session bookkeeping (status, progress).
    active_replays: Arc<DashMap<String, ReplaySession>>,
    /// User-registered processors run on each event after transformations.
    processors: Arc<RwLock<Vec<Box<dyn EventProcessor + Send + Sync>>>>,
}
/// Internal per-session bookkeeping, updated by the replay task and by
/// `pause_replay`/`resume_replay`.
struct ReplaySession {
    session_id: String,
    start_time: Instant,
    status: ReplayStatus,
    /// Offset of the most recently emitted event.
    current_offset: u64,
    events_processed: u64,
}
/// Pluggable per-event hook run during replay (after filter/transformations).
/// Returning `Ok(None)` drops the event from the replay output; returning
/// `Ok(Some(event))` forwards the (possibly modified) event.
pub trait EventProcessor: Send + Sync {
    fn process(&mut self, event: &StreamEvent) -> Result<Option<StreamEvent>>;
    /// Human-readable name used for registration logging.
    fn name(&self) -> &str;
}
impl StreamReplayManager {
pub fn new(config: ReplayConfig) -> Self {
Self {
config,
event_store: Arc::new(DashMap::new()),
snapshots: Arc::new(RwLock::new(Vec::new())),
checkpoints: Arc::new(RwLock::new(Vec::new())),
stats: Arc::new(RwLock::new(ReplayStats::default())),
active_replays: Arc::new(DashMap::new()),
processors: Arc::new(RwLock::new(Vec::new())),
}
}
/// Inserts (or overwrites) the event at `offset`.
pub fn store_event(&self, offset: u64, event: StreamEvent) {
    self.event_store.insert(offset, event);
    debug!("Stored event at offset {}", offset);
}
/// Stores a batch of `(offset, event)` pairs, one insert per pair.
pub fn store_events(&self, events: Vec<(u64, StreamEvent)>) {
    events
        .into_iter()
        .for_each(|(offset, event)| self.store_event(offset, event));
}
/// Starts a replay session on a background tokio task and returns the
/// receiving end of an unbounded channel that will carry the replayed events.
///
/// A fresh UUID is used when `session_id` is `None`. On replay failure the
/// session's status is set to `Failed` with the error message.
pub async fn start_replay(
    &self,
    session_id: Option<String>,
) -> Result<mpsc::UnboundedReceiver<StreamEvent>> {
    let sid = session_id.unwrap_or_else(|| Uuid::new_v4().to_string());
    // Register the session before spawning so status queries see it immediately.
    self.active_replays.insert(
        sid.clone(),
        ReplaySession {
            session_id: sid.clone(),
            start_time: Instant::now(),
            status: ReplayStatus::InProgress,
            current_offset: 0,
            events_processed: 0,
        },
    );
    let (sender, receiver) = mpsc::unbounded_channel();
    // Clone every shared handle the background task needs to own.
    let store = self.event_store.clone();
    let cfg = self.config.clone();
    let stats_handle = self.stats.clone();
    let snapshots_handle = self.snapshots.clone();
    let checkpoints_handle = self.checkpoints.clone();
    let sessions = self.active_replays.clone();
    let procs = self.processors.clone();
    let task_sid = sid.clone();
    tokio::spawn(async move {
        let outcome = Self::replay_events_internal(
            &task_sid,
            &store,
            &cfg,
            &stats_handle,
            &snapshots_handle,
            &checkpoints_handle,
            &sessions,
            &procs,
            sender,
        )
        .await;
        if let Err(e) = outcome {
            error!("Replay failed: {}", e);
            if let Some(mut session) = sessions.get_mut(&task_sid) {
                session.status = ReplayStatus::Failed {
                    reason: e.to_string(),
                };
            }
        }
    });
    info!("Started replay session: {}", sid);
    Ok(receiver)
}
#[allow(clippy::too_many_arguments)]
async fn replay_events_internal(
session_id: &str,
event_store: &Arc<DashMap<u64, StreamEvent>>,
config: &ReplayConfig,
stats: &Arc<RwLock<ReplayStats>>,
snapshots: &Arc<RwLock<Vec<StateSnapshot>>>,
checkpoints: &Arc<RwLock<Vec<ReplayCheckpoint>>>,
active_replays: &Arc<DashMap<String, ReplaySession>>,
processors: &Arc<RwLock<Vec<Box<dyn EventProcessor + Send + Sync>>>>,
tx: mpsc::UnboundedSender<StreamEvent>,
) -> Result<()> {
let start_time = Instant::now();
let (start_offset, end_offset) = Self::determine_offset_range(config, event_store)?;
debug!(
"Replaying events from offset {} to {}",
start_offset, end_offset
);
let mut events_replayed = 0;
let mut last_checkpoint = Instant::now();
let mut last_snapshot = Instant::now();
for offset in start_offset..=end_offset {
if let Some(event) = event_store.get(&offset) {
let event = event.clone();
if let Some(ref filter) = config.filter {
if !Self::apply_filter(&event, filter) {
stats.write().events_filtered += 1;
continue;
}
}
let mut transformed_event = event.clone();
for transformation in &config.transformations {
transformed_event =
Self::apply_transformation(transformed_event, transformation)?;
}
if !config.transformations.is_empty() {
stats.write().events_transformed += 1;
}
let mut final_event = Some(transformed_event);
for processor in processors.write().iter_mut() {
if let Some(evt) = final_event {
final_event = processor.process(&evt)?;
}
}
if let Some(evt) = final_event {
Self::apply_speed_control(config).await;
if tx.send(evt).is_err() {
warn!("Receiver dropped, stopping replay");
break;
}
events_replayed += 1;
if let Some(mut session) = active_replays.get_mut(session_id) {
session.current_offset = offset;
session.events_processed = events_replayed;
}
}
if last_checkpoint.elapsed() >= config.checkpoint_interval {
Self::create_checkpoint(
session_id,
offset,
events_replayed,
checkpoints,
stats,
)
.await?;
last_checkpoint = Instant::now();
}
if config.enable_snapshots && last_snapshot.elapsed() >= config.snapshot_interval {
Self::create_snapshot(session_id, offset, snapshots, stats).await?;
last_snapshot = Instant::now();
}
}
}
let total_time = start_time.elapsed().as_millis() as u64;
let mut stats_guard = stats.write();
stats_guard.events_replayed += events_replayed;
stats_guard.total_replay_time_ms += total_time;
if events_replayed > 0 {
stats_guard.avg_processing_time_ms = total_time as f64 / events_replayed as f64;
}
if let Some(mut session) = active_replays.get_mut(session_id) {
session.status = ReplayStatus::Completed;
}
info!(
"Replay completed: {} events in {}ms",
events_replayed, total_time
);
Ok(())
}
fn determine_offset_range(
config: &ReplayConfig,
event_store: &Arc<DashMap<u64, StreamEvent>>,
) -> Result<(u64, u64)> {
let max_offset = event_store.iter().map(|e| *e.key()).max().unwrap_or(0);
match &config.mode {
ReplayMode::All => Ok((0, max_offset)),
ReplayMode::FromOffset(start) => Ok((*start, max_offset)),
ReplayMode::OffsetRange { start, end } => Ok((*start, *end)),
ReplayMode::FromTime(start_time) => {
let start_offset = event_store
.iter()
.filter_map(|entry| {
let offset = *entry.key();
let event = entry.value();
let event_time = Self::get_event_timestamp(event);
if event_time >= *start_time {
Some(offset)
} else {
None
}
})
.min()
.unwrap_or(0);
Ok((start_offset, max_offset))
}
ReplayMode::TimeRange { start, end } => {
let start_offset = event_store
.iter()
.filter_map(|entry| {
let offset = *entry.key();
let event = entry.value();
let event_time = Self::get_event_timestamp(event);
if event_time >= *start {
Some(offset)
} else {
None
}
})
.min()
.unwrap_or(0);
let end_offset = event_store
.iter()
.filter_map(|entry| {
let offset = *entry.key();
let event = entry.value();
let event_time = Self::get_event_timestamp(event);
if event_time <= *end {
Some(offset)
} else {
None
}
})
.max()
.unwrap_or(max_offset);
Ok((start_offset, end_offset))
}
ReplayMode::Filtered { from, .. } => {
let start_offset = if let Some(start_time) = from {
event_store
.iter()
.filter_map(|entry| {
let offset = *entry.key();
let event = entry.value();
let event_time = Self::get_event_timestamp(event);
if event_time >= *start_time {
Some(offset)
} else {
None
}
})
.min()
.unwrap_or(0)
} else {
0
};
Ok((start_offset, max_offset))
}
}
}
/// Extracts the metadata timestamp from any `StreamEvent` variant.
/// Every variant listed here carries a `metadata` field; the match is
/// intentionally exhaustive (no catch-all) so adding a variant without
/// metadata handling is a compile error.
fn get_event_timestamp(event: &StreamEvent) -> DateTime<Utc> {
    let metadata = match event {
        StreamEvent::TripleAdded { metadata, .. }
        | StreamEvent::TripleRemoved { metadata, .. }
        | StreamEvent::QuadAdded { metadata, .. }
        | StreamEvent::QuadRemoved { metadata, .. }
        | StreamEvent::GraphCreated { metadata, .. }
        | StreamEvent::GraphCleared { metadata, .. }
        | StreamEvent::GraphDeleted { metadata, .. }
        | StreamEvent::GraphMetadataUpdated { metadata, .. }
        | StreamEvent::GraphPermissionsChanged { metadata, .. }
        | StreamEvent::GraphStatisticsUpdated { metadata, .. }
        | StreamEvent::GraphRenamed { metadata, .. }
        | StreamEvent::GraphMerged { metadata, .. }
        | StreamEvent::GraphSplit { metadata, .. }
        | StreamEvent::SparqlUpdate { metadata, .. }
        | StreamEvent::QueryCompleted { metadata, .. }
        | StreamEvent::QueryResultAdded { metadata, .. }
        | StreamEvent::QueryResultRemoved { metadata, .. }
        | StreamEvent::TransactionBegin { metadata, .. }
        | StreamEvent::TransactionCommit { metadata, .. }
        | StreamEvent::TransactionAbort { metadata, .. }
        | StreamEvent::SchemaChanged { metadata, .. }
        | StreamEvent::SchemaDefinitionAdded { metadata, .. }
        | StreamEvent::SchemaDefinitionRemoved { metadata, .. }
        | StreamEvent::SchemaDefinitionModified { metadata, .. }
        | StreamEvent::OntologyImported { metadata, .. }
        | StreamEvent::OntologyRemoved { metadata, .. }
        | StreamEvent::ConstraintAdded { metadata, .. }
        | StreamEvent::ConstraintRemoved { metadata, .. }
        | StreamEvent::ConstraintViolated { metadata, .. }
        | StreamEvent::IndexCreated { metadata, .. }
        | StreamEvent::IndexDropped { metadata, .. }
        | StreamEvent::IndexRebuilt { metadata, .. }
        | StreamEvent::SchemaUpdated { metadata, .. }
        | StreamEvent::ShapeAdded { metadata, .. }
        | StreamEvent::ShapeRemoved { metadata, .. }
        | StreamEvent::ShapeModified { metadata, .. }
        | StreamEvent::ShapeUpdated { metadata, .. }
        | StreamEvent::ShapeValidationStarted { metadata, .. }
        | StreamEvent::ShapeValidationCompleted { metadata, .. }
        | StreamEvent::ShapeViolationDetected { metadata, .. }
        | StreamEvent::Heartbeat { metadata, .. }
        | StreamEvent::ErrorOccurred { metadata, .. } => metadata,
    };
    metadata.timestamp
}
fn apply_filter(event: &StreamEvent, filter: &ReplayFilter) -> bool {
if let Some(ref event_types) = filter.event_types {
let event_type = Self::get_event_type(event);
if !event_types.contains(&event_type) {
return false;
}
}
if let Some(ref sources) = filter.sources {
let source = Self::get_event_source(event);
if !sources.contains(&source) {
return false;
}
}
true
}
/// Maps an event variant to its type name, as matched against
/// `ReplayFilter::event_types`. Variants not listed map to "Other".
fn get_event_type(event: &StreamEvent) -> String {
    let name = match event {
        StreamEvent::TripleAdded { .. } => "TripleAdded",
        StreamEvent::TripleRemoved { .. } => "TripleRemoved",
        StreamEvent::QuadAdded { .. } => "QuadAdded",
        StreamEvent::QuadRemoved { .. } => "QuadRemoved",
        StreamEvent::GraphCreated { .. } => "GraphCreated",
        StreamEvent::GraphCleared { .. } => "GraphCleared",
        StreamEvent::GraphDeleted { .. } => "GraphDeleted",
        StreamEvent::SparqlUpdate { .. } => "SparqlUpdate",
        StreamEvent::TransactionBegin { .. } => "TransactionBegin",
        StreamEvent::TransactionCommit { .. } => "TransactionCommit",
        StreamEvent::TransactionAbort { .. } => "TransactionAbort",
        StreamEvent::SchemaChanged { .. } => "SchemaChanged",
        StreamEvent::Heartbeat { .. } => "Heartbeat",
        StreamEvent::QueryResultAdded { .. } => "QueryResultAdded",
        StreamEvent::QueryResultRemoved { .. } => "QueryResultRemoved",
        StreamEvent::QueryCompleted { .. } => "QueryCompleted",
        _ => "Other",
    };
    name.to_owned()
}
/// Extracts the metadata `source` string (cloned) from any event variant,
/// as matched against `ReplayFilter::sources`. Exhaustive over the same
/// variant set as `get_event_timestamp`, in a different arm order.
fn get_event_source(event: &StreamEvent) -> String {
    let metadata = match event {
        StreamEvent::TripleAdded { metadata, .. }
        | StreamEvent::TripleRemoved { metadata, .. }
        | StreamEvent::QuadAdded { metadata, .. }
        | StreamEvent::QuadRemoved { metadata, .. }
        | StreamEvent::GraphCreated { metadata, .. }
        | StreamEvent::GraphCleared { metadata, .. }
        | StreamEvent::GraphDeleted { metadata, .. }
        | StreamEvent::SparqlUpdate { metadata, .. }
        | StreamEvent::TransactionBegin { metadata, .. }
        | StreamEvent::TransactionCommit { metadata, .. }
        | StreamEvent::TransactionAbort { metadata, .. }
        | StreamEvent::SchemaChanged { metadata, .. }
        | StreamEvent::Heartbeat { metadata, .. }
        | StreamEvent::QueryResultAdded { metadata, .. }
        | StreamEvent::QueryResultRemoved { metadata, .. }
        | StreamEvent::QueryCompleted { metadata, .. }
        | StreamEvent::GraphMetadataUpdated { metadata, .. }
        | StreamEvent::GraphPermissionsChanged { metadata, .. }
        | StreamEvent::GraphStatisticsUpdated { metadata, .. }
        | StreamEvent::GraphRenamed { metadata, .. }
        | StreamEvent::GraphMerged { metadata, .. }
        | StreamEvent::GraphSplit { metadata, .. }
        | StreamEvent::SchemaDefinitionAdded { metadata, .. }
        | StreamEvent::SchemaDefinitionRemoved { metadata, .. }
        | StreamEvent::SchemaDefinitionModified { metadata, .. }
        | StreamEvent::OntologyImported { metadata, .. }
        | StreamEvent::OntologyRemoved { metadata, .. }
        | StreamEvent::ConstraintAdded { metadata, .. }
        | StreamEvent::ConstraintRemoved { metadata, .. }
        | StreamEvent::ConstraintViolated { metadata, .. }
        | StreamEvent::IndexCreated { metadata, .. }
        | StreamEvent::IndexDropped { metadata, .. }
        | StreamEvent::IndexRebuilt { metadata, .. }
        | StreamEvent::SchemaUpdated { metadata, .. }
        | StreamEvent::ShapeAdded { metadata, .. }
        | StreamEvent::ShapeUpdated { metadata, .. }
        | StreamEvent::ShapeRemoved { metadata, .. }
        | StreamEvent::ShapeModified { metadata, .. }
        | StreamEvent::ShapeValidationStarted { metadata, .. }
        | StreamEvent::ShapeValidationCompleted { metadata, .. }
        | StreamEvent::ShapeViolationDetected { metadata, .. }
        | StreamEvent::ErrorOccurred { metadata, .. } => metadata,
    };
    metadata.source.clone()
}
/// Placeholder transformation stage: returns the event unchanged.
/// The transformation parameter is ignored — `TransformationType` variants
/// are not yet implemented. Kept fallible so real implementations can
/// report errors without changing callers.
fn apply_transformation(
    event: StreamEvent,
    _transformation: &ReplayTransformation,
) -> Result<StreamEvent> {
    Ok(event)
}
/// Sleeps between events according to the configured `ReplaySpeed`.
///
/// Fix: `Custom(multiplier)` previously computed `10.0 / multiplier` with no
/// guard, so a zero, negative, or NaN multiplier produced `inf` (or NaN),
/// and the saturating `as u64` cast turned `inf` into `u64::MAX`
/// milliseconds — effectively hanging the replay forever. Non-positive or
/// non-finite values now fall through to no delay (max speed).
async fn apply_speed_control(config: &ReplayConfig) {
    match config.speed {
        ReplaySpeed::RealTime => {
            // Coarse 1ms pacing; does not reconstruct original gaps.
            sleep(Duration::from_millis(1)).await;
        }
        ReplaySpeed::MaxSpeed => {
            // No delay.
        }
        ReplaySpeed::Custom(multiplier) => {
            if multiplier.is_finite() && multiplier > 0.0 {
                let delay = Duration::from_millis((10.0 / multiplier) as u64);
                sleep(delay).await;
            }
        }
        ReplaySpeed::SlowMotion(factor) => {
            // Negative/NaN factors cast to 0ms (saturating float->int cast),
            // which is harmless; no extra guard needed here.
            let delay = Duration::from_millis((100.0 * factor) as u64);
            sleep(delay).await;
        }
    }
}
/// Appends a progress checkpoint for the session and bumps the counter.
async fn create_checkpoint(
    session_id: &str,
    offset: u64,
    events_processed: u64,
    checkpoints: &Arc<RwLock<Vec<ReplayCheckpoint>>>,
    stats: &Arc<RwLock<ReplayStats>>,
) -> Result<()> {
    checkpoints.write().push(ReplayCheckpoint {
        checkpoint_id: Uuid::new_v4().to_string(),
        timestamp: Utc::now(),
        last_offset: offset,
        events_processed,
        status: ReplayStatus::InProgress,
    });
    stats.write().checkpoints_created += 1;
    debug!(
        "Checkpoint created for session {} at offset {}",
        session_id, offset
    );
    Ok(())
}
/// Appends a (currently empty-payload) state snapshot and bumps the counter.
async fn create_snapshot(
    session_id: &str,
    offset: u64,
    snapshots: &Arc<RwLock<Vec<StateSnapshot>>>,
    stats: &Arc<RwLock<ReplayStats>>,
) -> Result<()> {
    snapshots.write().push(StateSnapshot {
        snapshot_id: Uuid::new_v4().to_string(),
        timestamp: Utc::now(),
        event_offset: offset,
        // No real state capture implemented yet: empty payload and metadata.
        state_data: Vec::new(),
        metadata: HashMap::new(),
    });
    stats.write().snapshots_created += 1;
    debug!(
        "Snapshot created for session {} at offset {}",
        session_id, offset
    );
    Ok(())
}
/// Marks the session as paused; errors when the session id is unknown.
pub fn pause_replay(&self, session_id: &str) -> Result<()> {
    match self.active_replays.get_mut(session_id) {
        Some(mut session) => {
            session.status = ReplayStatus::Paused;
            info!("Replay session {} paused", session_id);
            Ok(())
        }
        None => Err(anyhow!("Replay session not found: {}", session_id)),
    }
}
/// Marks the session as in-progress again; errors when the id is unknown.
pub fn resume_replay(&self, session_id: &str) -> Result<()> {
    match self.active_replays.get_mut(session_id) {
        Some(mut session) => {
            session.status = ReplayStatus::InProgress;
            info!("Replay session {} resumed", session_id);
            Ok(())
        }
        None => Err(anyhow!("Replay session not found: {}", session_id)),
    }
}
/// Returns a copy of the aggregate replay statistics.
pub fn get_stats(&self) -> ReplayStats {
    let guard = self.stats.read();
    guard.clone()
}
/// Returns the status of a session, or `None` when the id is unknown.
pub fn get_session_status(&self, session_id: &str) -> Option<ReplayStatus> {
    let entry = self.active_replays.get(session_id)?;
    Some(entry.status.clone())
}
/// Adds a processor to the pipeline; processors run in registration order
/// on every replayed event.
pub fn register_processor(&self, processor: Box<dyn EventProcessor + Send + Sync>) {
    // Capture the name before the box is moved into the list.
    let processor_name = processor.name().to_owned();
    self.processors.write().push(processor);
    info!("Registered event processor: {}", processor_name);
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// Stores 10 events and verifies that ReplayMode::All at MaxSpeed
    /// replays every one of them and that the stats counter matches.
    /// The recv loop ends when the spawned replay task finishes and drops
    /// the channel sender.
    #[tokio::test]
    async fn test_replay_all_events() {
        let config = ReplayConfig {
            mode: ReplayMode::All,
            speed: ReplaySpeed::MaxSpeed,
            ..Default::default()
        };
        let manager = StreamReplayManager::new(config);
        for i in 0..10 {
            let event = StreamEvent::SchemaChanged {
                schema_type: crate::event::SchemaType::Ontology,
                change_type: crate::event::SchemaChangeType::Added,
                details: format!("test schema change {}", i),
                metadata: EventMetadata {
                    event_id: format!("event-{}", i),
                    timestamp: Utc::now(),
                    source: "test".to_string(),
                    user: None,
                    context: None,
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: HashMap::new(),
                    checksum: None,
                },
            };
            manager.store_event(i, event);
        }
        let mut rx = manager.start_replay(None).await.unwrap();
        let mut count = 0;
        while let Some(_event) = rx.recv().await {
            count += 1;
        }
        assert_eq!(count, 10);
        let stats = manager.get_stats();
        assert_eq!(stats.events_replayed, 10);
    }

    /// Stores 5 events that all match the configured type/source filter and
    /// verifies none are dropped. (Only the pass-through case is covered;
    /// there is no test here for events the filter rejects.)
    #[tokio::test]
    async fn test_replay_with_filter() {
        let config = ReplayConfig {
            mode: ReplayMode::All,
            speed: ReplaySpeed::MaxSpeed,
            filter: Some(ReplayFilter {
                event_types: Some(vec!["SchemaChanged".to_string()]),
                sources: Some(vec!["test".to_string()]),
                min_priority: None,
                custom_predicate: None,
            }),
            ..Default::default()
        };
        let manager = StreamReplayManager::new(config);
        for i in 0..5 {
            let event = StreamEvent::SchemaChanged {
                schema_type: crate::event::SchemaType::Ontology,
                change_type: crate::event::SchemaChangeType::Added,
                details: format!("test schema change {}", i),
                metadata: EventMetadata {
                    event_id: format!("event-{}", i),
                    timestamp: Utc::now(),
                    source: "test".to_string(),
                    user: None,
                    context: None,
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: HashMap::new(),
                    checksum: None,
                },
            };
            manager.store_event(i, event);
        }
        let mut rx = manager.start_replay(None).await.unwrap();
        let mut count = 0;
        while let Some(_event) = rx.recv().await {
            count += 1;
        }
        assert_eq!(count, 5);
    }
}