Skip to main content

azoth_sqlite/
store.rs

1use azoth_core::{
2    error::{AzothError, Result},
3    traits::ProjectionStore,
4    types::EventId,
5    ProjectionConfig,
6};
7use parking_lot::Mutex;
8use rusqlite::{Connection, OpenFlags};
9use std::path::Path;
10use std::sync::Arc;
11
12use crate::read_pool::SqliteReadPool;
13use crate::schema;
14use crate::txn::SimpleProjectionTxn;
15
16/// SQLite-backed projection store
17///
18/// Uses separate connections for reads and writes. SQLite WAL mode allows
19/// readers and writers to operate concurrently at the database level.
20/// While each connection still needs mutex protection (rusqlite::Connection
21/// is not Sync), using separate connections means reads don't block writes
22/// and vice versa.
23///
24/// Optionally supports a read connection pool for higher concurrency.
25pub struct SqliteProjectionStore {
26    /// Write connection (for transactions, schema operations, and writes)
27    write_conn: Arc<Mutex<Connection>>,
28    /// Read connection (for read-only queries, separate from write connection)
29    read_conn: Arc<Mutex<Connection>>,
30    config: ProjectionConfig,
31    /// Optional read pool for concurrent reads
32    read_pool: Option<Arc<SqliteReadPool>>,
33}
34
35impl SqliteProjectionStore {
36    /// Get the underlying write connection (for migrations and custom queries).
37    ///
38    /// **Prefer `write_conn()`** for new code -- this method exists for
39    /// backwards compatibility.
40    pub fn conn(&self) -> &Arc<Mutex<Connection>> {
41        &self.write_conn
42    }
43
44    /// Get the write connection explicitly.
45    ///
46    /// Use this for migrations, DDL, projector event application, and any
47    /// other operations that mutate the projection database. Reads should
48    /// go through `read_pool()`, `query()`, or `query_async()` instead.
49    pub fn write_conn(&self) -> &Arc<Mutex<Connection>> {
50        &self.write_conn
51    }
52
53    /// Open a read-only connection to the same database
54    fn open_read_connection(path: &Path, cfg: &ProjectionConfig) -> Result<Connection> {
55        let conn = Connection::open_with_flags(
56            path,
57            OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
58        )
59        .map_err(|e| AzothError::Projection(e.to_string()))?;
60
61        // cache_size affects read performance
62        conn.pragma_update(None, "cache_size", cfg.cache_size)
63            .map_err(|e| AzothError::Config(e.to_string()))?;
64
65        Ok(conn)
66    }
67
68    /// Initialize schema if needed
69    fn init_schema(conn: &Connection) -> Result<()> {
70        // Create projection_meta table
71        conn.execute(
72            "CREATE TABLE IF NOT EXISTS projection_meta (
73                id INTEGER PRIMARY KEY CHECK (id = 0),
74                last_applied_event_id INTEGER NOT NULL DEFAULT -1,
75                schema_version INTEGER NOT NULL,
76                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
77            )",
78            [],
79        )
80        .map_err(|e| AzothError::Projection(e.to_string()))?;
81
82        // Insert default row if not exists (-1 means no events processed yet)
83        // schema_version starts at 0 so that migrations starting from version 1 will be applied
84        conn.execute(
85            "INSERT OR IGNORE INTO projection_meta (id, last_applied_event_id, schema_version)
86             VALUES (0, -1, 0)",
87            [],
88        )
89        .map_err(|e| AzothError::Projection(e.to_string()))?;
90
91        Ok(())
92    }
93
94    /// Configure SQLite connection
95    fn configure_connection(conn: &Connection, cfg: &ProjectionConfig) -> Result<()> {
96        // Enable WAL mode
97        if cfg.wal_mode {
98            conn.pragma_update(None, "journal_mode", "WAL")
99                .map_err(|e| AzothError::Config(e.to_string()))?;
100        }
101
102        // Set synchronous mode
103        let sync_mode = match cfg.synchronous {
104            azoth_core::config::SynchronousMode::Full => "FULL",
105            azoth_core::config::SynchronousMode::Normal => "NORMAL",
106            azoth_core::config::SynchronousMode::Off => "OFF",
107        };
108        conn.pragma_update(None, "synchronous", sync_mode)
109            .map_err(|e| AzothError::Config(e.to_string()))?;
110
111        // Enable foreign keys
112        conn.pragma_update(None, "foreign_keys", "ON")
113            .map_err(|e| AzothError::Config(e.to_string()))?;
114
115        // Set cache size
116        conn.pragma_update(None, "cache_size", cfg.cache_size)
117            .map_err(|e| AzothError::Config(e.to_string()))?;
118
119        Ok(())
120    }
121
122    /// Execute a read-only SQL query asynchronously
123    ///
124    /// This method runs the query on a separate thread to avoid blocking,
125    /// making it safe to call from async contexts. Uses a read-only connection
126    /// that doesn't block writes.
127    ///
128    /// # Example
129    /// ```ignore
130    /// let balance: i64 = store.query_async(|conn| {
131    ///     conn.query_row("SELECT balance FROM accounts WHERE id = ?1", [account_id], |row| row.get(0))
132    /// }).await?;
133    /// ```
134    pub async fn query_async<F, R>(&self, f: F) -> Result<R>
135    where
136        F: FnOnce(&Connection) -> Result<R> + Send + 'static,
137        R: Send + 'static,
138    {
139        if let Some(pool) = &self.read_pool {
140            let pool = Arc::clone(pool);
141            return tokio::task::spawn_blocking(move || {
142                let conn = pool.acquire_blocking()?;
143                f(conn.connection())
144            })
145            .await
146            .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?;
147        }
148
149        let conn = self.read_conn.clone();
150        tokio::task::spawn_blocking(move || {
151            let conn_guard = conn.lock();
152            f(&conn_guard)
153        })
154        .await
155        .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?
156    }
157
158    /// Execute a read-only SQL query synchronously
159    ///
160    /// This is a convenience method for non-async contexts.
161    /// For async contexts, prefer `query_async`. Uses a read-only connection
162    /// that doesn't block writes.
163    ///
164    /// # Example
165    /// ```ignore
166    /// let balance: i64 = store.query(|conn| {
167    ///     conn.query_row("SELECT balance FROM accounts WHERE id = ?1", [account_id], |row| row.get(0))
168    /// })?;
169    /// ```
170    pub fn query<F, R>(&self, f: F) -> Result<R>
171    where
172        F: FnOnce(&Connection) -> Result<R>,
173    {
174        if let Some(pool) = &self.read_pool {
175            let conn = pool.acquire_blocking()?;
176            return f(conn.connection());
177        }
178
179        let conn_guard = self.read_conn.lock();
180        f(&conn_guard)
181    }
182
183    /// Execute arbitrary SQL statements (DDL/DML) asynchronously
184    ///
185    /// Useful for creating tables, indexes, or performing bulk updates.
186    /// Uses the write connection.
187    ///
188    /// # Example
189    /// ```ignore
190    /// store.execute_async(|conn| {
191    ///     conn.execute("CREATE TABLE IF NOT EXISTS balances (id INTEGER PRIMARY KEY, amount INTEGER)", [])?;
192    ///     conn.execute("CREATE INDEX IF NOT EXISTS idx_amount ON balances(amount)", [])?;
193    ///     Ok(())
194    /// }).await?;
195    /// ```
196    pub async fn execute_async<F>(&self, f: F) -> Result<()>
197    where
198        F: FnOnce(&Connection) -> Result<()> + Send + 'static,
199    {
200        let conn = self.write_conn.clone();
201        tokio::task::spawn_blocking(move || {
202            let conn_guard = conn.lock();
203            f(&conn_guard)
204        })
205        .await
206        .map_err(|e| AzothError::Projection(format!("Execute task failed: {}", e)))?
207    }
208
209    /// Execute arbitrary SQL statements (DDL/DML) synchronously
210    ///
211    /// Uses the write connection.
212    ///
213    /// # Example
214    /// ```ignore
215    /// store.execute(|conn| {
216    ///     conn.execute("CREATE TABLE IF NOT EXISTS balances (id INTEGER PRIMARY KEY, amount INTEGER)", [])?;
217    ///     Ok(())
218    /// })?;
219    /// ```
220    pub fn execute<F>(&self, f: F) -> Result<()>
221    where
222        F: FnOnce(&Connection) -> Result<()>,
223    {
224        let conn_guard = self.write_conn.lock();
225        f(&conn_guard)
226    }
227
228    /// Execute a transaction with multiple SQL statements
229    ///
230    /// The closure receives a transaction object and can execute multiple
231    /// statements atomically. If the closure returns an error, the transaction
232    /// is rolled back. Uses the write connection.
233    ///
234    /// # Example
235    /// ```ignore
236    /// store.transaction(|tx| {
237    ///     tx.execute("INSERT INTO accounts (id, balance) VALUES (?1, ?2)", params![1, 100])?;
238    ///     tx.execute("INSERT INTO accounts (id, balance) VALUES (?1, ?2)", params![2, 200])?;
239    ///     Ok(())
240    /// })?;
241    /// ```
242    pub fn transaction<F>(&self, f: F) -> Result<()>
243    where
244        F: FnOnce(&rusqlite::Transaction) -> Result<()>,
245    {
246        let mut conn_guard = self.write_conn.lock();
247        let tx = conn_guard
248            .transaction()
249            .map_err(|e| AzothError::Projection(e.to_string()))?;
250
251        f(&tx)?;
252
253        tx.commit()
254            .map_err(|e| AzothError::Projection(e.to_string()))?;
255        Ok(())
256    }
257
258    /// Execute a transaction asynchronously
259    ///
260    /// Uses the write connection.
261    pub async fn transaction_async<F>(&self, f: F) -> Result<()>
262    where
263        F: FnOnce(&rusqlite::Transaction) -> Result<()> + Send + 'static,
264    {
265        let conn = self.write_conn.clone();
266        tokio::task::spawn_blocking(move || {
267            let mut conn_guard = conn.lock();
268            let tx = conn_guard
269                .transaction()
270                .map_err(|e| AzothError::Projection(e.to_string()))?;
271
272            f(&tx)?;
273
274            tx.commit()
275                .map_err(|e| AzothError::Projection(e.to_string()))?;
276            Ok(())
277        })
278        .await
279        .map_err(|e| AzothError::Projection(format!("Transaction task failed: {}", e)))?
280    }
281
282    /// Get reference to the read pool (if enabled)
283    ///
284    /// Returns None if read pooling was not enabled in config.
285    pub fn read_pool(&self) -> Option<&Arc<SqliteReadPool>> {
286        self.read_pool.as_ref()
287    }
288
289    /// Check if read pooling is enabled
290    pub fn has_read_pool(&self) -> bool {
291        self.read_pool.is_some()
292    }
293
294    /// Get the database path
295    pub fn db_path(&self) -> &Path {
296        &self.config.path
297    }
298}
299
300impl ProjectionStore for SqliteProjectionStore {
301    type Txn<'a> = SimpleProjectionTxn<'a>;
302
303    fn open(cfg: ProjectionConfig) -> Result<Self> {
304        // Create parent directory if needed
305        if let Some(parent) = cfg.path.parent() {
306            std::fs::create_dir_all(parent)?;
307        }
308
309        // Open write connection
310        let write_conn = Connection::open_with_flags(
311            &cfg.path,
312            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
313        )
314        .map_err(|e| AzothError::Projection(e.to_string()))?;
315
316        // Configure write connection
317        Self::configure_connection(&write_conn, &cfg)?;
318
319        // Initialize schema
320        Self::init_schema(&write_conn)?;
321
322        // Open separate read connection for concurrent reads
323        let read_conn = Self::open_read_connection(&cfg.path, &cfg)?;
324
325        // Initialize read pool if enabled
326        let read_pool = if cfg.read_pool.enabled {
327            Some(Arc::new(SqliteReadPool::new(
328                &cfg.path,
329                cfg.read_pool.clone(),
330            )?))
331        } else {
332            None
333        };
334
335        Ok(Self {
336            write_conn: Arc::new(Mutex::new(write_conn)),
337            read_conn: Arc::new(Mutex::new(read_conn)),
338            config: cfg,
339            read_pool,
340        })
341    }
342
343    fn close(&self) -> Result<()> {
344        // SQLite connections close automatically on drop
345        Ok(())
346    }
347
348    fn begin_txn(&self) -> Result<Self::Txn<'_>> {
349        // Begin exclusive transaction using SimpleProjectionTxn (uses write connection)
350        let guard = self.write_conn.lock();
351        SimpleProjectionTxn::new(guard)
352    }
353
354    fn get_cursor(&self) -> Result<EventId> {
355        // Use read connection for this read-only operation
356        let conn = self.read_conn.lock();
357        let cursor: i64 = conn
358            .query_row(
359                "SELECT last_applied_event_id FROM projection_meta WHERE id = 0",
360                [],
361                |row| row.get(0),
362            )
363            .map_err(|e| AzothError::Projection(e.to_string()))?;
364
365        Ok(cursor as EventId)
366    }
367
368    fn migrate(&self, target_version: u32) -> Result<()> {
369        let conn = self.write_conn.lock();
370        schema::migrate(&conn, target_version)
371    }
372
373    fn backup_to(&self, path: &Path) -> Result<()> {
374        // Checkpoint WAL to flush all changes to the main database file
375        {
376            let conn = self.write_conn.lock();
377            // Execute checkpoint with full iteration of results
378            let mut stmt = conn
379                .prepare("PRAGMA wal_checkpoint(RESTART)")
380                .map_err(|e| AzothError::Projection(e.to_string()))?;
381            let mut rows = stmt
382                .query([])
383                .map_err(|e| AzothError::Projection(e.to_string()))?;
384            // Consume all rows to ensure checkpoint completes
385            while let Ok(Some(_)) = rows.next() {}
386        }
387
388        // Get source path
389        let src_path = &self.config.path;
390
391        // Copy database file (now includes all changes from WAL)
392        std::fs::copy(src_path, path)?;
393
394        Ok(())
395    }
396
397    fn restore_from(path: &Path, cfg: ProjectionConfig) -> Result<Self> {
398        // Copy backup file to target location
399        std::fs::copy(path, &cfg.path)?;
400
401        // Open the restored database
402        Self::open(cfg)
403    }
404
405    fn schema_version(&self) -> Result<u32> {
406        // Use read connection for this read-only operation
407        let conn = self.read_conn.lock();
408        let version: i64 = conn
409            .query_row(
410                "SELECT schema_version FROM projection_meta WHERE id = 0",
411                [],
412                |row| row.get(0),
413            )
414            .map_err(|e| AzothError::Projection(e.to_string()))?;
415
416        Ok(version as u32)
417    }
418}