1use crate::{EventMetadata, StreamEvent};
8use anyhow::Result;
9use chrono::{DateTime, Duration as ChronoDuration, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::{Mutex, RwLock, Semaphore};
16use tracing::{debug, error, info, warn};
17use uuid::Uuid;
18
/// Configuration for the event store: memory limits, persistence backend,
/// snapshotting, retention, and indexing behavior.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventStoreConfig {
    /// Maximum number of events held in memory before the oldest are evicted.
    pub max_memory_events: usize,
    /// Whether stored events/snapshots are also queued for persistence.
    pub enable_persistence: bool,
    /// Backend used by the persistence manager.
    pub persistence_backend: PersistenceBackend,
    /// Automatic snapshot creation settings.
    pub snapshot_config: SnapshotConfig,
    /// Event retention / archiving policy.
    pub retention_policy: RetentionPolicy,
    /// Which secondary indexes to maintain.
    pub indexing_config: IndexingConfig,
    /// Gzip-compress snapshot state data when true.
    pub enable_compression: bool,
    /// Batch size for persistence operations.
    pub persistence_batch_size: usize,
}

impl Default for EventStoreConfig {
    fn default() -> Self {
        Self {
            max_memory_events: 1_000_000,
            enable_persistence: true,
            persistence_backend: PersistenceBackend::FileSystem {
                base_path: "/tmp/oxirs-event-store".to_string(),
            },
            snapshot_config: SnapshotConfig::default(),
            retention_policy: RetentionPolicy::default(),
            indexing_config: IndexingConfig::default(),
            enable_compression: true,
            persistence_batch_size: 1000,
        }
    }
}
56
/// Where persisted events and snapshots are written.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PersistenceBackend {
    /// Local filesystem under `base_path`.
    FileSystem { base_path: String },
    /// External database reached via `connection_string`.
    Database { connection_string: String },
    /// S3-compatible object storage.
    ObjectStorage {
        endpoint: String,
        bucket: String,
        // NOTE(review): credentials are plain strings and this type derives
        // Debug/Serialize, so they can leak into logs — consider redaction.
        access_key: String,
        secret_key: String,
    },
    /// No-op backend; events live only in memory.
    Memory,
}
74
/// Controls automatic snapshot creation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotConfig {
    /// Master switch for snapshotting.
    pub enable_snapshots: bool,
    /// Create a snapshot every N events per stream (must be > 0 when enabled).
    pub snapshot_interval: usize,
    /// Per-stream cap on retained in-memory snapshots (oldest dropped first).
    pub max_snapshots: usize,
    // NOTE(review): snapshot compression in this file actually keys off
    // `EventStoreConfig::enable_compression`; this flag is not consulted.
    pub compress_snapshots: bool,
}

impl Default for SnapshotConfig {
    fn default() -> Self {
        Self {
            enable_snapshots: true,
            snapshot_interval: 10000,
            max_snapshots: 10,
            compress_snapshots: true,
        }
    }
}
98
/// How long events are kept before deletion or archiving.
/// NOTE(review): enforcement of this policy is not implemented in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicy {
    /// Drop events older than this (None = unlimited age).
    pub max_age: Option<ChronoDuration>,
    /// Drop events beyond this total count (None = unlimited).
    pub max_events: Option<u64>,
    /// Archive rather than delete expired events.
    pub enable_archiving: bool,
    /// Destination for archived events, if any.
    pub archive_backend: Option<PersistenceBackend>,
}

impl Default for RetentionPolicy {
    fn default() -> Self {
        Self {
            max_age: Some(ChronoDuration::days(365)),
            max_events: Some(10_000_000),
            enable_archiving: true,
            archive_backend: None,
        }
    }
}
122
/// Which secondary indexes the store maintains.
/// NOTE(review): `EventIndexes::add_event` currently indexes unconditionally;
/// these flags are not consulted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexingConfig {
    pub index_by_event_type: bool,
    pub index_by_timestamp: bool,
    pub index_by_source: bool,
    /// Additional user-defined indexes.
    pub custom_indexes: Vec<CustomIndex>,
}

impl Default for IndexingConfig {
    fn default() -> Self {
        Self {
            index_by_event_type: true,
            index_by_timestamp: true,
            index_by_source: true,
            custom_indexes: Vec::new(),
        }
    }
}
146
/// A user-defined secondary index over an event field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomIndex {
    /// Unique index name.
    pub name: String,
    /// Path to the indexed field within the event payload.
    pub field_path: String,
    /// Index data structure.
    pub index_type: IndexType,
}

