1use crate::ant_protocol::XorName;
11use crate::error::{Error, Result};
12use heed::types::Bytes;
13use heed::{Database, Env, EnvOpenOptions};
14use std::path::{Path, PathBuf};
15use tokio::task::spawn_blocking;
16use tracing::{debug, trace, warn};
17
/// Map size in bytes handed to LMDB when `LmdbStorageConfig::max_map_size`
/// is left at zero: `32 * 1_073_741_824`, i.e. 32 GiB.
///
/// NOTE(review): this value does not fit in a 32-bit `usize`, so the constant
/// presumably restricts the crate to 64-bit targets — confirm.
const DEFAULT_MAX_MAP_SIZE: usize = 32 * 1_073_741_824;

/// Settings consumed by `LmdbStorage::new`.
#[derive(Debug, Clone)]
pub struct LmdbStorageConfig {
    /// Base directory. The LMDB environment itself is created in a
    /// `chunks.mdb` sub-directory of this path.
    pub root_dir: PathBuf,
    /// When `true`, `LmdbStorage::get` recomputes the content address of every
    /// value it reads and returns an error if it differs from the requested key.
    pub verify_on_read: bool,
    /// Upper bound on the number of entries that `put` / `put_raw` will accept.
    /// `0` disables the check.
    pub max_chunks: usize,
    /// LMDB map size in bytes. `0` selects `DEFAULT_MAX_MAP_SIZE`.
    pub max_map_size: usize,
}
35
36impl Default for LmdbStorageConfig {
37 fn default() -> Self {
38 Self {
39 root_dir: PathBuf::from(".saorsa/chunks"),
40 verify_on_read: true,
41 max_chunks: 0,
42 max_map_size: 0,
43 }
44 }
45}
46
/// Operation counters kept in memory by `LmdbStorage`.
///
/// A fresh, zeroed instance is created each time a store is constructed; the
/// counters are not read back from disk.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Number of entries written as new keys by `put`, `put_raw`, or
    /// `put_overwrite`.
    pub chunks_stored: u64,
    /// Number of successful `get` / `get_raw` reads.
    pub chunks_retrieved: u64,
    /// Sum of value lengths written; `put_overwrite` subtracts the replaced
    /// value's length before adding the new one.
    pub bytes_stored: u64,
    /// Sum of value lengths returned by successful reads.
    pub bytes_retrieved: u64,
    /// Number of `put` / `put_raw` calls that found the key already present.
    pub duplicates: u64,
    /// Number of `get` reads whose recomputed address did not match the key.
    pub verification_failures: u64,
    /// Entry count of the database, filled in by `LmdbStorage::stats` at the
    /// moment the snapshot is taken; zero in the stored counters themselves.
    pub current_chunks: u64,
}
65
/// Key/value store backed by a single LMDB environment accessed through `heed`.
pub struct LmdbStorage {
    /// Handle to the open LMDB environment; cloned into blocking tasks.
    env: Env,
    /// The unnamed database inside `env`, with raw-byte keys and values.
    db: Database<Bytes, Bytes>,
    /// Configuration supplied at construction time.
    config: LmdbStorageConfig,
    /// In-memory operation counters, zeroed on construction.
    stats: parking_lot::RwLock<StorageStats>,
}
80
81impl LmdbStorage {
82 #[allow(unsafe_code)]
90 pub async fn new(config: LmdbStorageConfig) -> Result<Self> {
91 let env_dir = config.root_dir.join("chunks.mdb");
92
93 std::fs::create_dir_all(&env_dir)
95 .map_err(|e| Error::Storage(format!("Failed to create LMDB directory: {e}")))?;
96
97 let map_size = if config.max_map_size > 0 {
98 config.max_map_size
99 } else {
100 DEFAULT_MAX_MAP_SIZE
101 };
102
103 let env_dir_clone = env_dir.clone();
104 let (env, db) = spawn_blocking(move || -> Result<(Env, Database<Bytes, Bytes>)> {
105 let env = unsafe {
113 EnvOpenOptions::new()
114 .map_size(map_size)
115 .max_dbs(1)
116 .open(&env_dir_clone)
117 .map_err(|e| Error::Storage(format!("Failed to open LMDB env: {e}")))?
118 };
119
120 let mut wtxn = env
121 .write_txn()
122 .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
123 let db: Database<Bytes, Bytes> = env
124 .create_database(&mut wtxn, None)
125 .map_err(|e| Error::Storage(format!("Failed to create database: {e}")))?;
126 wtxn.commit()
127 .map_err(|e| Error::Storage(format!("Failed to commit db creation: {e}")))?;
128
129 Ok((env, db))
130 })
131 .await
132 .map_err(|e| Error::Storage(format!("LMDB init task failed: {e}")))??;
133
134 let storage = Self {
135 env,
136 db,
137 config,
138 stats: parking_lot::RwLock::new(StorageStats::default()),
139 };
140
141 debug!(
142 "Initialized LMDB storage at {:?} ({} existing chunks)",
143 env_dir,
144 storage.current_chunks()?
145 );
146
147 Ok(storage)
148 }
149
150 pub async fn put(&self, address: &XorName, content: &[u8]) -> Result<bool> {
165 let computed = Self::compute_address(content);
167 if computed != *address {
168 return Err(Error::Storage(format!(
169 "Content address mismatch: expected {}, computed {}",
170 hex::encode(address),
171 hex::encode(computed)
172 )));
173 }
174
175 if self.exists(address)? {
179 trace!("Chunk {} already exists", hex::encode(address));
180 self.stats.write().duplicates += 1;
181 return Ok(false);
182 }
183
184 let key = *address;
185 let value = content.to_vec();
186 let env = self.env.clone();
187 let db = self.db;
188 let max_chunks = self.config.max_chunks;
189
190 let was_new = spawn_blocking(move || -> Result<bool> {
194 let mut wtxn = env
195 .write_txn()
196 .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
197
198 if db
200 .get(&wtxn, &key)
201 .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
202 .is_some()
203 {
204 return Ok(false);
205 }
206
207 if max_chunks > 0 {
209 let current = db
210 .stat(&wtxn)
211 .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
212 .entries;
213 if current >= max_chunks {
214 return Err(Error::Storage(format!(
215 "Storage capacity reached: {current} chunks stored, max is {max_chunks}"
216 )));
217 }
218 }
219
220 db.put(&mut wtxn, &key, &value)
221 .map_err(|e| Error::Storage(format!("Failed to put chunk: {e}")))?;
222 wtxn.commit()
223 .map_err(|e| Error::Storage(format!("Failed to commit put: {e}")))?;
224 Ok(true)
225 })
226 .await
227 .map_err(|e| Error::Storage(format!("LMDB put task failed: {e}")))??;
228
229 if !was_new {
230 trace!("Chunk {} already exists", hex::encode(address));
231 self.stats.write().duplicates += 1;
232 return Ok(false);
233 }
234
235 {
236 let mut stats = self.stats.write();
237 stats.chunks_stored += 1;
238 stats.bytes_stored += content.len() as u64;
239 }
240
241 debug!(
242 "Stored chunk {} ({} bytes)",
243 hex::encode(address),
244 content.len()
245 );
246
247 Ok(true)
248 }
249
250 pub async fn put_raw(&self, address: &XorName, data: &[u8]) -> Result<bool> {
258 if self.exists(address)? {
260 trace!("Record {} already exists", hex::encode(address));
261 self.stats.write().duplicates += 1;
262 return Ok(false);
263 }
264
265 let key = *address;
266 let value = data.to_vec();
267 let env = self.env.clone();
268 let db = self.db;
269 let max_chunks = self.config.max_chunks;
270
271 let was_new = spawn_blocking(move || -> Result<bool> {
272 let mut wtxn = env
273 .write_txn()
274 .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
275
276 if db
277 .get(&wtxn, &key)
278 .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
279 .is_some()
280 {
281 return Ok(false);
282 }
283
284 if max_chunks > 0 {
285 let current = db
286 .stat(&wtxn)
287 .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
288 .entries;
289 if current >= max_chunks {
290 return Err(Error::Storage(format!(
291 "Storage capacity reached: {current} stored, max is {max_chunks}"
292 )));
293 }
294 }
295
296 db.put(&mut wtxn, &key, &value)
297 .map_err(|e| Error::Storage(format!("Failed to put record: {e}")))?;
298 wtxn.commit()
299 .map_err(|e| Error::Storage(format!("Failed to commit put: {e}")))?;
300 Ok(true)
301 })
302 .await
303 .map_err(|e| Error::Storage(format!("LMDB put_raw task failed: {e}")))??;
304
305 if !was_new {
306 self.stats.write().duplicates += 1;
308 return Ok(false);
309 }
310
311 {
312 let mut stats = self.stats.write();
313 stats.chunks_stored += 1;
314 stats.bytes_stored += data.len() as u64;
315 }
316
317 debug!(
318 "Stored record {} ({} bytes)",
319 hex::encode(address),
320 data.len()
321 );
322
323 Ok(true)
324 }
325
326 pub async fn put_overwrite(&self, address: &XorName, data: &[u8]) -> Result<()> {
336 let key = *address;
337 let value = data.to_vec();
338 let env = self.env.clone();
339 let db = self.db;
340
341 let old_len: Option<usize> = spawn_blocking(move || -> Result<Option<usize>> {
343 let mut wtxn = env
344 .write_txn()
345 .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
346
347 let prev_len = db
348 .get(&wtxn, &key)
349 .map_err(|e| Error::Storage(format!("Failed to read old record: {e}")))?
350 .map(<[u8]>::len);
351
352 db.put(&mut wtxn, &key, &value)
353 .map_err(|e| Error::Storage(format!("Failed to put record: {e}")))?;
354 wtxn.commit()
355 .map_err(|e| Error::Storage(format!("Failed to commit put: {e}")))?;
356 Ok(prev_len)
357 })
358 .await
359 .map_err(|e| Error::Storage(format!("LMDB put_overwrite task failed: {e}")))??;
360
361 {
362 let mut stats = self.stats.write();
363 if let Some(prev) = old_len {
364 stats.bytes_stored = stats
366 .bytes_stored
367 .saturating_sub(prev as u64)
368 .saturating_add(data.len() as u64);
369 } else {
370 stats.chunks_stored += 1;
372 stats.bytes_stored += data.len() as u64;
373 }
374 }
375
376 debug!(
377 "Overwritten record {} ({} bytes)",
378 hex::encode(address),
379 data.len()
380 );
381
382 Ok(())
383 }
384
385 pub async fn get(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
399 let key = *address;
400 let env = self.env.clone();
401 let db = self.db;
402
403 let content = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
404 let rtxn = env
405 .read_txn()
406 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
407 let value = db
408 .get(&rtxn, &key)
409 .map_err(|e| Error::Storage(format!("Failed to get chunk: {e}")))?;
410 Ok(value.map(Vec::from))
411 })
412 .await
413 .map_err(|e| Error::Storage(format!("LMDB get task failed: {e}")))??;
414
415 let Some(content) = content else {
416 trace!("Chunk {} not found", hex::encode(address));
417 return Ok(None);
418 };
419
420 if self.config.verify_on_read {
422 let computed = Self::compute_address(&content);
423 if computed != *address {
424 self.stats.write().verification_failures += 1;
425 warn!(
426 "Chunk verification failed: expected {}, computed {}",
427 hex::encode(address),
428 hex::encode(computed)
429 );
430 return Err(Error::Storage(format!(
431 "Chunk verification failed for {}",
432 hex::encode(address)
433 )));
434 }
435 }
436
437 {
438 let mut stats = self.stats.write();
439 stats.chunks_retrieved += 1;
440 stats.bytes_retrieved += content.len() as u64;
441 }
442
443 debug!(
444 "Retrieved chunk {} ({} bytes)",
445 hex::encode(address),
446 content.len()
447 );
448
449 Ok(Some(content))
450 }
451
452 pub async fn get_raw(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
461 let key = *address;
462 let env = self.env.clone();
463 let db = self.db;
464
465 let content = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
466 let rtxn = env
467 .read_txn()
468 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
469 let value = db
470 .get(&rtxn, &key)
471 .map_err(|e| Error::Storage(format!("Failed to get record: {e}")))?;
472 Ok(value.map(Vec::from))
473 })
474 .await
475 .map_err(|e| Error::Storage(format!("LMDB get_raw task failed: {e}")))??;
476
477 let Some(content) = content else {
478 trace!("Record {} not found", hex::encode(address));
479 return Ok(None);
480 };
481
482 {
483 let mut stats = self.stats.write();
484 stats.chunks_retrieved += 1;
485 stats.bytes_retrieved += content.len() as u64;
486 }
487
488 debug!(
489 "Retrieved record {} ({} bytes)",
490 hex::encode(address),
491 content.len()
492 );
493
494 Ok(Some(content))
495 }
496
497 pub fn exists(&self, address: &XorName) -> Result<bool> {
503 let rtxn = self
504 .env
505 .read_txn()
506 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
507 let found = self
508 .db
509 .get(&rtxn, address.as_ref())
510 .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
511 .is_some();
512 Ok(found)
513 }
514
515 pub async fn delete(&self, address: &XorName) -> Result<bool> {
521 let key = *address;
522 let env = self.env.clone();
523 let db = self.db;
524
525 let deleted = spawn_blocking(move || -> Result<bool> {
526 let mut wtxn = env
527 .write_txn()
528 .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
529 let existed = db
530 .delete(&mut wtxn, &key)
531 .map_err(|e| Error::Storage(format!("Failed to delete chunk: {e}")))?;
532 wtxn.commit()
533 .map_err(|e| Error::Storage(format!("Failed to commit delete: {e}")))?;
534 Ok(existed)
535 })
536 .await
537 .map_err(|e| Error::Storage(format!("LMDB delete task failed: {e}")))??;
538
539 if deleted {
540 debug!("Deleted chunk {}", hex::encode(address));
541 }
542
543 Ok(deleted)
544 }
545
546 #[must_use]
548 pub fn stats(&self) -> StorageStats {
549 let mut stats = self.stats.read().clone();
550 match self.current_chunks() {
551 Ok(count) => stats.current_chunks = count,
552 Err(e) => {
553 warn!("Failed to read current_chunks for stats: {e}");
554 stats.current_chunks = 0;
555 }
556 }
557 stats
558 }
559
560 pub fn current_chunks(&self) -> Result<u64> {
568 let rtxn = self
569 .env
570 .read_txn()
571 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
572 let entries = self
573 .db
574 .stat(&rtxn)
575 .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
576 .entries;
577 Ok(entries as u64)
578 }
579
    /// Computes the content address of `content`.
    ///
    /// Delegates to `crate::client::compute_address`, so the store and the
    /// client always agree on how addresses are derived.
    #[must_use]
    pub fn compute_address(content: &[u8]) -> XorName {
        crate::client::compute_address(content)
    }
