//! EventStore, EventIndexes, PersistenceManager implementations and the
//! `EventMetadataAccessor` helper trait.

use super::{
    EventQuery, EventSnapshot, EventSourcingStats, EventStoreConfig, EventStoreTrait,
    PersistenceBackend, PersistenceOperation, PersistenceStats, PersistenceStatus, QueryOrder,
    SnapshotMetadata, StorageMetadata, StoredEvent, TimeRange,
};
use crate::{EventMetadata, StreamEvent};
use anyhow::Result;
use chrono::{DateTime, Utc};
use std::collections::VecDeque;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, RwLock, Semaphore};
use tracing::{debug, error, info, warn};
use uuid::Uuid;

/// Main event store implementation.
///
/// Events live in an in-memory `BTreeMap` keyed by a globally increasing
/// sequence number, with secondary indexes for querying, per-stream
/// snapshots, and optional queuing of writes to a persistence backend.
/// All shared state is behind `Arc` so handles can be cloned into tasks.
pub struct EventStore {
    /// Configuration (memory capacity, persistence and snapshot settings)
    config: EventStoreConfig,
    /// In-memory event storage, ordered by global sequence number
    memory_events: Arc<RwLock<BTreeMap<u64, StoredEvent>>>,
    /// Latest version assigned per stream ID
    stream_versions: Arc<RwLock<HashMap<String, u64>>>,
    /// Next global sequence number to hand out (starts at 1)
    next_sequence: Arc<AtomicU64>,
    /// Secondary indexes (by event type, timestamp, source, stream)
    indexes: Arc<EventIndexes>,
    /// Snapshots per stream ID, oldest first
    snapshots: Arc<RwLock<HashMap<String, Vec<EventSnapshot>>>>,
    /// Manager that queues and executes durable-storage operations
    persistence_manager: Arc<PersistenceManager>,
    /// Operation and latency counters
    stats: Arc<EventSourcingStats>,
    /// Semaphore bounding concurrent store/query operations
    operation_semaphore: Arc<Semaphore>,
}

/// Event indexes for efficient querying.
///
/// Each index maps a key to the list of global sequence numbers of matching
/// events. Every index has its own `RwLock` so updates to one index do not
/// block readers of another.
pub struct EventIndexes {
    /// Index by event type (Debug form of the `StreamEvent` discriminant)
    by_event_type: RwLock<HashMap<String, Vec<u64>>>,
    /// Index by event metadata timestamp
    by_timestamp: RwLock<BTreeMap<DateTime<Utc>, Vec<u64>>>,
    /// Index by event metadata source
    by_source: RwLock<HashMap<String, Vec<u64>>>,
    /// Index by stream ID
    by_stream: RwLock<HashMap<String, Vec<u64>>>,
    /// Custom indexes: index name -> key -> sequence numbers
    custom_indexes: RwLock<HashMap<String, HashMap<String, Vec<u64>>>>,
}

/// Persistence manager for durable storage.
///
/// Operations are enqueued by the event store and executed in batches by
/// `process_pending_operations`, which dispatches to the configured backend.
pub struct PersistenceManager {
    /// Backend configuration (memory, filesystem, ...)
    backend: PersistenceBackend,
    /// FIFO queue of operations awaiting execution
    pending_operations: Arc<Mutex<VecDeque<PersistenceOperation>>>,
    /// Persistence statistics (queued/completed/failed counts, bytes written)
    pub(crate) stats: Arc<PersistenceStats>,
}