/// Supported index structures.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexType {
    /// Exact-match lookups.
    Hash,
    /// Ordered / range lookups.
    BTree,
    /// Text search.
    FullText,
}
168
/// An event as recorded by the store, wrapping the raw `StreamEvent` with
/// ordering and storage bookkeeping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredEvent {
    /// Unique id assigned at store time.
    pub event_id: Uuid,
    /// Globally unique, monotonically increasing position in the log.
    pub sequence_number: u64,
    /// Stream (aggregate) this event belongs to.
    pub stream_id: String,
    /// 1-based position within its stream.
    pub stream_version: u64,
    /// The original event payload.
    pub event_data: StreamEvent,
    /// Wall-clock time the store accepted the event.
    pub stored_at: DateTime<Utc>,
    /// Checksum / size / location bookkeeping.
    pub storage_metadata: StorageMetadata,
}
187
/// Integrity and placement details for a stored event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageMetadata {
    /// CRC32 (hex) of the event's JSON serialization.
    pub checksum: String,
    /// Size after compression, when compression was applied.
    pub compressed_size: Option<usize>,
    /// Estimated serialized size in bytes.
    pub original_size: usize,
    /// Opaque locator, e.g. "memory:<sequence>".
    pub storage_location: String,
    /// Where the event currently is in the persistence lifecycle.
    pub persistence_status: PersistenceStatus,
}
202
/// Lifecycle of an event with respect to durable storage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PersistenceStatus {
    /// Only in the in-memory log.
    InMemory,
    /// Written to the persistence backend.
    Persisted,
    /// Moved to the archive backend.
    Archived,
    /// Persistence attempt failed with the recorded error.
    Failed { error: String },
}
215
/// Point-in-time aggregate state for a stream, used to speed up rebuilds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventSnapshot {
    pub snapshot_id: Uuid,
    /// Stream this snapshot belongs to.
    pub stream_id: String,
    /// Stream version the snapshot covers (events up to this are folded in).
    pub stream_version: u64,
    /// Sequence number of the last event included (0 for an empty stream).
    pub sequence_number: u64,
    pub created_at: DateTime<Utc>,
    /// Aggregate state bytes, possibly gzip-compressed (see `metadata`).
    pub state_data: Vec<u8>,
    pub metadata: SnapshotMetadata,
}

/// Size / integrity details for a snapshot payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotMetadata {
    /// Compression algorithm applied to `state_data`, if any (e.g. "gzip").
    pub compression: Option<String>,
    pub original_size: usize,
    pub compressed_size: usize,
    /// CRC32 (hex) of the (possibly compressed) payload.
    pub checksum: String,
}
247
/// Declarative filter over the event log. All populated predicates must
/// match (logical AND).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventQuery {
    /// Restrict to a single stream.
    pub stream_id: Option<String>,
    /// Restrict to these event-type keys (discriminant Debug strings — see
    /// `EventIndexes::add_event`).
    pub event_types: Option<Vec<String>>,
    /// Inclusive time window over event metadata timestamps.
    pub time_range: Option<TimeRange>,
    /// Inclusive sequence-number window.
    pub sequence_range: Option<SequenceRange>,
    /// Restrict to events from this source.
    pub source: Option<String>,
    // NOTE(review): custom_filters are accepted but not evaluated anywhere
    // in this file yet.
    pub custom_filters: HashMap<String, String>,
    /// Maximum number of results to return.
    pub limit: Option<usize>,
    /// Result ordering.
    pub order: QueryOrder,
}

/// Inclusive wall-clock interval.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
    pub start: DateTime<Utc>,
    pub end: DateTime<Utc>,
}

/// Inclusive sequence-number interval.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequenceRange {
    pub start: u64,
    pub end: u64,
}

