1use azoth_core::{
2 error::{AzothError, Result},
3 traits::ProjectionStore,
4 types::EventId,
5 ProjectionConfig,
6};
7use rusqlite::{Connection, OpenFlags};
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11use crate::read_pool::SqliteReadPool;
12use crate::schema;
13use crate::txn::SimpleProjectionTxn;
14
15pub struct SqliteProjectionStore {
25 write_conn: Arc<Mutex<Connection>>,
27 read_conn: Arc<Mutex<Connection>>,
29 config: ProjectionConfig,
30 read_pool: Option<Arc<SqliteReadPool>>,
32}
33
34impl SqliteProjectionStore {
35 pub fn conn(&self) -> &Arc<Mutex<Connection>> {
40 &self.write_conn
41 }
42
43 fn open_read_connection(path: &Path, cfg: &ProjectionConfig) -> Result<Connection> {
45 let conn = Connection::open_with_flags(
46 path,
47 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
48 )
49 .map_err(|e| AzothError::Projection(e.to_string()))?;
50
51 conn.pragma_update(None, "cache_size", cfg.cache_size)
53 .map_err(|e| AzothError::Config(e.to_string()))?;
54
55 Ok(conn)
56 }
57
58 fn init_schema(conn: &Connection) -> Result<()> {
60 conn.execute(
62 "CREATE TABLE IF NOT EXISTS projection_meta (
63 id INTEGER PRIMARY KEY CHECK (id = 0),
64 last_applied_event_id INTEGER NOT NULL DEFAULT -1,
65 schema_version INTEGER NOT NULL,
66 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
67 )",
68 [],
69 )
70 .map_err(|e| AzothError::Projection(e.to_string()))?;
71
72 conn.execute(
75 "INSERT OR IGNORE INTO projection_meta (id, last_applied_event_id, schema_version)
76 VALUES (0, -1, 0)",
77 [],
78 )
79 .map_err(|e| AzothError::Projection(e.to_string()))?;
80
81 Ok(())
82 }
83
84 fn configure_connection(conn: &Connection, cfg: &ProjectionConfig) -> Result<()> {
86 if cfg.wal_mode {
88 conn.pragma_update(None, "journal_mode", "WAL")
89 .map_err(|e| AzothError::Config(e.to_string()))?;
90 }
91
92 let sync_mode = match cfg.synchronous {
94 azoth_core::config::SynchronousMode::Full => "FULL",
95 azoth_core::config::SynchronousMode::Normal => "NORMAL",
96 azoth_core::config::SynchronousMode::Off => "OFF",
97 };
98 conn.pragma_update(None, "synchronous", sync_mode)
99 .map_err(|e| AzothError::Config(e.to_string()))?;
100
101 conn.pragma_update(None, "foreign_keys", "ON")
103 .map_err(|e| AzothError::Config(e.to_string()))?;
104
105 conn.pragma_update(None, "cache_size", cfg.cache_size)
107 .map_err(|e| AzothError::Config(e.to_string()))?;
108
109 Ok(())
110 }
111
112 pub async fn query_async<F, R>(&self, f: F) -> Result<R>
125 where
126 F: FnOnce(&Connection) -> Result<R> + Send + 'static,
127 R: Send + 'static,
128 {
129 let conn = self.read_conn.clone();
130 tokio::task::spawn_blocking(move || {
131 let conn_guard = conn.lock().unwrap();
132 f(&conn_guard)
133 })
134 .await
135 .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?
136 }
137
138 pub fn query<F, R>(&self, f: F) -> Result<R>
151 where
152 F: FnOnce(&Connection) -> Result<R>,
153 {
154 let conn_guard = self.read_conn.lock().unwrap();
155 f(&conn_guard)
156 }
157
158 pub async fn execute_async<F>(&self, f: F) -> Result<()>
172 where
173 F: FnOnce(&Connection) -> Result<()> + Send + 'static,
174 {
175 let conn = self.write_conn.clone();
176 tokio::task::spawn_blocking(move || {
177 let conn_guard = conn.lock().unwrap();
178 f(&conn_guard)
179 })
180 .await
181 .map_err(|e| AzothError::Projection(format!("Execute task failed: {}", e)))?
182 }
183
184 pub fn execute<F>(&self, f: F) -> Result<()>
196 where
197 F: FnOnce(&Connection) -> Result<()>,
198 {
199 let conn_guard = self.write_conn.lock().unwrap();
200 f(&conn_guard)
201 }
202
203 pub fn transaction<F>(&self, f: F) -> Result<()>
218 where
219 F: FnOnce(&rusqlite::Transaction) -> Result<()>,
220 {
221 let mut conn_guard = self.write_conn.lock().unwrap();
222 let tx = conn_guard
223 .transaction()
224 .map_err(|e| AzothError::Projection(e.to_string()))?;
225
226 f(&tx)?;
227
228 tx.commit()
229 .map_err(|e| AzothError::Projection(e.to_string()))?;
230 Ok(())
231 }
232
233 pub async fn transaction_async<F>(&self, f: F) -> Result<()>
237 where
238 F: FnOnce(&rusqlite::Transaction) -> Result<()> + Send + 'static,
239 {
240 let conn = self.write_conn.clone();
241 tokio::task::spawn_blocking(move || {
242 let mut conn_guard = conn.lock().unwrap();
243 let tx = conn_guard
244 .transaction()
245 .map_err(|e| AzothError::Projection(e.to_string()))?;
246
247 f(&tx)?;
248
249 tx.commit()
250 .map_err(|e| AzothError::Projection(e.to_string()))?;
251 Ok(())
252 })
253 .await
254 .map_err(|e| AzothError::Projection(format!("Transaction task failed: {}", e)))?
255 }
256
257 pub fn read_pool(&self) -> Option<&Arc<SqliteReadPool>> {
261 self.read_pool.as_ref()
262 }
263
264 pub fn has_read_pool(&self) -> bool {
266 self.read_pool.is_some()
267 }
268
269 pub fn db_path(&self) -> &Path {
271 &self.config.path
272 }
273}
274
275impl ProjectionStore for SqliteProjectionStore {
276 type Txn<'a> = SimpleProjectionTxn<'a>;
277
278 fn open(cfg: ProjectionConfig) -> Result<Self> {
279 if let Some(parent) = cfg.path.parent() {
281 std::fs::create_dir_all(parent)?;
282 }
283
284 let write_conn = Connection::open_with_flags(
286 &cfg.path,
287 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
288 )
289 .map_err(|e| AzothError::Projection(e.to_string()))?;
290
291 Self::configure_connection(&write_conn, &cfg)?;
293
294 Self::init_schema(&write_conn)?;
296
297 let read_conn = Self::open_read_connection(&cfg.path, &cfg)?;
299
300 let read_pool = if cfg.read_pool.enabled {
302 Some(Arc::new(SqliteReadPool::new(
303 &cfg.path,
304 cfg.read_pool.clone(),
305 )?))
306 } else {
307 None
308 };
309
310 Ok(Self {
311 write_conn: Arc::new(Mutex::new(write_conn)),
312 read_conn: Arc::new(Mutex::new(read_conn)),
313 config: cfg,
314 read_pool,
315 })
316 }
317
318 fn close(&self) -> Result<()> {
319 Ok(())
321 }
322
323 fn begin_txn(&self) -> Result<Self::Txn<'_>> {
324 let guard = self.write_conn.lock().unwrap();
326 SimpleProjectionTxn::new(guard)
327 }
328
329 fn get_cursor(&self) -> Result<EventId> {
330 let conn = self.read_conn.lock().unwrap();
332 let cursor: i64 = conn
333 .query_row(
334 "SELECT last_applied_event_id FROM projection_meta WHERE id = 0",
335 [],
336 |row| row.get(0),
337 )
338 .map_err(|e| AzothError::Projection(e.to_string()))?;
339
340 Ok(cursor as EventId)
341 }
342
343 fn migrate(&self, target_version: u32) -> Result<()> {
344 let conn = self.write_conn.lock().unwrap();
345 schema::migrate(&conn, target_version)
346 }
347
348 fn backup_to(&self, path: &Path) -> Result<()> {
349 {
351 let conn = self.write_conn.lock().unwrap();
352 let mut stmt = conn
354 .prepare("PRAGMA wal_checkpoint(RESTART)")
355 .map_err(|e| AzothError::Projection(e.to_string()))?;
356 let mut rows = stmt
357 .query([])
358 .map_err(|e| AzothError::Projection(e.to_string()))?;
359 while let Ok(Some(_)) = rows.next() {}
361 }
362
363 let src_path = &self.config.path;
365
366 std::fs::copy(src_path, path)?;
368
369 Ok(())
370 }
371
372 fn restore_from(path: &Path, cfg: ProjectionConfig) -> Result<Self> {
373 std::fs::copy(path, &cfg.path)?;
375
376 Self::open(cfg)
378 }
379
380 fn schema_version(&self) -> Result<u32> {
381 let conn = self.read_conn.lock().unwrap();
383 let version: i64 = conn
384 .query_row(
385 "SELECT schema_version FROM projection_meta WHERE id = 0",
386 [],
387 |row| row.get(0),
388 )
389 .map_err(|e| AzothError::Projection(e.to_string()))?;
390
391 Ok(version as u32)
392 }
393}