67impl EventStore {
68    /// Create a new event store
69    pub fn new(config: EventStoreConfig) -> Self {
70        let persistence_manager =
71            Arc::new(PersistenceManager::new(config.persistence_backend.clone()));
72
73        Self {
74            config,
75            memory_events: Arc::new(RwLock::new(BTreeMap::new())),
76            stream_versions: Arc::new(RwLock::new(HashMap::new())),
77            next_sequence: Arc::new(AtomicU64::new(1)),
78            indexes: Arc::new(EventIndexes::new()),
79            snapshots: Arc::new(RwLock::new(HashMap::new())),
80            persistence_manager,
81            stats: Arc::new(EventSourcingStats::default()),
82            operation_semaphore: Arc::new(Semaphore::new(1000)), // Max 1000 concurrent operations
83        }
84    }
85
86    /// Store an event in the event store
87    pub async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent> {
88        let _permit = self.operation_semaphore.acquire().await?;
89        let start_time = Instant::now();
90
91        // Generate sequence number and stream version
92        let sequence_number = self.next_sequence.fetch_add(1, Ordering::SeqCst);
93        let stream_version = {
94            let mut versions = self.stream_versions.write().await;
95            let version = versions.get(&stream_id).unwrap_or(&0) + 1;
96            versions.insert(stream_id.clone(), version);
97            version
98        };
99
100        // Create stored event
101        let checksum = self.calculate_checksum(&event)?;
102        let original_size = self.estimate_size(&event);
103        let stored_event = StoredEvent {
104            event_id: Uuid::new_v4(),
105            sequence_number,
106            stream_id: stream_id.clone(),
107            stream_version,
108            event_data: event,
109            stored_at: Utc::now(),
110            storage_metadata: StorageMetadata {
111                checksum,
112                compressed_size: None,
113                original_size,
114                storage_location: format!("memory:{sequence_number}"),
115                persistence_status: PersistenceStatus::InMemory,
116            },
117        };
118
119        // Store in memory
120        {
121            let mut memory_events = self.memory_events.write().await;
122            memory_events.insert(sequence_number, stored_event.clone());
123
124            // Evict old events if needed
125            if memory_events.len() > self.config.max_memory_events {
126                let to_remove: Vec<u64> = memory_events
127                    .keys()
128                    .take(memory_events.len() - self.config.max_memory_events)
129                    .cloned()
130                    .collect();
131
132                for seq in to_remove {
133                    memory_events.remove(&seq);
134                }
135            }
136        }
137
138        // Update indexes
139        self.indexes.add_event(&stored_event).await?;
140
141        // Queue for persistence if enabled
142        if self.config.enable_persistence {
143            self.persistence_manager
144                .queue_operation(PersistenceOperation::StoreEvent(Box::new(
145                    stored_event.clone(),
146                )))
147                .await?;
148        }
149
150        // Check if snapshot is needed
151        if self.config.snapshot_config.enable_snapshots
152            && stream_version % self.config.snapshot_config.snapshot_interval as u64 == 0
153        {
154            self.create_snapshot(&stream_id, stream_version).await?;
155        }
156
157        // Update statistics
158        self.stats
159            .total_events_stored
160            .fetch_add(1, Ordering::Relaxed);
161        let store_latency = start_time.elapsed();
162        self.stats
163            .average_store_latency_ms
164            .store(store_latency.as_millis() as u64, Ordering::Relaxed);
165
166        info!(
167            "Stored event {} for stream {} (seq: {}, version: {})",
168            stored_event.event_id, stream_id, sequence_number, stream_version
169        );
170
171        Ok(stored_event)
172    }
173
174    /// Retrieve events by query
175    pub async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>> {
176        let _permit = self.operation_semaphore.acquire().await?;
177        let start_time = Instant::now();
178
179        let candidate_sequences = self.indexes.find_matching_sequences(&query).await?;
180        let mut results = Vec::new();
181
182        let memory_events = self.memory_events.read().await;
183        for &sequence in &candidate_sequences {
184            if let Some(stored_event) = memory_events.get(&sequence) {
185                if self.matches_query(stored_event, &query) {
186                    results.push(stored_event.clone());
187
188                    if let Some(limit) = query.limit {
189                        if results.len() >= limit {
190                            break;
191                        }
192                    }
193                }
194            }
195        }
196
197        // Sort results based on query order
198        self.sort_results(&mut results, &query.order);
199
200        // Update statistics
201        self.stats
202            .total_events_retrieved
203            .fetch_add(results.len() as u64, Ordering::Relaxed);
204        let retrieve_latency = start_time.elapsed();
205        self.stats
206            .average_retrieve_latency_ms
207            .store(retrieve_latency.as_millis() as u64, Ordering::Relaxed);
208
209        debug!(
210            "Query returned {} events in {:?}",
211            results.len(),
212            retrieve_latency
213        );
214
215        Ok(results)
216    }
217
218    /// Get events for a specific stream
219    pub async fn get_stream_events(
220        &self,
221        stream_id: &str,
222        from_version: Option<u64>,
223    ) -> Result<Vec<StoredEvent>> {
224        let query = EventQuery {
225            stream_id: Some(stream_id.to_string()),
226            event_types: None,
227            time_range: None,
228            sequence_range: None,
229            source: None,
230            custom_filters: HashMap::new(),
231            limit: None,
232            order: QueryOrder::SequenceAsc,
233        };
234
235        let mut events = self.query_events(query).await?;
236
237        if let Some(from_version) = from_version {
238            events.retain(|e| e.stream_version >= from_version);
239        }
240
241        Ok(events)
242    }
243
244    /// Replay events from a specific point in time
245    pub async fn replay_from_timestamp(
246        &self,
247        timestamp: DateTime<Utc>,
248    ) -> Result<Vec<StoredEvent>> {
249        let query = EventQuery {
250            stream_id: None,
251            event_types: None,
252            time_range: Some(TimeRange {
253                start: timestamp,
254                end: Utc::now(),
255            }),
256            sequence_range: None,
257            source: None,
258            custom_filters: HashMap::new(),
259            limit: None,
260            order: QueryOrder::SequenceAsc,
261        };
262
263        self.query_events(query).await
264    }
265
266    /// Create a snapshot for a stream
267    async fn create_snapshot(&self, stream_id: &str, stream_version: u64) -> Result<EventSnapshot> {
268        let events = self.get_stream_events(stream_id, None).await?;
269
270        // Aggregate state from events (simplified)
271        let state_data = self.aggregate_events(&events)?;
272        let compressed_data = self.compress_data(&state_data)?;
273
274        let snapshot = EventSnapshot {
275            snapshot_id: Uuid::new_v4(),
276            stream_id: stream_id.to_string(),
277            stream_version,
278            sequence_number: events.last().map(|e| e.sequence_number).unwrap_or(0),
279            created_at: Utc::now(),
280            state_data: compressed_data.clone(),
281            metadata: SnapshotMetadata {
282                compression: Some("gzip".to_string()),
283                original_size: state_data.len(),
284                compressed_size: compressed_data.len(),
285                checksum: self.calculate_data_checksum(&compressed_data)?,
286            },
287        };
288
289        // Store snapshot
290        {
291            let mut snapshots = self.snapshots.write().await;
292            let stream_snapshots = snapshots
293                .entry(stream_id.to_string())
294                .or_insert_with(Vec::new);
295            stream_snapshots.push(snapshot.clone());
296
297            // Keep only recent snapshots
298            if stream_snapshots.len() > self.config.snapshot_config.max_snapshots {
299                stream_snapshots.remove(0);
300            }
301        }
302
303        // Queue for persistence
304        if self.config.enable_persistence {
305            self.persistence_manager
306                .queue_operation(PersistenceOperation::StoreSnapshot(snapshot.clone()))
307                .await?;
308        }
309
310        self.stats.snapshots_created.fetch_add(1, Ordering::Relaxed);
311        info!(
312            "Created snapshot {} for stream {} at version {}",
313            snapshot.snapshot_id, stream_id, stream_version
314        );
315
316        Ok(snapshot)
317    }
318
319    /// Get the latest snapshot for a stream
320    pub async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>> {
321        let snapshots = self.snapshots.read().await;
322        if let Some(stream_snapshots) = snapshots.get(stream_id) {
323            Ok(stream_snapshots.last().cloned())
324        } else {
325            Ok(None)
326        }
327    }
328
329    /// Rebuild stream state from events and snapshots
330    pub async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>> {
331        // Get latest snapshot
332        if let Some(snapshot) = self.get_latest_snapshot(stream_id).await? {
333            // Get events after snapshot
334            let events = self
335                .get_stream_events(stream_id, Some(snapshot.stream_version + 1))
336                .await?;
337
338            // Start with snapshot state
339            let mut state = self.decompress_data(&snapshot.state_data)?;
340
341            // Apply subsequent events
342            for event in events {
343                state = self.apply_event_to_state(state, &event.event_data)?;
344            }
345
346            Ok(state)
347        } else {
348            // No snapshot, rebuild from all events
349            let events = self.get_stream_events(stream_id, None).await?;
350            let aggregated = self.aggregate_events(&events)?;
351            Ok(aggregated)
352        }
353    }
354
355    /// Check if an event matches the query criteria
356    fn matches_query(&self, event: &StoredEvent, query: &EventQuery) -> bool {
357        // Stream ID filter
358        if let Some(ref stream_id) = query.stream_id {
359            if &event.stream_id != stream_id {
360                return false;
361            }
362        }
363
364        // Event type filter
365        if let Some(ref event_types) = query.event_types {
366            let event_type = format!("{:?}", std::mem::discriminant(&event.event_data));
367            if !event_types.contains(&event_type) {
368                return false;
369            }
370        }
371
372        // Time range filter
373        if let Some(ref time_range) = query.time_range {
374            let event_time = event.event_data.metadata().timestamp;
375            if event_time < time_range.start || event_time > time_range.end {
376                return false;
377            }
378        }
379
380        // Sequence range filter
381        if let Some(ref seq_range) = query.sequence_range {
382            if event.sequence_number < seq_range.start || event.sequence_number > seq_range.end {
383                return false;
384            }
385        }
386
387        // Source filter
388        if let Some(ref source) = query.source {
389            if &event.event_data.metadata().source != source {
390                return false;
391            }
392        }
393
394        true
395    }
396
397    /// Sort results based on query order
398    fn sort_results(&self, results: &mut [StoredEvent], order: &QueryOrder) {
399        match order {
400            QueryOrder::SequenceAsc => {
401                results.sort_by_key(|e| e.sequence_number);
402            }
403            QueryOrder::SequenceDesc => {
404                results.sort_by_key(|e| std::cmp::Reverse(e.sequence_number));
405            }
406            QueryOrder::TimestampAsc => {
407                results.sort_by_key(|e| e.event_data.metadata().timestamp);
408            }
409            QueryOrder::TimestampDesc => {
410                results.sort_by_key(|e| std::cmp::Reverse(e.event_data.metadata().timestamp));
411            }
412        }
413    }
414
415    /// Calculate checksum for event
416    fn calculate_checksum(&self, event: &StreamEvent) -> Result<String> {
417        let serialized = serde_json::to_string(event)?;
418        Ok(format!("{:x}", crc32fast::hash(serialized.as_bytes())))
419    }
420
421    /// Calculate checksum for data
422    fn calculate_data_checksum(&self, data: &[u8]) -> Result<String> {
423        Ok(format!("{:x}", crc32fast::hash(data)))
424    }
425
426    /// Estimate size of an event
427    fn estimate_size(&self, event: &StreamEvent) -> usize {
428        serde_json::to_string(event)
429            .map(|s| s.len())
430            .unwrap_or(1024)
431    }
432
433    /// Aggregate events into state data
434    fn aggregate_events(&self, events: &[StoredEvent]) -> Result<Vec<u8>> {
435        // Simplified aggregation - in real implementation, this would be domain-specific
436        let aggregate = format!("Aggregated {} events", events.len());
437        Ok(aggregate.into_bytes())
438    }
439
440    /// Apply an event to existing state
441    fn apply_event_to_state(&self, mut state: Vec<u8>, _event: &StreamEvent) -> Result<Vec<u8>> {
442        // Simplified state application
443        state.extend_from_slice(b" +event");
444        Ok(state)
445    }
446
447    /// Compress data
448    fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
449        if self.config.enable_compression {
450            use flate2::write::GzEncoder;
451            use flate2::Compression;
452            use std::io::Write;
453
454            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
455            encoder.write_all(data)?;
456            Ok(encoder.finish()?)
457        } else {
458            Ok(data.to_vec())
459        }
460    }
461
462    /// Decompress data
463    fn decompress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
464        if self.config.enable_compression {
465            use flate2::read::GzDecoder;
466            use std::io::Read;
467
468            let mut decoder = GzDecoder::new(data);
469            let mut decompressed = Vec::new();
470            decoder.read_to_end(&mut decompressed)?;
471            Ok(decompressed)
472        } else {
473            Ok(data.to_vec())
474        }
475    }
476
477    /// Get event sourcing statistics
478    pub fn get_stats(&self) -> super::EventSourcingStats {
479        super::EventSourcingStats {
480            total_events_stored: AtomicU64::new(
481                self.stats.total_events_stored.load(Ordering::Relaxed),
482            ),
483            total_events_retrieved: AtomicU64::new(
484                self.stats.total_events_retrieved.load(Ordering::Relaxed),
485            ),
486            snapshots_created: AtomicU64::new(self.stats.snapshots_created.load(Ordering::Relaxed)),
487            events_archived: AtomicU64::new(self.stats.events_archived.load(Ordering::Relaxed)),
488            persistence_operations: AtomicU64::new(
489                self.stats.persistence_operations.load(Ordering::Relaxed),
490            ),
491            failed_operations: AtomicU64::new(self.stats.failed_operations.load(Ordering::Relaxed)),
492            memory_usage_bytes: AtomicU64::new(
493                self.stats.memory_usage_bytes.load(Ordering::Relaxed),
494            ),
495            disk_usage_bytes: AtomicU64::new(self.stats.disk_usage_bytes.load(Ordering::Relaxed)),
496            average_store_latency_ms: AtomicU64::new(
497                self.stats.average_store_latency_ms.load(Ordering::Relaxed),
498            ),
499            average_retrieve_latency_ms: AtomicU64::new(
500                self.stats
501                    .average_retrieve_latency_ms
502                    .load(Ordering::Relaxed),
503            ),
504        }
505    }
506}
507
/// Implement the EventStoreTrait for the concrete EventStore.
///
/// Every method delegates directly to the inherent `EventStore` method of
/// the same name.
#[async_trait::async_trait]
impl EventStoreTrait for EventStore {
    async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent> {
        self.store_event(stream_id, event).await
    }