/// Sort order for query results.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryOrder {
    SequenceAsc,
    SequenceDesc,
    TimestampAsc,
    TimestampDesc,
}
295
/// Store-wide counters. Atomics so they can be bumped from concurrent
/// operations without locking. Note the `average_*_latency_ms` fields hold
/// the most recent sample (a plain `store`), not a running average.
#[derive(Debug, Default)]
pub struct EventSourcingStats {
    pub total_events_stored: AtomicU64,
    pub total_events_retrieved: AtomicU64,
    pub snapshots_created: AtomicU64,
    pub events_archived: AtomicU64,
    pub persistence_operations: AtomicU64,
    pub failed_operations: AtomicU64,
    pub memory_usage_bytes: AtomicU64,
    pub disk_usage_bytes: AtomicU64,
    pub average_store_latency_ms: AtomicU64,
    pub average_retrieve_latency_ms: AtomicU64,
}
310
/// Object-safe facade over the event store, mirroring `EventStore`'s
/// inherent API so callers can hold a `dyn EventStoreTrait`.
#[async_trait::async_trait]
pub trait EventStoreTrait: Send + Sync {
    /// Appends one event to a stream; returns the stored wrapper.
    async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent>;
    /// Runs a declarative query.
    async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>>;
    /// All events of a stream, optionally from a minimum version.
    async fn get_stream_events(
        &self,
        stream_id: &str,
        from_version: Option<u64>,
    ) -> Result<Vec<StoredEvent>>;
    /// All events stored at or after `timestamp`.
    async fn replay_from_timestamp(&self, timestamp: DateTime<Utc>) -> Result<Vec<StoredEvent>>;
    /// Most recent snapshot for a stream, if any.
    async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>>;
    /// Rebuilds aggregate state from snapshot plus subsequent events.
    async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>>;
    /// Appends a batch of events; returns the resulting stream version.
    async fn append_events(
        &self,
        aggregate_id: &str,
        events: &[StreamEvent],
        expected_version: Option<u64>,
    ) -> Result<u64>;
}
331
/// Cursor-style access to stored events.
#[async_trait::async_trait]
pub trait EventStream: Send + Sync {
    /// Returns the next event, or `None` when exhausted.
    async fn next_event(&mut self) -> Option<StoredEvent>;
    /// Whether more events are available.
    async fn has_events(&self) -> bool;
    /// Batch read starting at an absolute log position.
    async fn read_events_from_position(
        &self,
        position: u64,
        max_events: usize,
    ) -> Result<Vec<StoredEvent>>;
}
343
/// Storage abstraction for snapshots.
#[async_trait::async_trait]
pub trait SnapshotStore: Send + Sync {
    async fn store_snapshot(&self, snapshot: EventSnapshot) -> Result<()>;
    /// Snapshot at a specific version, or the latest when `version` is None.
    async fn get_snapshot(
        &self,
        stream_id: &str,
        version: Option<u64>,
    ) -> Result<Option<EventSnapshot>>;
    async fn list_snapshots(&self, stream_id: &str) -> Result<Vec<EventSnapshot>>;
}
355
/// In-memory event-sourcing store with secondary indexes, periodic
/// snapshots, and a pluggable (queued) persistence layer.
pub struct EventStore {
    config: EventStoreConfig,
    /// Primary log: sequence number -> stored event, ordered for range scans.
    memory_events: Arc<RwLock<BTreeMap<u64, StoredEvent>>>,
    /// Latest version per stream id.
    stream_versions: Arc<RwLock<HashMap<String, u64>>>,
    /// Next global sequence number (starts at 1).
    next_sequence: Arc<AtomicU64>,
    /// Secondary indexes over the log.
    indexes: Arc<EventIndexes>,
    /// In-memory snapshots per stream (bounded by `SnapshotConfig`).
    snapshots: Arc<RwLock<HashMap<String, Vec<EventSnapshot>>>>,
    /// Queues and executes persistence operations.
    persistence_manager: Arc<PersistenceManager>,
    stats: Arc<EventSourcingStats>,
    /// Caps concurrently executing store/query operations.
    operation_semaphore: Arc<Semaphore>,
}
377
/// Secondary indexes mapping lookup keys to event sequence numbers.
pub struct EventIndexes {
    /// Event-type key (discriminant Debug string) -> sequences.
    by_event_type: RwLock<HashMap<String, Vec<u64>>>,
    /// Metadata timestamp -> sequences (ordered for range scans).
    by_timestamp: RwLock<BTreeMap<DateTime<Utc>, Vec<u64>>>,
    /// Metadata source -> sequences.
    by_source: RwLock<HashMap<String, Vec<u64>>>,
    /// Stream id -> sequences.
    by_stream: RwLock<HashMap<String, Vec<u64>>>,
    /// Custom index name -> key -> sequences (not yet populated anywhere).
    custom_indexes: RwLock<HashMap<String, HashMap<String, Vec<u64>>>>,
}
391
/// Queues persistence work and executes it against the configured backend.
pub struct PersistenceManager {
    backend: PersistenceBackend,
    /// FIFO of operations awaiting `process_pending_operations`.
    pending_operations: Arc<Mutex<VecDeque<PersistenceOperation>>>,
    stats: Arc<PersistenceStats>,
}

/// A unit of persistence work.
#[derive(Debug, Clone)]
pub enum PersistenceOperation {
    /// Boxed to keep the enum variant small.
    StoreEvent(Box<StoredEvent>),
    StoreSnapshot(EventSnapshot),
    ArchiveEvents(Vec<StoredEvent>),
    /// Delete by sequence number.
    DeleteEvents(Vec<u64>),
}

