1use anyhow::{anyhow, Result};
25use chrono::{DateTime, Utc};
26use dashmap::DashMap;
27use parking_lot::RwLock;
28use serde::{Deserialize, Serialize};
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32use tokio::sync::mpsc;
33use tokio::time::sleep;
34use tracing::{debug, error, info, warn};
35use uuid::Uuid;
36
37use crate::event::StreamEvent;
38
/// Selects which portion of the stored event stream a replay covers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplayMode {
    /// Replay every event whose timestamp is at or after the given time.
    FromTime(DateTime<Utc>),
    /// Replay starting from the given store offset (inclusive).
    FromOffset(u64),
    /// Replay events whose timestamps fall within `[start, end]`.
    TimeRange {
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    },
    /// Replay events in the inclusive offset range `[start, end]`.
    OffsetRange { start: u64, end: u64 },
    /// Replay the entire event store starting from offset 0.
    All,
    /// Replay with a named filter expression, optionally from a start time.
    /// NOTE(review): `determine_offset_range` only honors `from`; the
    /// `filter` expression itself is not evaluated anywhere in this file.
    Filtered {
        filter: String,
        from: Option<DateTime<Utc>>,
    },
}
61
/// Controls pacing of event delivery during replay (see `apply_speed_control`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplaySpeed {
    /// Real-time pacing. NOTE(review): currently implemented as a fixed
    /// 1ms delay per event, not original-timestamp spacing.
    RealTime,
    /// Speed multiplier: per-event delay is (10 / multiplier) ms.
    Custom(f64),
    /// No delay between events.
    MaxSpeed,
    /// Slow-down factor: per-event delay is (100 * factor) ms.
    SlowMotion(f64),
}
74
/// Criteria applied to each event before it is replayed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayFilter {
    /// Allowed event type names (as produced by `get_event_type`); `None` = all.
    pub event_types: Option<Vec<String>>,
    /// Allowed event sources (metadata `source` field); `None` = all.
    pub sources: Option<Vec<String>>,
    /// Minimum priority. NOTE(review): not checked by `apply_filter` yet.
    pub min_priority: Option<u8>,
    /// Custom predicate expression. NOTE(review): not evaluated yet.
    pub custom_predicate: Option<String>,
}
87
/// A named transformation to apply to events during replay.
///
/// NOTE(review): `apply_transformation` is currently a pass-through stub,
/// so these settings are carried but not yet acted upon.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayTransformation {
    /// Human-readable transformation name.
    pub name: String,
    /// Category of transformation to perform.
    pub transform_type: TransformationType,
    /// Free-form transformation parameters.
    pub parameters: HashMap<String, String>,
}
98
/// Category of a replay transformation.
///
/// NOTE(review): variant semantics are not implemented in this file —
/// `apply_transformation` returns events unchanged regardless of type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TransformationType {
    FieldMapping,
    Enrichment,
    Aggregation,
    Splitting,
    Custom,
}
113
/// Configuration for a replay run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayConfig {
    /// Which slice of the event store to replay.
    pub mode: ReplayMode,
    /// Pacing between replayed events.
    pub speed: ReplaySpeed,
    /// Optional per-event filter.
    pub filter: Option<ReplayFilter>,
    /// Transformations applied, in order, to each event.
    pub transformations: Vec<ReplayTransformation>,
    /// Batch size. NOTE(review): not referenced by the replay loop yet.
    pub batch_size: usize,
    /// Whether periodic state snapshots are taken during replay.
    pub enable_snapshots: bool,
    /// Minimum wall-clock interval between snapshots.
    pub snapshot_interval: Duration,
    /// Parallel replay flag. NOTE(review): not referenced by the replay loop yet.
    pub enable_parallel: bool,
    /// Worker count for parallel replay. NOTE(review): currently unused.
    pub parallel_workers: usize,
    /// Minimum wall-clock interval between checkpoints.
    pub checkpoint_interval: Duration,
}
138
impl Default for ReplayConfig {
    /// Defaults: full replay at maximum speed with no filtering or
    /// transformations; snapshots every 60s, checkpoints every 30s,
    /// parallelism disabled.
    fn default() -> Self {
        Self {
            mode: ReplayMode::All,
            speed: ReplaySpeed::MaxSpeed,
            filter: None,
            transformations: Vec::new(),
            batch_size: 1000,
            enable_snapshots: true,
            snapshot_interval: Duration::from_secs(60),
            enable_parallel: false,
            parallel_workers: 4,
            checkpoint_interval: Duration::from_secs(30),
        }
    }
}
155
/// Point-in-time state capture recorded during a replay.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateSnapshot {
    /// Unique snapshot id (UUID).
    pub snapshot_id: String,
    /// When the snapshot was taken.
    pub timestamp: DateTime<Utc>,
    /// Offset of the event being processed when the snapshot was taken.
    pub event_offset: u64,
    /// Serialized state payload. NOTE(review): always empty in this file —
    /// `create_snapshot` stores no actual state.
    pub state_data: Vec<u8>,
    /// Free-form snapshot metadata.
    pub metadata: HashMap<String, String>,
}
170
/// Progress marker recorded periodically during a replay.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplayCheckpoint {
    /// Unique checkpoint id (UUID).
    pub checkpoint_id: String,
    /// When the checkpoint was recorded.
    pub timestamp: DateTime<Utc>,
    /// Offset of the most recently visited event.
    pub last_offset: u64,
    /// Events delivered up to this checkpoint.
    pub events_processed: u64,
    /// Session status at checkpoint time (always `InProgress` as written).
    pub status: ReplayStatus,
}
185
/// Lifecycle state of a replay session.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReplayStatus {
    /// Session created but replay has not begun.
    NotStarted,
    /// Replay loop is running.
    InProgress,
    /// Marked paused via `pause_replay`.
    /// NOTE(review): the replay loop does not currently consult this state,
    /// so events keep flowing after a "pause" — confirm intended.
    Paused,
    /// Replay finished its configured range.
    Completed,
    /// Replay aborted with the given error message.
    Failed { reason: String },
}
200
/// Cumulative counters across all replay runs on a manager.
#[derive(Debug, Clone, Default)]
pub struct ReplayStats {
    /// Events successfully delivered to receivers.
    pub events_replayed: u64,
    /// Events dropped by the configured filter.
    pub events_filtered: u64,
    /// Events that passed through at least one transformation.
    pub events_transformed: u64,
    /// Total wall-clock time spent replaying, in milliseconds.
    pub total_replay_time_ms: u64,
    /// Average per-event time of the most recent run, in ms (overwritten
    /// at the end of each run, not a running average).
    pub avg_processing_time_ms: f64,
    /// Number of snapshots taken.
    pub snapshots_created: u64,
    /// Number of checkpoints recorded.
    pub checkpoints_created: u64,
    /// Errors observed. NOTE(review): never incremented in this file.
    pub errors_encountered: u64,
}
221
/// Coordinates storage and replay of stream events with filtering,
/// transformation, pacing, snapshots and checkpoints.
pub struct StreamReplayManager {
    /// Replay configuration shared by every session this manager starts.
    config: ReplayConfig,
    /// Offset-indexed event log backing replays.
    event_store: Arc<DashMap<u64, StreamEvent>>,
    /// Snapshots recorded during replays.
    snapshots: Arc<RwLock<Vec<StateSnapshot>>>,
    /// Checkpoints recorded during replays.
    checkpoints: Arc<RwLock<Vec<ReplayCheckpoint>>>,
    /// Cumulative replay statistics.
    stats: Arc<RwLock<ReplayStats>>,
    /// Sessions keyed by session id (includes completed/failed sessions).
    active_replays: Arc<DashMap<String, ReplaySession>>,
    /// User-registered processors run on every replayed event.
    processors: Arc<RwLock<Vec<Box<dyn EventProcessor + Send + Sync>>>>,
}
239
/// Book-keeping for a single replay session.
struct ReplaySession {
    /// Unique id (caller-supplied or a fresh UUID).
    session_id: String,
    /// When the session was registered.
    /// NOTE(review): set but never read in this file.
    start_time: Instant,
    /// Current lifecycle state.
    status: ReplayStatus,
    /// Last offset delivered by the replay loop.
    current_offset: u64,
    /// Number of events delivered so far.
    events_processed: u64,
}
253
/// Hook for per-event processing during replay.
///
/// Processors run in registration order; returning `Ok(None)` suppresses
/// the event from the replay output.
pub trait EventProcessor: Send + Sync {
    /// Process one event; return the (possibly modified) event to forward,
    /// or `None` to drop it.
    fn process(&mut self, event: &StreamEvent) -> Result<Option<StreamEvent>>;

