1use crate::ant_protocol::XorName;
11use crate::error::{Error, Result};
12use crate::logging::{debug, info, trace, warn};
13use heed::types::Bytes;
14use heed::{Database, Env, EnvOpenOptions, MdbError};
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::Instant;
18use tokio::task::spawn_blocking;
19
20use crate::ant_protocol::XORNAME_LEN;
21
/// Bytes in one mebibyte.
pub const MIB: u64 = 1024 * 1024;

/// Bytes in one gibibyte.
pub const GIB: u64 = 1024 * MIB;

/// Disk space kept free by default before writes are refused (500 MiB).
const DEFAULT_DISK_RESERVE: u64 = 500 * MIB;

/// Converts a byte count to fractional GiB, used for human-readable logs.
///
/// Precision loss from the `u64 -> f64` cast is acceptable for display.
#[allow(clippy::cast_precision_loss)]
fn bytes_to_gib(bytes: u64) -> f64 {
    let gib = GIB as f64;
    bytes as f64 / gib
}
36
/// Smallest LMDB map size ever configured (256 MiB), even when the disk
/// reports less free space — see `compute_map_size`, which clamps to this.
const MIN_MAP_SIZE: usize = 256 * 1024 * 1024;

/// How long (seconds) a successful disk-space check is cached before the OS
/// is queried again — see `check_disk_space_cached`.
const DISK_CHECK_INTERVAL_SECS: u64 = 5;
48
/// Configuration for [`LmdbStorage`].
#[derive(Debug, Clone)]
pub struct LmdbStorageConfig {
    /// Directory under which the LMDB environment (`chunks.mdb`) is created.
    pub root_dir: PathBuf,
    /// Re-hash chunk content on read and error if it no longer matches the
    /// address (corruption detection).
    pub verify_on_read: bool,
    /// Fixed LMDB map size in bytes; `0` means auto-compute from free disk
    /// space at startup.
    pub max_map_size: usize,
    /// Bytes of disk space to keep free; puts are refused below this.
    pub disk_reserve: u64,
}
66
67impl Default for LmdbStorageConfig {
68 fn default() -> Self {
69 Self {
70 root_dir: PathBuf::from(".ant/chunks"),
71 verify_on_read: true,
72 max_map_size: 0,
73 disk_reserve: DEFAULT_DISK_RESERVE,
74 }
75 }
76}
77
impl LmdbStorageConfig {
    /// Default configuration for tests: zero disk reserve so small temp
    /// partitions (e.g. CI runners) never trip the free-space check.
    #[cfg(any(test, feature = "test-utils"))]
    #[must_use]
    pub fn test_default() -> Self {
        Self {
            disk_reserve: 0,
            ..Self::default()
        }
    }
}
90
/// Cumulative counters for storage operations since this instance started.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Chunks newly written (duplicates excluded).
    pub chunks_stored: u64,
    /// Successful reads that returned chunk data.
    pub chunks_retrieved: u64,
    /// Total payload bytes written.
    pub bytes_stored: u64,
    /// Total payload bytes returned to callers.
    pub bytes_retrieved: u64,
    /// Puts that found the chunk already present.
    pub duplicates: u64,
    /// Reads whose content hash did not match the address.
    pub verification_failures: u64,
    /// Chunks currently in the database (filled live by `stats()`).
    pub current_chunks: u64,
}
109
/// Content-addressed chunk store backed by a single LMDB database.
pub struct LmdbStorage {
    // LMDB environment handle; cloned into blocking tasks.
    env: Env,
    // The single (unnamed) database mapping address -> chunk bytes.
    db: Database<Bytes, Bytes>,
    // Configuration this instance was opened with.
    config: LmdbStorageConfig,
    // Directory holding the environment files (`root_dir/chunks.mdb`).
    env_dir: PathBuf,
    // In-memory operation counters; see `StorageStats`.
    stats: parking_lot::RwLock<StorageStats>,
    // Transaction/resize coordination: transactions hold this shared,
    // `try_resize` holds it exclusive so the map is never remapped while a
    // transaction is open.
    env_lock: Arc<parking_lot::RwLock<()>>,
    // Timestamp of the last successful disk-space check; used to rate-limit
    // OS queries (see DISK_CHECK_INTERVAL_SECS).
    last_disk_ok: parking_lot::Mutex<Option<Instant>>,
}
137
138impl LmdbStorage {
    /// Opens (or creates) the LMDB environment under `root_dir/chunks.mdb`.
    ///
    /// The map size is taken from `config.max_map_size` when non-zero,
    /// otherwise auto-computed from available disk space minus the
    /// configured reserve.
    ///
    /// # Errors
    /// Returns [`Error::Storage`] if the directory cannot be created or the
    /// LMDB environment/database cannot be opened.
    #[allow(unsafe_code)]
    pub async fn new(config: LmdbStorageConfig) -> Result<Self> {
        let env_dir = config.root_dir.join("chunks.mdb");

        std::fs::create_dir_all(&env_dir)
            .map_err(|e| Error::Storage(format!("Failed to create LMDB directory: {e}")))?;

        // Explicit size wins; otherwise derive from free disk space.
        let map_size = if config.max_map_size > 0 {
            config.max_map_size
        } else {
            let computed = compute_map_size(&env_dir, config.disk_reserve)?;
            info!(
                "Auto-computed LMDB map size: {:.2} GiB (available disk minus {:.2} GiB reserve)",
                bytes_to_gib(computed as u64),
                bytes_to_gib(config.disk_reserve),
            );
            computed
        };

        // Environment open touches the filesystem — keep it off the async
        // executor.
        let env_dir_clone = env_dir.clone();
        let (env, db) = spawn_blocking(move || -> Result<(Env, Database<Bytes, Bytes>)> {
            // SAFETY: heed marks `open` unsafe because the caller must
            // uphold LMDB process-level invariants (e.g. not opening the
            // same environment twice in one process). We open this path
            // once here — NOTE(review): confirm no other code path opens
            // the same directory.
            let env = unsafe {
                EnvOpenOptions::new()
                    .map_size(map_size)
                    .max_dbs(1)
                    .open(&env_dir_clone)
                    .map_err(|e| Error::Storage(format!("Failed to open LMDB env: {e}")))?
            };

            // Create (or open) the single unnamed database up front.
            let mut wtxn = env
                .write_txn()
                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
            let db: Database<Bytes, Bytes> = env
                .create_database(&mut wtxn, None)
                .map_err(|e| Error::Storage(format!("Failed to create database: {e}")))?;
            wtxn.commit()
                .map_err(|e| Error::Storage(format!("Failed to commit db creation: {e}")))?;

            Ok((env, db))
        })
        .await
        .map_err(|e| Error::Storage(format!("LMDB init task failed: {e}")))??;

        let storage = Self {
            env,
            db,
            config,
            env_dir,
            stats: parking_lot::RwLock::new(StorageStats::default()),
            env_lock: Arc::new(parking_lot::RwLock::new(())),
            last_disk_ok: parking_lot::Mutex::new(None),
        };

        debug!(
            "Initialized LMDB storage at {:?} ({} existing chunks)",
            storage.env_dir,
            storage.current_chunks()?
        );

        Ok(storage)
    }
223
224 pub async fn put(&self, address: &XorName, content: &[u8]) -> Result<bool> {
240 let computed = Self::compute_address(content);
242 if computed != *address {
243 return Err(Error::Storage(format!(
244 "Content address mismatch: expected {}, computed {}",
245 hex::encode(address),
246 hex::encode(computed)
247 )));
248 }
249
250 if self.exists(address)? {
254 trace!("Chunk {} already exists", hex::encode(address));
255 self.stats.write().duplicates += 1;
256 return Ok(false);
257 }
258
259 self.check_disk_space_cached()?;
263
264 match self.try_put(address, content).await? {
266 PutOutcome::New => {}
267 PutOutcome::Duplicate => {
268 trace!("Chunk {} already exists", hex::encode(address));
269 self.stats.write().duplicates += 1;
270 return Ok(false);
271 }
272 PutOutcome::MapFull => {
273 self.try_resize().await?;
276 match self.try_put(address, content).await? {
278 PutOutcome::New => {}
279 PutOutcome::Duplicate => {
280 self.stats.write().duplicates += 1;
281 return Ok(false);
282 }
283 PutOutcome::MapFull => {
284 return Err(Error::Storage(
285 "LMDB map full after resize — disk may be at capacity".into(),
286 ));
287 }
288 }
289 }
290 }
291
292 {
293 let mut stats = self.stats.write();
294 stats.chunks_stored += 1;
295 stats.bytes_stored += content.len() as u64;
296 }
297
298 debug!(
299 "Stored chunk {} ({} bytes)",
300 hex::encode(address),
301 content.len()
302 );
303
304 Ok(true)
305 }
306
    /// Performs a single insert attempt without any resize/retry logic.
    ///
    /// Returns [`PutOutcome::Duplicate`] without writing when the key is
    /// already present, and maps LMDB's `MDB_MAP_FULL` — which can surface
    /// at either the put or the commit — to [`PutOutcome::MapFull`] so the
    /// caller can resize the map and retry.
    async fn try_put(&self, address: &XorName, content: &[u8]) -> Result<PutOutcome> {
        // Copy inputs so the blocking closure is 'static.
        let key = *address;
        let value = content.to_vec();
        let env = self.env.clone();
        let db = self.db;
        let lock = Arc::clone(&self.env_lock);

        spawn_blocking(move || -> Result<PutOutcome> {
            // Shared guard: prevents a concurrent map resize while this
            // transaction is open.
            let _guard = lock.read();

            let mut wtxn = env
                .write_txn()
                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;

            // Re-check inside the write txn — the caller's exists()
            // pre-check is racy.
            if db
                .get(&wtxn, &key)
                .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
                .is_some()
            {
                return Ok(PutOutcome::Duplicate);
            }

            match db.put(&mut wtxn, &key, &value) {
                Ok(()) => {}
                Err(heed::Error::Mdb(MdbError::MapFull)) => return Ok(PutOutcome::MapFull),
                Err(e) => {
                    return Err(Error::Storage(format!("Failed to put chunk: {e}")));
                }
            }

            // MapFull can also be reported at commit time.
            match wtxn.commit() {
                Ok(()) => Ok(PutOutcome::New),
                Err(heed::Error::Mdb(MdbError::MapFull)) => Ok(PutOutcome::MapFull),
                Err(e) => Err(Error::Storage(format!("Failed to commit put: {e}"))),
            }
        })
        .await
        .map_err(|e| Error::Storage(format!("LMDB put task failed: {e}")))?
    }
351
352 pub async fn get(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
362 let key = *address;
363 let env = self.env.clone();
364 let db = self.db;
365 let lock = Arc::clone(&self.env_lock);
366
367 let content = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
368 let _guard = lock.read();
369 let rtxn = env
370 .read_txn()
371 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
372 let value = db
373 .get(&rtxn, &key)
374 .map_err(|e| Error::Storage(format!("Failed to get chunk: {e}")))?;
375 Ok(value.map(Vec::from))
376 })
377 .await
378 .map_err(|e| Error::Storage(format!("LMDB get task failed: {e}")))??;
379
380 let Some(content) = content else {
381 trace!("Chunk {} not found", hex::encode(address));
382 return Ok(None);
383 };
384
385 if self.config.verify_on_read {
387 let computed = Self::compute_address(&content);
388 if computed != *address {
389 self.stats.write().verification_failures += 1;
390 warn!(
391 "Chunk verification failed: expected {}, computed {}",
392 hex::encode(address),
393 hex::encode(computed)
394 );
395 return Err(Error::Storage(format!(
396 "Chunk verification failed for {}",
397 hex::encode(address)
398 )));
399 }
400 }
401
402 {
403 let mut stats = self.stats.write();
404 stats.chunks_retrieved += 1;
405 stats.bytes_retrieved += content.len() as u64;
406 }
407
408 debug!(
409 "Retrieved chunk {} ({} bytes)",
410 hex::encode(address),
411 content.len()
412 );
413
414 Ok(Some(content))
415 }
416
417 pub fn exists(&self, address: &XorName) -> Result<bool> {
423 let _guard = self.env_lock.read();
424 let rtxn = self
425 .env
426 .read_txn()
427 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
428 let found = self
429 .db
430 .get(&rtxn, address.as_ref())
431 .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
432 .is_some();
433 Ok(found)
434 }
435
436 pub async fn delete(&self, address: &XorName) -> Result<bool> {
442 let key = *address;
443 let env = self.env.clone();
444 let db = self.db;
445 let lock = Arc::clone(&self.env_lock);
446
447 let deleted = spawn_blocking(move || -> Result<bool> {
448 let _guard = lock.read();
449 let mut wtxn = env
450 .write_txn()
451 .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
452 let existed = db
453 .delete(&mut wtxn, &key)
454 .map_err(|e| Error::Storage(format!("Failed to delete chunk: {e}")))?;
455 wtxn.commit()
456 .map_err(|e| Error::Storage(format!("Failed to commit delete: {e}")))?;
457 Ok(existed)
458 })
459 .await
460 .map_err(|e| Error::Storage(format!("LMDB delete task failed: {e}")))??;
461
462 if deleted {
463 debug!("Deleted chunk {}", hex::encode(address));
464 }
465
466 Ok(deleted)
467 }
468
469 #[must_use]
471 pub fn stats(&self) -> StorageStats {
472 let mut stats = self.stats.read().clone();
473 match self.current_chunks() {
474 Ok(count) => stats.current_chunks = count,
475 Err(e) => {
476 warn!("Failed to read current_chunks for stats: {e}");
477 stats.current_chunks = 0;
478 }
479 }
480 stats
481 }
482
483 pub fn current_chunks(&self) -> Result<u64> {
491 let _guard = self.env_lock.read();
492 let rtxn = self
493 .env
494 .read_txn()
495 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
496 let entries = self
497 .db
498 .stat(&rtxn)
499 .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
500 .entries;
501 Ok(entries as u64)
502 }
503
    /// Computes the content address for `content` — delegates to the
    /// client-side address function so storage and client always agree.
    #[must_use]
    pub fn compute_address(content: &[u8]) -> XorName {
        crate::client::compute_address(content)
    }