/// Counters for the persistence queue.
#[derive(Debug, Default)]
pub struct PersistenceStats {
    pub operations_queued: AtomicU64,
    pub operations_completed: AtomicU64,
    pub operations_failed: AtomicU64,
    pub bytes_written: AtomicU64,
    pub bytes_read: AtomicU64,
}
424
425impl EventStore {
426 pub fn new(config: EventStoreConfig) -> Self {
428 let persistence_manager =
429 Arc::new(PersistenceManager::new(config.persistence_backend.clone()));
430
431 Self {
432 config,
433 memory_events: Arc::new(RwLock::new(BTreeMap::new())),
434 stream_versions: Arc::new(RwLock::new(HashMap::new())),
435 next_sequence: Arc::new(AtomicU64::new(1)),
436 indexes: Arc::new(EventIndexes::new()),
437 snapshots: Arc::new(RwLock::new(HashMap::new())),
438 persistence_manager,
439 stats: Arc::new(EventSourcingStats::default()),
440 operation_semaphore: Arc::new(Semaphore::new(1000)), }
442 }
443
444 pub async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent> {
446 let _permit = self.operation_semaphore.acquire().await?;
447 let start_time = Instant::now();
448
449 let sequence_number = self.next_sequence.fetch_add(1, Ordering::SeqCst);
451 let stream_version = {
452 let mut versions = self.stream_versions.write().await;
453 let version = versions.get(&stream_id).unwrap_or(&0) + 1;
454 versions.insert(stream_id.clone(), version);
455 version
456 };
457
458 let checksum = self.calculate_checksum(&event)?;
460 let original_size = self.estimate_size(&event);
461 let stored_event = StoredEvent {
462 event_id: Uuid::new_v4(),
463 sequence_number,
464 stream_id: stream_id.clone(),
465 stream_version,
466 event_data: event,
467 stored_at: Utc::now(),
468 storage_metadata: StorageMetadata {
469 checksum,
470 compressed_size: None,
471 original_size,
472 storage_location: format!("memory:{sequence_number}"),
473 persistence_status: PersistenceStatus::InMemory,
474 },
475 };
476
477 {
479 let mut memory_events = self.memory_events.write().await;
480 memory_events.insert(sequence_number, stored_event.clone());
481
482 if memory_events.len() > self.config.max_memory_events {
484 let to_remove: Vec<u64> = memory_events
485 .keys()
486 .take(memory_events.len() - self.config.max_memory_events)
487 .cloned()
488 .collect();
489
490 for seq in to_remove {
491 memory_events.remove(&seq);
492 }
493 }
494 }
495
496 self.indexes.add_event(&stored_event).await?;
498
499 if self.config.enable_persistence {
501 self.persistence_manager
502 .queue_operation(PersistenceOperation::StoreEvent(Box::new(
503 stored_event.clone(),
504 )))
505 .await?;
506 }
507
508 if self.config.snapshot_config.enable_snapshots
510 && stream_version % self.config.snapshot_config.snapshot_interval as u64 == 0
511 {
512 self.create_snapshot(&stream_id, stream_version).await?;
513 }
514
515 self.stats
517 .total_events_stored
518 .fetch_add(1, Ordering::Relaxed);
519 let store_latency = start_time.elapsed();
520 self.stats
521 .average_store_latency_ms
522 .store(store_latency.as_millis() as u64, Ordering::Relaxed);
523
524 info!(
525 "Stored event {} for stream {} (seq: {}, version: {})",
526 stored_event.event_id, stream_id, sequence_number, stream_version
527 );
528
529 Ok(stored_event)
530 }
531
532 pub async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>> {
534 let _permit = self.operation_semaphore.acquire().await?;
535 let start_time = Instant::now();
536
537 let candidate_sequences = self.indexes.find_matching_sequences(&query).await?;
538 let mut results = Vec::new();
539
540 let memory_events = self.memory_events.read().await;
541 for &sequence in &candidate_sequences {
542 if let Some(stored_event) = memory_events.get(&sequence) {
543 if self.matches_query(stored_event, &query) {
544 results.push(stored_event.clone());
545
546 if let Some(limit) = query.limit {
547 if results.len() >= limit {
548 break;
549 }
550 }
551 }
552 }
553 }
554
555 self.sort_results(&mut results, &query.order);
557
558 self.stats
560 .total_events_retrieved
561 .fetch_add(results.len() as u64, Ordering::Relaxed);
562 let retrieve_latency = start_time.elapsed();
563 self.stats
564 .average_retrieve_latency_ms
565 .store(retrieve_latency.as_millis() as u64, Ordering::Relaxed);
566
567 debug!(
568 "Query returned {} events in {:?}",
569 results.len(),
570 retrieve_latency
571 );
572
573 Ok(results)
574 }
575
576 pub async fn get_stream_events(
578 &self,
579 stream_id: &str,
580 from_version: Option<u64>,
581 ) -> Result<Vec<StoredEvent>> {
582 let query = EventQuery {
583 stream_id: Some(stream_id.to_string()),
584 event_types: None,
585 time_range: None,
586 sequence_range: None,
587 source: None,
588 custom_filters: HashMap::new(),
589 limit: None,
590 order: QueryOrder::SequenceAsc,
591 };
592
593 let mut events = self.query_events(query).await?;
594
595 if let Some(from_version) = from_version {
596 events.retain(|e| e.stream_version >= from_version);
597 }
598
599 Ok(events)
600 }
601
602 pub async fn replay_from_timestamp(
604 &self,
605 timestamp: DateTime<Utc>,
606 ) -> Result<Vec<StoredEvent>> {
607 let query = EventQuery {
608 stream_id: None,
609 event_types: None,
610 time_range: Some(TimeRange {
611 start: timestamp,
612 end: Utc::now(),
613 }),
614 sequence_range: None,
615 source: None,
616 custom_filters: HashMap::new(),
617 limit: None,
618 order: QueryOrder::SequenceAsc,
619 };
620
621 self.query_events(query).await
622 }
623
624 async fn create_snapshot(&self, stream_id: &str, stream_version: u64) -> Result<EventSnapshot> {
626 let events = self.get_stream_events(stream_id, None).await?;
627
628 let state_data = self.aggregate_events(&events)?;
630 let compressed_data = self.compress_data(&state_data)?;
631
632 let snapshot = EventSnapshot {
633 snapshot_id: Uuid::new_v4(),
634 stream_id: stream_id.to_string(),
635 stream_version,
636 sequence_number: events.last().map(|e| e.sequence_number).unwrap_or(0),
637 created_at: Utc::now(),
638 state_data: compressed_data.clone(),
639 metadata: SnapshotMetadata {
640 compression: Some("gzip".to_string()),
641 original_size: state_data.len(),
642 compressed_size: compressed_data.len(),
643 checksum: self.calculate_data_checksum(&compressed_data)?,
644 },
645 };
646
647 {
649 let mut snapshots = self.snapshots.write().await;
650 let stream_snapshots = snapshots
651 .entry(stream_id.to_string())
652 .or_insert_with(Vec::new);
653 stream_snapshots.push(snapshot.clone());
654
655 if stream_snapshots.len() > self.config.snapshot_config.max_snapshots {
657 stream_snapshots.remove(0);
658 }
659 }
660
661 if self.config.enable_persistence {
663 self.persistence_manager
664 .queue_operation(PersistenceOperation::StoreSnapshot(snapshot.clone()))
665 .await?;
666 }
667
668 self.stats.snapshots_created.fetch_add(1, Ordering::Relaxed);
669 info!(
670 "Created snapshot {} for stream {} at version {}",
671 snapshot.snapshot_id, stream_id, stream_version
672 );
673
674 Ok(snapshot)
675 }
676
677 pub async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>> {
679 let snapshots = self.snapshots.read().await;
680 if let Some(stream_snapshots) = snapshots.get(stream_id) {
681 Ok(stream_snapshots.last().cloned())
682 } else {
683 Ok(None)
684 }
685 }
686
    /// Rebuilds the aggregate state of `stream_id`, preferring the latest
    /// snapshot plus the events recorded after it; falls back to replaying
    /// the whole stream when no snapshot exists.
    pub async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>> {
        if let Some(snapshot) = self.get_latest_snapshot(stream_id).await? {
            // Only events newer than the snapshot need replaying.
            let events = self
                .get_stream_events(stream_id, Some(snapshot.stream_version + 1))
                .await?;

            let mut state = self.decompress_data(&snapshot.state_data)?;

            for event in events {
                state = self.apply_event_to_state(state, &event.event_data)?;
            }

            Ok(state)
        } else {
            // No snapshot: aggregate the full event history.
            let events = self.get_stream_events(stream_id, None).await?;
            let aggregated = self.aggregate_events(&events)?;
            Ok(aggregated)
        }
    }
712
    /// Returns true when `event` satisfies every populated predicate in
    /// `query`. `custom_filters` are not evaluated here.
    fn matches_query(&self, event: &StoredEvent, query: &EventQuery) -> bool {
        if let Some(ref stream_id) = query.stream_id {
            if &event.stream_id != stream_id {
                return false;
            }
        }

        if let Some(ref event_types) = query.event_types {
            // Same opaque discriminant key format the type index uses.
            let event_type = format!("{:?}", std::mem::discriminant(&event.event_data));
            if !event_types.contains(&event_type) {
                return false;
            }
        }

        if let Some(ref time_range) = query.time_range {
            // Inclusive on both ends.
            let event_time = event.event_data.metadata().timestamp;
            if event_time < time_range.start || event_time > time_range.end {
                return false;
            }
        }

        if let Some(ref seq_range) = query.sequence_range {
            // Inclusive on both ends.
            if event.sequence_number < seq_range.start || event.sequence_number > seq_range.end {
                return false;
            }
        }

        if let Some(ref source) = query.source {
            if &event.event_data.metadata().source != source {
                return false;
            }
        }

        true
    }
754
755 fn sort_results(&self, results: &mut [StoredEvent], order: &QueryOrder) {
757 match order {
758 QueryOrder::SequenceAsc => {
759 results.sort_by_key(|e| e.sequence_number);
760 }
761 QueryOrder::SequenceDesc => {
762 results.sort_by_key(|e| std::cmp::Reverse(e.sequence_number));
763 }
764 QueryOrder::TimestampAsc => {
765 results.sort_by_key(|e| e.event_data.metadata().timestamp);
766 }
767 QueryOrder::TimestampDesc => {
768 results.sort_by_key(|e| std::cmp::Reverse(e.event_data.metadata().timestamp));
769 }
770 }
771 }
772
    /// CRC32 checksum (lowercase hex) of the event's JSON serialization.
    fn calculate_checksum(&self, event: &StreamEvent) -> Result<String> {
        let serialized = serde_json::to_string(event)?;
        Ok(format!("{:x}", crc32fast::hash(serialized.as_bytes())))
    }
778
    /// CRC32 checksum (lowercase hex) of raw bytes.
    fn calculate_data_checksum(&self, data: &[u8]) -> Result<String> {
        Ok(format!("{:x}", crc32fast::hash(data)))
    }
783
784 fn estimate_size(&self, event: &StreamEvent) -> usize {
786 serde_json::to_string(event)
787 .map(|s| s.len())
788 .unwrap_or(1024)
789 }
790
    /// Placeholder aggregation: real state folding is not implemented here;
    /// this only records the event count as bytes.
    fn aggregate_events(&self, events: &[StoredEvent]) -> Result<Vec<u8>> {
        let aggregate = format!("Aggregated {} events", events.len());
        Ok(aggregate.into_bytes())
    }
797
    /// Placeholder event application: appends a marker to the state instead
    /// of performing a real state transition (the event itself is ignored).
    fn apply_event_to_state(&self, mut state: Vec<u8>, _event: &StreamEvent) -> Result<Vec<u8>> {
        state.extend_from_slice(b" +event");
        Ok(state)
    }
804
    /// Gzip-compresses `data` when `enable_compression` is set; otherwise
    /// returns a plain copy.
    fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
        if self.config.enable_compression {
            use flate2::write::GzEncoder;
            use flate2::Compression;
            use std::io::Write;

            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(data)?;
            Ok(encoder.finish()?)
        } else {
            Ok(data.to_vec())
        }
    }
