1use azoth_core::{
2 error::{AzothError, Result},
3 traits::ProjectionStore,
4 types::EventId,
5 ProjectionConfig,
6};
7use rusqlite::{Connection, OpenFlags};
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11use crate::read_pool::SqliteReadPool;
12use crate::schema;
13use crate::txn::SimpleProjectionTxn;
14
15pub struct SqliteProjectionStore {
25 write_conn: Arc<Mutex<Connection>>,
27 read_conn: Arc<Mutex<Connection>>,
29 config: ProjectionConfig,
30 read_pool: Option<Arc<SqliteReadPool>>,
32}
33
34impl SqliteProjectionStore {
35 pub fn conn(&self) -> &Arc<Mutex<Connection>> {
40 &self.write_conn
41 }
42
43 fn open_read_connection(path: &Path, cfg: &ProjectionConfig) -> Result<Connection> {
45 let conn = Connection::open_with_flags(
46 path,
47 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
48 )
49 .map_err(|e| AzothError::Projection(e.to_string()))?;
50
51 conn.pragma_update(None, "cache_size", cfg.cache_size)
53 .map_err(|e| AzothError::Config(e.to_string()))?;
54
55 Ok(conn)
56 }
57
58 fn init_schema(conn: &Connection) -> Result<()> {
60 conn.execute(
62 "CREATE TABLE IF NOT EXISTS projection_meta (
63 id INTEGER PRIMARY KEY CHECK (id = 0),
64 last_applied_event_id INTEGER NOT NULL DEFAULT -1,
65 schema_version INTEGER NOT NULL,
66 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
67 )",
68 [],
69 )
70 .map_err(|e| AzothError::Projection(e.to_string()))?;
71
72 conn.execute(
75 "INSERT OR IGNORE INTO projection_meta (id, last_applied_event_id, schema_version)
76 VALUES (0, -1, 0)",
77 [],
78 )
79 .map_err(|e| AzothError::Projection(e.to_string()))?;
80
81 Ok(())
82 }
83
84 fn configure_connection(conn: &Connection, cfg: &ProjectionConfig) -> Result<()> {
86 if cfg.wal_mode {
88 conn.pragma_update(None, "journal_mode", "WAL")
89 .map_err(|e| AzothError::Config(e.to_string()))?;
90 }
91
92 let sync_mode = match cfg.synchronous {
94 azoth_core::config::SynchronousMode::Full => "FULL",
95 azoth_core::config::SynchronousMode::Normal => "NORMAL",
96 azoth_core::config::SynchronousMode::Off => "OFF",
97 };
98 conn.pragma_update(None, "synchronous", sync_mode)
99 .map_err(|e| AzothError::Config(e.to_string()))?;
100
101 conn.pragma_update(None, "foreign_keys", "ON")
103 .map_err(|e| AzothError::Config(e.to_string()))?;
104
105 conn.pragma_update(None, "cache_size", cfg.cache_size)
107 .map_err(|e| AzothError::Config(e.to_string()))?;
108
109 Ok(())
110 }
111
112 pub async fn query_async<F, R>(&self, f: F) -> Result<R>
125 where
126 F: FnOnce(&Connection) -> Result<R> + Send + 'static,
127 R: Send + 'static,
128 {
129 if let Some(pool) = &self.read_pool {
130 let pool = Arc::clone(pool);
131 return tokio::task::spawn_blocking(move || {
132 let conn = pool.acquire_blocking()?;
133 f(conn.connection())
134 })
135 .await
136 .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?;
137 }
138
139 let conn = self.read_conn.clone();
140 tokio::task::spawn_blocking(move || {
141 let conn_guard = conn.lock().unwrap();
142 f(&conn_guard)
143 })
144 .await
145 .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?
146 }
147
148 pub fn query<F, R>(&self, f: F) -> Result<R>
161 where
162 F: FnOnce(&Connection) -> Result<R>,
163 {
164 if let Some(pool) = &self.read_pool {
165 let conn = pool.acquire_blocking()?;
166 return f(conn.connection());
167 }
168
169 let conn_guard = self.read_conn.lock().unwrap();
170 f(&conn_guard)
171 }
172
173 pub async fn execute_async<F>(&self, f: F) -> Result<()>
187 where
188 F: FnOnce(&Connection) -> Result<()> + Send + 'static,
189 {
190 let conn = self.write_conn.clone();
191 tokio::task::spawn_blocking(move || {
192 let conn_guard = conn.lock().unwrap();
193 f(&conn_guard)
194 })
195 .await
196 .map_err(|e| AzothError::Projection(format!("Execute task failed: {}", e)))?
197 }
198
199 pub fn execute<F>(&self, f: F) -> Result<()>
211 where
212 F: FnOnce(&Connection) -> Result<()>,
213 {
214 let conn_guard = self.write_conn.lock().unwrap();
215 f(&conn_guard)
216 }
217
218 pub fn transaction<F>(&self, f: F) -> Result<()>
233 where
234 F: FnOnce(&rusqlite::Transaction) -> Result<()>,
235 {
236 let mut conn_guard = self.write_conn.lock().unwrap();
237 let tx = conn_guard
238 .transaction()
239 .map_err(|e| AzothError::Projection(e.to_string()))?;
240
241 f(&tx)?;
242
243 tx.commit()
244 .map_err(|e| AzothError::Projection(e.to_string()))?;
245 Ok(())
246 }
247
248 pub async fn transaction_async<F>(&self, f: F) -> Result<()>
252 where
253 F: FnOnce(&rusqlite::Transaction) -> Result<()> + Send + 'static,
254 {
255 let conn = self.write_conn.clone();
256 tokio::task::spawn_blocking(move || {
257 let mut conn_guard = conn.lock().unwrap();
258 let tx = conn_guard
259 .transaction()
260 .map_err(|e| AzothError::Projection(e.to_string()))?;
261
262 f(&tx)?;
263
264 tx.commit()
265 .map_err(|e| AzothError::Projection(e.to_string()))?;
266 Ok(())
267 })
268 .await
269 .map_err(|e| AzothError::Projection(format!("Transaction task failed: {}", e)))?
270 }
271
272 pub fn read_pool(&self) -> Option<&Arc<SqliteReadPool>> {
276 self.read_pool.as_ref()
277 }
278
279 pub fn has_read_pool(&self) -> bool {
281 self.read_pool.is_some()
282 }
283
284 pub fn db_path(&self) -> &Path {
286 &self.config.path
287 }
288}
289
290impl ProjectionStore for SqliteProjectionStore {
291 type Txn<'a> = SimpleProjectionTxn<'a>;
292
293 fn open(cfg: ProjectionConfig) -> Result<Self> {
294 if let Some(parent) = cfg.path.parent() {
296 std::fs::create_dir_all(parent)?;
297 }
298
299 let write_conn = Connection::open_with_flags(
301 &cfg.path,
302 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
303 )
304 .map_err(|e| AzothError::Projection(e.to_string()))?;
305
306 Self::configure_connection(&write_conn, &cfg)?;
308
309 Self::init_schema(&write_conn)?;
311
312 let read_conn = Self::open_read_connection(&cfg.path, &cfg)?;
314
315 let read_pool = if cfg.read_pool.enabled {
317 Some(Arc::new(SqliteReadPool::new(
318 &cfg.path,
319 cfg.read_pool.clone(),
320 )?))
321 } else {
322 None
323 };
324
325 Ok(Self {
326 write_conn: Arc::new(Mutex::new(write_conn)),
327 read_conn: Arc::new(Mutex::new(read_conn)),
328 config: cfg,
329 read_pool,
330 })
331 }
332
333 fn close(&self) -> Result<()> {
334 Ok(())
336 }
337
338 fn begin_txn(&self) -> Result<Self::Txn<'_>> {
339 let guard = self.write_conn.lock().unwrap();
341 SimpleProjectionTxn::new(guard)
342 }
343
344 fn get_cursor(&self) -> Result<EventId> {
345 let conn = self.read_conn.lock().unwrap();
347 let cursor: i64 = conn
348 .query_row(
349 "SELECT last_applied_event_id FROM projection_meta WHERE id = 0",
350 [],
351 |row| row.get(0),
352 )
353 .map_err(|e| AzothError::Projection(e.to_string()))?;
354
355 Ok(cursor as EventId)
356 }
357
358 fn migrate(&self, target_version: u32) -> Result<()> {
359 let conn = self.write_conn.lock().unwrap();
360 schema::migrate(&conn, target_version)
361 }
362
363 fn backup_to(&self, path: &Path) -> Result<()> {
364 {
366 let conn = self.write_conn.lock().unwrap();
367 let mut stmt = conn
369 .prepare("PRAGMA wal_checkpoint(RESTART)")
370 .map_err(|e| AzothError::Projection(e.to_string()))?;
371 let mut rows = stmt
372 .query([])
373 .map_err(|e| AzothError::Projection(e.to_string()))?;
374 while let Ok(Some(_)) = rows.next() {}
376 }
377
378 let src_path = &self.config.path;
380
381 std::fs::copy(src_path, path)?;
383
384 Ok(())
385 }
386
387 fn restore_from(path: &Path, cfg: ProjectionConfig) -> Result<Self> {
388 std::fs::copy(path, &cfg.path)?;
390
391 Self::open(cfg)
393 }
394
395 fn schema_version(&self) -> Result<u32> {
396 let conn = self.read_conn.lock().unwrap();
398 let version: i64 = conn
399 .query_row(
400 "SELECT schema_version FROM projection_meta WHERE id = 0",
401 [],
402 |row| row.get(0),
403 )
404 .map_err(|e| AzothError::Projection(e.to_string()))?;
405
406 Ok(version as u32)
407 }
408}