Skip to main content

azoth_sqlite/
store.rs

1use azoth_core::{
2    error::{AzothError, Result},
3    traits::ProjectionStore,
4    types::EventId,
5    ProjectionConfig,
6};
7use rusqlite::{Connection, OpenFlags};
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11use crate::read_pool::SqliteReadPool;
12use crate::schema;
13use crate::txn::SimpleProjectionTxn;
14
15/// SQLite-backed projection store
16///
17/// Uses separate connections for reads and writes. SQLite WAL mode allows
18/// readers and writers to operate concurrently at the database level.
19/// While each connection still needs mutex protection (rusqlite::Connection
20/// is not Sync), using separate connections means reads don't block writes
21/// and vice versa.
22///
23/// Optionally supports a read connection pool for higher concurrency.
24pub struct SqliteProjectionStore {
25    /// Write connection (for transactions, schema operations, and writes)
26    write_conn: Arc<Mutex<Connection>>,
27    /// Read connection (for read-only queries, separate from write connection)
28    read_conn: Arc<Mutex<Connection>>,
29    config: ProjectionConfig,
30    /// Optional read pool for concurrent reads
31    read_pool: Option<Arc<SqliteReadPool>>,
32}
33
34impl SqliteProjectionStore {
35    /// Get the underlying write connection (for migrations and custom queries)
36    ///
37    /// Returns an Arc to the Mutex-protected SQLite connection.
38    /// Users should lock the mutex to access the connection.
39    pub fn conn(&self) -> &Arc<Mutex<Connection>> {
40        &self.write_conn
41    }
42
43    /// Open a read-only connection to the same database
44    fn open_read_connection(path: &Path, cfg: &ProjectionConfig) -> Result<Connection> {
45        let conn = Connection::open_with_flags(
46            path,
47            OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
48        )
49        .map_err(|e| AzothError::Projection(e.to_string()))?;
50
51        // cache_size affects read performance
52        conn.pragma_update(None, "cache_size", cfg.cache_size)
53            .map_err(|e| AzothError::Config(e.to_string()))?;
54
55        Ok(conn)
56    }
57
58    /// Initialize schema if needed
59    fn init_schema(conn: &Connection) -> Result<()> {
60        // Create projection_meta table
61        conn.execute(
62            "CREATE TABLE IF NOT EXISTS projection_meta (
63                id INTEGER PRIMARY KEY CHECK (id = 0),
64                last_applied_event_id INTEGER NOT NULL DEFAULT -1,
65                schema_version INTEGER NOT NULL,
66                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
67            )",
68            [],
69        )
70        .map_err(|e| AzothError::Projection(e.to_string()))?;
71
72        // Insert default row if not exists (-1 means no events processed yet)
73        // schema_version starts at 0 so that migrations starting from version 1 will be applied
74        conn.execute(
75            "INSERT OR IGNORE INTO projection_meta (id, last_applied_event_id, schema_version)
76             VALUES (0, -1, 0)",
77            [],
78        )
79        .map_err(|e| AzothError::Projection(e.to_string()))?;
80
81        Ok(())
82    }
83
84    /// Configure SQLite connection
85    fn configure_connection(conn: &Connection, cfg: &ProjectionConfig) -> Result<()> {
86        // Enable WAL mode
87        if cfg.wal_mode {
88            conn.pragma_update(None, "journal_mode", "WAL")
89                .map_err(|e| AzothError::Config(e.to_string()))?;
90        }
91
92        // Set synchronous mode
93        let sync_mode = match cfg.synchronous {
94            azoth_core::config::SynchronousMode::Full => "FULL",
95            azoth_core::config::SynchronousMode::Normal => "NORMAL",
96            azoth_core::config::SynchronousMode::Off => "OFF",
97        };
98        conn.pragma_update(None, "synchronous", sync_mode)
99            .map_err(|e| AzothError::Config(e.to_string()))?;
100
101        // Enable foreign keys
102        conn.pragma_update(None, "foreign_keys", "ON")
103            .map_err(|e| AzothError::Config(e.to_string()))?;
104
105        // Set cache size
106        conn.pragma_update(None, "cache_size", cfg.cache_size)
107            .map_err(|e| AzothError::Config(e.to_string()))?;
108
109        Ok(())
110    }
111
112    /// Execute a read-only SQL query asynchronously
113    ///
114    /// This method runs the query on a separate thread to avoid blocking,
115    /// making it safe to call from async contexts. Uses a read-only connection
116    /// that doesn't block writes.
117    ///
118    /// # Example
119    /// ```ignore
120    /// let balance: i64 = store.query_async(|conn| {
121    ///     conn.query_row("SELECT balance FROM accounts WHERE id = ?1", [account_id], |row| row.get(0))
122    /// }).await?;
123    /// ```
124    pub async fn query_async<F, R>(&self, f: F) -> Result<R>
125    where
126        F: FnOnce(&Connection) -> Result<R> + Send + 'static,
127        R: Send + 'static,
128    {
129        let conn = self.read_conn.clone();
130        tokio::task::spawn_blocking(move || {
131            let conn_guard = conn.lock().unwrap();
132            f(&conn_guard)
133        })
134        .await
135        .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?
136    }
137
138    /// Execute a read-only SQL query synchronously
139    ///
140    /// This is a convenience method for non-async contexts.
141    /// For async contexts, prefer `query_async`. Uses a read-only connection
142    /// that doesn't block writes.
143    ///
144    /// # Example
145    /// ```ignore
146    /// let balance: i64 = store.query(|conn| {
147    ///     conn.query_row("SELECT balance FROM accounts WHERE id = ?1", [account_id], |row| row.get(0))
148    /// })?;
149    /// ```
150    pub fn query<F, R>(&self, f: F) -> Result<R>
151    where
152        F: FnOnce(&Connection) -> Result<R>,
153    {
154        let conn_guard = self.read_conn.lock().unwrap();
155        f(&conn_guard)
156    }
157
158    /// Execute arbitrary SQL statements (DDL/DML) asynchronously
159    ///
160    /// Useful for creating tables, indexes, or performing bulk updates.
161    /// Uses the write connection.
162    ///
163    /// # Example
164    /// ```ignore
165    /// store.execute_async(|conn| {
166    ///     conn.execute("CREATE TABLE IF NOT EXISTS balances (id INTEGER PRIMARY KEY, amount INTEGER)", [])?;
167    ///     conn.execute("CREATE INDEX IF NOT EXISTS idx_amount ON balances(amount)", [])?;
168    ///     Ok(())
169    /// }).await?;
170    /// ```
171    pub async fn execute_async<F>(&self, f: F) -> Result<()>
172    where
173        F: FnOnce(&Connection) -> Result<()> + Send + 'static,
174    {
175        let conn = self.write_conn.clone();
176        tokio::task::spawn_blocking(move || {
177            let conn_guard = conn.lock().unwrap();
178            f(&conn_guard)
179        })
180        .await
181        .map_err(|e| AzothError::Projection(format!("Execute task failed: {}", e)))?
182    }
183
184    /// Execute arbitrary SQL statements (DDL/DML) synchronously
185    ///
186    /// Uses the write connection.
187    ///
188    /// # Example
189    /// ```ignore
190    /// store.execute(|conn| {
191    ///     conn.execute("CREATE TABLE IF NOT EXISTS balances (id INTEGER PRIMARY KEY, amount INTEGER)", [])?;
192    ///     Ok(())
193    /// })?;
194    /// ```
195    pub fn execute<F>(&self, f: F) -> Result<()>
196    where
197        F: FnOnce(&Connection) -> Result<()>,
198    {
199        let conn_guard = self.write_conn.lock().unwrap();
200        f(&conn_guard)
201    }
202
203    /// Execute a transaction with multiple SQL statements
204    ///
205    /// The closure receives a transaction object and can execute multiple
206    /// statements atomically. If the closure returns an error, the transaction
207    /// is rolled back. Uses the write connection.
208    ///
209    /// # Example
210    /// ```ignore
211    /// store.transaction(|tx| {
212    ///     tx.execute("INSERT INTO accounts (id, balance) VALUES (?1, ?2)", params![1, 100])?;
213    ///     tx.execute("INSERT INTO accounts (id, balance) VALUES (?1, ?2)", params![2, 200])?;
214    ///     Ok(())
215    /// })?;
216    /// ```
217    pub fn transaction<F>(&self, f: F) -> Result<()>
218    where
219        F: FnOnce(&rusqlite::Transaction) -> Result<()>,
220    {
221        let mut conn_guard = self.write_conn.lock().unwrap();
222        let tx = conn_guard
223            .transaction()
224            .map_err(|e| AzothError::Projection(e.to_string()))?;
225
226        f(&tx)?;
227
228        tx.commit()
229            .map_err(|e| AzothError::Projection(e.to_string()))?;
230        Ok(())
231    }
232
233    /// Execute a transaction asynchronously
234    ///
235    /// Uses the write connection.
236    pub async fn transaction_async<F>(&self, f: F) -> Result<()>
237    where
238        F: FnOnce(&rusqlite::Transaction) -> Result<()> + Send + 'static,
239    {
240        let conn = self.write_conn.clone();
241        tokio::task::spawn_blocking(move || {
242            let mut conn_guard = conn.lock().unwrap();
243            let tx = conn_guard
244                .transaction()
245                .map_err(|e| AzothError::Projection(e.to_string()))?;
246
247            f(&tx)?;
248
249            tx.commit()
250                .map_err(|e| AzothError::Projection(e.to_string()))?;
251            Ok(())
252        })
253        .await
254        .map_err(|e| AzothError::Projection(format!("Transaction task failed: {}", e)))?
255    }
256
257    /// Get reference to the read pool (if enabled)
258    ///
259    /// Returns None if read pooling was not enabled in config.
260    pub fn read_pool(&self) -> Option<&Arc<SqliteReadPool>> {
261        self.read_pool.as_ref()
262    }
263
264    /// Check if read pooling is enabled
265    pub fn has_read_pool(&self) -> bool {
266        self.read_pool.is_some()
267    }
268
269    /// Get the database path
270    pub fn db_path(&self) -> &Path {
271        &self.config.path
272    }
273}
274
275impl ProjectionStore for SqliteProjectionStore {
276    type Txn<'a> = SimpleProjectionTxn<'a>;
277
278    fn open(cfg: ProjectionConfig) -> Result<Self> {
279        // Create parent directory if needed
280        if let Some(parent) = cfg.path.parent() {
281            std::fs::create_dir_all(parent)?;
282        }
283
284        // Open write connection
285        let write_conn = Connection::open_with_flags(
286            &cfg.path,
287            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
288        )
289        .map_err(|e| AzothError::Projection(e.to_string()))?;
290
291        // Configure write connection
292        Self::configure_connection(&write_conn, &cfg)?;
293
294        // Initialize schema
295        Self::init_schema(&write_conn)?;
296
297        // Open separate read connection for concurrent reads
298        let read_conn = Self::open_read_connection(&cfg.path, &cfg)?;
299
300        // Initialize read pool if enabled
301        let read_pool = if cfg.read_pool.enabled {
302            Some(Arc::new(SqliteReadPool::new(
303                &cfg.path,
304                cfg.read_pool.clone(),
305            )?))
306        } else {
307            None
308        };
309
310        Ok(Self {
311            write_conn: Arc::new(Mutex::new(write_conn)),
312            read_conn: Arc::new(Mutex::new(read_conn)),
313            config: cfg,
314            read_pool,
315        })
316    }
317
318    fn close(&self) -> Result<()> {
319        // SQLite connections close automatically on drop
320        Ok(())
321    }
322
323    fn begin_txn(&self) -> Result<Self::Txn<'_>> {
324        // Begin exclusive transaction using SimpleProjectionTxn (uses write connection)
325        let guard = self.write_conn.lock().unwrap();
326        SimpleProjectionTxn::new(guard)
327    }
328
329    fn get_cursor(&self) -> Result<EventId> {
330        // Use read connection for this read-only operation
331        let conn = self.read_conn.lock().unwrap();
332        let cursor: i64 = conn
333            .query_row(
334                "SELECT last_applied_event_id FROM projection_meta WHERE id = 0",
335                [],
336                |row| row.get(0),
337            )
338            .map_err(|e| AzothError::Projection(e.to_string()))?;
339
340        Ok(cursor as EventId)
341    }
342
343    fn migrate(&self, target_version: u32) -> Result<()> {
344        let conn = self.write_conn.lock().unwrap();
345        schema::migrate(&conn, target_version)
346    }
347
348    fn backup_to(&self, path: &Path) -> Result<()> {
349        // Checkpoint WAL to flush all changes to the main database file
350        {
351            let conn = self.write_conn.lock().unwrap();
352            // Execute checkpoint with full iteration of results
353            let mut stmt = conn
354                .prepare("PRAGMA wal_checkpoint(RESTART)")
355                .map_err(|e| AzothError::Projection(e.to_string()))?;
356            let mut rows = stmt
357                .query([])
358                .map_err(|e| AzothError::Projection(e.to_string()))?;
359            // Consume all rows to ensure checkpoint completes
360            while let Ok(Some(_)) = rows.next() {}
361        }
362
363        // Get source path
364        let src_path = &self.config.path;
365
366        // Copy database file (now includes all changes from WAL)
367        std::fs::copy(src_path, path)?;
368
369        Ok(())
370    }
371
372    fn restore_from(path: &Path, cfg: ProjectionConfig) -> Result<Self> {
373        // Copy backup file to target location
374        std::fs::copy(path, &cfg.path)?;
375
376        // Open the restored database
377        Self::open(cfg)
378    }
379
380    fn schema_version(&self) -> Result<u32> {
381        // Use read connection for this read-only operation
382        let conn = self.read_conn.lock().unwrap();
383        let version: i64 = conn
384            .query_row(
385                "SELECT schema_version FROM projection_meta WHERE id = 0",
386                [],
387                |row| row.get(0),
388            )
389            .map_err(|e| AzothError::Projection(e.to_string()))?;
390
391        Ok(version as u32)
392    }
393}