1use azoth_core::{
2 error::{AzothError, Result},
3 event_log::{EventLog, EventLogIterator},
4 lock_manager::LockManager,
5 traits::{CanonicalStore, EventIter},
6 types::{BackupInfo, CanonicalMeta, EventId},
7 CanonicalConfig,
8};
9use azoth_file_log::{FileEventLog, FileEventLogConfig};
10use lmdb::{Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, WriteFlags};
11use std::path::Path;
12use std::sync::{
13 atomic::{AtomicBool, Ordering},
14 Arc, Mutex,
15};
16
17use crate::keys::meta_keys;
18use crate::txn::LmdbWriteTxn;
19
20struct EventIterAdapter(Box<dyn EventLogIterator>);
22
23impl EventIter for EventIterAdapter {
24 fn next(&mut self) -> Result<Option<(EventId, Vec<u8>)>> {
25 match self.0.next() {
27 Some(Ok(item)) => Ok(Some(item)),
28 Some(Err(e)) => Err(e),
29 None => Ok(None),
30 }
31 }
32}
33
34pub struct LmdbCanonicalStore {
39 pub(crate) env: Arc<Environment>,
40 pub(crate) state_db: Database,
41 pub(crate) events_db: Database, pub(crate) meta_db: Database,
43 pub(crate) config_path: std::path::PathBuf,
44 pub(crate) event_log: Arc<FileEventLog>, lock_manager: Arc<LockManager>,
46 write_lock: Arc<Mutex<()>>, paused: Arc<AtomicBool>,
48}
49
50impl LmdbCanonicalStore {
51 fn get_next_event_id<T: Transaction>(&self, txn: &T) -> Result<EventId> {
53 match txn.get(self.meta_db, &meta_keys::NEXT_EVENT_ID) {
54 Ok(bytes) => {
55 let id_str = std::str::from_utf8(bytes)
56 .map_err(|e| AzothError::Serialization(e.to_string()))?;
57 id_str
58 .parse::<EventId>()
59 .map_err(|e| AzothError::Serialization(e.to_string()))
60 }
61 Err(lmdb::Error::NotFound) => Ok(0),
62 Err(e) => Err(AzothError::Transaction(e.to_string())),
63 }
64 }
65
66 fn get_sealed_event_id<T: Transaction>(&self, txn: &T) -> Result<Option<EventId>> {
68 match txn.get(self.meta_db, &meta_keys::SEALED_EVENT_ID) {
69 Ok(bytes) => {
70 let id_str = std::str::from_utf8(bytes)
71 .map_err(|e| AzothError::Serialization(e.to_string()))?;
72 let id = id_str
73 .parse::<EventId>()
74 .map_err(|e| AzothError::Serialization(e.to_string()))?;
75 Ok(Some(id))
76 }
77 Err(lmdb::Error::NotFound) => Ok(None),
78 Err(e) => Err(AzothError::Transaction(e.to_string())),
79 }
80 }
81
82 fn set_meta(&self, txn: &mut lmdb::RwTransaction, key: &str, value: &str) -> Result<()> {
84 txn.put(self.meta_db, &key, &value, WriteFlags::empty())
85 .map_err(|e| AzothError::Transaction(e.to_string()))
86 }
87
88 fn get_meta<T: Transaction>(&self, txn: &T, key: &str) -> Result<Option<String>> {
90 match txn.get(self.meta_db, &key) {
91 Ok(bytes) => {
92 let value = std::str::from_utf8(bytes)
93 .map_err(|e| AzothError::Serialization(e.to_string()))?
94 .to_string();
95 Ok(Some(value))
96 }
97 Err(lmdb::Error::NotFound) => Ok(None),
98 Err(e) => Err(AzothError::Transaction(e.to_string())),
99 }
100 }
101}
102
103impl CanonicalStore for LmdbCanonicalStore {
104 type Txn<'a> = LmdbWriteTxn<'a>;
105
106 fn open(cfg: CanonicalConfig) -> Result<Self> {
107 std::fs::create_dir_all(&cfg.path)?;
109
110 let mut env_builder = Environment::new();
112 env_builder.set_max_dbs(3); env_builder.set_map_size(cfg.map_size);
114 env_builder.set_max_readers(cfg.max_readers);
115
116 let mut flags = EnvironmentFlags::empty();
118 match cfg.sync_mode {
119 azoth_core::config::SyncMode::Full => {}
120 azoth_core::config::SyncMode::NoMetaSync => {
121 flags.insert(EnvironmentFlags::NO_META_SYNC);
122 }
123 azoth_core::config::SyncMode::NoSync => {
124 flags.insert(EnvironmentFlags::NO_SYNC);
125 }
126 }
127 env_builder.set_flags(flags);
128
129 let env = env_builder
130 .open(&cfg.path)
131 .map_err(|e| AzothError::Io(std::io::Error::other(e)))?;
132
133 let state_db = env
135 .create_db(Some("state"), DatabaseFlags::empty())
136 .map_err(|e| AzothError::Transaction(e.to_string()))?;
137
138 let events_db = env
139 .create_db(Some("events"), DatabaseFlags::empty())
140 .map_err(|e| AzothError::Transaction(e.to_string()))?;
141
142 let meta_db = env
143 .create_db(Some("meta"), DatabaseFlags::empty())
144 .map_err(|e| AzothError::Transaction(e.to_string()))?;
145
146 let env = Arc::new(env);
147 let lock_manager = Arc::new(LockManager::new(cfg.stripe_count));
148
149 let event_log_config = FileEventLogConfig {
151 base_dir: cfg.path.join("event-log"),
152 ..Default::default()
153 };
154 let event_log = Arc::new(FileEventLog::open(event_log_config)?);
155
156 {
158 let mut txn = env
159 .begin_rw_txn()
160 .map_err(|e| AzothError::Transaction(e.to_string()))?;
161
162 if txn.get(meta_db, &meta_keys::NEXT_EVENT_ID).is_err() {
164 txn.put(
165 meta_db,
166 &meta_keys::NEXT_EVENT_ID,
167 &"0",
168 WriteFlags::empty(),
169 )
170 .map_err(|e| AzothError::Transaction(e.to_string()))?;
171 }
172
173 if txn.get(meta_db, &meta_keys::SCHEMA_VERSION).is_err() {
175 txn.put(
176 meta_db,
177 &meta_keys::SCHEMA_VERSION,
178 &"1",
179 WriteFlags::empty(),
180 )
181 .map_err(|e| AzothError::Transaction(e.to_string()))?;
182 }
183
184 let now = chrono::Utc::now().to_rfc3339();
186 if txn.get(meta_db, &meta_keys::CREATED_AT).is_err() {
187 txn.put(meta_db, &meta_keys::CREATED_AT, &now, WriteFlags::empty())
188 .map_err(|e| AzothError::Transaction(e.to_string()))?;
189 }
190 txn.put(meta_db, &meta_keys::UPDATED_AT, &now, WriteFlags::empty())
191 .map_err(|e| AzothError::Transaction(e.to_string()))?;
192
193 txn.commit()
194 .map_err(|e| AzothError::Transaction(e.to_string()))?;
195 }
196
197 Ok(Self {
198 env,
199 state_db,
200 events_db,
201 meta_db,
202 config_path: cfg.path.clone(),
203 event_log,
204 lock_manager,
205 write_lock: Arc::new(Mutex::new(())),
206 paused: Arc::new(AtomicBool::new(false)),
207 })
208 }
209
210 fn close(&self) -> Result<()> {
211 Ok(())
213 }
214
215 fn read_txn(&self) -> Result<Self::Txn<'_>> {
216 Err(AzothError::InvalidState(
219 "Read-only transactions not yet implemented".into(),
220 ))
221 }
222
223 fn write_txn(&self) -> Result<Self::Txn<'_>> {
224 let _guard = self.write_lock.lock().unwrap();
227
228 if self.paused.load(Ordering::SeqCst) {
230 return Err(AzothError::Paused);
231 }
232
233 let ro_txn = self
235 .env
236 .begin_ro_txn()
237 .map_err(|e| AzothError::Transaction(e.to_string()))?;
238 if let Some(sealed_id) = self.get_sealed_event_id(&ro_txn)? {
239 return Err(AzothError::Sealed(sealed_id));
240 }
241 drop(ro_txn); let txn = self
244 .env
245 .begin_rw_txn()
246 .map_err(|e| AzothError::Transaction(e.to_string()))?;
247
248 drop(_guard);
251
252 Ok(LmdbWriteTxn::new(
253 txn,
254 self.state_db,
255 self.events_db,
256 self.meta_db,
257 self.event_log.clone(),
258 ))
259 }
260
261 fn iter_events(&self, from: EventId, to: Option<EventId>) -> Result<Box<dyn EventIter>> {
262 let iter = self.event_log.iter_range(from, to)?;
265 Ok(Box::new(EventIterAdapter(iter)))
266 }
267
268 fn seal(&self) -> Result<EventId> {
269 let _guard = self.write_lock.lock().unwrap();
270
271 let mut txn = self
272 .env
273 .begin_rw_txn()
274 .map_err(|e| AzothError::Transaction(e.to_string()))?;
275
276 let next_event_id = self.get_next_event_id(&txn)?;
277 let sealed_event_id = if next_event_id > 0 {
278 next_event_id - 1
279 } else {
280 0
281 };
282
283 self.set_meta(
284 &mut txn,
285 meta_keys::SEALED_EVENT_ID,
286 &sealed_event_id.to_string(),
287 )?;
288 self.set_meta(
289 &mut txn,
290 meta_keys::UPDATED_AT,
291 &chrono::Utc::now().to_rfc3339(),
292 )?;
293
294 txn.commit()
295 .map_err(|e| AzothError::Transaction(e.to_string()))?;
296
297 Ok(sealed_event_id)
298 }
299
300 fn lock_manager(&self) -> &LockManager {
301 &self.lock_manager
302 }
303
304 fn pause_ingestion(&self) -> Result<()> {
305 self.paused.store(true, Ordering::SeqCst);
306 let _guard = self.write_lock.lock().unwrap();
308 Ok(())
309 }
310
311 fn resume_ingestion(&self) -> Result<()> {
312 self.paused.store(false, Ordering::SeqCst);
313 Ok(())
314 }
315
316 fn is_paused(&self) -> bool {
317 self.paused.load(Ordering::SeqCst)
318 }
319
320 fn backup_to(&self, dir: &Path) -> Result<BackupInfo> {
321 crate::backup::backup_to(self, dir)
322 }
323
324 fn restore_from(dir: &Path, cfg: CanonicalConfig) -> Result<Self> {
325 crate::backup::restore_from(dir, cfg)
326 }
327
328 fn meta(&self) -> Result<CanonicalMeta> {
329 let txn = self
330 .env
331 .begin_ro_txn()
332 .map_err(|e| AzothError::Transaction(e.to_string()))?;
333
334 let next_event_id = self.get_next_event_id(&txn)?;
335 let sealed_event_id = self.get_sealed_event_id(&txn)?;
336 let schema_version = self
337 .get_meta(&txn, meta_keys::SCHEMA_VERSION)?
338 .and_then(|s| s.parse().ok())
339 .unwrap_or(1);
340 let created_at = self
341 .get_meta(&txn, meta_keys::CREATED_AT)?
342 .unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
343 let updated_at = self
344 .get_meta(&txn, meta_keys::UPDATED_AT)?
345 .unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
346
347 Ok(CanonicalMeta {
348 next_event_id,
349 sealed_event_id,
350 schema_version,
351 created_at,
352 updated_at,
353 })
354 }
355}
356
357impl LmdbCanonicalStore {
358 pub fn event_log(&self) -> &Arc<FileEventLog> {
362 &self.event_log
363 }
364}