// ant_node/storage/lmdb.rs
1//! Content-addressed LMDB storage for chunks.
2//!
3//! Provides persistent storage for chunks using LMDB (via heed) for
4//! memory-mapped, zero-copy reads with ACID transactions.
5//!
6//! ```text
7//! {root}/chunks.mdb/   -- LMDB environment directory
8//! ```
9
10use crate::ant_protocol::XorName;
11use crate::error::{Error, Result};
12use crate::logging::{debug, info, trace, warn};
13use heed::types::Bytes;
14use heed::{Database, Env, EnvOpenOptions, MdbError};
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::Instant;
18use tokio::task::spawn_blocking;
19
20use crate::ant_protocol::XORNAME_LEN;
21
22/// Bytes in one MiB.
23pub const MIB: u64 = 1024 * 1024;
24
25/// Bytes in one GiB.
26pub const GIB: u64 = 1024 * MIB;
27
28/// Default minimum free disk space to preserve on the storage partition.
29const DEFAULT_DISK_RESERVE: u64 = 500 * MIB;
30
31/// Convert a byte count to GiB for human-readable log messages.
32#[allow(clippy::cast_precision_loss)] // display only — sub-byte precision is irrelevant
33fn bytes_to_gib(bytes: u64) -> f64 {
34    bytes as f64 / GIB as f64
35}
36
37/// Absolute minimum LMDB map size.
38///
39/// Even on a nearly-full disk the database must be able to open.
40/// Set to 256 MiB — enough for millions of LMDB pages.
41const MIN_MAP_SIZE: usize = 256 * 1024 * 1024;
42
43/// How often to re-query available disk space (in seconds).
44///
45/// Between checks the cached result is trusted.  Disk space changes slowly
46/// relative to chunk-write throughput, so a multi-second window is safe.
47const DISK_CHECK_INTERVAL_SECS: u64 = 5;
48
/// Configuration for LMDB storage.
///
/// Construct via [`Default`] (or `test_default()` in tests) and override
/// fields with struct-update syntax.
#[derive(Debug, Clone)]
pub struct LmdbStorageConfig {
    /// Root directory for storage (LMDB env lives at `{root_dir}/chunks.mdb/`).
    pub root_dir: PathBuf,
    /// Whether to verify content on read (compares hash to address).
    pub verify_on_read: bool,
    /// Explicit LMDB map size cap in bytes.
    ///
    /// When 0 (default), the map size is computed automatically from available
    /// disk space and grows on demand when more storage becomes available.
    pub max_map_size: usize,
    /// Minimum free disk space (in bytes) to preserve on the storage partition.
    ///
    /// Writes are refused when available space drops below this threshold.
    pub disk_reserve: u64,
}
66
67impl Default for LmdbStorageConfig {
68    fn default() -> Self {
69        Self {
70            root_dir: PathBuf::from(".ant/chunks"),
71            verify_on_read: true,
72            max_map_size: 0,
73            disk_reserve: DEFAULT_DISK_RESERVE,
74        }
75    }
76}
77
78impl LmdbStorageConfig {
79    /// A test-friendly default with `disk_reserve` set to 0 so unit tests
80    /// don't depend on the host having >= 1 GiB free disk space.
81    #[cfg(any(test, feature = "test-utils"))]
82    #[must_use]
83    pub fn test_default() -> Self {
84        Self {
85            disk_reserve: 0,
86            ..Self::default()
87        }
88    }
89}
90
/// Statistics about storage operations.
///
/// Counters accumulate for the lifetime of the process; they are not
/// persisted.  `current_chunks` is filled in lazily by
/// [`LmdbStorage::stats`] from LMDB metadata.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Total number of chunks stored.
    pub chunks_stored: u64,
    /// Total number of chunks retrieved.
    pub chunks_retrieved: u64,
    /// Total bytes stored.
    pub bytes_stored: u64,
    /// Total bytes retrieved.
    pub bytes_retrieved: u64,
    /// Number of duplicate writes (already exists).
    pub duplicates: u64,
    /// Number of verification failures on read.
    pub verification_failures: u64,
    /// Number of chunks currently persisted.
    pub current_chunks: u64,
}
109
/// Content-addressed LMDB storage.
///
/// Uses heed (LMDB wrapper) for memory-mapped, transactional chunk storage.
/// Keys are 32-byte `XorName` addresses, values are raw chunk bytes.
pub struct LmdbStorage {
    /// LMDB environment.
    env: Env,
    /// The unnamed default database (key=XorName bytes, value=chunk bytes).
    db: Database<Bytes, Bytes>,
    /// Storage configuration.
    config: LmdbStorageConfig,
    /// Path to the LMDB environment directory (for disk-space queries).
    env_dir: PathBuf,
    /// Operation statistics.
    stats: parking_lot::RwLock<StorageStats>,
    /// Serialises access to the LMDB environment during a map resize.
    ///
    /// Normal read/write operations acquire a **shared** lock.  The rare
    /// resize path acquires an **exclusive** lock, ensuring no transactions
    /// are active when `env.resize()` is called (an LMDB safety requirement).
    /// Every method that opens a txn must hold the shared side for the
    /// txn's full lifetime.
    env_lock: Arc<parking_lot::RwLock<()>>,
    /// Timestamp of the last successful disk-space check.
    ///
    /// `None` means "never checked — check on next write".  Updated only
    /// after a passing check, so a low-space result is always rechecked.
    last_disk_ok: parking_lot::Mutex<Option<Instant>>,
}
137
138impl LmdbStorage {
    /// Create a new LMDB storage instance.
    ///
    /// Opens (or creates) an LMDB environment at `{root_dir}/chunks.mdb/`.
    ///
    /// When `config.max_map_size` is 0 (the default) the map size is derived
    /// from the available disk space on the partition that hosts the database,
    /// minus `config.disk_reserve`.  This allows a node to use all available
    /// storage without a fixed cap.  If the operator adds more storage later
    /// the map is resized on demand (see [`Self::put`]).
    ///
    /// # Errors
    ///
    /// Returns an error if the LMDB environment cannot be opened.
    #[allow(unsafe_code)]
    pub async fn new(config: LmdbStorageConfig) -> Result<Self> {
        let env_dir = config.root_dir.join("chunks.mdb");

        // Create the directory synchronously before opening LMDB.  This
        // briefly blocks the async caller, which is acceptable at startup.
        std::fs::create_dir_all(&env_dir)
            .map_err(|e| Error::Storage(format!("Failed to create LMDB directory: {e}")))?;

        let map_size = if config.max_map_size > 0 {
            // Operator provided an explicit cap.
            config.max_map_size
        } else {
            // Auto-scale: current DB footprint + available space − reserve.
            let computed = compute_map_size(&env_dir, config.disk_reserve)?;
            info!(
                "Auto-computed LMDB map size: {:.2} GiB (available disk minus {:.2} GiB reserve)",
                bytes_to_gib(computed as u64),
                bytes_to_gib(config.disk_reserve),
            );
            computed
        };

        // LMDB open and the first write txn are blocking calls — keep them
        // off the async executor.
        let env_dir_clone = env_dir.clone();
        let (env, db) = spawn_blocking(move || -> Result<(Env, Database<Bytes, Bytes>)> {
            // SAFETY: `EnvOpenOptions::open()` is unsafe because LMDB uses memory-mapped
            // I/O and relies on OS file-locking to prevent corruption from concurrent
            // access by multiple processes. We satisfy this by giving each node instance
            // a unique `root_dir` (typically a directory named by its full 64-hex peer
            // ID), ensuring no two processes open the same LMDB environment. Callers
            // who manually configure `--root-dir` must not point multiple nodes at the
            // same directory.
            let env = unsafe {
                EnvOpenOptions::new()
                    .map_size(map_size)
                    .max_dbs(1)
                    .open(&env_dir_clone)
                    .map_err(|e| Error::Storage(format!("Failed to open LMDB env: {e}")))?
            };

            // Create (or open) the unnamed default database inside a write txn.
            let mut wtxn = env
                .write_txn()
                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
            let db: Database<Bytes, Bytes> = env
                .create_database(&mut wtxn, None)
                .map_err(|e| Error::Storage(format!("Failed to create database: {e}")))?;
            wtxn.commit()
                .map_err(|e| Error::Storage(format!("Failed to commit db creation: {e}")))?;

            Ok((env, db))
        })
        .await
        .map_err(|e| Error::Storage(format!("LMDB init task failed: {e}")))??;

        let storage = Self {
            env,
            db,
            config,
            env_dir,
            stats: parking_lot::RwLock::new(StorageStats::default()),
            env_lock: Arc::new(parking_lot::RwLock::new(())),
            last_disk_ok: parking_lot::Mutex::new(None),
        };

        debug!(
            "Initialized LMDB storage at {:?} ({} existing chunks)",
            storage.env_dir,
            storage.current_chunks()?
        );

        Ok(storage)
    }
223
    /// Store a chunk.
    ///
    /// Before writing, verifies that available disk space exceeds the
    /// configured reserve.  If the LMDB map is full but more disk space
    /// exists (e.g. the operator added storage), the map is resized
    /// automatically and the write is retried.
    ///
    /// # Returns
    ///
    /// Returns `true` if the chunk was newly stored, `false` if it already existed.
    ///
    /// # Errors
    ///
    /// Returns an error if the write fails, content doesn't match address,
    /// or the disk is too full to accept new chunks.
    pub async fn put(&self, address: &XorName, content: &[u8]) -> Result<bool> {
        // Verify content address — content-addressing invariant: the key
        // must equal the hash of the value.
        let computed = Self::compute_address(content);
        if computed != *address {
            return Err(Error::Storage(format!(
                "Content address mismatch: expected {}, computed {}",
                hex::encode(address),
                hex::encode(computed)
            )));
        }

        // Fast-path duplicate check (read-only, no write lock needed).
        // This is an optimistic hint — the authoritative check happens inside
        // the write transaction below to prevent TOCTOU races.
        if self.exists(address)? {
            trace!("Chunk {} already exists", hex::encode(address));
            self.stats.write().duplicates += 1;
            return Ok(false);
        }

        // ── Disk-space guard (cached — at most one syscall per interval) ─
        // Placed after the duplicate check so that re-storing an existing
        // chunk remains a harmless no-op even when disk space is low.
        self.check_disk_space_cached()?;

        // ── Write (with resize-on-demand) ───────────────────────────────
        match self.try_put(address, content).await? {
            PutOutcome::New => {}
            PutOutcome::Duplicate => {
                // A concurrent writer won the race between the optimistic
                // check above and our write txn.
                trace!("Chunk {} already exists", hex::encode(address));
                self.stats.write().duplicates += 1;
                return Ok(false);
            }
            PutOutcome::MapFull => {
                // The map ceiling was reached but there may be more disk space
                // available (e.g. operator expanded the partition).
                self.try_resize().await?;
                // Retry once after resize.
                match self.try_put(address, content).await? {
                    PutOutcome::New => {}
                    PutOutcome::Duplicate => {
                        self.stats.write().duplicates += 1;
                        return Ok(false);
                    }
                    PutOutcome::MapFull => {
                        return Err(Error::Storage(
                            "LMDB map full after resize — disk may be at capacity".into(),
                        ));
                    }
                }
            }
        }

        // Update counters only after a confirmed new write.
        {
            let mut stats = self.stats.write();
            stats.chunks_stored += 1;
            stats.bytes_stored += content.len() as u64;
        }

        debug!(
            "Stored chunk {} ({} bytes)",
            hex::encode(address),
            content.len()
        );

        Ok(true)
    }
306
    /// Attempt a single put inside a write transaction.
    ///
    /// Returns [`PutOutcome::MapFull`] instead of an error when the LMDB map
    /// ceiling is reached, so the caller can resize and retry.  `MapFull`
    /// is checked on both `put` and `commit`, since either call can hit
    /// the ceiling.
    async fn try_put(&self, address: &XorName, content: &[u8]) -> Result<PutOutcome> {
        // Copy inputs into owned values so the blocking closure is 'static.
        let key = *address;
        let value = content.to_vec();
        let env = self.env.clone();
        let db = self.db;
        let lock = Arc::clone(&self.env_lock);

        spawn_blocking(move || -> Result<PutOutcome> {
            // Shared guard: keeps the resize path out while the txn is open.
            let _guard = lock.read();

            let mut wtxn = env
                .write_txn()
                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;

            // Authoritative existence check inside the serialized write txn
            if db
                .get(&wtxn, &key)
                .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
                .is_some()
            {
                return Ok(PutOutcome::Duplicate);
            }

            match db.put(&mut wtxn, &key, &value) {
                Ok(()) => {}
                Err(heed::Error::Mdb(MdbError::MapFull)) => return Ok(PutOutcome::MapFull),
                Err(e) => {
                    return Err(Error::Storage(format!("Failed to put chunk: {e}")));
                }
            }

            match wtxn.commit() {
                Ok(()) => Ok(PutOutcome::New),
                Err(heed::Error::Mdb(MdbError::MapFull)) => Ok(PutOutcome::MapFull),
                Err(e) => Err(Error::Storage(format!("Failed to commit put: {e}"))),
            }
        })
        .await
        .map_err(|e| Error::Storage(format!("LMDB put task failed: {e}")))?
    }
351
    /// Retrieve a chunk.
    ///
    /// # Returns
    ///
    /// Returns `Some(content)` if found, `None` if not found.
    ///
    /// # Errors
    ///
    /// Returns an error if read fails or verification fails.
    pub async fn get(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
        let key = *address;
        let env = self.env.clone();
        let db = self.db;
        let lock = Arc::clone(&self.env_lock);

        // LMDB reads are blocking (page faults on the mmap) — run off-executor.
        let content = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
            // Shared guard: keeps the resize path out while the txn is open.
            let _guard = lock.read();
            let rtxn = env
                .read_txn()
                .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
            let value = db
                .get(&rtxn, &key)
                .map_err(|e| Error::Storage(format!("Failed to get chunk: {e}")))?;
            // Copy out of the mmap before the txn (and guard) are dropped.
            Ok(value.map(Vec::from))
        })
        .await
        .map_err(|e| Error::Storage(format!("LMDB get task failed: {e}")))??;

        let Some(content) = content else {
            trace!("Chunk {} not found", hex::encode(address));
            return Ok(None);
        };

        // Verify content if configured
        if self.config.verify_on_read {
            let computed = Self::compute_address(&content);
            if computed != *address {
                self.stats.write().verification_failures += 1;
                warn!(
                    "Chunk verification failed: expected {}, computed {}",
                    hex::encode(address),
                    hex::encode(computed)
                );
                return Err(Error::Storage(format!(
                    "Chunk verification failed for {}",
                    hex::encode(address)
                )));
            }
        }

        {
            let mut stats = self.stats.write();
            stats.chunks_retrieved += 1;
            stats.bytes_retrieved += content.len() as u64;
        }

        debug!(
            "Retrieved chunk {} ({} bytes)",
            hex::encode(address),
            content.len()
        );

        Ok(Some(content))
    }
