1#[cfg(feature = "cqrs-sqlite")]
8mod inner {
9 use std::collections::HashMap;
10 use std::marker::PhantomData;
11 use std::sync::{Arc, Mutex};
12
13 use async_trait::async_trait;
14 use rusqlite::Connection;
15
16 use super::super::{BackendStats, Event, EventStoreBackend};
17
18 #[derive(Clone)]
23 pub struct SqliteEventStoreBackend<E: Event> {
24 conn: Arc<Mutex<Connection>>,
25 _phantom: PhantomData<E>,
26 }
27
28 impl<E: Event> SqliteEventStoreBackend<E> {
29 pub async fn new(path: &str) -> Result<Self, String> {
34 let path = path.to_string();
35 let conn = tokio::task::spawn_blocking(move || {
36 let conn = Connection::open(&path).map_err(|e| format!("SQLite open: {}", e))?;
37 conn.execute_batch("PRAGMA journal_mode=WAL;")
38 .map_err(|e| format!("WAL pragma: {}", e))?;
39 conn.execute_batch(
40 "CREATE TABLE IF NOT EXISTS events (
41 id INTEGER PRIMARY KEY AUTOINCREMENT,
42 aggregate_id TEXT NOT NULL,
43 event_data BLOB NOT NULL,
44 created_at TEXT NOT NULL DEFAULT (datetime('now'))
45 );
46 CREATE INDEX IF NOT EXISTS idx_events_aggregate ON events(aggregate_id);
47 CREATE TABLE IF NOT EXISTS snapshots (
48 aggregate_id TEXT PRIMARY KEY,
49 snapshot_data BLOB NOT NULL,
50 version INTEGER NOT NULL
51 );",
52 )
53 .map_err(|e| format!("Schema init: {}", e))?;
54 Ok::<_, String>(conn)
55 })
56 .await
57 .map_err(|e| format!("spawn_blocking: {}", e))??;
58
59 Ok(Self {
60 conn: Arc::new(Mutex::new(conn)),
61 _phantom: PhantomData,
62 })
63 }
64 }
65
66 #[async_trait]
67 impl<E: Event> EventStoreBackend<E> for SqliteEventStoreBackend<E> {
68 async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
69 let conn = Arc::clone(&self.conn);
70 let agg_id = aggregate_id.to_string();
71 tokio::task::spawn_blocking(move || {
72 let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
73 let tx = conn
74 .unchecked_transaction()
75 .map_err(|e| format!("Begin tx: {}", e))?;
76 {
77 let mut stmt = tx
78 .prepare_cached(
79 "INSERT INTO events (aggregate_id, event_data) VALUES (?1, ?2)",
80 )
81 .map_err(|e| format!("Prepare: {}", e))?;
82 for event in &events {
83 let data = serde_json::to_vec(event)
84 .map_err(|e| format!("Serialize: {}", e))?;
85 stmt.execute(rusqlite::params![agg_id, data])
86 .map_err(|e| format!("Insert: {}", e))?;
87 }
88 }
89 tx.commit().map_err(|e| format!("Commit: {}", e))?;
90 Ok(())
91 })
92 .await
93 .map_err(|e| format!("spawn_blocking: {}", e))?
94 }
95
96 async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
97 let conn = Arc::clone(&self.conn);
98 let agg_id = aggregate_id.to_string();
99 tokio::task::spawn_blocking(move || {
100 let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
101 let mut stmt = conn
102 .prepare_cached(
103 "SELECT event_data FROM events WHERE aggregate_id = ?1 ORDER BY id",
104 )
105 .map_err(|e| format!("Prepare: {}", e))?;
106 let rows = stmt
107 .query_map(rusqlite::params![agg_id], |row| {
108 row.get::<_, Vec<u8>>(0)
109 })
110 .map_err(|e| format!("Query: {}", e))?;
111 let mut events = Vec::new();
112 for row in rows {
113 let data = row.map_err(|e| format!("Row: {}", e))?;
114 let event: E = serde_json::from_slice(&data)
115 .map_err(|e| format!("Deserialize: {}", e))?;
116 events.push(event);
117 }
118 Ok(events)
119 })
120 .await
121 .map_err(|e| format!("spawn_blocking: {}", e))?
122 }
123
124 async fn get_all_events(&self) -> Result<Vec<E>, String> {
125 let conn = Arc::clone(&self.conn);
126 tokio::task::spawn_blocking(move || {
127 let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
128 let mut stmt = conn
129 .prepare_cached("SELECT event_data FROM events ORDER BY id")
130 .map_err(|e| format!("Prepare: {}", e))?;
131 let rows = stmt
132 .query_map([], |row| row.get::<_, Vec<u8>>(0))
133 .map_err(|e| format!("Query: {}", e))?;
134 let mut events = Vec::new();
135 for row in rows {
136 let data = row.map_err(|e| format!("Row: {}", e))?;
137 let event: E = serde_json::from_slice(&data)
138 .map_err(|e| format!("Deserialize: {}", e))?;
139 events.push(event);
140 }
141 Ok(events)
142 })
143 .await
144 .map_err(|e| format!("spawn_blocking: {}", e))?
145 }
146
147 async fn get_events_after(
148 &self,
149 aggregate_id: &str,
150 version: u64,
151 ) -> Result<Vec<E>, String> {
152 let conn = Arc::clone(&self.conn);
153 let agg_id = aggregate_id.to_string();
154 tokio::task::spawn_blocking(move || {
155 let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
156 let mut stmt = conn
158 .prepare_cached(
159 "SELECT event_data FROM events WHERE aggregate_id = ?1 ORDER BY id LIMIT -1 OFFSET ?2",
160 )
161 .map_err(|e| format!("Prepare: {}", e))?;
162 let rows = stmt
163 .query_map(rusqlite::params![agg_id, version as i64], |row| {
164 row.get::<_, Vec<u8>>(0)
165 })
166 .map_err(|e| format!("Query: {}", e))?;
167 let mut events = Vec::new();
168 for row in rows {
169 let data = row.map_err(|e| format!("Row: {}", e))?;
170 let event: E = serde_json::from_slice(&data)
171 .map_err(|e| format!("Deserialize: {}", e))?;
172 events.push(event);
173 }
174 Ok(events)
175 })
176 .await
177 .map_err(|e| format!("spawn_blocking: {}", e))?
178 }
179
180 async fn save_snapshot(
181 &self,
182 aggregate_id: &str,
183 snapshot_data: Vec<u8>,
184 version: u64,
185 ) -> Result<(), String> {
186 let conn = Arc::clone(&self.conn);
187 let agg_id = aggregate_id.to_string();
188 tokio::task::spawn_blocking(move || {
189 let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
190 conn.execute(
191 "INSERT OR REPLACE INTO snapshots (aggregate_id, snapshot_data, version) VALUES (?1, ?2, ?3)",
192 rusqlite::params![agg_id, snapshot_data, version as i64],
193 )
194 .map_err(|e| format!("Snapshot save: {}", e))?;
195 Ok(())
196 })
197 .await
198 .map_err(|e| format!("spawn_blocking: {}", e))?
199 }
200
201 async fn get_latest_snapshot(&self, aggregate_id: &str) -> Result<(Vec<u8>, u64), String> {
202 let conn = Arc::clone(&self.conn);
203 let agg_id = aggregate_id.to_string();
204 tokio::task::spawn_blocking(move || {
205 let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
206 conn.query_row(
207 "SELECT snapshot_data, version FROM snapshots WHERE aggregate_id = ?1",
208 rusqlite::params![agg_id],
209 |row| {
210 let data: Vec<u8> = row.get(0)?;
211 let version: i64 = row.get(1)?;
212 Ok((data, version as u64))
213 },
214 )
215 .map_err(|e| format!("Snapshot get: {}", e))
216 })
217 .await
218 .map_err(|e| format!("spawn_blocking: {}", e))?
219 }
220
221 async fn flush(&self) -> Result<(), String> {
222 let conn = Arc::clone(&self.conn);
223 tokio::task::spawn_blocking(move || {
224 let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
225 conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
226 .map_err(|e| format!("Checkpoint: {}", e))?;
227 Ok(())
228 })
229 .await
230 .map_err(|e| format!("spawn_blocking: {}", e))?
231 }
232
233 async fn stats(&self) -> BackendStats {
234 let conn = Arc::clone(&self.conn);
235 tokio::task::spawn_blocking(move || {
236 let conn = match conn.lock() {
237 Ok(c) => c,
238 Err(_) => return BackendStats::default(),
239 };
240
241 let total_events: u64 = conn
242 .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
243 .unwrap_or(0);
244
245 let total_aggregates: u64 = conn
246 .query_row(
247 "SELECT COUNT(DISTINCT aggregate_id) FROM events",
248 [],
249 |row| row.get(0),
250 )
251 .unwrap_or(0);
252
253 let total_snapshots: u64 = conn
254 .query_row("SELECT COUNT(*) FROM snapshots", [], |row| row.get(0))
255 .unwrap_or(0);
256
257 let journal_mode: String = conn
258 .query_row("PRAGMA journal_mode", [], |row| row.get(0))
259 .unwrap_or_default();
260
261 let mut backend_specific = HashMap::new();
262 backend_specific.insert("journal_mode".to_string(), journal_mode);
263 backend_specific.insert("backend_type".to_string(), "sqlite".to_string());
264
265 BackendStats {
266 total_events,
267 total_aggregates,
268 total_snapshots,
269 backend_specific,
270 }
271 })
272 .await
273 .unwrap_or_default()
274 }
275 }
276}
277
278#[cfg(feature = "cqrs-sqlite")]
279pub use inner::*;
280
#[cfg(not(feature = "cqrs-sqlite"))]
mod placeholder {
    /// Stand-in for the SQLite backend, compiled only when the `cqrs-sqlite`
    /// feature is disabled.
    ///
    /// It keeps the `SqliteEventStoreBackend` name available. This module
    /// defines no constructor or methods for it, and its only field is
    /// private.
    pub struct SqliteEventStoreBackend<E> {
        /// Zero-sized marker for the event type parameter.
        _phantom: std::marker::PhantomData<E>,
    }
}
290
291#[cfg(not(feature = "cqrs-sqlite"))]
292pub use placeholder::*;