    /// Human-readable processor name (used for logging).
    fn name(&self) -> &str;
}
262
263impl StreamReplayManager {
264 pub fn new(config: ReplayConfig) -> Self {
266 Self {
267 config,
268 event_store: Arc::new(DashMap::new()),
269 snapshots: Arc::new(RwLock::new(Vec::new())),
270 checkpoints: Arc::new(RwLock::new(Vec::new())),
271 stats: Arc::new(RwLock::new(ReplayStats::default())),
272 active_replays: Arc::new(DashMap::new()),
273 processors: Arc::new(RwLock::new(Vec::new())),
274 }
275 }
276
    /// Insert (or overwrite) the event stored at `offset`.
    pub fn store_event(&self, offset: u64, event: StreamEvent) {
        self.event_store.insert(offset, event);
        debug!("Stored event at offset {}", offset);
    }
282
283 pub fn store_events(&self, events: Vec<(u64, StreamEvent)>) {
285 for (offset, event) in events {
286 self.store_event(offset, event);
287 }
288 }
289
    /// Begin an asynchronous replay and return the receiving end of the
    /// event channel.
    ///
    /// A session is registered under `session_id` (a fresh UUID when `None`)
    /// and the replay loop runs as a detached tokio task. Failures inside the
    /// loop are recorded on the session's status rather than returned here;
    /// poll `get_session_status` to observe them.
    pub async fn start_replay(
        &self,
        session_id: Option<String>,
    ) -> Result<mpsc::UnboundedReceiver<StreamEvent>> {
        let session_id = session_id.unwrap_or_else(|| Uuid::new_v4().to_string());

        let session = ReplaySession {
            session_id: session_id.clone(),
            start_time: Instant::now(),
            status: ReplayStatus::InProgress,
            current_offset: 0,
            events_processed: 0,
        };

        self.active_replays.insert(session_id.clone(), session);

        let (tx, rx) = mpsc::unbounded_channel();

        // Clone the shared handles so the spawned task owns its own Arcs.
        let event_store = self.event_store.clone();
        let config = self.config.clone();
        let stats = self.stats.clone();
        let snapshots = self.snapshots.clone();
        let checkpoints = self.checkpoints.clone();
        let active_replays = self.active_replays.clone();
        let processors = self.processors.clone();
        let session_id_clone = session_id.clone();

        tokio::spawn(async move {
            if let Err(e) = Self::replay_events_internal(
                &session_id_clone,
                &event_store,
                &config,
                &stats,
                &snapshots,
                &checkpoints,
                &active_replays,
                &processors,
                tx,
            )
            .await
            {
                error!("Replay failed: {}", e);

                // Surface the failure through the session status so callers
                // polling `get_session_status` can observe it.
                if let Some(mut session) = active_replays.get_mut(&session_id_clone) {
                    session.status = ReplayStatus::Failed {
                        reason: e.to_string(),
                    };
                }
            }
        });

        info!("Started replay session: {}", session_id);
        Ok(rx)
    }
346
347 #[allow(clippy::too_many_arguments)]
349 async fn replay_events_internal(
350 session_id: &str,
351 event_store: &Arc<DashMap<u64, StreamEvent>>,
352 config: &ReplayConfig,
353 stats: &Arc<RwLock<ReplayStats>>,
354 snapshots: &Arc<RwLock<Vec<StateSnapshot>>>,
355 checkpoints: &Arc<RwLock<Vec<ReplayCheckpoint>>>,
356 active_replays: &Arc<DashMap<String, ReplaySession>>,
357 processors: &Arc<RwLock<Vec<Box<dyn EventProcessor + Send + Sync>>>>,
358 tx: mpsc::UnboundedSender<StreamEvent>,
359 ) -> Result<()> {
360 let start_time = Instant::now();
361
362 let (start_offset, end_offset) = Self::determine_offset_range(config, event_store)?;
364
365 debug!(
366 "Replaying events from offset {} to {}",
367 start_offset, end_offset
368 );
369
370 let mut events_replayed = 0;
371 let mut last_checkpoint = Instant::now();
372 let mut last_snapshot = Instant::now();
373
374 for offset in start_offset..=end_offset {
375 if let Some(event) = event_store.get(&offset) {
376 let event = event.clone();
377
378 if let Some(ref filter) = config.filter {
380 if !Self::apply_filter(&event, filter) {
381 stats.write().events_filtered += 1;
382 continue;
383 }
384 }
385
386 let mut transformed_event = event.clone();
388 for transformation in &config.transformations {
389 transformed_event =
390 Self::apply_transformation(transformed_event, transformation)?;
391 }
392
393 if !config.transformations.is_empty() {
394 stats.write().events_transformed += 1;
395 }
396
397 let mut final_event = Some(transformed_event);
399 for processor in processors.write().iter_mut() {
400 if let Some(evt) = final_event {
401 final_event = processor.process(&evt)?;
402 }
403 }
404
405 if let Some(evt) = final_event {
407 Self::apply_speed_control(config).await;
409
410 if tx.send(evt).is_err() {
411 warn!("Receiver dropped, stopping replay");
412 break;
413 }
414
415 events_replayed += 1;
416
417 if let Some(mut session) = active_replays.get_mut(session_id) {
419 session.current_offset = offset;
420 session.events_processed = events_replayed;
421 }
422 }
423
424 if last_checkpoint.elapsed() >= config.checkpoint_interval {
426 Self::create_checkpoint(
427 session_id,
428 offset,
429 events_replayed,
430 checkpoints,
431 stats,
432 )
433 .await?;
434 last_checkpoint = Instant::now();
435 }
436
437 if config.enable_snapshots && last_snapshot.elapsed() >= config.snapshot_interval {
439 Self::create_snapshot(session_id, offset, snapshots, stats).await?;
440 last_snapshot = Instant::now();
441 }
442 }
443 }
444
445 let total_time = start_time.elapsed().as_millis() as u64;
447 let mut stats_guard = stats.write();
448 stats_guard.events_replayed += events_replayed;
449 stats_guard.total_replay_time_ms += total_time;
450 if events_replayed > 0 {
451 stats_guard.avg_processing_time_ms = total_time as f64 / events_replayed as f64;
452 }
453
454 if let Some(mut session) = active_replays.get_mut(session_id) {
456 session.status = ReplayStatus::Completed;
457 }
458
459 info!(
460 "Replay completed: {} events in {}ms",
461 events_replayed, total_time
462 );
463 Ok(())
464 }
465
    /// Translate the configured `ReplayMode` into an inclusive
    /// `(start_offset, end_offset)` pair over the event store.
    ///
    /// Time-based modes scan the entire store to locate matching offsets,
    /// so each such call is O(n) in the number of stored events.
    fn determine_offset_range(
        config: &ReplayConfig,
        event_store: &Arc<DashMap<u64, StreamEvent>>,
    ) -> Result<(u64, u64)> {
        // Highest known offset; 0 when the store is empty.
        let max_offset = event_store.iter().map(|e| *e.key()).max().unwrap_or(0);

        match &config.mode {
            ReplayMode::All => Ok((0, max_offset)),
            ReplayMode::FromOffset(start) => Ok((*start, max_offset)),
            ReplayMode::OffsetRange { start, end } => Ok((*start, *end)),
            ReplayMode::FromTime(start_time) => {
                // Smallest offset whose event timestamp is at/after start_time.
                let start_offset = event_store
                    .iter()
                    .filter_map(|entry| {
                        let offset = *entry.key();
                        let event = entry.value();
                        let event_time = Self::get_event_timestamp(event);
                        if event_time >= *start_time {
                            Some(offset)
                        } else {
                            None
                        }
                    })
                    .min()
                    .unwrap_or(0);
                Ok((start_offset, max_offset))
            }
            ReplayMode::TimeRange { start, end } => {
                let start_offset = event_store
                    .iter()
                    .filter_map(|entry| {
                        let offset = *entry.key();
                        let event = entry.value();
                        let event_time = Self::get_event_timestamp(event);
                        if event_time >= *start {
                            Some(offset)
                        } else {
                            None
                        }
                    })
                    .min()
                    .unwrap_or(0);

                // NOTE(review): if no event is <= `end`, this falls back to
                // max_offset, which replays through the end of the store —
                // confirm that is the intended behavior for an empty range.
                let end_offset = event_store
                    .iter()
                    .filter_map(|entry| {
                        let offset = *entry.key();
                        let event = entry.value();
                        let event_time = Self::get_event_timestamp(event);
                        if event_time <= *end {
                            Some(offset)
                        } else {
                            None
                        }
                    })
                    .max()
                    .unwrap_or(max_offset);

                Ok((start_offset, end_offset))
            }
            ReplayMode::Filtered { from, .. } => {
                // Only the optional `from` time is honored here; the filter
                // expression itself is not evaluated when picking the range.
                let start_offset = if let Some(start_time) = from {
                    event_store
                        .iter()
                        .filter_map(|entry| {
                            let offset = *entry.key();
                            let event = entry.value();
                            let event_time = Self::get_event_timestamp(event);
                            if event_time >= *start_time {
                                Some(offset)
                            } else {
                                None
                            }
                        })
                        .min()
                        .unwrap_or(0)
                } else {
                    0
                };
                Ok((start_offset, max_offset))
            }
        }
    }
551
    /// Extract the metadata timestamp from any `StreamEvent` variant.
    ///
    /// Every variant listed carries an `EventMetadata`; the single big
    /// or-pattern binds it uniformly. If a new variant is added to
    /// `StreamEvent`, this match will fail to compile until it is listed —
    /// which is the desired safety net.
    fn get_event_timestamp(event: &StreamEvent) -> DateTime<Utc> {
        let metadata = match event {
            StreamEvent::TripleAdded { metadata, .. }
            | StreamEvent::TripleRemoved { metadata, .. }
            | StreamEvent::QuadAdded { metadata, .. }
            | StreamEvent::QuadRemoved { metadata, .. }
            | StreamEvent::GraphCreated { metadata, .. }
            | StreamEvent::GraphCleared { metadata, .. }
            | StreamEvent::GraphDeleted { metadata, .. }
            | StreamEvent::GraphMetadataUpdated { metadata, .. }
            | StreamEvent::GraphPermissionsChanged { metadata, .. }
            | StreamEvent::GraphStatisticsUpdated { metadata, .. }
            | StreamEvent::GraphRenamed { metadata, .. }
            | StreamEvent::GraphMerged { metadata, .. }
            | StreamEvent::GraphSplit { metadata, .. }
            | StreamEvent::SparqlUpdate { metadata, .. }
            | StreamEvent::QueryCompleted { metadata, .. }
            | StreamEvent::QueryResultAdded { metadata, .. }
            | StreamEvent::QueryResultRemoved { metadata, .. }
            | StreamEvent::TransactionBegin { metadata, .. }
            | StreamEvent::TransactionCommit { metadata, .. }
            | StreamEvent::TransactionAbort { metadata, .. }
            | StreamEvent::SchemaChanged { metadata, .. }
            | StreamEvent::SchemaDefinitionAdded { metadata, .. }
            | StreamEvent::SchemaDefinitionRemoved { metadata, .. }
            | StreamEvent::SchemaDefinitionModified { metadata, .. }
            | StreamEvent::OntologyImported { metadata, .. }
            | StreamEvent::OntologyRemoved { metadata, .. }
            | StreamEvent::ConstraintAdded { metadata, .. }
            | StreamEvent::ConstraintRemoved { metadata, .. }
            | StreamEvent::ConstraintViolated { metadata, .. }
            | StreamEvent::IndexCreated { metadata, .. }
            | StreamEvent::IndexDropped { metadata, .. }
            | StreamEvent::IndexRebuilt { metadata, .. }
            | StreamEvent::SchemaUpdated { metadata, .. }
            | StreamEvent::ShapeAdded { metadata, .. }
            | StreamEvent::ShapeRemoved { metadata, .. }
            | StreamEvent::ShapeModified { metadata, .. }
            | StreamEvent::ShapeUpdated { metadata, .. }
            | StreamEvent::ShapeValidationStarted { metadata, .. }
            | StreamEvent::ShapeValidationCompleted { metadata, .. }
            | StreamEvent::ShapeViolationDetected { metadata, .. }
            | StreamEvent::Heartbeat { metadata, .. }
            | StreamEvent::ErrorOccurred { metadata, .. } => metadata,
        };
        metadata.timestamp
    }
600
601 fn apply_filter(event: &StreamEvent, filter: &ReplayFilter) -> bool {
603 if let Some(ref event_types) = filter.event_types {
605 let event_type = Self::get_event_type(event);
606 if !event_types.contains(&event_type) {
607 return false;
608 }
609 }
610
611 if let Some(ref sources) = filter.sources {
613 let source = Self::get_event_source(event);
614 if !sources.contains(&source) {
615 return false;
616 }
617 }
618
619 true
620 }
621
622 fn get_event_type(event: &StreamEvent) -> String {
624 match event {
625 StreamEvent::TripleAdded { .. } => "TripleAdded",
626 StreamEvent::TripleRemoved { .. } => "TripleRemoved",
627 StreamEvent::QuadAdded { .. } => "QuadAdded",
628 StreamEvent::QuadRemoved { .. } => "QuadRemoved",
629 StreamEvent::GraphCreated { .. } => "GraphCreated",
630 StreamEvent::GraphCleared { .. } => "GraphCleared",
631 StreamEvent::GraphDeleted { .. } => "GraphDeleted",
632 StreamEvent::SparqlUpdate { .. } => "SparqlUpdate",
633 StreamEvent::TransactionBegin { .. } => "TransactionBegin",
634 StreamEvent::TransactionCommit { .. } => "TransactionCommit",
635 StreamEvent::TransactionAbort { .. } => "TransactionAbort",
636 StreamEvent::SchemaChanged { .. } => "SchemaChanged",
637 StreamEvent::Heartbeat { .. } => "Heartbeat",
638 StreamEvent::QueryResultAdded { .. } => "QueryResultAdded",
639 StreamEvent::QueryResultRemoved { .. } => "QueryResultRemoved",
640 StreamEvent::QueryCompleted { .. } => "QueryCompleted",
641 _ => "Other",
642 }
643 .to_string()
644 }
645
    /// Extract the `source` string from any `StreamEvent` variant's metadata.
    ///
    /// Returns an owned clone so the caller is independent of the event's
    /// lifetime. Like `get_event_timestamp`, a newly added `StreamEvent`
    /// variant must be listed here before this compiles again.
    fn get_event_source(event: &StreamEvent) -> String {
        let metadata = match event {
            StreamEvent::TripleAdded { metadata, .. }
            | StreamEvent::TripleRemoved { metadata, .. }
            | StreamEvent::QuadAdded { metadata, .. }
            | StreamEvent::QuadRemoved { metadata, .. }
            | StreamEvent::GraphCreated { metadata, .. }
            | StreamEvent::GraphCleared { metadata, .. }
            | StreamEvent::GraphDeleted { metadata, .. }
            | StreamEvent::SparqlUpdate { metadata, .. }
            | StreamEvent::TransactionBegin { metadata, .. }
            | StreamEvent::TransactionCommit { metadata, .. }
            | StreamEvent::TransactionAbort { metadata, .. }
            | StreamEvent::SchemaChanged { metadata, .. }
            | StreamEvent::Heartbeat { metadata, .. }
            | StreamEvent::QueryResultAdded { metadata, .. }
            | StreamEvent::QueryResultRemoved { metadata, .. }
            | StreamEvent::QueryCompleted { metadata, .. }
            | StreamEvent::GraphMetadataUpdated { metadata, .. }
            | StreamEvent::GraphPermissionsChanged { metadata, .. }
            | StreamEvent::GraphStatisticsUpdated { metadata, .. }
            | StreamEvent::GraphRenamed { metadata, .. }
            | StreamEvent::GraphMerged { metadata, .. }
            | StreamEvent::GraphSplit { metadata, .. }
            | StreamEvent::SchemaDefinitionAdded { metadata, .. }
            | StreamEvent::SchemaDefinitionRemoved { metadata, .. }
            | StreamEvent::SchemaDefinitionModified { metadata, .. }
            | StreamEvent::OntologyImported { metadata, .. }
            | StreamEvent::OntologyRemoved { metadata, .. }
            | StreamEvent::ConstraintAdded { metadata, .. }
            | StreamEvent::ConstraintRemoved { metadata, .. }
            | StreamEvent::ConstraintViolated { metadata, .. }
            | StreamEvent::IndexCreated { metadata, .. }
            | StreamEvent::IndexDropped { metadata, .. }
            | StreamEvent::IndexRebuilt { metadata, .. }
            | StreamEvent::SchemaUpdated { metadata, .. }
            | StreamEvent::ShapeAdded { metadata, .. }
            | StreamEvent::ShapeUpdated { metadata, .. }
            | StreamEvent::ShapeRemoved { metadata, .. }
            | StreamEvent::ShapeModified { metadata, .. }
            | StreamEvent::ShapeValidationStarted { metadata, .. }
            | StreamEvent::ShapeValidationCompleted { metadata, .. }
            | StreamEvent::ShapeViolationDetected { metadata, .. }
            | StreamEvent::ErrorOccurred { metadata, .. } => metadata,
        };
        metadata.source.clone()
    }
694
    /// Apply a single transformation to an event.
    ///
    /// NOTE(review): currently a pass-through stub — the transformation is
    /// accepted but ignored and the event is returned unchanged. The
    /// `Result` return is kept so real transformations can fail later
    /// without changing callers.
    fn apply_transformation(
        event: StreamEvent,
        _transformation: &ReplayTransformation,
    ) -> Result<StreamEvent> {
        Ok(event)
    }
704
705 async fn apply_speed_control(config: &ReplayConfig) {
707 match config.speed {
708 ReplaySpeed::RealTime => {
709 sleep(Duration::from_millis(1)).await;
711 }
712 ReplaySpeed::MaxSpeed => {
713 }
715 ReplaySpeed::Custom(multiplier) => {
716 let delay = Duration::from_millis((10.0 / multiplier) as u64);
717 sleep(delay).await;
718 }
719 ReplaySpeed::SlowMotion(factor) => {
720 let delay = Duration::from_millis((100.0 * factor) as u64);
721 sleep(delay).await;
722 }
723 }
724 }
725
726 async fn create_checkpoint(
728 session_id: &str,
729 offset: u64,
730 events_processed: u64,
731 checkpoints: &Arc<RwLock<Vec<ReplayCheckpoint>>>,
732 stats: &Arc<RwLock<ReplayStats>>,
733 ) -> Result<()> {
734 let checkpoint = ReplayCheckpoint {
735 checkpoint_id: Uuid::new_v4().to_string(),
736 timestamp: Utc::now(),
737 last_offset: offset,
738 events_processed,
739 status: ReplayStatus::InProgress,
740 };
741
742 checkpoints.write().push(checkpoint);
743 stats.write().checkpoints_created += 1;
744
745 debug!(
746 "Checkpoint created for session {} at offset {}",
747 session_id, offset
748 );
749 Ok(())
750 }
751
752 async fn create_snapshot(
754 session_id: &str,
755 offset: u64,
756 snapshots: &Arc<RwLock<Vec<StateSnapshot>>>,
757 stats: &Arc<RwLock<ReplayStats>>,
758 ) -> Result<()> {
759 let snapshot = StateSnapshot {
760 snapshot_id: Uuid::new_v4().to_string(),
761 timestamp: Utc::now(),
762 event_offset: offset,
763 state_data: Vec::new(), metadata: HashMap::new(),
765 };
766
767 snapshots.write().push(snapshot);
768 stats.write().snapshots_created += 1;
769
770 debug!(
771 "Snapshot created for session {} at offset {}",
772 session_id, offset
773 );
774 Ok(())
775 }
776
777 pub fn pause_replay(&self, session_id: &str) -> Result<()> {
779 if let Some(mut session) = self.active_replays.get_mut(session_id) {
780 session.status = ReplayStatus::Paused;
781 info!("Replay session {} paused", session_id);
782 Ok(())
783 } else {
784 Err(anyhow!("Replay session not found: {}", session_id))
785 }
786 }
787
788 pub fn resume_replay(&self, session_id: &str) -> Result<()> {
790 if let Some(mut session) = self.active_replays.get_mut(session_id) {
791 session.status = ReplayStatus::InProgress;
792 info!("Replay session {} resumed", session_id);
793 Ok(())
794 } else {
795 Err(anyhow!("Replay session not found: {}", session_id))
796 }
797 }
798
    /// Snapshot of the cumulative replay statistics.
    pub fn get_stats(&self) -> ReplayStats {
        self.stats.read().clone()
    }
803
    /// Current status of the given replay session, or `None` if the id is
    /// unknown.
    pub fn get_session_status(&self, session_id: &str) -> Option<ReplayStatus> {
        self.active_replays
            .get(session_id)
            .map(|session| session.status.clone())
    }
810
811 pub fn register_processor(&self, processor: Box<dyn EventProcessor + Send + Sync>) {
813 let name = processor.name().to_string();
814 self.processors.write().push(processor);
815 info!("Registered event processor: {}", name);
816 }
817}
818
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// End-to-end replay of 10 stored events in `All` mode: every event
    /// should be delivered and reflected in the stats.
    #[tokio::test]
    async fn test_replay_all_events() {
        let config = ReplayConfig {
            mode: ReplayMode::All,
            speed: ReplaySpeed::MaxSpeed,
            ..Default::default()
        };

        let manager = StreamReplayManager::new(config);

        // Store events at offsets 0..9.
        for i in 0..10 {
            let event = StreamEvent::SchemaChanged {
                schema_type: crate::event::SchemaType::Ontology,
                change_type: crate::event::SchemaChangeType::Added,
                details: format!("test schema change {}", i),
                metadata: EventMetadata {
                    event_id: format!("event-{}", i),
                    timestamp: Utc::now(),
                    source: "test".to_string(),
                    user: None,
                    context: None,
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: HashMap::new(),
                    checksum: None,
                },
            };
            manager.store_event(i, event);
        }

        let mut rx = manager.start_replay(None).await.unwrap();

        // The channel closes when the spawned replay task drops the sender,
        // which ends this receive loop.
        let mut count = 0;
        while let Some(_event) = rx.recv().await {
            count += 1;
        }

        assert_eq!(count, 10);

        let stats = manager.get_stats();
        assert_eq!(stats.events_replayed, 10);
    }

    /// Replay with a type+source filter that matches every stored event:
    /// nothing should be filtered out.
    #[tokio::test]
    async fn test_replay_with_filter() {
        let config = ReplayConfig {
            mode: ReplayMode::All,
            speed: ReplaySpeed::MaxSpeed,
            filter: Some(ReplayFilter {
                event_types: Some(vec!["SchemaChanged".to_string()]),
                sources: Some(vec!["test".to_string()]),
                min_priority: None,
                custom_predicate: None,
            }),
            ..Default::default()
        };

        let manager = StreamReplayManager::new(config);

        // All 5 events are SchemaChanged from source "test", so all pass.
        for i in 0..5 {
            let event = StreamEvent::SchemaChanged {
                schema_type: crate::event::SchemaType::Ontology,
                change_type: crate::event::SchemaChangeType::Added,
                details: format!("test schema change {}", i),
                metadata: EventMetadata {
                    event_id: format!("event-{}", i),
                    timestamp: Utc::now(),
                    source: "test".to_string(),
                    user: None,
                    context: None,
                    caused_by: None,
                    version: "1.0".to_string(),
                    properties: HashMap::new(),
                    checksum: None,
                },
            };
            manager.store_event(i, event);
        }

        let mut rx = manager.start_replay(None).await.unwrap();

        let mut count = 0;
        while let Some(_event) = rx.recv().await {
            count += 1;
        }

        assert_eq!(count, 5);
    }
}