// saorsa_node/storage/lmdb.rs
1//! Content-addressed LMDB storage for chunks.
2//!
3//! Provides persistent storage for chunks using LMDB (via heed) for
4//! memory-mapped, zero-copy reads with ACID transactions.
5//!
6//! ```text
7//! {root}/chunks.mdb/   -- LMDB environment directory
8//! ```
9
10use crate::ant_protocol::XorName;
11use crate::error::{Error, Result};
12use heed::types::Bytes;
13use heed::{Database, Env, EnvOpenOptions};
14use std::path::{Path, PathBuf};
15use tokio::task::spawn_blocking;
16use tracing::{debug, trace, warn};
17
/// Default LMDB map size: 32 GiB.
///
/// LMDB reserves (but does not commit) this much virtual address space for
/// the memory map. Node operators can override this via `storage.db_size_gb`
/// in `config.toml`.
const DEFAULT_MAX_MAP_SIZE: usize = 32 * 1024 * 1024 * 1024; // 32 GiB
22
/// Configuration for LMDB storage.
///
/// Construct via [`Default::default`] for the built-in defaults, or build
/// explicitly to override individual fields.
#[derive(Debug, Clone)]
pub struct LmdbStorageConfig {
    /// Root directory for storage (LMDB env lives at `{root_dir}/chunks.mdb/`).
    pub root_dir: PathBuf,
    /// Whether to verify content on read (compares BLAKE3 hash to address).
    pub verify_on_read: bool,
    /// Maximum number of chunks to store (0 = unlimited).
    pub max_chunks: usize,
    /// Maximum LMDB map size in bytes (0 = use default of 32 GiB).
    pub max_map_size: usize,
}
35
36impl Default for LmdbStorageConfig {
37    fn default() -> Self {
38        Self {
39            root_dir: PathBuf::from(".saorsa/chunks"),
40            verify_on_read: true,
41            max_chunks: 0,
42            max_map_size: 0,
43        }
44    }
45}
46
/// Statistics about storage operations.
///
/// All counters except `current_chunks` are in-memory, per-process-session
/// tallies (they start at zero on each `LmdbStorage::new`); `current_chunks`
/// is filled from LMDB metadata and reflects what is actually persisted.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Total number of chunks stored (this session).
    pub chunks_stored: u64,
    /// Total number of chunks retrieved (this session).
    pub chunks_retrieved: u64,
    /// Total bytes stored.
    pub bytes_stored: u64,
    /// Total bytes retrieved.
    pub bytes_retrieved: u64,
    /// Number of duplicate writes (already exists).
    pub duplicates: u64,
    /// Number of verification failures on read.
    pub verification_failures: u64,
    /// Number of chunks currently persisted (queried from LMDB).
    pub current_chunks: u64,
}
65
/// Content-addressed LMDB storage.
///
/// Uses heed (LMDB wrapper) for memory-mapped, transactional chunk storage.
/// Keys are 32-byte `XorName` addresses, values are raw chunk bytes.
pub struct LmdbStorage {
    /// LMDB environment.
    env: Env,
    /// The unnamed default database (key=XorName bytes, value=chunk bytes).
    db: Database<Bytes, Bytes>,
    /// Storage configuration.
    config: LmdbStorageConfig,
    /// Operation statistics (session-scoped; see `StorageStats`).
    stats: parking_lot::RwLock<StorageStats>,
}
80
81impl LmdbStorage {
    /// Create a new LMDB storage instance.
    ///
    /// Opens (or creates) an LMDB environment at `{root_dir}/chunks.mdb/`.
    ///
    /// # Errors
    ///
    /// Returns an error if the LMDB environment cannot be opened.
    #[allow(unsafe_code)]
    pub async fn new(config: LmdbStorageConfig) -> Result<Self> {
        let env_dir = config.root_dir.join("chunks.mdb");

        // Create the directory synchronously before opening LMDB
        std::fs::create_dir_all(&env_dir)
            .map_err(|e| Error::Storage(format!("Failed to create LMDB directory: {e}")))?;

        // 0 means "use the built-in default" (32 GiB).
        let map_size = if config.max_map_size > 0 {
            config.max_map_size
        } else {
            DEFAULT_MAX_MAP_SIZE
        };

        // Opening the env and creating the database do blocking filesystem
        // work, so run them on tokio's blocking thread pool.
        let env_dir_clone = env_dir.clone();
        let (env, db) = spawn_blocking(move || -> Result<(Env, Database<Bytes, Bytes>)> {
            // SAFETY: `EnvOpenOptions::open()` is unsafe because LMDB uses memory-mapped
            // I/O and relies on OS file-locking to prevent corruption from concurrent
            // access by multiple processes. We satisfy this by giving each node instance
            // a unique `root_dir` (typically a directory named by its full 64-hex peer
            // ID), ensuring no two processes open the same LMDB environment. Callers
            // who manually configure `--root-dir` must not point multiple nodes at the
            // same directory.
            let env = unsafe {
                EnvOpenOptions::new()
                    .map_size(map_size)
                    .max_dbs(1)
                    .open(&env_dir_clone)
                    .map_err(|e| Error::Storage(format!("Failed to open LMDB env: {e}")))?
            };

            // Open (or create) the unnamed default database; this requires a
            // write txn even when the database already exists.
            let mut wtxn = env
                .write_txn()
                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
            let db: Database<Bytes, Bytes> = env
                .create_database(&mut wtxn, None)
                .map_err(|e| Error::Storage(format!("Failed to create database: {e}")))?;
            wtxn.commit()
                .map_err(|e| Error::Storage(format!("Failed to commit db creation: {e}")))?;

            Ok((env, db))
        })
        .await
        // First `?`: the blocking task panicked or was cancelled; second `?`:
        // the storage error produced inside the closure.
        .map_err(|e| Error::Storage(format!("LMDB init task failed: {e}")))??;

        let storage = Self {
            env,
            db,
            config,
            // Per-session counters start at zero; the persisted chunk count
            // is queried from LMDB on demand (see `current_chunks`).
            stats: parking_lot::RwLock::new(StorageStats::default()),
        };

        debug!(
            "Initialized LMDB storage at {:?} ({} existing chunks)",
            env_dir,
            storage.current_chunks()?
        );

        Ok(storage)
    }
149
    /// Store a chunk.
    ///
    /// # Arguments
    ///
    /// * `address` - Content address (should be BLAKE3 of content)
    /// * `content` - Chunk data
    ///
    /// # Returns
    ///
    /// Returns `true` if the chunk was newly stored, `false` if it already existed.
    ///
    /// # Errors
    ///
    /// Returns an error if the write fails or content doesn't match address.
    pub async fn put(&self, address: &XorName, content: &[u8]) -> Result<bool> {
        // Verify content address: refuse to store bytes whose computed
        // address does not match the claimed one.
        let computed = Self::compute_address(content);
        if computed != *address {
            return Err(Error::Storage(format!(
                "Content address mismatch: expected {}, computed {}",
                hex::encode(address),
                hex::encode(computed)
            )));
        }

        // Fast-path duplicate check (read-only, no write lock needed).
        // This is an optimistic hint — the authoritative check happens inside
        // the write transaction below to prevent TOCTOU races.
        if self.exists(address)? {
            trace!("Chunk {} already exists", hex::encode(address));
            self.stats.write().duplicates += 1;
            return Ok(false);
        }

        // Copy key/value and clone handles so the closure is 'static for
        // spawn_blocking (LMDB writes block, so keep them off the executor).
        let key = *address;
        let value = content.to_vec();
        let env = self.env.clone();
        let db = self.db;
        let max_chunks = self.config.max_chunks;

        // Existence check, capacity enforcement, and write all happen atomically
        // inside a single write transaction. LMDB serializes write transactions,
        // so there are no TOCTOU races or counter-drift issues.
        let was_new = spawn_blocking(move || -> Result<bool> {
            let mut wtxn = env
                .write_txn()
                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;

            // Authoritative existence check inside the serialized write txn
            if db
                .get(&wtxn, &key)
                .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
                .is_some()
            {
                return Ok(false);
            }

            // Enforce capacity limit (0 = unlimited)
            if max_chunks > 0 {
                let current = db
                    .stat(&wtxn)
                    .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
                    .entries;
                if current >= max_chunks {
                    return Err(Error::Storage(format!(
                        "Storage capacity reached: {current} chunks stored, max is {max_chunks}"
                    )));
                }
            }

            db.put(&mut wtxn, &key, &value)
                .map_err(|e| Error::Storage(format!("Failed to put chunk: {e}")))?;
            wtxn.commit()
                .map_err(|e| Error::Storage(format!("Failed to commit put: {e}")))?;
            Ok(true)
        })
        .await
        // First `?`: blocking task panicked/cancelled; second `?`: storage error.
        .map_err(|e| Error::Storage(format!("LMDB put task failed: {e}")))??;

        if !was_new {
            // Race: the fast-path missed it but the txn-level check caught it.
            trace!("Chunk {} already exists", hex::encode(address));
            self.stats.write().duplicates += 1;
            return Ok(false);
        }

        // Stats are only updated after the commit succeeds.
        {
            let mut stats = self.stats.write();
            stats.chunks_stored += 1;
            stats.bytes_stored += content.len() as u64;
        }

        debug!(
            "Stored chunk {} ({} bytes)",
            hex::encode(address),
            content.len()
        );

        Ok(true)
    }
249
250    /// Store data at the given address without content-address verification.
251    ///
252    /// Use this for data types where the address is not `BLAKE3(content)`.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if the write fails.
257    pub async fn put_raw(&self, address: &XorName, data: &[u8]) -> Result<bool> {
258        // Fast-path duplicate check
259        if self.exists(address)? {
260            trace!("Record {} already exists", hex::encode(address));
261            self.stats.write().duplicates += 1;
262            return Ok(false);
263        }
264
265        let key = *address;
266        let value = data.to_vec();
267        let env = self.env.clone();
268        let db = self.db;
269        let max_chunks = self.config.max_chunks;
270
271        let was_new = spawn_blocking(move || -> Result<bool> {
272            let mut wtxn = env
273                .write_txn()
274                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
275
276            if db
277                .get(&wtxn, &key)
278                .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
279                .is_some()
280            {
281                return Ok(false);
282            }
283
284            if max_chunks > 0 {
285                let current = db
286                    .stat(&wtxn)
287                    .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
288                    .entries;
289                if current >= max_chunks {
290                    return Err(Error::Storage(format!(
291                        "Storage capacity reached: {current} stored, max is {max_chunks}"
292                    )));
293                }
294            }
295
296            db.put(&mut wtxn, &key, &value)
297                .map_err(|e| Error::Storage(format!("Failed to put record: {e}")))?;
298            wtxn.commit()
299                .map_err(|e| Error::Storage(format!("Failed to commit put: {e}")))?;
300            Ok(true)
301        })
302        .await
303        .map_err(|e| Error::Storage(format!("LMDB put_raw task failed: {e}")))??;
304
305        if !was_new {
306            // Race: fast-path missed it, txn-level check caught it.
307            self.stats.write().duplicates += 1;
308            return Ok(false);
309        }
310
311        {
312            let mut stats = self.stats.write();
313            stats.chunks_stored += 1;
314            stats.bytes_stored += data.len() as u64;
315        }
316
317        debug!(
318            "Stored record {} ({} bytes)",
319            hex::encode(address),
320            data.len()
321        );
322
323        Ok(true)
324    }
325
    /// Overwrite a record at the given address without content-address verification.
    ///
    /// Unlike `put_raw`, this does not check for existing records and will
    /// overwrite them. Used for mutable data types where
    /// the handler has already validated the update.
    ///
    /// # Errors
    ///
    /// Returns an error if the write fails.
    pub async fn put_overwrite(&self, address: &XorName, data: &[u8]) -> Result<()> {
        let key = *address;
        let value = data.to_vec();
        let env = self.env.clone();
        let db = self.db;

        // Returns the byte length of the previous value, if any.
        // Reading the old value and writing the new one share a single write
        // txn, so the reported length cannot race with concurrent writers.
        let old_len: Option<usize> = spawn_blocking(move || -> Result<Option<usize>> {
            let mut wtxn = env
                .write_txn()
                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;

            // Only the length is needed for stats; avoid copying the old bytes.
            let prev_len = db
                .get(&wtxn, &key)
                .map_err(|e| Error::Storage(format!("Failed to read old record: {e}")))?
                .map(<[u8]>::len);

            db.put(&mut wtxn, &key, &value)
                .map_err(|e| Error::Storage(format!("Failed to put record: {e}")))?;
            wtxn.commit()
                .map_err(|e| Error::Storage(format!("Failed to commit put: {e}")))?;
            Ok(prev_len)
        })
        .await
        // First `?`: blocking task panicked/cancelled; second `?`: storage error.
        .map_err(|e| Error::Storage(format!("LMDB put_overwrite task failed: {e}")))??;

        // Adjust session stats depending on insert vs. update.
        {
            let mut stats = self.stats.write();
            if let Some(prev) = old_len {
                // Update: adjust byte count, chunk count stays the same.
                // saturating_* guards against counter drift (e.g. records that
                // predate this process session and were never counted).
                stats.bytes_stored = stats
                    .bytes_stored
                    .saturating_sub(prev as u64)
                    .saturating_add(data.len() as u64);
            } else {
                // New record.
                stats.chunks_stored += 1;
                stats.bytes_stored += data.len() as u64;
            }
        }

        debug!(
            "Overwritten record {} ({} bytes)",
            hex::encode(address),
            data.len()
        );

        Ok(())
    }
384
385    /// Retrieve a chunk.
386    ///
387    /// # Arguments
388    ///
389    /// * `address` - Content address to retrieve
390    ///
391    /// # Returns
392    ///
393    /// Returns `Some(content)` if found, `None` if not found.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if read fails or verification fails.
398    pub async fn get(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
399        let key = *address;
400        let env = self.env.clone();
401        let db = self.db;
402
403        let content = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
404            let rtxn = env
405                .read_txn()
406                .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
407            let value = db
408                .get(&rtxn, &key)
409                .map_err(|e| Error::Storage(format!("Failed to get chunk: {e}")))?;
410            Ok(value.map(Vec::from))
411        })
412        .await
413        .map_err(|e| Error::Storage(format!("LMDB get task failed: {e}")))??;
414
415        let Some(content) = content else {
416            trace!("Chunk {} not found", hex::encode(address));
417            return Ok(None);
418        };
419
420        // Verify content if configured
421        if self.config.verify_on_read {
422            let computed = Self::compute_address(&content);
423            if computed != *address {
424                self.stats.write().verification_failures += 1;
425                warn!(
426                    "Chunk verification failed: expected {}, computed {}",
427                    hex::encode(address),
428                    hex::encode(computed)
429                );
430                return Err(Error::Storage(format!(
431                    "Chunk verification failed for {}",
432                    hex::encode(address)
433                )));
434            }
435        }
436
437        {
438            let mut stats = self.stats.write();
439            stats.chunks_retrieved += 1;
440            stats.bytes_retrieved += content.len() as u64;
441        }
442
443        debug!(
444            "Retrieved chunk {} ({} bytes)",
445            hex::encode(address),
446            content.len()
447        );
448
449        Ok(Some(content))
450    }
451
452    /// Retrieve raw data without content-address verification.
453    ///
454    /// Use this for non-chunk data types where the stored bytes are
455    /// serialized records, not raw content.
456    ///
457    /// # Errors
458    ///
459    /// Returns an error if the read fails.
460    pub async fn get_raw(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
461        let key = *address;
462        let env = self.env.clone();
463        let db = self.db;
464
465        let content = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
466            let rtxn = env
467                .read_txn()
468                .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
469            let value = db
470                .get(&rtxn, &key)
471                .map_err(|e| Error::Storage(format!("Failed to get record: {e}")))?;
472            Ok(value.map(Vec::from))
473        })
474        .await
475        .map_err(|e| Error::Storage(format!("LMDB get_raw task failed: {e}")))??;
476
477        let Some(content) = content else {
478            trace!("Record {} not found", hex::encode(address));
479            return Ok(None);
480        };
481
482        {
483            let mut stats = self.stats.write();
484            stats.chunks_retrieved += 1;
485            stats.bytes_retrieved += content.len() as u64;
486        }
487
488        debug!(
489            "Retrieved record {} ({} bytes)",
490            hex::encode(address),
491            content.len()
492        );
493
494        Ok(Some(content))
495    }
496
497    /// Check if a chunk exists.
498    ///
499    /// # Errors
500    ///
501    /// Returns an error if the LMDB read transaction fails.
502    pub fn exists(&self, address: &XorName) -> Result<bool> {
503        let rtxn = self
504            .env
505            .read_txn()
506            .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
507        let found = self
508            .db
509            .get(&rtxn, address.as_ref())
510            .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
511            .is_some();
512        Ok(found)
513    }
514
515    /// Delete a chunk.
516    ///
517    /// # Errors
518    ///
519    /// Returns an error if deletion fails.
520    pub async fn delete(&self, address: &XorName) -> Result<bool> {
521        let key = *address;
522        let env = self.env.clone();
523        let db = self.db;
524
525        let deleted = spawn_blocking(move || -> Result<bool> {
526            let mut wtxn = env
527                .write_txn()
528                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
529            let existed = db
530                .delete(&mut wtxn, &key)
531                .map_err(|e| Error::Storage(format!("Failed to delete chunk: {e}")))?;
532            wtxn.commit()
533                .map_err(|e| Error::Storage(format!("Failed to commit delete: {e}")))?;
534            Ok(existed)
535        })
536        .await
537        .map_err(|e| Error::Storage(format!("LMDB delete task failed: {e}")))??;
538
539        if deleted {
540            debug!("Deleted chunk {}", hex::encode(address));
541        }
542
543        Ok(deleted)
544    }
545
546    /// Get storage statistics.
547    #[must_use]
548    pub fn stats(&self) -> StorageStats {
549        let mut stats = self.stats.read().clone();
550        match self.current_chunks() {
551            Ok(count) => stats.current_chunks = count,
552            Err(e) => {
553                warn!("Failed to read current_chunks for stats: {e}");
554                stats.current_chunks = 0;
555            }
556        }
557        stats
558    }
559
560    /// Return the number of chunks currently stored, queried from LMDB metadata.
561    ///
562    /// This is an O(1) read of the B-tree page header — not a full scan.
563    ///
564    /// # Errors
565    ///
566    /// Returns an error if the LMDB read transaction fails.
567    pub fn current_chunks(&self) -> Result<u64> {
568        let rtxn = self
569            .env
570            .read_txn()
571            .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
572        let entries = self
573            .db
574            .stat(&rtxn)
575            .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
576            .entries;
577        Ok(entries as u64)
578    }
579
    /// Compute content address (BLAKE3 hash).
    ///
    /// Delegates to `crate::client::compute_address` so that node storage
    /// and the client derive addresses with the same function.
    #[must_use]
    pub fn compute_address(content: &[u8]) -> XorName {
        crate::client::compute_address(content)
    }
