// oxirs_stream/stream_replay.rs

//! # Stream Replay and Reprocessing
//!
//! This module provides comprehensive stream replay and reprocessing capabilities
//! for debugging, recovery, and data analysis.
//!
//! ## Features
//!
//! - **Time-based replay**: Replay events from a specific time range
//! - **Offset-based replay**: Replay from a specific offset
//! - **Conditional replay**: Replay with filtering and transformation
//! - **Speed control**: Replay at custom speeds (slow-motion, fast-forward)
//! - **State snapshots**: Capture and restore application state
//! - **Parallel replay**: Replay multiple streams concurrently
//! - **Incremental processing**: Process only new events since last run
//!
//! ## Use Cases
//!
//! - **Debugging**: Replay problematic event sequences
//! - **Testing**: Replay production data in test environments
//! - **Recovery**: Rebuild state after failures
//! - **Analysis**: Analyze historical event patterns
//! - **Migration**: Reprocess events with new business logic

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use crate::event::StreamEvent;

/// Replay mode determining which events are replayed.
///
/// Offset and time ranges are inclusive on both ends: the replay loop
/// iterates `start..=end`, and time bounds are compared with `>=` / `<=`
/// (see `determine_offset_range`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplayMode {
    /// Replay every event with a timestamp at or after this time
    FromTime(DateTime<Utc>),
    /// Replay from this offset (inclusive) to the latest stored offset
    FromOffset(u64),
    /// Replay events whose timestamps fall within `[start, end]`
    TimeRange {
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    },
    /// Replay offsets in `[start, end]`
    OffsetRange { start: u64, end: u64 },
    /// Replay all stored events
    All,
    /// Replay only events matching a filter, optionally starting at `from`
    Filtered {
        filter: String,
        from: Option<DateTime<Utc>>,
    },
}

/// Speed control for replay
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplaySpeed {
    /// Real-time speed (currently implemented as a fixed 1ms inter-event
    /// delay; true original-timing playback is not yet implemented —
    /// see `apply_speed_control`)
    RealTime,
    /// Custom speed multiplier: per-event delay is `10ms / multiplier`
    /// (2.0 = 2x faster, 0.5 = 2x slower)
    Custom(f64),
    /// Fast as possible (no delays)
    MaxSpeed,
    /// Slow motion for debugging: per-event delay is `100ms * factor`
    SlowMotion(f64),
}

/// Filter for selective replay
///
/// NOTE(review): only `event_types` and `sources` are enforced by
/// `apply_filter`; `min_priority` and `custom_predicate` are stored but
/// never evaluated in the current replay path.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayFilter {
    /// Event types to include (names as produced by `get_event_type`)
    pub event_types: Option<Vec<String>>,
    /// Event sources to include (matched against event metadata `source`)
    pub sources: Option<Vec<String>>,
    /// Minimum event priority (not yet enforced)
    pub min_priority: Option<u8>,
    /// Custom predicate (serialized as string; not yet enforced)
    pub custom_predicate: Option<String>,
}

/// Transformation to apply to each event during replay
///
/// Transformations are applied in declaration order by the replay loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayTransformation {
    /// Human-readable transform name
    pub name: String,
    /// Transform type selecting the transformation behavior
    pub transform_type: TransformationType,
    /// Free-form parameters for the transformation
    pub parameters: HashMap<String, String>,
}

/// Type of transformation
///
/// NOTE(review): `apply_transformation` is currently a pass-through
/// placeholder, so none of these variants change event content yet.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TransformationType {
    /// Map event fields
    FieldMapping,
    /// Enrich with additional data
    Enrichment,
    /// Aggregate multiple events
    Aggregation,
    /// Split single event into multiple
    Splitting,
    /// Custom transformation
    Custom,
}

