Skip to main content

oxirs_stream/event_sourcing/
mod.rs

1//! # Event Sourcing Framework
2//!
3//! Complete event sourcing implementation for OxiRS Stream providing event storage,
4//! replay capabilities, snapshots, and temporal queries. This forms the foundation
5//! for CQRS patterns and enables advanced temporal analytics.
6
7pub mod rdf_store_mod;
8mod simple;
9mod store;
10
11#[cfg(test)]
12mod simple_tests;
13#[cfg(test)]
14mod tests;
15
16// Re-export all public types from sub-modules
17pub use rdf_store_mod as rdf_store;
18pub use simple::{
19    EventStreamIter, ProjectionRunner, SimpleEvent, SimpleEventBus, SimpleEventHandler,
20    SimpleEventStore, SimpleSnapshot, SimpleSnapshotStore,
21};
22pub use store::{EventIndexes, EventMetadataAccessor, EventStore, PersistenceManager};
23
24use crate::StreamEvent;
25use chrono::{DateTime, Duration as ChronoDuration, Utc};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::sync::atomic::AtomicU64;
29use uuid::Uuid;
30
/// Event store configuration.
///
/// Collects every tunable of the event store in one place: in-memory
/// capacity, persistence backend and batching, snapshotting, retention and
/// indexing. The type derives serde's `Serialize`/`Deserialize` in addition
/// to `Debug` and `Clone`.
///
/// The `Default` implementation in this module selects a file-system backend
/// with persistence and compression enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventStoreConfig {
    /// Maximum events to keep in memory (default: 1,000,000).
    pub max_memory_events: usize,
    /// Enable persistent storage (default: `true`).
    pub enable_persistence: bool,
    /// Persistence backend type (default: `PersistenceBackend::FileSystem`).
    pub persistence_backend: PersistenceBackend,
    /// Snapshot configuration.
    pub snapshot_config: SnapshotConfig,
    /// Retention policy.
    pub retention_policy: RetentionPolicy,
    /// Indexing configuration.
    pub indexing_config: IndexingConfig,
    /// Enable compression for stored events (default: `true`).
    pub enable_compression: bool,
    /// Batch size for persistence operations (default: 1000).
    pub persistence_batch_size: usize,
}
51
52impl Default for EventStoreConfig {
53    fn default() -> Self {
54        Self {
55            max_memory_events: 1_000_000,
56            enable_persistence: true,
57            persistence_backend: PersistenceBackend::FileSystem {
58                base_path: "/tmp/oxirs-event-store".to_string(),
59            },
60            snapshot_config: SnapshotConfig::default(),
61            retention_policy: RetentionPolicy::default(),
62            indexing_config: IndexingConfig::default(),
63            enable_compression: true,
64            persistence_batch_size: 1000,
65        }
66    }
67}
68
/// Persistence backend options.
///
/// Each variant carries the connection details for one kind of storage
/// target. The enum is also used for the optional archive target in
/// `RetentionPolicy::archive_backend`.
///
/// NOTE(review): `ObjectStorage::secret_key` (and `access_key`) are plain
/// `String`s, so the derived `Debug` and `Serialize` implementations emit
/// them verbatim. Take care when logging or serializing a configuration that
/// contains this variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PersistenceBackend {
    /// File system based storage, rooted at `base_path`.
    FileSystem { base_path: String },
    /// Database storage, addressed by `connection_string`.
    Database { connection_string: String },
    /// S3-compatible object storage.
    ObjectStorage {
        endpoint: String,
        bucket: String,
        access_key: String,
        secret_key: String,
    },
    /// In-memory only (no persistence).
    Memory,
}
86
/// Snapshot configuration.
///
/// Controls whether snapshots are taken automatically, how often, how many
/// are retained, and whether they are compressed. The `Default`
/// implementation in this module enables snapshots every 10,000 events,
/// keeps 10 of them and compresses them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotConfig {
    /// Enable automatic snapshots.
    pub enable_snapshots: bool,
    /// Snapshot interval (number of events).
    pub snapshot_interval: usize,
    /// Maximum snapshots to keep.
    pub max_snapshots: usize,
    /// Snapshot compression.
    pub compress_snapshots: bool,
}
99
100impl Default for SnapshotConfig {
101    fn default() -> Self {
102        Self {
103            enable_snapshots: true,
104            snapshot_interval: 10000,
105            max_snapshots: 10,
106            compress_snapshots: true,
107        }
108    }
109}
110
/// Event retention policy.
///
/// Bounds how long and how many events are kept, and whether events that
/// fall outside those bounds are archived rather than deleted. Both limits
/// are optional.
///
/// NOTE(review): the `Default` implementation in this module sets
/// `enable_archiving` to `true` while leaving `archive_backend` as `None`.
/// How that combination is handled is up to the consuming code, which is not
/// visible from this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicy {
    /// Maximum age of events to keep.
    pub max_age: Option<ChronoDuration>,
    /// Maximum number of events to keep.
    pub max_events: Option<u64>,
    /// Archive old events instead of deleting.
    pub enable_archiving: bool,
    /// Archive backend.
    pub archive_backend: Option<PersistenceBackend>,
}
123
124impl Default for RetentionPolicy {
125    fn default() -> Self {
126        Self {
127            max_age: Some(ChronoDuration::days(365)), // 1 year
128            max_events: Some(10_000_000),             // 10M events
129            enable_archiving: true,
130            archive_backend: None,
131        }
132    }
133}
134
/// Indexing configuration.
///
/// Toggles the built-in indexes (event type, timestamp, source) and lists
/// any additional user-defined indexes. The `Default` implementation in this
/// module enables all three built-in indexes and defines no custom ones.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexingConfig {
    /// Enable event type indexing.
    pub index_by_event_type: bool,
    /// Enable timestamp indexing.
    pub index_by_timestamp: bool,
    /// Enable source indexing.
    pub index_by_source: bool,
    /// Custom field indexes to maintain in addition to the built-in ones.
    pub custom_indexes: Vec<CustomIndex>,
}
147
148impl Default for IndexingConfig {
149    fn default() -> Self {
150        Self {
151            index_by_event_type: true,
152            index_by_timestamp: true,
153            index_by_source: true,
154            custom_indexes: Vec::new(),
155        }
156    }
157}
158
/// Custom index definition.
///
/// Describes one user-defined index listed in
/// `IndexingConfig::custom_indexes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomIndex {
    /// Index name.
    pub name: String,
    /// Field path to index.
    // NOTE(review): the path syntax (dotted, JSON pointer, ...) is not
    // defined in this module; confirm against the index implementation.
    pub field_path: String,
    /// Index type.
    pub index_type: IndexType,
}
169
/// Index type.
///
/// Selects the kind of index used for a `CustomIndex`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexType {
    /// Hash index for exact matches.
    Hash,
    /// B-tree index for range queries.
    BTree,
    /// Full-text search index.
    FullText,
}
180
/// Stored event with metadata.
///
/// Wraps a `StreamEvent` with the identifiers, ordering information and
/// storage bookkeeping attached by the event store. This is the element type
/// returned by the query and replay methods of `EventStoreTrait`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredEvent {
    /// Unique event ID.
    pub event_id: Uuid,
    /// Event sequence number (global order).
    pub sequence_number: u64,
    /// Stream ID (for grouping related events).
    pub stream_id: String,
    /// Event version within the stream.
    pub stream_version: u64,
    /// Original event data.
    pub event_data: StreamEvent,
    /// Storage timestamp (UTC).
    pub stored_at: DateTime<Utc>,
    /// Storage metadata.
    pub storage_metadata: StorageMetadata,
}
199
/// Storage metadata.
///
/// Bookkeeping recorded alongside each `StoredEvent`: integrity checksum,
/// sizes, location and persistence state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageMetadata {
    /// Checksum for integrity verification.
    // NOTE(review): the checksum algorithm and encoding are not defined in
    // this module; confirm against the code that populates this field.
    pub checksum: String,
    /// Compressed size (if compressed); `None` otherwise.
    pub compressed_size: Option<usize>,
    /// Original size.
    pub original_size: usize,
    /// Storage location.
    pub storage_location: String,
    /// Persistence status.
    pub persistence_status: PersistenceStatus,
}
214
/// Persistence status.
///
/// Records where a stored event currently lives, or why persisting it
/// failed. Carried in `StorageMetadata::persistence_status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PersistenceStatus {
    /// Only in memory.
    InMemory,
    /// Persisted to disk.
    Persisted,
    /// Archived to long-term storage.
    Archived,
    /// Failed to persist; `error` holds a description of the failure.
    Failed { error: String },
}
227
/// Event stream snapshot.
///
/// Captures the aggregated state of one stream at a given stream version and
/// global sequence number, as an opaque byte buffer plus descriptive
/// metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventSnapshot {
    /// Snapshot ID.
    pub snapshot_id: Uuid,
    /// Stream ID.
    pub stream_id: String,
    /// Stream version at snapshot time.
    pub stream_version: u64,
    /// Sequence number at snapshot time.
    pub sequence_number: u64,
    /// Snapshot timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Aggregated state data.
    // NOTE(review): the byte format (and whether it is stored compressed) is
    // not defined here; presumably described by `metadata.compression`.
    pub state_data: Vec<u8>,
    /// Snapshot metadata.
    pub metadata: SnapshotMetadata,
}
246
/// Snapshot metadata.
///
/// Describes how the state bytes of an `EventSnapshot` were encoded and
/// provides a checksum for integrity verification.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotMetadata {
    /// Compression algorithm used; `None` when no algorithm is recorded.
    pub compression: Option<String>,
    /// Original state size.
    pub original_size: usize,
    /// Compressed size.
    // NOTE(review): unlike `StorageMetadata::compressed_size` this is not an
    // `Option`; the value used for uncompressed snapshots is not visible here.
    pub compressed_size: usize,
    /// Checksum for integrity.
    pub checksum: String,
}
259
/// Query criteria for event retrieval.
///
/// Passed by value to `EventStoreTrait::query_events`. All filters except
/// `custom_filters` and `order` are optional.
///
/// NOTE(review): how filters combine and whether `None` means "no
/// restriction" are decided by the store implementation, which is not
/// visible from this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventQuery {
    /// Stream ID filter.
    pub stream_id: Option<String>,
    /// Event type filter.
    pub event_types: Option<Vec<String>>,
    /// Time range filter.
    pub time_range: Option<TimeRange>,
    /// Sequence number range.
    pub sequence_range: Option<SequenceRange>,
    /// Source filter.
    pub source: Option<String>,
    /// Custom field filters.
    // Presumably field name -> expected value; verify against the store
    // implementation.
    pub custom_filters: HashMap<String, String>,
    /// Maximum number of events to return.
    pub limit: Option<usize>,
    /// Ordering preference.
    pub order: QueryOrder,
}
280
/// Time range for queries.
///
/// A pair of UTC timestamps used by `EventQuery::time_range`.
// NOTE(review): whether the bounds are inclusive or exclusive is not defined
// in this module; confirm against the query implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
    /// Start of the range (UTC).
    pub start: DateTime<Utc>,
    /// End of the range (UTC).
    pub end: DateTime<Utc>,
}
287
/// Sequence number range for queries.
///
/// A pair of sequence numbers used by `EventQuery::sequence_range`.
// NOTE(review): whether the bounds are inclusive or exclusive is not defined
// in this module; confirm against the query implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequenceRange {
    /// First sequence number of the range.
    pub start: u64,
    /// Last sequence number of the range.
    pub end: u64,
}
294
/// Query ordering.
///
/// Selects the sort key and direction for the results of an `EventQuery`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryOrder {
    /// Ascending by sequence number.
    SequenceAsc,
    /// Descending by sequence number.
    SequenceDesc,
    /// Ascending by timestamp.
    TimestampAsc,
    /// Descending by timestamp.
    TimestampDesc,
}
307
/// Event sourcing statistics.
///
/// A set of `AtomicU64` counters and gauges. Because every field is atomic,
/// values can be updated through a shared reference. The struct derives only
/// `Debug` and `Default`, so a fresh instance starts with every field at
/// zero.
#[derive(Debug, Default)]
pub struct EventSourcingStats {
    /// Number of events stored.
    pub total_events_stored: AtomicU64,
    /// Number of events retrieved.
    pub total_events_retrieved: AtomicU64,
    /// Number of snapshots created.
    pub snapshots_created: AtomicU64,
    /// Number of events archived.
    pub events_archived: AtomicU64,
    /// Number of persistence operations performed.
    pub persistence_operations: AtomicU64,
    /// Number of operations that failed.
    pub failed_operations: AtomicU64,
    /// Memory usage, in bytes.
    pub memory_usage_bytes: AtomicU64,
    /// Disk usage, in bytes.
    pub disk_usage_bytes: AtomicU64,
    /// Average store latency, in milliseconds.
    // NOTE(review): how the average is maintained (window, rounding) is
    // decided by the code that writes this field; not visible here.
    pub average_store_latency_ms: AtomicU64,
    /// Average retrieve latency, in milliseconds.
    pub average_retrieve_latency_ms: AtomicU64,
}
322
/// EventStore trait for abstracting event storage.
///
/// An async, object-friendly interface (via `async_trait`) that implementors
/// must provide as `Send + Sync`. Every method reports failure through
/// `anyhow::Result`.
///
/// NOTE(review): the per-method notes below describe what the signatures
/// show; exact semantics are defined by the implementations, which are not
/// visible from this module.
#[async_trait::async_trait]
pub trait EventStoreTrait: Send + Sync {
    /// Stores a single `event` under `stream_id` and resolves to the
    /// resulting `StoredEvent`.
    async fn store_event(
        &self,
        stream_id: String,
        event: StreamEvent,
    ) -> anyhow::Result<StoredEvent>;
    /// Resolves to the stored events selected by `query`.
    async fn query_events(&self, query: EventQuery) -> anyhow::Result<Vec<StoredEvent>>;
    /// Resolves to the events of the stream `stream_id`.
    ///
    /// `from_version` presumably restricts the result to events at or after
    /// that stream version when `Some` -- TODO confirm inclusivity.
    async fn get_stream_events(
        &self,
        stream_id: &str,
        from_version: Option<u64>,
    ) -> anyhow::Result<Vec<StoredEvent>>;
    /// Resolves to the stored events to replay starting from `timestamp`.
    async fn replay_from_timestamp(
        &self,
        timestamp: DateTime<Utc>,
    ) -> anyhow::Result<Vec<StoredEvent>>;
    /// Resolves to the most recent snapshot of `stream_id`, or `None` when
    /// there is none.
    async fn get_latest_snapshot(&self, stream_id: &str) -> anyhow::Result<Option<EventSnapshot>>;
    /// Rebuilds the state of `stream_id` and resolves to it as raw bytes.
    async fn rebuild_stream_state(&self, stream_id: &str) -> anyhow::Result<Vec<u8>>;
    /// Appends a batch of `events` to the stream identified by
    /// `aggregate_id`.
    ///
    /// `expected_version` presumably enables an optimistic-concurrency check
    /// when `Some`, and the returned `u64` is presumably the stream version
    /// after the append -- TODO confirm against implementations.
    async fn append_events(
        &self,
        aggregate_id: &str,
        events: &[StreamEvent],
        expected_version: Option<u64>,
    ) -> anyhow::Result<u64>;
}
350
/// Event stream trait for streaming events.
///
/// An async interface (via `async_trait`) over a sequence of `StoredEvent`s;
/// implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait EventStream: Send + Sync {
    /// Yields the next event, or `None` when no event is produced.
    ///
    /// Takes `&mut self`, so a call may advance the stream.
    async fn next_event(&mut self) -> Option<StoredEvent>;
    /// Reports whether the stream has events.
    async fn has_events(&self) -> bool;
    /// Reads up to `max_events` events starting at `position`.
    // NOTE(review): whether `position` is a global sequence number or a
    // stream-local offset is not defined here; confirm against implementors.
    async fn read_events_from_position(
        &self,
        position: u64,
        max_events: usize,
    ) -> anyhow::Result<Vec<StoredEvent>>;
}
362
/// Snapshot store trait for managing snapshots.
///
/// An async interface (via `async_trait`) for saving and loading
/// `EventSnapshot`s; implementors must be `Send + Sync`. Every method
/// reports failure through `anyhow::Result`.
#[async_trait::async_trait]
pub trait SnapshotStore: Send + Sync {
    /// Stores `snapshot`, taking ownership of it.
    async fn store_snapshot(&self, snapshot: EventSnapshot) -> anyhow::Result<()>;
    /// Resolves to a snapshot of `stream_id`, or `None` when none matches.
    ///
    /// `version` presumably selects a specific stream version when `Some`
    /// and the latest snapshot when `None` -- TODO confirm.
    async fn get_snapshot(
        &self,
        stream_id: &str,
        version: Option<u64>,
    ) -> anyhow::Result<Option<EventSnapshot>>;
    /// Resolves to the snapshots recorded for `stream_id`.
    async fn list_snapshots(&self, stream_id: &str) -> anyhow::Result<Vec<EventSnapshot>>;
}
374
/// Persistence operation.
///
/// One unit of work for the persistence layer. The enum derives only `Debug`
/// and `Clone`; it is not serializable.
#[derive(Debug, Clone)]
pub enum PersistenceOperation {
    /// Store event.
    // The event is boxed, presumably to keep the enum itself small.
    StoreEvent(Box<StoredEvent>),
    /// Store snapshot.
    StoreSnapshot(EventSnapshot),
    /// Archive events.
    ArchiveEvents(Vec<StoredEvent>),
    /// Delete events.
    // Presumably a list of global sequence numbers -- TODO confirm.
    DeleteEvents(Vec<u64>),
}
387
/// Persistence statistics.
///
/// A set of `AtomicU64` counters, so values can be updated through a shared
/// reference. The struct derives only `Debug` and `Default`, so a fresh
/// instance starts with every counter at zero.
#[derive(Debug, Default)]
pub struct PersistenceStats {
    /// Number of operations queued.
    pub operations_queued: AtomicU64,
    /// Number of operations completed.
    pub operations_completed: AtomicU64,
    /// Number of operations that failed.
    pub operations_failed: AtomicU64,
    /// Number of bytes written.
    pub bytes_written: AtomicU64,
    /// Number of bytes read.
    pub bytes_read: AtomicU64,
}