// azoth_sqlite/store.rs

1use azoth_core::{
2    error::{AzothError, Result},
3    traits::ProjectionStore,
4    types::EventId,
5    ProjectionConfig,
6};
7use rusqlite::{Connection, OpenFlags};
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11use crate::schema;
12use crate::txn::SimpleProjectionTxn;
13
/// SQLite-backed projection store.
///
/// Holds a single SQLite [`Connection`] behind an `Arc<Mutex<_>>` together
/// with the [`ProjectionConfig`] the store was created from.
pub struct SqliteProjectionStore {
    /// Mutex-guarded SQLite connection; every store operation locks it before
    /// touching the database.
    conn: Arc<Mutex<Connection>>,
    /// Configuration the store was opened with; `config.path` is the database
    /// file that backups are copied from.
    config: ProjectionConfig,
}
19
20impl SqliteProjectionStore {
21    /// Get the underlying connection (for migrations and custom queries)
22    ///
23    /// Returns an Arc to the Mutex-protected SQLite connection.
24    /// Users should lock the mutex to access the connection.
25    pub fn conn(&self) -> &Arc<Mutex<Connection>> {
26        &self.conn
27    }
28
29    /// Initialize schema if needed
30    fn init_schema(conn: &Connection) -> Result<()> {
31        // Create projection_meta table
32        conn.execute(
33            "CREATE TABLE IF NOT EXISTS projection_meta (
34                id INTEGER PRIMARY KEY CHECK (id = 0),
35                last_applied_event_id INTEGER NOT NULL DEFAULT -1,
36                schema_version INTEGER NOT NULL,
37                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
38            )",
39            [],
40        )
41        .map_err(|e| AzothError::Projection(e.to_string()))?;
42
43        // Insert default row if not exists (-1 means no events processed yet)
44        conn.execute(
45            "INSERT OR IGNORE INTO projection_meta (id, last_applied_event_id, schema_version)
46             VALUES (0, -1, 1)",
47            [],
48        )
49        .map_err(|e| AzothError::Projection(e.to_string()))?;
50
51        Ok(())
52    }
53
54    /// Configure SQLite connection
55    fn configure_connection(conn: &Connection, cfg: &ProjectionConfig) -> Result<()> {
56        // Enable WAL mode
57        if cfg.wal_mode {
58            conn.pragma_update(None, "journal_mode", "WAL")
59                .map_err(|e| AzothError::Config(e.to_string()))?;
60        }
61
62        // Set synchronous mode
63        let sync_mode = match cfg.synchronous {
64            azoth_core::config::SynchronousMode::Full => "FULL",
65            azoth_core::config::SynchronousMode::Normal => "NORMAL",
66            azoth_core::config::SynchronousMode::Off => "OFF",
67        };
68        conn.pragma_update(None, "synchronous", sync_mode)
69            .map_err(|e| AzothError::Config(e.to_string()))?;
70
71        // Enable foreign keys
72        conn.pragma_update(None, "foreign_keys", "ON")
73            .map_err(|e| AzothError::Config(e.to_string()))?;
74
75        // Set cache size
76        conn.pragma_update(None, "cache_size", cfg.cache_size)
77            .map_err(|e| AzothError::Config(e.to_string()))?;
78
79        Ok(())
80    }
81}
82
83impl ProjectionStore for SqliteProjectionStore {
84    type Txn<'a> = SimpleProjectionTxn<'a>;
85
86    fn open(cfg: ProjectionConfig) -> Result<Self> {
87        // Create parent directory if needed
88        if let Some(parent) = cfg.path.parent() {
89            std::fs::create_dir_all(parent)?;
90        }
91
92        // Open connection
93        let conn = Connection::open_with_flags(
94            &cfg.path,
95            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
96        )
97        .map_err(|e| AzothError::Projection(e.to_string()))?;
98
99        // Configure connection
100        Self::configure_connection(&conn, &cfg)?;
101
102        // Initialize schema
103        Self::init_schema(&conn)?;
104
105        Ok(Self {
106            conn: Arc::new(Mutex::new(conn)),
107            config: cfg,
108        })
109    }
110
111    fn close(&self) -> Result<()> {
112        // SQLite connection closes automatically on drop
113        Ok(())
114    }
115
116    fn begin_txn(&self) -> Result<Self::Txn<'_>> {
117        // Begin exclusive transaction using SimpleProjectionTxn which actually works
118        let guard = self.conn.lock().unwrap();
119        SimpleProjectionTxn::new(guard)
120    }
121
122    fn get_cursor(&self) -> Result<EventId> {
123        let conn = self.conn.lock().unwrap();
124        let cursor: i64 = conn
125            .query_row(
126                "SELECT last_applied_event_id FROM projection_meta WHERE id = 0",
127                [],
128                |row| row.get(0),
129            )
130            .map_err(|e| AzothError::Projection(e.to_string()))?;
131
132        Ok(cursor as EventId)
133    }
134
135    fn migrate(&self, target_version: u32) -> Result<()> {
136        let conn = self.conn.lock().unwrap();
137        schema::migrate(&conn, target_version)
138    }
139
140    fn backup_to(&self, path: &Path) -> Result<()> {
141        // Checkpoint WAL to flush all changes to the main database file
142        {
143            let conn = self.conn.lock().unwrap();
144            // Execute checkpoint with full iteration of results
145            let mut stmt = conn
146                .prepare("PRAGMA wal_checkpoint(RESTART)")
147                .map_err(|e| AzothError::Projection(e.to_string()))?;
148            let mut rows = stmt
149                .query([])
150                .map_err(|e| AzothError::Projection(e.to_string()))?;
151            // Consume all rows to ensure checkpoint completes
152            while let Ok(Some(_)) = rows.next() {}
153        }
154
155        // Get source path
156        let src_path = &self.config.path;
157
158        // Copy database file (now includes all changes from WAL)
159        std::fs::copy(src_path, path)?;
160
161        Ok(())
162    }
163
164    fn restore_from(path: &Path, cfg: ProjectionConfig) -> Result<Self> {
165        // Copy backup file to target location
166        std::fs::copy(path, &cfg.path)?;
167
168        // Open the restored database
169        Self::open(cfg)
170    }
171
172    fn schema_version(&self) -> Result<u32> {
173        let conn = self.conn.lock().unwrap();
174        let version: i64 = conn
175            .query_row(
176                "SELECT schema_version FROM projection_meta WHERE id = 0",
177                [],
178                |row| row.get(0),
179            )
180            .map_err(|e| AzothError::Projection(e.to_string()))?;
181
182        Ok(version as u32)
183    }
184}