819
    /// Inverse of `compress_data`.
    ///
    /// NOTE(review): this keys off the *current* config rather than the
    /// snapshot's recorded `compression` metadata, so toggling
    /// `enable_compression` between writing and reading a snapshot would
    /// corrupt reads — confirm the config is immutable per store instance.
    fn decompress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
        if self.config.enable_compression {
            use flate2::read::GzDecoder;
            use std::io::Read;

            let mut decoder = GzDecoder::new(data);
            let mut decompressed = Vec::new();
            decoder.read_to_end(&mut decompressed)?;
            Ok(decompressed)
        } else {
            Ok(data.to_vec())
        }
    }
834
835 pub fn get_stats(&self) -> EventSourcingStats {
837 EventSourcingStats {
838 total_events_stored: AtomicU64::new(
839 self.stats.total_events_stored.load(Ordering::Relaxed),
840 ),
841 total_events_retrieved: AtomicU64::new(
842 self.stats.total_events_retrieved.load(Ordering::Relaxed),
843 ),
844 snapshots_created: AtomicU64::new(self.stats.snapshots_created.load(Ordering::Relaxed)),
845 events_archived: AtomicU64::new(self.stats.events_archived.load(Ordering::Relaxed)),
846 persistence_operations: AtomicU64::new(
847 self.stats.persistence_operations.load(Ordering::Relaxed),
848 ),
849 failed_operations: AtomicU64::new(self.stats.failed_operations.load(Ordering::Relaxed)),
850 memory_usage_bytes: AtomicU64::new(
851 self.stats.memory_usage_bytes.load(Ordering::Relaxed),
852 ),
853 disk_usage_bytes: AtomicU64::new(self.stats.disk_usage_bytes.load(Ordering::Relaxed)),
854 average_store_latency_ms: AtomicU64::new(
855 self.stats.average_store_latency_ms.load(Ordering::Relaxed),
856 ),
857 average_retrieve_latency_ms: AtomicU64::new(
858 self.stats
859 .average_retrieve_latency_ms
860 .load(Ordering::Relaxed),
861 ),
862 }
863 }
864}
865
// The trait implementation delegates to the inherent methods above so the
// store can be used behind a `dyn EventStoreTrait`.
#[async_trait::async_trait]
impl EventStoreTrait for EventStore {
    async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent> {
        self.store_event(stream_id, event).await
    }