    async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>> {
        self.query_events(query).await
    }

    async fn get_stream_events(
        &self,
        stream_id: &str,
        from_version: Option<u64>,
    ) -> Result<Vec<StoredEvent>> {
        self.get_stream_events(stream_id, from_version).await
    }

    async fn replay_from_timestamp(&self, timestamp: DateTime<Utc>) -> Result<Vec<StoredEvent>> {
        self.replay_from_timestamp(timestamp).await
    }

    async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>> {
        self.get_latest_snapshot(stream_id).await
    }

    async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>> {
        self.rebuild_stream_state(stream_id).await
    }

    // Appends each event in order via store_event and returns the stream
    // version after the last append (0 when `events` is empty).
    //
    // NOTE(review): `_expected_version` is ignored, so optimistic-concurrency
    // conflicts are not detected here — confirm callers do not rely on it.
    async fn append_events(
        &self,
        aggregate_id: &str,
        events: &[StreamEvent],
        _expected_version: Option<u64>,
    ) -> Result<u64> {
        let mut last_version = 0u64;
        for event in events {
            let stored_event = self
                .store_event(aggregate_id.to_string(), event.clone())
                .await?;
            last_version = stored_event.stream_version;
        }
        Ok(last_version)
    }
}

556impl Default for EventIndexes {
557    fn default() -> Self {
558        Self::new()
559    }
560}
561
562impl EventIndexes {
563    /// Create new event indexes
564    pub fn new() -> Self {
565        Self {
566            by_event_type: RwLock::new(HashMap::new()),
567            by_timestamp: RwLock::new(BTreeMap::new()),
568            by_source: RwLock::new(HashMap::new()),
569            by_stream: RwLock::new(HashMap::new()),
570            custom_indexes: RwLock::new(HashMap::new()),
571        }
572    }
573
574    /// Add an event to indexes
575    pub async fn add_event(&self, event: &StoredEvent) -> Result<()> {
576        let sequence = event.sequence_number;
577
578        // Index by event type
579        {
580            let mut by_type = self.by_event_type.write().await;
581            let event_type = format!("{:?}", std::mem::discriminant(&event.event_data));
582            by_type
583                .entry(event_type)
584                .or_insert_with(Vec::new)
585                .push(sequence);
586        }
587
588        // Index by timestamp
589        {
590            let mut by_timestamp = self.by_timestamp.write().await;
591            let timestamp = event.event_data.metadata().timestamp;
592            by_timestamp
593                .entry(timestamp)
594                .or_insert_with(Vec::new)
595                .push(sequence);
596        }
597
598        // Index by source
599        {
600            let mut by_source = self.by_source.write().await;
601            let source = &event.event_data.metadata().source;
602            by_source
603                .entry(source.clone())
604                .or_insert_with(Vec::new)
605                .push(sequence);
606        }
607
608        // Index by stream
609        {
610            let mut by_stream = self.by_stream.write().await;
611            by_stream
612                .entry(event.stream_id.clone())
613                .or_insert_with(Vec::new)
614                .push(sequence);
615        }
616
617        Ok(())
618    }
619
620    /// Find sequences matching query criteria
621    pub async fn find_matching_sequences(&self, query: &EventQuery) -> Result<Vec<u64>> {
622        let mut candidate_sequences = Vec::new();
623
624        // Start with stream filter if specified
625        if let Some(ref stream_id) = query.stream_id {
626            let by_stream = self.by_stream.read().await;
627            if let Some(sequences) = by_stream.get(stream_id) {
628                candidate_sequences = sequences.clone();
629            } else {
630                return Ok(Vec::new()); // Stream not found
631            }
632        } else {
633            // Get all sequences (this could be optimized)
634            let by_stream = self.by_stream.read().await;
635            for sequences in by_stream.values() {
636                candidate_sequences.extend(sequences);
637            }
638        }
639
640        // Apply other filters
641        if let Some(ref event_types) = query.event_types {
642            let by_type = self.by_event_type.read().await;
643            let mut type_sequences: HashSet<u64> = HashSet::new();
644
645            for event_type in event_types {
646                if let Some(sequences) = by_type.get(event_type) {
647                    type_sequences.extend(sequences);
648                }
649            }
650
651            candidate_sequences.retain(|seq| type_sequences.contains(seq));
652        }
653
654        // Apply sequence range filter
655        if let Some(ref seq_range) = query.sequence_range {
656            candidate_sequences.retain(|&seq| seq >= seq_range.start && seq <= seq_range.end);
657        }
658
659        candidate_sequences.sort_unstable();
660        Ok(candidate_sequences)
661    }
662}
663
664impl PersistenceManager {
665    /// Create new persistence manager
666    pub fn new(backend: PersistenceBackend) -> Self {
667        Self {
668            backend,
669            pending_operations: Arc::new(Mutex::new(VecDeque::new())),
670            stats: Arc::new(PersistenceStats::default()),
671        }
672    }
673
674    /// Queue a persistence operation
675    pub async fn queue_operation(&self, operation: PersistenceOperation) -> Result<()> {
676        let mut queue = self.pending_operations.lock().await;
677        queue.push_back(operation);
678        self.stats.operations_queued.fetch_add(1, Ordering::Relaxed);
679        Ok(())
680    }
681
682    /// Process pending persistence operations
683    pub async fn process_pending_operations(&self) -> Result<()> {
684        let operations: Vec<PersistenceOperation> = {
685            let mut queue = self.pending_operations.lock().await;
686            queue.drain(..).collect()
687        };
688
689        for operation in operations {
690            match self.execute_operation(operation).await {
691                Ok(_) => {
692                    self.stats
693                        .operations_completed
694                        .fetch_add(1, Ordering::Relaxed);
695                }
696                Err(e) => {
697                    self.stats.operations_failed.fetch_add(1, Ordering::Relaxed);
698                    error!("Persistence operation failed: {}", e);
699                }
700            }
701        }
702
703        Ok(())
704    }
705
706    /// Execute a single persistence operation
707    async fn execute_operation(&self, operation: PersistenceOperation) -> Result<()> {
708        match &self.backend {
709            PersistenceBackend::Memory => {
710                // No-op for memory backend
711                Ok(())
712            }
713            PersistenceBackend::FileSystem { base_path } => {
714                self.execute_filesystem_operation(operation, base_path)
715                    .await
716            }
717            _ => {
718                // Other backends not implemented in this example
719                warn!("Persistence backend not implemented: {:?}", self.backend);
720                Ok(())
721            }
722        }
723    }
724
725    /// Execute filesystem persistence operation
726    async fn execute_filesystem_operation(
727        &self,
728        operation: PersistenceOperation,
729        _base_path: &str,
730    ) -> Result<()> {
731        match operation {
732            PersistenceOperation::StoreEvent(_event) => {
733                // Simulate file write
734                tokio::time::sleep(Duration::from_millis(1)).await;
735                self.stats.bytes_written.fetch_add(1024, Ordering::Relaxed);
736            }
737            PersistenceOperation::StoreSnapshot(_snapshot) => {
738                // Simulate snapshot write
739                tokio::time::sleep(Duration::from_millis(5)).await;
740                self.stats.bytes_written.fetch_add(10240, Ordering::Relaxed);
741            }
742            _ => {
743                // Other operations
744            }
745        }
746        Ok(())
747    }
748}
749
/// Helper trait giving uniform read-only access to the `EventMetadata`
/// embedded in each `StreamEvent` variant.
pub trait EventMetadataAccessor {
    /// Borrow the event's metadata.
    fn metadata(&self) -> &EventMetadata;
}