509
    /// Returns the configured root directory for this store.
    #[must_use]
    pub fn root_dir(&self) -> &Path {
        &self.config.root_dir
    }
515
516 pub async fn all_keys(&self) -> Result<Vec<XorName>> {
525 let env = self.env.clone();
526 let db = self.db;
527
528 let keys = spawn_blocking(move || -> Result<Vec<XorName>> {
529 let rtxn = env
530 .read_txn()
531 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
532 let mut keys = Vec::new();
533 let iter = db
534 .iter(&rtxn)
535 .map_err(|e| Error::Storage(format!("Failed to iterate database: {e}")))?;
536 for result in iter {
537 let (key_bytes, _) =
538 result.map_err(|e| Error::Storage(format!("Failed to read entry: {e}")))?;
539 if key_bytes.len() == XORNAME_LEN {
540 let mut key = [0u8; XORNAME_LEN];
541 key.copy_from_slice(key_bytes);
542 keys.push(key);
543 } else {
544 crate::logging::warn!(
545 "LmdbStorage: skipping entry with unexpected key length {} (expected {XORNAME_LEN})",
546 key_bytes.len()
547 );
548 }
549 }
550 Ok(keys)
551 })
552 .await
553 .map_err(|e| Error::Storage(format!("all_keys task failed: {e}")))?;
554
555 keys
556 }
557
558 pub async fn get_raw(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
567 let key = *address;
568 let env = self.env.clone();
569 let db = self.db;
570
571 let value = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
572 let rtxn = env
573 .read_txn()
574 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
575 let val = db
576 .get(&rtxn, key.as_ref())
577 .map_err(|e| Error::Storage(format!("Failed to get chunk: {e}")))?;
578 Ok(val.map(Vec::from))
579 })
580 .await
581 .map_err(|e| Error::Storage(format!("get_raw task failed: {e}")))?;
582
583 value
584 }
585
586 fn check_disk_space_cached(&self) -> Result<()> {
591 {
592 let last = self.last_disk_ok.lock();
593 if let Some(t) = *last {
594 if t.elapsed().as_secs() < DISK_CHECK_INTERVAL_SECS {
595 return Ok(());
596 }
597 }
598 }
599 check_disk_space(&self.env_dir, self.config.disk_reserve)?;
601 *self.last_disk_ok.lock() = Some(Instant::now());
603 Ok(())
604 }
605
    /// Grows the LMDB map to match the currently available disk space.
    ///
    /// Takes the exclusive environment lock so no transaction is open while
    /// the map is remapped. The map is never shrunk.
    #[allow(unsafe_code)]
    async fn try_resize(&self) -> Result<()> {
        let from_disk = compute_map_size(&self.env_dir, self.config.disk_reserve)?;
        let env = self.env.clone();
        let lock = Arc::clone(&self.env_lock);

        spawn_blocking(move || -> Result<()> {
            // Exclusive guard: waits for every holder of the shared lock
            // (i.e. all in-flight transactions) to finish first.
            let _guard = lock.write();

            // Never shrink: keep whichever size is larger.
            let current_map = env.info().map_size;
            let new_size = from_disk.max(current_map);

            if new_size <= current_map {
                debug!("LMDB map resize skipped — no additional disk space available");
                return Ok(());
            }

            // SAFETY: resizing the map is only sound with no live
            // transactions in this process; the exclusive `env_lock` guard
            // above guarantees that for every accessor that takes the
            // shared lock.
            unsafe {
                env.resize(new_size)
                    .map_err(|e| Error::Storage(format!("Failed to resize LMDB map: {e}")))?;
            }

            info!(
                "Resized LMDB map to {:.2} GiB (was {:.2} GiB)",
                bytes_to_gib(new_size as u64),
                bytes_to_gib(current_map as u64),
            );
            Ok(())
        })
        .await
        .map_err(|e| Error::Storage(format!("LMDB resize task failed: {e}")))?
    }
