1use crate::ant_protocol::XorName;
11use crate::error::{Error, Result};
12use heed::types::Bytes;
13use heed::{Database, Env, EnvOpenOptions};
14use std::path::{Path, PathBuf};
15use tokio::task::spawn_blocking;
16use tracing::{debug, trace, warn};
17
18const DEFAULT_MAX_MAP_SIZE: usize = 32 * 1_073_741_824; #[derive(Debug, Clone)]
25pub struct LmdbStorageConfig {
26 pub root_dir: PathBuf,
28 pub verify_on_read: bool,
30 pub max_chunks: usize,
32 pub max_map_size: usize,
34}
35
36impl Default for LmdbStorageConfig {
37 fn default() -> Self {
38 Self {
39 root_dir: PathBuf::from(".ant/chunks"),
40 verify_on_read: true,
41 max_chunks: 0,
42 max_map_size: 0,
43 }
44 }
45}
46
47#[derive(Debug, Clone, Default)]
49pub struct StorageStats {
50 pub chunks_stored: u64,
52 pub chunks_retrieved: u64,
54 pub bytes_stored: u64,
56 pub bytes_retrieved: u64,
58 pub duplicates: u64,
60 pub verification_failures: u64,
62 pub current_chunks: u64,
64}
65
66pub struct LmdbStorage {
71 env: Env,
73 db: Database<Bytes, Bytes>,
75 config: LmdbStorageConfig,
77 stats: parking_lot::RwLock<StorageStats>,
79}
80
81impl LmdbStorage {
82 #[allow(unsafe_code)]
90 pub async fn new(config: LmdbStorageConfig) -> Result<Self> {
91 let env_dir = config.root_dir.join("chunks.mdb");
92
93 std::fs::create_dir_all(&env_dir)
95 .map_err(|e| Error::Storage(format!("Failed to create LMDB directory: {e}")))?;
96
97 let map_size = if config.max_map_size > 0 {
98 config.max_map_size
99 } else {
100 DEFAULT_MAX_MAP_SIZE
101 };
102
103 let env_dir_clone = env_dir.clone();
104 let (env, db) = spawn_blocking(move || -> Result<(Env, Database<Bytes, Bytes>)> {
105 let env = unsafe {
113 EnvOpenOptions::new()
114 .map_size(map_size)
115 .max_dbs(1)
116 .open(&env_dir_clone)
117 .map_err(|e| Error::Storage(format!("Failed to open LMDB env: {e}")))?
118 };
119
120 let mut wtxn = env
121 .write_txn()
122 .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
123 let db: Database<Bytes, Bytes> = env
124 .create_database(&mut wtxn, None)
125 .map_err(|e| Error::Storage(format!("Failed to create database: {e}")))?;
126 wtxn.commit()
127 .map_err(|e| Error::Storage(format!("Failed to commit db creation: {e}")))?;
128
129 Ok((env, db))
130 })
131 .await
132 .map_err(|e| Error::Storage(format!("LMDB init task failed: {e}")))??;
133
134 let storage = Self {
135 env,
136 db,
137 config,
138 stats: parking_lot::RwLock::new(StorageStats::default()),
139 };
140
141 debug!(
142 "Initialized LMDB storage at {:?} ({} existing chunks)",
143 env_dir,
144 storage.current_chunks()?
145 );
146
147 Ok(storage)
148 }
149
150 pub async fn put(&self, address: &XorName, content: &[u8]) -> Result<bool> {
165 let computed = Self::compute_address(content);
167 if computed != *address {
168 return Err(Error::Storage(format!(
169 "Content address mismatch: expected {}, computed {}",
170 hex::encode(address),
171 hex::encode(computed)
172 )));
173 }
174
175 if self.exists(address)? {
179 trace!("Chunk {} already exists", hex::encode(address));
180 self.stats.write().duplicates += 1;
181 return Ok(false);
182 }
183
184 let key = *address;
185 let value = content.to_vec();
186 let env = self.env.clone();
187 let db = self.db;
188 let max_chunks = self.config.max_chunks;
189
190 let was_new = spawn_blocking(move || -> Result<bool> {
194 let mut wtxn = env
195 .write_txn()
196 .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
197
198 if db
200 .get(&wtxn, &key)
201 .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
202 .is_some()
203 {
204 return Ok(false);
205 }
206
207 if max_chunks > 0 {
209 let current = db
210 .stat(&wtxn)
211 .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
212 .entries;
213 if current >= max_chunks {
214 return Err(Error::Storage(format!(
215 "Storage capacity reached: {current} chunks stored, max is {max_chunks}"
216 )));
217 }
218 }
219
220 db.put(&mut wtxn, &key, &value)
221 .map_err(|e| Error::Storage(format!("Failed to put chunk: {e}")))?;
222 wtxn.commit()
223 .map_err(|e| Error::Storage(format!("Failed to commit put: {e}")))?;
224 Ok(true)
225 })
226 .await
227 .map_err(|e| Error::Storage(format!("LMDB put task failed: {e}")))??;
228
229 if !was_new {
230 trace!("Chunk {} already exists", hex::encode(address));
231 self.stats.write().duplicates += 1;
232 return Ok(false);
233 }
234
235 {
236 let mut stats = self.stats.write();
237 stats.chunks_stored += 1;
238 stats.bytes_stored += content.len() as u64;
239 }
240
241 debug!(
242 "Stored chunk {} ({} bytes)",
243 hex::encode(address),
244 content.len()
245 );
246
247 Ok(true)
248 }
249
250 pub async fn get(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
264 let key = *address;
265 let env = self.env.clone();
266 let db = self.db;
267
268 let content = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
269 let rtxn = env
270 .read_txn()
271 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
272 let value = db
273 .get(&rtxn, &key)
274 .map_err(|e| Error::Storage(format!("Failed to get chunk: {e}")))?;
275 Ok(value.map(Vec::from))
276 })
277 .await
278 .map_err(|e| Error::Storage(format!("LMDB get task failed: {e}")))??;
279
280 let Some(content) = content else {
281 trace!("Chunk {} not found", hex::encode(address));
282 return Ok(None);
283 };
284
285 if self.config.verify_on_read {
287 let computed = Self::compute_address(&content);
288 if computed != *address {
289 self.stats.write().verification_failures += 1;
290 warn!(
291 "Chunk verification failed: expected {}, computed {}",
292 hex::encode(address),
293 hex::encode(computed)
294 );
295 return Err(Error::Storage(format!(
296 "Chunk verification failed for {}",
297 hex::encode(address)
298 )));
299 }
300 }
301
302 {
303 let mut stats = self.stats.write();
304 stats.chunks_retrieved += 1;
305 stats.bytes_retrieved += content.len() as u64;
306 }
307
308 debug!(
309 "Retrieved chunk {} ({} bytes)",
310 hex::encode(address),
311 content.len()
312 );
313
314 Ok(Some(content))
315 }
316
317 pub fn exists(&self, address: &XorName) -> Result<bool> {
323 let rtxn = self
324 .env
325 .read_txn()
326 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
327 let found = self
328 .db
329 .get(&rtxn, address.as_ref())
330 .map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
331 .is_some();
332 Ok(found)
333 }
334
335 pub async fn delete(&self, address: &XorName) -> Result<bool> {
341 let key = *address;
342 let env = self.env.clone();
343 let db = self.db;
344
345 let deleted = spawn_blocking(move || -> Result<bool> {
346 let mut wtxn = env
347 .write_txn()
348 .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
349 let existed = db
350 .delete(&mut wtxn, &key)
351 .map_err(|e| Error::Storage(format!("Failed to delete chunk: {e}")))?;
352 wtxn.commit()
353 .map_err(|e| Error::Storage(format!("Failed to commit delete: {e}")))?;
354 Ok(existed)
355 })
356 .await
357 .map_err(|e| Error::Storage(format!("LMDB delete task failed: {e}")))??;
358
359 if deleted {
360 debug!("Deleted chunk {}", hex::encode(address));
361 }
362
363 Ok(deleted)
364 }
365
366 #[must_use]
368 pub fn stats(&self) -> StorageStats {
369 let mut stats = self.stats.read().clone();
370 match self.current_chunks() {
371 Ok(count) => stats.current_chunks = count,
372 Err(e) => {
373 warn!("Failed to read current_chunks for stats: {e}");
374 stats.current_chunks = 0;
375 }
376 }
377 stats
378 }
379
380 pub fn current_chunks(&self) -> Result<u64> {
388 let rtxn = self
389 .env
390 .read_txn()
391 .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
392 let entries = self
393 .db
394 .stat(&rtxn)
395 .map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
396 .entries;
397 Ok(entries as u64)
398 }
399
400 #[must_use]
402 pub fn compute_address(content: &[u8]) -> XorName {
403 crate::client::compute_address(content)
404 }
405
406 #[must_use]
408 pub fn root_dir(&self) -> &Path {
409 &self.config.root_dir
410 }
411}
412
413#[cfg(test)]
414#[allow(clippy::unwrap_used, clippy::expect_used)]
415mod tests {
416 use super::*;
417 use tempfile::TempDir;
418
419 async fn create_test_storage() -> (LmdbStorage, TempDir) {
420 let temp_dir = TempDir::new().expect("create temp dir");
421 let config = LmdbStorageConfig {
422 root_dir: temp_dir.path().to_path_buf(),
423 verify_on_read: true,
424 max_chunks: 0,
425 max_map_size: 0,
426 };
427 let storage = LmdbStorage::new(config).await.expect("create storage");
428 (storage, temp_dir)
429 }
430
431 #[tokio::test]
432 async fn test_put_and_get() {
433 let (storage, _temp) = create_test_storage().await;
434
435 let content = b"hello world";
436 let address = LmdbStorage::compute_address(content);
437
438 let is_new = storage.put(&address, content).await.expect("put");
440 assert!(is_new);
441
442 let retrieved = storage.get(&address).await.expect("get");
444 assert_eq!(retrieved, Some(content.to_vec()));
445 }
446
447 #[tokio::test]
448 async fn test_put_duplicate() {
449 let (storage, _temp) = create_test_storage().await;
450
451 let content = b"test data";
452 let address = LmdbStorage::compute_address(content);
453
454 let is_new1 = storage.put(&address, content).await.expect("put 1");
456 assert!(is_new1);
457
458 let is_new2 = storage.put(&address, content).await.expect("put 2");
460 assert!(!is_new2);
461
462 let stats = storage.stats();
464 assert_eq!(stats.chunks_stored, 1);
465 assert_eq!(stats.duplicates, 1);
466 }
467
468 #[tokio::test]
469 async fn test_get_not_found() {
470 let (storage, _temp) = create_test_storage().await;
471
472 let address = [0xAB; 32];
473 let result = storage.get(&address).await.expect("get");
474 assert!(result.is_none());
475 }
476
477 #[tokio::test]
478 async fn test_exists() {
479 let (storage, _temp) = create_test_storage().await;
480
481 let content = b"exists test";
482 let address = LmdbStorage::compute_address(content);
483
484 assert!(!storage.exists(&address).expect("exists"));
485
486 storage.put(&address, content).await.expect("put");
487
488 assert!(storage.exists(&address).expect("exists"));
489 }
490
491 #[tokio::test]
492 async fn test_delete() {
493 let (storage, _temp) = create_test_storage().await;
494
495 let content = b"delete test";
496 let address = LmdbStorage::compute_address(content);
497
498 storage.put(&address, content).await.expect("put");
500 assert!(storage.exists(&address).expect("exists"));
501
502 let deleted = storage.delete(&address).await.expect("delete");
504 assert!(deleted);
505 assert!(!storage.exists(&address).expect("exists"));
506
507 let deleted2 = storage.delete(&address).await.expect("delete 2");
509 assert!(!deleted2);
510 }
511
512 #[tokio::test]
513 async fn test_max_chunks_enforced() {
514 let temp_dir = TempDir::new().expect("create temp dir");
515 let config = LmdbStorageConfig {
516 root_dir: temp_dir.path().to_path_buf(),
517 verify_on_read: true,
518 max_chunks: 2,
519 max_map_size: 0,
520 };
521 let storage = LmdbStorage::new(config).await.expect("create storage");
522
523 let content1 = b"chunk one";
524 let content2 = b"chunk two";
525 let content3 = b"chunk three";
526 let addr1 = LmdbStorage::compute_address(content1);
527 let addr2 = LmdbStorage::compute_address(content2);
528 let addr3 = LmdbStorage::compute_address(content3);
529
530 assert!(storage.put(&addr1, content1).await.is_ok());
532 assert!(storage.put(&addr2, content2).await.is_ok());
533
534 let result = storage.put(&addr3, content3).await;
536 assert!(result.is_err());
537 assert!(result.unwrap_err().to_string().contains("capacity reached"));
538 }
539
540 #[tokio::test]
541 async fn test_address_mismatch() {
542 let (storage, _temp) = create_test_storage().await;
543
544 let content = b"some content";
545 let wrong_address = [0xFF; 32]; let result = storage.put(&wrong_address, content).await;
548 assert!(result.is_err());
549 assert!(result.unwrap_err().to_string().contains("mismatch"));
550 }
551
552 #[test]
553 fn test_compute_address() {
554 let content = b"hello world";
556 let address = LmdbStorage::compute_address(content);
557
558 let expected_hex = "d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24";
559 assert_eq!(hex::encode(address), expected_hex);
560 }
561
562 #[tokio::test]
563 async fn test_stats() {
564 let (storage, _temp) = create_test_storage().await;
565
566 let content1 = b"content 1";
567 let content2 = b"content 2";
568 let address1 = LmdbStorage::compute_address(content1);
569 let address2 = LmdbStorage::compute_address(content2);
570
571 storage.put(&address1, content1).await.expect("put 1");
573 storage.put(&address2, content2).await.expect("put 2");
574
575 storage.get(&address1).await.expect("get");
577
578 let stats = storage.stats();
579 assert_eq!(stats.chunks_stored, 2);
580 assert_eq!(stats.chunks_retrieved, 1);
581 assert_eq!(
582 stats.bytes_stored,
583 content1.len() as u64 + content2.len() as u64
584 );
585 assert_eq!(stats.bytes_retrieved, content1.len() as u64);
586 assert_eq!(stats.current_chunks, 2);
587 }
588
589 #[tokio::test]
590 async fn test_capacity_recovers_after_delete() {
591 let temp_dir = TempDir::new().expect("create temp dir");
592 let config = LmdbStorageConfig {
593 root_dir: temp_dir.path().to_path_buf(),
594 verify_on_read: true,
595 max_chunks: 1,
596 max_map_size: 0,
597 };
598 let storage = LmdbStorage::new(config).await.expect("create storage");
599
600 let first = b"first chunk";
601 let second = b"second chunk";
602 let addr1 = LmdbStorage::compute_address(first);
603 let addr2 = LmdbStorage::compute_address(second);
604
605 storage.put(&addr1, first).await.expect("put first");
606 storage.delete(&addr1).await.expect("delete first");
607
608 storage.put(&addr2, second).await.expect("put second");
610
611 let stats = storage.stats();
612 assert_eq!(stats.current_chunks, 1);
613 }
614
615 #[tokio::test]
616 async fn test_persistence_across_reopen() {
617 let temp_dir = TempDir::new().expect("create temp dir");
618 let content = b"persistent data";
619 let address = LmdbStorage::compute_address(content);
620
621 {
623 let config = LmdbStorageConfig {
624 root_dir: temp_dir.path().to_path_buf(),
625 verify_on_read: true,
626 max_chunks: 0,
627 max_map_size: 0,
628 };
629 let storage = LmdbStorage::new(config).await.expect("create storage");
630 storage.put(&address, content).await.expect("put");
631 }
632
633 {
635 let config = LmdbStorageConfig {
636 root_dir: temp_dir.path().to_path_buf(),
637 verify_on_read: true,
638 max_chunks: 0,
639 max_map_size: 0,
640 };
641 let storage = LmdbStorage::new(config).await.expect("reopen storage");
642 assert_eq!(storage.current_chunks().expect("current_chunks"), 1);
643 let retrieved = storage.get(&address).await.expect("get");
644 assert_eq!(retrieved, Some(content.to_vec()));
645 }
646 }
647}