1use azoth_core::{
2 error::{AzothError, Result},
3 traits::ProjectionStore,
4 types::EventId,
5 ProjectionConfig,
6};
7use rusqlite::{Connection, OpenFlags};
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10
11use crate::schema;
12use crate::txn::SimpleProjectionTxn;
13
/// Projection store backed by a single SQLite database file.
///
/// The store owns one `rusqlite` connection, shared behind an `Arc<Mutex<_>>`,
/// and keeps the configuration it was opened with.
pub struct SqliteProjectionStore {
    /// The one SQLite connection used for all store operations; every access
    /// goes through the mutex.
    conn: Arc<Mutex<Connection>>,
    /// Configuration supplied to `open`; `config.path` is read again when the
    /// database file is copied for a backup.
    config: ProjectionConfig,
}
19
20impl SqliteProjectionStore {
21 pub fn conn(&self) -> &Arc<Mutex<Connection>> {
26 &self.conn
27 }
28
29 fn init_schema(conn: &Connection) -> Result<()> {
31 conn.execute(
33 "CREATE TABLE IF NOT EXISTS projection_meta (
34 id INTEGER PRIMARY KEY CHECK (id = 0),
35 last_applied_event_id INTEGER NOT NULL DEFAULT -1,
36 schema_version INTEGER NOT NULL,
37 updated_at TEXT NOT NULL DEFAULT (datetime('now'))
38 )",
39 [],
40 )
41 .map_err(|e| AzothError::Projection(e.to_string()))?;
42
43 conn.execute(
45 "INSERT OR IGNORE INTO projection_meta (id, last_applied_event_id, schema_version)
46 VALUES (0, -1, 1)",
47 [],
48 )
49 .map_err(|e| AzothError::Projection(e.to_string()))?;
50
51 Ok(())
52 }
53
54 fn configure_connection(conn: &Connection, cfg: &ProjectionConfig) -> Result<()> {
56 if cfg.wal_mode {
58 conn.pragma_update(None, "journal_mode", "WAL")
59 .map_err(|e| AzothError::Config(e.to_string()))?;
60 }
61
62 let sync_mode = match cfg.synchronous {
64 azoth_core::config::SynchronousMode::Full => "FULL",
65 azoth_core::config::SynchronousMode::Normal => "NORMAL",
66 azoth_core::config::SynchronousMode::Off => "OFF",
67 };
68 conn.pragma_update(None, "synchronous", sync_mode)
69 .map_err(|e| AzothError::Config(e.to_string()))?;
70
71 conn.pragma_update(None, "foreign_keys", "ON")
73 .map_err(|e| AzothError::Config(e.to_string()))?;
74
75 conn.pragma_update(None, "cache_size", cfg.cache_size)
77 .map_err(|e| AzothError::Config(e.to_string()))?;
78
79 Ok(())
80 }
81}
82
83impl ProjectionStore for SqliteProjectionStore {
84 type Txn<'a> = SimpleProjectionTxn<'a>;
85
86 fn open(cfg: ProjectionConfig) -> Result<Self> {
87 if let Some(parent) = cfg.path.parent() {
89 std::fs::create_dir_all(parent)?;
90 }
91
92 let conn = Connection::open_with_flags(
94 &cfg.path,
95 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE,
96 )
97 .map_err(|e| AzothError::Projection(e.to_string()))?;
98
99 Self::configure_connection(&conn, &cfg)?;
101
102 Self::init_schema(&conn)?;
104
105 Ok(Self {
106 conn: Arc::new(Mutex::new(conn)),
107 config: cfg,
108 })
109 }
110
111 fn close(&self) -> Result<()> {
112 Ok(())
114 }
115
116 fn begin_txn(&self) -> Result<Self::Txn<'_>> {
117 let guard = self.conn.lock().unwrap();
119 SimpleProjectionTxn::new(guard)
120 }
121
122 fn get_cursor(&self) -> Result<EventId> {
123 let conn = self.conn.lock().unwrap();
124 let cursor: i64 = conn
125 .query_row(
126 "SELECT last_applied_event_id FROM projection_meta WHERE id = 0",
127 [],
128 |row| row.get(0),
129 )
130 .map_err(|e| AzothError::Projection(e.to_string()))?;
131
132 Ok(cursor as EventId)
133 }
134
135 fn migrate(&self, target_version: u32) -> Result<()> {
136 let conn = self.conn.lock().unwrap();
137 schema::migrate(&conn, target_version)
138 }
139
140 fn backup_to(&self, path: &Path) -> Result<()> {
141 {
143 let conn = self.conn.lock().unwrap();
144 let mut stmt = conn
146 .prepare("PRAGMA wal_checkpoint(RESTART)")
147 .map_err(|e| AzothError::Projection(e.to_string()))?;
148 let mut rows = stmt
149 .query([])
150 .map_err(|e| AzothError::Projection(e.to_string()))?;
151 while let Ok(Some(_)) = rows.next() {}
153 }
154
155 let src_path = &self.config.path;
157
158 std::fs::copy(src_path, path)?;
160
161 Ok(())
162 }
163
164 fn restore_from(path: &Path, cfg: ProjectionConfig) -> Result<Self> {
165 std::fs::copy(path, &cfg.path)?;
167
168 Self::open(cfg)
170 }
171
172 fn schema_version(&self) -> Result<u32> {
173 let conn = self.conn.lock().unwrap();
174 let version: i64 = conn
175 .query_row(
176 "SELECT schema_version FROM projection_meta WHERE id = 0",
177 [],
178 |row| row.get(0),
179 )
180 .map_err(|e| AzothError::Projection(e.to_string()))?;
181
182 Ok(version as u32)
183 }
184}