1use azoth_core::{
2 error::{AzothError, Result},
3 event_log::{EventLog, EventLogIterator},
4 lock_manager::LockManager,
5 traits::{self, CanonicalStore, CanonicalTxn, EventIter, StateIter},
6 types::{BackupInfo, CanonicalMeta, CommitInfo, EventId},
7 CanonicalConfig,
8};
9use azoth_file_log::{FileEventLog, FileEventLogConfig};
10use lmdb::{Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, WriteFlags};
11use parking_lot::Mutex;
12use std::path::Path;
13use std::sync::{
14 atomic::{AtomicBool, AtomicUsize, Ordering},
15 Arc,
16};
17use std::time::Duration;
18use tokio::sync::Notify;
19
20use crate::keys::meta_keys;
21use crate::preflight_cache::PreflightCache;
22use crate::read_pool::LmdbReadPool;
23use crate::txn::LmdbWriteTxn;
24
25struct EventIterAdapter(Box<dyn EventLogIterator>);
27
28impl EventIter for EventIterAdapter {
29 fn next(&mut self) -> Result<Option<(EventId, Vec<u8>)>> {
30 match self.0.next() {
32 Some(Ok(item)) => Ok(Some(item)),
33 Some(Err(e)) => Err(e),
34 None => Ok(None),
35 }
36 }
37}
38
/// LMDB-backed canonical store: key/value state and store metadata live in
/// two LMDB databases, while events are appended to a file-based event log
/// under the store directory.
pub struct LmdbCanonicalStore {
    // Shared LMDB environment; cloned into the read pool and blocking tasks.
    pub(crate) env: Arc<Environment>,
    // The "state" database (application key/value state).
    pub(crate) state_db: Database,
    // The "meta" database (event counters, seal marker, timestamps).
    pub(crate) meta_db: Database,
    // Directory the store was opened from (`CanonicalConfig::path`).
    pub(crate) config_path: std::path::PathBuf,
    // Append-only event log stored at `<path>/event-log`.
    pub(crate) event_log: Arc<FileEventLog>,
    // Entries committed to state but not the event log land here (`<path>/dlq`).
    dead_letter_queue: Arc<crate::dead_letter_queue::DeadLetterQueue>,
    // Striped lock manager built from cfg.stripe_count / cfg.lock_timeout_ms.
    lock_manager: Arc<LockManager>,
    // Serializes seal() and clear_seal() against each other.
    write_lock: Arc<Mutex<()>>,
    // When true, new write transactions are rejected with AzothError::Paused.
    paused: Arc<AtomicBool>,
    // Chunk size used by the state iterators (range / scan_prefix).
    chunk_size: usize,
    // Count of in-flight write transactions; pause_ingestion() waits on it.
    txn_counter: Arc<AtomicUsize>,
    // Cache handed to write transactions; semantics live in PreflightCache.
    preflight_cache: Arc<PreflightCache>,
    // Optional pool of read transactions (present when cfg.read_pool.enabled).
    read_pool: Option<Arc<LmdbReadPool>>,
    // Notified by the event log whenever new events are appended.
    event_notify: Arc<Notify>,
}
64
65impl LmdbCanonicalStore {
66 fn get_next_event_id<T: Transaction>(&self, txn: &T) -> Result<EventId> {
68 match txn.get(self.meta_db, &meta_keys::NEXT_EVENT_ID) {
69 Ok(bytes) => {
70 let id_str = std::str::from_utf8(bytes)
71 .map_err(|e| AzothError::Serialization(e.to_string()))?;
72 id_str
73 .parse::<EventId>()
74 .map_err(|e| AzothError::Serialization(e.to_string()))
75 }
76 Err(lmdb::Error::NotFound) => Ok(0),
77 Err(e) => Err(AzothError::Transaction(e.to_string())),
78 }
79 }
80
81 fn get_sealed_event_id<T: Transaction>(&self, txn: &T) -> Result<Option<EventId>> {
83 match txn.get(self.meta_db, &meta_keys::SEALED_EVENT_ID) {
84 Ok(bytes) => {
85 let id_str = std::str::from_utf8(bytes)
86 .map_err(|e| AzothError::Serialization(e.to_string()))?;
87 let id = id_str
88 .parse::<EventId>()
89 .map_err(|e| AzothError::Serialization(e.to_string()))?;
90 Ok(Some(id))
91 }
92 Err(lmdb::Error::NotFound) => Ok(None),
93 Err(e) => Err(AzothError::Transaction(e.to_string())),
94 }
95 }
96
97 fn set_meta(&self, txn: &mut lmdb::RwTransaction, key: &str, value: &str) -> Result<()> {
99 txn.put(self.meta_db, &key, &value, WriteFlags::empty())
100 .map_err(|e| AzothError::Transaction(e.to_string()))
101 }
102
103 fn get_meta<T: Transaction>(&self, txn: &T, key: &str) -> Result<Option<String>> {
105 match txn.get(self.meta_db, &key) {
106 Ok(bytes) => {
107 let value = std::str::from_utf8(bytes)
108 .map_err(|e| AzothError::Serialization(e.to_string()))?
109 .to_string();
110 Ok(Some(value))
111 }
112 Err(lmdb::Error::NotFound) => Ok(None),
113 Err(e) => Err(AzothError::Transaction(e.to_string())),
114 }
115 }
116}
117
118impl CanonicalStore for LmdbCanonicalStore {
119 type Txn<'a> = LmdbWriteTxn<'a>;
120 type ReadTxn<'a> = crate::txn::LmdbReadTxn<'a>;
121
122 fn open(cfg: CanonicalConfig) -> Result<Self> {
123 std::fs::create_dir_all(&cfg.path)?;
125
126 let mut env_builder = Environment::new();
128 env_builder.set_max_dbs(2); env_builder.set_map_size(cfg.map_size);
130 env_builder.set_max_readers(cfg.max_readers);
131
132 let mut flags = EnvironmentFlags::empty();
134 match cfg.sync_mode {
135 azoth_core::config::SyncMode::Full => {}
136 azoth_core::config::SyncMode::NoMetaSync => {
137 flags.insert(EnvironmentFlags::NO_META_SYNC);
138 }
139 azoth_core::config::SyncMode::NoSync => {
140 flags.insert(EnvironmentFlags::NO_SYNC);
141 }
142 }
143 env_builder.set_flags(flags);
144
145 let env = env_builder
146 .open(&cfg.path)
147 .map_err(|e| AzothError::Io(std::io::Error::other(e)))?;
148
149 let state_db = env
151 .create_db(Some("state"), DatabaseFlags::empty())
152 .map_err(|e| AzothError::Transaction(e.to_string()))?;
153
154 let meta_db = env
155 .create_db(Some("meta"), DatabaseFlags::empty())
156 .map_err(|e| AzothError::Transaction(e.to_string()))?;
157
158 let env = Arc::new(env);
159 let lock_manager = Arc::new(LockManager::new(
160 cfg.stripe_count,
161 Duration::from_millis(cfg.lock_timeout_ms),
162 ));
163
164 let preflight_cache = Arc::new(PreflightCache::new(
166 cfg.preflight_cache_size,
167 cfg.preflight_cache_ttl_secs,
168 cfg.preflight_cache_enabled,
169 ));
170
171 let event_notify = Arc::new(Notify::new());
173 let event_log_config = FileEventLogConfig {
174 base_dir: cfg.path.join("event-log"),
175 max_event_size: cfg.event_max_size_bytes,
176 max_batch_bytes: cfg.event_batch_max_bytes,
177 ..Default::default()
178 };
179 let mut event_log = FileEventLog::open(event_log_config)?;
180 event_log.set_event_notify(event_notify.clone());
181 let event_log = Arc::new(event_log);
182
183 let dlq_dir = cfg.path.join("dlq");
185 let dead_letter_queue = crate::dead_letter_queue::DeadLetterQueue::open(&dlq_dir)?;
186
187 match dead_letter_queue.entry_count() {
189 Ok(count) if count > 0 => {
190 tracing::warn!(
191 dlq_entries = count,
192 dlq_path = %dlq_dir.display(),
193 "Dead letter queue has {} unprocessed entries from previous runs. \
194 These events were committed to state but not to the event log. \
195 Run the DLQ recovery tool to replay them.",
196 count
197 );
198 }
199 Ok(_) => {}
200 Err(e) => {
201 tracing::warn!("Failed to check DLQ entry count: {}", e);
202 }
203 }
204
205 {
207 let mut txn = env
208 .begin_rw_txn()
209 .map_err(|e| AzothError::Transaction(e.to_string()))?;
210
211 if txn.get(meta_db, &meta_keys::NEXT_EVENT_ID).is_err() {
213 txn.put(
214 meta_db,
215 &meta_keys::NEXT_EVENT_ID,
216 &"0",
217 WriteFlags::empty(),
218 )
219 .map_err(|e| AzothError::Transaction(e.to_string()))?;
220 }
221
222 if txn.get(meta_db, &meta_keys::SCHEMA_VERSION).is_err() {
225 txn.put(
226 meta_db,
227 &meta_keys::SCHEMA_VERSION,
228 &"0",
229 WriteFlags::empty(),
230 )
231 .map_err(|e| AzothError::Transaction(e.to_string()))?;
232 }
233
234 let now = chrono::Utc::now().to_rfc3339();
236 if txn.get(meta_db, &meta_keys::CREATED_AT).is_err() {
237 txn.put(meta_db, &meta_keys::CREATED_AT, &now, WriteFlags::empty())
238 .map_err(|e| AzothError::Transaction(e.to_string()))?;
239 }
240 txn.put(meta_db, &meta_keys::UPDATED_AT, &now, WriteFlags::empty())
241 .map_err(|e| AzothError::Transaction(e.to_string()))?;
242
243 txn.commit()
244 .map_err(|e| AzothError::Transaction(e.to_string()))?;
245 }
246
247 let read_pool = if cfg.read_pool.enabled {
249 Some(Arc::new(LmdbReadPool::new(
250 env.clone(),
251 state_db,
252 cfg.read_pool.clone(),
253 )))
254 } else {
255 None
256 };
257
258 Ok(Self {
259 env,
260 state_db,
261 meta_db,
262 config_path: cfg.path.clone(),
263 event_log,
264 dead_letter_queue,
265 lock_manager,
266 write_lock: Arc::new(Mutex::new(())),
267 paused: Arc::new(AtomicBool::new(false)),
268 chunk_size: cfg.state_iter_chunk_size,
269 txn_counter: Arc::new(AtomicUsize::new(0)),
270 preflight_cache,
271 read_pool,
272 event_notify,
273 })
274 }
275
276 fn close(&self) -> Result<()> {
277 Ok(())
279 }
280
281 fn read_txn(&self) -> Result<Self::ReadTxn<'_>> {
282 let txn = self
283 .env
284 .begin_ro_txn()
285 .map_err(|e| AzothError::Transaction(e.to_string()))?;
286 Ok(crate::txn::LmdbReadTxn::new(txn, self.state_db))
287 }
288
289 fn write_txn(&self) -> Result<Self::Txn<'_>> {
290 self.txn_counter.fetch_add(1, Ordering::SeqCst);
292
293 if self.paused.load(Ordering::SeqCst) {
295 self.txn_counter.fetch_sub(1, Ordering::SeqCst);
296 return Err(AzothError::Paused);
297 }
298
299 let ro_txn = self.env.begin_ro_txn().map_err(|e| {
301 self.txn_counter.fetch_sub(1, Ordering::SeqCst);
302 AzothError::Transaction(e.to_string())
303 })?;
304 if let Some(sealed_id) = self.get_sealed_event_id(&ro_txn)? {
305 self.txn_counter.fetch_sub(1, Ordering::SeqCst);
306 return Err(AzothError::Sealed(sealed_id));
307 }
308 drop(ro_txn); let txn = self.env.begin_rw_txn().map_err(|e| {
312 self.txn_counter.fetch_sub(1, Ordering::SeqCst);
313 AzothError::Transaction(e.to_string())
314 })?;
315
316 Ok(LmdbWriteTxn::new(
317 txn,
318 self.state_db,
319 self.meta_db,
320 self.event_log.clone(),
321 self.dead_letter_queue.clone(),
322 Arc::downgrade(&self.txn_counter),
323 self.preflight_cache.clone(),
324 ))
325 }
326
327 fn iter_events(&self, from: EventId, to: Option<EventId>) -> Result<Box<dyn EventIter>> {
328 let iter = self.event_log.iter_range(from, to)?;
331 Ok(Box::new(EventIterAdapter(iter)))
332 }
333
334 fn range(&self, start: &[u8], end: Option<&[u8]>) -> Result<Box<dyn traits::StateIter>> {
335 let iter = crate::state_iter::LmdbStateIter::new(
336 self.env.clone(),
337 self.state_db,
338 start,
339 end,
340 self.chunk_size,
341 )?;
342 Ok(Box::new(iter))
343 }
344
345 fn scan_prefix(&self, prefix: &[u8]) -> Result<Box<dyn traits::StateIter>> {
346 let iter = crate::state_iter::LmdbStateIter::with_prefix(
347 self.env.clone(),
348 self.state_db,
349 prefix,
350 self.chunk_size,
351 )?;
352 Ok(Box::new(iter))
353 }
354
355 fn seal(&self) -> Result<EventId> {
356 let _guard = self.write_lock.lock();
357
358 let mut txn = self
359 .env
360 .begin_rw_txn()
361 .map_err(|e| AzothError::Transaction(e.to_string()))?;
362
363 let next_event_id = self.get_next_event_id(&txn)?;
364 let sealed_event_id = if next_event_id > 0 {
367 next_event_id - 1
368 } else {
369 0
370 };
371 if next_event_id > 0 {
372 self.set_meta(
373 &mut txn,
374 meta_keys::SEALED_EVENT_ID,
375 &sealed_event_id.to_string(),
376 )?;
377 } else {
378 let _ = txn.del(self.meta_db, &meta_keys::SEALED_EVENT_ID, None);
380 }
381 self.set_meta(
382 &mut txn,
383 meta_keys::UPDATED_AT,
384 &chrono::Utc::now().to_rfc3339(),
385 )?;
386
387 txn.commit()
388 .map_err(|e| AzothError::Transaction(e.to_string()))?;
389
390 Ok(sealed_event_id)
391 }
392
393 fn lock_manager(&self) -> &LockManager {
394 &self.lock_manager
395 }
396
397 fn pause_ingestion(&self) -> Result<()> {
398 self.paused.store(true, Ordering::SeqCst);
399
400 let start = std::time::Instant::now();
402 while self.txn_counter.load(Ordering::SeqCst) > 0 {
403 if start.elapsed() > std::time::Duration::from_secs(30) {
404 return Err(AzothError::Timeout(
405 "Waiting for in-flight transactions to complete".into(),
406 ));
407 }
408 std::thread::sleep(std::time::Duration::from_millis(10));
409 }
410
411 Ok(())
412 }
413
414 fn resume_ingestion(&self) -> Result<()> {
415 self.paused.store(false, Ordering::SeqCst);
416 Ok(())
417 }
418
419 fn is_paused(&self) -> bool {
420 self.paused.load(Ordering::SeqCst)
421 }
422
423 fn backup_to(&self, dir: &Path) -> Result<BackupInfo> {
424 crate::backup::backup_to(self, dir)
425 }
426
427 fn restore_from(dir: &Path, cfg: CanonicalConfig) -> Result<Self> {
428 crate::backup::restore_from(dir, cfg)
429 }
430
431 fn meta(&self) -> Result<CanonicalMeta> {
432 let txn = self
433 .env
434 .begin_ro_txn()
435 .map_err(|e| AzothError::Transaction(e.to_string()))?;
436
437 let next_event_id = self.get_next_event_id(&txn)?;
438 let sealed_event_id = self.get_sealed_event_id(&txn)?;
439 let schema_version = self
440 .get_meta(&txn, meta_keys::SCHEMA_VERSION)?
441 .and_then(|s| s.parse().ok())
442 .unwrap_or(1);
443 let created_at = self
444 .get_meta(&txn, meta_keys::CREATED_AT)?
445 .unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
446 let updated_at = self
447 .get_meta(&txn, meta_keys::UPDATED_AT)?
448 .unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
449
450 Ok(CanonicalMeta {
451 next_event_id,
452 sealed_event_id,
453 schema_version,
454 created_at,
455 updated_at,
456 })
457 }
458}
459
impl LmdbCanonicalStore {
    /// Removes the seal marker so write transactions are accepted again.
    /// Holds `write_lock` to serialize against `seal()`.
    pub fn clear_seal(&self) -> Result<()> {
        let _guard = self.write_lock.lock();

        let mut txn = self
            .env
            .begin_rw_txn()
            .map_err(|e| AzothError::Transaction(e.to_string()))?;

        // Deleting a possibly-absent key is harmless; ignore the result.
        let _ = txn.del(self.meta_db, &meta_keys::SEALED_EVENT_ID, None);
        // NOTE(review): a failed UPDATED_AT write is silently ignored here,
        // unlike in seal() where it propagates — confirm that is intended.
        let _ = self.set_meta(
            &mut txn,
            meta_keys::UPDATED_AT,
            &chrono::Utc::now().to_rfc3339(),
        );

        txn.commit()
            .map_err(|e| AzothError::Transaction(e.to_string()))?;
        Ok(())
    }