416
417    /// Check if a chunk exists.
418    ///
419    /// # Errors
420    ///
421    /// Returns an error if the LMDB read transaction fails.
422    pub fn exists(&self, address: &XorName) -> Result<bool> {
423        let _guard = self.env_lock.read();
424        let rtxn = self
425            .env
426            .read_txn()
427            .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
428        let found = self
429            .db
430            .get(&rtxn, address.as_ref())
431            .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
432            .is_some();
433        Ok(found)
434    }
435
436    /// Delete a chunk.
437    ///
438    /// # Errors
439    ///
440    /// Returns an error if deletion fails.
441    pub async fn delete(&self, address: &XorName) -> Result<bool> {
442        let key = *address;
443        let env = self.env.clone();
444        let db = self.db;
445        let lock = Arc::clone(&self.env_lock);
446
447        let deleted = spawn_blocking(move || -> Result<bool> {
448            let _guard = lock.read();
449            let mut wtxn = env
450                .write_txn()
451                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
452            let existed = db
453                .delete(&mut wtxn, &key)
454                .map_err(|e| Error::Storage(format!("Failed to delete chunk: {e}")))?;
455            wtxn.commit()
456                .map_err(|e| Error::Storage(format!("Failed to commit delete: {e}")))?;
457            Ok(existed)
458        })
459        .await
460        .map_err(|e| Error::Storage(format!("LMDB delete task failed: {e}")))??;
461
462        if deleted {
463            debug!("Deleted chunk {}", hex::encode(address));
464        }
465
466        Ok(deleted)
467    }
468
469    /// Get storage statistics.
470    #[must_use]
471    pub fn stats(&self) -> StorageStats {
472        let mut stats = self.stats.read().clone();
473        match self.current_chunks() {
474            Ok(count) => stats.current_chunks = count,
475            Err(e) => {
476                warn!("Failed to read current_chunks for stats: {e}");
477                stats.current_chunks = 0;
478            }
479        }
480        stats
481    }
482
483    /// Return the number of chunks currently stored, queried from LMDB metadata.
484    ///
485    /// This is an O(1) read of the B-tree page header — not a full scan.
486    ///
487    /// # Errors
488    ///
489    /// Returns an error if the LMDB read transaction fails.
490    pub fn current_chunks(&self) -> Result<u64> {
491        let _guard = self.env_lock.read();
492        let rtxn = self
493            .env
494            .read_txn()
495            .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
496        let entries = self
497            .db
498            .stat(&rtxn)
499            .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
500            .entries;
501        Ok(entries as u64)
502    }
503
    /// Compute content address (BLAKE3 hash).
    ///
    /// Thin delegate to the crate-wide hashing helper so storage and client
    /// always agree on addressing.
    #[must_use]
    pub fn compute_address(content: &[u8]) -> XorName {
        crate::client::compute_address(content)
    }