    async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>> {
        self.query_events(query).await
    }

    async fn get_stream_events(
        &self,
        stream_id: &str,
        from_version: Option<u64>,
    ) -> Result<Vec<StoredEvent>> {
        self.get_stream_events(stream_id, from_version).await
    }

    async fn replay_from_timestamp(&self, timestamp: DateTime<Utc>) -> Result<Vec<StoredEvent>> {
        self.replay_from_timestamp(timestamp).await
    }

    async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>> {
        self.get_latest_snapshot(stream_id).await
    }

    async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>> {
        self.rebuild_stream_state(stream_id).await
    }

    async fn append_events(
        &self,
        aggregate_id: &str,
        events: &[StreamEvent],
        _expected_version: Option<u64>,
    ) -> Result<u64> {
        // NOTE(review): optimistic-concurrency `expected_version` is accepted
        // but not enforced here — confirm whether callers rely on it.
        let mut last_version = 0u64;
        for event in events {
            let stored_event = self
                .store_event(aggregate_id.to_string(), event.clone())
                .await?;
            last_version = stored_event.stream_version;
        }
        Ok(last_version)
    }
}
913
// `Default` mirrors `new()`: a fully empty set of indexes.
impl Default for EventIndexes {
    fn default() -> Self {
        Self::new()
    }
}
919
920impl EventIndexes {
    /// Creates empty indexes.
    pub fn new() -> Self {
        Self {
            by_event_type: RwLock::new(HashMap::new()),
            by_timestamp: RwLock::new(BTreeMap::new()),
            by_source: RwLock::new(HashMap::new()),
            by_stream: RwLock::new(HashMap::new()),
            custom_indexes: RwLock::new(HashMap::new()),
        }
    }
931
932 pub async fn add_event(&self, event: &StoredEvent) -> Result<()> {
934 let sequence = event.sequence_number;
935
936 {
938 let mut by_type = self.by_event_type.write().await;
939 let event_type = format!("{:?}", std::mem::discriminant(&event.event_data));
940 by_type
941 .entry(event_type)
942 .or_insert_with(Vec::new)
943 .push(sequence);
944 }
945
946 {
948 let mut by_timestamp = self.by_timestamp.write().await;
949 let timestamp = event.event_data.metadata().timestamp;
950 by_timestamp
951 .entry(timestamp)
952 .or_insert_with(Vec::new)
953 .push(sequence);
954 }
955
956 {
958 let mut by_source = self.by_source.write().await;
959 let source = &event.event_data.metadata().source;
960 by_source
961 .entry(source.clone())
962 .or_insert_with(Vec::new)
963 .push(sequence);
964 }
965
966 {
968 let mut by_stream = self.by_stream.write().await;
969 by_stream
970 .entry(event.stream_id.clone())
971 .or_insert_with(Vec::new)
972 .push(sequence);
973 }
974
975 Ok(())
976 }
977
978 pub async fn find_matching_sequences(&self, query: &EventQuery) -> Result<Vec<u64>> {
980 let mut candidate_sequences = Vec::new();
981
982 if let Some(ref stream_id) = query.stream_id {
984 let by_stream = self.by_stream.read().await;
985 if let Some(sequences) = by_stream.get(stream_id) {
986 candidate_sequences = sequences.clone();
987 } else {
988 return Ok(Vec::new()); }
990 } else {
991 let by_stream = self.by_stream.read().await;
993 for sequences in by_stream.values() {
994 candidate_sequences.extend(sequences);
995 }
996 }
997
998 if let Some(ref event_types) = query.event_types {
1000 let by_type = self.by_event_type.read().await;
1001 let mut type_sequences: HashSet<u64> = HashSet::new();
1002
1003 for event_type in event_types {
1004 if let Some(sequences) = by_type.get(event_type) {
1005 type_sequences.extend(sequences);
1006 }
1007 }
1008
1009 candidate_sequences.retain(|seq| type_sequences.contains(seq));
1010 }
1011
1012 if let Some(ref seq_range) = query.sequence_range {
1014 candidate_sequences.retain(|&seq| seq >= seq_range.start && seq <= seq_range.end);
1015 }
1016
1017 candidate_sequences.sort_unstable();
1018 Ok(candidate_sequences)
1019 }
1020}
1021
1022impl PersistenceManager {
    /// Creates a manager with an empty operation queue for `backend`.
    pub fn new(backend: PersistenceBackend) -> Self {
        Self {
            backend,
            pending_operations: Arc::new(Mutex::new(VecDeque::new())),
            stats: Arc::new(PersistenceStats::default()),
        }
    }
1031
1032 pub async fn queue_operation(&self, operation: PersistenceOperation) -> Result<()> {
1034 let mut queue = self.pending_operations.lock().await;
1035 queue.push_back(operation);
1036 self.stats.operations_queued.fetch_add(1, Ordering::Relaxed);
1037 Ok(())
1038 }
1039
1040 pub async fn process_pending_operations(&self) -> Result<()> {
1042 let operations: Vec<PersistenceOperation> = {
1043 let mut queue = self.pending_operations.lock().await;
1044 queue.drain(..).collect()
1045 };
1046
1047 for operation in operations {
1048 match self.execute_operation(operation).await {
1049 Ok(_) => {
1050 self.stats
1051 .operations_completed
1052 .fetch_add(1, Ordering::Relaxed);
1053 }
1054 Err(e) => {
1055 self.stats.operations_failed.fetch_add(1, Ordering::Relaxed);
1056 error!("Persistence operation failed: {}", e);
1057 }
1058 }
1059 }
1060
1061 Ok(())
1062 }
1063
    /// Dispatches one operation to the configured backend. `Memory` is a
    /// no-op; unimplemented backends log a warning and report success.
    async fn execute_operation(&self, operation: PersistenceOperation) -> Result<()> {
        match &self.backend {
            PersistenceBackend::Memory => {
                // Nothing to do: events already live in memory.
                Ok(())
            }
            PersistenceBackend::FileSystem { base_path } => {
                self.execute_filesystem_operation(operation, base_path)
                    .await
            }
            _ => {
                // Database/ObjectStorage are not wired up; treated as success.
                warn!("Persistence backend not implemented: {:?}", self.backend);
                Ok(())
            }
        }
    }
1082
    /// Simulated filesystem persistence: sleeps to model I/O latency and
    /// bumps byte counters. No data is actually written — `_base_path` is
    /// unused and the byte counts are fixed estimates.
    async fn execute_filesystem_operation(
        &self,
        operation: PersistenceOperation,
        _base_path: &str,
    ) -> Result<()> {
        match operation {
            PersistenceOperation::StoreEvent(_event) => {
                tokio::time::sleep(Duration::from_millis(1)).await;
                self.stats.bytes_written.fetch_add(1024, Ordering::Relaxed);
            }
            PersistenceOperation::StoreSnapshot(_snapshot) => {
                tokio::time::sleep(Duration::from_millis(5)).await;
                self.stats.bytes_written.fetch_add(10240, Ordering::Relaxed);
            }
            _ => {
                // ArchiveEvents/DeleteEvents are not simulated yet.
            }
        }
        Ok(())
    }
1106}
1107
/// Internal helper to read the `EventMetadata` embedded in every
/// `StreamEvent` variant without matching at each call site.
trait EventMetadataAccessor {
    fn metadata(&self) -> &EventMetadata;
}

