Skip to main content

allframe_core/cqrs/
sqlite_backend.rs

1//! SQLite-backed event store backend for offline-first deployments
2//!
3//! Provides a persistent event store using SQLite with WAL mode for
4//! concurrent read/write access. All database operations use
5//! `tokio::task::spawn_blocking` since rusqlite is synchronous.
6
7#[cfg(feature = "cqrs-sqlite")]
8mod inner {
9    use std::collections::HashMap;
10    use std::marker::PhantomData;
11    use std::sync::{Arc, Mutex};
12
13    use async_trait::async_trait;
14    use rusqlite::Connection;
15
16    use super::super::{BackendStats, Event, EventStoreBackend};
17
18    /// SQLite-backed event store backend.
19    ///
20    /// Uses WAL journal mode for concurrent read/write access.
21    /// All operations are executed via `spawn_blocking` for async compatibility.
22    #[derive(Clone)]
23    pub struct SqliteEventStoreBackend<E: Event> {
24        conn: Arc<Mutex<Connection>>,
25        _phantom: PhantomData<E>,
26    }
27
28    impl<E: Event> SqliteEventStoreBackend<E> {
29        /// Create a new SQLite event store backend at the given path.
30        ///
31        /// Enables WAL journal mode and creates the `events` and `snapshots`
32        /// tables if they don't exist.
33        pub async fn new(path: &str) -> Result<Self, String> {
34            let path = path.to_string();
35            let conn = tokio::task::spawn_blocking(move || {
36                let conn = Connection::open(&path).map_err(|e| format!("SQLite open: {}", e))?;
37                conn.execute_batch("PRAGMA journal_mode=WAL;")
38                    .map_err(|e| format!("WAL pragma: {}", e))?;
39                conn.execute_batch(
40                    "CREATE TABLE IF NOT EXISTS events (
41                        id INTEGER PRIMARY KEY AUTOINCREMENT,
42                        aggregate_id TEXT NOT NULL,
43                        event_data BLOB NOT NULL,
44                        created_at TEXT NOT NULL DEFAULT (datetime('now'))
45                    );
46                    CREATE INDEX IF NOT EXISTS idx_events_aggregate ON events(aggregate_id);
47                    CREATE TABLE IF NOT EXISTS snapshots (
48                        aggregate_id TEXT PRIMARY KEY,
49                        snapshot_data BLOB NOT NULL,
50                        version INTEGER NOT NULL
51                    );",
52                )
53                .map_err(|e| format!("Schema init: {}", e))?;
54                Ok::<_, String>(conn)
55            })
56            .await
57            .map_err(|e| format!("spawn_blocking: {}", e))??;
58
59            Ok(Self {
60                conn: Arc::new(Mutex::new(conn)),
61                _phantom: PhantomData,
62            })
63        }
64    }
65
66    #[async_trait]
67    impl<E: Event> EventStoreBackend<E> for SqliteEventStoreBackend<E> {
68        async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
69            let conn = Arc::clone(&self.conn);
70            let agg_id = aggregate_id.to_string();
71            tokio::task::spawn_blocking(move || {
72                let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
73                let tx = conn
74                    .unchecked_transaction()
75                    .map_err(|e| format!("Begin tx: {}", e))?;
76                {
77                    let mut stmt = tx
78                        .prepare_cached(
79                            "INSERT INTO events (aggregate_id, event_data) VALUES (?1, ?2)",
80                        )
81                        .map_err(|e| format!("Prepare: {}", e))?;
82                    for event in &events {
83                        let data = serde_json::to_vec(event)
84                            .map_err(|e| format!("Serialize: {}", e))?;
85                        stmt.execute(rusqlite::params![agg_id, data])
86                            .map_err(|e| format!("Insert: {}", e))?;
87                    }
88                }
89                tx.commit().map_err(|e| format!("Commit: {}", e))?;
90                Ok(())
91            })
92            .await
93            .map_err(|e| format!("spawn_blocking: {}", e))?
94        }
95
96        async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
97            let conn = Arc::clone(&self.conn);
98            let agg_id = aggregate_id.to_string();
99            tokio::task::spawn_blocking(move || {
100                let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
101                let mut stmt = conn
102                    .prepare_cached(
103                        "SELECT event_data FROM events WHERE aggregate_id = ?1 ORDER BY id",
104                    )
105                    .map_err(|e| format!("Prepare: {}", e))?;
106                let rows = stmt
107                    .query_map(rusqlite::params![agg_id], |row| {
108                        row.get::<_, Vec<u8>>(0)
109                    })
110                    .map_err(|e| format!("Query: {}", e))?;
111                let mut events = Vec::new();
112                for row in rows {
113                    let data = row.map_err(|e| format!("Row: {}", e))?;
114                    let event: E = serde_json::from_slice(&data)
115                        .map_err(|e| format!("Deserialize: {}", e))?;
116                    events.push(event);
117                }
118                Ok(events)
119            })
120            .await
121            .map_err(|e| format!("spawn_blocking: {}", e))?
122        }
123
124        async fn get_all_events(&self) -> Result<Vec<E>, String> {
125            let conn = Arc::clone(&self.conn);
126            tokio::task::spawn_blocking(move || {
127                let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
128                let mut stmt = conn
129                    .prepare_cached("SELECT event_data FROM events ORDER BY id")
130                    .map_err(|e| format!("Prepare: {}", e))?;
131                let rows = stmt
132                    .query_map([], |row| row.get::<_, Vec<u8>>(0))
133                    .map_err(|e| format!("Query: {}", e))?;
134                let mut events = Vec::new();
135                for row in rows {
136                    let data = row.map_err(|e| format!("Row: {}", e))?;
137                    let event: E = serde_json::from_slice(&data)
138                        .map_err(|e| format!("Deserialize: {}", e))?;
139                    events.push(event);
140                }
141                Ok(events)
142            })
143            .await
144            .map_err(|e| format!("spawn_blocking: {}", e))?
145        }
146
147        async fn get_events_after(
148            &self,
149            aggregate_id: &str,
150            version: u64,
151        ) -> Result<Vec<E>, String> {
152            let conn = Arc::clone(&self.conn);
153            let agg_id = aggregate_id.to_string();
154            tokio::task::spawn_blocking(move || {
155                let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
156                // Events are 1-indexed by rowid within aggregate; we skip `version` events
157                let mut stmt = conn
158                    .prepare_cached(
159                        "SELECT event_data FROM events WHERE aggregate_id = ?1 ORDER BY id LIMIT -1 OFFSET ?2",
160                    )
161                    .map_err(|e| format!("Prepare: {}", e))?;
162                let rows = stmt
163                    .query_map(rusqlite::params![agg_id, version as i64], |row| {
164                        row.get::<_, Vec<u8>>(0)
165                    })
166                    .map_err(|e| format!("Query: {}", e))?;
167                let mut events = Vec::new();
168                for row in rows {
169                    let data = row.map_err(|e| format!("Row: {}", e))?;
170                    let event: E = serde_json::from_slice(&data)
171                        .map_err(|e| format!("Deserialize: {}", e))?;
172                    events.push(event);
173                }
174                Ok(events)
175            })
176            .await
177            .map_err(|e| format!("spawn_blocking: {}", e))?
178        }
179
180        async fn save_snapshot(
181            &self,
182            aggregate_id: &str,
183            snapshot_data: Vec<u8>,
184            version: u64,
185        ) -> Result<(), String> {
186            let conn = Arc::clone(&self.conn);
187            let agg_id = aggregate_id.to_string();
188            tokio::task::spawn_blocking(move || {
189                let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
190                conn.execute(
191                    "INSERT OR REPLACE INTO snapshots (aggregate_id, snapshot_data, version) VALUES (?1, ?2, ?3)",
192                    rusqlite::params![agg_id, snapshot_data, version as i64],
193                )
194                .map_err(|e| format!("Snapshot save: {}", e))?;
195                Ok(())
196            })
197            .await
198            .map_err(|e| format!("spawn_blocking: {}", e))?
199        }
200
201        async fn get_latest_snapshot(&self, aggregate_id: &str) -> Result<(Vec<u8>, u64), String> {
202            let conn = Arc::clone(&self.conn);
203            let agg_id = aggregate_id.to_string();
204            tokio::task::spawn_blocking(move || {
205                let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
206                conn.query_row(
207                    "SELECT snapshot_data, version FROM snapshots WHERE aggregate_id = ?1",
208                    rusqlite::params![agg_id],
209                    |row| {
210                        let data: Vec<u8> = row.get(0)?;
211                        let version: i64 = row.get(1)?;
212                        Ok((data, version as u64))
213                    },
214                )
215                .map_err(|e| format!("Snapshot get: {}", e))
216            })
217            .await
218            .map_err(|e| format!("spawn_blocking: {}", e))?
219        }
220
221        async fn flush(&self) -> Result<(), String> {
222            let conn = Arc::clone(&self.conn);
223            tokio::task::spawn_blocking(move || {
224                let conn = conn.lock().map_err(|e| format!("Lock: {}", e))?;
225                conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
226                    .map_err(|e| format!("Checkpoint: {}", e))?;
227                Ok(())
228            })
229            .await
230            .map_err(|e| format!("spawn_blocking: {}", e))?
231        }
232
233        async fn stats(&self) -> BackendStats {
234            let conn = Arc::clone(&self.conn);
235            tokio::task::spawn_blocking(move || {
236                let conn = match conn.lock() {
237                    Ok(c) => c,
238                    Err(_) => return BackendStats::default(),
239                };
240
241                let total_events: u64 = conn
242                    .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
243                    .unwrap_or(0);
244
245                let total_aggregates: u64 = conn
246                    .query_row(
247                        "SELECT COUNT(DISTINCT aggregate_id) FROM events",
248                        [],
249                        |row| row.get(0),
250                    )
251                    .unwrap_or(0);
252
253                let total_snapshots: u64 = conn
254                    .query_row("SELECT COUNT(*) FROM snapshots", [], |row| row.get(0))
255                    .unwrap_or(0);
256
257                let journal_mode: String = conn
258                    .query_row("PRAGMA journal_mode", [], |row| row.get(0))
259                    .unwrap_or_default();
260
261                let mut backend_specific = HashMap::new();
262                backend_specific.insert("journal_mode".to_string(), journal_mode);
263                backend_specific.insert("backend_type".to_string(), "sqlite".to_string());
264
265                BackendStats {
266                    total_events,
267                    total_aggregates,
268                    total_snapshots,
269                    backend_specific,
270                }
271            })
272            .await
273            .unwrap_or_default()
274        }
275    }
276}
277
278#[cfg(feature = "cqrs-sqlite")]
279pub use inner::*;
280
#[cfg(not(feature = "cqrs-sqlite"))]
mod placeholder {
    /// Stand-in type compiled when the `cqrs-sqlite` feature is disabled.
    ///
    /// It holds only a zero-sized type marker for `E`; this module defines no
    /// constructor and no methods for it.
    pub struct SqliteEventStoreBackend<E> {
        _phantom: std::marker::PhantomData<E>,
    }
}
290
291#[cfg(not(feature = "cqrs-sqlite"))]
292pub use placeholder::*;