/// Configuration for stream replay
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayConfig {
    /// Replay mode (which events to replay)
    pub mode: ReplayMode,
    /// Replay speed (inter-event pacing)
    pub speed: ReplaySpeed,
    /// Optional filter applied to each candidate event
    pub filter: Option<ReplayFilter>,
    /// Transformations applied, in order, to each event
    pub transformations: Vec<ReplayTransformation>,
    /// Batch size for replay
    /// NOTE(review): not currently consulted by the replay loop
    pub batch_size: usize,
    /// Enable periodic state snapshots during replay
    pub enable_snapshots: bool,
    /// Minimum elapsed time between snapshots
    pub snapshot_interval: Duration,
    /// Enable parallel replay
    /// NOTE(review): not currently consulted by the replay loop
    pub enable_parallel: bool,
    /// Number of parallel workers (unused while parallel replay is unimplemented)
    pub parallel_workers: usize,
    /// Minimum elapsed time between checkpoints for long replays
    pub checkpoint_interval: Duration,
}

impl Default for ReplayConfig {
    /// Defaults: replay everything at maximum speed, unfiltered, with
    /// snapshots every 60s and checkpoints every 30s.
    fn default() -> Self {
        Self {
            mode: ReplayMode::All,
            speed: ReplaySpeed::MaxSpeed,
            filter: None,
            transformations: Vec::new(),
            batch_size: 1000,
            enable_snapshots: true,
            snapshot_interval: Duration::from_secs(60),
            enable_parallel: false,
            parallel_workers: 4,
            checkpoint_interval: Duration::from_secs(30),
        }
    }
}

/// State snapshot for recovery
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateSnapshot {
    /// Unique snapshot ID (UUID v4)
    pub snapshot_id: String,
    /// Time the snapshot was taken
    pub timestamp: DateTime<Utc>,
    /// Event offset at snapshot
    pub event_offset: u64,
    /// Application state (serialized)
    /// NOTE(review): currently always empty — `create_snapshot` does not
    /// capture real state yet
    pub state_data: Vec<u8>,
    /// Arbitrary metadata
    pub metadata: HashMap<String, String>,
}

/// Replay checkpoint for resuming long replays
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayCheckpoint {
    /// Unique checkpoint ID (UUID v4)
    pub checkpoint_id: String,
    /// Time the checkpoint was recorded
    pub timestamp: DateTime<Utc>,
    /// Last processed offset
    pub last_offset: u64,
    /// Events processed so far in the session
    pub events_processed: u64,
    /// Replay status at checkpoint time
    pub status: ReplayStatus,
}

/// Status of a replay operation
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplayStatus {
    /// Replay not started
    NotStarted,
    /// Replay in progress
    InProgress,
    /// Replay paused
    /// NOTE(review): the replay loop does not currently poll this state,
    /// so pausing only updates the session record
    Paused,
    /// Replay completed
    Completed,
    /// Replay failed with a human-readable reason
    Failed { reason: String },
}

/// Statistics for replay operations
///
/// Counters accumulate across replay runs unless noted otherwise.
#[derive(Debug, Clone, Default)]
pub struct ReplayStats {
    /// Total events successfully sent to replay receivers
    pub events_replayed: u64,
    /// Events rejected by the configured filter
    pub events_filtered: u64,
    /// Events that went through at least one transformation
    pub events_transformed: u64,
    /// Total replay time (ms), summed over runs
    pub total_replay_time_ms: u64,
    /// Average processing time per event (ms)
    /// NOTE(review): recomputed (overwritten) each replay run, not a
    /// running average across runs
    pub avg_processing_time_ms: f64,
    /// Snapshots created
    pub snapshots_created: u64,
    /// Checkpoints created
    pub checkpoints_created: u64,
    /// Errors encountered
    /// NOTE(review): not incremented anywhere in the current replay path
    pub errors_encountered: u64,
}

/// Stream replay manager
///
/// Owns the in-memory event store plus all shared replay state. Every
/// field is behind `Arc` so background replay tasks can hold clones.
pub struct StreamReplayManager {
    /// Configuration used for all replays started by this manager
    config: ReplayConfig,
    /// In-memory event store, keyed by stream offset
    event_store: Arc<DashMap<u64, StreamEvent>>,
    /// State snapshots captured during replays
    snapshots: Arc<RwLock<Vec<StateSnapshot>>>,
    /// Checkpoints recorded during replays
    checkpoints: Arc<RwLock<Vec<ReplayCheckpoint>>>,
    /// Aggregate statistics
    stats: Arc<RwLock<ReplayStats>>,
    /// Active replay sessions, keyed by session id
    active_replays: Arc<DashMap<String, ReplaySession>>,
    /// Registered event processors, run in registration order
    processors: Arc<RwLock<Vec<Box<dyn EventProcessor + Send + Sync>>>>,
}

