eventuali_core/store/
mod.rs1pub mod traits;
29pub mod postgres;
30pub mod sqlite;
31pub mod config;
32
33pub use traits::{EventStore, EventStoreBackend};
34pub use config::EventStoreConfig;
35
36use crate::{Event, AggregateId, AggregateVersion, Result};
37use crate::streaming::EventStreamer;
38use async_trait::async_trait;
39use std::sync::Arc;
40use tokio::sync::Mutex;
41
42pub struct EventStoreImpl<B: EventStoreBackend> {
58 backend: B,
59 streamer: Option<Arc<dyn EventStreamer + Send + Sync>>,
60 global_position: Arc<Mutex<u64>>,
61}
62
63impl<B: EventStoreBackend> EventStoreImpl<B> {
64 pub fn new(backend: B) -> Self {
82 Self {
83 backend,
84 streamer: None,
85 global_position: Arc::new(Mutex::new(0)),
86 }
87 }
88}
89
90#[async_trait]
91impl<B: EventStoreBackend + Send + Sync> EventStore for EventStoreImpl<B> {
92 async fn save_events(&self, events: Vec<Event>) -> Result<()> {
93 self.backend.save_events(events.clone()).await?;
95
96 if let Some(streamer) = &self.streamer {
98 let mut global_pos = self.global_position.lock().await;
99
100 for event in events {
101 *global_pos += 1;
102 let stream_position = event.aggregate_version as u64;
103
104 streamer.publish_event(event, stream_position, *global_pos).await?;
105 }
106 }
107
108 Ok(())
109 }
110
111 async fn load_events(
112 &self,
113 aggregate_id: &AggregateId,
114 from_version: Option<AggregateVersion>,
115 ) -> Result<Vec<Event>> {
116 self.backend.load_events(aggregate_id, from_version).await
117 }
118
119 async fn load_events_by_type(
120 &self,
121 aggregate_type: &str,
122 from_version: Option<AggregateVersion>,
123 ) -> Result<Vec<Event>> {
124 self.backend.load_events_by_type(aggregate_type, from_version).await
125 }
126
127 async fn get_aggregate_version(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateVersion>> {
128 self.backend.get_aggregate_version(aggregate_id).await
129 }
130
131 fn set_event_streamer(&mut self, streamer: Arc<dyn EventStreamer + Send + Sync>) {
132 self.streamer = Some(streamer);
133 }
134}
135
136pub async fn create_event_store(config: EventStoreConfig) -> Result<Box<dyn EventStore + Send + Sync>> {
138 match &config {
139 #[cfg(feature = "postgres")]
140 EventStoreConfig::PostgreSQL { .. } => {
141 let mut backend = postgres::PostgreSQLBackend::new(&config).await?;
142 backend.initialize().await?;
143 Ok(Box::new(EventStoreImpl::new(backend)))
144 }
145 #[cfg(feature = "sqlite")]
146 EventStoreConfig::SQLite { .. } => {
147 let mut backend = sqlite::SQLiteBackend::new(&config).await?;
148 backend.initialize().await?;
149 Ok(Box::new(EventStoreImpl::new(backend)))
150 }
151 #[cfg(not(any(feature = "postgres", feature = "sqlite")))]
152 _ => Err(EventualiError::Configuration(
153 "No database backend features enabled".to_string(),
154 )),
155 }
156}