use super::{
    EventQuery, EventSnapshot, EventSourcingStats, EventStoreConfig, EventStoreTrait,
    PersistenceBackend, PersistenceOperation, PersistenceStats, PersistenceStatus, QueryOrder,
    SnapshotMetadata, StorageMetadata, StoredEvent, TimeRange,
};
use crate::{EventMetadata, StreamEvent};
use anyhow::Result;
use chrono::{DateTime, Utc};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, RwLock, Semaphore};
use tracing::{debug, error, info, warn};
use uuid::Uuid;

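/// In-memory event store with secondary indexes, snapshotting, and an
/// asynchronous persistence queue.
///
/// Events are held in a `BTreeMap` keyed by a monotonically increasing
/// sequence number; once `max_memory_events` is exceeded, the oldest entries
/// are evicted from memory (durable copies are written through the configured
/// persistence backend when persistence is enabled).
///
/// # Example
///
/// A minimal usage sketch (not compiled here); it assumes `EventStoreConfig`
/// provides a `Default` implementation and that a `StreamEvent` value named
/// `event` is in scope:
///
/// ```ignore
/// let store = EventStore::new(EventStoreConfig::default());
/// let stored = store.store_event("orders-42".to_string(), event).await?;
/// println!("stored at sequence {}", stored.sequence_number);
/// ```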
pub struct EventStore {
    config: EventStoreConfig,
    memory_events: Arc<RwLock<BTreeMap<u64, StoredEvent>>>,
    stream_versions: Arc<RwLock<HashMap<String, u64>>>,
    next_sequence: Arc<AtomicU64>,
    indexes: Arc<EventIndexes>,
    snapshots: Arc<RwLock<HashMap<String, Vec<EventSnapshot>>>>,
    persistence_manager: Arc<PersistenceManager>,
    stats: Arc<EventSourcingStats>,
    operation_semaphore: Arc<Semaphore>,
}

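/// Secondary indexes mapping event attributes (type, timestamp, source, and
/// stream) to the sequence numbers of stored events, used to narrow query
/// candidates before per-event filtering.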
pub struct EventIndexes {
    by_event_type: RwLock<HashMap<String, Vec<u64>>>,
    by_timestamp: RwLock<BTreeMap<DateTime<Utc>, Vec<u64>>>,
    by_source: RwLock<HashMap<String, Vec<u64>>>,
    by_stream: RwLock<HashMap<String, Vec<u64>>>,
    custom_indexes: RwLock<HashMap<String, HashMap<String, Vec<u64>>>>,
}

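/// Queues persistence operations and applies them against the configured
/// `PersistenceBackend`, tracking success and failure counts.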
pub struct PersistenceManager {
    backend: PersistenceBackend,
    pending_operations: Arc<Mutex<VecDeque<PersistenceOperation>>>,
    pub(crate) stats: Arc<PersistenceStats>,
}

impl EventStore {
    pub fn new(config: EventStoreConfig) -> Self {
        let persistence_manager =
            Arc::new(PersistenceManager::new(config.persistence_backend.clone()));

        Self {
            config,
            memory_events: Arc::new(RwLock::new(BTreeMap::new())),
            stream_versions: Arc::new(RwLock::new(HashMap::new())),
            next_sequence: Arc::new(AtomicU64::new(1)),
            indexes: Arc::new(EventIndexes::new()),
            snapshots: Arc::new(RwLock::new(HashMap::new())),
            persistence_manager,
            stats: Arc::new(EventSourcingStats::default()),
            operation_semaphore: Arc::new(Semaphore::new(1000)),
        }
    }

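    /// Stores a single event, assigning it a global sequence number and the
    /// next version of `stream_id`, updating the in-memory map and indexes,
    /// and queuing persistence and snapshot work as configured.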
    pub async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent> {
        // Limit the number of concurrently running store/query operations.
        let _permit = self.operation_semaphore.acquire().await?;
        let start_time = Instant::now();

        // Assign a global sequence number and bump the per-stream version.
        let sequence_number = self.next_sequence.fetch_add(1, Ordering::SeqCst);
        let stream_version = {
            let mut versions = self.stream_versions.write().await;
            let version = versions.get(&stream_id).unwrap_or(&0) + 1;
            versions.insert(stream_id.clone(), version);
            version
        };

        let checksum = self.calculate_checksum(&event)?;
        let original_size = self.estimate_size(&event);
        let stored_event = StoredEvent {
            event_id: Uuid::new_v4(),
            sequence_number,
            stream_id: stream_id.clone(),
            stream_version,
            event_data: event,
            stored_at: Utc::now(),
            storage_metadata: StorageMetadata {
                checksum,
                compressed_size: None,
                original_size,
                storage_location: format!("memory:{sequence_number}"),
                persistence_status: PersistenceStatus::InMemory,
            },
        };

        // Keep the event in memory, evicting the oldest entries once the
        // configured in-memory limit is exceeded.
        {
            let mut memory_events = self.memory_events.write().await;
            memory_events.insert(sequence_number, stored_event.clone());

            if memory_events.len() > self.config.max_memory_events {
                let to_remove: Vec<u64> = memory_events
                    .keys()
                    .take(memory_events.len() - self.config.max_memory_events)
                    .cloned()
                    .collect();

                for seq in to_remove {
                    memory_events.remove(&seq);
                }
            }
        }

        // Update the secondary indexes.
        self.indexes.add_event(&stored_event).await?;

        // Queue the event for durable storage if persistence is enabled.
        if self.config.enable_persistence {
            self.persistence_manager
                .queue_operation(PersistenceOperation::StoreEvent(Box::new(
                    stored_event.clone(),
                )))
                .await?;
        }

        // Take a snapshot every `snapshot_interval` versions.
        if self.config.snapshot_config.enable_snapshots
            && stream_version % self.config.snapshot_config.snapshot_interval as u64 == 0
        {
            self.create_snapshot(&stream_id, stream_version).await?;
        }

        // Record store statistics (the latency counter holds the most recent
        // observation).
        self.stats
            .total_events_stored
            .fetch_add(1, Ordering::Relaxed);
        let store_latency = start_time.elapsed();
        self.stats
            .average_store_latency_ms
            .store(store_latency.as_millis() as u64, Ordering::Relaxed);

        info!(
            "Stored event {} for stream {} (seq: {}, version: {})",
            stored_event.event_id, stream_id, sequence_number, stream_version
        );

        Ok(stored_event)
    }

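    /// Queries stored events: candidate sequence numbers are looked up via the
    /// secondary indexes, then each in-memory candidate is checked against the
    /// full query, sorted, and optionally truncated to `query.limit`.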
    pub async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>> {
        let _permit = self.operation_semaphore.acquire().await?;
        let start_time = Instant::now();

        // Use the secondary indexes to narrow down candidate sequence numbers,
        // then verify each in-memory candidate against the full query.
        let candidate_sequences = self.indexes.find_matching_sequences(&query).await?;
        let mut results = Vec::new();

        let memory_events = self.memory_events.read().await;
        for &sequence in &candidate_sequences {
            if let Some(stored_event) = memory_events.get(&sequence) {
                if self.matches_query(stored_event, &query) {
                    results.push(stored_event.clone());
                }
            }
        }

        // Sort before applying the limit so the limit keeps the first matches
        // in the requested order rather than in raw sequence order.
        self.sort_results(&mut results, &query.order);
        if let Some(limit) = query.limit {
            results.truncate(limit);
        }

        self.stats
            .total_events_retrieved
            .fetch_add(results.len() as u64, Ordering::Relaxed);
        let retrieve_latency = start_time.elapsed();
        self.stats
            .average_retrieve_latency_ms
            .store(retrieve_latency.as_millis() as u64, Ordering::Relaxed);

        debug!(
            "Query returned {} events in {:?}",
            results.len(),
            retrieve_latency
        );

        Ok(results)
    }

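    /// Returns the events of a stream in ascending sequence order, optionally
    /// restricted to versions at or above `from_version`.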
    pub async fn get_stream_events(
        &self,
        stream_id: &str,
        from_version: Option<u64>,
    ) -> Result<Vec<StoredEvent>> {
        let query = EventQuery {
            stream_id: Some(stream_id.to_string()),
            event_types: None,
            time_range: None,
            sequence_range: None,
            source: None,
            custom_filters: HashMap::new(),
            limit: None,
            order: QueryOrder::SequenceAsc,
        };

        let mut events = self.query_events(query).await?;

        if let Some(from_version) = from_version {
            events.retain(|e| e.stream_version >= from_version);
        }

        Ok(events)
    }

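    /// Replays all events recorded between `timestamp` and now, in ascending
    /// sequence order.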
    pub async fn replay_from_timestamp(
        &self,
        timestamp: DateTime<Utc>,
    ) -> Result<Vec<StoredEvent>> {
        let query = EventQuery {
            stream_id: None,
            event_types: None,
            time_range: Some(TimeRange {
                start: timestamp,
                end: Utc::now(),
            }),
            sequence_range: None,
            source: None,
            custom_filters: HashMap::new(),
            limit: None,
            order: QueryOrder::SequenceAsc,
        };

        self.query_events(query).await
    }

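    /// Builds a snapshot of the stream's aggregated state, compresses it if
    /// compression is enabled, caps the number of retained snapshots, and
    /// queues the snapshot for persistence.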
    async fn create_snapshot(&self, stream_id: &str, stream_version: u64) -> Result<EventSnapshot> {
        let events = self.get_stream_events(stream_id, None).await?;

        let state_data = self.aggregate_events(&events)?;
        let compressed_data = self.compress_data(&state_data)?;

        let snapshot = EventSnapshot {
            snapshot_id: Uuid::new_v4(),
            stream_id: stream_id.to_string(),
            stream_version,
            sequence_number: events.last().map(|e| e.sequence_number).unwrap_or(0),
            created_at: Utc::now(),
            state_data: compressed_data.clone(),
            metadata: SnapshotMetadata {
                // Only report gzip when compression was actually applied.
                compression: if self.config.enable_compression {
                    Some("gzip".to_string())
                } else {
                    None
                },
                original_size: state_data.len(),
                compressed_size: compressed_data.len(),
                checksum: self.calculate_data_checksum(&compressed_data)?,
            },
        };

        // Keep at most `max_snapshots` per stream, dropping the oldest first.
        {
            let mut snapshots = self.snapshots.write().await;
            let stream_snapshots = snapshots.entry(stream_id.to_string()).or_default();
            stream_snapshots.push(snapshot.clone());

            if stream_snapshots.len() > self.config.snapshot_config.max_snapshots {
                stream_snapshots.remove(0);
            }
        }

        if self.config.enable_persistence {
            self.persistence_manager
                .queue_operation(PersistenceOperation::StoreSnapshot(snapshot.clone()))
                .await?;
        }

        self.stats.snapshots_created.fetch_add(1, Ordering::Relaxed);
        info!(
            "Created snapshot {} for stream {} at version {}",
            snapshot.snapshot_id, stream_id, stream_version
        );

        Ok(snapshot)
    }

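    /// Returns the most recent snapshot for `stream_id`, if any exists.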
    pub async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>> {
        let snapshots = self.snapshots.read().await;
        if let Some(stream_snapshots) = snapshots.get(stream_id) {
            Ok(stream_snapshots.last().cloned())
        } else {
            Ok(None)
        }
    }

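    /// Rebuilds the current state of a stream, starting from the latest
    /// snapshot when one exists and replaying only the events stored after it;
    /// otherwise the full event history is aggregated.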
    pub async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>> {
        if let Some(snapshot) = self.get_latest_snapshot(stream_id).await? {
            // Start from the latest snapshot and replay only the events that
            // were stored after it.
            let events = self
                .get_stream_events(stream_id, Some(snapshot.stream_version + 1))
                .await?;

            let mut state = self.decompress_data(&snapshot.state_data)?;

            for event in events {
                state = self.apply_event_to_state(state, &event.event_data)?;
            }

            Ok(state)
        } else {
            // No snapshot available: rebuild from the full event history.
            let events = self.get_stream_events(stream_id, None).await?;
            let aggregated = self.aggregate_events(&events)?;
            Ok(aggregated)
        }
    }

    fn matches_query(&self, event: &StoredEvent, query: &EventQuery) -> bool {
        if let Some(ref stream_id) = query.stream_id {
            if &event.stream_id != stream_id {
                return false;
            }
        }

        if let Some(ref event_types) = query.event_types {
            // The event type key is the Debug form of the enum discriminant,
            // matching the representation used in `EventIndexes::add_event`.
            let event_type = format!("{:?}", std::mem::discriminant(&event.event_data));
            if !event_types.contains(&event_type) {
                return false;
            }
        }

        if let Some(ref time_range) = query.time_range {
            let event_time = event.event_data.metadata().timestamp;
            if event_time < time_range.start || event_time > time_range.end {
                return false;
            }
        }

        if let Some(ref seq_range) = query.sequence_range {
            if event.sequence_number < seq_range.start || event.sequence_number > seq_range.end {
                return false;
            }
        }

        if let Some(ref source) = query.source {
            if &event.event_data.metadata().source != source {
                return false;
            }
        }

        true
    }

    fn sort_results(&self, results: &mut [StoredEvent], order: &QueryOrder) {
        match order {
            QueryOrder::SequenceAsc => {
                results.sort_by_key(|e| e.sequence_number);
            }
            QueryOrder::SequenceDesc => {
                results.sort_by_key(|e| std::cmp::Reverse(e.sequence_number));
            }
            QueryOrder::TimestampAsc => {
                results.sort_by_key(|e| e.event_data.metadata().timestamp);
            }
            QueryOrder::TimestampDesc => {
                results.sort_by_key(|e| std::cmp::Reverse(e.event_data.metadata().timestamp));
            }
        }
    }

    fn calculate_checksum(&self, event: &StreamEvent) -> Result<String> {
        let serialized = serde_json::to_string(event)?;
        Ok(format!("{:x}", crc32fast::hash(serialized.as_bytes())))
    }

    fn calculate_data_checksum(&self, data: &[u8]) -> Result<String> {
        Ok(format!("{:x}", crc32fast::hash(data)))
    }

    fn estimate_size(&self, event: &StreamEvent) -> usize {
        serde_json::to_string(event)
            .map(|s| s.len())
            .unwrap_or(1024)
    }

    fn aggregate_events(&self, events: &[StoredEvent]) -> Result<Vec<u8>> {
        // Placeholder aggregation: a real projection would fold the events
        // into a domain-specific state representation.
        let aggregate = format!("Aggregated {} events", events.len());
        Ok(aggregate.into_bytes())
    }

    fn apply_event_to_state(&self, mut state: Vec<u8>, _event: &StreamEvent) -> Result<Vec<u8>> {
        // Placeholder application: simply marks that an event was applied.
        state.extend_from_slice(b" +event");
        Ok(state)
    }

    fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
        if self.config.enable_compression {
            use flate2::write::GzEncoder;
            use flate2::Compression;
            use std::io::Write;

            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder.write_all(data)?;
            Ok(encoder.finish()?)
        } else {
            Ok(data.to_vec())
        }
    }

    fn decompress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
        if self.config.enable_compression {
            use flate2::read::GzDecoder;
            use std::io::Read;

            let mut decoder = GzDecoder::new(data);
            let mut decompressed = Vec::new();
            decoder.read_to_end(&mut decompressed)?;
            Ok(decompressed)
        } else {
            Ok(data.to_vec())
        }
    }

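    /// Returns a point-in-time copy of the event-sourcing statistics; the
    /// atomic counters are read with relaxed ordering and copied into a new
    /// `EventSourcingStats` value.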
    pub fn get_stats(&self) -> super::EventSourcingStats {
        super::EventSourcingStats {
            total_events_stored: AtomicU64::new(
                self.stats.total_events_stored.load(Ordering::Relaxed),
            ),
            total_events_retrieved: AtomicU64::new(
                self.stats.total_events_retrieved.load(Ordering::Relaxed),
            ),
            snapshots_created: AtomicU64::new(self.stats.snapshots_created.load(Ordering::Relaxed)),
            events_archived: AtomicU64::new(self.stats.events_archived.load(Ordering::Relaxed)),
            persistence_operations: AtomicU64::new(
                self.stats.persistence_operations.load(Ordering::Relaxed),
            ),
            failed_operations: AtomicU64::new(self.stats.failed_operations.load(Ordering::Relaxed)),
            memory_usage_bytes: AtomicU64::new(
                self.stats.memory_usage_bytes.load(Ordering::Relaxed),
            ),
            disk_usage_bytes: AtomicU64::new(self.stats.disk_usage_bytes.load(Ordering::Relaxed)),
            average_store_latency_ms: AtomicU64::new(
                self.stats.average_store_latency_ms.load(Ordering::Relaxed),
            ),
            average_retrieve_latency_ms: AtomicU64::new(
                self.stats
                    .average_retrieve_latency_ms
                    .load(Ordering::Relaxed),
            ),
        }
    }
}

#[async_trait::async_trait]
impl EventStoreTrait for EventStore {
    // These methods delegate to the inherent implementations above; inherent
    // methods take precedence during method resolution, so the calls below do
    // not recurse into the trait.
    async fn store_event(&self, stream_id: String, event: StreamEvent) -> Result<StoredEvent> {
        self.store_event(stream_id, event).await
    }

    async fn query_events(&self, query: EventQuery) -> Result<Vec<StoredEvent>> {
        self.query_events(query).await
    }

    async fn get_stream_events(
        &self,
        stream_id: &str,
        from_version: Option<u64>,
    ) -> Result<Vec<StoredEvent>> {
        self.get_stream_events(stream_id, from_version).await
    }

    async fn replay_from_timestamp(&self, timestamp: DateTime<Utc>) -> Result<Vec<StoredEvent>> {
        self.replay_from_timestamp(timestamp).await
    }

    async fn get_latest_snapshot(&self, stream_id: &str) -> Result<Option<EventSnapshot>> {
        self.get_latest_snapshot(stream_id).await
    }

    async fn rebuild_stream_state(&self, stream_id: &str) -> Result<Vec<u8>> {
        self.rebuild_stream_state(stream_id).await
    }

    async fn append_events(
        &self,
        aggregate_id: &str,
        events: &[StreamEvent],
        _expected_version: Option<u64>,
    ) -> Result<u64> {
        // Note: `_expected_version` is currently not enforced (no optimistic
        // concurrency check).
        let mut last_version = 0u64;
        for event in events {
            let stored_event = self
                .store_event(aggregate_id.to_string(), event.clone())
                .await?;
            last_version = stored_event.stream_version;
        }
        Ok(last_version)
    }
}

impl Default for EventIndexes {
    fn default() -> Self {
        Self::new()
    }
}

impl EventIndexes {
    pub fn new() -> Self {
        Self {
            by_event_type: RwLock::new(HashMap::new()),
            by_timestamp: RwLock::new(BTreeMap::new()),
            by_source: RwLock::new(HashMap::new()),
            by_stream: RwLock::new(HashMap::new()),
            custom_indexes: RwLock::new(HashMap::new()),
        }
    }

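    /// Registers a stored event in the type, timestamp, source, and stream
    /// indexes under its sequence number.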
    pub async fn add_event(&self, event: &StoredEvent) -> Result<()> {
        let sequence = event.sequence_number;

        // Index by event type (Debug form of the enum discriminant).
        {
            let mut by_type = self.by_event_type.write().await;
            let event_type = format!("{:?}", std::mem::discriminant(&event.event_data));
            by_type.entry(event_type).or_default().push(sequence);
        }

        // Index by event timestamp.
        {
            let mut by_timestamp = self.by_timestamp.write().await;
            let timestamp = event.event_data.metadata().timestamp;
            by_timestamp.entry(timestamp).or_default().push(sequence);
        }

        // Index by event source.
        {
            let mut by_source = self.by_source.write().await;
            let source = &event.event_data.metadata().source;
            by_source.entry(source.clone()).or_default().push(sequence);
        }

        // Index by stream identifier.
        {
            let mut by_stream = self.by_stream.write().await;
            by_stream
                .entry(event.stream_id.clone())
                .or_default()
                .push(sequence);
        }

        Ok(())
    }

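    /// Computes the candidate sequence numbers for a query by combining the
    /// stream, event-type, and sequence-range indexes; the result is sorted in
    /// ascending order.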
    pub async fn find_matching_sequences(&self, query: &EventQuery) -> Result<Vec<u64>> {
        let mut candidate_sequences = Vec::new();

        // Start from the stream index when a stream is specified, otherwise
        // consider every indexed sequence number.
        if let Some(ref stream_id) = query.stream_id {
            let by_stream = self.by_stream.read().await;
            if let Some(sequences) = by_stream.get(stream_id) {
                candidate_sequences = sequences.clone();
            } else {
                return Ok(Vec::new());
            }
        } else {
            let by_stream = self.by_stream.read().await;
            for sequences in by_stream.values() {
                candidate_sequences.extend(sequences);
            }
        }

        // Intersect with the event-type index when event types are requested.
        if let Some(ref event_types) = query.event_types {
            let by_type = self.by_event_type.read().await;
            let mut type_sequences: HashSet<u64> = HashSet::new();

            for event_type in event_types {
                if let Some(sequences) = by_type.get(event_type) {
                    type_sequences.extend(sequences);
                }
            }

            candidate_sequences.retain(|seq| type_sequences.contains(seq));
        }

        // Narrow by the requested sequence range, if any.
        if let Some(ref seq_range) = query.sequence_range {
            candidate_sequences.retain(|&seq| seq >= seq_range.start && seq <= seq_range.end);
        }

        candidate_sequences.sort_unstable();
        Ok(candidate_sequences)
    }
}

impl PersistenceManager {
    pub fn new(backend: PersistenceBackend) -> Self {
        Self {
            backend,
            pending_operations: Arc::new(Mutex::new(VecDeque::new())),
            stats: Arc::new(PersistenceStats::default()),
        }
    }

    pub async fn queue_operation(&self, operation: PersistenceOperation) -> Result<()> {
        let mut queue = self.pending_operations.lock().await;
        queue.push_back(operation);
        self.stats.operations_queued.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

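    /// Drains the pending queue and executes each operation against the
    /// backend, counting completed and failed operations.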
    pub async fn process_pending_operations(&self) -> Result<()> {
        // Drain the queue up front so the lock is not held while operations run.
        let operations: Vec<PersistenceOperation> = {
            let mut queue = self.pending_operations.lock().await;
            queue.drain(..).collect()
        };

        for operation in operations {
            match self.execute_operation(operation).await {
                Ok(_) => {
                    self.stats
                        .operations_completed
                        .fetch_add(1, Ordering::Relaxed);
                }
                Err(e) => {
                    self.stats.operations_failed.fetch_add(1, Ordering::Relaxed);
                    error!("Persistence operation failed: {}", e);
                }
            }
        }

        Ok(())
    }

    async fn execute_operation(&self, operation: PersistenceOperation) -> Result<()> {
        match &self.backend {
            PersistenceBackend::Memory => {
                // Nothing to persist: events already live in memory.
                Ok(())
            }
            PersistenceBackend::FileSystem { base_path } => {
                self.execute_filesystem_operation(operation, base_path)
                    .await
            }
            _ => {
                warn!("Persistence backend not implemented: {:?}", self.backend);
                Ok(())
            }
        }
    }

    async fn execute_filesystem_operation(
        &self,
        operation: PersistenceOperation,
        _base_path: &str,
    ) -> Result<()> {
        match operation {
            PersistenceOperation::StoreEvent(_event) => {
                // Simulated write: actual file I/O is not implemented yet.
                tokio::time::sleep(Duration::from_millis(1)).await;
                self.stats.bytes_written.fetch_add(1024, Ordering::Relaxed);
            }
            PersistenceOperation::StoreSnapshot(_snapshot) => {
                tokio::time::sleep(Duration::from_millis(5)).await;
                self.stats.bytes_written.fetch_add(10240, Ordering::Relaxed);
            }
            _ => {}
        }
        Ok(())
    }
}

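/// Uniform access to the `EventMetadata` carried by every `StreamEvent`
/// variant.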
pub trait EventMetadataAccessor {
    fn metadata(&self) -> &EventMetadata;
}

impl EventMetadataAccessor for StreamEvent {
    fn metadata(&self) -> &EventMetadata {
        match self {
            StreamEvent::TripleAdded { metadata, .. } => metadata,
            StreamEvent::TripleRemoved { metadata, .. } => metadata,
            StreamEvent::QuadAdded { metadata, .. } => metadata,
            StreamEvent::QuadRemoved { metadata, .. } => metadata,
            StreamEvent::GraphCreated { metadata, .. } => metadata,
            StreamEvent::GraphCleared { metadata, .. } => metadata,
            StreamEvent::GraphDeleted { metadata, .. } => metadata,
            StreamEvent::SparqlUpdate { metadata, .. } => metadata,
            StreamEvent::TransactionBegin { metadata, .. } => metadata,
            StreamEvent::TransactionCommit { metadata, .. } => metadata,
            StreamEvent::TransactionAbort { metadata, .. } => metadata,
            StreamEvent::SchemaChanged { metadata, .. } => metadata,
            StreamEvent::Heartbeat { metadata, .. } => metadata,
            StreamEvent::QueryResultAdded { metadata, .. } => metadata,
            StreamEvent::QueryResultRemoved { metadata, .. } => metadata,
            StreamEvent::QueryCompleted { metadata, .. } => metadata,
            StreamEvent::ErrorOccurred { metadata, .. } => metadata,
            _ => {
                // Fallback for variants that do not expose metadata: return a
                // lazily initialized shared default.
                use once_cell::sync::Lazy;
                static DEFAULT_METADATA: Lazy<EventMetadata> = Lazy::new(EventMetadata::default);
                &DEFAULT_METADATA
            }
        }
    }
}