652}
653
/// Result of a single low-level put attempt against LMDB.
enum PutOutcome {
    /// The chunk was written and committed.
    New,
    /// A chunk with this key already existed; nothing was written.
    Duplicate,
    /// LMDB reported `MDB_MAP_FULL`; the map must be resized and retried.
    MapFull,
}
667
668fn compute_map_size(db_dir: &Path, reserve: u64) -> Result<usize> {
684 let available = fs2::available_space(db_dir)
685 .map_err(|e| Error::Storage(format!("Failed to query available disk space: {e}")))?;
686
687 let mdb_file = db_dir.join("data.mdb");
689 let current_db_bytes = std::fs::metadata(&mdb_file).map(|m| m.len()).unwrap_or(0);
690
691 let growth_room = available.saturating_sub(reserve);
694 let target = current_db_bytes.saturating_add(growth_room);
695
696 let page = page_size::get() as u64;
698 let aligned = target.div_ceil(page) * page;
699
700 let result = usize::try_from(aligned).unwrap_or(usize::MAX);
701 Ok(result.max(MIN_MAP_SIZE))
702}
703
704fn check_disk_space(db_dir: &Path, reserve: u64) -> Result<()> {
706 let available = fs2::available_space(db_dir)
707 .map_err(|e| Error::Storage(format!("Failed to query available disk space: {e}")))?;
708
709 if available < reserve {
710 return Err(Error::Storage(format!(
711 "Insufficient disk space: {:.2} GiB available, {:.2} GiB reserve required. \
712 Free disk space or increase the partition to continue storing chunks.",
713 bytes_to_gib(available),
714 bytes_to_gib(reserve),
715 )));
716 }
717
718 Ok(())
719}
720
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a storage instance rooted in a fresh temp dir. Uses
    /// `test_default()` so the disk reserve is zero and small CI partitions
    /// never trip the free-space check.
    async fn create_test_storage() -> (LmdbStorage, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            ..LmdbStorageConfig::test_default()
        };
        let storage = LmdbStorage::new(config).await.expect("create storage");
        (storage, temp_dir)
    }

    // Round-trip: a stored chunk is returned byte-identical.
    #[tokio::test]
    async fn test_put_and_get() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"hello world";
        let address = LmdbStorage::compute_address(content);

        let is_new = storage.put(&address, content).await.expect("put");
        assert!(is_new);

        let retrieved = storage.get(&address).await.expect("get");
        assert_eq!(retrieved, Some(content.to_vec()));
    }

    // Re-storing the same chunk is a no-op counted in `duplicates`.
    #[tokio::test]
    async fn test_put_duplicate() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"test data";
        let address = LmdbStorage::compute_address(content);

        let is_new1 = storage.put(&address, content).await.expect("put 1");
        assert!(is_new1);

        let is_new2 = storage.put(&address, content).await.expect("put 2");
        assert!(!is_new2);

        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 1);
        assert_eq!(stats.duplicates, 1);
    }

    // Missing chunks yield Ok(None), not an error.
    #[tokio::test]
    async fn test_get_not_found() {
        let (storage, _temp) = create_test_storage().await;

        let address = [0xAB; 32];
        let result = storage.get(&address).await.expect("get");
        assert!(result.is_none());
    }

    // `exists` flips from false to true after a put.
    #[tokio::test]
    async fn test_exists() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"exists test";
        let address = LmdbStorage::compute_address(content);

        assert!(!storage.exists(&address).expect("exists"));

        storage.put(&address, content).await.expect("put");

        assert!(storage.exists(&address).expect("exists"));
    }

    // Delete returns true only when the chunk was actually present.
    #[tokio::test]
    async fn test_delete() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"delete test";
        let address = LmdbStorage::compute_address(content);

        storage.put(&address, content).await.expect("put");
        assert!(storage.exists(&address).expect("exists"));

        let deleted = storage.delete(&address).await.expect("delete");
        assert!(deleted);
        assert!(!storage.exists(&address).expect("exists"));

        let deleted2 = storage.delete(&address).await.expect("delete 2");
        assert!(!deleted2);
    }

    // A put under an address that does not hash-match the content is rejected.
    #[tokio::test]
    async fn test_address_mismatch() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"some content";
        let wrong_address = [0xFF; 32];
        let result = storage.put(&wrong_address, content).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("mismatch"));
    }

    // Pin the content-address function to a known digest.
    #[test]
    fn test_compute_address() {
        let content = b"hello world";
        let address = LmdbStorage::compute_address(content);

        let expected_hex = "d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24";
        assert_eq!(hex::encode(address), expected_hex);
    }

    // Counters track puts/gets; `current_chunks` is read live from LMDB.
    #[tokio::test]
    async fn test_stats() {
        let (storage, _temp) = create_test_storage().await;

        let content1 = b"content 1";
        let content2 = b"content 2";
        let address1 = LmdbStorage::compute_address(content1);
        let address2 = LmdbStorage::compute_address(content2);

        storage.put(&address1, content1).await.expect("put 1");
        storage.put(&address2, content2).await.expect("put 2");

        storage.get(&address1).await.expect("get");

        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 2);
        assert_eq!(stats.chunks_retrieved, 1);
        assert_eq!(
            stats.bytes_stored,
            content1.len() as u64 + content2.len() as u64
        );
        assert_eq!(stats.bytes_retrieved, content1.len() as u64);
        assert_eq!(stats.current_chunks, 2);
    }

    // Data written by one instance survives dropping and reopening the env.
    #[tokio::test]
    async fn test_persistence_across_reopen() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let content = b"persistent data";
        let address = LmdbStorage::compute_address(content);

        {
            let config = LmdbStorageConfig {
                root_dir: temp_dir.path().to_path_buf(),
                ..LmdbStorageConfig::test_default()
            };
            let storage = LmdbStorage::new(config).await.expect("create storage");
            storage.put(&address, content).await.expect("put");
        }

        {
            let config = LmdbStorageConfig {
                root_dir: temp_dir.path().to_path_buf(),
                ..LmdbStorageConfig::test_default()
            };
            let storage = LmdbStorage::new(config).await.expect("reopen storage");
            assert_eq!(storage.current_chunks().expect("current_chunks"), 1);
            let retrieved = storage.get(&address).await.expect("get");
            assert_eq!(retrieved, Some(content.to_vec()));
        }
    }

    // all_keys returns every stored address (order-insensitive).
    #[tokio::test]
    async fn test_all_keys() {
        let (storage, _temp) = create_test_storage().await;

        let keys = storage.all_keys().await.expect("all_keys empty");
        assert!(keys.is_empty());

        let content1 = b"chunk one for keys";
        let content2 = b"chunk two for keys";
        let addr1 = LmdbStorage::compute_address(content1);
        let addr2 = LmdbStorage::compute_address(content2);
        storage.put(&addr1, content1).await.expect("put 1");
        storage.put(&addr2, content2).await.expect("put 2");

        let mut keys = storage.all_keys().await.expect("all_keys");
        keys.sort_unstable();
        let mut expected = vec![addr1, addr2];
        expected.sort_unstable();
        assert_eq!(keys, expected);
    }

    // get_raw returns stored bytes (and None for missing) without verification.
    #[tokio::test]
    async fn test_get_raw() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"raw test data";
        let address = LmdbStorage::compute_address(content);
        storage.put(&address, content).await.expect("put");

        let raw = storage.get_raw(&address).await.expect("get_raw");
        assert_eq!(raw, Some(content.to_vec()));

        let missing = storage.get_raw(&[0xFF; 32]).await.expect("get_raw missing");
        assert!(missing.is_none());
    }
}