509
510    /// Get the root directory.
511    #[must_use]
512    pub fn root_dir(&self) -> &Path {
513        &self.config.root_dir
514    }
515
516    /// Return all stored record keys.
517    ///
518    /// Iterates the LMDB database in a read transaction. Used by the
519    /// replication subsystem for hint construction and audit sampling.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if the LMDB read transaction fails.
524    pub async fn all_keys(&self) -> Result<Vec<XorName>> {
525        let env = self.env.clone();
526        let db = self.db;
527
528        let keys = spawn_blocking(move || -> Result<Vec<XorName>> {
529            let rtxn = env
530                .read_txn()
531                .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
532            let mut keys = Vec::new();
533            let iter = db
534                .iter(&rtxn)
535                .map_err(|e| Error::Storage(format!("Failed to iterate database: {e}")))?;
536            for result in iter {
537                let (key_bytes, _) =
538                    result.map_err(|e| Error::Storage(format!("Failed to read entry: {e}")))?;
539                if key_bytes.len() == XORNAME_LEN {
540                    let mut key = [0u8; XORNAME_LEN];
541                    key.copy_from_slice(key_bytes);
542                    keys.push(key);
543                } else {
544                    crate::logging::warn!(
545                        "LmdbStorage: skipping entry with unexpected key length {} (expected {XORNAME_LEN})",
546                        key_bytes.len()
547                    );
548                }
549            }
550            Ok(keys)
551        })
552        .await
553        .map_err(|e| Error::Storage(format!("all_keys task failed: {e}")))?;
554
555        keys
556    }
557
558    /// Retrieve raw chunk bytes without content-address verification.
559    ///
560    /// Used by the audit subsystem to compute digests over stored bytes.
561    /// Unlike [`Self::get`], this does not verify `hash(content) == address`.
562    ///
563    /// # Errors
564    ///
565    /// Returns an error if the LMDB read transaction fails.
566    pub async fn get_raw(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
567        let key = *address;
568        let env = self.env.clone();
569        let db = self.db;
570
571        let value = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
572            let rtxn = env
573                .read_txn()
574                .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
575            let val = db
576                .get(&rtxn, key.as_ref())
577                .map_err(|e| Error::Storage(format!("Failed to get chunk: {e}")))?;
578            Ok(val.map(Vec::from))
579        })
580        .await
581        .map_err(|e| Error::Storage(format!("get_raw task failed: {e}")))?;
582
583        value
584    }
585
586    /// Check available disk space, skipping the syscall if a recent check passed.
587    ///
588    /// Only caches *passing* results — a low-space condition is always
589    /// rechecked so we detect freed space promptly.
590    fn check_disk_space_cached(&self) -> Result<()> {
591        {
592            let last = self.last_disk_ok.lock();
593            if let Some(t) = *last {
594                if t.elapsed().as_secs() < DISK_CHECK_INTERVAL_SECS {
595                    return Ok(());
596                }
597            }
598        }
599        // Cache miss or stale — perform the actual statvfs check.
600        check_disk_space(&self.env_dir, self.config.disk_reserve)?;
601        // Passed — update the cache timestamp.
602        *self.last_disk_ok.lock() = Some(Instant::now());
603        Ok(())
604    }
605
    /// Grow the LMDB map to match currently available disk space.
    ///
    /// The new size is the **larger** of:
    ///   1. the current map size (so existing data is never truncated), and
    ///   2. `current_db_file_size + available_space − reserve`
    ///      (so all reachable disk space can be used).
    ///
    /// Acquires an **exclusive** lock on `env_lock` so that no read or write
    /// transactions are active when the underlying `mdb_env_set_mapsize` is
    /// called (an LMDB safety requirement).
    #[allow(unsafe_code)]
    async fn try_resize(&self) -> Result<()> {
        // Query disk space before taking the exclusive lock — the syscall
        // doesn't need it and this keeps the critical section short.
        let from_disk = compute_map_size(&self.env_dir, self.config.disk_reserve)?;
        let env = self.env.clone();
        let lock = Arc::clone(&self.env_lock);

        spawn_blocking(move || -> Result<()> {
            // Exclusive lock guarantees no concurrent transactions.
            let _guard = lock.write();

            // Never shrink below the current map — existing data must remain
            // addressable regardless of what the disk-space calculation says.
            let current_map = env.info().map_size;
            let new_size = from_disk.max(current_map);

            if new_size <= current_map {
                debug!("LMDB map resize skipped — no additional disk space available");
                return Ok(());
            }

            // SAFETY: We hold an exclusive lock, so no transactions are active.
            unsafe {
                env.resize(new_size)
                    .map_err(|e| Error::Storage(format!("Failed to resize LMDB map: {e}")))?;
            }

            info!(
                "Resized LMDB map to {:.2} GiB (was {:.2} GiB)",
                bytes_to_gib(new_size as u64),
                bytes_to_gib(current_map as u64),
            );
            Ok(())
        })
        .await
        .map_err(|e| Error::Storage(format!("LMDB resize task failed: {e}")))?
    }
