1use azoth_core::{
2 error::{AzothError, Result},
3 traits::ProjectionStore,
4 types::EventId,
5 ProjectionConfig,
6};
7use parking_lot::Mutex;
8use rusqlite::{Connection, OpenFlags};
9use std::path::Path;
10use std::sync::Arc;
11
12use crate::read_pool::SqliteReadPool;
13use crate::schema;
14use crate::txn::SimpleProjectionTxn;
15
/// SQLite-backed implementation of [`ProjectionStore`].
///
/// Holds one read-write connection, one read-only connection and, optionally,
/// a pool of read connections, all pointing at the database file named by
/// `config.path`.
///
/// Field order is significant only in that Rust drops fields in declaration
/// order; it is left untouched here.
pub struct SqliteProjectionStore {
    /// Read-write connection. Used by `execute*`, `transaction*`, `begin_txn`,
    /// `migrate` and the checkpoint step of `backup_to`.
    write_conn: Arc<Mutex<Connection>>,
    /// Read-only connection. Used by `query*` when no read pool is configured,
    /// and by `get_cursor` / `schema_version`.
    read_conn: Arc<Mutex<Connection>>,
    /// Configuration the store was opened with; `config.path` is the database file.
    config: ProjectionConfig,
    /// Optional read pool; `Some` when `cfg.read_pool.enabled` was set at open time.
    read_pool: Option<Arc<SqliteReadPool>>,
}
34
35impl SqliteProjectionStore {
36 pub fn conn(&self) -> &Arc<Mutex<Connection>> {
41 &self.write_conn
42 }
43
44 pub fn write_conn(&self) -> &Arc<Mutex<Connection>> {
50 &self.write_conn
51 }
52
53 fn open_read_connection(path: &Path, cfg: &ProjectionConfig) -> Result<Connection> {
55 let conn = Connection::open_with_flags(
56 path,
57 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
58 )
59 .map_err(|e| AzothError::Projection(e.to_string()))?;
60
61 conn.pragma_update(None, "cache_size", cfg.cache_size)
63 .map_err(|e| AzothError::Config(e.to_string()))?;
64
65 Ok(conn)
66 }
67
68 fn init_schema(conn: &Connection) -> Result<()> {
70 conn.execute(
72 "CREATE TABLE IF NOT EXISTS projection_meta (
73 id INTEGER PRIMARY KEY CHECK (id = 0),
74 last_applied_event_id INTEGER NOT NULL DEFAULT -1,
75 schema_version INTEGER NOT NULL,
76 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
77 )",
78 [],
79 )
80 .map_err(|e| AzothError::Projection(e.to_string()))?;
81
82 conn.execute(
85 "INSERT OR IGNORE INTO projection_meta (id, last_applied_event_id, schema_version)
86 VALUES (0, -1, 0)",
87 [],
88 )
89 .map_err(|e| AzothError::Projection(e.to_string()))?;
90
91 Ok(())
92 }
93
94 fn configure_connection(conn: &Connection, cfg: &ProjectionConfig) -> Result<()> {
96 if cfg.wal_mode {
98 conn.pragma_update(None, "journal_mode", "WAL")
99 .map_err(|e| AzothError::Config(e.to_string()))?;
100 }
101
102 let sync_mode = match cfg.synchronous {
104 azoth_core::config::SynchronousMode::Full => "FULL",
105 azoth_core::config::SynchronousMode::Normal => "NORMAL",
106 azoth_core::config::SynchronousMode::Off => "OFF",
107 };
108 conn.pragma_update(None, "synchronous", sync_mode)
109 .map_err(|e| AzothError::Config(e.to_string()))?;
110
111 conn.pragma_update(None, "foreign_keys", "ON")
113 .map_err(|e| AzothError::Config(e.to_string()))?;
114
115 conn.pragma_update(None, "cache_size", cfg.cache_size)
117 .map_err(|e| AzothError::Config(e.to_string()))?;
118
119 Ok(())
120 }
121
122 pub async fn query_async<F, R>(&self, f: F) -> Result<R>
135 where
136 F: FnOnce(&Connection) -> Result<R> + Send + 'static,
137 R: Send + 'static,
138 {
139 if let Some(pool) = &self.read_pool {
140 let pool = Arc::clone(pool);
141 return tokio::task::spawn_blocking(move || {
142 let conn = pool.acquire_blocking()?;
143 f(conn.connection())
144 })
145 .await
146 .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?;
147 }
148
149 let conn = self.read_conn.clone();
150 tokio::task::spawn_blocking(move || {
151 let conn_guard = conn.lock();
152 f(&conn_guard)
153 })
154 .await
155 .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?
156 }
157
158 pub fn query<F, R>(&self, f: F) -> Result<R>
171 where
172 F: FnOnce(&Connection) -> Result<R>,
173 {
174 if let Some(pool) = &self.read_pool {
175 let conn = pool.acquire_blocking()?;
176 return f(conn.connection());
177 }
178
179 let conn_guard = self.read_conn.lock();
180 f(&conn_guard)
181 }
182
183 pub async fn execute_async<F>(&self, f: F) -> Result<()>
197 where
198 F: FnOnce(&Connection) -> Result<()> + Send + 'static,
199 {
200 let conn = self.write_conn.clone();
201 tokio::task::spawn_blocking(move || {
202 let conn_guard = conn.lock();
203 f(&conn_guard)
204 })
205 .await
206 .map_err(|e| AzothError::Projection(format!("Execute task failed: {}", e)))?
207 }
208
209 pub fn execute<F>(&self, f: F) -> Result<()>
221 where
222 F: FnOnce(&Connection) -> Result<()>,
223 {
224 let conn_guard = self.write_conn.lock();
225 f(&conn_guard)
226 }
227
228 pub fn transaction<F>(&self, f: F) -> Result<()>
243 where
244 F: FnOnce(&rusqlite::Transaction) -> Result<()>,
245 {
246 let mut conn_guard = self.write_conn.lock();
247 let tx = conn_guard
248 .transaction()
249 .map_err(|e| AzothError::Projection(e.to_string()))?;
250
251 f(&tx)?;
252
253 tx.commit()
254 .map_err(|e| AzothError::Projection(e.to_string()))?;
255 Ok(())
256 }
257
258 pub async fn transaction_async<F>(&self, f: F) -> Result<()>
262 where
263 F: FnOnce(&rusqlite::Transaction) -> Result<()> + Send + 'static,
264 {
265 let conn = self.write_conn.clone();
266 tokio::task::spawn_blocking(move || {
267 let mut conn_guard = conn.lock();
268 let tx = conn_guard
269 .transaction()
270 .map_err(|e| AzothError::Projection(e.to_string()))?;
271
272 f(&tx)?;
273
274 tx.commit()
275 .map_err(|e| AzothError::Projection(e.to_string()))?;
276 Ok(())
277 })
278 .await
279 .map_err(|e| AzothError::Projection(format!("Transaction task failed: {}", e)))?
280 }
281
282 pub fn read_pool(&self) -> Option<&Arc<SqliteReadPool>> {
286 self.read_pool.as_ref()
287 }
288
289 pub fn has_read_pool(&self) -> bool {
291 self.read_pool.is_some()
292 }
293
294 pub fn db_path(&self) -> &Path {
296 &self.config.path
297 }
298}
299
300impl ProjectionStore for SqliteProjectionStore {
301 type Txn<'a> = SimpleProjectionTxn<'a>;
302
303 fn open(cfg: ProjectionConfig) -> Result<Self> {
304 if let Some(parent) = cfg.path.parent() {
306 std::fs::create_dir_all(parent)?;
307 }
308
309 let write_conn = Connection::open_with_flags(
311 &cfg.path,
312 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
313 )
314 .map_err(|e| AzothError::Projection(e.to_string()))?;
315
316 Self::configure_connection(&write_conn, &cfg)?;
318
319 Self::init_schema(&write_conn)?;
321
322 let read_conn = Self::open_read_connection(&cfg.path, &cfg)?;
324
325 let read_pool = if cfg.read_pool.enabled {
327 Some(Arc::new(SqliteReadPool::new(
328 &cfg.path,
329 cfg.read_pool.clone(),
330 )?))
331 } else {
332 None
333 };
334
335 Ok(Self {
336 write_conn: Arc::new(Mutex::new(write_conn)),
337 read_conn: Arc::new(Mutex::new(read_conn)),
338 config: cfg,
339 read_pool,
340 })
341 }
342
343 fn close(&self) -> Result<()> {
344 Ok(())
346 }
347
348 fn begin_txn(&self) -> Result<Self::Txn<'_>> {
349 let guard = self.write_conn.lock();
351 SimpleProjectionTxn::new(guard)
352 }
353
354 fn get_cursor(&self) -> Result<EventId> {
355 let conn = self.read_conn.lock();
357 let cursor: i64 = conn
358 .query_row(
359 "SELECT last_applied_event_id FROM projection_meta WHERE id = 0",
360 [],
361 |row| row.get(0),
362 )
363 .map_err(|e| AzothError::Projection(e.to_string()))?;
364
365 Ok(cursor as EventId)
366 }
367
368 fn migrate(&self, target_version: u32) -> Result<()> {
369 let conn = self.write_conn.lock();
370 schema::migrate(&conn, target_version)
371 }
372
373 fn backup_to(&self, path: &Path) -> Result<()> {
374 {
376 let conn = self.write_conn.lock();
377 let mut stmt = conn
379 .prepare("PRAGMA wal_checkpoint(RESTART)")
380 .map_err(|e| AzothError::Projection(e.to_string()))?;
381 let mut rows = stmt
382 .query([])
383 .map_err(|e| AzothError::Projection(e.to_string()))?;
384 while let Ok(Some(_)) = rows.next() {}
386 }
387
388 let src_path = &self.config.path;
390
391 std::fs::copy(src_path, path)?;
393
394 Ok(())
395 }
396
397 fn restore_from(path: &Path, cfg: ProjectionConfig) -> Result<Self> {
398 std::fs::copy(path, &cfg.path)?;
400
401 Self::open(cfg)
403 }
404
405 fn schema_version(&self) -> Result<u32> {
406 let conn = self.read_conn.lock();
408 let version: i64 = conn
409 .query_row(
410 "SELECT schema_version FROM projection_meta WHERE id = 0",
411 [],
412 |row| row.get(0),
413 )
414 .map_err(|e| AzothError::Projection(e.to_string()))?;
415
416 Ok(version as u32)
417 }
418}