1use azoth_core::{
2 error::{AzothError, Result},
3 traits::ProjectionStore,
4 types::EventId,
5 ProjectionConfig,
6};
7use rusqlite::{Connection, OpenFlags};
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11use crate::schema;
12use crate::txn::SimpleProjectionTxn;
13
14pub struct SqliteProjectionStore {
16 conn: Arc<Mutex<Connection>>,
17 config: ProjectionConfig,
18}
19
20impl SqliteProjectionStore {
21 pub fn conn(&self) -> &Arc<Mutex<Connection>> {
26 &self.conn
27 }
28
29 fn init_schema(conn: &Connection) -> Result<()> {
31 conn.execute(
33 "CREATE TABLE IF NOT EXISTS projection_meta (
34 id INTEGER PRIMARY KEY CHECK (id = 0),
35 last_applied_event_id INTEGER NOT NULL DEFAULT -1,
36 schema_version INTEGER NOT NULL,
37 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
38 )",
39 [],
40 )
41 .map_err(|e| AzothError::Projection(e.to_string()))?;
42
43 conn.execute(
45 "INSERT OR IGNORE INTO projection_meta (id, last_applied_event_id, schema_version)
46 VALUES (0, -1, 1)",
47 [],
48 )
49 .map_err(|e| AzothError::Projection(e.to_string()))?;
50
51 Ok(())
52 }
53
54 fn configure_connection(conn: &Connection, cfg: &ProjectionConfig) -> Result<()> {
56 if cfg.wal_mode {
58 conn.pragma_update(None, "journal_mode", "WAL")
59 .map_err(|e| AzothError::Config(e.to_string()))?;
60 }
61
62 let sync_mode = match cfg.synchronous {
64 azoth_core::config::SynchronousMode::Full => "FULL",
65 azoth_core::config::SynchronousMode::Normal => "NORMAL",
66 azoth_core::config::SynchronousMode::Off => "OFF",
67 };
68 conn.pragma_update(None, "synchronous", sync_mode)
69 .map_err(|e| AzothError::Config(e.to_string()))?;
70
71 conn.pragma_update(None, "foreign_keys", "ON")
73 .map_err(|e| AzothError::Config(e.to_string()))?;
74
75 conn.pragma_update(None, "cache_size", cfg.cache_size)
77 .map_err(|e| AzothError::Config(e.to_string()))?;
78
79 Ok(())
80 }
81
82 pub async fn query_async<F, R>(&self, f: F) -> Result<R>
94 where
95 F: FnOnce(&Connection) -> Result<R> + Send + 'static,
96 R: Send + 'static,
97 {
98 let conn = self.conn.clone();
99 tokio::task::spawn_blocking(move || {
100 let conn_guard = conn.lock().unwrap();
101 f(&conn_guard)
102 })
103 .await
104 .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?
105 }
106
107 pub fn query<F, R>(&self, f: F) -> Result<R>
119 where
120 F: FnOnce(&Connection) -> Result<R>,
121 {
122 let conn_guard = self.conn.lock().unwrap();
123 f(&conn_guard)
124 }
125
126 pub async fn execute_async<F>(&self, f: F) -> Result<()>
139 where
140 F: FnOnce(&Connection) -> Result<()> + Send + 'static,
141 {
142 let conn = self.conn.clone();
143 tokio::task::spawn_blocking(move || {
144 let conn_guard = conn.lock().unwrap();
145 f(&conn_guard)
146 })
147 .await
148 .map_err(|e| AzothError::Projection(format!("Execute task failed: {}", e)))?
149 }
150
151 pub fn execute<F>(&self, f: F) -> Result<()>
161 where
162 F: FnOnce(&Connection) -> Result<()>,
163 {
164 let conn_guard = self.conn.lock().unwrap();
165 f(&conn_guard)
166 }
167
168 pub fn transaction<F>(&self, f: F) -> Result<()>
183 where
184 F: FnOnce(&rusqlite::Transaction) -> Result<()>,
185 {
186 let mut conn_guard = self.conn.lock().unwrap();
187 let tx = conn_guard
188 .transaction()
189 .map_err(|e| AzothError::Projection(e.to_string()))?;
190
191 f(&tx)?;
192
193 tx.commit()
194 .map_err(|e| AzothError::Projection(e.to_string()))?;
195 Ok(())
196 }
197
198 pub async fn transaction_async<F>(&self, f: F) -> Result<()>
200 where
201 F: FnOnce(&rusqlite::Transaction) -> Result<()> + Send + 'static,
202 {
203 let conn = self.conn.clone();
204 tokio::task::spawn_blocking(move || {
205 let mut conn_guard = conn.lock().unwrap();
206 let tx = conn_guard
207 .transaction()
208 .map_err(|e| AzothError::Projection(e.to_string()))?;
209
210 f(&tx)?;
211
212 tx.commit()
213 .map_err(|e| AzothError::Projection(e.to_string()))?;
214 Ok(())
215 })
216 .await
217 .map_err(|e| AzothError::Projection(format!("Transaction task failed: {}", e)))?
218 }
219}
220
221impl ProjectionStore for SqliteProjectionStore {
222 type Txn<'a> = SimpleProjectionTxn<'a>;
223
224 fn open(cfg: ProjectionConfig) -> Result<Self> {
225 if let Some(parent) = cfg.path.parent() {
227 std::fs::create_dir_all(parent)?;
228 }
229
230 let conn = Connection::open_with_flags(
232 &cfg.path,
233 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
234 )
235 .map_err(|e| AzothError::Projection(e.to_string()))?;
236
237 Self::configure_connection(&conn, &cfg)?;
239
240 Self::init_schema(&conn)?;
242
243 Ok(Self {
244 conn: Arc::new(Mutex::new(conn)),
245 config: cfg,
246 })
247 }
248
249 fn close(&self) -> Result<()> {
250 Ok(())
252 }
253
254 fn begin_txn(&self) -> Result<Self::Txn<'_>> {
255 let guard = self.conn.lock().unwrap();
257 SimpleProjectionTxn::new(guard)
258 }
259
260 fn get_cursor(&self) -> Result<EventId> {
261 let conn = self.conn.lock().unwrap();
262 let cursor: i64 = conn
263 .query_row(
264 "SELECT last_applied_event_id FROM projection_meta WHERE id = 0",
265 [],
266 |row| row.get(0),
267 )
268 .map_err(|e| AzothError::Projection(e.to_string()))?;
269
270 Ok(cursor as EventId)
271 }
272
273 fn migrate(&self, target_version: u32) -> Result<()> {
274 let conn = self.conn.lock().unwrap();
275 schema::migrate(&conn, target_version)
276 }
277
278 fn backup_to(&self, path: &Path) -> Result<()> {
279 {
281 let conn = self.conn.lock().unwrap();
282 let mut stmt = conn
284 .prepare("PRAGMA wal_checkpoint(RESTART)")
285 .map_err(|e| AzothError::Projection(e.to_string()))?;
286 let mut rows = stmt
287 .query([])
288 .map_err(|e| AzothError::Projection(e.to_string()))?;
289 while let Ok(Some(_)) = rows.next() {}
291 }
292
293 let src_path = &self.config.path;
295
296 std::fs::copy(src_path, path)?;
298
299 Ok(())
300 }
301
302 fn restore_from(path: &Path, cfg: ProjectionConfig) -> Result<Self> {
303 std::fs::copy(path, &cfg.path)?;
305
306 Self::open(cfg)
308 }
309
310 fn schema_version(&self) -> Result<u32> {
311 let conn = self.conn.lock().unwrap();
312 let version: i64 = conn
313 .query_row(
314 "SELECT schema_version FROM projection_meta WHERE id = 0",
315 [],
316 |row| row.get(0),
317 )
318 .map_err(|e| AzothError::Projection(e.to_string()))?;
319
320 Ok(version as u32)
321 }
322}