eventuali_core/store/
mod.rs

1//! Event store implementations and abstractions.
2//!
3//! This module provides the core event storage functionality for Eventuali:
4//! - [`EventStore`] trait defining the storage interface
5//! - [`EventStoreImpl`] concrete implementation with backend abstraction
6//! - Multiple backends: PostgreSQL, SQLite
7//! - Configuration and factory functions
8//!
9//! # Example
10//!
11//! ```rust
12//! use eventuali_core::{EventStoreConfig, create_event_store};
13//!
14//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
15//! // Create SQLite event store
16//! let config = EventStoreConfig::sqlite(":memory:").await?;
17//! let store = create_event_store(config).await?;
18//!
19//! // Create PostgreSQL event store
20//! let config = EventStoreConfig::postgres(
21//!     "postgresql://user:pass@localhost/events"
22//! ).await?;
23//! let store = create_event_store(config).await?;
24//! # Ok(())
25//! # }
26//! ```
27
28pub mod traits;
29pub mod postgres;
30pub mod sqlite;
31pub mod config;
32
33pub use traits::{EventStore, EventStoreBackend};
34pub use config::EventStoreConfig;
35
36use crate::{Event, AggregateId, AggregateVersion, Result};
37use crate::streaming::EventStreamer;
38use async_trait::async_trait;
39use std::sync::Arc;
40use tokio::sync::Mutex;
41
42/// Core event store implementation with pluggable backends.
43///
44/// EventStoreImpl provides a unified interface over different storage backends
45/// while maintaining consistency guarantees and optional event streaming.
46///
47/// # Type Parameters
48///
49/// * `B` - The backend implementation (PostgreSQL, SQLite, etc.)
50///
51/// # Features
52///
53/// - Atomic event batching
54/// - Optional real-time event streaming
55/// - Global event ordering
56/// - Backend abstraction
57pub struct EventStoreImpl<B: EventStoreBackend> {
58    backend: B,
59    streamer: Option<Arc<dyn EventStreamer + Send + Sync>>,
60    global_position: Arc<Mutex<u64>>,
61}
62
63impl<B: EventStoreBackend> EventStoreImpl<B> {
64    /// Creates a new event store with the specified backend.
65    ///
66    /// # Arguments
67    ///
68    /// * `backend` - The storage backend implementation
69    ///
70    /// # Example
71    ///
72    /// ```rust
73    /// use eventuali_core::store::{EventStoreImpl, sqlite::SQLiteBackend};
74    ///
75    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
76    /// let backend = SQLiteBackend::new(":memory:").await?;
77    /// let store = EventStoreImpl::new(backend);
78    /// # Ok(())
79    /// # }
80    /// ```
81    pub fn new(backend: B) -> Self {
82        Self { 
83            backend,
84            streamer: None,
85            global_position: Arc::new(Mutex::new(0)),
86        }
87    }
88}
89
90#[async_trait]
91impl<B: EventStoreBackend + Send + Sync> EventStore for EventStoreImpl<B> {
92    async fn save_events(&self, events: Vec<Event>) -> Result<()> {
93        // Save events to backend first
94        self.backend.save_events(events.clone()).await?;
95        
96        // If we have a streamer configured, publish the events
97        if let Some(streamer) = &self.streamer {
98            let mut global_pos = self.global_position.lock().await;
99            
100            for event in events {
101                *global_pos += 1;
102                let stream_position = event.aggregate_version as u64;
103                
104                streamer.publish_event(event, stream_position, *global_pos).await?;
105            }
106        }
107        
108        Ok(())
109    }
110
111    async fn load_events(
112        &self,
113        aggregate_id: &AggregateId,
114        from_version: Option<AggregateVersion>,
115    ) -> Result<Vec<Event>> {
116        self.backend.load_events(aggregate_id, from_version).await
117    }
118
119    async fn load_events_by_type(
120        &self,
121        aggregate_type: &str,
122        from_version: Option<AggregateVersion>,
123    ) -> Result<Vec<Event>> {
124        self.backend.load_events_by_type(aggregate_type, from_version).await
125    }
126
127    async fn get_aggregate_version(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateVersion>> {
128        self.backend.get_aggregate_version(aggregate_id).await
129    }
130    
131    fn set_event_streamer(&mut self, streamer: Arc<dyn EventStreamer + Send + Sync>) {
132        self.streamer = Some(streamer);
133    }
134}
135
136// Factory function for creating event stores
137pub async fn create_event_store(config: EventStoreConfig) -> Result<Box<dyn EventStore + Send + Sync>> {
138    match &config {
139        #[cfg(feature = "postgres")]
140        EventStoreConfig::PostgreSQL { .. } => {
141            let mut backend = postgres::PostgreSQLBackend::new(&config).await?;
142            backend.initialize().await?;
143            Ok(Box::new(EventStoreImpl::new(backend)))
144        }
145        #[cfg(feature = "sqlite")]
146        EventStoreConfig::SQLite { .. } => {
147            let mut backend = sqlite::SQLiteBackend::new(&config).await?;
148            backend.initialize().await?;
149            Ok(Box::new(EventStoreImpl::new(backend)))
150        }
151        #[cfg(not(any(feature = "postgres", feature = "sqlite")))]
152        _ => Err(EventualiError::Configuration(
153            "No database backend features enabled".to_string(),
154        )),
155    }
156}