// azoth_lmdb/store.rs

use azoth_core::{
    error::{AzothError, Result},
    event_log::{EventLog, EventLogIterator},
    lock_manager::LockManager,
    traits::{self, CanonicalStore, CanonicalTxn, EventIter, StateIter},
    types::{BackupInfo, CanonicalMeta, CommitInfo, EventId},
    CanonicalConfig,
};
use azoth_file_log::{FileEventLog, FileEventLogConfig};
use lmdb::{Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, WriteFlags};
use parking_lot::Mutex;
use std::path::Path;
use std::sync::{
    atomic::{AtomicBool, AtomicUsize, Ordering},
    Arc,
};
use std::time::Duration;
use tokio::sync::Notify;

use crate::keys::meta_keys;
use crate::preflight_cache::PreflightCache;
use crate::read_pool::LmdbReadPool;
use crate::txn::LmdbWriteTxn;

/// Adapter to convert EventLogIterator to EventIter
struct EventIterAdapter(Box<dyn EventLogIterator>);

impl EventIter for EventIterAdapter {
    fn next(&mut self) -> Result<Option<(EventId, Vec<u8>)>> {
        // Convert from Iterator's Option<Result<T>> to EventIter's Result<Option<T>>
        match self.0.next() {
            Some(Ok(item)) => Ok(Some(item)),
            Some(Err(e)) => Err(e),
            None => Ok(None),
        }
    }
}

/// LMDB-backed canonical store
///
/// State is stored in LMDB for transactional integrity.
/// Events are stored in a file-based event log for better performance.
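///
/// # Example
/// A minimal open-and-write sketch (assumes a `CanonicalConfig` built
/// elsewhere with at least its `path` set; error handling elided):
/// ```ignore
/// let store = LmdbCanonicalStore::open(cfg)?;
/// store.async_put_state(b"key", b"value").await?;
/// let value = store.get_state_async(b"key").await?;
/// ```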
pub struct LmdbCanonicalStore {
    pub(crate) env: Arc<Environment>,
    pub(crate) state_db: Database,
    pub(crate) meta_db: Database,
    pub(crate) config_path: std::path::PathBuf,
    pub(crate) event_log: Arc<FileEventLog>,
    dead_letter_queue: Arc<crate::dead_letter_queue::DeadLetterQueue>,
    lock_manager: Arc<LockManager>,
    write_lock: Arc<Mutex<()>>,
    paused: Arc<AtomicBool>,
    chunk_size: usize,
    txn_counter: Arc<AtomicUsize>,
    preflight_cache: Arc<PreflightCache>,
    read_pool: Option<Arc<LmdbReadPool>>,
    /// Shared notifier that fires after each successful event append.
    ///
    /// Created during `open()` and injected into the `FileEventLog`.
    /// Consumers (projector, event processor) can clone this to await
    /// new events without polling.
    event_notify: Arc<Notify>,
}

impl LmdbCanonicalStore {
    /// Get the next event ID from meta
    fn get_next_event_id<T: Transaction>(&self, txn: &T) -> Result<EventId> {
        match txn.get(self.meta_db, &meta_keys::NEXT_EVENT_ID) {
            Ok(bytes) => {
                let id_str = std::str::from_utf8(bytes)
                    .map_err(|e| AzothError::Serialization(e.to_string()))?;
                id_str
                    .parse::<EventId>()
                    .map_err(|e| AzothError::Serialization(e.to_string()))
            }
            Err(lmdb::Error::NotFound) => Ok(0),
            Err(e) => Err(AzothError::Transaction(e.to_string())),
        }
    }

    /// Get the sealed event ID from meta (if any)
    fn get_sealed_event_id<T: Transaction>(&self, txn: &T) -> Result<Option<EventId>> {
        match txn.get(self.meta_db, &meta_keys::SEALED_EVENT_ID) {
            Ok(bytes) => {
                let id_str = std::str::from_utf8(bytes)
                    .map_err(|e| AzothError::Serialization(e.to_string()))?;
                let id = id_str
                    .parse::<EventId>()
                    .map_err(|e| AzothError::Serialization(e.to_string()))?;
                Ok(Some(id))
            }
            Err(lmdb::Error::NotFound) => Ok(None),
            Err(e) => Err(AzothError::Transaction(e.to_string())),
        }
    }

    /// Set meta value
    fn set_meta(&self, txn: &mut lmdb::RwTransaction, key: &str, value: &str) -> Result<()> {
        txn.put(self.meta_db, &key, &value, WriteFlags::empty())
            .map_err(|e| AzothError::Transaction(e.to_string()))
    }

    /// Get meta value
    fn get_meta<T: Transaction>(&self, txn: &T, key: &str) -> Result<Option<String>> {
        match txn.get(self.meta_db, &key) {
            Ok(bytes) => {
                let value = std::str::from_utf8(bytes)
                    .map_err(|e| AzothError::Serialization(e.to_string()))?
                    .to_string();
                Ok(Some(value))
            }
            Err(lmdb::Error::NotFound) => Ok(None),
            Err(e) => Err(AzothError::Transaction(e.to_string())),
        }
    }
}

impl CanonicalStore for LmdbCanonicalStore {
    type Txn<'a> = LmdbWriteTxn<'a>;
    type ReadTxn<'a> = crate::txn::LmdbReadTxn<'a>;

    fn open(cfg: CanonicalConfig) -> Result<Self> {
        // Create directory if it doesn't exist
        std::fs::create_dir_all(&cfg.path)?;

        // Configure LMDB environment
        let mut env_builder = Environment::new();
        env_builder.set_max_dbs(2); // state, meta
        env_builder.set_map_size(cfg.map_size);
        env_builder.set_max_readers(cfg.max_readers);

        // Set sync flags based on config
        let mut flags = EnvironmentFlags::empty();
        match cfg.sync_mode {
            azoth_core::config::SyncMode::Full => {}
            azoth_core::config::SyncMode::NoMetaSync => {
                flags.insert(EnvironmentFlags::NO_META_SYNC);
            }
            azoth_core::config::SyncMode::NoSync => {
                flags.insert(EnvironmentFlags::NO_SYNC);
            }
        }
        env_builder.set_flags(flags);

        let env = env_builder
            .open(&cfg.path)
            .map_err(|e| AzothError::Io(std::io::Error::other(e)))?;

        // Open databases
        let state_db = env
            .create_db(Some("state"), DatabaseFlags::empty())
            .map_err(|e| AzothError::Transaction(e.to_string()))?;

        let meta_db = env
            .create_db(Some("meta"), DatabaseFlags::empty())
            .map_err(|e| AzothError::Transaction(e.to_string()))?;

        let env = Arc::new(env);
        let lock_manager = Arc::new(LockManager::new(
            cfg.stripe_count,
            Duration::from_millis(cfg.lock_timeout_ms),
        ));

        // Initialize preflight cache
        let preflight_cache = Arc::new(PreflightCache::new(
            cfg.preflight_cache_size,
            cfg.preflight_cache_ttl_secs,
            cfg.preflight_cache_enabled,
        ));

        // Initialize file-based event log with push-based notification
        let event_notify = Arc::new(Notify::new());
        let event_log_config = FileEventLogConfig {
            base_dir: cfg.path.join("event-log"),
            max_event_size: cfg.event_max_size_bytes,
            max_batch_bytes: cfg.event_batch_max_bytes,
            ..Default::default()
        };
        let mut event_log = FileEventLog::open(event_log_config)?;
        event_log.set_event_notify(event_notify.clone());
        let event_log = Arc::new(event_log);

        // Initialize dead letter queue for failed event log writes
        let dlq_dir = cfg.path.join("dlq");
        let dead_letter_queue = crate::dead_letter_queue::DeadLetterQueue::open(&dlq_dir)?;

        // Check for existing DLQ entries on startup (operational warning)
        match dead_letter_queue.entry_count() {
            Ok(count) if count > 0 => {
                tracing::warn!(
                    dlq_entries = count,
                    dlq_path = %dlq_dir.display(),
                    "Dead letter queue has {} unprocessed entries from previous runs. \
                     These events were committed to state but not to the event log. \
                     Run the DLQ recovery tool to replay them.",
                    count
                );
            }
            Ok(_) => {}
            Err(e) => {
                tracing::warn!("Failed to check DLQ entry count: {}", e);
            }
        }

        // Initialize metadata if needed
        {
            let mut txn = env
                .begin_rw_txn()
                .map_err(|e| AzothError::Transaction(e.to_string()))?;

            // Initialize next_event_id if not present
            if txn.get(meta_db, &meta_keys::NEXT_EVENT_ID).is_err() {
                txn.put(
                    meta_db,
                    &meta_keys::NEXT_EVENT_ID,
                    &"0",
                    WriteFlags::empty(),
                )
                .map_err(|e| AzothError::Transaction(e.to_string()))?;
            }

            // Initialize schema_version if not present
            // schema_version starts at 0 so that migrations starting from version 1 will be applied
            if txn.get(meta_db, &meta_keys::SCHEMA_VERSION).is_err() {
                txn.put(
                    meta_db,
                    &meta_keys::SCHEMA_VERSION,
                    &"0",
                    WriteFlags::empty(),
                )
                .map_err(|e| AzothError::Transaction(e.to_string()))?;
            }

            // Initialize timestamps if not present
            let now = chrono::Utc::now().to_rfc3339();
            if txn.get(meta_db, &meta_keys::CREATED_AT).is_err() {
                txn.put(meta_db, &meta_keys::CREATED_AT, &now, WriteFlags::empty())
                    .map_err(|e| AzothError::Transaction(e.to_string()))?;
            }
            txn.put(meta_db, &meta_keys::UPDATED_AT, &now, WriteFlags::empty())
                .map_err(|e| AzothError::Transaction(e.to_string()))?;

            txn.commit()
                .map_err(|e| AzothError::Transaction(e.to_string()))?;
        }

        // Initialize read pool if enabled
        let read_pool = if cfg.read_pool.enabled {
            Some(Arc::new(LmdbReadPool::new(
                env.clone(),
                state_db,
                cfg.read_pool.clone(),
            )))
        } else {
            None
        };

        Ok(Self {
            env,
            state_db,
            meta_db,
            config_path: cfg.path.clone(),
            event_log,
            dead_letter_queue,
            lock_manager,
            write_lock: Arc::new(Mutex::new(())),
            paused: Arc::new(AtomicBool::new(false)),
            chunk_size: cfg.state_iter_chunk_size,
            txn_counter: Arc::new(AtomicUsize::new(0)),
            preflight_cache,
            read_pool,
            event_notify,
        })
    }

    fn close(&self) -> Result<()> {
        // LMDB closes automatically on drop
        Ok(())
    }

    fn read_txn(&self) -> Result<Self::ReadTxn<'_>> {
        let txn = self
            .env
            .begin_ro_txn()
            .map_err(|e| AzothError::Transaction(e.to_string()))?;
        Ok(crate::txn::LmdbReadTxn::new(txn, self.state_db))
    }

    fn write_txn(&self) -> Result<Self::Txn<'_>> {
        // Increment transaction counter (transaction starting)
        self.txn_counter.fetch_add(1, Ordering::SeqCst);

        // Check if paused (no lock needed)
        if self.paused.load(Ordering::SeqCst) {
            self.txn_counter.fetch_sub(1, Ordering::SeqCst);
            return Err(AzothError::Paused);
        }
        // Check if sealed; decrement the counter on every early-exit path so a
        // failed lookup does not leak an in-flight transaction slot.
        let ro_txn = self.env.begin_ro_txn().map_err(|e| {
            self.txn_counter.fetch_sub(1, Ordering::SeqCst);
            AzothError::Transaction(e.to_string())
        })?;
        let sealed = match self.get_sealed_event_id(&ro_txn) {
            Ok(sealed) => sealed,
            Err(e) => {
                self.txn_counter.fetch_sub(1, Ordering::SeqCst);
                return Err(e);
            }
        };
        if let Some(sealed_id) = sealed {
            self.txn_counter.fetch_sub(1, Ordering::SeqCst);
            return Err(AzothError::Sealed(sealed_id));
        }
        drop(ro_txn); // Release the read transaction before opening the write transaction

        // Let LMDB handle write serialization (no lock needed)
        let txn = self.env.begin_rw_txn().map_err(|e| {
            self.txn_counter.fetch_sub(1, Ordering::SeqCst);
            AzothError::Transaction(e.to_string())
        })?;

        Ok(LmdbWriteTxn::new(
            txn,
            self.state_db,
            self.meta_db,
            self.event_log.clone(),
            self.dead_letter_queue.clone(),
            Arc::downgrade(&self.txn_counter),
            self.preflight_cache.clone(),
        ))
    }

    fn iter_events(&self, from: EventId, to: Option<EventId>) -> Result<Box<dyn EventIter>> {
        // Use file-based event log instead of LMDB
        // Convert EventLogIterator to EventIter via adapter
        let iter = self.event_log.iter_range(from, to)?;
        Ok(Box::new(EventIterAdapter(iter)))
    }

    fn range(&self, start: &[u8], end: Option<&[u8]>) -> Result<Box<dyn traits::StateIter>> {
        let iter = crate::state_iter::LmdbStateIter::new(
            self.env.clone(),
            self.state_db,
            start,
            end,
            self.chunk_size,
        )?;
        Ok(Box::new(iter))
    }

    fn scan_prefix(&self, prefix: &[u8]) -> Result<Box<dyn traits::StateIter>> {
        let iter = crate::state_iter::LmdbStateIter::with_prefix(
            self.env.clone(),
            self.state_db,
            prefix,
            self.chunk_size,
        )?;
        Ok(Box::new(iter))
    }

    fn seal(&self) -> Result<EventId> {
        let _guard = self.write_lock.lock();

        let mut txn = self
            .env
            .begin_rw_txn()
            .map_err(|e| AzothError::Transaction(e.to_string()))?;

        let next_event_id = self.get_next_event_id(&txn)?;
        // Empty event log: don't seal at all. Writing SEALED_EVENT_ID=0 makes the
        // projector believe event 0 exists and can cause "lag" to stay >0 forever.
        let sealed_event_id = if next_event_id > 0 {
            let id = next_event_id - 1;
            self.set_meta(&mut txn, meta_keys::SEALED_EVENT_ID, &id.to_string())?;
            id
        } else {
            // Clear any previous seal, if present.
            let _ = txn.del(self.meta_db, &meta_keys::SEALED_EVENT_ID, None);
            0
        };
        self.set_meta(
            &mut txn,
            meta_keys::UPDATED_AT,
            &chrono::Utc::now().to_rfc3339(),
        )?;

        txn.commit()
            .map_err(|e| AzothError::Transaction(e.to_string()))?;

        Ok(sealed_event_id)
    }

    fn lock_manager(&self) -> &LockManager {
        &self.lock_manager
    }

    fn pause_ingestion(&self) -> Result<()> {
        self.paused.store(true, Ordering::SeqCst);

        // Wait for in-flight transactions to complete
        let start = std::time::Instant::now();
        while self.txn_counter.load(Ordering::SeqCst) > 0 {
            if start.elapsed() > std::time::Duration::from_secs(30) {
                return Err(AzothError::Timeout(
                    "Waiting for in-flight transactions to complete".into(),
                ));
            }
            std::thread::sleep(std::time::Duration::from_millis(10));
        }

        Ok(())
    }

    fn resume_ingestion(&self) -> Result<()> {
        self.paused.store(false, Ordering::SeqCst);
        Ok(())
    }

    fn is_paused(&self) -> bool {
        self.paused.load(Ordering::SeqCst)
    }

    fn backup_to(&self, dir: &Path) -> Result<BackupInfo> {
        crate::backup::backup_to(self, dir)
    }

    fn restore_from(dir: &Path, cfg: CanonicalConfig) -> Result<Self> {
        crate::backup::restore_from(dir, cfg)
    }

    fn meta(&self) -> Result<CanonicalMeta> {
        let txn = self
            .env
            .begin_ro_txn()
            .map_err(|e| AzothError::Transaction(e.to_string()))?;

        let next_event_id = self.get_next_event_id(&txn)?;
        let sealed_event_id = self.get_sealed_event_id(&txn)?;
        let schema_version = self
            .get_meta(&txn, meta_keys::SCHEMA_VERSION)?
            .and_then(|s| s.parse().ok())
            .unwrap_or(1);
        let created_at = self
            .get_meta(&txn, meta_keys::CREATED_AT)?
            .unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
        let updated_at = self
            .get_meta(&txn, meta_keys::UPDATED_AT)?
            .unwrap_or_else(|| chrono::Utc::now().to_rfc3339());

        Ok(CanonicalMeta {
            next_event_id,
            sealed_event_id,
            schema_version,
            created_at,
            updated_at,
        })
    }
}

impl LmdbCanonicalStore {
    /// Clear the sealed marker, re-enabling writes.
    ///
    /// Sealing is used as a temporary barrier to create deterministic snapshots. Backups should
    /// clear the seal before resuming ingestion; otherwise the DB becomes permanently read-only.
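    ///
    /// # Example
    /// A minimal seal/backup/unseal sketch (the backup path is illustrative;
    /// error handling elided):
    /// ```ignore
    /// let sealed_id = store.seal()?;                   // freeze writes at a known event
    /// store.backup_to(Path::new("/backups/latest"))?;  // snapshot deterministically
    /// store.clear_seal()?;                             // re-enable writes
    /// ```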
    pub fn clear_seal(&self) -> Result<()> {
        let _guard = self.write_lock.lock();

        let mut txn = self
            .env
            .begin_rw_txn()
            .map_err(|e| AzothError::Transaction(e.to_string()))?;

        // Ignore errors (e.g. NotFound).
        let _ = txn.del(self.meta_db, &meta_keys::SEALED_EVENT_ID, None);
        let _ = self.set_meta(
            &mut txn,
            meta_keys::UPDATED_AT,
            &chrono::Utc::now().to_rfc3339(),
        );

        txn.commit()
            .map_err(|e| AzothError::Transaction(e.to_string()))?;
        Ok(())
    }

    /// Begin a read-only transaction
    ///
    /// Read-only transactions allow concurrent reads without blocking writes or other reads.
    /// This is more efficient than write_txn() for preflight validation and queries.
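    ///
    /// # Example
    /// A minimal sketch; the accessors available on `LmdbReadTxn` are defined
    /// in `crate::txn`, so treat the method name below as illustrative:
    /// ```ignore
    /// let txn = store.read_only_txn()?;
    /// let value = txn.get(b"key")?; // illustrative accessor
    /// drop(txn); // release the LMDB reader slot promptly
    /// ```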
    pub fn read_only_txn(&self) -> Result<crate::txn::LmdbReadTxn<'_>> {
        let txn = self
            .env
            .begin_ro_txn()
            .map_err(|e| AzothError::Transaction(e.to_string()))?;
        Ok(crate::txn::LmdbReadTxn::new(txn, self.state_db))
    }

    /// Iterate events asynchronously
    ///
    /// This method wraps the synchronous event iterator in a blocking task,
    /// allowing it to be used in async contexts without blocking the runtime.
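    ///
    /// # Example
    /// ```ignore
    /// // Collect events starting at ID 0, bounded by `to` (range semantics
    /// // follow `FileEventLog::iter_range`).
    /// let events = store.iter_events_async(0, Some(100)).await?;
    /// for (id, payload) in events {
    ///     println!("event {id}: {} bytes", payload.len());
    /// }
    /// ```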
    pub async fn iter_events_async(
        &self,
        from: EventId,
        to: Option<EventId>,
    ) -> Result<Vec<(EventId, Vec<u8>)>> {
        let event_log = self.event_log.clone();

        tokio::task::spawn_blocking(move || {
            let mut iter = event_log.iter_range(from, to)?;
            let mut results = Vec::new();

            loop {
                match iter.next() {
                    Some(Ok(item)) => results.push(item),
                    Some(Err(e)) => return Err(e),
                    None => break,
                }
            }

            Ok(results)
        })
        .await
        .map_err(|e| AzothError::Internal(format!("Task join error: {}", e)))?
    }

    /// Get single state value asynchronously
    ///
    /// This method wraps the synchronous read in a blocking task,
    /// allowing it to be used in async contexts without blocking the runtime.
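    ///
    /// # Example
    /// ```ignore
    /// // The key below is illustrative.
    /// if let Some(bytes) = store.get_state_async(b"user:42").await? {
    ///     // deserialize `bytes` as appropriate
    /// }
    /// ```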
    pub async fn get_state_async(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let env = self.env.clone();
        let state_db = self.state_db;
        let key = key.to_vec();

        tokio::task::spawn_blocking(move || {
            let txn = env
                .begin_ro_txn()
                .map_err(|e| AzothError::Transaction(e.to_string()))?;

            match txn.get(state_db, &key) {
                Ok(bytes) => Ok(Some(bytes.to_vec())),
                Err(lmdb::Error::NotFound) => Ok(None),
                Err(e) => Err(AzothError::Transaction(e.to_string())),
            }
        })
        .await
        .map_err(|e| AzothError::Internal(format!("Task join error: {}", e)))?
    }

    /// Scan prefix asynchronously
    ///
    /// This method wraps the synchronous prefix scan in a blocking task,
    /// allowing it to be used in async contexts without blocking the runtime.
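    ///
    /// # Example
    /// ```ignore
    /// // Fetch all entries under an illustrative "user:" namespace.
    /// let pairs = store.scan_prefix_async(b"user:").await?;
    /// for (key, value) in pairs {
    ///     // process each entry
    /// }
    /// ```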
    pub async fn scan_prefix_async(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let env = self.env.clone();
        let state_db = self.state_db;
        let prefix = prefix.to_vec();
        let chunk_size = self.chunk_size;

        tokio::task::spawn_blocking(move || {
            let mut iter =
                crate::state_iter::LmdbStateIter::with_prefix(env, state_db, &prefix, chunk_size)?;

            let mut results = Vec::new();
            while let Some(item) = iter.next()? {
                results.push(item);
            }

            Ok(results)
        })
        .await
        .map_err(|e| AzothError::Internal(format!("Task join error: {}", e)))?
    }

    /// Get reference to the file-based event log
    ///
    /// This allows direct access to event storage for advanced use cases.
    pub fn event_log(&self) -> &Arc<FileEventLog> {
        &self.event_log
    }

    /// Get the shared event notification handle.
    ///
    /// This `Notify` fires after every successful event append. Consumers
    /// (projectors, event processors) can call `notify.notified().await` to
    /// wake immediately when new events are available, eliminating polling.
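    ///
    /// # Example
    /// A polling-free consumer loop (sketch, assuming a tokio runtime):
    /// ```ignore
    /// let notify = store.event_notify();
    /// loop {
    ///     notify.notified().await;
    ///     // drain newly appended events, e.g. via iter_events_async()
    /// }
    /// ```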
    pub fn event_notify(&self) -> Arc<Notify> {
        self.event_notify.clone()
    }

    /// Get reference to the preflight cache
    ///
    /// This allows access to cache statistics and manual cache operations.
    pub fn preflight_cache(&self) -> &Arc<PreflightCache> {
        &self.preflight_cache
    }

    /// Get reference to the read pool (if enabled)
    ///
    /// Returns None if read pooling was not enabled in config.
    pub fn read_pool(&self) -> Option<&Arc<LmdbReadPool>> {
        self.read_pool.as_ref()
    }

    // ---- Native async write API ----

    /// Execute a write operation asynchronously.
    ///
    /// Opens a write transaction, passes it to the closure, and commits
    /// atomically, all inside `spawn_blocking`. This centralizes the
    /// `spawn_blocking` boilerplate that otherwise appears at every call site.
    ///
    /// The closure receives a mutable reference to an `LmdbWriteTxn` and
    /// can perform any combination of `put_state`, `del_state`,
    /// `append_event`, etc. The closure's return value `R` is forwarded to
    /// the caller.
    ///
    /// # Example
    /// ```ignore
    /// store.submit_write(|txn| {
    ///     txn.put_state(b"key", b"value")?;
    ///     txn.append_event(b"{\"type\":\"put\"}")?;
    ///     Ok(())
    /// }).await?;
    /// ```
    pub async fn submit_write<F, R>(&self, f: F) -> Result<R>
    where
        F: for<'a> FnOnce(&mut LmdbWriteTxn<'a>) -> Result<R> + Send + 'static,
        R: Send + 'static,
    {
        let env = self.env.clone();
        let state_db = self.state_db;
        let meta_db = self.meta_db;
        let event_log = self.event_log.clone();
        let dead_letter_queue = self.dead_letter_queue.clone();
        let txn_counter = self.txn_counter.clone();
        let preflight_cache = self.preflight_cache.clone();
        let paused = self.paused.clone();

        tokio::task::spawn_blocking(move || {
            // Increment the counter before the pause check (matching write_txn)
            // so pause_ingestion() cannot miss a transaction that is starting.
            txn_counter.fetch_add(1, Ordering::SeqCst);

            // Check if paused
            if paused.load(Ordering::SeqCst) {
                txn_counter.fetch_sub(1, Ordering::SeqCst);
                return Err(AzothError::Paused);
            }

            let rw_txn = match env.begin_rw_txn() {
                Ok(t) => t,
                Err(e) => {
                    txn_counter.fetch_sub(1, Ordering::SeqCst);
                    return Err(AzothError::Transaction(e.to_string()));
                }
            };

            let mut write_txn = LmdbWriteTxn::new(
                rw_txn,
                state_db,
                meta_db,
                event_log,
                dead_letter_queue,
                Arc::downgrade(&txn_counter),
                preflight_cache,
            );

            let result = f(&mut write_txn)?;
            write_txn.commit()?;
            Ok(result)
        })
        .await
        .map_err(|e| AzothError::Internal(format!("Task join error: {}", e)))?
    }

    /// Put a single key-value pair asynchronously and commit.
    ///
    /// Convenience wrapper around [`Self::submit_write`] for the common case of
    /// writing a single key without an event.
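    ///
    /// # Example
    /// ```ignore
    /// let info = store.async_put_state(b"key", b"value").await?;
    /// assert_eq!(info.state_keys_written, 1);
    /// ```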
    pub async fn async_put_state(&self, key: &[u8], value: &[u8]) -> Result<CommitInfo> {
        let key = key.to_vec();
        let value = value.to_vec();
        self.submit_write(move |txn| {
            txn.put_state(&key, &value)?;
            Ok(())
        })
        .await?;
        // submit_write commits; return a minimal CommitInfo
        Ok(CommitInfo {
            events_written: 0,
            first_event_id: None,
            last_event_id: None,
            state_keys_written: 1,
            state_keys_deleted: 0,
            dlq_events: 0,
        })
    }

    /// Delete a single key asynchronously and commit.
    ///
    /// Convenience wrapper around [`Self::submit_write`].
    pub async fn async_del_state(&self, key: &[u8]) -> Result<CommitInfo> {
        let key = key.to_vec();
        self.submit_write(move |txn| {
            txn.del_state(&key)?;
            Ok(())
        })
        .await?;
        Ok(CommitInfo {
            events_written: 0,
            first_event_id: None,
            last_event_id: None,
            state_keys_written: 0,
            state_keys_deleted: 1,
            dlq_events: 0,
        })
    }

    /// Check if read pooling is enabled
    pub fn has_read_pool(&self) -> bool {
        self.read_pool.is_some()
    }
}