Skip to main content

oxirs_stream/event_sourcing/
store.rs

1//! EventStore, EventIndexes, PersistenceManager implementations and the
2//! `EventMetadataAccessor` helper trait.
3
4use super::{
5    EventQuery, EventSnapshot, EventSourcingStats, EventStoreConfig, EventStoreTrait,
6    PersistenceBackend, PersistenceOperation, PersistenceStats, PersistenceStatus, QueryOrder,
7    SnapshotMetadata, StorageMetadata, StoredEvent, TimeRange,
8};
9use crate::{EventMetadata, StreamEvent};
10use anyhow::Result;
11use chrono::{DateTime, Utc};
12use std::collections::VecDeque;
13use std::collections::{BTreeMap, HashMap, HashSet};
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{Mutex, RwLock, Semaphore};
18use tracing::{debug, error, info, warn};
19use uuid::Uuid;
20
21/// Main event store implementation
22pub struct EventStore {
23    /// Configuration
24    config: EventStoreConfig,
25    /// In-memory event storage
26    memory_events: Arc<RwLock<BTreeMap<u64, StoredEvent>>>,
27    /// Stream version tracking
28    stream_versions: Arc<RwLock<HashMap<String, u64>>>,
29    /// Next sequence number
30    next_sequence: Arc<AtomicU64>,
31    /// Event indexes
32    indexes: Arc<EventIndexes>,
33    /// Snapshots
34    snapshots: Arc<RwLock<HashMap<String, Vec<EventSnapshot>>>>,
35    /// Persistence manager
36    persistence_manager: Arc<PersistenceManager>,
37    /// Statistics
38    stats: Arc<EventSourcingStats>,
39    /// Operation semaphore
40    operation_semaphore: Arc<Semaphore>,
41}
42
43/// Event indexes for efficient querying
44pub struct EventIndexes {
45    /// Index by event type
46    by_event_type: RwLock<HashMap<String, Vec<u64>>>,
47    /// Index by timestamp
48    by_timestamp: RwLock<BTreeMap<DateTime<Utc>, Vec<u64>>>,
49    /// Index by source
50    by_source: RwLock<HashMap<String, Vec<u64>>>,
51    /// Index by stream ID
52    by_stream: RwLock<HashMap<String, Vec<u64>>>,
53    /// Custom indexes
54    custom_indexes: RwLock<HashMap<String, HashMap<String, Vec<u64>>>>,
55}
56
57/// Persistence manager for durable storage
58pub struct PersistenceManager {
59    /// Backend configuration
60    backend: PersistenceBackend,
61    /// Pending operations queue
62    pending_operations: Arc<Mutex<VecDeque<PersistenceOperation>>>,
63    /// Persistence statistics
64    pub(crate) stats: Arc<PersistenceStats>,
65}
66
67impl EventStore {
68    /// Create a new event store
69    pub fn new(config: EventStoreConfig) -> Self {
70        let persistence_manager =
71            Arc::new(PersistenceManager::new(config.persistence_backend.clone()));
72
73        Self {
74            config,
75            memory_events: Arc::new(RwLock::new(BTreeMap::new())),
76            stream_versions: Arc::new(RwLock::new(HashMap::new())),
77            next_sequence: Arc::new(AtomicU64::new(1)),
78            indexes: Arc::new(EventIndexes::new()),
79            snapshots: Arc::new(RwLock::new(HashMap::new())),
80            persistence_manager,
81            stats: Arc::new(EventSourcingStats::default()),
82            operation_semaphore: Arc::new(Semaphore::new(1000)), // Max 1000 concurrent operations
83        }
84    }
85
86    /// Store an event in the event store
87    pub async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent> {
88        let _permit = self.operation_semaphore.acquire().await?;
89        let start_time = Instant::now();
90
91        // Generate sequence number and stream version
92        let sequence_number = self.next_sequence.fetch_add(1, Ordering::SeqCst);
93        let stream_version = {
94            let mut versions = self.stream_versions.write().await;
95            let version = versions.get(&stream_id).unwrap_or(&0) + 1;
96            versions.insert(stream_id.clone(), version);
97            version
98        };
99
100        // Create stored event
101        let checksum = self.calculate_checksum(&event)?;
102        let original_size = self.estimate_size(&event);
103        let stored_event = StoredEvent {
104            event_id: Uuid::new_v4(),
105            sequence_number,
106            stream_id: stream_id.clone(),
107            stream_version,
108            event_data: event,
109            stored_at: Utc::now(),
110            storage_metadata: StorageMetadata {
111                checksum,
112                compressed_size: None,
113                original_size,
114                storage_location: format!("memory:{sequence_number}"),
115                persistence_status: PersistenceStatus::InMemory,
116            },
117        };
118
119        // Store in memory
120        {
121            let mut memory_events = self.memory_events.write().await;
122            memory_events.insert(sequence_number, stored_event.clone());
123
124            // Evict old events if needed
125            if memory_events.len() > self.config.max_memory_events {
126                let to_remove: Vec<u64> = memory_events
127                    .keys()
128                    .take(memory_events.len() - self.config.max_memory_events)
129                    .cloned()
130                    .collect();
131
132                for seq in to_remove {
133                    memory_events.remove(&seq);
134                }
135            }
136        }
137
138        // Update indexes
139        self.indexes.add_event(&stored_event).await?;
140
141        // Queue for persistence if enabled
142        if self.config.enable_persistence {
143            self.persistence_manager
144                .queue_operation(PersistenceOperation::StoreEvent(Box::new(
145                    stored_event.clone(),
146                )))
147                .await?;
148        }
149
150        // Check if snapshot is needed
151        if self.config.snapshot_config.enable_snapshots
152            && stream_version % self.config.snapshot_config.snapshot_interval as u64 == 0
153        {
154            self.create_snapshot(&stream_id, stream_version).await?;
155        }
156
157        // Update statistics
158        self.stats
159            .total_events_stored
160            .fetch_add(1, Ordering::Relaxed);
161        let store_latency = start_time.elapsed();
162        self.stats
163            .average_store_latency_ms
164            .store(store_latency.as_millis() as u64, Ordering::Relaxed);
165
166        info!(
167            "Stored event {} for stream {} (seq: {}, version: {})",
168            stored_event.event_id, stream_id, sequence_number, stream_version
169        );
170
171        Ok(stored_event)
172    }
173
174    /// Retrieve events by query
175    pub async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>> {
176        let _permit = self.operation_semaphore.acquire().await?;
177        let start_time = Instant::now();
178
179        let candidate_sequences = self.indexes.find_matching_sequences(&query).await?;
180        let mut results = Vec::new();
181
182        let memory_events = self.memory_events.read().await;
183        for &sequence in &candidate_sequences {
184            if let Some(stored_event) = memory_events.get(&sequence) {
185                if self.matches_query(stored_event, &query) {
186                    results.push(stored_event.clone());
187
188                    if let Some(limit) = query.limit {
189                        if results.len() >= limit {
190                            break;
191                        }
192                    }
193                }
194            }
195        }
196
197        // Sort results based on query order
198        self.sort_results(&mut results, &query.order);
199
200        // Update statistics
201        self.stats
202            .total_events_retrieved
203            .fetch_add(results.len() as u64, Ordering::Relaxed);
204        let retrieve_latency = start_time.elapsed();
205        self.stats
206            .average_retrieve_latency_ms
207            .store(retrieve_latency.as_millis() as u64, Ordering::Relaxed);
208
209        debug!(
210            "Query returned {} events in {:?}",
211            results.len(),
212            retrieve_latency
213        );
214
215        Ok(results)
216    }
217
218    /// Get events for a specific stream
219    pub async fn get_stream_events(
220        &self,
221        stream_id: &str,
222        from_version: Option<u64>,
223    ) -> Result<Vec<StoredEvent>> {
224        let query = EventQuery {
225            stream_id: Some(stream_id.to_string()),
226            event_types: None,
227            time_range: None,
228            sequence_range: None,
229            source: None,
230            custom_filters: HashMap::new(),
231            limit: None,
232            order: QueryOrder::SequenceAsc,
233        };
234
235        let mut events = self.query_events(query).await?;
236
237        if let Some(from_version) = from_version {
238            events.retain(|e| e.stream_version >= from_version);
239        }
240
241        Ok(events)
242    }
243
244    /// Replay events from a specific point in time
245    pub async fn replay_from_timestamp(
246        &self,
247        timestamp: DateTime<Utc>,
248    ) -> Result<Vec<StoredEvent>> {
249        let query = EventQuery {
250            stream_id: None,
251            event_types: None,
252            time_range: Some(TimeRange {
253                start: timestamp,
254                end: Utc::now(),
255            }),
256            sequence_range: None,
257            source: None,
258            custom_filters: HashMap::new(),
259            limit: None,
260            order: QueryOrder::SequenceAsc,
261        };
262
263        self.query_events(query).await
264    }
265
266    /// Create a snapshot for a stream
267    async fn create_snapshot(&self, stream_id: &str, stream_version: u64) -> Result<EventSnapshot> {
268        let events = self.get_stream_events(stream_id, None).await?;
269
270        // Aggregate state from events (simplified)
271        let state_data = self.aggregate_events(&events)?;
272        let compressed_data = self.compress_data(&state_data)?;
273
274        let snapshot = EventSnapshot {
275            snapshot_id: Uuid::new_v4(),
276            stream_id: stream_id.to_string(),
277            stream_version,
278            sequence_number: events.last().map(|e| e.sequence_number).unwrap_or(0),
279            created_at: Utc::now(),
280            state_data: compressed_data.clone(),
281            metadata: SnapshotMetadata {
282                compression: Some("gzip".to_string()),
283                original_size: state_data.len(),
284                compressed_size: compressed_data.len(),
285                checksum: self.calculate_data_checksum(&compressed_data)?,
286            },
287        };
288
289        // Store snapshot
290        {
291            let mut snapshots = self.snapshots.write().await;
292            let stream_snapshots = snapshots
293                .entry(stream_id.to_string())
294                .or_insert_with(Vec::new);
295            stream_snapshots.push(snapshot.clone());
296
297            // Keep only recent snapshots
298            if stream_snapshots.len() > self.config.snapshot_config.max_snapshots {
299                stream_snapshots.remove(0);
300            }
301        }
302
303        // Queue for persistence
304        if self.config.enable_persistence {
305            self.persistence_manager
306                .queue_operation(PersistenceOperation::StoreSnapshot(snapshot.clone()))
307                .await?;
308        }
309
310        self.stats.snapshots_created.fetch_add(1, Ordering::Relaxed);
311        info!(
312            "Created snapshot {} for stream {} at version {}",
313            snapshot.snapshot_id, stream_id, stream_version
314        );
315
316        Ok(snapshot)
317    }
318
319    /// Get the latest snapshot for a stream
320    pub async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>> {
321        let snapshots = self.snapshots.read().await;
322        if let Some(stream_snapshots) = snapshots.get(stream_id) {
323            Ok(stream_snapshots.last().cloned())
324        } else {
325            Ok(None)
326        }
327    }
328
329    /// Rebuild stream state from events and snapshots
330    pub async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>> {
331        // Get latest snapshot
332        if let Some(snapshot) = self.get_latest_snapshot(stream_id).await? {
333            // Get events after snapshot
334            let events = self
335                .get_stream_events(stream_id, Some(snapshot.stream_version + 1))
336                .await?;
337
338            // Start with snapshot state
339            let mut state = self.decompress_data(&snapshot.state_data)?;
340
341            // Apply subsequent events
342            for event in events {
343                state = self.apply_event_to_state(state, &event.event_data)?;
344            }
345
346            Ok(state)
347        } else {
348            // No snapshot, rebuild from all events
349            let events = self.get_stream_events(stream_id, None).await?;
350            let aggregated = self.aggregate_events(&events)?;
351            Ok(aggregated)
352        }
353    }
354
355    /// Check if an event matches the query criteria
356    fn matches_query(&self, event: &StoredEvent, query: &EventQuery) -> bool {
357        // Stream ID filter
358        if let Some(ref stream_id) = query.stream_id {
359            if &event.stream_id != stream_id {
360                return false;
361            }
362        }
363
364        // Event type filter
365        if let Some(ref event_types) = query.event_types {
366            let event_type = format!("{:?}", std::mem::discriminant(&event.event_data));
367            if !event_types.contains(&event_type) {
368                return false;
369            }
370        }
371
372        // Time range filter
373        if let Some(ref time_range) = query.time_range {
374            let event_time = event.event_data.metadata().timestamp;
375            if event_time < time_range.start || event_time > time_range.end {
376                return false;
377            }
378        }
379
380        // Sequence range filter
381        if let Some(ref seq_range) = query.sequence_range {
382            if event.sequence_number < seq_range.start || event.sequence_number > seq_range.end {
383                return false;
384            }
385        }
386
387        // Source filter
388        if let Some(ref source) = query.source {
389            if &event.event_data.metadata().source != source {
390                return false;
391            }
392        }
393
394        true
395    }
396
397    /// Sort results based on query order
398    fn sort_results(&self, results: &mut [StoredEvent], order: &QueryOrder) {
399        match order {
400            QueryOrder::SequenceAsc => {
401                results.sort_by_key(|e| e.sequence_number);
402            }
403            QueryOrder::SequenceDesc => {
404                results.sort_by_key(|e| std::cmp::Reverse(e.sequence_number));
405            }
406            QueryOrder::TimestampAsc => {
407                results.sort_by_key(|e| e.event_data.metadata().timestamp);
408            }
409            QueryOrder::TimestampDesc => {
410                results.sort_by_key(|e| std::cmp::Reverse(e.event_data.metadata().timestamp));
411            }
412        }
413    }
414
415    /// Calculate checksum for event
416    fn calculate_checksum(&self, event: &StreamEvent) -> Result<String> {
417        let serialized = serde_json::to_string(event)?;
418        Ok(format!("{:x}", crc32fast::hash(serialized.as_bytes())))
419    }
420
421    /// Calculate checksum for data
422    fn calculate_data_checksum(&self, data: &[u8]) -> Result<String> {
423        Ok(format!("{:x}", crc32fast::hash(data)))
424    }
425
426    /// Estimate size of an event
427    fn estimate_size(&self, event: &StreamEvent) -> usize {
428        serde_json::to_string(event)
429            .map(|s| s.len())
430            .unwrap_or(1024)
431    }
432
433    /// Aggregate events into state data
434    fn aggregate_events(&self, events: &[StoredEvent]) -> Result<Vec<u8>> {
435        // Simplified aggregation - in real implementation, this would be domain-specific
436        let aggregate = format!("Aggregated {} events", events.len());
437        Ok(aggregate.into_bytes())
438    }
439
440    /// Apply an event to existing state
441    fn apply_event_to_state(&self, mut state: Vec<u8>, _event: &StreamEvent) -> Result<Vec<u8>> {
442        // Simplified state application
443        state.extend_from_slice(b" +event");
444        Ok(state)
445    }
446
447    /// Compress data
448    fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
449        if self.config.enable_compression {
450            oxiarc_deflate::gzip_compress(data, 6)
451                .map_err(|e| anyhow::anyhow!("Gzip compression failed: {e}"))
452        } else {
453            Ok(data.to_vec())
454        }
455    }
456
457    /// Decompress data
458    fn decompress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
459        if self.config.enable_compression {
460            oxiarc_deflate::gzip_decompress(data)
461                .map_err(|e| anyhow::anyhow!("Gzip decompression failed: {e}"))
462        } else {
463            Ok(data.to_vec())
464        }
465    }
466
467    /// Get event sourcing statistics
468    pub fn get_stats(&self) -> super::EventSourcingStats {
469        super::EventSourcingStats {
470            total_events_stored: AtomicU64::new(
471                self.stats.total_events_stored.load(Ordering::Relaxed),
472            ),
473            total_events_retrieved: AtomicU64::new(
474                self.stats.total_events_retrieved.load(Ordering::Relaxed),
475            ),
476            snapshots_created: AtomicU64::new(self.stats.snapshots_created.load(Ordering::Relaxed)),
477            events_archived: AtomicU64::new(self.stats.events_archived.load(Ordering::Relaxed)),
478            persistence_operations: AtomicU64::new(
479                self.stats.persistence_operations.load(Ordering::Relaxed),
480            ),
481            failed_operations: AtomicU64::new(self.stats.failed_operations.load(Ordering::Relaxed)),
482            memory_usage_bytes: AtomicU64::new(
483                self.stats.memory_usage_bytes.load(Ordering::Relaxed),
484            ),
485            disk_usage_bytes: AtomicU64::new(self.stats.disk_usage_bytes.load(Ordering::Relaxed)),
486            average_store_latency_ms: AtomicU64::new(
487                self.stats.average_store_latency_ms.load(Ordering::Relaxed),
488            ),
489            average_retrieve_latency_ms: AtomicU64::new(
490                self.stats
491                    .average_retrieve_latency_ms
492                    .load(Ordering::Relaxed),
493            ),
494        }
495    }
496}
497
498/// Implement the EventStoreTrait for the concrete EventStore
499#[async_trait::async_trait]
500impl EventStoreTrait for EventStore {
501    async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent> {
502        self.store_event(stream_id, event).await
503    }
504
505    async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>> {
506        self.query_events(query).await
507    }
508
509    async fn get_stream_events(
510        &self,
511        stream_id: &str,
512        from_version: Option<u64>,
513    ) -> Result<Vec<StoredEvent>> {
514        self.get_stream_events(stream_id, from_version).await
515    }
516
517    async fn replay_from_timestamp(&self, timestamp: DateTime<Utc>) -> Result<Vec<StoredEvent>> {
518        self.replay_from_timestamp(timestamp).await
519    }
520
521    async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>> {
522        self.get_latest_snapshot(stream_id).await
523    }
524
525    async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>> {
526        self.rebuild_stream_state(stream_id).await
527    }
528
529    async fn append_events(
530        &self,
531        aggregate_id: &str,
532        events: &[StreamEvent],
533        _expected_version: Option<u64>,
534    ) -> Result<u64> {
535        let mut last_version = 0u64;
536        for event in events {
537            let stored_event = self
538                .store_event(aggregate_id.to_string(), event.clone())
539                .await?;
540            last_version = stored_event.stream_version;
541        }
542        Ok(last_version)
543    }
544}
545
546impl Default for EventIndexes {
547    fn default() -> Self {
548        Self::new()
549    }
550}
551
552impl EventIndexes {
553    /// Create new event indexes
554    pub fn new() -> Self {
555        Self {
556            by_event_type: RwLock::new(HashMap::new()),
557            by_timestamp: RwLock::new(BTreeMap::new()),
558            by_source: RwLock::new(HashMap::new()),
559            by_stream: RwLock::new(HashMap::new()),
560            custom_indexes: RwLock::new(HashMap::new()),
561        }
562    }
563
564    /// Add an event to indexes
565    pub async fn add_event(&self, event: &StoredEvent) -> Result<()> {
566        let sequence = event.sequence_number;
567
568        // Index by event type
569        {
570            let mut by_type = self.by_event_type.write().await;
571            let event_type = format!("{:?}", std::mem::discriminant(&event.event_data));
572            by_type
573                .entry(event_type)
574                .or_insert_with(Vec::new)
575                .push(sequence);
576        }
577
578        // Index by timestamp
579        {
580            let mut by_timestamp = self.by_timestamp.write().await;
581            let timestamp = event.event_data.metadata().timestamp;
582            by_timestamp
583                .entry(timestamp)
584                .or_insert_with(Vec::new)
585                .push(sequence);
586        }
587
588        // Index by source
589        {
590            let mut by_source = self.by_source.write().await;
591            let source = &event.event_data.metadata().source;
592            by_source
593                .entry(source.clone())
594                .or_insert_with(Vec::new)
595                .push(sequence);
596        }
597
598        // Index by stream
599        {
600            let mut by_stream = self.by_stream.write().await;
601            by_stream
602                .entry(event.stream_id.clone())
603                .or_insert_with(Vec::new)
604                .push(sequence);
605        }
606
607        Ok(())
608    }
609
610    /// Find sequences matching query criteria
611    pub async fn find_matching_sequences(&self, query: &EventQuery) -> Result<Vec<u64>> {
612        let mut candidate_sequences = Vec::new();
613
614        // Start with stream filter if specified
615        if let Some(ref stream_id) = query.stream_id {
616            let by_stream = self.by_stream.read().await;
617            if let Some(sequences) = by_stream.get(stream_id) {
618                candidate_sequences = sequences.clone();
619            } else {
620                return Ok(Vec::new()); // Stream not found
621            }
622        } else {
623            // Get all sequences (this could be optimized)
624            let by_stream = self.by_stream.read().await;
625            for sequences in by_stream.values() {
626                candidate_sequences.extend(sequences);
627            }
628        }
629
630        // Apply other filters
631        if let Some(ref event_types) = query.event_types {
632            let by_type = self.by_event_type.read().await;
633            let mut type_sequences: HashSet<u64> = HashSet::new();
634
635            for event_type in event_types {
636                if let Some(sequences) = by_type.get(event_type) {
637                    type_sequences.extend(sequences);
638                }
639            }
640
641            candidate_sequences.retain(|seq| type_sequences.contains(seq));
642        }
643
644        // Apply sequence range filter
645        if let Some(ref seq_range) = query.sequence_range {
646            candidate_sequences.retain(|&seq| seq >= seq_range.start && seq <= seq_range.end);
647        }
648
649        candidate_sequences.sort_unstable();
650        Ok(candidate_sequences)
651    }
652}
653
654impl PersistenceManager {
655    /// Create new persistence manager
656    pub fn new(backend: PersistenceBackend) -> Self {
657        Self {
658            backend,
659            pending_operations: Arc::new(Mutex::new(VecDeque::new())),
660            stats: Arc::new(PersistenceStats::default()),
661        }
662    }
663
664    /// Queue a persistence operation
665    pub async fn queue_operation(&self, operation: PersistenceOperation) -> Result<()> {
666        let mut queue = self.pending_operations.lock().await;
667        queue.push_back(operation);
668        self.stats.operations_queued.fetch_add(1, Ordering::Relaxed);
669        Ok(())
670    }
671
672    /// Process pending persistence operations
673    pub async fn process_pending_operations(&self) -> Result<()> {
674        let operations: Vec<PersistenceOperation> = {
675            let mut queue = self.pending_operations.lock().await;
676            queue.drain(..).collect()
677        };
678
679        for operation in operations {
680            match self.execute_operation(operation).await {
681                Ok(_) => {
682                    self.stats
683                        .operations_completed
684                        .fetch_add(1, Ordering::Relaxed);
685                }
686                Err(e) => {
687                    self.stats.operations_failed.fetch_add(1, Ordering::Relaxed);
688                    error!("Persistence operation failed: {}", e);
689                }
690            }
691        }
692
693        Ok(())
694    }
695
696    /// Execute a single persistence operation
697    async fn execute_operation(&self, operation: PersistenceOperation) -> Result<()> {
698        match &self.backend {
699            PersistenceBackend::Memory => {
700                // No-op for memory backend
701                Ok(())
702            }
703            PersistenceBackend::FileSystem { base_path } => {
704                self.execute_filesystem_operation(operation, base_path)
705                    .await
706            }
707            _ => {
708                // Other backends not implemented in this example
709                warn!("Persistence backend not implemented: {:?}", self.backend);
710                Ok(())
711            }
712        }
713    }
714
715    /// Execute filesystem persistence operation
716    async fn execute_filesystem_operation(
717        &self,
718        operation: PersistenceOperation,
719        _base_path: &str,
720    ) -> Result<()> {
721        match operation {
722            PersistenceOperation::StoreEvent(_event) => {
723                // Simulate file write
724                tokio::time::sleep(Duration::from_millis(1)).await;
725                self.stats.bytes_written.fetch_add(1024, Ordering::Relaxed);
726            }
727            PersistenceOperation::StoreSnapshot(_snapshot) => {
728                // Simulate snapshot write
729                tokio::time::sleep(Duration::from_millis(5)).await;
730                self.stats.bytes_written.fetch_add(10240, Ordering::Relaxed);
731            }
732            _ => {
733                // Other operations
734            }
735        }
736        Ok(())
737    }
738}
739
740// Helper trait for accessing metadata
741pub trait EventMetadataAccessor {
742    fn metadata(&self) -> &EventMetadata;
743}
744
745impl EventMetadataAccessor for StreamEvent {
746    fn metadata(&self) -> &EventMetadata {
747        match self {
748            StreamEvent::TripleAdded { metadata, .. } => metadata,
749            StreamEvent::TripleRemoved { metadata, .. } => metadata,
750            StreamEvent::QuadAdded { metadata, .. } => metadata,
751            StreamEvent::QuadRemoved { metadata, .. } => metadata,
752            StreamEvent::GraphCreated { metadata, .. } => metadata,
753            StreamEvent::GraphCleared { metadata, .. } => metadata,
754            StreamEvent::GraphDeleted { metadata, .. } => metadata,
755            StreamEvent::SparqlUpdate { metadata, .. } => metadata,
756            StreamEvent::TransactionBegin { metadata, .. } => metadata,
757            StreamEvent::TransactionCommit { metadata, .. } => metadata,
758            StreamEvent::TransactionAbort { metadata, .. } => metadata,
759            StreamEvent::SchemaChanged { metadata, .. } => metadata,
760            StreamEvent::Heartbeat { metadata, .. } => metadata,
761            StreamEvent::QueryResultAdded { metadata, .. } => metadata,
762            StreamEvent::QueryResultRemoved { metadata, .. } => metadata,
763            StreamEvent::QueryCompleted { metadata, .. } => metadata,
764            StreamEvent::ErrorOccurred { metadata, .. } => metadata,
765            _ => {
766                // For unmatched event types, return a static reference
767                use once_cell::sync::Lazy;
768                static DEFAULT_METADATA: Lazy<EventMetadata> = Lazy::new(EventMetadata::default);
769                &DEFAULT_METADATA
770            }
771        }
772    }
773}