1use super::{
5 EventQuery, EventSnapshot, EventSourcingStats, EventStoreConfig, EventStoreTrait,
6 PersistenceBackend, PersistenceOperation, PersistenceStats, PersistenceStatus, QueryOrder,
7 SnapshotMetadata, StorageMetadata, StoredEvent, TimeRange,
8};
9use crate::{EventMetadata, StreamEvent};
10use anyhow::Result;
11use chrono::{DateTime, Utc};
12use std::collections::VecDeque;
13use std::collections::{BTreeMap, HashMap, HashSet};
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{Mutex, RwLock, Semaphore};
18use tracing::{debug, error, info, warn};
19use uuid::Uuid;
20
21pub struct EventStore {
23 config: EventStoreConfig,
25 memory_events: Arc<RwLock<BTreeMap<u64, StoredEvent>>>,
27 stream_versions: Arc<RwLock<HashMap<String, u64>>>,
29 next_sequence: Arc<AtomicU64>,
31 indexes: Arc<EventIndexes>,
33 snapshots: Arc<RwLock<HashMap<String, Vec<EventSnapshot>>>>,
35 persistence_manager: Arc<PersistenceManager>,
37 stats: Arc<EventSourcingStats>,
39 operation_semaphore: Arc<Semaphore>,
41}
42
43pub struct EventIndexes {
45 by_event_type: RwLock<HashMap<String, Vec<u64>>>,
47 by_timestamp: RwLock<BTreeMap<DateTime<Utc>, Vec<u64>>>,
49 by_source: RwLock<HashMap<String, Vec<u64>>>,
51 by_stream: RwLock<HashMap<String, Vec<u64>>>,
53 custom_indexes: RwLock<HashMap<String, HashMap<String, Vec<u64>>>>,
55}
56
57pub struct PersistenceManager {
59 backend: PersistenceBackend,
61 pending_operations: Arc<Mutex<VecDeque<PersistenceOperation>>>,
63 pub(crate) stats: Arc<PersistenceStats>,
65}
66
67impl EventStore {
68 pub fn new(config: EventStoreConfig) -> Self {
70 let persistence_manager =
71 Arc::new(PersistenceManager::new(config.persistence_backend.clone()));
72
73 Self {
74 config,
75 memory_events: Arc::new(RwLock::new(BTreeMap::new())),
76 stream_versions: Arc::new(RwLock::new(HashMap::new())),
77 next_sequence: Arc::new(AtomicU64::new(1)),
78 indexes: Arc::new(EventIndexes::new()),
79 snapshots: Arc::new(RwLock::new(HashMap::new())),
80 persistence_manager,
81 stats: Arc::new(EventSourcingStats::default()),
82 operation_semaphore: Arc::new(Semaphore::new(1000)), }
84 }
85
86 pub async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent> {
88 let _permit = self.operation_semaphore.acquire().await?;
89 let start_time = Instant::now();
90
91 let sequence_number = self.next_sequence.fetch_add(1, Ordering::SeqCst);
93 let stream_version = {
94 let mut versions = self.stream_versions.write().await;
95 let version = versions.get(&stream_id).unwrap_or(&0) + 1;
96 versions.insert(stream_id.clone(), version);
97 version
98 };
99
100 let checksum = self.calculate_checksum(&event)?;
102 let original_size = self.estimate_size(&event);
103 let stored_event = StoredEvent {
104 event_id: Uuid::new_v4(),
105 sequence_number,
106 stream_id: stream_id.clone(),
107 stream_version,
108 event_data: event,
109 stored_at: Utc::now(),
110 storage_metadata: StorageMetadata {
111 checksum,
112 compressed_size: None,
113 original_size,
114 storage_location: format!("memory:{sequence_number}"),
115 persistence_status: PersistenceStatus::InMemory,
116 },
117 };
118
119 {
121 let mut memory_events = self.memory_events.write().await;
122 memory_events.insert(sequence_number, stored_event.clone());
123
124 if memory_events.len() > self.config.max_memory_events {
126 let to_remove: Vec<u64> = memory_events
127 .keys()
128 .take(memory_events.len() - self.config.max_memory_events)
129 .cloned()
130 .collect();
131
132 for seq in to_remove {
133 memory_events.remove(&seq);
134 }
135 }
136 }
137
138 self.indexes.add_event(&stored_event).await?;
140
141 if self.config.enable_persistence {
143 self.persistence_manager
144 .queue_operation(PersistenceOperation::StoreEvent(Box::new(
145 stored_event.clone(),
146 )))
147 .await?;
148 }
149
150 if self.config.snapshot_config.enable_snapshots
152 && stream_version % self.config.snapshot_config.snapshot_interval as u64 == 0
153 {
154 self.create_snapshot(&stream_id, stream_version).await?;
155 }
156
157 self.stats
159 .total_events_stored
160 .fetch_add(1, Ordering::Relaxed);
161 let store_latency = start_time.elapsed();
162 self.stats
163 .average_store_latency_ms
164 .store(store_latency.as_millis() as u64, Ordering::Relaxed);
165
166 info!(
167 "Stored event {} for stream {} (seq: {}, version: {})",
168 stored_event.event_id, stream_id, sequence_number, stream_version
169 );
170
171 Ok(stored_event)
172 }
173
174 pub async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>> {
176 let _permit = self.operation_semaphore.acquire().await?;
177 let start_time = Instant::now();
178
179 let candidate_sequences = self.indexes.find_matching_sequences(&query).await?;
180 let mut results = Vec::new();
181
182 let memory_events = self.memory_events.read().await;
183 for &sequence in &candidate_sequences {
184 if let Some(stored_event) = memory_events.get(&sequence) {
185 if self.matches_query(stored_event, &query) {
186 results.push(stored_event.clone());
187
188 if let Some(limit) = query.limit {
189 if results.len() >= limit {
190 break;
191 }
192 }
193 }
194 }
195 }
196
197 self.sort_results(&mut results, &query.order);
199
200 self.stats
202 .total_events_retrieved
203 .fetch_add(results.len() as u64, Ordering::Relaxed);
204 let retrieve_latency = start_time.elapsed();
205 self.stats
206 .average_retrieve_latency_ms
207 .store(retrieve_latency.as_millis() as u64, Ordering::Relaxed);
208
209 debug!(
210 "Query returned {} events in {:?}",
211 results.len(),
212 retrieve_latency
213 );
214
215 Ok(results)
216 }
217
218 pub async fn get_stream_events(
220 &self,
221 stream_id: &str,
222 from_version: Option<u64>,
223 ) -> Result<Vec<StoredEvent>> {
224 let query = EventQuery {
225 stream_id: Some(stream_id.to_string()),
226 event_types: None,
227 time_range: None,
228 sequence_range: None,
229 source: None,
230 custom_filters: HashMap::new(),
231 limit: None,
232 order: QueryOrder::SequenceAsc,
233 };
234
235 let mut events = self.query_events(query).await?;
236
237 if let Some(from_version) = from_version {
238 events.retain(|e| e.stream_version >= from_version);
239 }
240
241 Ok(events)
242 }
243
244 pub async fn replay_from_timestamp(
246 &self,
247 timestamp: DateTime<Utc>,
248 ) -> Result<Vec<StoredEvent>> {
249 let query = EventQuery {
250 stream_id: None,
251 event_types: None,
252 time_range: Some(TimeRange {
253 start: timestamp,
254 end: Utc::now(),
255 }),
256 sequence_range: None,
257 source: None,
258 custom_filters: HashMap::new(),
259 limit: None,
260 order: QueryOrder::SequenceAsc,
261 };
262
263 self.query_events(query).await
264 }
265
266 async fn create_snapshot(&self, stream_id: &str, stream_version: u64) -> Result<EventSnapshot> {
268 let events = self.get_stream_events(stream_id, None).await?;
269
270 let state_data = self.aggregate_events(&events)?;
272 let compressed_data = self.compress_data(&state_data)?;
273
274 let snapshot = EventSnapshot {
275 snapshot_id: Uuid::new_v4(),
276 stream_id: stream_id.to_string(),
277 stream_version,
278 sequence_number: events.last().map(|e| e.sequence_number).unwrap_or(0),
279 created_at: Utc::now(),
280 state_data: compressed_data.clone(),
281 metadata: SnapshotMetadata {
282 compression: Some("gzip".to_string()),
283 original_size: state_data.len(),
284 compressed_size: compressed_data.len(),
285 checksum: self.calculate_data_checksum(&compressed_data)?,
286 },
287 };
288
289 {
291 let mut snapshots = self.snapshots.write().await;
292 let stream_snapshots = snapshots
293 .entry(stream_id.to_string())
294 .or_insert_with(Vec::new);
295 stream_snapshots.push(snapshot.clone());
296
297 if stream_snapshots.len() > self.config.snapshot_config.max_snapshots {
299 stream_snapshots.remove(0);
300 }
301 }
302
303 if self.config.enable_persistence {
305 self.persistence_manager
306 .queue_operation(PersistenceOperation::StoreSnapshot(snapshot.clone()))
307 .await?;
308 }
309
310 self.stats.snapshots_created.fetch_add(1, Ordering::Relaxed);
311 info!(
312 "Created snapshot {} for stream {} at version {}",
313 snapshot.snapshot_id, stream_id, stream_version
314 );
315
316 Ok(snapshot)
317 }
318
319 pub async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>> {
321 let snapshots = self.snapshots.read().await;
322 if let Some(stream_snapshots) = snapshots.get(stream_id) {
323 Ok(stream_snapshots.last().cloned())
324 } else {
325 Ok(None)
326 }
327 }
328
329 pub async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>> {
331 if let Some(snapshot) = self.get_latest_snapshot(stream_id).await? {
333 let events = self
335 .get_stream_events(stream_id, Some(snapshot.stream_version + 1))
336 .await?;
337
338 let mut state = self.decompress_data(&snapshot.state_data)?;
340
341 for event in events {
343 state = self.apply_event_to_state(state, &event.event_data)?;
344 }
345
346 Ok(state)
347 } else {
348 let events = self.get_stream_events(stream_id, None).await?;
350 let aggregated = self.aggregate_events(&events)?;
351 Ok(aggregated)
352 }
353 }
354
355 fn matches_query(&self, event: &StoredEvent, query: &EventQuery) -> bool {
357 if let Some(ref stream_id) = query.stream_id {
359 if &event.stream_id != stream_id {
360 return false;
361 }
362 }
363
364 if let Some(ref event_types) = query.event_types {
366 let event_type = format!("{:?}", std::mem::discriminant(&event.event_data));
367 if !event_types.contains(&event_type) {
368 return false;
369 }
370 }
371
372 if let Some(ref time_range) = query.time_range {
374 let event_time = event.event_data.metadata().timestamp;
375 if event_time < time_range.start || event_time > time_range.end {
376 return false;
377 }
378 }
379
380 if let Some(ref seq_range) = query.sequence_range {
382 if event.sequence_number < seq_range.start || event.sequence_number > seq_range.end {
383 return false;
384 }
385 }
386
387 if let Some(ref source) = query.source {
389 if &event.event_data.metadata().source != source {
390 return false;
391 }
392 }
393
394 true
395 }
396
397 fn sort_results(&self, results: &mut [StoredEvent], order: &QueryOrder) {
399 match order {
400 QueryOrder::SequenceAsc => {
401 results.sort_by_key(|e| e.sequence_number);
402 }
403 QueryOrder::SequenceDesc => {
404 results.sort_by_key(|e| std::cmp::Reverse(e.sequence_number));
405 }
406 QueryOrder::TimestampAsc => {
407 results.sort_by_key(|e| e.event_data.metadata().timestamp);
408 }
409 QueryOrder::TimestampDesc => {
410 results.sort_by_key(|e| std::cmp::Reverse(e.event_data.metadata().timestamp));
411 }
412 }
413 }
414
415 fn calculate_checksum(&self, event: &StreamEvent) -> Result<String> {
417 let serialized = serde_json::to_string(event)?;
418 Ok(format!("{:x}", crc32fast::hash(serialized.as_bytes())))
419 }
420
421 fn calculate_data_checksum(&self, data: &[u8]) -> Result<String> {
423 Ok(format!("{:x}", crc32fast::hash(data)))
424 }
425
426 fn estimate_size(&self, event: &StreamEvent) -> usize {
428 serde_json::to_string(event)
429 .map(|s| s.len())
430 .unwrap_or(1024)
431 }
432
433 fn aggregate_events(&self, events: &[StoredEvent]) -> Result<Vec<u8>> {
435 let aggregate = format!("Aggregated {} events", events.len());
437 Ok(aggregate.into_bytes())
438 }
439
440 fn apply_event_to_state(&self, mut state: Vec<u8>, _event: &StreamEvent) -> Result<Vec<u8>> {
442 state.extend_from_slice(b" +event");
444 Ok(state)
445 }
446
447 fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
449 if self.config.enable_compression {
450 oxiarc_deflate::gzip_compress(data, 6)
451 .map_err(|e| anyhow::anyhow!("Gzip compression failed: {e}"))
452 } else {
453 Ok(data.to_vec())
454 }
455 }
456
457 fn decompress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
459 if self.config.enable_compression {
460 oxiarc_deflate::gzip_decompress(data)
461 .map_err(|e| anyhow::anyhow!("Gzip decompression failed: {e}"))
462 } else {
463 Ok(data.to_vec())
464 }
465 }
466
467 pub fn get_stats(&self) -> super::EventSourcingStats {
469 super::EventSourcingStats {
470 total_events_stored: AtomicU64::new(
471 self.stats.total_events_stored.load(Ordering::Relaxed),
472 ),
473 total_events_retrieved: AtomicU64::new(
474 self.stats.total_events_retrieved.load(Ordering::Relaxed),
475 ),
476 snapshots_created: AtomicU64::new(self.stats.snapshots_created.load(Ordering::Relaxed)),
477 events_archived: AtomicU64::new(self.stats.events_archived.load(Ordering::Relaxed)),
478 persistence_operations: AtomicU64::new(
479 self.stats.persistence_operations.load(Ordering::Relaxed),
480 ),
481 failed_operations: AtomicU64::new(self.stats.failed_operations.load(Ordering::Relaxed)),
482 memory_usage_bytes: AtomicU64::new(
483 self.stats.memory_usage_bytes.load(Ordering::Relaxed),
484 ),
485 disk_usage_bytes: AtomicU64::new(self.stats.disk_usage_bytes.load(Ordering::Relaxed)),
486 average_store_latency_ms: AtomicU64::new(
487 self.stats.average_store_latency_ms.load(Ordering::Relaxed),
488 ),
489 average_retrieve_latency_ms: AtomicU64::new(
490 self.stats
491 .average_retrieve_latency_ms
492 .load(Ordering::Relaxed),
493 ),
494 }
495 }
496}
497
498#[async_trait::async_trait]
500impl EventStoreTrait for EventStore {
501 async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent> {
502 self.store_event(stream_id, event).await
503 }
504
505 async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>> {
506 self.query_events(query).await
507 }
508
509 async fn get_stream_events(
510 &self,
511 stream_id: &str,
512 from_version: Option<u64>,
513 ) -> Result<Vec<StoredEvent>> {
514 self.get_stream_events(stream_id, from_version).await
515 }
516
517 async fn replay_from_timestamp(&self, timestamp: DateTime<Utc>) -> Result<Vec<StoredEvent>> {
518 self.replay_from_timestamp(timestamp).await
519 }
520
521 async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>> {
522 self.get_latest_snapshot(stream_id).await
523 }
524
525 async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>> {
526 self.rebuild_stream_state(stream_id).await
527 }
528
529 async fn append_events(
530 &self,
531 aggregate_id: &str,
532 events: &[StreamEvent],
533 _expected_version: Option<u64>,
534 ) -> Result<u64> {
535 let mut last_version = 0u64;
536 for event in events {
537 let stored_event = self
538 .store_event(aggregate_id.to_string(), event.clone())
539 .await?;
540 last_version = stored_event.stream_version;
541 }
542 Ok(last_version)
543 }
544}
545
546impl Default for EventIndexes {
547 fn default() -> Self {
548 Self::new()
549 }
550}
551
552impl EventIndexes {
553 pub fn new() -> Self {
555 Self {
556 by_event_type: RwLock::new(HashMap::new()),
557 by_timestamp: RwLock::new(BTreeMap::new()),
558 by_source: RwLock::new(HashMap::new()),
559 by_stream: RwLock::new(HashMap::new()),
560 custom_indexes: RwLock::new(HashMap::new()),
561 }
562 }
563
564 pub async fn add_event(&self, event: &StoredEvent) -> Result<()> {
566 let sequence = event.sequence_number;
567
568 {
570 let mut by_type = self.by_event_type.write().await;
571 let event_type = format!("{:?}", std::mem::discriminant(&event.event_data));
572 by_type
573 .entry(event_type)
574 .or_insert_with(Vec::new)
575 .push(sequence);
576 }
577
578 {
580 let mut by_timestamp = self.by_timestamp.write().await;
581 let timestamp = event.event_data.metadata().timestamp;
582 by_timestamp
583 .entry(timestamp)
584 .or_insert_with(Vec::new)
585 .push(sequence);
586 }
587
588 {
590 let mut by_source = self.by_source.write().await;
591 let source = &event.event_data.metadata().source;
592 by_source
593 .entry(source.clone())
594 .or_insert_with(Vec::new)
595 .push(sequence);
596 }
597
598 {
600 let mut by_stream = self.by_stream.write().await;
601 by_stream
602 .entry(event.stream_id.clone())
603 .or_insert_with(Vec::new)
604 .push(sequence);
605 }
606
607 Ok(())
608 }
609
610 pub async fn find_matching_sequences(&self, query: &EventQuery) -> Result<Vec<u64>> {
612 let mut candidate_sequences = Vec::new();
613
614 if let Some(ref stream_id) = query.stream_id {
616 let by_stream = self.by_stream.read().await;
617 if let Some(sequences) = by_stream.get(stream_id) {
618 candidate_sequences = sequences.clone();
619 } else {
620 return Ok(Vec::new()); }
622 } else {
623 let by_stream = self.by_stream.read().await;
625 for sequences in by_stream.values() {
626 candidate_sequences.extend(sequences);
627 }
628 }
629
630 if let Some(ref event_types) = query.event_types {
632 let by_type = self.by_event_type.read().await;
633 let mut type_sequences: HashSet<u64> = HashSet::new();
634
635 for event_type in event_types {
636 if let Some(sequences) = by_type.get(event_type) {
637 type_sequences.extend(sequences);
638 }
639 }
640
641 candidate_sequences.retain(|seq| type_sequences.contains(seq));
642 }
643
644 if let Some(ref seq_range) = query.sequence_range {
646 candidate_sequences.retain(|&seq| seq >= seq_range.start && seq <= seq_range.end);
647 }
648
649 candidate_sequences.sort_unstable();
650 Ok(candidate_sequences)
651 }
652}
653
654impl PersistenceManager {
655 pub fn new(backend: PersistenceBackend) -> Self {
657 Self {
658 backend,
659 pending_operations: Arc::new(Mutex::new(VecDeque::new())),
660 stats: Arc::new(PersistenceStats::default()),
661 }
662 }
663
664 pub async fn queue_operation(&self, operation: PersistenceOperation) -> Result<()> {
666 let mut queue = self.pending_operations.lock().await;
667 queue.push_back(operation);
668 self.stats.operations_queued.fetch_add(1, Ordering::Relaxed);
669 Ok(())
670 }
671
672 pub async fn process_pending_operations(&self) -> Result<()> {
674 let operations: Vec<PersistenceOperation> = {
675 let mut queue = self.pending_operations.lock().await;
676 queue.drain(..).collect()
677 };
678
679 for operation in operations {
680 match self.execute_operation(operation).await {
681 Ok(_) => {
682 self.stats
683 .operations_completed
684 .fetch_add(1, Ordering::Relaxed);
685 }
686 Err(e) => {
687 self.stats.operations_failed.fetch_add(1, Ordering::Relaxed);
688 error!("Persistence operation failed: {}", e);
689 }
690 }
691 }
692
693 Ok(())
694 }
695
696 async fn execute_operation(&self, operation: PersistenceOperation) -> Result<()> {
698 match &self.backend {
699 PersistenceBackend::Memory => {
700 Ok(())
702 }
703 PersistenceBackend::FileSystem { base_path } => {
704 self.execute_filesystem_operation(operation, base_path)
705 .await
706 }
707 _ => {
708 warn!("Persistence backend not implemented: {:?}", self.backend);
710 Ok(())
711 }
712 }
713 }
714
715 async fn execute_filesystem_operation(
717 &self,
718 operation: PersistenceOperation,
719 _base_path: &str,
720 ) -> Result<()> {
721 match operation {
722 PersistenceOperation::StoreEvent(_event) => {
723 tokio::time::sleep(Duration::from_millis(1)).await;
725 self.stats.bytes_written.fetch_add(1024, Ordering::Relaxed);
726 }
727 PersistenceOperation::StoreSnapshot(_snapshot) => {
728 tokio::time::sleep(Duration::from_millis(5)).await;
730 self.stats.bytes_written.fetch_add(10240, Ordering::Relaxed);
731 }
732 _ => {
733 }
735 }
736 Ok(())
737 }
738}
739
740pub trait EventMetadataAccessor {
742 fn metadata(&self) -> &EventMetadata;
743}
744
745impl EventMetadataAccessor for StreamEvent {
746 fn metadata(&self) -> &EventMetadata {
747 match self {
748 StreamEvent::TripleAdded { metadata, .. } => metadata,
749 StreamEvent::TripleRemoved { metadata, .. } => metadata,
750 StreamEvent::QuadAdded { metadata, .. } => metadata,
751 StreamEvent::QuadRemoved { metadata, .. } => metadata,
752 StreamEvent::GraphCreated { metadata, .. } => metadata,
753 StreamEvent::GraphCleared { metadata, .. } => metadata,
754 StreamEvent::GraphDeleted { metadata, .. } => metadata,
755 StreamEvent::SparqlUpdate { metadata, .. } => metadata,
756 StreamEvent::TransactionBegin { metadata, .. } => metadata,
757 StreamEvent::TransactionCommit { metadata, .. } => metadata,
758 StreamEvent::TransactionAbort { metadata, .. } => metadata,
759 StreamEvent::SchemaChanged { metadata, .. } => metadata,
760 StreamEvent::Heartbeat { metadata, .. } => metadata,
761 StreamEvent::QueryResultAdded { metadata, .. } => metadata,
762 StreamEvent::QueryResultRemoved { metadata, .. } => metadata,
763 StreamEvent::QueryCompleted { metadata, .. } => metadata,
764 StreamEvent::ErrorOccurred { metadata, .. } => metadata,
765 _ => {
766 use once_cell::sync::Lazy;
768 static DEFAULT_METADATA: Lazy<EventMetadata> = Lazy::new(EventMetadata::default);
769 &DEFAULT_METADATA
770 }
771 }
772 }
773}