impl EventMetadataAccessor for StreamEvent {
    /// Return the embedded metadata for any listed variant; variants not
    /// listed here fall back to a shared default value rather than panicking.
    fn metadata(&self) -> &EventMetadata {
        match self {
            StreamEvent::TripleAdded { metadata, .. } => metadata,
            StreamEvent::TripleRemoved { metadata, .. } => metadata,
            StreamEvent::QuadAdded { metadata, .. } => metadata,
            StreamEvent::QuadRemoved { metadata, .. } => metadata,
            StreamEvent::GraphCreated { metadata, .. } => metadata,
            StreamEvent::GraphCleared { metadata, .. } => metadata,
            StreamEvent::GraphDeleted { metadata, .. } => metadata,
            StreamEvent::SparqlUpdate { metadata, .. } => metadata,
            StreamEvent::TransactionBegin { metadata, .. } => metadata,
            StreamEvent::TransactionCommit { metadata, .. } => metadata,
            StreamEvent::TransactionAbort { metadata, .. } => metadata,
            StreamEvent::SchemaChanged { metadata, .. } => metadata,
            StreamEvent::Heartbeat { metadata, .. } => metadata,
            StreamEvent::QueryResultAdded { metadata, .. } => metadata,
            StreamEvent::QueryResultRemoved { metadata, .. } => metadata,
            StreamEvent::QueryCompleted { metadata, .. } => metadata,
            StreamEvent::ErrorOccurred { metadata, .. } => metadata,
            _ => {
                // For unmatched event types, return a static reference to a
                // lazily-initialized default. NOTE(review): the catch-all
                // silently masks any variant that *does* carry metadata but
                // was not listed above — confirm coverage when variants are
                // added to StreamEvent.
                use once_cell::sync::Lazy;
                static DEFAULT_METADATA: Lazy<EventMetadata> = Lazy::new(EventMetadata::default);
                &DEFAULT_METADATA
            }
        }
    }
}