652}
653
654// ────────────────────────────────────────────────────────────────────────────
655// Helpers
656// ────────────────────────────────────────────────────────────────────────────
657
/// Outcome of a single `try_put` attempt.
///
/// Distinguishes the "map full" condition from real errors so the caller
/// can resize the map and retry instead of failing the write.
enum PutOutcome {
    /// Chunk was newly stored.
    New,
    /// Chunk already existed (idempotent).
    Duplicate,
    /// The LMDB map ceiling was reached — caller should resize and retry.
    MapFull,
}
667
668/// Compute the LMDB map size from the disk hosting `db_dir`.
669///
670/// The result covers **all existing data** plus all remaining usable disk
671/// space:
672///
673/// ```text
674/// map_size = current_db_file_size + max(0, available_space − reserve)
675/// ```
676///
677/// `available_space` (from `statvfs`) reports only the *free* bytes on the
678/// partition — the DB file's own footprint is **not** included, so adding
679/// it back ensures the map is always large enough for the data already
680/// stored.
681///
682/// The result is page-aligned and never falls below [`MIN_MAP_SIZE`].
683fn compute_map_size(db_dir: &Path, reserve: u64) -> Result<usize> {
684    let available = fs2::available_space(db_dir)
685        .map_err(|e| Error::Storage(format!("Failed to query available disk space: {e}")))?;
686
687    // The MDB data file may not exist yet on first run.
688    let mdb_file = db_dir.join("data.mdb");
689    let current_db_bytes = std::fs::metadata(&mdb_file).map(|m| m.len()).unwrap_or(0);
690
691    // available_space excludes the DB file, so we add it back to get the
692    // total space the DB could occupy while still leaving `reserve` free.
693    let growth_room = available.saturating_sub(reserve);
694    let target = current_db_bytes.saturating_add(growth_room);
695
696    // Align up to system page size (required by heed's resize).
697    let page = page_size::get() as u64;
698    let aligned = target.div_ceil(page) * page;
699
700    let result = usize::try_from(aligned).unwrap_or(usize::MAX);
701    Ok(result.max(MIN_MAP_SIZE))
702}
703
704/// Reject the write early if available disk space is below `reserve`.
705fn check_disk_space(db_dir: &Path, reserve: u64) -> Result<()> {
706    let available = fs2::available_space(db_dir)
707        .map_err(|e| Error::Storage(format!("Failed to query available disk space: {e}")))?;
708
709    if available < reserve {
710        return Err(Error::Storage(format!(
711            "Insufficient disk space: {:.2} GiB available, {:.2} GiB reserve required. \
712             Free disk space or increase the partition to continue storing chunks.",
713            bytes_to_gib(available),
714            bytes_to_gib(reserve),
715        )));
716    }
717
718    Ok(())
719}
720
721#[cfg(test)]
722#[allow(clippy::unwrap_used, clippy::expect_used)]
723mod tests {
724    use super::*;
725    use tempfile::TempDir;
726
    /// Build an `LmdbStorage` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the storage so the
    /// directory outlives the test (it is removed when the guard drops).
    async fn create_test_storage() -> (LmdbStorage, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            // disk_reserve = 0 so tests don't depend on host free space.
            ..LmdbStorageConfig::test_default()
        };
        let storage = LmdbStorage::new(config).await.expect("create storage");
        (storage, temp_dir)
    }
