Skip to main content

azoth_lmdb/
store.rs

1use azoth_core::{
2    error::{AzothError, Result},
3    event_log::{EventLog, EventLogIterator},
4    lock_manager::LockManager,
5    traits::{CanonicalStore, EventIter},
6    types::{BackupInfo, CanonicalMeta, EventId},
7    CanonicalConfig,
8};
9use azoth_file_log::{FileEventLog, FileEventLogConfig};
10use lmdb::{Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, WriteFlags};
11use std::path::Path;
12use std::sync::{
13    atomic::{AtomicBool, Ordering},
14    Arc, Mutex,
15};
16
17use crate::keys::meta_keys;
18use crate::txn::LmdbWriteTxn;
19
20/// Adapter to convert EventLogIterator to EventIter
21struct EventIterAdapter(Box<dyn EventLogIterator>);
22
23impl EventIter for EventIterAdapter {
24    fn next(&mut self) -> Result<Option<(EventId, Vec<u8>)>> {
25        // Convert from Iterator's Option<Result<T>> to EventIter's Result<Option<T>>
26        match self.0.next() {
27            Some(Ok(item)) => Ok(Some(item)),
28            Some(Err(e)) => Err(e),
29            None => Ok(None),
30        }
31    }
32}
33
34/// LMDB-backed canonical store
35///
36/// State is stored in LMDB for transactional integrity.
37/// Events are stored in a file-based event log for better performance.
38pub struct LmdbCanonicalStore {
39    pub(crate) env: Arc<Environment>,
40    pub(crate) state_db: Database,
41    pub(crate) events_db: Database, // Deprecated: kept for backward compatibility
42    pub(crate) meta_db: Database,
43    pub(crate) config_path: std::path::PathBuf,
44    pub(crate) event_log: Arc<FileEventLog>, // NEW: File-based event storage
45    lock_manager: Arc<LockManager>,
46    write_lock: Arc<Mutex<()>>, // Single writer
47    paused: Arc<AtomicBool>,
48}
49
50impl LmdbCanonicalStore {
51    /// Get the next event ID from meta
52    fn get_next_event_id<T: Transaction>(&self, txn: &T) -> Result<EventId> {
53        match txn.get(self.meta_db, &meta_keys::NEXT_EVENT_ID) {
54            Ok(bytes) => {
55                let id_str = std::str::from_utf8(bytes)
56                    .map_err(|e| AzothError::Serialization(e.to_string()))?;
57                id_str
58                    .parse::<EventId>()
59                    .map_err(|e| AzothError::Serialization(e.to_string()))
60            }
61            Err(lmdb::Error::NotFound) => Ok(0),
62            Err(e) => Err(AzothError::Transaction(e.to_string())),
63        }
64    }
65
66    /// Get the sealed event ID from meta (if any)
67    fn get_sealed_event_id<T: Transaction>(&self, txn: &T) -> Result<Option<EventId>> {
68        match txn.get(self.meta_db, &meta_keys::SEALED_EVENT_ID) {
69            Ok(bytes) => {
70                let id_str = std::str::from_utf8(bytes)
71                    .map_err(|e| AzothError::Serialization(e.to_string()))?;
72                let id = id_str
73                    .parse::<EventId>()
74                    .map_err(|e| AzothError::Serialization(e.to_string()))?;
75                Ok(Some(id))
76            }
77            Err(lmdb::Error::NotFound) => Ok(None),
78            Err(e) => Err(AzothError::Transaction(e.to_string())),
79        }
80    }
81
82    /// Set meta value
83    fn set_meta(&self, txn: &mut lmdb::RwTransaction, key: &str, value: &str) -> Result<()> {
84        txn.put(self.meta_db, &key, &value, WriteFlags::empty())
85            .map_err(|e| AzothError::Transaction(e.to_string()))
86    }
87
88    /// Get meta value
89    fn get_meta<T: Transaction>(&self, txn: &T, key: &str) -> Result<Option<String>> {
90        match txn.get(self.meta_db, &key) {
91            Ok(bytes) => {
92                let value = std::str::from_utf8(bytes)
93                    .map_err(|e| AzothError::Serialization(e.to_string()))?
94                    .to_string();
95                Ok(Some(value))
96            }
97            Err(lmdb::Error::NotFound) => Ok(None),
98            Err(e) => Err(AzothError::Transaction(e.to_string())),
99        }
100    }
101}
102
103impl CanonicalStore for LmdbCanonicalStore {
104    type Txn<'a> = LmdbWriteTxn<'a>;
105
106    fn open(cfg: CanonicalConfig) -> Result<Self> {
107        // Create directory if it doesn't exist
108        std::fs::create_dir_all(&cfg.path)?;
109
110        // Configure LMDB environment
111        let mut env_builder = Environment::new();
112        env_builder.set_max_dbs(3); // state, events, meta
113        env_builder.set_map_size(cfg.map_size);
114        env_builder.set_max_readers(cfg.max_readers);
115
116        // Set sync flags based on config
117        let mut flags = EnvironmentFlags::empty();
118        match cfg.sync_mode {
119            azoth_core::config::SyncMode::Full => {}
120            azoth_core::config::SyncMode::NoMetaSync => {
121                flags.insert(EnvironmentFlags::NO_META_SYNC);
122            }
123            azoth_core::config::SyncMode::NoSync => {
124                flags.insert(EnvironmentFlags::NO_SYNC);
125            }
126        }
127        env_builder.set_flags(flags);
128
129        let env = env_builder
130            .open(&cfg.path)
131            .map_err(|e| AzothError::Io(std::io::Error::other(e)))?;
132
133        // Open databases
134        let state_db = env
135            .create_db(Some("state"), DatabaseFlags::empty())
136            .map_err(|e| AzothError::Transaction(e.to_string()))?;
137
138        let events_db = env
139            .create_db(Some("events"), DatabaseFlags::empty())
140            .map_err(|e| AzothError::Transaction(e.to_string()))?;
141
142        let meta_db = env
143            .create_db(Some("meta"), DatabaseFlags::empty())
144            .map_err(|e| AzothError::Transaction(e.to_string()))?;
145
146        let env = Arc::new(env);
147        let lock_manager = Arc::new(LockManager::new(cfg.stripe_count));
148
149        // Initialize file-based event log
150        let event_log_config = FileEventLogConfig {
151            base_dir: cfg.path.join("event-log"),
152            ..Default::default()
153        };
154        let event_log = Arc::new(FileEventLog::open(event_log_config)?);
155
156        // Initialize metadata if needed
157        {
158            let mut txn = env
159                .begin_rw_txn()
160                .map_err(|e| AzothError::Transaction(e.to_string()))?;
161
162            // Initialize next_event_id if not present
163            if txn.get(meta_db, &meta_keys::NEXT_EVENT_ID).is_err() {
164                txn.put(
165                    meta_db,
166                    &meta_keys::NEXT_EVENT_ID,
167                    &"0",
168                    WriteFlags::empty(),
169                )
170                .map_err(|e| AzothError::Transaction(e.to_string()))?;
171            }
172
173            // Initialize schema_version if not present
174            if txn.get(meta_db, &meta_keys::SCHEMA_VERSION).is_err() {
175                txn.put(
176                    meta_db,
177                    &meta_keys::SCHEMA_VERSION,
178                    &"1",
179                    WriteFlags::empty(),
180                )
181                .map_err(|e| AzothError::Transaction(e.to_string()))?;
182            }
183
184            // Initialize timestamps if not present
185            let now = chrono::Utc::now().to_rfc3339();
186            if txn.get(meta_db, &meta_keys::CREATED_AT).is_err() {
187                txn.put(meta_db, &meta_keys::CREATED_AT, &now, WriteFlags::empty())
188                    .map_err(|e| AzothError::Transaction(e.to_string()))?;
189            }
190            txn.put(meta_db, &meta_keys::UPDATED_AT, &now, WriteFlags::empty())
191                .map_err(|e| AzothError::Transaction(e.to_string()))?;
192
193            txn.commit()
194                .map_err(|e| AzothError::Transaction(e.to_string()))?;
195        }
196
197        Ok(Self {
198            env,
199            state_db,
200            events_db,
201            meta_db,
202            config_path: cfg.path.clone(),
203            event_log,
204            lock_manager,
205            write_lock: Arc::new(Mutex::new(())),
206            paused: Arc::new(AtomicBool::new(false)),
207        })
208    }
209
210    fn close(&self) -> Result<()> {
211        // LMDB closes automatically on drop
212        Ok(())
213    }
214
215    fn read_txn(&self) -> Result<Self::Txn<'_>> {
216        // Read-only transactions not supported yet
217        // Would require separate transaction type
218        Err(AzothError::InvalidState(
219            "Read-only transactions not yet implemented".into(),
220        ))
221    }
222
223    fn write_txn(&self) -> Result<Self::Txn<'_>> {
224        // Acquire write lock (single writer)
225        // Note: This is held for the entire transaction duration
226        let _guard = self.write_lock.lock().unwrap();
227
228        // Check if paused
229        if self.paused.load(Ordering::SeqCst) {
230            return Err(AzothError::Paused);
231        }
232
233        // Check if sealed
234        let ro_txn = self
235            .env
236            .begin_ro_txn()
237            .map_err(|e| AzothError::Transaction(e.to_string()))?;
238        if let Some(sealed_id) = self.get_sealed_event_id(&ro_txn)? {
239            return Err(AzothError::Sealed(sealed_id));
240        }
241        drop(ro_txn); // Release read transaction
242
243        let txn = self
244            .env
245            .begin_rw_txn()
246            .map_err(|e| AzothError::Transaction(e.to_string()))?;
247
248        // Lock is released when guard drops at the end of this scope
249        // This means the transaction can only be created, not held across await points
250        drop(_guard);
251
252        Ok(LmdbWriteTxn::new(
253            txn,
254            self.state_db,
255            self.events_db,
256            self.meta_db,
257            self.event_log.clone(),
258        ))
259    }
260
261    fn iter_events(&self, from: EventId, to: Option<EventId>) -> Result<Box<dyn EventIter>> {
262        // Use file-based event log instead of LMDB
263        // Convert EventLogIterator to EventIter via adapter
264        let iter = self.event_log.iter_range(from, to)?;
265        Ok(Box::new(EventIterAdapter(iter)))
266    }
267
268    fn seal(&self) -> Result<EventId> {
269        let _guard = self.write_lock.lock().unwrap();
270
271        let mut txn = self
272            .env
273            .begin_rw_txn()
274            .map_err(|e| AzothError::Transaction(e.to_string()))?;
275
276        let next_event_id = self.get_next_event_id(&txn)?;
277        let sealed_event_id = if next_event_id > 0 {
278            next_event_id - 1
279        } else {
280            0
281        };
282
283        self.set_meta(
284            &mut txn,
285            meta_keys::SEALED_EVENT_ID,
286            &sealed_event_id.to_string(),
287        )?;
288        self.set_meta(
289            &mut txn,
290            meta_keys::UPDATED_AT,
291            &chrono::Utc::now().to_rfc3339(),
292        )?;
293
294        txn.commit()
295            .map_err(|e| AzothError::Transaction(e.to_string()))?;
296
297        Ok(sealed_event_id)
298    }
299
300    fn lock_manager(&self) -> &LockManager {
301        &self.lock_manager
302    }
303
304    fn pause_ingestion(&self) -> Result<()> {
305        self.paused.store(true, Ordering::SeqCst);
306        // Wait for in-flight transactions (they hold write_lock)
307        let _guard = self.write_lock.lock().unwrap();
308        Ok(())
309    }
310
311    fn resume_ingestion(&self) -> Result<()> {
312        self.paused.store(false, Ordering::SeqCst);
313        Ok(())
314    }
315
316    fn is_paused(&self) -> bool {
317        self.paused.load(Ordering::SeqCst)
318    }
319
320    fn backup_to(&self, dir: &Path) -> Result<BackupInfo> {
321        crate::backup::backup_to(self, dir)
322    }
323
324    fn restore_from(dir: &Path, cfg: CanonicalConfig) -> Result<Self> {
325        crate::backup::restore_from(dir, cfg)
326    }
327
328    fn meta(&self) -> Result<CanonicalMeta> {
329        let txn = self
330            .env
331            .begin_ro_txn()
332            .map_err(|e| AzothError::Transaction(e.to_string()))?;
333
334        let next_event_id = self.get_next_event_id(&txn)?;
335        let sealed_event_id = self.get_sealed_event_id(&txn)?;
336        let schema_version = self
337            .get_meta(&txn, meta_keys::SCHEMA_VERSION)?
338            .and_then(|s| s.parse().ok())
339            .unwrap_or(1);
340        let created_at = self
341            .get_meta(&txn, meta_keys::CREATED_AT)?
342            .unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
343        let updated_at = self
344            .get_meta(&txn, meta_keys::UPDATED_AT)?
345            .unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
346
347        Ok(CanonicalMeta {
348            next_event_id,
349            sealed_event_id,
350            schema_version,
351            created_at,
352            updated_at,
353        })
354    }
355}
356
357impl LmdbCanonicalStore {
358    /// Get reference to the file-based event log
359    ///
360    /// This allows direct access to event storage for advanced use cases.
361    pub fn event_log(&self) -> &Arc<FileEventLog> {
362        &self.event_log
363    }
364}