impl EventMetadataAccessor for StreamEvent {
    fn metadata(&self) -> &EventMetadata {
        match self {
            StreamEvent::TripleAdded { metadata, .. } => metadata,
            StreamEvent::TripleRemoved { metadata, .. } => metadata,
            StreamEvent::QuadAdded { metadata, .. } => metadata,
            StreamEvent::QuadRemoved { metadata, .. } => metadata,
            StreamEvent::GraphCreated { metadata, .. } => metadata,
            StreamEvent::GraphCleared { metadata, .. } => metadata,
            StreamEvent::GraphDeleted { metadata, .. } => metadata,
            StreamEvent::SparqlUpdate { metadata, .. } => metadata,
            StreamEvent::TransactionBegin { metadata, .. } => metadata,
            StreamEvent::TransactionCommit { metadata, .. } => metadata,
            StreamEvent::TransactionAbort { metadata, .. } => metadata,
            StreamEvent::SchemaChanged { metadata, .. } => metadata,
            StreamEvent::Heartbeat { metadata, .. } => metadata,
            StreamEvent::QueryResultAdded { metadata, .. } => metadata,
            StreamEvent::QueryResultRemoved { metadata, .. } => metadata,
            StreamEvent::QueryCompleted { metadata, .. } => metadata,
            StreamEvent::ErrorOccurred { metadata, .. } => metadata,
            _ => {
                // Fallback for variants without metadata: hand out a shared,
                // lazily-initialized default so the return type can stay a
                // plain reference.
                use once_cell::sync::Lazy;
                static DEFAULT_METADATA: Lazy<EventMetadata> = Lazy::new(EventMetadata::default);
                &DEFAULT_METADATA
            }
        }
    }
}
1142
1143#[cfg(test)]
1144mod tests {
1145 use super::*;
1146 use crate::VectorClock;
1147 use std::collections::HashMap;
1148
    /// Builds a minimal `TripleAdded` event with fresh metadata for tests.
    fn create_test_event() -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "http://test.org/subject".to_string(),
            predicate: "http://test.org/predicate".to_string(),
            object: "\"test_value\"".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: Uuid::new_v4().to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: HashMap::new(),
                checksum: None,
            },
        }
    }