736
737    #[tokio::test]
738    async fn test_put_and_get() {
739        let (storage, _temp) = create_test_storage().await;
740
741        let content = b"hello world";
742        let address = LmdbStorage::compute_address(content);
743
744        // Store chunk
745        let is_new = storage.put(&address, content).await.expect("put");
746        assert!(is_new);
747
748        // Retrieve chunk
749        let retrieved = storage.get(&address).await.expect("get");
750        assert_eq!(retrieved, Some(content.to_vec()));
751    }
752
753    #[tokio::test]
754    async fn test_put_duplicate() {
755        let (storage, _temp) = create_test_storage().await;
756
757        let content = b"test data";
758        let address = LmdbStorage::compute_address(content);
759
760        // First store
761        let is_new1 = storage.put(&address, content).await.expect("put 1");
762        assert!(is_new1);
763
764        // Duplicate store
765        let is_new2 = storage.put(&address, content).await.expect("put 2");
766        assert!(!is_new2);
767
768        // Check stats
769        let stats = storage.stats();
770        assert_eq!(stats.chunks_stored, 1);
771        assert_eq!(stats.duplicates, 1);
772    }
773
774    #[tokio::test]
775    async fn test_get_not_found() {
776        let (storage, _temp) = create_test_storage().await;
777
778        let address = [0xAB; 32];
779        let result = storage.get(&address).await.expect("get");
780        assert!(result.is_none());
781    }
782
783    #[tokio::test]
784    async fn test_exists() {
785        let (storage, _temp) = create_test_storage().await;
786
787        let content = b"exists test";
788        let address = LmdbStorage::compute_address(content);
789
790        assert!(!storage.exists(&address).expect("exists"));
791
792        storage.put(&address, content).await.expect("put");
793
794        assert!(storage.exists(&address).expect("exists"));
795    }
796
797    #[tokio::test]
798    async fn test_delete() {
799        let (storage, _temp) = create_test_storage().await;
800
801        let content = b"delete test";
802        let address = LmdbStorage::compute_address(content);
803
804        // Store
805        storage.put(&address, content).await.expect("put");
806        assert!(storage.exists(&address).expect("exists"));
807
808        // Delete
809        let deleted = storage.delete(&address).await.expect("delete");
810        assert!(deleted);
811        assert!(!storage.exists(&address).expect("exists"));
812
813        // Delete again (already deleted)
814        let deleted2 = storage.delete(&address).await.expect("delete 2");
815        assert!(!deleted2);
816    }
817
818    #[tokio::test]
819    async fn test_address_mismatch() {
820        let (storage, _temp) = create_test_storage().await;
821
822        let content = b"some content";
823        let wrong_address = [0xFF; 32]; // Wrong address
824
825        let result = storage.put(&wrong_address, content).await;
826        assert!(result.is_err());
827        assert!(result.unwrap_err().to_string().contains("mismatch"));
828    }
829
830    #[test]
831    fn test_compute_address() {
832        // Known BLAKE3 hash of "hello world"
833        let content = b"hello world";
834        let address = LmdbStorage::compute_address(content);
835
836        let expected_hex = "d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24";
837        assert_eq!(hex::encode(address), expected_hex);
838    }
839
840    #[tokio::test]
841    async fn test_stats() {
842        let (storage, _temp) = create_test_storage().await;
843
844        let content1 = b"content 1";
845        let content2 = b"content 2";
846        let address1 = LmdbStorage::compute_address(content1);
847        let address2 = LmdbStorage::compute_address(content2);
848
849        // Store two chunks
850        storage.put(&address1, content1).await.expect("put 1");
851        storage.put(&address2, content2).await.expect("put 2");
852
853        // Retrieve one
854        storage.get(&address1).await.expect("get");
855
856        let stats = storage.stats();
857        assert_eq!(stats.chunks_stored, 2);
858        assert_eq!(stats.chunks_retrieved, 1);
859        assert_eq!(
860            stats.bytes_stored,
861            content1.len() as u64 + content2.len() as u64
862        );
863        assert_eq!(stats.bytes_retrieved, content1.len() as u64);
864        assert_eq!(stats.current_chunks, 2);
865    }
866
867    #[tokio::test]
868    async fn test_persistence_across_reopen() {
869        let temp_dir = TempDir::new().expect("create temp dir");
870        let content = b"persistent data";
871        let address = LmdbStorage::compute_address(content);
872
873        // Store a chunk
874        {
875            let config = LmdbStorageConfig {
876                root_dir: temp_dir.path().to_path_buf(),
877                ..LmdbStorageConfig::test_default()
878            };
879            let storage = LmdbStorage::new(config).await.expect("create storage");
880            storage.put(&address, content).await.expect("put");
881        }
882
883        // Re-open and verify it persisted
884        {
885            let config = LmdbStorageConfig {
886                root_dir: temp_dir.path().to_path_buf(),
887                ..LmdbStorageConfig::test_default()
888            };
889            let storage = LmdbStorage::new(config).await.expect("reopen storage");
890            assert_eq!(storage.current_chunks().expect("current_chunks"), 1);
891            let retrieved = storage.get(&address).await.expect("get");
892            assert_eq!(retrieved, Some(content.to_vec()));
893        }
894    }
895
896    #[tokio::test]
897    async fn test_all_keys() {
898        let (storage, _temp) = create_test_storage().await;
899
900        // Empty storage
901        let keys = storage.all_keys().await.expect("all_keys empty");
902        assert!(keys.is_empty());
903
904        // Store some chunks
905        let content1 = b"chunk one for keys";
906        let content2 = b"chunk two for keys";
907        let addr1 = LmdbStorage::compute_address(content1);
908        let addr2 = LmdbStorage::compute_address(content2);
909        storage.put(&addr1, content1).await.expect("put 1");
910        storage.put(&addr2, content2).await.expect("put 2");
911
912        let mut keys = storage.all_keys().await.expect("all_keys");
913        keys.sort_unstable();
914        let mut expected = vec![addr1, addr2];
915        expected.sort_unstable();
916        assert_eq!(keys, expected);
917    }
918
919    #[tokio::test]
920    async fn test_get_raw() {
921        let (storage, _temp) = create_test_storage().await;
922
923        let content = b"raw test data";
924        let address = LmdbStorage::compute_address(content);
925        storage.put(&address, content).await.expect("put");
926
927        // get_raw returns bytes without verification
928        let raw = storage.get_raw(&address).await.expect("get_raw");
929        assert_eq!(raw, Some(content.to_vec()));
930
931        // Non-existent key
932        let missing = storage.get_raw(&[0xFF; 32]).await.expect("get_raw missing");
933        assert!(missing.is_none());
934    }
935}