Skip to main content

ant_node/storage/
lmdb.rs

1//! Content-addressed LMDB storage for chunks.
2//!
3//! Provides persistent storage for chunks using LMDB (via heed) for
4//! memory-mapped, zero-copy reads with ACID transactions.
5//!
6//! ```text
7//! {root}/chunks.mdb/   -- LMDB environment directory
8//! ```
9
10use crate::ant_protocol::XorName;
11use crate::error::{Error, Result};
12use heed::types::Bytes;
13use heed::{Database, Env, EnvOpenOptions};
14use std::path::{Path, PathBuf};
15use tokio::task::spawn_blocking;
16use tracing::{debug, trace, warn};
17
/// Fallback LMDB map size: 32 GiB.
///
/// Node operators can override this via `storage.db_size_gb` in `config.toml`.
const DEFAULT_MAX_MAP_SIZE: usize = 32 * 1024 * 1024 * 1024; // 32 GiB
22
/// Configuration for LMDB storage.
///
/// Implements `PartialEq`/`Eq` so that two configurations can be compared
/// directly (for example in tests or when detecting a configuration change).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LmdbStorageConfig {
    /// Root directory for storage (LMDB env lives at `{root_dir}/chunks.mdb/`).
    pub root_dir: PathBuf,
    /// Whether to verify content on read (compares hash to address).
    pub verify_on_read: bool,
    /// Maximum number of chunks to store (0 = unlimited).
    pub max_chunks: usize,
    /// Maximum LMDB map size in bytes (0 = use default of 32 GiB).
    pub max_map_size: usize,
}
35
36impl Default for LmdbStorageConfig {
37    fn default() -> Self {
38        Self {
39            root_dir: PathBuf::from(".ant/chunks"),
40            verify_on_read: true,
41            max_chunks: 0,
42            max_map_size: 0,
43        }
44    }
45}
46
/// Statistics about storage operations.
///
/// All counters start at zero (`Default`). Implements `PartialEq`/`Eq` so a
/// snapshot can be compared against an expected value in a single assertion.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StorageStats {
    /// Total number of chunks stored.
    pub chunks_stored: u64,
    /// Total number of chunks retrieved.
    pub chunks_retrieved: u64,
    /// Total bytes stored.
    pub bytes_stored: u64,
    /// Total bytes retrieved.
    pub bytes_retrieved: u64,
    /// Number of duplicate writes (already exists).
    pub duplicates: u64,
    /// Number of verification failures on read.
    pub verification_failures: u64,
    /// Number of chunks currently persisted.
    pub current_chunks: u64,
}
65
/// Content-addressed LMDB storage.
///
/// Uses heed (LMDB wrapper) for memory-mapped, transactional chunk storage.
/// Keys are 32-byte `XorName` addresses, values are raw chunk bytes.
///
/// Instances are created with `LmdbStorage::new`, which opens the environment
/// under `{root_dir}/chunks.mdb/`.
pub struct LmdbStorage {
    /// LMDB environment. Cloned into blocking tasks for each operation.
    env: Env,
    /// The unnamed default database (key=XorName bytes, value=chunk bytes).
    db: Database<Bytes, Bytes>,
    /// Storage configuration (root directory, read verification, capacity).
    config: LmdbStorageConfig,
    /// Operation statistics, updated by `put` and `get` behind a lock.
    stats: parking_lot::RwLock<StorageStats>,
}
80
81impl LmdbStorage {
82    /// Create a new LMDB storage instance.
83    ///
84    /// Opens (or creates) an LMDB environment at `{root_dir}/chunks.mdb/`.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the LMDB environment cannot be opened.
89    #[allow(unsafe_code)]
90    pub async fn new(config: LmdbStorageConfig) -> Result<Self> {
91        let env_dir = config.root_dir.join("chunks.mdb");
92
93        // Create the directory synchronously before opening LMDB
94        std::fs::create_dir_all(&env_dir)
95            .map_err(|e| Error::Storage(format!("Failed to create LMDB directory: {e}")))?;
96
97        let map_size = if config.max_map_size > 0 {
98            config.max_map_size
99        } else {
100            DEFAULT_MAX_MAP_SIZE
101        };
102
103        let env_dir_clone = env_dir.clone();
104        let (env, db) = spawn_blocking(move || -> Result<(Env, Database<Bytes, Bytes>)> {
105            // SAFETY: `EnvOpenOptions::open()` is unsafe because LMDB uses memory-mapped
106            // I/O and relies on OS file-locking to prevent corruption from concurrent
107            // access by multiple processes. We satisfy this by giving each node instance
108            // a unique `root_dir` (typically a directory named by its full 64-hex peer
109            // ID), ensuring no two processes open the same LMDB environment. Callers
110            // who manually configure `--root-dir` must not point multiple nodes at the
111            // same directory.
112            let env = unsafe {
113                EnvOpenOptions::new()
114                    .map_size(map_size)
115                    .max_dbs(1)
116                    .open(&env_dir_clone)
117                    .map_err(|e| Error::Storage(format!("Failed to open LMDB env: {e}")))?
118            };
119
120            let mut wtxn = env
121                .write_txn()
122                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
123            let db: Database<Bytes, Bytes> = env
124                .create_database(&mut wtxn, None)
125                .map_err(|e| Error::Storage(format!("Failed to create database: {e}")))?;
126            wtxn.commit()
127                .map_err(|e| Error::Storage(format!("Failed to commit db creation: {e}")))?;
128
129            Ok((env, db))
130        })
131        .await
132        .map_err(|e| Error::Storage(format!("LMDB init task failed: {e}")))??;
133
134        let storage = Self {
135            env,
136            db,
137            config,
138            stats: parking_lot::RwLock::new(StorageStats::default()),
139        };
140
141        debug!(
142            "Initialized LMDB storage at {:?} ({} existing chunks)",
143            env_dir,
144            storage.current_chunks()?
145        );
146
147        Ok(storage)
148    }
149
150    /// Store a chunk.
151    ///
152    /// # Arguments
153    ///
154    /// * `address` - Content address (should be BLAKE3 of content)
155    /// * `content` - Chunk data
156    ///
157    /// # Returns
158    ///
159    /// Returns `true` if the chunk was newly stored, `false` if it already existed.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if the write fails or content doesn't match address.
164    pub async fn put(&self, address: &XorName, content: &[u8]) -> Result<bool> {
165        // Verify content address
166        let computed = Self::compute_address(content);
167        if computed != *address {
168            return Err(Error::Storage(format!(
169                "Content address mismatch: expected {}, computed {}",
170                hex::encode(address),
171                hex::encode(computed)
172            )));
173        }
174
175        // Fast-path duplicate check (read-only, no write lock needed).
176        // This is an optimistic hint — the authoritative check happens inside
177        // the write transaction below to prevent TOCTOU races.
178        if self.exists(address)? {
179            trace!("Chunk {} already exists", hex::encode(address));
180            self.stats.write().duplicates += 1;
181            return Ok(false);
182        }
183
184        let key = *address;
185        let value = content.to_vec();
186        let env = self.env.clone();
187        let db = self.db;
188        let max_chunks = self.config.max_chunks;
189
190        // Existence check, capacity enforcement, and write all happen atomically
191        // inside a single write transaction. LMDB serializes write transactions,
192        // so there are no TOCTOU races or counter-drift issues.
193        let was_new = spawn_blocking(move || -> Result<bool> {
194            let mut wtxn = env
195                .write_txn()
196                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
197
198            // Authoritative existence check inside the serialized write txn
199            if db
200                .get(&wtxn, &key)
201                .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
202                .is_some()
203            {
204                return Ok(false);
205            }
206
207            // Enforce capacity limit (0 = unlimited)
208            if max_chunks > 0 {
209                let current = db
210                    .stat(&wtxn)
211                    .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
212                    .entries;
213                if current >= max_chunks {
214                    return Err(Error::Storage(format!(
215                        "Storage capacity reached: {current} chunks stored, max is {max_chunks}"
216                    )));
217                }
218            }
219
220            db.put(&mut wtxn, &key, &value)
221                .map_err(|e| Error::Storage(format!("Failed to put chunk: {e}")))?;
222            wtxn.commit()
223                .map_err(|e| Error::Storage(format!("Failed to commit put: {e}")))?;
224            Ok(true)
225        })
226        .await
227        .map_err(|e| Error::Storage(format!("LMDB put task failed: {e}")))??;
228
229        if !was_new {
230            trace!("Chunk {} already exists", hex::encode(address));
231            self.stats.write().duplicates += 1;
232            return Ok(false);
233        }
234
235        {
236            let mut stats = self.stats.write();
237            stats.chunks_stored += 1;
238            stats.bytes_stored += content.len() as u64;
239        }
240
241        debug!(
242            "Stored chunk {} ({} bytes)",
243            hex::encode(address),
244            content.len()
245        );
246
247        Ok(true)
248    }
249
250    /// Retrieve a chunk.
251    ///
252    /// # Arguments
253    ///
254    /// * `address` - Content address to retrieve
255    ///
256    /// # Returns
257    ///
258    /// Returns `Some(content)` if found, `None` if not found.
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if read fails or verification fails.
263    pub async fn get(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
264        let key = *address;
265        let env = self.env.clone();
266        let db = self.db;
267
268        let content = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
269            let rtxn = env
270                .read_txn()
271                .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
272            let value = db
273                .get(&rtxn, &key)
274                .map_err(|e| Error::Storage(format!("Failed to get chunk: {e}")))?;
275            Ok(value.map(Vec::from))
276        })
277        .await
278        .map_err(|e| Error::Storage(format!("LMDB get task failed: {e}")))??;
279
280        let Some(content) = content else {
281            trace!("Chunk {} not found", hex::encode(address));
282            return Ok(None);
283        };
284
285        // Verify content if configured
286        if self.config.verify_on_read {
287            let computed = Self::compute_address(&content);
288            if computed != *address {
289                self.stats.write().verification_failures += 1;
290                warn!(
291                    "Chunk verification failed: expected {}, computed {}",
292                    hex::encode(address),
293                    hex::encode(computed)
294                );
295                return Err(Error::Storage(format!(
296                    "Chunk verification failed for {}",
297                    hex::encode(address)
298                )));
299            }
300        }
301
302        {
303            let mut stats = self.stats.write();
304            stats.chunks_retrieved += 1;
305            stats.bytes_retrieved += content.len() as u64;
306        }
307
308        debug!(
309            "Retrieved chunk {} ({} bytes)",
310            hex::encode(address),
311            content.len()
312        );
313
314        Ok(Some(content))
315    }
316
317    /// Check if a chunk exists.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if the LMDB read transaction fails.
322    pub fn exists(&self, address: &XorName) -> Result<bool> {
323        let rtxn = self
324            .env
325            .read_txn()
326            .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
327        let found = self
328            .db
329            .get(&rtxn, address.as_ref())
330            .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
331            .is_some();
332        Ok(found)
333    }
334
335    /// Delete a chunk.
336    ///
337    /// # Errors
338    ///
339    /// Returns an error if deletion fails.
340    pub async fn delete(&self, address: &XorName) -> Result<bool> {
341        let key = *address;
342        let env = self.env.clone();
343        let db = self.db;
344
345        let deleted = spawn_blocking(move || -> Result<bool> {
346            let mut wtxn = env
347                .write_txn()
348                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
349            let existed = db
350                .delete(&mut wtxn, &key)
351                .map_err(|e| Error::Storage(format!("Failed to delete chunk: {e}")))?;
352            wtxn.commit()
353                .map_err(|e| Error::Storage(format!("Failed to commit delete: {e}")))?;
354            Ok(existed)
355        })
356        .await
357        .map_err(|e| Error::Storage(format!("LMDB delete task failed: {e}")))??;
358
359        if deleted {
360            debug!("Deleted chunk {}", hex::encode(address));
361        }
362
363        Ok(deleted)
364    }
365
366    /// Get storage statistics.
367    #[must_use]
368    pub fn stats(&self) -> StorageStats {
369        let mut stats = self.stats.read().clone();
370        match self.current_chunks() {
371            Ok(count) => stats.current_chunks = count,
372            Err(e) => {
373                warn!("Failed to read current_chunks for stats: {e}");
374                stats.current_chunks = 0;
375            }
376        }
377        stats
378    }
379
380    /// Return the number of chunks currently stored, queried from LMDB metadata.
381    ///
382    /// This is an O(1) read of the B-tree page header — not a full scan.
383    ///
384    /// # Errors
385    ///
386    /// Returns an error if the LMDB read transaction fails.
387    pub fn current_chunks(&self) -> Result<u64> {
388        let rtxn = self
389            .env
390            .read_txn()
391            .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
392        let entries = self
393            .db
394            .stat(&rtxn)
395            .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
396            .entries;
397        Ok(entries as u64)
398    }
399
400    /// Compute content address (BLAKE3 hash).
401    #[must_use]
402    pub fn compute_address(content: &[u8]) -> XorName {
403        crate::client::compute_address(content)
404    }
405
406    /// Get the root directory.
407    #[must_use]
408    pub fn root_dir(&self) -> &Path {
409        &self.config.root_dir
410    }
411}
412
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Verifying config rooted at `root` with the given chunk limit and the
    /// default map size.
    fn config_in(root: &Path, max_chunks: usize) -> LmdbStorageConfig {
        LmdbStorageConfig {
            root_dir: root.to_path_buf(),
            verify_on_read: true,
            max_chunks,
            max_map_size: 0,
        }
    }

    /// Unlimited-capacity storage in a fresh temp dir. The `TempDir` is
    /// returned so the directory outlives the storage handle.
    async fn create_test_storage() -> (LmdbStorage, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");
        let storage = LmdbStorage::new(config_in(temp_dir.path(), 0))
            .await
            .expect("create storage");
        (storage, temp_dir)
    }

    #[tokio::test]
    async fn test_put_and_get() {
        let (storage, _temp) = create_test_storage().await;

        let payload = b"hello world";
        let addr = LmdbStorage::compute_address(payload);

        // A first write reports the chunk as new.
        assert!(storage.put(&addr, payload).await.expect("put"));

        // Reading it back yields the same bytes.
        let fetched = storage.get(&addr).await.expect("get");
        assert_eq!(fetched, Some(payload.to_vec()));
    }

    #[tokio::test]
    async fn test_put_duplicate() {
        let (storage, _temp) = create_test_storage().await;

        let payload = b"test data";
        let addr = LmdbStorage::compute_address(payload);

        // First write is new, second is a duplicate.
        let first = storage.put(&addr, payload).await.expect("put 1");
        assert!(first);
        let second = storage.put(&addr, payload).await.expect("put 2");
        assert!(!second);

        // Counters reflect one stored chunk and one duplicate.
        let snapshot = storage.stats();
        assert_eq!(snapshot.chunks_stored, 1);
        assert_eq!(snapshot.duplicates, 1);
    }

    #[tokio::test]
    async fn test_get_not_found() {
        let (storage, _temp) = create_test_storage().await;

        let missing = [0xAB; 32];
        let outcome = storage.get(&missing).await.expect("get");
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn test_exists() {
        let (storage, _temp) = create_test_storage().await;

        let payload = b"exists test";
        let addr = LmdbStorage::compute_address(payload);

        let before = storage.exists(&addr).expect("exists");
        assert!(!before);

        storage.put(&addr, payload).await.expect("put");

        let after = storage.exists(&addr).expect("exists");
        assert!(after);
    }

    #[tokio::test]
    async fn test_delete() {
        let (storage, _temp) = create_test_storage().await;

        let payload = b"delete test";
        let addr = LmdbStorage::compute_address(payload);

        // Store, then confirm presence.
        storage.put(&addr, payload).await.expect("put");
        assert!(storage.exists(&addr).expect("exists"));

        // First delete removes the chunk.
        let removed = storage.delete(&addr).await.expect("delete");
        assert!(removed);
        assert!(!storage.exists(&addr).expect("exists"));

        // Second delete finds nothing to remove.
        let removed_again = storage.delete(&addr).await.expect("delete 2");
        assert!(!removed_again);
    }

    #[tokio::test]
    async fn test_max_chunks_enforced() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let storage = LmdbStorage::new(config_in(temp_dir.path(), 2))
            .await
            .expect("create storage");

        let (one, two, three) = (b"chunk one", b"chunk two", b"chunk three");
        let addr_one = LmdbStorage::compute_address(one);
        let addr_two = LmdbStorage::compute_address(two);
        let addr_three = LmdbStorage::compute_address(three);

        // Up to the limit, writes are accepted.
        assert!(storage.put(&addr_one, one).await.is_ok());
        assert!(storage.put(&addr_two, two).await.is_ok());

        // One past the limit is rejected with a capacity error.
        let overflow = storage.put(&addr_three, three).await;
        assert!(overflow.is_err());
        let message = overflow.unwrap_err().to_string();
        assert!(message.contains("capacity reached"));
    }

    #[tokio::test]
    async fn test_address_mismatch() {
        let (storage, _temp) = create_test_storage().await;

        let payload = b"some content";
        let bogus_addr = [0xFF; 32]; // Not the hash of `payload`

        let outcome = storage.put(&bogus_addr, payload).await;
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("mismatch"));
    }

    #[test]
    fn test_compute_address() {
        // Known BLAKE3 hash of "hello world"
        let expected_hex = "d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24";

        let addr = LmdbStorage::compute_address(b"hello world");
        assert_eq!(hex::encode(addr), expected_hex);
    }

    #[tokio::test]
    async fn test_stats() {
        let (storage, _temp) = create_test_storage().await;

        let (first, second) = (b"content 1", b"content 2");
        let addr_first = LmdbStorage::compute_address(first);
        let addr_second = LmdbStorage::compute_address(second);

        // Two writes, one read.
        storage.put(&addr_first, first).await.expect("put 1");
        storage.put(&addr_second, second).await.expect("put 2");
        storage.get(&addr_first).await.expect("get");

        let snapshot = storage.stats();
        let expected_bytes_stored = first.len() as u64 + second.len() as u64;
        assert_eq!(snapshot.chunks_stored, 2);
        assert_eq!(snapshot.chunks_retrieved, 1);
        assert_eq!(snapshot.bytes_stored, expected_bytes_stored);
        assert_eq!(snapshot.bytes_retrieved, first.len() as u64);
        assert_eq!(snapshot.current_chunks, 2);
    }

    #[tokio::test]
    async fn test_capacity_recovers_after_delete() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let storage = LmdbStorage::new(config_in(temp_dir.path(), 1))
            .await
            .expect("create storage");

        let (first, second) = (b"first chunk", b"second chunk");
        let addr_first = LmdbStorage::compute_address(first);
        let addr_second = LmdbStorage::compute_address(second);

        storage.put(&addr_first, first).await.expect("put first");
        storage.delete(&addr_first).await.expect("delete first");

        // Should succeed because delete freed capacity.
        storage.put(&addr_second, second).await.expect("put second");

        assert_eq!(storage.stats().current_chunks, 1);
    }

    #[tokio::test]
    async fn test_persistence_across_reopen() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let payload = b"persistent data";
        let addr = LmdbStorage::compute_address(payload);

        // First session: write the chunk, then drop the storage handle.
        {
            let storage = LmdbStorage::new(config_in(temp_dir.path(), 0))
                .await
                .expect("create storage");
            storage.put(&addr, payload).await.expect("put");
        }

        // Second session: the chunk must still be there.
        {
            let storage = LmdbStorage::new(config_in(temp_dir.path(), 0))
                .await
                .expect("reopen storage");
            let count = storage.current_chunks().expect("current_chunks");
            assert_eq!(count, 1);
            let fetched = storage.get(&addr).await.expect("get");
            assert_eq!(fetched, Some(payload.to_vec()));
        }
    }
}