1168
    #[tokio::test]
    async fn test_event_store_creation() {
        let config = EventStoreConfig::default();
        let store = EventStore::new(config);

        // A fresh store has recorded no events.
        let stats = store.get_stats();
        assert_eq!(stats.total_events_stored.load(Ordering::Relaxed), 0);
    }
1177
    #[tokio::test]
    async fn test_store_and_retrieve_event() {
        let config = EventStoreConfig::default();
        let store = EventStore::new(config);

        let event = create_test_event();
        let stored_event = store
            .store_event("test_stream".to_string(), event)
            .await
            .unwrap();

        // First event: stream version and global sequence both start at 1.
        assert_eq!(stored_event.stream_id, "test_stream");
        assert_eq!(stored_event.stream_version, 1);
        assert_eq!(stored_event.sequence_number, 1);

        // The stream query should round-trip the same event.
        let stream_events = store.get_stream_events("test_stream", None).await.unwrap();
        assert_eq!(stream_events.len(), 1);
        assert_eq!(stream_events[0].event_id, stored_event.event_id);
    }
1197
1198 #[tokio::test]
1199 async fn test_event_query() {
1200 let config = EventStoreConfig::default();
1201 let store = EventStore::new(config);
1202
1203 for i in 0..5 {
1205 let event = create_test_event();
1206 store
1207 .store_event(format!("stream_{}", i % 2), event)
1208 .await
1209 .unwrap();
1210 }
1211
1212 let query = EventQuery {
1214 stream_id: Some("stream_0".to_string()),
1215 event_types: None,
1216 time_range: None,
1217 sequence_range: None,
1218 source: None,
1219 custom_filters: HashMap::new(),
1220 limit: None,
1221 order: QueryOrder::SequenceAsc,
1222 };
1223
1224 let results = store.query_events(query).await.unwrap();
1225 assert_eq!(results.len(), 3); for i in 1..results.len() {
1229 assert!(results[i].sequence_number > results[i - 1].sequence_number);
1230 }
1231 }
1232
1233 #[tokio::test]
1234 async fn test_snapshot_creation() {
1235 let mut config = EventStoreConfig::default();
1236 config.snapshot_config.snapshot_interval = 3; let store = EventStore::new(config);
1239
1240 for _ in 0..3 {
1242 let event = create_test_event();
1243 store
1244 .store_event("test_stream".to_string(), event)
1245 .await
1246 .unwrap();
1247 }
1248
1249 let snapshot = store.get_latest_snapshot("test_stream").await.unwrap();
1250 assert!(snapshot.is_some());
1251
1252 let snapshot = snapshot.unwrap();
1253 assert_eq!(snapshot.stream_id, "test_stream");
1254 assert_eq!(snapshot.stream_version, 3);
1255 }
1256
    #[tokio::test]
    async fn test_replay_from_timestamp() {
        let config = EventStoreConfig::default();
        let store = EventStore::new(config);

        let start_time = Utc::now();

        for i in 0..3 {
            let event = create_test_event();
            store
                .store_event(format!("stream_{i}"), event)
                .await
                .unwrap();
        }

        // Everything was stored after start_time, so all events replay.
        let replayed_events = store.replay_from_timestamp(start_time).await.unwrap();
        assert!(replayed_events.len() >= 3);

        // SequenceAsc ordering implies non-decreasing store times.
        for i in 1..replayed_events.len() {
            assert!(replayed_events[i].stored_at >= replayed_events[i - 1].stored_at);
        }
    }
1282
    #[tokio::test]
    async fn test_persistence_manager() {
        // Memory backend: execution is a no-op but counters still move.
        let backend = PersistenceBackend::Memory;
        let manager = PersistenceManager::new(backend);

        let event = create_test_event();
        let stored_event = StoredEvent {
            event_id: Uuid::new_v4(),
            sequence_number: 1,
            stream_id: "test".to_string(),
            stream_version: 1,
            event_data: event,
            stored_at: Utc::now(),
            storage_metadata: StorageMetadata {
                checksum: "test".to_string(),
                compressed_size: None,
                original_size: 100,
                storage_location: "memory".to_string(),
                persistence_status: PersistenceStatus::InMemory,
            },
        };

        manager
            .queue_operation(PersistenceOperation::StoreEvent(Box::new(stored_event)))
            .await
            .unwrap();
        manager.process_pending_operations().await.unwrap();

        // One queued, one completed, nothing failed.
        assert_eq!(manager.stats.operations_queued.load(Ordering::Relaxed), 1);
        assert_eq!(
            manager.stats.operations_completed.load(Ordering::Relaxed),
            1
        );
    }
1317
    #[test]
    fn test_vector_clock_operations() {
        let mut clock1 = VectorClock::new();
        let mut clock2 = VectorClock::new();

        // Independent increments on different regions are concurrent.
        clock1.increment("region1");
        clock2.increment("region2");
        assert!(clock1.is_concurrent(&clock2));

        // After merging clock2 into clock1 and advancing clock1, clock2's
        // history is strictly behind clock1's.
        clock1.update(&clock2);
        clock1.increment("region1");
        assert!(clock2.happens_before(&clock1));
        assert!(!clock1.happens_before(&clock2));
    }
1334}