oxirs_stream/event_sourcing.rs

//! # Event Sourcing Framework
//!
//! Complete event sourcing implementation for OxiRS Stream, providing event storage,
//! replay capabilities, snapshots, and temporal queries. This forms the foundation
//! for CQRS patterns and enables advanced temporal analytics.
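//!
//! ## Example
//!
//! A minimal usage sketch (not compiled into the docs; the crate paths and the
//! `StreamEvent`/`EventMetadata` construction are assumed to mirror the shapes
//! used in this module's tests):
//!
//! ```ignore
//! use oxirs_stream::event_sourcing::{EventStore, EventStoreConfig};
//! use oxirs_stream::{EventMetadata, StreamEvent};
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//!     let store = EventStore::new(EventStoreConfig::default());
//!
//!     let event = StreamEvent::TripleAdded {
//!         subject: "http://example.org/s".to_string(),
//!         predicate: "http://example.org/p".to_string(),
//!         object: "\"o\"".to_string(),
//!         graph: None,
//!         metadata: EventMetadata::default(),
//!     };
//!
//!     // Append to a stream, then read the stream back in order.
//!     let stored = store.store_event("my_stream".to_string(), event).await?;
//!     assert_eq!(stored.stream_version, 1);
//!
//!     let events = store.get_stream_events("my_stream", None).await?;
//!     assert_eq!(events.len(), 1);
//!     Ok(())
//! }
//! ```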

use crate::{EventMetadata, StreamEvent};
use anyhow::Result;
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, RwLock, Semaphore};
use tracing::{debug, error, info, warn};
use uuid::Uuid;

/// Event store configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventStoreConfig {
    /// Maximum events to keep in memory
    pub max_memory_events: usize,
    /// Enable persistent storage
    pub enable_persistence: bool,
    /// Persistence backend type
    pub persistence_backend: PersistenceBackend,
    /// Snapshot configuration
    pub snapshot_config: SnapshotConfig,
    /// Retention policy
    pub retention_policy: RetentionPolicy,
    /// Indexing configuration
    pub indexing_config: IndexingConfig,
    /// Enable compression for stored events
    pub enable_compression: bool,
    /// Batch size for persistence operations
    pub persistence_batch_size: usize,
}

impl Default for EventStoreConfig {
    fn default() -> Self {
        Self {
            max_memory_events: 1_000_000,
            enable_persistence: true,
            persistence_backend: PersistenceBackend::FileSystem {
                base_path: "/tmp/oxirs-event-store".to_string(),
            },
            snapshot_config: SnapshotConfig::default(),
            retention_policy: RetentionPolicy::default(),
            indexing_config: IndexingConfig::default(),
            enable_compression: true,
            persistence_batch_size: 1000,
        }
    }
}

/// Persistence backend options
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PersistenceBackend {
    /// File system based storage
    FileSystem { base_path: String },
    /// Database storage
    Database { connection_string: String },
    /// S3-compatible object storage
    ObjectStorage {
        endpoint: String,
        bucket: String,
        access_key: String,
        secret_key: String,
    },
    /// In-memory only (no persistence)
    Memory,
}

/// Snapshot configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotConfig {
    /// Enable automatic snapshots
    pub enable_snapshots: bool,
    /// Snapshot interval (number of events)
    pub snapshot_interval: usize,
    /// Maximum snapshots to keep
    pub max_snapshots: usize,
    /// Snapshot compression
    pub compress_snapshots: bool,
}

impl Default for SnapshotConfig {
    fn default() -> Self {
        Self {
            enable_snapshots: true,
            snapshot_interval: 10000,
            max_snapshots: 10,
            compress_snapshots: true,
        }
    }
}

/// Event retention policy
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicy {
    /// Maximum age of events to keep
    pub max_age: Option<ChronoDuration>,
    /// Maximum number of events to keep
    pub max_events: Option<u64>,
    /// Archive old events instead of deleting
    pub enable_archiving: bool,
    /// Archive backend
    pub archive_backend: Option<PersistenceBackend>,
}

impl Default for RetentionPolicy {
    fn default() -> Self {
        Self {
            max_age: Some(ChronoDuration::days(365)), // 1 year
            max_events: Some(10_000_000),             // 10M events
            enable_archiving: true,
            archive_backend: None,
        }
    }
}

/// Indexing configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexingConfig {
    /// Enable event type indexing
    pub index_by_event_type: bool,
    /// Enable timestamp indexing
    pub index_by_timestamp: bool,
    /// Enable source indexing
    pub index_by_source: bool,
    /// Custom field indexes
    pub custom_indexes: Vec<CustomIndex>,
}

impl Default for IndexingConfig {
    fn default() -> Self {
        Self {
            index_by_event_type: true,
            index_by_timestamp: true,
            index_by_source: true,
            custom_indexes: Vec::new(),
        }
    }
}

/// Custom index definition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomIndex {
    /// Index name
    pub name: String,
    /// Field path to index
    pub field_path: String,
    /// Index type
    pub index_type: IndexType,
}

/// Index type
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexType {
    /// Hash index for exact matches
    Hash,
    /// B-tree index for range queries
    BTree,
    /// Full-text search index
    FullText,
}

/// Stored event with metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredEvent {
    /// Unique event ID
    pub event_id: Uuid,
    /// Event sequence number (global order)
    pub sequence_number: u64,
    /// Stream ID (for grouping related events)
    pub stream_id: String,
    /// Event version within the stream
    pub stream_version: u64,
    /// Original event data
    pub event_data: StreamEvent,
    /// Storage timestamp
    pub stored_at: DateTime<Utc>,
    /// Storage metadata
    pub storage_metadata: StorageMetadata,
}

/// Storage metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageMetadata {
    /// Checksum for integrity verification
    pub checksum: String,
    /// Compressed size (if compressed)
    pub compressed_size: Option<usize>,
    /// Original size
    pub original_size: usize,
    /// Storage location
    pub storage_location: String,
    /// Persistence status
    pub persistence_status: PersistenceStatus,
}

/// Persistence status
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PersistenceStatus {
    /// Only in memory
    InMemory,
    /// Persisted to disk
    Persisted,
    /// Archived to long-term storage
    Archived,
    /// Failed to persist
    Failed { error: String },
}

/// Event stream snapshot
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventSnapshot {
    /// Snapshot ID
    pub snapshot_id: Uuid,
    /// Stream ID
    pub stream_id: String,
    /// Stream version at snapshot time
    pub stream_version: u64,
    /// Sequence number at snapshot time
    pub sequence_number: u64,
    /// Snapshot timestamp
    pub created_at: DateTime<Utc>,
    /// Aggregated state data
    pub state_data: Vec<u8>,
    /// Snapshot metadata
    pub metadata: SnapshotMetadata,
}

/// Snapshot metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotMetadata {
    /// Compression algorithm used
    pub compression: Option<String>,
    /// Original state size
    pub original_size: usize,
    /// Compressed size
    pub compressed_size: usize,
    /// Checksum for integrity
    pub checksum: String,
}

/// Query criteria for event retrieval
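///
/// A construction sketch (the field values below are illustrative only):
///
/// ```ignore
/// let query = EventQuery {
///     stream_id: Some("orders".to_string()),
///     event_types: None,
///     time_range: None,
///     sequence_range: Some(SequenceRange { start: 1, end: 100 }),
///     source: None,
///     custom_filters: std::collections::HashMap::new(),
///     limit: Some(50),
///     order: QueryOrder::SequenceAsc,
/// };
/// ```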
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventQuery {
    /// Stream ID filter
    pub stream_id: Option<String>,
    /// Event type filter
    pub event_types: Option<Vec<String>>,
    /// Time range filter
    pub time_range: Option<TimeRange>,
    /// Sequence number range
    pub sequence_range: Option<SequenceRange>,
    /// Source filter
    pub source: Option<String>,
    /// Custom field filters
    pub custom_filters: HashMap<String, String>,
    /// Maximum number of events to return
    pub limit: Option<usize>,
    /// Ordering preference
    pub order: QueryOrder,
}

/// Time range for queries
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
    pub start: DateTime<Utc>,
    pub end: DateTime<Utc>,
}

/// Sequence number range for queries
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequenceRange {
    pub start: u64,
    pub end: u64,
}

/// Query ordering
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryOrder {
    /// Ascending by sequence number
    SequenceAsc,
    /// Descending by sequence number
    SequenceDesc,
    /// Ascending by timestamp
    TimestampAsc,
    /// Descending by timestamp
    TimestampDesc,
}

/// Event sourcing statistics
#[derive(Debug, Default)]
pub struct EventSourcingStats {
    pub total_events_stored: AtomicU64,
    pub total_events_retrieved: AtomicU64,
    pub snapshots_created: AtomicU64,
    pub events_archived: AtomicU64,
    pub persistence_operations: AtomicU64,
    pub failed_operations: AtomicU64,
    pub memory_usage_bytes: AtomicU64,
    pub disk_usage_bytes: AtomicU64,
    pub average_store_latency_ms: AtomicU64,
    pub average_retrieve_latency_ms: AtomicU64,
}

/// EventStore trait for abstracting event storage
#[async_trait::async_trait]
pub trait EventStoreTrait: Send + Sync {
    async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent>;
    async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>>;
    async fn get_stream_events(
        &self,
        stream_id: &str,
        from_version: Option<u64>,
    ) -> Result<Vec<StoredEvent>>;
    async fn replay_from_timestamp(&self, timestamp: DateTime<Utc>) -> Result<Vec<StoredEvent>>;
    async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>>;
    async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>>;
    async fn append_events(
        &self,
        aggregate_id: &str,
        events: &[StreamEvent],
        expected_version: Option<u64>,
    ) -> Result<u64>;
}

/// Event stream trait for streaming events
#[async_trait::async_trait]
pub trait EventStream: Send + Sync {
    async fn next_event(&mut self) -> Option<StoredEvent>;
    async fn has_events(&self) -> bool;
    async fn read_events_from_position(
        &self,
        position: u64,
        max_events: usize,
    ) -> Result<Vec<StoredEvent>>;
}

/// Snapshot store trait for managing snapshots
#[async_trait::async_trait]
pub trait SnapshotStore: Send + Sync {
    async fn store_snapshot(&self, snapshot: EventSnapshot) -> Result<()>;
    async fn get_snapshot(
        &self,
        stream_id: &str,
        version: Option<u64>,
    ) -> Result<Option<EventSnapshot>>;
    async fn list_snapshots(&self, stream_id: &str) -> Result<Vec<EventSnapshot>>;
}

/// Main event store implementation
pub struct EventStore {
    /// Configuration
    config: EventStoreConfig,
    /// In-memory event storage
    memory_events: Arc<RwLock<BTreeMap<u64, StoredEvent>>>,
    /// Stream version tracking
    stream_versions: Arc<RwLock<HashMap<String, u64>>>,
    /// Next sequence number
    next_sequence: Arc<AtomicU64>,
    /// Event indexes
    indexes: Arc<EventIndexes>,
    /// Snapshots
    snapshots: Arc<RwLock<HashMap<String, Vec<EventSnapshot>>>>,
    /// Persistence manager
    persistence_manager: Arc<PersistenceManager>,
    /// Statistics
    stats: Arc<EventSourcingStats>,
    /// Operation semaphore
    operation_semaphore: Arc<Semaphore>,
}

/// Event indexes for efficient querying
pub struct EventIndexes {
    /// Index by event type
    by_event_type: RwLock<HashMap<String, Vec<u64>>>,
    /// Index by timestamp
    by_timestamp: RwLock<BTreeMap<DateTime<Utc>, Vec<u64>>>,
    /// Index by source
    by_source: RwLock<HashMap<String, Vec<u64>>>,
    /// Index by stream ID
    by_stream: RwLock<HashMap<String, Vec<u64>>>,
    /// Custom indexes
    custom_indexes: RwLock<HashMap<String, HashMap<String, Vec<u64>>>>,
}

/// Persistence manager for durable storage
pub struct PersistenceManager {
    /// Backend configuration
    backend: PersistenceBackend,
    /// Pending operations queue
    pending_operations: Arc<Mutex<VecDeque<PersistenceOperation>>>,
    /// Persistence statistics
    stats: Arc<PersistenceStats>,
}

/// Persistence operation
#[derive(Debug, Clone)]
pub enum PersistenceOperation {
    /// Store event
    StoreEvent(Box<StoredEvent>),
    /// Store snapshot
    StoreSnapshot(EventSnapshot),
    /// Archive events
    ArchiveEvents(Vec<StoredEvent>),
    /// Delete events
    DeleteEvents(Vec<u64>),
}

/// Persistence statistics
#[derive(Debug, Default)]
pub struct PersistenceStats {
    pub operations_queued: AtomicU64,
    pub operations_completed: AtomicU64,
    pub operations_failed: AtomicU64,
    pub bytes_written: AtomicU64,
    pub bytes_read: AtomicU64,
}

impl EventStore {
    /// Create a new event store
    pub fn new(config: EventStoreConfig) -> Self {
        let persistence_manager =
            Arc::new(PersistenceManager::new(config.persistence_backend.clone()));

        Self {
            config,
            memory_events: Arc::new(RwLock::new(BTreeMap::new())),
            stream_versions: Arc::new(RwLock::new(HashMap::new())),
            next_sequence: Arc::new(AtomicU64::new(1)),
            indexes: Arc::new(EventIndexes::new()),
            snapshots: Arc::new(RwLock::new(HashMap::new())),
            persistence_manager,
            stats: Arc::new(EventSourcingStats::default()),
            operation_semaphore: Arc::new(Semaphore::new(1000)), // Max 1000 concurrent operations
        }
    }

    /// Store an event in the event store
    pub async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent> {
        let _permit = self.operation_semaphore.acquire().await?;
        let start_time = Instant::now();

        // Generate sequence number and stream version
        let sequence_number = self.next_sequence.fetch_add(1, Ordering::SeqCst);
        let stream_version = {
            let mut versions = self.stream_versions.write().await;
            let version = versions.get(&stream_id).unwrap_or(&0) + 1;
            versions.insert(stream_id.clone(), version);
            version
        };

        // Create stored event
        let checksum = self.calculate_checksum(&event)?;
        let original_size = self.estimate_size(&event);
        let stored_event = StoredEvent {
            event_id: Uuid::new_v4(),
            sequence_number,
            stream_id: stream_id.clone(),
            stream_version,
            event_data: event,
            stored_at: Utc::now(),
            storage_metadata: StorageMetadata {
                checksum,
                compressed_size: None,
                original_size,
                storage_location: format!("memory:{sequence_number}"),
                persistence_status: PersistenceStatus::InMemory,
            },
        };

        // Store in memory
        {
            let mut memory_events = self.memory_events.write().await;
            memory_events.insert(sequence_number, stored_event.clone());

            // Evict old events if needed
            if memory_events.len() > self.config.max_memory_events {
                let to_remove: Vec<u64> = memory_events
                    .keys()
                    .take(memory_events.len() - self.config.max_memory_events)
                    .cloned()
                    .collect();

                for seq in to_remove {
                    memory_events.remove(&seq);
                }
            }
        }

        // Update indexes
        self.indexes.add_event(&stored_event).await?;

        // Queue for persistence if enabled
        if self.config.enable_persistence {
            self.persistence_manager
                .queue_operation(PersistenceOperation::StoreEvent(Box::new(
                    stored_event.clone(),
                )))
                .await?;
        }

        // Check if snapshot is needed
        if self.config.snapshot_config.enable_snapshots
            && stream_version % self.config.snapshot_config.snapshot_interval as u64 == 0
        {
            self.create_snapshot(&stream_id, stream_version).await?;
        }

        // Update statistics
        self.stats
            .total_events_stored
            .fetch_add(1, Ordering::Relaxed);
        let store_latency = start_time.elapsed();
        self.stats
            .average_store_latency_ms
            .store(store_latency.as_millis() as u64, Ordering::Relaxed);

        info!(
            "Stored event {} for stream {} (seq: {}, version: {})",
            stored_event.event_id, stream_id, sequence_number, stream_version
        );

        Ok(stored_event)
    }

    /// Retrieve events by query
    pub async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>> {
        let _permit = self.operation_semaphore.acquire().await?;
        let start_time = Instant::now();

        let candidate_sequences = self.indexes.find_matching_sequences(&query).await?;
        let mut results = Vec::new();

        let memory_events = self.memory_events.read().await;
        for &sequence in &candidate_sequences {
            if let Some(stored_event) = memory_events.get(&sequence) {
                if self.matches_query(stored_event, &query) {
                    results.push(stored_event.clone());

                    if let Some(limit) = query.limit {
                        if results.len() >= limit {
                            break;
                        }
                    }
                }
            }
        }

        // Sort results based on query order
        self.sort_results(&mut results, &query.order);

        // Update statistics
        self.stats
            .total_events_retrieved
            .fetch_add(results.len() as u64, Ordering::Relaxed);
        let retrieve_latency = start_time.elapsed();
        self.stats
            .average_retrieve_latency_ms
            .store(retrieve_latency.as_millis() as u64, Ordering::Relaxed);

        debug!(
            "Query returned {} events in {:?}",
            results.len(),
            retrieve_latency
        );

        Ok(results)
    }

    /// Get events for a specific stream
    pub async fn get_stream_events(
        &self,
        stream_id: &str,
        from_version: Option<u64>,
    ) -> Result<Vec<StoredEvent>> {
        let query = EventQuery {
            stream_id: Some(stream_id.to_string()),
            event_types: None,
            time_range: None,
            sequence_range: None,
            source: None,
            custom_filters: HashMap::new(),
            limit: None,
            order: QueryOrder::SequenceAsc,
        };

        let mut events = self.query_events(query).await?;

        if let Some(from_version) = from_version {
            events.retain(|e| e.stream_version >= from_version);
        }

        Ok(events)
    }

    /// Replay events from a specific point in time
    pub async fn replay_from_timestamp(
        &self,
        timestamp: DateTime<Utc>,
    ) -> Result<Vec<StoredEvent>> {
        let query = EventQuery {
            stream_id: None,
            event_types: None,
            time_range: Some(TimeRange {
                start: timestamp,
                end: Utc::now(),
            }),
            sequence_range: None,
            source: None,
            custom_filters: HashMap::new(),
            limit: None,
            order: QueryOrder::SequenceAsc,
        };

        self.query_events(query).await
    }

    /// Create a snapshot for a stream
    async fn create_snapshot(&self, stream_id: &str, stream_version: u64) -> Result<EventSnapshot> {
        let events = self.get_stream_events(stream_id, None).await?;

        // Aggregate state from events (simplified)
        let state_data = self.aggregate_events(&events)?;
        let compressed_data = self.compress_data(&state_data)?;

        let snapshot = EventSnapshot {
            snapshot_id: Uuid::new_v4(),
            stream_id: stream_id.to_string(),
            stream_version,
            sequence_number: events.last().map(|e| e.sequence_number).unwrap_or(0),
            created_at: Utc::now(),
            state_data: compressed_data.clone(),
            metadata: SnapshotMetadata {
                compression: Some("gzip".to_string()),
                original_size: state_data.len(),
                compressed_size: compressed_data.len(),
                checksum: self.calculate_data_checksum(&compressed_data)?,
            },
        };

        // Store snapshot
        {
            let mut snapshots = self.snapshots.write().await;
            let stream_snapshots = snapshots
                .entry(stream_id.to_string())
                .or_insert_with(Vec::new);
            stream_snapshots.push(snapshot.clone());

            // Keep only recent snapshots
            if stream_snapshots.len() > self.config.snapshot_config.max_snapshots {
                stream_snapshots.remove(0);
            }
        }

        // Queue for persistence
        if self.config.enable_persistence {
            self.persistence_manager
                .queue_operation(PersistenceOperation::StoreSnapshot(snapshot.clone()))
                .await?;
        }

        self.stats.snapshots_created.fetch_add(1, Ordering::Relaxed);
        info!(
            "Created snapshot {} for stream {} at version {}",
            snapshot.snapshot_id, stream_id, stream_version
        );

        Ok(snapshot)
    }

    /// Get the latest snapshot for a stream
    pub async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>> {
        let snapshots = self.snapshots.read().await;
        if let Some(stream_snapshots) = snapshots.get(stream_id) {
            Ok(stream_snapshots.last().cloned())
        } else {
            Ok(None)
        }
    }

    /// Rebuild stream state from events and snapshots
    pub async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>> {
        // Get latest snapshot
        if let Some(snapshot) = self.get_latest_snapshot(stream_id).await? {
            // Get events after snapshot
            let events = self
                .get_stream_events(stream_id, Some(snapshot.stream_version + 1))
                .await?;

            // Start with snapshot state
            let mut state = self.decompress_data(&snapshot.state_data)?;

            // Apply subsequent events
            for event in events {
                state = self.apply_event_to_state(state, &event.event_data)?;
            }

            Ok(state)
        } else {
            // No snapshot, rebuild from all events
            let events = self.get_stream_events(stream_id, None).await?;
            let aggregated = self.aggregate_events(&events)?;
            Ok(aggregated)
        }
    }

    /// Check if an event matches the query criteria
    fn matches_query(&self, event: &StoredEvent, query: &EventQuery) -> bool {
        // Stream ID filter
        if let Some(ref stream_id) = query.stream_id {
            if &event.stream_id != stream_id {
                return false;
            }
        }

        // Event type filter
        if let Some(ref event_types) = query.event_types {
            let event_type = format!("{:?}", std::mem::discriminant(&event.event_data));
            if !event_types.contains(&event_type) {
                return false;
            }
        }

        // Time range filter
        if let Some(ref time_range) = query.time_range {
            let event_time = event.event_data.metadata().timestamp;
            if event_time < time_range.start || event_time > time_range.end {
                return false;
            }
        }

        // Sequence range filter
        if let Some(ref seq_range) = query.sequence_range {
            if event.sequence_number < seq_range.start || event.sequence_number > seq_range.end {
                return false;
            }
        }

        // Source filter
        if let Some(ref source) = query.source {
            if &event.event_data.metadata().source != source {
                return false;
            }
        }

        true
    }

    /// Sort results based on query order
    fn sort_results(&self, results: &mut [StoredEvent], order: &QueryOrder) {
        match order {
            QueryOrder::SequenceAsc => {
                results.sort_by_key(|e| e.sequence_number);
            }
            QueryOrder::SequenceDesc => {
                results.sort_by_key(|e| std::cmp::Reverse(e.sequence_number));
            }
            QueryOrder::TimestampAsc => {
                results.sort_by_key(|e| e.event_data.metadata().timestamp);
            }
            QueryOrder::TimestampDesc => {
                results.sort_by_key(|e| std::cmp::Reverse(e.event_data.metadata().timestamp));
            }
        }
    }

    /// Calculate checksum for event
    fn calculate_checksum(&self, event: &StreamEvent) -> Result<String> {
        let serialized = serde_json::to_string(event)?;
        Ok(format!("{:x}", crc32fast::hash(serialized.as_bytes())))
    }

    /// Calculate checksum for data
    fn calculate_data_checksum(&self, data: &[u8]) -> Result<String> {
        Ok(format!("{:x}", crc32fast::hash(data)))
    }

    /// Estimate size of an event
    fn estimate_size(&self, event: &StreamEvent) -> usize {
        serde_json::to_string(event)
            .map(|s| s.len())
            .unwrap_or(1024)
    }

    /// Aggregate events into state data
    fn aggregate_events(&self, events: &[StoredEvent]) -> Result<Vec<u8>> {
        // Simplified aggregation - in real implementation, this would be domain-specific
        let aggregate = format!("Aggregated {} events", events.len());
        Ok(aggregate.into_bytes())
    }

    /// Apply an event to existing state
    fn apply_event_to_state(&self, mut state: Vec<u8>, _event: &StreamEvent) -> Result<Vec<u8>> {
        // Simplified state application
        state.extend_from_slice(b" +event");
        Ok(state)
    }

    /// Compress data
    fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
        if self.config.enable_compression {
            use flate2::write::GzEncoder;
            use flate2::Compression;
            use std::io::Write;

            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(data)?;
            Ok(encoder.finish()?)
        } else {
            Ok(data.to_vec())
        }
    }

    /// Decompress data
    fn decompress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
        if self.config.enable_compression {
            use flate2::read::GzDecoder;
            use std::io::Read;

            let mut decoder = GzDecoder::new(data);
            let mut decompressed = Vec::new();
            decoder.read_to_end(&mut decompressed)?;
            Ok(decompressed)
        } else {
            Ok(data.to_vec())
        }
    }

    /// Get event sourcing statistics
    pub fn get_stats(&self) -> EventSourcingStats {
        EventSourcingStats {
            total_events_stored: AtomicU64::new(
                self.stats.total_events_stored.load(Ordering::Relaxed),
            ),
            total_events_retrieved: AtomicU64::new(
                self.stats.total_events_retrieved.load(Ordering::Relaxed),
            ),
            snapshots_created: AtomicU64::new(self.stats.snapshots_created.load(Ordering::Relaxed)),
            events_archived: AtomicU64::new(self.stats.events_archived.load(Ordering::Relaxed)),
            persistence_operations: AtomicU64::new(
                self.stats.persistence_operations.load(Ordering::Relaxed),
            ),
            failed_operations: AtomicU64::new(self.stats.failed_operations.load(Ordering::Relaxed)),
            memory_usage_bytes: AtomicU64::new(
                self.stats.memory_usage_bytes.load(Ordering::Relaxed),
            ),
            disk_usage_bytes: AtomicU64::new(self.stats.disk_usage_bytes.load(Ordering::Relaxed)),
            average_store_latency_ms: AtomicU64::new(
                self.stats.average_store_latency_ms.load(Ordering::Relaxed),
            ),
            average_retrieve_latency_ms: AtomicU64::new(
                self.stats
                    .average_retrieve_latency_ms
                    .load(Ordering::Relaxed),
            ),
        }
    }
}

/// Implement the EventStoreTrait for the concrete EventStore
#[async_trait::async_trait]
impl EventStoreTrait for EventStore {
    async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent> {
        self.store_event(stream_id, event).await
    }

    async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>> {
        self.query_events(query).await
    }

    async fn get_stream_events(
        &self,
        stream_id: &str,
        from_version: Option<u64>,
    ) -> Result<Vec<StoredEvent>> {
        self.get_stream_events(stream_id, from_version).await
    }

    async fn replay_from_timestamp(&self, timestamp: DateTime<Utc>) -> Result<Vec<StoredEvent>> {
        self.replay_from_timestamp(timestamp).await
    }

    async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>> {
        self.get_latest_snapshot(stream_id).await
    }

    async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>> {
        self.rebuild_stream_state(stream_id).await
    }

    async fn append_events(
        &self,
        aggregate_id: &str,
        events: &[StreamEvent],
        _expected_version: Option<u64>,
    ) -> Result<u64> {
        // Note: `_expected_version` is not enforced here; a full implementation
        // would perform an optimistic-concurrency check against the stream's
        // current version before appending.
        let mut last_version = 0u64;
        for event in events {
            let stored_event = self
                .store_event(aggregate_id.to_string(), event.clone())
                .await?;
            last_version = stored_event.stream_version;
        }
        Ok(last_version)
    }
}

impl Default for EventIndexes {
    fn default() -> Self {
        Self::new()
    }
}

impl EventIndexes {
    /// Create new event indexes
    pub fn new() -> Self {
        Self {
            by_event_type: RwLock::new(HashMap::new()),
            by_timestamp: RwLock::new(BTreeMap::new()),
            by_source: RwLock::new(HashMap::new()),
            by_stream: RwLock::new(HashMap::new()),
            custom_indexes: RwLock::new(HashMap::new()),
        }
    }

    /// Add an event to indexes
    pub async fn add_event(&self, event: &StoredEvent) -> Result<()> {
        let sequence = event.sequence_number;

        // Index by event type
        {
            let mut by_type = self.by_event_type.write().await;
            let event_type = format!("{:?}", std::mem::discriminant(&event.event_data));
            by_type
                .entry(event_type)
                .or_insert_with(Vec::new)
                .push(sequence);
        }

        // Index by timestamp
        {
            let mut by_timestamp = self.by_timestamp.write().await;
            let timestamp = event.event_data.metadata().timestamp;
            by_timestamp
                .entry(timestamp)
                .or_insert_with(Vec::new)
                .push(sequence);
        }

        // Index by source
        {
            let mut by_source = self.by_source.write().await;
            let source = &event.event_data.metadata().source;
            by_source
                .entry(source.clone())
                .or_insert_with(Vec::new)
                .push(sequence);
        }

        // Index by stream
        {
            let mut by_stream = self.by_stream.write().await;
            by_stream
                .entry(event.stream_id.clone())
                .or_insert_with(Vec::new)
                .push(sequence);
        }

        Ok(())
    }

    /// Find sequences matching query criteria
    pub async fn find_matching_sequences(&self, query: &EventQuery) -> Result<Vec<u64>> {
        let mut candidate_sequences = Vec::new();

        // Start with stream filter if specified
        if let Some(ref stream_id) = query.stream_id {
            let by_stream = self.by_stream.read().await;
            if let Some(sequences) = by_stream.get(stream_id) {
                candidate_sequences = sequences.clone();
            } else {
                return Ok(Vec::new()); // Stream not found
            }
        } else {
            // Get all sequences (this could be optimized)
            let by_stream = self.by_stream.read().await;
            for sequences in by_stream.values() {
                candidate_sequences.extend(sequences);
            }
        }

        // Apply other filters
        if let Some(ref event_types) = query.event_types {
            let by_type = self.by_event_type.read().await;
            let mut type_sequences: HashSet<u64> = HashSet::new();

            for event_type in event_types {
                if let Some(sequences) = by_type.get(event_type) {
                    type_sequences.extend(sequences);
                }
            }

            candidate_sequences.retain(|seq| type_sequences.contains(seq));
        }

        // Apply sequence range filter
        if let Some(ref seq_range) = query.sequence_range {
            candidate_sequences.retain(|&seq| seq >= seq_range.start && seq <= seq_range.end);
        }

        candidate_sequences.sort_unstable();
        Ok(candidate_sequences)
    }
}

impl PersistenceManager {
    /// Create new persistence manager
    pub fn new(backend: PersistenceBackend) -> Self {
        Self {
            backend,
            pending_operations: Arc::new(Mutex::new(VecDeque::new())),
            stats: Arc::new(PersistenceStats::default()),
        }
    }

    /// Queue a persistence operation
    pub async fn queue_operation(&self, operation: PersistenceOperation) -> Result<()> {
        let mut queue = self.pending_operations.lock().await;
        queue.push_back(operation);
        self.stats.operations_queued.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    /// Process pending persistence operations
    pub async fn process_pending_operations(&self) -> Result<()> {
        let operations: Vec<PersistenceOperation> = {
            let mut queue = self.pending_operations.lock().await;
            queue.drain(..).collect()
        };

        for operation in operations {
            match self.execute_operation(operation).await {
                Ok(_) => {
                    self.stats
                        .operations_completed
                        .fetch_add(1, Ordering::Relaxed);
                }
                Err(e) => {
                    self.stats.operations_failed.fetch_add(1, Ordering::Relaxed);
                    error!("Persistence operation failed: {}", e);
                }
            }
        }

        Ok(())
    }

    /// Execute a single persistence operation
    async fn execute_operation(&self, operation: PersistenceOperation) -> Result<()> {
        match &self.backend {
            PersistenceBackend::Memory => {
                // No-op for memory backend
                Ok(())
            }
            PersistenceBackend::FileSystem { base_path } => {
                self.execute_filesystem_operation(operation, base_path)
                    .await
            }
            _ => {
                // Other backends not implemented in this example
                warn!("Persistence backend not implemented: {:?}", self.backend);
                Ok(())
            }
        }
    }

    /// Execute filesystem persistence operation
    async fn execute_filesystem_operation(
        &self,
        operation: PersistenceOperation,
        _base_path: &str,
    ) -> Result<()> {
        match operation {
            PersistenceOperation::StoreEvent(_event) => {
                // Simulate file write
                tokio::time::sleep(Duration::from_millis(1)).await;
                self.stats.bytes_written.fetch_add(1024, Ordering::Relaxed);
            }
            PersistenceOperation::StoreSnapshot(_snapshot) => {
                // Simulate snapshot write
                tokio::time::sleep(Duration::from_millis(5)).await;
                self.stats.bytes_written.fetch_add(10240, Ordering::Relaxed);
            }
            _ => {
                // Other operations
            }
        }
        Ok(())
    }
}

// Helper trait for accessing metadata
trait EventMetadataAccessor {
    fn metadata(&self) -> &EventMetadata;
}

impl EventMetadataAccessor for StreamEvent {
    fn metadata(&self) -> &EventMetadata {
        match self {
            StreamEvent::TripleAdded { metadata, .. } => metadata,
            StreamEvent::TripleRemoved { metadata, .. } => metadata,
            StreamEvent::QuadAdded { metadata, .. } => metadata,
            StreamEvent::QuadRemoved { metadata, .. } => metadata,
            StreamEvent::GraphCreated { metadata, .. } => metadata,
            StreamEvent::GraphCleared { metadata, .. } => metadata,
            StreamEvent::GraphDeleted { metadata, .. } => metadata,
            StreamEvent::SparqlUpdate { metadata, .. } => metadata,
            StreamEvent::TransactionBegin { metadata, .. } => metadata,
            StreamEvent::TransactionCommit { metadata, .. } => metadata,
            StreamEvent::TransactionAbort { metadata, .. } => metadata,
            StreamEvent::SchemaChanged { metadata, .. } => metadata,
            StreamEvent::Heartbeat { metadata, .. } => metadata,
            StreamEvent::QueryResultAdded { metadata, .. } => metadata,
            StreamEvent::QueryResultRemoved { metadata, .. } => metadata,
            StreamEvent::QueryCompleted { metadata, .. } => metadata,
            StreamEvent::ErrorOccurred { metadata, .. } => metadata,
            _ => {
                // For unmatched event types, return a static reference
                use once_cell::sync::Lazy;
                static DEFAULT_METADATA: Lazy<EventMetadata> = Lazy::new(EventMetadata::default);
                &DEFAULT_METADATA
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::VectorClock;
    use std::collections::HashMap;

    fn create_test_event() -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "http://test.org/subject".to_string(),
            predicate: "http://test.org/predicate".to_string(),
            object: "\"test_value\"".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: Uuid::new_v4().to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: HashMap::new(),
                checksum: None,
            },
        }
    }

    #[tokio::test]
    async fn test_event_store_creation() {
        let config = EventStoreConfig::default();
        let store = EventStore::new(config);

        let stats = store.get_stats();
        assert_eq!(stats.total_events_stored.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn test_store_and_retrieve_event() {
        let config = EventStoreConfig::default();
        let store = EventStore::new(config);

        let event = create_test_event();
        let stored_event = store
            .store_event("test_stream".to_string(), event)
            .await
            .unwrap();

        assert_eq!(stored_event.stream_id, "test_stream");
        assert_eq!(stored_event.stream_version, 1);
        assert_eq!(stored_event.sequence_number, 1);

        let stream_events = store.get_stream_events("test_stream", None).await.unwrap();
        assert_eq!(stream_events.len(), 1);
        assert_eq!(stream_events[0].event_id, stored_event.event_id);
    }

    #[tokio::test]
    async fn test_event_query() {
        let config = EventStoreConfig::default();
        let store = EventStore::new(config);

        // Store multiple events
        for i in 0..5 {
            let event = create_test_event();
            store
                .store_event(format!("stream_{}", i % 2), event)
                .await
                .unwrap();
        }

        // Query specific stream
        let query = EventQuery {
            stream_id: Some("stream_0".to_string()),
            event_types: None,
            time_range: None,
            sequence_range: None,
            source: None,
            custom_filters: HashMap::new(),
            limit: None,
            order: QueryOrder::SequenceAsc,
        };

        let results = store.query_events(query).await.unwrap();
        assert_eq!(results.len(), 3); // Events 0, 2, 4

        // Verify sequence order
        for i in 1..results.len() {
            assert!(results[i].sequence_number > results[i - 1].sequence_number);
        }
    }

    #[tokio::test]
    async fn test_snapshot_creation() {
        let mut config = EventStoreConfig::default();
        config.snapshot_config.snapshot_interval = 3; // Snapshot every 3 events

        let store = EventStore::new(config);

        // Store events to trigger snapshot
        for _ in 0..3 {
            let event = create_test_event();
            store
                .store_event("test_stream".to_string(), event)
                .await
                .unwrap();
        }

        let snapshot = store.get_latest_snapshot("test_stream").await.unwrap();
        assert!(snapshot.is_some());

        let snapshot = snapshot.unwrap();
        assert_eq!(snapshot.stream_id, "test_stream");
        assert_eq!(snapshot.stream_version, 3);
    }

    #[tokio::test]
    async fn test_replay_from_timestamp() {
        let config = EventStoreConfig::default();
        let store = EventStore::new(config);

        let start_time = Utc::now();

        // Store some events
        for i in 0..3 {
            let event = create_test_event();
            store
                .store_event(format!("stream_{i}"), event)
                .await
                .unwrap();
        }

        // Replay from start time
        let replayed_events = store.replay_from_timestamp(start_time).await.unwrap();
        assert!(replayed_events.len() >= 3);

        // Verify chronological order
        for i in 1..replayed_events.len() {
            assert!(replayed_events[i].stored_at >= replayed_events[i - 1].stored_at);
        }
    }

    #[tokio::test]
    async fn test_persistence_manager() {
        let backend = PersistenceBackend::Memory;
        let manager = PersistenceManager::new(backend);

        let event = create_test_event();
        let stored_event = StoredEvent {
            event_id: Uuid::new_v4(),
            sequence_number: 1,
            stream_id: "test".to_string(),
            stream_version: 1,
            event_data: event,
            stored_at: Utc::now(),
            storage_metadata: StorageMetadata {
                checksum: "test".to_string(),
                compressed_size: None,
                original_size: 100,
                storage_location: "memory".to_string(),
                persistence_status: PersistenceStatus::InMemory,
            },
        };

        manager
            .queue_operation(PersistenceOperation::StoreEvent(Box::new(stored_event)))
            .await
            .unwrap();
        manager.process_pending_operations().await.unwrap();

        assert_eq!(manager.stats.operations_queued.load(Ordering::Relaxed), 1);
        assert_eq!(
            manager.stats.operations_completed.load(Ordering::Relaxed),
            1
        );
    }
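
    /// Sketch test for the gzip round-trip used by snapshots; assumes the
    /// default config (`enable_compression: true`) and exercises the private
    /// helpers directly.
    #[test]
    fn test_compress_decompress_roundtrip() {
        let store = EventStore::new(EventStoreConfig::default());

        let data = b"event sourcing state".to_vec();
        let compressed = store.compress_data(&data).unwrap();
        let decompressed = store.decompress_data(&compressed).unwrap();

        assert_eq!(data, decompressed);
    }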

    #[test]
    fn test_vector_clock_operations() {
        let mut clock1 = VectorClock::new();
        let mut clock2 = VectorClock::new();

        // Test concurrent clocks
        clock1.increment("region1");
        clock2.increment("region2");
        assert!(clock1.is_concurrent(&clock2));

        // Test happens-before
        clock1.update(&clock2);
        clock1.increment("region1");
        assert!(clock2.happens_before(&clock1));
        assert!(!clock1.happens_before(&clock2));
    }
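
    /// Sketch test for state rebuilding; with no snapshot present it falls back
    /// to the simplified `aggregate_events` implementation above.
    #[tokio::test]
    async fn test_rebuild_stream_state() {
        let config = EventStoreConfig::default();
        let store = EventStore::new(config);

        for _ in 0..2 {
            let event = create_test_event();
            store
                .store_event("rebuild_stream".to_string(), event)
                .await
                .unwrap();
        }

        let state = store.rebuild_stream_state("rebuild_stream").await.unwrap();
        assert!(!state.is_empty());
    }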
}