585
    /// Get the root directory.
    ///
    /// This is the configured root, not the LMDB env dir — the env itself
    /// lives at `{root_dir}/chunks.mdb/`.
    #[must_use]
    pub fn root_dir(&self) -> &Path {
        &self.config.root_dir
    }
591}
592
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a storage instance rooted in a fresh temp dir.
    ///
    /// The `TempDir` is returned alongside the storage so the directory is
    /// not deleted while the test is still running.
    async fn create_test_storage() -> (LmdbStorage, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            verify_on_read: true,
            max_chunks: 0,
            max_map_size: 0,
        };
        let storage = LmdbStorage::new(config).await.expect("create storage");
        (storage, temp_dir)
    }

    // Basic round-trip: put then get returns the same bytes.
    #[tokio::test]
    async fn test_put_and_get() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"hello world";
        let address = LmdbStorage::compute_address(content);

        // Store chunk
        let is_new = storage.put(&address, content).await.expect("put");
        assert!(is_new);

        // Retrieve chunk
        let retrieved = storage.get(&address).await.expect("get");
        assert_eq!(retrieved, Some(content.to_vec()));
    }

    // Second put of the same chunk is a no-op counted as a duplicate.
    #[tokio::test]
    async fn test_put_duplicate() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"test data";
        let address = LmdbStorage::compute_address(content);

        // First store
        let is_new1 = storage.put(&address, content).await.expect("put 1");
        assert!(is_new1);

        // Duplicate store
        let is_new2 = storage.put(&address, content).await.expect("put 2");
        assert!(!is_new2);

        // Check stats
        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 1);
        assert_eq!(stats.duplicates, 1);
    }

    // Missing address yields Ok(None), not an error.
    #[tokio::test]
    async fn test_get_not_found() {
        let (storage, _temp) = create_test_storage().await;

        let address = [0xAB; 32];
        let result = storage.get(&address).await.expect("get");
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_exists() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"exists test";
        let address = LmdbStorage::compute_address(content);

        assert!(!storage.exists(&address).expect("exists"));

        storage.put(&address, content).await.expect("put");

        assert!(storage.exists(&address).expect("exists"));
    }

    // delete returns whether anything was removed; second delete is false.
    #[tokio::test]
    async fn test_delete() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"delete test";
        let address = LmdbStorage::compute_address(content);

        // Store
        storage.put(&address, content).await.expect("put");
        assert!(storage.exists(&address).expect("exists"));

        // Delete
        let deleted = storage.delete(&address).await.expect("delete");
        assert!(deleted);
        assert!(!storage.exists(&address).expect("exists"));

        // Delete again (already deleted)
        let deleted2 = storage.delete(&address).await.expect("delete 2");
        assert!(!deleted2);
    }

    // The max_chunks limit rejects writes once the store is full.
    #[tokio::test]
    async fn test_max_chunks_enforced() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            verify_on_read: true,
            max_chunks: 2,
            max_map_size: 0,
        };
        let storage = LmdbStorage::new(config).await.expect("create storage");

        let content1 = b"chunk one";
        let content2 = b"chunk two";
        let content3 = b"chunk three";
        let addr1 = LmdbStorage::compute_address(content1);
        let addr2 = LmdbStorage::compute_address(content2);
        let addr3 = LmdbStorage::compute_address(content3);

        // First two should succeed
        assert!(storage.put(&addr1, content1).await.is_ok());
        assert!(storage.put(&addr2, content2).await.is_ok());

        // Third should be rejected
        let result = storage.put(&addr3, content3).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("capacity reached"));
    }

    // put rejects content whose hash doesn't match the claimed address.
    #[tokio::test]
    async fn test_address_mismatch() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"some content";
        let wrong_address = [0xFF; 32]; // Wrong address

        let result = storage.put(&wrong_address, content).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("mismatch"));
    }

    #[test]
    fn test_compute_address() {
        // Known BLAKE3 hash of "hello world"
        let content = b"hello world";
        let address = LmdbStorage::compute_address(content);

        let expected_hex = "d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24";
        assert_eq!(hex::encode(address), expected_hex);
    }

    // Stats counters track puts/gets; current_chunks comes from LMDB.
    #[tokio::test]
    async fn test_stats() {
        let (storage, _temp) = create_test_storage().await;

        let content1 = b"content 1";
        let content2 = b"content 2";
        let address1 = LmdbStorage::compute_address(content1);
        let address2 = LmdbStorage::compute_address(content2);

        // Store two chunks
        storage.put(&address1, content1).await.expect("put 1");
        storage.put(&address2, content2).await.expect("put 2");

        // Retrieve one
        storage.get(&address1).await.expect("get");

        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 2);
        assert_eq!(stats.chunks_retrieved, 1);
        assert_eq!(
            stats.bytes_stored,
            content1.len() as u64 + content2.len() as u64
        );
        assert_eq!(stats.bytes_retrieved, content1.len() as u64);
        assert_eq!(stats.current_chunks, 2);
    }

    // Deleting frees capacity because the limit checks live LMDB entries,
    // not a cumulative counter.
    #[tokio::test]
    async fn test_capacity_recovers_after_delete() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            verify_on_read: true,
            max_chunks: 1,
            max_map_size: 0,
        };
        let storage = LmdbStorage::new(config).await.expect("create storage");

        let first = b"first chunk";
        let second = b"second chunk";
        let addr1 = LmdbStorage::compute_address(first);
        let addr2 = LmdbStorage::compute_address(second);

        storage.put(&addr1, first).await.expect("put first");
        storage.delete(&addr1).await.expect("delete first");

        // Should succeed because delete freed capacity.
        storage.put(&addr2, second).await.expect("put second");

        let stats = storage.stats();
        assert_eq!(stats.current_chunks, 1);
    }

    // Data written by one instance is visible after reopening the env.
    #[tokio::test]
    async fn test_persistence_across_reopen() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let content = b"persistent data";
        let address = LmdbStorage::compute_address(content);

        // Store a chunk
        {
            let config = LmdbStorageConfig {
                root_dir: temp_dir.path().to_path_buf(),
                verify_on_read: true,
                max_chunks: 0,
                max_map_size: 0,
            };
            let storage = LmdbStorage::new(config).await.expect("create storage");
            storage.put(&address, content).await.expect("put");
        }

        // Re-open and verify it persisted
        {
            let config = LmdbStorageConfig {
                root_dir: temp_dir.path().to_path_buf(),
                verify_on_read: true,
                max_chunks: 0,
                max_map_size: 0,
            };
            let storage = LmdbStorage::new(config).await.expect("reopen storage");
            assert_eq!(storage.current_chunks().expect("current_chunks"), 1);
            let retrieved = storage.get(&address).await.expect("get");
            assert_eq!(retrieved, Some(content.to_vec()));
        }
    }

    #[tokio::test]
    async fn test_put_raw_and_get_raw() {
        let (storage, _temp) = create_test_storage().await;

        // put_raw uses a caller-supplied address (not BLAKE3(content))
        let address = [0xAA; 32];
        let data = b"raw record data";

        let was_new = storage.put_raw(&address, data).await.expect("put_raw");
        assert!(was_new, "first put_raw should return true");

        // get_raw retrieves without content-address verification
        let retrieved = storage.get_raw(&address).await.expect("get_raw");
        assert_eq!(retrieved, Some(data.to_vec()));

        // Duplicate put_raw returns false
        let was_new2 = storage.put_raw(&address, data).await.expect("put_raw dup");
        assert!(!was_new2, "duplicate put_raw should return false");

        // Stats: 1 stored, 1 duplicate
        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 1);
        assert_eq!(stats.duplicates, 1);
    }

    #[tokio::test]
    async fn test_get_raw_not_found() {
        let (storage, _temp) = create_test_storage().await;

        let result = storage.get_raw(&[0xBB; 32]).await.expect("get_raw");
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_put_overwrite_new_record() {
        let (storage, _temp) = create_test_storage().await;

        let address = [0xCC; 32];
        let data = b"new overwrite data";

        storage
            .put_overwrite(&address, data)
            .await
            .expect("put_overwrite");

        let retrieved = storage.get_raw(&address).await.expect("get_raw");
        assert_eq!(retrieved, Some(data.to_vec()));

        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 1);
        assert_eq!(stats.bytes_stored, data.len() as u64);
    }

    // Overwriting adjusts bytes_stored to the new size without bumping
    // chunks_stored.
    #[tokio::test]
    async fn test_put_overwrite_updates_stats_correctly() {
        let (storage, _temp) = create_test_storage().await;

        let address = [0xDD; 32];
        let v1 = b"short";
        let v2 = b"much longer replacement value";

        // First write
        storage
            .put_overwrite(&address, v1)
            .await
            .expect("put_overwrite v1");
        let stats1 = storage.stats();
        assert_eq!(stats1.chunks_stored, 1);
        assert_eq!(stats1.bytes_stored, v1.len() as u64);

        // Overwrite with larger value — chunks_stored stays 1, bytes adjusts
        storage
            .put_overwrite(&address, v2)
            .await
            .expect("put_overwrite v2");
        let stats2 = storage.stats();
        assert_eq!(
            stats2.chunks_stored, 1,
            "overwrite should not increment chunk count"
        );
        assert_eq!(
            stats2.bytes_stored,
            v2.len() as u64,
            "bytes should reflect new value size"
        );

        // Verify the new value is stored
        let retrieved = storage.get_raw(&address).await.expect("get_raw");
        assert_eq!(retrieved, Some(v2.to_vec()));
    }
}