/// Active replay session (internal bookkeeping)
struct ReplaySession {
    /// Session ID
    session_id: String,
    /// When the session started
    start_time: Instant,
    /// Current status
    status: ReplayStatus,
    /// Offset of the most recently forwarded event
    current_offset: u64,
    /// Events forwarded so far in this session
    events_processed: u64,
}

/// Trait for processing replayed events.
///
/// Returning `Ok(Some(event))` forwards the (possibly modified) event to
/// the replay receiver; `Ok(None)` drops it; `Err` aborts the replay.
pub trait EventProcessor: Send + Sync {
    /// Process a replayed event
    fn process(&mut self, event: &StreamEvent) -> Result<Option<StreamEvent>>;

    /// Get processor name
    fn name(&self) -> &str;
}

263impl StreamReplayManager {
264    /// Create a new stream replay manager
265    pub fn new(config: ReplayConfig) -> Self {
266        Self {
267            config,
268            event_store: Arc::new(DashMap::new()),
269            snapshots: Arc::new(RwLock::new(Vec::new())),
270            checkpoints: Arc::new(RwLock::new(Vec::new())),
271            stats: Arc::new(RwLock::new(ReplayStats::default())),
272            active_replays: Arc::new(DashMap::new()),
273            processors: Arc::new(RwLock::new(Vec::new())),
274        }
275    }
276
277    /// Store an event for future replay
278    pub fn store_event(&self, offset: u64, event: StreamEvent) {
279        self.event_store.insert(offset, event);
280        debug!("Stored event at offset {}", offset);
281    }
282
283    /// Store multiple events
284    pub fn store_events(&self, events: Vec<(u64, StreamEvent)>) {
285        for (offset, event) in events {
286            self.store_event(offset, event);
287        }
288    }
289
290    /// Start a replay session
291    pub async fn start_replay(
292        &self,
293        session_id: Option<String>,
294    ) -> Result<mpsc::UnboundedReceiver<StreamEvent>> {
295        let session_id = session_id.unwrap_or_else(|| Uuid::new_v4().to_string());
296
297        let session = ReplaySession {
298            session_id: session_id.clone(),
299            start_time: Instant::now(),
300            status: ReplayStatus::InProgress,
301            current_offset: 0,
302            events_processed: 0,
303        };
304
305        self.active_replays.insert(session_id.clone(), session);
306
307        let (tx, rx) = mpsc::unbounded_channel();
308
309        // Spawn replay task
310        let event_store = self.event_store.clone();
311        let config = self.config.clone();
312        let stats = self.stats.clone();
313        let snapshots = self.snapshots.clone();
314        let checkpoints = self.checkpoints.clone();
315        let active_replays = self.active_replays.clone();
316        let processors = self.processors.clone();
317        let session_id_clone = session_id.clone();
318
319        tokio::spawn(async move {
320            if let Err(e) = Self::replay_events_internal(
321                &session_id_clone,
322                &event_store,
323                &config,
324                &stats,
325                &snapshots,
326                &checkpoints,
327                &active_replays,
328                &processors,
329                tx,
330            )
331            .await
332            {
333                error!("Replay failed: {}", e);
334
335                if let Some(mut session) = active_replays.get_mut(&session_id_clone) {
336                    session.status = ReplayStatus::Failed {
337                        reason: e.to_string(),
338                    };
339                }
340            }
341        });
342
343        info!("Started replay session: {}", session_id);
344        Ok(rx)
345    }
346
347    /// Internal replay implementation
348    #[allow(clippy::too_many_arguments)]
349    async fn replay_events_internal(
350        session_id: &str,
351        event_store: &Arc<DashMap<u64, StreamEvent>>,
352        config: &ReplayConfig,
353        stats: &Arc<RwLock<ReplayStats>>,
354        snapshots: &Arc<RwLock<Vec<StateSnapshot>>>,
355        checkpoints: &Arc<RwLock<Vec<ReplayCheckpoint>>>,
356        active_replays: &Arc<DashMap<String, ReplaySession>>,
357        processors: &Arc<RwLock<Vec<Box<dyn EventProcessor + Send + Sync>>>>,
358        tx: mpsc::UnboundedSender<StreamEvent>,
359    ) -> Result<()> {
360        let start_time = Instant::now();
361
362        // Determine offset range to replay
363        let (start_offset, end_offset) = Self::determine_offset_range(config, event_store)?;
364
365        debug!(
366            "Replaying events from offset {} to {}",
367            start_offset, end_offset
368        );
369
370        let mut events_replayed = 0;
371        let mut last_checkpoint = Instant::now();
372        let mut last_snapshot = Instant::now();
373
374        for offset in start_offset..=end_offset {
375            if let Some(event) = event_store.get(&offset) {
376                let event = event.clone();
377
378                // Apply filter
379                if let Some(ref filter) = config.filter {
380                    if !Self::apply_filter(&event, filter) {
381                        stats.write().events_filtered += 1;
382                        continue;
383                    }
384                }
385
386                // Apply transformations
387                let mut transformed_event = event.clone();
388                for transformation in &config.transformations {
389                    transformed_event =
390                        Self::apply_transformation(transformed_event, transformation)?;
391                }
392
393                if !config.transformations.is_empty() {
394                    stats.write().events_transformed += 1;
395                }
396
397                // Process through registered processors
398                let mut final_event = Some(transformed_event);
399                for processor in processors.write().iter_mut() {
400                    if let Some(evt) = final_event {
401                        final_event = processor.process(&evt)?;
402                    }
403                }
404
405                // Send event if not filtered by processors
406                if let Some(evt) = final_event {
407                    // Apply speed control
408                    Self::apply_speed_control(config).await;
409
410                    if tx.send(evt).is_err() {
411                        warn!("Receiver dropped, stopping replay");
412                        break;
413                    }
414
415                    events_replayed += 1;
416
417                    // Update session
418                    if let Some(mut session) = active_replays.get_mut(session_id) {
419                        session.current_offset = offset;
420                        session.events_processed = events_replayed;
421                    }
422                }
423
424                // Create checkpoint if needed
425                if last_checkpoint.elapsed() >= config.checkpoint_interval {
426                    Self::create_checkpoint(
427                        session_id,
428                        offset,
429                        events_replayed,
430                        checkpoints,
431                        stats,
432                    )
433                    .await?;
434                    last_checkpoint = Instant::now();
435                }
436
437                // Create snapshot if needed
438                if config.enable_snapshots && last_snapshot.elapsed() >= config.snapshot_interval {
439                    Self::create_snapshot(session_id, offset, snapshots, stats).await?;
440                    last_snapshot = Instant::now();
441                }
442            }
443        }
444
445        // Update final stats
446        let total_time = start_time.elapsed().as_millis() as u64;
447        let mut stats_guard = stats.write();
448        stats_guard.events_replayed += events_replayed;
449        stats_guard.total_replay_time_ms += total_time;
450        if events_replayed > 0 {
451            stats_guard.avg_processing_time_ms = total_time as f64 / events_replayed as f64;
452        }
453
454        // Mark session as completed
455        if let Some(mut session) = active_replays.get_mut(session_id) {
456            session.status = ReplayStatus::Completed;
457        }
458
459        info!(
460            "Replay completed: {} events in {}ms",
461            events_replayed, total_time
462        );
463        Ok(())
464    }
465
466    /// Determine offset range for replay
467    fn determine_offset_range(
468        config: &ReplayConfig,
469        event_store: &Arc<DashMap<u64, StreamEvent>>,
470    ) -> Result<(u64, u64)> {
471        let max_offset = event_store.iter().map(|e| *e.key()).max().unwrap_or(0);
472
473        match &config.mode {
474            ReplayMode::All => Ok((0, max_offset)),
475            ReplayMode::FromOffset(start) => Ok((*start, max_offset)),
476            ReplayMode::OffsetRange { start, end } => Ok((*start, *end)),
477            ReplayMode::FromTime(start_time) => {
478                // Find first offset after start_time
479                let start_offset = event_store
480                    .iter()
481                    .filter_map(|entry| {
482                        let offset = *entry.key();
483                        let event = entry.value();
484                        let event_time = Self::get_event_timestamp(event);
485                        if event_time >= *start_time {
486                            Some(offset)
487                        } else {
488                            None
489                        }
490                    })
491                    .min()
492                    .unwrap_or(0);
493                Ok((start_offset, max_offset))
494            }
495            ReplayMode::TimeRange { start, end } => {
496                let start_offset = event_store
497                    .iter()
498                    .filter_map(|entry| {
499                        let offset = *entry.key();
500                        let event = entry.value();
501                        let event_time = Self::get_event_timestamp(event);
502                        if event_time >= *start {
503                            Some(offset)
504                        } else {
505                            None
506                        }
507                    })
508                    .min()
509                    .unwrap_or(0);
510
511                let end_offset = event_store
512                    .iter()
513                    .filter_map(|entry| {
514                        let offset = *entry.key();
515                        let event = entry.value();
516                        let event_time = Self::get_event_timestamp(event);
517                        if event_time <= *end {
518                            Some(offset)
519                        } else {
520                            None
521                        }
522                    })
523                    .max()
524                    .unwrap_or(max_offset);
525
526                Ok((start_offset, end_offset))
527            }
528            ReplayMode::Filtered { from, .. } => {
529                let start_offset = if let Some(start_time) = from {
530                    event_store
531                        .iter()
532                        .filter_map(|entry| {
533                            let offset = *entry.key();
534                            let event = entry.value();
535                            let event_time = Self::get_event_timestamp(event);
536                            if event_time >= *start_time {
537                                Some(offset)
538                            } else {
539                                None
540                            }
541                        })
542                        .min()
543                        .unwrap_or(0)
544                } else {
545                    0
546                };
547                Ok((start_offset, max_offset))
548            }
549        }
550    }
551
552    /// Get event timestamp
553    fn get_event_timestamp(event: &StreamEvent) -> DateTime<Utc> {
554        let metadata = match event {
555            StreamEvent::TripleAdded { metadata, .. }
556            | StreamEvent::TripleRemoved { metadata, .. }
557            | StreamEvent::QuadAdded { metadata, .. }
558            | StreamEvent::QuadRemoved { metadata, .. }
559            | StreamEvent::GraphCreated { metadata, .. }
560            | StreamEvent::GraphCleared { metadata, .. }
561            | StreamEvent::GraphDeleted { metadata, .. }
562            | StreamEvent::GraphMetadataUpdated { metadata, .. }
563            | StreamEvent::GraphPermissionsChanged { metadata, .. }
564            | StreamEvent::GraphStatisticsUpdated { metadata, .. }
565            | StreamEvent::GraphRenamed { metadata, .. }
566            | StreamEvent::GraphMerged { metadata, .. }
567            | StreamEvent::GraphSplit { metadata, .. }
568            | StreamEvent::SparqlUpdate { metadata, .. }
569            | StreamEvent::QueryCompleted { metadata, .. }
570            | StreamEvent::QueryResultAdded { metadata, .. }
571            | StreamEvent::QueryResultRemoved { metadata, .. }
572            | StreamEvent::TransactionBegin { metadata, .. }
573            | StreamEvent::TransactionCommit { metadata, .. }
574            | StreamEvent::TransactionAbort { metadata, .. }
575            | StreamEvent::SchemaChanged { metadata, .. }
576            | StreamEvent::SchemaDefinitionAdded { metadata, .. }
577            | StreamEvent::SchemaDefinitionRemoved { metadata, .. }
578            | StreamEvent::SchemaDefinitionModified { metadata, .. }
579            | StreamEvent::OntologyImported { metadata, .. }
580            | StreamEvent::OntologyRemoved { metadata, .. }
581            | StreamEvent::ConstraintAdded { metadata, .. }
582            | StreamEvent::ConstraintRemoved { metadata, .. }
583            | StreamEvent::ConstraintViolated { metadata, .. }
584            | StreamEvent::IndexCreated { metadata, .. }
585            | StreamEvent::IndexDropped { metadata, .. }
586            | StreamEvent::IndexRebuilt { metadata, .. }
587            | StreamEvent::SchemaUpdated { metadata, .. }
588            | StreamEvent::ShapeAdded { metadata, .. }
589            | StreamEvent::ShapeRemoved { metadata, .. }
590            | StreamEvent::ShapeModified { metadata, .. }
591            | StreamEvent::ShapeUpdated { metadata, .. }
592            | StreamEvent::ShapeValidationStarted { metadata, .. }
593            | StreamEvent::ShapeValidationCompleted { metadata, .. }
594            | StreamEvent::ShapeViolationDetected { metadata, .. }
595            | StreamEvent::Heartbeat { metadata, .. }
596            | StreamEvent::ErrorOccurred { metadata, .. } => metadata,
597        };
598        metadata.timestamp
599    }
600
601    /// Apply filter to event
602    fn apply_filter(event: &StreamEvent, filter: &ReplayFilter) -> bool {
603        // Check event type
604        if let Some(ref event_types) = filter.event_types {
605            let event_type = Self::get_event_type(event);
606            if !event_types.contains(&event_type) {
607                return false;
608            }
609        }
610
611        // Check source
612        if let Some(ref sources) = filter.sources {
613            let source = Self::get_event_source(event);
614            if !sources.contains(&source) {
615                return false;
616            }
617        }
618
619        true
620    }
621
622    /// Get event type
623    fn get_event_type(event: &StreamEvent) -> String {
624        match event {
625            StreamEvent::TripleAdded { .. } => "TripleAdded",
626            StreamEvent::TripleRemoved { .. } => "TripleRemoved",
627            StreamEvent::QuadAdded { .. } => "QuadAdded",
628            StreamEvent::QuadRemoved { .. } => "QuadRemoved",
629            StreamEvent::GraphCreated { .. } => "GraphCreated",
630            StreamEvent::GraphCleared { .. } => "GraphCleared",
631            StreamEvent::GraphDeleted { .. } => "GraphDeleted",
632            StreamEvent::SparqlUpdate { .. } => "SparqlUpdate",
633            StreamEvent::TransactionBegin { .. } => "TransactionBegin",
634            StreamEvent::TransactionCommit { .. } => "TransactionCommit",
635            StreamEvent::TransactionAbort { .. } => "TransactionAbort",
636            StreamEvent::SchemaChanged { .. } => "SchemaChanged",
637            StreamEvent::Heartbeat { .. } => "Heartbeat",
638            StreamEvent::QueryResultAdded { .. } => "QueryResultAdded",
639            StreamEvent::QueryResultRemoved { .. } => "QueryResultRemoved",
640            StreamEvent::QueryCompleted { .. } => "QueryCompleted",
641            _ => "Other",
642        }
643        .to_string()
644    }
645
646    /// Get event source
647    fn get_event_source(event: &StreamEvent) -> String {
648        let metadata = match event {
649            StreamEvent::TripleAdded { metadata, .. }
650            | StreamEvent::TripleRemoved { metadata, .. }
651            | StreamEvent::QuadAdded { metadata, .. }
652            | StreamEvent::QuadRemoved { metadata, .. }
653            | StreamEvent::GraphCreated { metadata, .. }
654            | StreamEvent::GraphCleared { metadata, .. }
655            | StreamEvent::GraphDeleted { metadata, .. }
656            | StreamEvent::SparqlUpdate { metadata, .. }
657            | StreamEvent::TransactionBegin { metadata, .. }
658            | StreamEvent::TransactionCommit { metadata, .. }
659            | StreamEvent::TransactionAbort { metadata, .. }
660            | StreamEvent::SchemaChanged { metadata, .. }
661            | StreamEvent::Heartbeat { metadata, .. }
662            | StreamEvent::QueryResultAdded { metadata, .. }
663            | StreamEvent::QueryResultRemoved { metadata, .. }
664            | StreamEvent::QueryCompleted { metadata, .. }
665            | StreamEvent::GraphMetadataUpdated { metadata, .. }
666            | StreamEvent::GraphPermissionsChanged { metadata, .. }
667            | StreamEvent::GraphStatisticsUpdated { metadata, .. }
668            | StreamEvent::GraphRenamed { metadata, .. }
669            | StreamEvent::GraphMerged { metadata, .. }
670            | StreamEvent::GraphSplit { metadata, .. }
671            | StreamEvent::SchemaDefinitionAdded { metadata, .. }
672            | StreamEvent::SchemaDefinitionRemoved { metadata, .. }
673            | StreamEvent::SchemaDefinitionModified { metadata, .. }
674            | StreamEvent::OntologyImported { metadata, .. }
675            | StreamEvent::OntologyRemoved { metadata, .. }
676            | StreamEvent::ConstraintAdded { metadata, .. }
677            | StreamEvent::ConstraintRemoved { metadata, .. }
678            | StreamEvent::ConstraintViolated { metadata, .. }
679            | StreamEvent::IndexCreated { metadata, .. }
680            | StreamEvent::IndexDropped { metadata, .. }
681            | StreamEvent::IndexRebuilt { metadata, .. }
682            | StreamEvent::SchemaUpdated { metadata, .. }
683            | StreamEvent::ShapeAdded { metadata, .. }
684            | StreamEvent::ShapeUpdated { metadata, .. }
685            | StreamEvent::ShapeRemoved { metadata, .. }
686            | StreamEvent::ShapeModified { metadata, .. }
687            | StreamEvent::ShapeValidationStarted { metadata, .. }
688            | StreamEvent::ShapeValidationCompleted { metadata, .. }
689            | StreamEvent::ShapeViolationDetected { metadata, .. }
690            | StreamEvent::ErrorOccurred { metadata, .. } => metadata,
691        };
692        metadata.source.clone()
693    }
694
695    /// Apply transformation to event
696    fn apply_transformation(
697        event: StreamEvent,
698        _transformation: &ReplayTransformation,
699    ) -> Result<StreamEvent> {
700        // Placeholder for transformation logic
701        // In real implementation, this would apply the actual transformation
702        Ok(event)
703    }
704
705    /// Apply speed control
706    async fn apply_speed_control(config: &ReplayConfig) {
707        match config.speed {
708            ReplaySpeed::RealTime => {
709                // Would need event timing information to preserve original timing
710                sleep(Duration::from_millis(1)).await;
711            }
712            ReplaySpeed::MaxSpeed => {
713                // No delay
714            }
715            ReplaySpeed::Custom(multiplier) => {
716                let delay = Duration::from_millis((10.0 / multiplier) as u64);
717                sleep(delay).await;
718            }
719            ReplaySpeed::SlowMotion(factor) => {
720                let delay = Duration::from_millis((100.0 * factor) as u64);
721                sleep(delay).await;
722            }
723        }
724    }
725
726    /// Create a checkpoint
727    async fn create_checkpoint(
728        session_id: &str,
729        offset: u64,
730        events_processed: u64,
731        checkpoints: &Arc<RwLock<Vec<ReplayCheckpoint>>>,
732        stats: &Arc<RwLock<ReplayStats>>,
733    ) -> Result<()> {
734        let checkpoint = ReplayCheckpoint {
735            checkpoint_id: Uuid::new_v4().to_string(),
736            timestamp: Utc::now(),
737            last_offset: offset,
738            events_processed,
739            status: ReplayStatus::InProgress,
740        };
741
742        checkpoints.write().push(checkpoint);
743        stats.write().checkpoints_created += 1;
744
745        debug!(
746            "Checkpoint created for session {} at offset {}",
747            session_id, offset
748        );
749        Ok(())
750    }
751
752    /// Create a state snapshot
753    async fn create_snapshot(
754        session_id: &str,
755        offset: u64,
756        snapshots: &Arc<RwLock<Vec<StateSnapshot>>>,
757        stats: &Arc<RwLock<ReplayStats>>,
758    ) -> Result<()> {
759        let snapshot = StateSnapshot {
760            snapshot_id: Uuid::new_v4().to_string(),
761            timestamp: Utc::now(),
762            event_offset: offset,
763            state_data: Vec::new(), // Would contain actual state
764            metadata: HashMap::new(),
765        };
766
767        snapshots.write().push(snapshot);
768        stats.write().snapshots_created += 1;
769
770        debug!(
771            "Snapshot created for session {} at offset {}",
772            session_id, offset
773        );
774        Ok(())
775    }
776
777    /// Pause a replay session
778    pub fn pause_replay(&self, session_id: &str) -> Result<()> {
779        if let Some(mut session) = self.active_replays.get_mut(session_id) {
780            session.status = ReplayStatus::Paused;
781            info!("Replay session {} paused", session_id);
782            Ok(())
783        } else {
784            Err(anyhow!("Replay session not found: {}", session_id))
785        }
786    }
787
788    /// Resume a paused replay session
789    pub fn resume_replay(&self, session_id: &str) -> Result<()> {
790        if let Some(mut session) = self.active_replays.get_mut(session_id) {
791            session.status = ReplayStatus::InProgress;
792            info!("Replay session {} resumed", session_id);
793            Ok(())
794        } else {
795            Err(anyhow!("Replay session not found: {}", session_id))
796        }
797    }
798
799    /// Get replay statistics
800    pub fn get_stats(&self) -> ReplayStats {
801        self.stats.read().clone()
802    }
803
804    /// Get session status
805    pub fn get_session_status(&self, session_id: &str) -> Option<ReplayStatus> {
806        self.active_replays
807            .get(session_id)
808            .map(|session| session.status.clone())
809    }
810
811    /// Register an event processor
812    pub fn register_processor(&self, processor: Box<dyn EventProcessor + Send + Sync>) {
813        let name = processor.name().to_string();
814        self.processors.write().push(processor);
815        info!("Registered event processor: {}", name);
816    }
817}
818
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// Replaying with `ReplayMode::All` must deliver every stored event
    /// and account for them all in the statistics.
    #[tokio::test]
    async fn test_replay_all_events() {
        let config = ReplayConfig {
            mode: ReplayMode::All,
            speed: ReplaySpeed::MaxSpeed,
            ..Default::default()
        };

        let manager = StreamReplayManager::new(config);

        // Store test events at offsets 0..9.
        for i in 0..10 {
            let event = StreamEvent::SchemaChanged {
                schema_type: crate::event::SchemaType::Ontology,
                change_type: crate::event::SchemaChangeType::Added,
                details: format!("test schema change {}", i),
                metadata: EventMetadata {
                    event_id: format!("event-{}", i),
                    timestamp: Utc::now(),
                    source: "test".to_string(),
                    user: None,
                    context: None,
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: HashMap::new(),
                    checksum: None,
                },
            };
            manager.store_event(i, event);
        }

        // Start replay
        let mut rx = manager.start_replay(None).await.unwrap();

        // Drain the channel; it closes once the background replay task
        // finishes and drops its sender.
        let mut count = 0;
        while let Some(_event) = rx.recv().await {
            count += 1;
        }

        assert_eq!(count, 10);

        let stats = manager.get_stats();
        assert_eq!(stats.events_replayed, 10);
    }

    /// A filter matching all stored events (same type and source) must
    /// let every event through.
    #[tokio::test]
    async fn test_replay_with_filter() {
        let config = ReplayConfig {
            mode: ReplayMode::All,
            speed: ReplaySpeed::MaxSpeed,
            filter: Some(ReplayFilter {
                event_types: Some(vec!["SchemaChanged".to_string()]),
                sources: Some(vec!["test".to_string()]),
                min_priority: None,
                custom_predicate: None,
            }),
            ..Default::default()
        };

        let manager = StreamReplayManager::new(config);

        // Store test events; all share the "SchemaChanged" type and the
        // "test" source, so the filter should pass all of them.
        for i in 0..5 {
            let event = StreamEvent::SchemaChanged {
                schema_type: crate::event::SchemaType::Ontology,
                change_type: crate::event::SchemaChangeType::Added,
                details: format!("test schema change {}", i),
                metadata: EventMetadata {
                    event_id: format!("event-{}", i),
                    timestamp: Utc::now(),
                    source: "test".to_string(),
                    user: None,
                    context: None,
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: HashMap::new(),
                    checksum: None,
                },
            };
            manager.store_event(i, event);
        }

        let mut rx = manager.start_replay(None).await.unwrap();

        let mut count = 0;
        while let Some(_event) = rx.recv().await {
            count += 1;
        }

        assert_eq!(count, 5);
    }
}