    /// Begins a read-only transaction over the state database.
    /// Inherent twin of the trait method `read_txn` (same body).
    pub fn read_only_txn(&self) -> Result<crate::txn::LmdbReadTxn<'_>> {
        let txn = self
            .env
            .begin_ro_txn()
            .map_err(|e| AzothError::Transaction(e.to_string()))?;
        Ok(crate::txn::LmdbReadTxn::new(txn, self.state_db))
    }

    /// Reads events `from..to` (to the end when `to` is `None`) on a blocking
    /// thread and returns them as one vector.
    ///
    /// NOTE(review): the whole range is collected into memory — callers
    /// should bound `to` for large logs.
    pub async fn iter_events_async(
        &self,
        from: EventId,
        to: Option<EventId>,
    ) -> Result<Vec<(EventId, Vec<u8>)>> {
        let event_log = self.event_log.clone();

        tokio::task::spawn_blocking(move || {
            let mut iter = event_log.iter_range(from, to)?;
            let mut results = Vec::new();

            loop {
                match iter.next() {
                    Some(Ok(item)) => results.push(item),
                    // First error aborts the whole collection.
                    Some(Err(e)) => return Err(e),
                    None => break,
                }
            }

            Ok(results)
        })
        .await
        .map_err(|e| AzothError::Internal(format!("Task join error: {}", e)))?
    }

    /// Point lookup of a state key, performed on a blocking thread.
    /// Returns `None` when the key does not exist.
    pub async fn get_state_async(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let env = self.env.clone();
        let state_db = self.state_db;
        // Owned copy so the closure can be 'static.
        let key = key.to_vec();

        tokio::task::spawn_blocking(move || {
            let txn = env
                .begin_ro_txn()
                .map_err(|e| AzothError::Transaction(e.to_string()))?;

            match txn.get(state_db, &key) {
                Ok(bytes) => Ok(Some(bytes.to_vec())),
                Err(lmdb::Error::NotFound) => Ok(None),
                Err(e) => Err(AzothError::Transaction(e.to_string())),
            }
        })
        .await
        .map_err(|e| AzothError::Internal(format!("Task join error: {}", e)))?
    }

    /// Collects all state entries whose key starts with `prefix`, on a
    /// blocking thread. The entire result set is buffered in memory.
    pub async fn scan_prefix_async(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let env = self.env.clone();
        let state_db = self.state_db;
        let prefix = prefix.to_vec();
        let chunk_size = self.chunk_size;

        tokio::task::spawn_blocking(move || {
            let mut iter =
                crate::state_iter::LmdbStateIter::with_prefix(env, state_db, &prefix, chunk_size)?;

            let mut results = Vec::new();
            while let Some(item) = iter.next()? {
                results.push(item);
            }

            Ok(results)
        })
        .await
        .map_err(|e| AzothError::Internal(format!("Task join error: {}", e)))?
    }

    /// Shared handle to the underlying file event log.
    pub fn event_log(&self) -> &Arc<FileEventLog> {
        &self.event_log
    }

    /// Handle notified by the event log when new events are appended;
    /// useful for event-driven consumers.
    pub fn event_notify(&self) -> Arc<Notify> {
        self.event_notify.clone()
    }

    /// Shared handle to the preflight cache.
    pub fn preflight_cache(&self) -> &Arc<PreflightCache> {
        &self.preflight_cache
    }

    /// The read pool, if one was enabled in the configuration.
    pub fn read_pool(&self) -> Option<&Arc<LmdbReadPool>> {
        self.read_pool.as_ref()
    }

    /// Runs `f` inside a write transaction on a blocking thread and commits.
    ///
    /// NOTE(review): unlike `write_txn`, this path does NOT check the seal
    /// marker — confirm sealed stores are meant to accept writes here.
    /// NOTE(review): if `f` or the commit fails, the counter decrement relies
    /// on `LmdbWriteTxn`'s drop via the `Weak` handle — verify that drop path.
    pub async fn submit_write<F, R>(&self, f: F) -> Result<R>
    where
        F: for<'a> FnOnce(&mut LmdbWriteTxn<'a>) -> Result<R> + Send + 'static,
        R: Send + 'static,
    {
        // Clone everything the 'static blocking closure needs.
        let env = self.env.clone();
        let state_db = self.state_db;
        let meta_db = self.meta_db;
        let event_log = self.event_log.clone();
        let dead_letter_queue = self.dead_letter_queue.clone();
        let txn_counter = self.txn_counter.clone();
        let preflight_cache = self.preflight_cache.clone();
        let paused = self.paused.clone();

        tokio::task::spawn_blocking(move || {
            if paused.load(Ordering::SeqCst) {
                return Err(AzothError::Paused);
            }

            // Mark the write as in-flight so pause_ingestion can wait for it.
            txn_counter.fetch_add(1, Ordering::SeqCst);

            let rw_txn = match env.begin_rw_txn() {
                Ok(t) => t,
                Err(e) => {
                    // Undo the in-flight count on failure to start the txn.
                    txn_counter.fetch_sub(1, Ordering::SeqCst);
                    return Err(AzothError::Transaction(e.to_string()));
                }
            };

            let mut write_txn = LmdbWriteTxn::new(
                rw_txn,
                state_db,
                meta_db,
                event_log,
                dead_letter_queue,
                Arc::downgrade(&txn_counter),
                preflight_cache,
            );

            let result = f(&mut write_txn)?;
            write_txn.commit()?;
            Ok(result)
        })
        .await
        .map_err(|e| AzothError::Internal(format!("Task join error: {}", e)))?
    }

    /// Writes one state key/value via `submit_write`.
    ///
    /// NOTE(review): the returned `CommitInfo` counts are hardcoded
    /// (1 write, 0 events) rather than taken from the actual commit — confirm.
    pub async fn async_put_state(&self, key: &[u8], value: &[u8]) -> Result<CommitInfo> {
        let key = key.to_vec();
        let value = value.to_vec();
        self.submit_write(move |txn| {
            txn.put_state(&key, &value)?;
            Ok(())
        })
        .await?;
        Ok(CommitInfo {
            events_written: 0,
            first_event_id: None,
            last_event_id: None,
            state_keys_written: 1,
            state_keys_deleted: 0,
            dlq_events: 0,
        })
    }

    /// Deletes one state key via `submit_write`.
    ///
    /// NOTE(review): as with `async_put_state`, the `CommitInfo` counts are
    /// hardcoded (1 delete) regardless of whether the key existed — confirm.
    pub async fn async_del_state(&self, key: &[u8]) -> Result<CommitInfo> {
        let key = key.to_vec();
        self.submit_write(move |txn| {
            txn.del_state(&key)?;
            Ok(())
        })
        .await?;
        Ok(CommitInfo {
            events_written: 0,
            first_event_id: None,
            last_event_id: None,
            state_keys_written: 0,
            state_keys_deleted: 1,
            dlq_events: 0,
        })
    }

    /// Whether a read pool was configured at open time.
    pub fn has_read_pool(&self) -> bool {
        self.read_pool.is_some()
    }
}