Skip to main content

azoth_sqlite/
store.rs

1use azoth_core::{
2    error::{AzothError, Result},
3    traits::ProjectionStore,
4    types::EventId,
5    ProjectionConfig,
6};
7use rusqlite::{Connection, OpenFlags};
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11use crate::schema;
12use crate::txn::SimpleProjectionTxn;
13
/// SQLite-backed projection store.
///
/// Wraps a single `rusqlite::Connection` behind an `Arc<Mutex<..>>` so the
/// connection can be handed to blocking tasks while all access to it stays
/// serialized.
pub struct SqliteProjectionStore {
    /// The one SQLite connection; the store's methods lock this mutex before
    /// touching the connection.
    conn: Arc<Mutex<Connection>>,
    /// Configuration the store was opened with; `backup_to` reads the
    /// database path from here.
    config: ProjectionConfig,
}
19
20impl SqliteProjectionStore {
    /// Get the underlying connection (for migrations and custom queries).
    ///
    /// Returns a reference to the shared `Arc<Mutex<Connection>>`; clone the
    /// `Arc` if an owned handle is needed. Callers must lock the mutex to
    /// reach the connection, and while they hold the guard every other method
    /// on this store that locks the connection will block.
    pub fn conn(&self) -> &Arc<Mutex<Connection>> {
        &self.conn
    }
28
29    /// Initialize schema if needed
30    fn init_schema(conn: &Connection) -> Result<()> {
31        // Create projection_meta table
32        conn.execute(
33            "CREATE TABLE IF NOT EXISTS projection_meta (
34                id INTEGER PRIMARY KEY CHECK (id = 0),
35                last_applied_event_id INTEGER NOT NULL DEFAULT -1,
36                schema_version INTEGER NOT NULL,
37                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
38            )",
39            [],
40        )
41        .map_err(|e| AzothError::Projection(e.to_string()))?;
42
43        // Insert default row if not exists (-1 means no events processed yet)
44        conn.execute(
45            "INSERT OR IGNORE INTO projection_meta (id, last_applied_event_id, schema_version)
46             VALUES (0, -1, 1)",
47            [],
48        )
49        .map_err(|e| AzothError::Projection(e.to_string()))?;
50
51        Ok(())
52    }
53
54    /// Configure SQLite connection
55    fn configure_connection(conn: &Connection, cfg: &ProjectionConfig) -> Result<()> {
56        // Enable WAL mode
57        if cfg.wal_mode {
58            conn.pragma_update(None, "journal_mode", "WAL")
59                .map_err(|e| AzothError::Config(e.to_string()))?;
60        }
61
62        // Set synchronous mode
63        let sync_mode = match cfg.synchronous {
64            azoth_core::config::SynchronousMode::Full => "FULL",
65            azoth_core::config::SynchronousMode::Normal => "NORMAL",
66            azoth_core::config::SynchronousMode::Off => "OFF",
67        };
68        conn.pragma_update(None, "synchronous", sync_mode)
69            .map_err(|e| AzothError::Config(e.to_string()))?;
70
71        // Enable foreign keys
72        conn.pragma_update(None, "foreign_keys", "ON")
73            .map_err(|e| AzothError::Config(e.to_string()))?;
74
75        // Set cache size
76        conn.pragma_update(None, "cache_size", cfg.cache_size)
77            .map_err(|e| AzothError::Config(e.to_string()))?;
78
79        Ok(())
80    }
81
82    /// Execute a read-only SQL query asynchronously
83    ///
84    /// This method runs the query on a separate thread to avoid blocking,
85    /// making it safe to call from async contexts.
86    ///
87    /// # Example
88    /// ```ignore
89    /// let balance: i64 = store.query_async(|conn| {
90    ///     conn.query_row("SELECT balance FROM accounts WHERE id = ?1", [account_id], |row| row.get(0))
91    /// }).await?;
92    /// ```
93    pub async fn query_async<F, R>(&self, f: F) -> Result<R>
94    where
95        F: FnOnce(&Connection) -> Result<R> + Send + 'static,
96        R: Send + 'static,
97    {
98        let conn = self.conn.clone();
99        tokio::task::spawn_blocking(move || {
100            let conn_guard = conn.lock().unwrap();
101            f(&conn_guard)
102        })
103        .await
104        .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?
105    }
106
107    /// Execute a read-only SQL query synchronously
108    ///
109    /// This is a convenience method for non-async contexts.
110    /// For async contexts, prefer `query_async`.
111    ///
112    /// # Example
113    /// ```ignore
114    /// let balance: i64 = store.query(|conn| {
115    ///     conn.query_row("SELECT balance FROM accounts WHERE id = ?1", [account_id], |row| row.get(0))
116    /// })?;
117    /// ```
118    pub fn query<F, R>(&self, f: F) -> Result<R>
119    where
120        F: FnOnce(&Connection) -> Result<R>,
121    {
122        let conn_guard = self.conn.lock().unwrap();
123        f(&conn_guard)
124    }
125
126    /// Execute arbitrary SQL statements (DDL/DML) asynchronously
127    ///
128    /// Useful for creating tables, indexes, or performing bulk updates.
129    ///
130    /// # Example
131    /// ```ignore
132    /// store.execute_async(|conn| {
133    ///     conn.execute("CREATE TABLE IF NOT EXISTS balances (id INTEGER PRIMARY KEY, amount INTEGER)", [])?;
134    ///     conn.execute("CREATE INDEX IF NOT EXISTS idx_amount ON balances(amount)", [])?;
135    ///     Ok(())
136    /// }).await?;
137    /// ```
138    pub async fn execute_async<F>(&self, f: F) -> Result<()>
139    where
140        F: FnOnce(&Connection) -> Result<()> + Send + 'static,
141    {
142        let conn = self.conn.clone();
143        tokio::task::spawn_blocking(move || {
144            let conn_guard = conn.lock().unwrap();
145            f(&conn_guard)
146        })
147        .await
148        .map_err(|e| AzothError::Projection(format!("Execute task failed: {}", e)))?
149    }
150
151    /// Execute arbitrary SQL statements (DDL/DML) synchronously
152    ///
153    /// # Example
154    /// ```ignore
155    /// store.execute(|conn| {
156    ///     conn.execute("CREATE TABLE IF NOT EXISTS balances (id INTEGER PRIMARY KEY, amount INTEGER)", [])?;
157    ///     Ok(())
158    /// })?;
159    /// ```
160    pub fn execute<F>(&self, f: F) -> Result<()>
161    where
162        F: FnOnce(&Connection) -> Result<()>,
163    {
164        let conn_guard = self.conn.lock().unwrap();
165        f(&conn_guard)
166    }
167
168    /// Execute a transaction with multiple SQL statements
169    ///
170    /// The closure receives a transaction object and can execute multiple
171    /// statements atomically. If the closure returns an error, the transaction
172    /// is rolled back.
173    ///
174    /// # Example
175    /// ```ignore
176    /// store.transaction(|tx| {
177    ///     tx.execute("INSERT INTO accounts (id, balance) VALUES (?1, ?2)", params![1, 100])?;
178    ///     tx.execute("INSERT INTO accounts (id, balance) VALUES (?1, ?2)", params![2, 200])?;
179    ///     Ok(())
180    /// })?;
181    /// ```
182    pub fn transaction<F>(&self, f: F) -> Result<()>
183    where
184        F: FnOnce(&rusqlite::Transaction) -> Result<()>,
185    {
186        let mut conn_guard = self.conn.lock().unwrap();
187        let tx = conn_guard
188            .transaction()
189            .map_err(|e| AzothError::Projection(e.to_string()))?;
190
191        f(&tx)?;
192
193        tx.commit()
194            .map_err(|e| AzothError::Projection(e.to_string()))?;
195        Ok(())
196    }
197
198    /// Execute a transaction asynchronously
199    pub async fn transaction_async<F>(&self, f: F) -> Result<()>
200    where
201        F: FnOnce(&rusqlite::Transaction) -> Result<()> + Send + 'static,
202    {
203        let conn = self.conn.clone();
204        tokio::task::spawn_blocking(move || {
205            let mut conn_guard = conn.lock().unwrap();
206            let tx = conn_guard
207                .transaction()
208                .map_err(|e| AzothError::Projection(e.to_string()))?;
209
210            f(&tx)?;
211
212            tx.commit()
213                .map_err(|e| AzothError::Projection(e.to_string()))?;
214            Ok(())
215        })
216        .await
217        .map_err(|e| AzothError::Projection(format!("Transaction task failed: {}", e)))?
218    }
219}
220
221impl ProjectionStore for SqliteProjectionStore {
222    type Txn<'a> = SimpleProjectionTxn<'a>;
223
224    fn open(cfg: ProjectionConfig) -> Result<Self> {
225        // Create parent directory if needed
226        if let Some(parent) = cfg.path.parent() {
227            std::fs::create_dir_all(parent)?;
228        }
229
230        // Open connection
231        let conn = Connection::open_with_flags(
232            &cfg.path,
233            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
234        )
235        .map_err(|e| AzothError::Projection(e.to_string()))?;
236
237        // Configure connection
238        Self::configure_connection(&conn, &cfg)?;
239
240        // Initialize schema
241        Self::init_schema(&conn)?;
242
243        Ok(Self {
244            conn: Arc::new(Mutex::new(conn)),
245            config: cfg,
246        })
247    }
248
    /// Close the store.
    ///
    /// This is a no-op that always returns `Ok(())`: nothing is released
    /// here, and the connection is left to close when it is dropped.
    fn close(&self) -> Result<()> {
        // SQLite connection closes automatically on drop
        Ok(())
    }
253
254    fn begin_txn(&self) -> Result<Self::Txn<'_>> {
255        // Begin exclusive transaction using SimpleProjectionTxn which actually works
256        let guard = self.conn.lock().unwrap();
257        SimpleProjectionTxn::new(guard)
258    }
259
260    fn get_cursor(&self) -> Result<EventId> {
261        let conn = self.conn.lock().unwrap();
262        let cursor: i64 = conn
263            .query_row(
264                "SELECT last_applied_event_id FROM projection_meta WHERE id = 0",
265                [],
266                |row| row.get(0),
267            )
268            .map_err(|e| AzothError::Projection(e.to_string()))?;
269
270        Ok(cursor as EventId)
271    }
272
273    fn migrate(&self, target_version: u32) -> Result<()> {
274        let conn = self.conn.lock().unwrap();
275        schema::migrate(&conn, target_version)
276    }
277
278    fn backup_to(&self, path: &Path) -> Result<()> {
279        // Checkpoint WAL to flush all changes to the main database file
280        {
281            let conn = self.conn.lock().unwrap();
282            // Execute checkpoint with full iteration of results
283            let mut stmt = conn
284                .prepare("PRAGMA wal_checkpoint(RESTART)")
285                .map_err(|e| AzothError::Projection(e.to_string()))?;
286            let mut rows = stmt
287                .query([])
288                .map_err(|e| AzothError::Projection(e.to_string()))?;
289            // Consume all rows to ensure checkpoint completes
290            while let Ok(Some(_)) = rows.next() {}
291        }
292
293        // Get source path
294        let src_path = &self.config.path;
295
296        // Copy database file (now includes all changes from WAL)
297        std::fs::copy(src_path, path)?;
298
299        Ok(())
300    }
301
302    fn restore_from(path: &Path, cfg: ProjectionConfig) -> Result<Self> {
303        // Copy backup file to target location
304        std::fs::copy(path, &cfg.path)?;
305
306        // Open the restored database
307        Self::open(cfg)
308    }
309
310    fn schema_version(&self) -> Result<u32> {
311        let conn = self.conn.lock().unwrap();
312        let version: i64 = conn
313            .query_row(
314                "SELECT schema_version FROM projection_meta WHERE id = 0",
315                [],
316                |row| row.get(0),
317            )
318            .map_err(|e| AzothError::Projection(e.to_string()))?;
319
320        Ok(version as u32)
321    }
322}