Skip to main content

azoth_sqlite/
store.rs

1use azoth_core::{
2    error::{AzothError, Result},
3    traits::ProjectionStore,
4    types::EventId,
5    ProjectionConfig,
6};
7use rusqlite::{Connection, OpenFlags};
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11use crate::read_pool::SqliteReadPool;
12use crate::schema;
13use crate::txn::SimpleProjectionTxn;
14
15/// SQLite-backed projection store
16///
17/// Uses separate connections for reads and writes. SQLite WAL mode allows
18/// readers and writers to operate concurrently at the database level.
19/// While each connection still needs mutex protection (rusqlite::Connection
20/// is not Sync), using separate connections means reads don't block writes
21/// and vice versa.
22///
23/// Optionally supports a read connection pool for higher concurrency.
24pub struct SqliteProjectionStore {
25    /// Write connection (for transactions, schema operations, and writes)
26    write_conn: Arc<Mutex<Connection>>,
27    /// Read connection (for read-only queries, separate from write connection)
28    read_conn: Arc<Mutex<Connection>>,
29    config: ProjectionConfig,
30    /// Optional read pool for concurrent reads
31    read_pool: Option<Arc<SqliteReadPool>>,
32}
33
34impl SqliteProjectionStore {
35    /// Get the underlying write connection (for migrations and custom queries)
36    ///
37    /// Returns an Arc to the Mutex-protected SQLite connection.
38    /// Users should lock the mutex to access the connection.
39    pub fn conn(&self) -> &Arc<Mutex<Connection>> {
40        &self.write_conn
41    }
42
43    /// Open a read-only connection to the same database
44    fn open_read_connection(path: &Path, cfg: &ProjectionConfig) -> Result<Connection> {
45        let conn = Connection::open_with_flags(
46            path,
47            OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
48        )
49        .map_err(|e| AzothError::Projection(e.to_string()))?;
50
51        // cache_size affects read performance
52        conn.pragma_update(None, "cache_size", cfg.cache_size)
53            .map_err(|e| AzothError::Config(e.to_string()))?;
54
55        Ok(conn)
56    }
57
58    /// Initialize schema if needed
59    fn init_schema(conn: &Connection) -> Result<()> {
60        // Create projection_meta table
61        conn.execute(
62            "CREATE TABLE IF NOT EXISTS projection_meta (
63                id INTEGER PRIMARY KEY CHECK (id = 0),
64                last_applied_event_id INTEGER NOT NULL DEFAULT -1,
65                schema_version INTEGER NOT NULL,
66                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
67            )",
68            [],
69        )
70        .map_err(|e| AzothError::Projection(e.to_string()))?;
71
72        // Insert default row if not exists (-1 means no events processed yet)
73        // schema_version starts at 0 so that migrations starting from version 1 will be applied
74        conn.execute(
75            "INSERT OR IGNORE INTO projection_meta (id, last_applied_event_id, schema_version)
76             VALUES (0, -1, 0)",
77            [],
78        )
79        .map_err(|e| AzothError::Projection(e.to_string()))?;
80
81        Ok(())
82    }
83
84    /// Configure SQLite connection
85    fn configure_connection(conn: &Connection, cfg: &ProjectionConfig) -> Result<()> {
86        // Enable WAL mode
87        if cfg.wal_mode {
88            conn.pragma_update(None, "journal_mode", "WAL")
89                .map_err(|e| AzothError::Config(e.to_string()))?;
90        }
91
92        // Set synchronous mode
93        let sync_mode = match cfg.synchronous {
94            azoth_core::config::SynchronousMode::Full => "FULL",
95            azoth_core::config::SynchronousMode::Normal => "NORMAL",
96            azoth_core::config::SynchronousMode::Off => "OFF",
97        };
98        conn.pragma_update(None, "synchronous", sync_mode)
99            .map_err(|e| AzothError::Config(e.to_string()))?;
100
101        // Enable foreign keys
102        conn.pragma_update(None, "foreign_keys", "ON")
103            .map_err(|e| AzothError::Config(e.to_string()))?;
104
105        // Set cache size
106        conn.pragma_update(None, "cache_size", cfg.cache_size)
107            .map_err(|e| AzothError::Config(e.to_string()))?;
108
109        Ok(())
110    }
111
112    /// Execute a read-only SQL query asynchronously
113    ///
114    /// This method runs the query on a separate thread to avoid blocking,
115    /// making it safe to call from async contexts. Uses a read-only connection
116    /// that doesn't block writes.
117    ///
118    /// # Example
119    /// ```ignore
120    /// let balance: i64 = store.query_async(|conn| {
121    ///     conn.query_row("SELECT balance FROM accounts WHERE id = ?1", [account_id], |row| row.get(0))
122    /// }).await?;
123    /// ```
124    pub async fn query_async<F, R>(&self, f: F) -> Result<R>
125    where
126        F: FnOnce(&Connection) -> Result<R> + Send + 'static,
127        R: Send + 'static,
128    {
129        if let Some(pool) = &self.read_pool {
130            let pool = Arc::clone(pool);
131            return tokio::task::spawn_blocking(move || {
132                let conn = pool.acquire_blocking()?;
133                f(conn.connection())
134            })
135            .await
136            .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?;
137        }
138
139        let conn = self.read_conn.clone();
140        tokio::task::spawn_blocking(move || {
141            let conn_guard = conn.lock().unwrap();
142            f(&conn_guard)
143        })
144        .await
145        .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?
146    }
147
148    /// Execute a read-only SQL query synchronously
149    ///
150    /// This is a convenience method for non-async contexts.
151    /// For async contexts, prefer `query_async`. Uses a read-only connection
152    /// that doesn't block writes.
153    ///
154    /// # Example
155    /// ```ignore
156    /// let balance: i64 = store.query(|conn| {
157    ///     conn.query_row("SELECT balance FROM accounts WHERE id = ?1", [account_id], |row| row.get(0))
158    /// })?;
159    /// ```
160    pub fn query<F, R>(&self, f: F) -> Result<R>
161    where
162        F: FnOnce(&Connection) -> Result<R>,
163    {
164        if let Some(pool) = &self.read_pool {
165            let conn = pool.acquire_blocking()?;
166            return f(conn.connection());
167        }
168
169        let conn_guard = self.read_conn.lock().unwrap();
170        f(&conn_guard)
171    }
172
173    /// Execute arbitrary SQL statements (DDL/DML) asynchronously
174    ///
175    /// Useful for creating tables, indexes, or performing bulk updates.
176    /// Uses the write connection.
177    ///
178    /// # Example
179    /// ```ignore
180    /// store.execute_async(|conn| {
181    ///     conn.execute("CREATE TABLE IF NOT EXISTS balances (id INTEGER PRIMARY KEY, amount INTEGER)", [])?;
182    ///     conn.execute("CREATE INDEX IF NOT EXISTS idx_amount ON balances(amount)", [])?;
183    ///     Ok(())
184    /// }).await?;
185    /// ```
186    pub async fn execute_async<F>(&self, f: F) -> Result<()>
187    where
188        F: FnOnce(&Connection) -> Result<()> + Send + 'static,
189    {
190        let conn = self.write_conn.clone();
191        tokio::task::spawn_blocking(move || {
192            let conn_guard = conn.lock().unwrap();
193            f(&conn_guard)
194        })
195        .await
196        .map_err(|e| AzothError::Projection(format!("Execute task failed: {}", e)))?
197    }
198
199    /// Execute arbitrary SQL statements (DDL/DML) synchronously
200    ///
201    /// Uses the write connection.
202    ///
203    /// # Example
204    /// ```ignore
205    /// store.execute(|conn| {
206    ///     conn.execute("CREATE TABLE IF NOT EXISTS balances (id INTEGER PRIMARY KEY, amount INTEGER)", [])?;
207    ///     Ok(())
208    /// })?;
209    /// ```
210    pub fn execute<F>(&self, f: F) -> Result<()>
211    where
212        F: FnOnce(&Connection) -> Result<()>,
213    {
214        let conn_guard = self.write_conn.lock().unwrap();
215        f(&conn_guard)
216    }
217
218    /// Execute a transaction with multiple SQL statements
219    ///
220    /// The closure receives a transaction object and can execute multiple
221    /// statements atomically. If the closure returns an error, the transaction
222    /// is rolled back. Uses the write connection.
223    ///
224    /// # Example
225    /// ```ignore
226    /// store.transaction(|tx| {
227    ///     tx.execute("INSERT INTO accounts (id, balance) VALUES (?1, ?2)", params![1, 100])?;
228    ///     tx.execute("INSERT INTO accounts (id, balance) VALUES (?1, ?2)", params![2, 200])?;
229    ///     Ok(())
230    /// })?;
231    /// ```
232    pub fn transaction<F>(&self, f: F) -> Result<()>
233    where
234        F: FnOnce(&rusqlite::Transaction) -> Result<()>,
235    {
236        let mut conn_guard = self.write_conn.lock().unwrap();
237        let tx = conn_guard
238            .transaction()
239            .map_err(|e| AzothError::Projection(e.to_string()))?;
240
241        f(&tx)?;
242
243        tx.commit()
244            .map_err(|e| AzothError::Projection(e.to_string()))?;
245        Ok(())
246    }
247
248    /// Execute a transaction asynchronously
249    ///
250    /// Uses the write connection.
251    pub async fn transaction_async<F>(&self, f: F) -> Result<()>
252    where
253        F: FnOnce(&rusqlite::Transaction) -> Result<()> + Send + 'static,
254    {
255        let conn = self.write_conn.clone();
256        tokio::task::spawn_blocking(move || {
257            let mut conn_guard = conn.lock().unwrap();
258            let tx = conn_guard
259                .transaction()
260                .map_err(|e| AzothError::Projection(e.to_string()))?;
261
262            f(&tx)?;
263
264            tx.commit()
265                .map_err(|e| AzothError::Projection(e.to_string()))?;
266            Ok(())
267        })
268        .await
269        .map_err(|e| AzothError::Projection(format!("Transaction task failed: {}", e)))?
270    }
271
272    /// Get reference to the read pool (if enabled)
273    ///
274    /// Returns None if read pooling was not enabled in config.
275    pub fn read_pool(&self) -> Option<&Arc<SqliteReadPool>> {
276        self.read_pool.as_ref()
277    }
278
279    /// Check if read pooling is enabled
280    pub fn has_read_pool(&self) -> bool {
281        self.read_pool.is_some()
282    }
283
284    /// Get the database path
285    pub fn db_path(&self) -> &Path {
286        &self.config.path
287    }
288}
289
290impl ProjectionStore for SqliteProjectionStore {
291    type Txn<'a> = SimpleProjectionTxn<'a>;
292
293    fn open(cfg: ProjectionConfig) -> Result<Self> {
294        // Create parent directory if needed
295        if let Some(parent) = cfg.path.parent() {
296            std::fs::create_dir_all(parent)?;
297        }
298
299        // Open write connection
300        let write_conn = Connection::open_with_flags(
301            &cfg.path,
302            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
303        )
304        .map_err(|e| AzothError::Projection(e.to_string()))?;
305
306        // Configure write connection
307        Self::configure_connection(&write_conn, &cfg)?;
308
309        // Initialize schema
310        Self::init_schema(&write_conn)?;
311
312        // Open separate read connection for concurrent reads
313        let read_conn = Self::open_read_connection(&cfg.path, &cfg)?;
314
315        // Initialize read pool if enabled
316        let read_pool = if cfg.read_pool.enabled {
317            Some(Arc::new(SqliteReadPool::new(
318                &cfg.path,
319                cfg.read_pool.clone(),
320            )?))
321        } else {
322            None
323        };
324
325        Ok(Self {
326            write_conn: Arc::new(Mutex::new(write_conn)),
327            read_conn: Arc::new(Mutex::new(read_conn)),
328            config: cfg,
329            read_pool,
330        })
331    }
332
333    fn close(&self) -> Result<()> {
334        // SQLite connections close automatically on drop
335        Ok(())
336    }
337
338    fn begin_txn(&self) -> Result<Self::Txn<'_>> {
339        // Begin exclusive transaction using SimpleProjectionTxn (uses write connection)
340        let guard = self.write_conn.lock().unwrap();
341        SimpleProjectionTxn::new(guard)
342    }
343
344    fn get_cursor(&self) -> Result<EventId> {
345        // Use read connection for this read-only operation
346        let conn = self.read_conn.lock().unwrap();
347        let cursor: i64 = conn
348            .query_row(
349                "SELECT last_applied_event_id FROM projection_meta WHERE id = 0",
350                [],
351                |row| row.get(0),
352            )
353            .map_err(|e| AzothError::Projection(e.to_string()))?;
354
355        Ok(cursor as EventId)
356    }
357
358    fn migrate(&self, target_version: u32) -> Result<()> {
359        let conn = self.write_conn.lock().unwrap();
360        schema::migrate(&conn, target_version)
361    }
362
363    fn backup_to(&self, path: &Path) -> Result<()> {
364        // Checkpoint WAL to flush all changes to the main database file
365        {
366            let conn = self.write_conn.lock().unwrap();
367            // Execute checkpoint with full iteration of results
368            let mut stmt = conn
369                .prepare("PRAGMA wal_checkpoint(RESTART)")
370                .map_err(|e| AzothError::Projection(e.to_string()))?;
371            let mut rows = stmt
372                .query([])
373                .map_err(|e| AzothError::Projection(e.to_string()))?;
374            // Consume all rows to ensure checkpoint completes
375            while let Ok(Some(_)) = rows.next() {}
376        }
377
378        // Get source path
379        let src_path = &self.config.path;
380
381        // Copy database file (now includes all changes from WAL)
382        std::fs::copy(src_path, path)?;
383
384        Ok(())
385    }
386
387    fn restore_from(path: &Path, cfg: ProjectionConfig) -> Result<Self> {
388        // Copy backup file to target location
389        std::fs::copy(path, &cfg.path)?;
390
391        // Open the restored database
392        Self::open(cfg)
393    }
394
395    fn schema_version(&self) -> Result<u32> {
396        // Use read connection for this read-only operation
397        let conn = self.read_conn.lock().unwrap();
398        let version: i64 = conn
399            .query_row(
400                "SELECT schema_version FROM projection_meta WHERE id = 0",
401                [],
402                |row| row.get(0),
403            )
404            .map_err(|e| AzothError::Projection(e.to_string()))?;
405
406        Ok(version as u32)
407    }
408}