1use crate::ant_protocol::XorName;
11use crate::error::{Error, Result};
12use crate::logging::{debug, info, trace, warn};
13use heed::types::Bytes;
14use heed::{Database, Env, EnvOpenOptions, MdbError};
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::Instant;
18use tokio::task::spawn_blocking;
19
20use crate::ant_protocol::XORNAME_LEN;
21
/// One mebibyte (2^20 bytes).
pub const MIB: u64 = 1 << 20;

/// One gibibyte (2^30 bytes).
pub const GIB: u64 = MIB << 10;

/// Default free space (500 MiB) that must remain on the volume; `check_disk_space`
/// refuses new chunks once available space drops below the configured reserve.
const DEFAULT_DISK_RESERVE: u64 = 500 * MIB;

/// Convert a byte count to GiB as `f64`, for human-readable log and error messages.
// Precision loss (only above 2^53 bytes) is acceptable for display-only values.
#[allow(clippy::cast_precision_loss)]
fn bytes_to_gib(bytes: u64) -> f64 {
    let divisor = GIB as f64;
    (bytes as f64) / divisor
}

/// Lower bound (256 MiB) for an auto-computed LMDB map size.
const MIN_MAP_SIZE: usize = 256 << 20;

/// How long (in whole seconds) a passing disk-space check is trusted before re-querying.
const DISK_CHECK_INTERVAL_SECS: u64 = 5;
48
/// Settings used when opening an [`LmdbStorage`].
#[derive(Debug, Clone)]
pub struct LmdbStorageConfig {
    /// Directory under which the LMDB environment (`chunks.mdb/`) is created.
    pub root_dir: PathBuf,
    /// Re-hash every chunk returned by `get` and reject it if the hash does not
    /// match the requested address.
    pub verify_on_read: bool,
    /// Explicit LMDB map size in bytes; `0` means "derive it from free disk space
    /// minus `disk_reserve` when the environment is opened".
    pub max_map_size: usize,
    /// Bytes of free disk space that must remain; new chunks are refused once
    /// available space falls below this value.
    pub disk_reserve: u64,
}
66
67impl Default for LmdbStorageConfig {
68 fn default() -> Self {
69 Self {
70 root_dir: PathBuf::from(".ant/chunks"),
71 verify_on_read: true,
72 max_map_size: 0,
73 disk_reserve: DEFAULT_DISK_RESERVE,
74 }
75 }
76}
77
78impl LmdbStorageConfig {
79 #[cfg(any(test, feature = "test-utils"))]
82 #[must_use]
83 pub fn test_default() -> Self {
84 Self {
85 disk_reserve: 0,
86 ..Self::default()
87 }
88 }
89}
90
/// Operation counters for an [`LmdbStorage`] instance (not persisted across reopen).
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Chunks newly written by `put`.
    pub chunks_stored: u64,
    /// Chunks successfully returned by `get`.
    pub chunks_retrieved: u64,
    /// Total payload bytes of newly written chunks.
    pub bytes_stored: u64,
    /// Total payload bytes returned by `get`.
    pub bytes_retrieved: u64,
    /// `put` calls whose chunk was already present.
    pub duplicates: u64,
    /// `get` calls whose stored bytes did not hash to the requested address.
    pub verification_failures: u64,
    /// Entries currently in the database; filled in when a snapshot is taken.
    pub current_chunks: u64,
}
109
110pub struct LmdbStorage {
115 env: Env,
117 db: Database<Bytes, Bytes>,
119 config: LmdbStorageConfig,
121 env_dir: PathBuf,
123 stats: parking_lot::RwLock<StorageStats>,
125 env_lock: Arc<parking_lot::RwLock<()>>,
131 last_disk_ok: parking_lot::Mutex<Option<Instant>>,
136}
137
138impl LmdbStorage {
139 #[allow(unsafe_code)]
153 pub async fn new(config: LmdbStorageConfig) -> Result<Self> {
154 let env_dir = config.root_dir.join("chunks.mdb");
155
156 std::fs::create_dir_all(&env_dir)
158 .map_err(|e| Error::Storage(format!("Failed to create LMDB directory: {e}")))?;
159
160 let map_size = if config.max_map_size > 0 {
161 config.max_map_size
163 } else {
164 let computed = compute_map_size(&env_dir, config.disk_reserve)?;
166 info!(
167 "Auto-computed LMDB map size: {:.2} GiB (available disk minus {:.2} GiB reserve)",
168 bytes_to_gib(computed as u64),
169 bytes_to_gib(config.disk_reserve),
170 );
171 computed
172 };
173
174 let env_dir_clone = env_dir.clone();
175 let (env, db) = spawn_blocking(move || -> Result<(Env, Database<Bytes, Bytes>)> {
176 let env = unsafe {
184 EnvOpenOptions::new()
185 .map_size(map_size)
186 .max_dbs(1)
187 .open(&env_dir_clone)
188 .map_err(|e| Error::Storage(format!("Failed to open LMDB env: {e}")))?
189 };
190
191 let mut wtxn = env
192 .write_txn()
193 .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
194 let db: Database<Bytes, Bytes> = env
195 .create_database(&mut wtxn, None)
196 .map_err(|e| Error::Storage(format!("Failed to create database: {e}")))?;
197 wtxn.commit()
198 .map_err(|e| Error::Storage(format!("Failed to commit db creation: {e}")))?;
199
200 Ok((env, db))
201 })
202 .await
203 .map_err(|e| Error::Storage(format!("LMDB init task failed: {e}")))??;
204
205 let storage = Self {
206 env,
207 db,
208 config,
209 env_dir,
210 stats: parking_lot::RwLock::new(StorageStats::default()),
211 env_lock: Arc::new(parking_lot::RwLock::new(())),
212 last_disk_ok: parking_lot::Mutex::new(None),
213 };
214
215 debug!(
216 "Initialized LMDB storage at {:?} ({} existing chunks)",
217 storage.env_dir,
218 storage.current_chunks()?
219 );
220
221 Ok(storage)
222 }
223
224 pub async fn put(&self, address: &XorName, content: &[u8]) -> Result<bool> {
240 let computed = Self::compute_address(content);
242 if computed != *address {
243 return Err(Error::Storage(format!(
244 "Content address mismatch: expected {}, computed {}",
245 hex::encode(address),
246 hex::encode(computed)
247 )));
248 }
249
250 if self.exists(address)? {
254 trace!("Chunk {} already exists", hex::encode(address));
255 self.stats.write().duplicates += 1;
256 return Ok(false);
257 }
258
259 self.check_disk_space_cached()?;
263
264 match self.try_put(address, content).await? {
266 PutOutcome::New => {}
267 PutOutcome::Duplicate => {
268 trace!("Chunk {} already exists", hex::encode(address));
269 self.stats.write().duplicates += 1;
270 return Ok(false);
271 }
272 PutOutcome::MapFull => {
273 self.try_resize().await?;
276 match self.try_put(address, content).await? {
278 PutOutcome::New => {}
279 PutOutcome::Duplicate => {
280 self.stats.write().duplicates += 1;
281 return Ok(false);
282 }
283 PutOutcome::MapFull => {
284 return Err(Error::Storage(
285 "LMDB map full after resize — disk may be at capacity".into(),
286 ));
287 }
288 }
289 }
290 }
291
292 {
293 let mut stats = self.stats.write();
294 stats.chunks_stored += 1;
295 stats.bytes_stored += content.len() as u64;
296 }
297
298 debug!(
299 "Stored chunk {} ({} bytes)",
300 hex::encode(address),
301 content.len()
302 );
303
304 Ok(true)
305 }
306
307 async fn try_put(&self, address: &XorName, content: &[u8]) -> Result<PutOutcome> {
312 let key = *address;
313 let value = content.to_vec();
314 let env = self.env.clone();
315 let db = self.db;
316 let lock = Arc::clone(&self.env_lock);
317
318 spawn_blocking(move || -> Result<PutOutcome> {
319 let _guard = lock.read();
320
321 let mut wtxn = env
322 .write_txn()
323 .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
324
325 if db
327 .get(&wtxn, &key)
328 .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
329 .is_some()
330 {
331 return Ok(PutOutcome::Duplicate);
332 }
333
334 match db.put(&mut wtxn, &key, &value) {
335 Ok(()) => {}
336 Err(heed::Error::Mdb(MdbError::MapFull)) => return Ok(PutOutcome::MapFull),
337 Err(e) => {
338 return Err(Error::Storage(format!("Failed to put chunk: {e}")));
339 }
340 }
341
342 match wtxn.commit() {
343 Ok(()) => Ok(PutOutcome::New),
344 Err(heed::Error::Mdb(MdbError::MapFull)) => Ok(PutOutcome::MapFull),
345 Err(e) => Err(Error::Storage(format!("Failed to commit put: {e}"))),
346 }
347 })
348 .await
349 .map_err(|e| Error::Storage(format!("LMDB put task failed: {e}")))?
350 }
351
352 pub async fn get(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
362 let key = *address;
363 let env = self.env.clone();
364 let db = self.db;
365 let lock = Arc::clone(&self.env_lock);
366
367 let content = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
368 let _guard = lock.read();
369 let rtxn = env
370 .read_txn()
371 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
372 let value = db
373 .get(&rtxn, &key)
374 .map_err(|e| Error::Storage(format!("Failed to get chunk: {e}")))?;
375 Ok(value.map(Vec::from))
376 })
377 .await
378 .map_err(|e| Error::Storage(format!("LMDB get task failed: {e}")))??;
379
380 let Some(content) = content else {
381 trace!("Chunk {} not found", hex::encode(address));
382 return Ok(None);
383 };
384
385 if self.config.verify_on_read {
387 let computed = Self::compute_address(&content);
388 if computed != *address {
389 self.stats.write().verification_failures += 1;
390 warn!(
391 "Chunk verification failed: expected {}, computed {}",
392 hex::encode(address),
393 hex::encode(computed)
394 );
395 return Err(Error::Storage(format!(
396 "Chunk verification failed for {}",
397 hex::encode(address)
398 )));
399 }
400 }
401
402 {
403 let mut stats = self.stats.write();
404 stats.chunks_retrieved += 1;
405 stats.bytes_retrieved += content.len() as u64;
406 }
407
408 debug!(
409 "Retrieved chunk {} ({} bytes)",
410 hex::encode(address),
411 content.len()
412 );
413
414 Ok(Some(content))
415 }
416
417 pub fn exists(&self, address: &XorName) -> Result<bool> {
423 let _guard = self.env_lock.read();
424 let rtxn = self
425 .env
426 .read_txn()
427 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
428 let found = self
429 .db
430 .get(&rtxn, address.as_ref())
431 .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
432 .is_some();
433 Ok(found)
434 }
435
436 pub async fn delete(&self, address: &XorName) -> Result<bool> {
442 let key = *address;
443 let env = self.env.clone();
444 let db = self.db;
445 let lock = Arc::clone(&self.env_lock);
446
447 let deleted = spawn_blocking(move || -> Result<bool> {
448 let _guard = lock.read();
449 let mut wtxn = env
450 .write_txn()
451 .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
452 let existed = db
453 .delete(&mut wtxn, &key)
454 .map_err(|e| Error::Storage(format!("Failed to delete chunk: {e}")))?;
455 wtxn.commit()
456 .map_err(|e| Error::Storage(format!("Failed to commit delete: {e}")))?;
457 Ok(existed)
458 })
459 .await
460 .map_err(|e| Error::Storage(format!("LMDB delete task failed: {e}")))??;
461
462 if deleted {
463 debug!("Deleted chunk {}", hex::encode(address));
464 }
465
466 Ok(deleted)
467 }
468
469 #[must_use]
471 pub fn stats(&self) -> StorageStats {
472 let mut stats = self.stats.read().clone();
473 match self.current_chunks() {
474 Ok(count) => stats.current_chunks = count,
475 Err(e) => {
476 warn!("Failed to read current_chunks for stats: {e}");
477 stats.current_chunks = 0;
478 }
479 }
480 stats
481 }
482
483 pub fn current_chunks(&self) -> Result<u64> {
491 let _guard = self.env_lock.read();
492 let rtxn = self
493 .env
494 .read_txn()
495 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
496 let entries = self
497 .db
498 .stat(&rtxn)
499 .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
500 .entries;
501 Ok(entries as u64)
502 }
503
504 #[must_use]
506 pub fn compute_address(content: &[u8]) -> XorName {
507 crate::client::compute_address(content)
508 }
509
510 #[must_use]
512 pub fn root_dir(&self) -> &Path {
513 &self.config.root_dir
514 }
515
516 pub async fn all_keys(&self) -> Result<Vec<XorName>> {
525 let env = self.env.clone();
526 let db = self.db;
527
528 let keys = spawn_blocking(move || -> Result<Vec<XorName>> {
529 let rtxn = env
530 .read_txn()
531 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
532 let mut keys = Vec::new();
533 let iter = db
534 .iter(&rtxn)
535 .map_err(|e| Error::Storage(format!("Failed to iterate database: {e}")))?;
536 for result in iter {
537 let (key_bytes, _) =
538 result.map_err(|e| Error::Storage(format!("Failed to read entry: {e}")))?;
539 if key_bytes.len() == XORNAME_LEN {
540 let mut key = [0u8; XORNAME_LEN];
541 key.copy_from_slice(key_bytes);
542 keys.push(key);
543 } else {
544 crate::logging::warn!(
545 "LmdbStorage: skipping entry with unexpected key length {} (expected {XORNAME_LEN})",
546 key_bytes.len()
547 );
548 }
549 }
550 Ok(keys)
551 })
552 .await
553 .map_err(|e| Error::Storage(format!("all_keys task failed: {e}")))?;
554
555 keys
556 }
557
558 pub async fn get_raw(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
567 let key = *address;
568 let env = self.env.clone();
569 let db = self.db;
570
571 let value = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
572 let rtxn = env
573 .read_txn()
574 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
575 let val = db
576 .get(&rtxn, key.as_ref())
577 .map_err(|e| Error::Storage(format!("Failed to get chunk: {e}")))?;
578 Ok(val.map(Vec::from))
579 })
580 .await
581 .map_err(|e| Error::Storage(format!("get_raw task failed: {e}")))?;
582
583 value
584 }
585
586 pub(crate) fn check_capacity(&self) -> Result<()> {
600 self.check_disk_space_cached()
601 }
602
603 fn check_disk_space_cached(&self) -> Result<()> {
608 {
609 let last = self.last_disk_ok.lock();
610 if let Some(t) = *last {
611 if t.elapsed().as_secs() < DISK_CHECK_INTERVAL_SECS {
612 return Ok(());
613 }
614 }
615 }
616 check_disk_space(&self.env_dir, self.config.disk_reserve)?;
618 *self.last_disk_ok.lock() = Some(Instant::now());
620 Ok(())
621 }
622
623 #[allow(unsafe_code)]
634 async fn try_resize(&self) -> Result<()> {
635 let from_disk = compute_map_size(&self.env_dir, self.config.disk_reserve)?;
636 let env = self.env.clone();
637 let lock = Arc::clone(&self.env_lock);
638
639 spawn_blocking(move || -> Result<()> {
640 let _guard = lock.write();
642
643 let current_map = env.info().map_size;
646 let new_size = from_disk.max(current_map);
647
648 if new_size <= current_map {
649 debug!("LMDB map resize skipped — no additional disk space available");
650 return Ok(());
651 }
652
653 unsafe {
655 env.resize(new_size)
656 .map_err(|e| Error::Storage(format!("Failed to resize LMDB map: {e}")))?;
657 }
658
659 info!(
660 "Resized LMDB map to {:.2} GiB (was {:.2} GiB)",
661 bytes_to_gib(new_size as u64),
662 bytes_to_gib(current_map as u64),
663 );
664 Ok(())
665 })
666 .await
667 .map_err(|e| Error::Storage(format!("LMDB resize task failed: {e}")))?
668 }
669}
670
/// Result of one write attempt made by `LmdbStorage::try_put`.
enum PutOutcome {
    /// The chunk was written and committed.
    New,
    /// The key was already present; nothing was written.
    Duplicate,
    /// LMDB reported `MapFull` for the put or the commit; the caller may grow the
    /// map and retry.
    MapFull,
}
684
685fn compute_map_size(db_dir: &Path, reserve: u64) -> Result<usize> {
701 let available = fs2::available_space(db_dir)
702 .map_err(|e| Error::Storage(format!("Failed to query available disk space: {e}")))?;
703
704 let mdb_file = db_dir.join("data.mdb");
706 let current_db_bytes = std::fs::metadata(&mdb_file).map_or(0, |m| m.len());
707
708 let growth_room = available.saturating_sub(reserve);
711 let target = current_db_bytes.saturating_add(growth_room);
712
713 let page = page_size::get() as u64;
715 let aligned = target.div_ceil(page) * page;
716
717 let result = usize::try_from(aligned).unwrap_or(usize::MAX);
718 Ok(result.max(MIN_MAP_SIZE))
719}
720
721fn check_disk_space(db_dir: &Path, reserve: u64) -> Result<()> {
723 let available = fs2::available_space(db_dir)
724 .map_err(|e| Error::Storage(format!("Failed to query available disk space: {e}")))?;
725
726 if available < reserve {
727 return Err(Error::Storage(format!(
728 "Insufficient disk space: {:.2} GiB available, {:.2} GiB reserve required. \
729 Free disk space or increase the partition to continue storing chunks.",
730 bytes_to_gib(available),
731 bytes_to_gib(reserve),
732 )));
733 }
734
735 Ok(())
736}
737
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Open a fresh store with no disk reserve. The returned `TempDir` must stay
    /// alive for as long as the store is used.
    async fn create_test_storage() -> (LmdbStorage, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            ..LmdbStorageConfig::test_default()
        };
        let storage = LmdbStorage::new(config).await.expect("create storage");
        (storage, temp_dir)
    }

    /// Overwrite the value stored under `address` directly in LMDB, bypassing
    /// `put`'s content-address check (simulates on-disk corruption).
    fn tamper(storage: &LmdbStorage, address: &XorName, bytes: &[u8]) {
        let mut wtxn = storage.env.write_txn().expect("write txn");
        storage
            .db
            .put(&mut wtxn, address, bytes)
            .expect("raw put");
        wtxn.commit().expect("commit");
    }

    #[tokio::test]
    async fn test_put_and_get() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"hello world";
        let address = LmdbStorage::compute_address(content);

        let is_new = storage.put(&address, content).await.expect("put");
        assert!(is_new);

        let retrieved = storage.get(&address).await.expect("get");
        assert_eq!(retrieved, Some(content.to_vec()));
    }

    #[tokio::test]
    async fn test_put_duplicate() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"test data";
        let address = LmdbStorage::compute_address(content);

        let is_new1 = storage.put(&address, content).await.expect("put 1");
        assert!(is_new1);

        let is_new2 = storage.put(&address, content).await.expect("put 2");
        assert!(!is_new2);

        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 1);
        assert_eq!(stats.duplicates, 1);
    }

    #[tokio::test]
    async fn test_get_not_found() {
        let (storage, _temp) = create_test_storage().await;

        let address = [0xAB; 32];
        let result = storage.get(&address).await.expect("get");
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_exists() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"exists test";
        let address = LmdbStorage::compute_address(content);

        assert!(!storage.exists(&address).expect("exists"));

        storage.put(&address, content).await.expect("put");

        assert!(storage.exists(&address).expect("exists"));
    }

    #[tokio::test]
    async fn test_delete() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"delete test";
        let address = LmdbStorage::compute_address(content);

        storage.put(&address, content).await.expect("put");
        assert!(storage.exists(&address).expect("exists"));

        let deleted = storage.delete(&address).await.expect("delete");
        assert!(deleted);
        assert!(!storage.exists(&address).expect("exists"));

        let deleted2 = storage.delete(&address).await.expect("delete 2");
        assert!(!deleted2);
    }

    #[tokio::test]
    async fn test_address_mismatch() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"some content";
        let wrong_address = [0xFF; 32];
        let result = storage.put(&wrong_address, content).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("mismatch"));
    }

    #[test]
    fn test_compute_address() {
        let content = b"hello world";
        let address = LmdbStorage::compute_address(content);

        let expected_hex = "d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24";
        assert_eq!(hex::encode(address), expected_hex);
    }

    #[tokio::test]
    async fn test_stats() {
        let (storage, _temp) = create_test_storage().await;

        let content1 = b"content 1";
        let content2 = b"content 2";
        let address1 = LmdbStorage::compute_address(content1);
        let address2 = LmdbStorage::compute_address(content2);

        storage.put(&address1, content1).await.expect("put 1");
        storage.put(&address2, content2).await.expect("put 2");

        storage.get(&address1).await.expect("get");

        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 2);
        assert_eq!(stats.chunks_retrieved, 1);
        assert_eq!(
            stats.bytes_stored,
            content1.len() as u64 + content2.len() as u64
        );
        assert_eq!(stats.bytes_retrieved, content1.len() as u64);
        assert_eq!(stats.current_chunks, 2);
    }

    #[tokio::test]
    async fn test_persistence_across_reopen() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let content = b"persistent data";
        let address = LmdbStorage::compute_address(content);

        {
            let config = LmdbStorageConfig {
                root_dir: temp_dir.path().to_path_buf(),
                ..LmdbStorageConfig::test_default()
            };
            let storage = LmdbStorage::new(config).await.expect("create storage");
            storage.put(&address, content).await.expect("put");
        }

        {
            let config = LmdbStorageConfig {
                root_dir: temp_dir.path().to_path_buf(),
                ..LmdbStorageConfig::test_default()
            };
            let storage = LmdbStorage::new(config).await.expect("reopen storage");
            assert_eq!(storage.current_chunks().expect("current_chunks"), 1);
            let retrieved = storage.get(&address).await.expect("get");
            assert_eq!(retrieved, Some(content.to_vec()));
        }
    }

    #[tokio::test]
    async fn test_all_keys() {
        let (storage, _temp) = create_test_storage().await;

        let keys = storage.all_keys().await.expect("all_keys empty");
        assert!(keys.is_empty());

        let content1 = b"chunk one for keys";
        let content2 = b"chunk two for keys";
        let addr1 = LmdbStorage::compute_address(content1);
        let addr2 = LmdbStorage::compute_address(content2);
        storage.put(&addr1, content1).await.expect("put 1");
        storage.put(&addr2, content2).await.expect("put 2");

        let mut keys = storage.all_keys().await.expect("all_keys");
        keys.sort_unstable();
        let mut expected = vec![addr1, addr2];
        expected.sort_unstable();
        assert_eq!(keys, expected);
    }

    #[tokio::test]
    async fn test_get_raw() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"raw test data";
        let address = LmdbStorage::compute_address(content);
        storage.put(&address, content).await.expect("put");

        let raw = storage.get_raw(&address).await.expect("get_raw");
        assert_eq!(raw, Some(content.to_vec()));

        let missing = storage.get_raw(&[0xFF; 32]).await.expect("get_raw missing");
        assert!(missing.is_none());
    }

    /// A stored value that no longer hashes to its key must be rejected by `get`
    /// (and counted), while `get_raw` still returns the bytes as stored.
    #[tokio::test]
    async fn test_get_detects_corruption() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"pristine chunk";
        let address = LmdbStorage::compute_address(content);
        storage.put(&address, content).await.expect("put");

        tamper(&storage, &address, b"tampered");

        let err = storage
            .get(&address)
            .await
            .expect_err("corrupted chunk must fail verification");
        assert!(err.to_string().contains("verification failed"));

        let stats = storage.stats();
        assert_eq!(stats.verification_failures, 1);
        assert_eq!(stats.chunks_retrieved, 0);

        let raw = storage.get_raw(&address).await.expect("get_raw");
        assert_eq!(raw, Some(b"tampered".to_vec()));
    }

    /// With `verify_on_read` disabled, `get` returns whatever bytes are stored.
    #[tokio::test]
    async fn test_get_without_verification_returns_stored_bytes() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            verify_on_read: false,
            ..LmdbStorageConfig::test_default()
        };
        let storage = LmdbStorage::new(config).await.expect("create storage");

        let content = b"pristine chunk";
        let address = LmdbStorage::compute_address(content);
        storage.put(&address, content).await.expect("put");

        tamper(&storage, &address, b"tampered");

        let got = storage.get(&address).await.expect("get");
        assert_eq!(got, Some(b"tampered".to_vec()));
        assert_eq!(storage.stats().verification_failures, 0);
    }

    #[test]
    fn test_check_disk_space_enforces_reserve() {
        let temp_dir = TempDir::new().expect("create temp dir");

        assert!(check_disk_space(temp_dir.path(), 0).is_ok());

        let err = check_disk_space(temp_dir.path(), u64::MAX)
            .expect_err("an unreachable reserve must be rejected");
        assert!(err.to_string().contains("Insufficient disk space"));
    }

    #[test]
    fn test_compute_map_size_is_page_aligned() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let page = page_size::get();

        // reserve = u64::MAX leaves no growth room, exercising the MIN_MAP_SIZE floor.
        for reserve in [0, u64::MAX] {
            let size = compute_map_size(temp_dir.path(), reserve).expect("compute_map_size");
            assert!(size >= MIN_MAP_SIZE);
            assert_eq!(size % page, 0, "map size {size} not aligned to page {page}");
        }
    }
}