585
586 #[must_use]
588 pub fn root_dir(&self) -> &Path {
589 &self.config.root_dir
590 }
591}
592
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a configuration rooted at `root` with read verification on, the
    /// default map size, and the given chunk limit.
    fn config_at(root: &Path, max_chunks: usize) -> LmdbStorageConfig {
        LmdbStorageConfig {
            root_dir: root.to_path_buf(),
            verify_on_read: true,
            max_chunks,
            max_map_size: 0,
        }
    }

    /// Creates an unlimited store in a fresh temporary directory. The
    /// `TempDir` is returned so it outlives the store.
    async fn create_test_storage() -> (LmdbStorage, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");
        let storage = LmdbStorage::new(config_at(temp_dir.path(), 0))
            .await
            .expect("create storage");
        (storage, temp_dir)
    }

    /// A stored chunk can be read back unchanged.
    #[tokio::test]
    async fn test_put_and_get() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"hello world";
        let address = LmdbStorage::compute_address(content);

        let is_new = storage.put(&address, content).await.expect("put");
        assert!(is_new);

        let retrieved = storage.get(&address).await.expect("get");
        assert_eq!(retrieved, Some(content.to_vec()));
    }

    /// Storing the same chunk twice reports a duplicate the second time.
    #[tokio::test]
    async fn test_put_duplicate() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"test data";
        let address = LmdbStorage::compute_address(content);

        let is_new1 = storage.put(&address, content).await.expect("put 1");
        assert!(is_new1);

        let is_new2 = storage.put(&address, content).await.expect("put 2");
        assert!(!is_new2);

        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 1);
        assert_eq!(stats.duplicates, 1);
    }

    /// Reading an unknown address yields `None`, not an error.
    #[tokio::test]
    async fn test_get_not_found() {
        let (storage, _temp) = create_test_storage().await;

        let address = [0xAB; 32];
        let result = storage.get(&address).await.expect("get");
        assert!(result.is_none());
    }

    /// `exists` flips from false to true once the chunk is stored.
    #[tokio::test]
    async fn test_exists() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"exists test";
        let address = LmdbStorage::compute_address(content);

        assert!(!storage.exists(&address).expect("exists"));

        storage.put(&address, content).await.expect("put");

        assert!(storage.exists(&address).expect("exists"));
    }

    /// Deleting removes the chunk; deleting again reports nothing removed.
    #[tokio::test]
    async fn test_delete() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"delete test";
        let address = LmdbStorage::compute_address(content);

        storage.put(&address, content).await.expect("put");
        assert!(storage.exists(&address).expect("exists"));

        let deleted = storage.delete(&address).await.expect("delete");
        assert!(deleted);
        assert!(!storage.exists(&address).expect("exists"));

        let deleted2 = storage.delete(&address).await.expect("delete 2");
        assert!(!deleted2);
    }

    /// With `max_chunks = 2`, the third distinct chunk is rejected.
    #[tokio::test]
    async fn test_max_chunks_enforced() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let storage = LmdbStorage::new(config_at(temp_dir.path(), 2))
            .await
            .expect("create storage");

        let content1 = b"chunk one";
        let content2 = b"chunk two";
        let content3 = b"chunk three";
        let addr1 = LmdbStorage::compute_address(content1);
        let addr2 = LmdbStorage::compute_address(content2);
        let addr3 = LmdbStorage::compute_address(content3);

        assert!(storage.put(&addr1, content1).await.is_ok());
        assert!(storage.put(&addr2, content2).await.is_ok());

        let result = storage.put(&addr3, content3).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("capacity reached"));
    }

    /// `put` rejects an address that is not derived from the content.
    #[tokio::test]
    async fn test_address_mismatch() {
        let (storage, _temp) = create_test_storage().await;

        let content = b"some content";
        let wrong_address = [0xFF; 32];

        let result = storage.put(&wrong_address, content).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("mismatch"));
    }

    /// Pins the address of a known input against a fixed hex digest.
    #[test]
    fn test_compute_address() {
        let content = b"hello world";
        let address = LmdbStorage::compute_address(content);

        let expected_hex = "d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24";
        assert_eq!(hex::encode(address), expected_hex);
    }

    /// Counters reflect two stores and one retrieval.
    #[tokio::test]
    async fn test_stats() {
        let (storage, _temp) = create_test_storage().await;

        let content1 = b"content 1";
        let content2 = b"content 2";
        let address1 = LmdbStorage::compute_address(content1);
        let address2 = LmdbStorage::compute_address(content2);

        storage.put(&address1, content1).await.expect("put 1");
        storage.put(&address2, content2).await.expect("put 2");

        storage.get(&address1).await.expect("get");

        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 2);
        assert_eq!(stats.chunks_retrieved, 1);
        assert_eq!(
            stats.bytes_stored,
            content1.len() as u64 + content2.len() as u64
        );
        assert_eq!(stats.bytes_retrieved, content1.len() as u64);
        assert_eq!(stats.current_chunks, 2);
    }

    /// Deleting a chunk frees a slot under the `max_chunks` limit.
    #[tokio::test]
    async fn test_capacity_recovers_after_delete() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let storage = LmdbStorage::new(config_at(temp_dir.path(), 1))
            .await
            .expect("create storage");

        let first = b"first chunk";
        let second = b"second chunk";
        let addr1 = LmdbStorage::compute_address(first);
        let addr2 = LmdbStorage::compute_address(second);

        storage.put(&addr1, first).await.expect("put first");
        storage.delete(&addr1).await.expect("delete first");

        storage.put(&addr2, second).await.expect("put second");

        let stats = storage.stats();
        assert_eq!(stats.current_chunks, 1);
    }

    /// Data written by one store instance is visible after reopening the
    /// same directory.
    #[tokio::test]
    async fn test_persistence_across_reopen() {
        let temp_dir = TempDir::new().expect("create temp dir");
        let content = b"persistent data";
        let address = LmdbStorage::compute_address(content);

        // First session: write, then drop the store at the end of the scope
        // so the environment is closed before it is reopened.
        {
            let storage = LmdbStorage::new(config_at(temp_dir.path(), 0))
                .await
                .expect("create storage");
            storage.put(&address, content).await.expect("put");
        }

        // Second session: the chunk must still be there.
        {
            let storage = LmdbStorage::new(config_at(temp_dir.path(), 0))
                .await
                .expect("reopen storage");
            assert_eq!(storage.current_chunks().expect("current_chunks"), 1);
            let retrieved = storage.get(&address).await.expect("get");
            assert_eq!(retrieved, Some(content.to_vec()));
        }
    }

    /// Raw records round-trip under an arbitrary key and deduplicate.
    #[tokio::test]
    async fn test_put_raw_and_get_raw() {
        let (storage, _temp) = create_test_storage().await;

        let address = [0xAA; 32];
        let data = b"raw record data";

        let was_new = storage.put_raw(&address, data).await.expect("put_raw");
        assert!(was_new, "first put_raw should return true");

        let retrieved = storage.get_raw(&address).await.expect("get_raw");
        assert_eq!(retrieved, Some(data.to_vec()));

        let was_new2 = storage.put_raw(&address, data).await.expect("put_raw dup");
        assert!(!was_new2, "duplicate put_raw should return false");

        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 1);
        assert_eq!(stats.duplicates, 1);
    }

    /// `get_raw` on an unknown key yields `None`.
    #[tokio::test]
    async fn test_get_raw_not_found() {
        let (storage, _temp) = create_test_storage().await;

        let result = storage.get_raw(&[0xBB; 32]).await.expect("get_raw");
        assert!(result.is_none());
    }

    /// `put_overwrite` on an absent key behaves like a first-time store.
    #[tokio::test]
    async fn test_put_overwrite_new_record() {
        let (storage, _temp) = create_test_storage().await;

        let address = [0xCC; 32];
        let data = b"new overwrite data";

        storage
            .put_overwrite(&address, data)
            .await
            .expect("put_overwrite");

        let retrieved = storage.get_raw(&address).await.expect("get_raw");
        assert_eq!(retrieved, Some(data.to_vec()));

        let stats = storage.stats();
        assert_eq!(stats.chunks_stored, 1);
        assert_eq!(stats.bytes_stored, data.len() as u64);
    }

    /// Overwriting keeps the chunk count and swaps the byte total.
    #[tokio::test]
    async fn test_put_overwrite_updates_stats_correctly() {
        let (storage, _temp) = create_test_storage().await;

        let address = [0xDD; 32];
        let v1 = b"short";
        let v2 = b"much longer replacement value";

        storage
            .put_overwrite(&address, v1)
            .await
            .expect("put_overwrite v1");
        let stats1 = storage.stats();
        assert_eq!(stats1.chunks_stored, 1);
        assert_eq!(stats1.bytes_stored, v1.len() as u64);

        storage
            .put_overwrite(&address, v2)
            .await
            .expect("put_overwrite v2");
        let stats2 = storage.stats();
        assert_eq!(
            stats2.chunks_stored, 1,
            "overwrite should not increment chunk count"
        );
        assert_eq!(
            stats2.bytes_stored,
            v2.len() as u64,
            "bytes should reflect new value size"
        );

        let retrieved = storage.get_raw(&address).await.expect("get_raw");
        assert_eq!(retrieved, Some(v2.to_vec()));
    }
}