use crate::ant_protocol::XorName;
use crate::error::{Error, Result};
use crate::logging::{debug, info, trace, warn};
use heed::types::Bytes;
use heed::{Database, Env, EnvOpenOptions, MdbError};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use tokio::task::spawn_blocking;
use crate::ant_protocol::XORNAME_LEN;
/// Number of bytes in one mebibyte.
pub const MIB: u64 = 1024 * 1024;
/// Number of bytes in one gibibyte.
pub const GIB: u64 = 1024 * MIB;
/// Disk space kept free by default so the host OS stays usable (500 MiB).
const DEFAULT_DISK_RESERVE: u64 = 500 * MIB;
/// Converts a byte count into fractional GiB for human-readable log output.
#[allow(clippy::cast_precision_loss)] // log/display only; f64 precision loss is acceptable
fn bytes_to_gib(bytes: u64) -> f64 {
    let gib = GIB as f64;
    bytes as f64 / gib
}
/// Smallest LMDB map size ever used (256 MiB), regardless of free disk space.
const MIN_MAP_SIZE: usize = 256 * 1024 * 1024;
/// Minimum number of seconds between free-disk-space probes.
const DISK_CHECK_INTERVAL_SECS: u64 = 5;
/// Configuration for [`LmdbStorage`].
#[derive(Debug, Clone)]
pub struct LmdbStorageConfig {
/// Directory under which the `chunks.mdb` environment directory is created.
pub root_dir: PathBuf,
/// When true, `get` re-hashes content and errors on mismatch (corruption check).
pub verify_on_read: bool,
/// Fixed LMDB map size in bytes; 0 means auto-compute from free disk space.
pub max_map_size: usize,
/// Bytes of disk space to keep free; stores are refused below this threshold.
pub disk_reserve: u64,
}
impl Default for LmdbStorageConfig {
/// Production defaults: store under `.ant/chunks`, verify every read,
/// auto-size the map from disk space, keep the default disk reserve free.
fn default() -> Self {
Self {
root_dir: PathBuf::from(".ant/chunks"),
verify_on_read: true,
// 0 = auto-compute the map size at open time.
max_map_size: 0,
disk_reserve: DEFAULT_DISK_RESERVE,
}
}
}
impl LmdbStorageConfig {
/// Defaults for tests: identical to [`Default`] but with no disk reserve,
/// so tests never fail on machines with little free disk space.
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn test_default() -> Self {
Self {
disk_reserve: 0,
..Self::default()
}
}
}
/// Cumulative operation counters for an [`LmdbStorage`] instance.
///
/// All counters except `current_chunks` are monotonically increasing for the
/// lifetime of the instance; `current_chunks` is refreshed from the database
/// when a snapshot is taken.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
/// Chunks newly written (duplicates excluded).
pub chunks_stored: u64,
/// Successful `get` calls that returned content.
pub chunks_retrieved: u64,
/// Total bytes of newly written chunk content.
pub bytes_stored: u64,
/// Total bytes returned by successful `get` calls.
pub bytes_retrieved: u64,
/// `put` calls that found the chunk already present.
pub duplicates: u64,
/// Reads whose content no longer hashed to its address.
pub verification_failures: u64,
/// Number of chunks currently in the database (snapshot-time value).
pub current_chunks: u64,
}
/// Content-addressed chunk store backed by a single LMDB database.
pub struct LmdbStorage {
/// LMDB environment handle; cloned into blocking tasks for I/O.
env: Env,
/// The single unnamed key/value database mapping chunk address to content.
db: Database<Bytes, Bytes>,
config: LmdbStorageConfig,
/// Path of the `chunks.mdb` environment directory (used for disk checks).
env_dir: PathBuf,
/// Cumulative counters returned by `stats()`.
stats: parking_lot::RwLock<StorageStats>,
/// Locking protocol: code opening a transaction holds the read side;
/// `try_resize` takes the write side so the map is remapped only when no
/// transaction is active.
env_lock: Arc<parking_lot::RwLock<()>>,
/// Instant of the last successful disk-space check (None = never checked).
last_disk_ok: parking_lot::Mutex<Option<Instant>>,
}
impl LmdbStorage {
/// Opens (or creates) the LMDB environment under `<root_dir>/chunks.mdb`
/// and ensures the single unnamed database exists.
///
/// The map size is `config.max_map_size` when non-zero; otherwise it is
/// derived from available disk space minus `config.disk_reserve`.
/// Environment and database creation run on the blocking thread pool.
///
/// # Errors
/// Returns an error if the directory cannot be created, the environment
/// cannot be opened, or the database cannot be created/committed.
#[allow(unsafe_code)]
pub async fn new(config: LmdbStorageConfig) -> Result<Self> {
let env_dir = config.root_dir.join("chunks.mdb");
std::fs::create_dir_all(&env_dir)
.map_err(|e| Error::Storage(format!("Failed to create LMDB directory: {e}")))?;
let map_size = if config.max_map_size > 0 {
config.max_map_size
} else {
let computed = compute_map_size(&env_dir, config.disk_reserve)?;
info!(
"Auto-computed LMDB map size: {:.2} GiB (available disk minus {:.2} GiB reserve)",
bytes_to_gib(computed as u64),
bytes_to_gib(config.disk_reserve),
);
computed
};
let env_dir_clone = env_dir.clone();
let (env, db) = spawn_blocking(move || -> Result<(Env, Database<Bytes, Bytes>)> {
// SAFETY contract of `EnvOpenOptions::open` (memory-mapped file) is
// upheld by the surrounding code: one env per path, directory exists.
let env = unsafe {
EnvOpenOptions::new()
.map_size(map_size)
.max_dbs(1)
.open(&env_dir_clone)
.map_err(|e| Error::Storage(format!("Failed to open LMDB env: {e}")))?
};
// Create (or open) the unnamed database inside a write txn, then commit
// so it is visible to all later transactions.
let mut wtxn = env
.write_txn()
.map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
let db: Database<Bytes, Bytes> = env
.create_database(&mut wtxn, None)
.map_err(|e| Error::Storage(format!("Failed to create database: {e}")))?;
wtxn.commit()
.map_err(|e| Error::Storage(format!("Failed to commit db creation: {e}")))?;
Ok((env, db))
})
.await
.map_err(|e| Error::Storage(format!("LMDB init task failed: {e}")))??;
let storage = Self {
env,
db,
config,
env_dir,
stats: parking_lot::RwLock::new(StorageStats::default()),
env_lock: Arc::new(parking_lot::RwLock::new(())),
last_disk_ok: parking_lot::Mutex::new(None),
};
debug!(
"Initialized LMDB storage at {:?} ({} existing chunks)",
storage.env_dir,
storage.current_chunks()?
);
Ok(storage)
}
/// Stores `content` under `address`, verifying first that the address
/// matches the content hash.
///
/// Returns `Ok(true)` when a new chunk was written and `Ok(false)` when it
/// was already present (counted as a duplicate in [`StorageStats`]). If the
/// LMDB map is full, the map is grown once from current free disk space and
/// the write is retried a single time.
///
/// # Errors
/// Returns an error when the address does not match the content, disk space
/// is below the configured reserve, or LMDB fails even after a map resize.
pub async fn put(&self, address: &XorName, content: &[u8]) -> Result<bool> {
    let computed = Self::compute_address(content);
    if computed != *address {
        return Err(Error::Storage(format!(
            "Content address mismatch: expected {}, computed {}",
            hex::encode(address),
            hex::encode(computed)
        )));
    }
    // Fast path: avoid the disk check and write txn for known duplicates.
    // `try_put` re-checks inside its txn, so this is only an optimization.
    if self.exists(address)? {
        return Ok(self.record_duplicate(address));
    }
    self.check_disk_space_cached()?;
    let mut outcome = self.try_put(address, content).await?;
    if matches!(outcome, PutOutcome::MapFull) {
        // Map exhausted: grow it from currently-free disk space, retry once.
        self.try_resize().await?;
        outcome = self.try_put(address, content).await?;
    }
    match outcome {
        PutOutcome::New => {}
        // Unified duplicate handling (trace + counter) for both the first
        // attempt and the post-resize retry.
        PutOutcome::Duplicate => return Ok(self.record_duplicate(address)),
        PutOutcome::MapFull => {
            return Err(Error::Storage(
                "LMDB map full after resize — disk may be at capacity".into(),
            ));
        }
    }
    {
        let mut stats = self.stats.write();
        stats.chunks_stored += 1;
        stats.bytes_stored += content.len() as u64;
    }
    debug!(
        "Stored chunk {} ({} bytes)",
        hex::encode(address),
        content.len()
    );
    Ok(true)
}
/// Logs and counts a duplicate `put`; always returns `false` ("not newly
/// stored") so callers can `return Ok(self.record_duplicate(addr))`.
fn record_duplicate(&self, address: &XorName) -> bool {
    trace!("Chunk {} already exists", hex::encode(address));
    self.stats.write().duplicates += 1;
    false
}
async fn try_put(&self, address: &XorName, content: &[u8]) -> Result<PutOutcome> {
let key = *address;
let value = content.to_vec();
let env = self.env.clone();
let db = self.db;
let lock = Arc::clone(&self.env_lock);
spawn_blocking(move || -> Result<PutOutcome> {
let _guard = lock.read();
let mut wtxn = env
.write_txn()
.map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
if db
.get(&wtxn, &key)
.map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
.is_some()
{
return Ok(PutOutcome::Duplicate);
}
match db.put(&mut wtxn, &key, &value) {
Ok(()) => {}
Err(heed::Error::Mdb(MdbError::MapFull)) => return Ok(PutOutcome::MapFull),
Err(e) => {
return Err(Error::Storage(format!("Failed to put chunk: {e}")));
}
}
match wtxn.commit() {
Ok(()) => Ok(PutOutcome::New),
Err(heed::Error::Mdb(MdbError::MapFull)) => Ok(PutOutcome::MapFull),
Err(e) => Err(Error::Storage(format!("Failed to commit put: {e}"))),
}
})
.await
.map_err(|e| Error::Storage(format!("LMDB put task failed: {e}")))?
}
pub async fn get(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
let key = *address;
let env = self.env.clone();
let db = self.db;
let lock = Arc::clone(&self.env_lock);
let content = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
let _guard = lock.read();
let rtxn = env
.read_txn()
.map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
let value = db
.get(&rtxn, &key)
.map_err(|e| Error::Storage(format!("Failed to get chunk: {e}")))?;
Ok(value.map(Vec::from))
})
.await
.map_err(|e| Error::Storage(format!("LMDB get task failed: {e}")))??;
let Some(content) = content else {
trace!("Chunk {} not found", hex::encode(address));
return Ok(None);
};
if self.config.verify_on_read {
let computed = Self::compute_address(&content);
if computed != *address {
self.stats.write().verification_failures += 1;
warn!(
"Chunk verification failed: expected {}, computed {}",
hex::encode(address),
hex::encode(computed)
);
return Err(Error::Storage(format!(
"Chunk verification failed for {}",
hex::encode(address)
)));
}
}
{
let mut stats = self.stats.write();
stats.chunks_retrieved += 1;
stats.bytes_retrieved += content.len() as u64;
}
debug!(
"Retrieved chunk {} ({} bytes)",
hex::encode(address),
content.len()
);
Ok(Some(content))
}
pub fn exists(&self, address: &XorName) -> Result<bool> {
let _guard = self.env_lock.read();
let rtxn = self
.env
.read_txn()
.map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
let found = self
.db
.get(&rtxn, address.as_ref())
.map_err(|e| Error::Storage(format!("Failed to check existence: {e}")))?
.is_some();
Ok(found)
}
/// Removes the chunk stored at `address`.
///
/// Returns whether a chunk actually existed (and was therefore deleted).
///
/// # Errors
/// Returns an error if the write transaction, delete, or commit fails.
pub async fn delete(&self, address: &XorName) -> Result<bool> {
    let key = *address;
    let env = self.env.clone();
    let db = self.db;
    let lock = Arc::clone(&self.env_lock);
    let existed = spawn_blocking(move || -> Result<bool> {
        let _guard = lock.read();
        let mut txn = env
            .write_txn()
            .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
        // `delete` reports whether the key was present.
        let removed = db
            .delete(&mut txn, &key)
            .map_err(|e| Error::Storage(format!("Failed to delete chunk: {e}")))?;
        txn.commit()
            .map_err(|e| Error::Storage(format!("Failed to commit delete: {e}")))?;
        Ok(removed)
    })
    .await
    .map_err(|e| Error::Storage(format!("LMDB delete task failed: {e}")))??;
    if existed {
        debug!("Deleted chunk {}", hex::encode(address));
    }
    Ok(existed)
}
/// Returns a snapshot of the cumulative statistics, with `current_chunks`
/// refreshed from the live database entry count.
///
/// A failure to read the entry count is logged and reported as 0 rather
/// than failing the whole snapshot.
#[must_use]
pub fn stats(&self) -> StorageStats {
    let mut snapshot = self.stats.read().clone();
    snapshot.current_chunks = self.current_chunks().unwrap_or_else(|e| {
        warn!("Failed to read current_chunks for stats: {e}");
        0
    });
    snapshot
}
pub fn current_chunks(&self) -> Result<u64> {
let _guard = self.env_lock.read();
let rtxn = self
.env
.read_txn()
.map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
let entries = self
.db
.stat(&rtxn)
.map_err(|e| Error::Storage(format!("Failed to read db stats: {e}")))?
.entries;
Ok(entries as u64)
}
/// Computes the content address for `content` by delegating to the
/// crate-wide address/hash function, so storage and clients always agree.
#[must_use]
pub fn compute_address(content: &[u8]) -> XorName {
crate::client::compute_address(content)
}
/// Returns the configured root directory (the parent of the `chunks.mdb`
/// environment directory).
#[must_use]
pub fn root_dir(&self) -> &Path {
&self.config.root_dir
}
pub async fn all_keys(&self) -> Result<Vec<XorName>> {
let env = self.env.clone();
let db = self.db;
let keys = spawn_blocking(move || -> Result<Vec<XorName>> {
let rtxn = env
.read_txn()
.map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
let mut keys = Vec::new();
let iter = db
.iter(&rtxn)
.map_err(|e| Error::Storage(format!("Failed to iterate database: {e}")))?;
for result in iter {
let (key_bytes, _) =
result.map_err(|e| Error::Storage(format!("Failed to read entry: {e}")))?;
if key_bytes.len() == XORNAME_LEN {
let mut key = [0u8; XORNAME_LEN];
key.copy_from_slice(key_bytes);
keys.push(key);
} else {
crate::logging::warn!(
"LmdbStorage: skipping entry with unexpected key length {} (expected {XORNAME_LEN})",
key_bytes.len()
);
}
}
Ok(keys)
})
.await
.map_err(|e| Error::Storage(format!("all_keys task failed: {e}")))?;
keys
}
pub async fn get_raw(&self, address: &XorName) -> Result<Option<Vec<u8>>> {
let key = *address;
let env = self.env.clone();
let db = self.db;
let value = spawn_blocking(move || -> Result<Option<Vec<u8>>> {
let rtxn = env
.read_txn()
.map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
let val = db
.get(&rtxn, key.as_ref())
.map_err(|e| Error::Storage(format!("Failed to get chunk: {e}")))?;
Ok(val.map(Vec::from))
})
.await
.map_err(|e| Error::Storage(format!("get_raw task failed: {e}")))?;
value
}
/// Verifies that free disk space exceeds the configured reserve, but at
/// most once every `DISK_CHECK_INTERVAL_SECS` seconds — successful checks
/// are cached to keep the hot `put` path off the filesystem.
///
/// # Errors
/// Propagates the error from `check_disk_space` when the reserve is not met
/// or free space cannot be queried.
fn check_disk_space_cached(&self) -> Result<()> {
    // Guard is released at the end of this statement, before the probe.
    let recently_ok = self
        .last_disk_ok
        .lock()
        .map_or(false, |t| t.elapsed().as_secs() < DISK_CHECK_INTERVAL_SECS);
    if recently_ok {
        return Ok(());
    }
    check_disk_space(&self.env_dir, self.config.disk_reserve)?;
    // Only record the timestamp after a successful check.
    *self.last_disk_ok.lock() = Some(Instant::now());
    Ok(())
}
/// Grows the LMDB memory map to match the currently-available disk space.
///
/// Takes the env write lock inside the blocking task so no transaction can
/// be live while the map is remapped. If the disk has not freed up enough
/// space to grow past the current map size, this is a no-op (the map is
/// never shrunk).
///
/// # Errors
/// Returns an error if disk space cannot be queried or the resize fails.
#[allow(unsafe_code)]
async fn try_resize(&self) -> Result<()> {
    let from_disk = compute_map_size(&self.env_dir, self.config.disk_reserve)?;
    let env = self.env.clone();
    let lock = Arc::clone(&self.env_lock);
    spawn_blocking(move || -> Result<()> {
        // Exclusive lock: all transaction helpers hold the read side, so
        // acquiring the write side guarantees no transaction is active.
        let _guard = lock.write();
        let current_map = env.info().map_size;
        // Grow-only check, stated directly (the original
        // `from_disk.max(current_map) <= current_map` obscured this).
        if from_disk <= current_map {
            debug!("LMDB map resize skipped — no additional disk space available");
            return Ok(());
        }
        // SAFETY: `Env::resize` requires that no transactions are active in
        // this process; the exclusive `env_lock` write guard above ensures
        // that for all code following the locking protocol.
        unsafe {
            env.resize(from_disk)
                .map_err(|e| Error::Storage(format!("Failed to resize LMDB map: {e}")))?;
        }
        info!(
            "Resized LMDB map to {:.2} GiB (was {:.2} GiB)",
            bytes_to_gib(from_disk as u64),
            bytes_to_gib(current_map as u64),
        );
        Ok(())
    })
    .await
    .map_err(|e| Error::Storage(format!("LMDB resize task failed: {e}")))?
}
}
/// Result of a single transactional put attempt (`try_put`).
enum PutOutcome {
/// The chunk was written and committed.
New,
/// A chunk with the same key already existed; nothing was written.
Duplicate,
/// LMDB reported the map is full; the caller may resize and retry.
MapFull,
}
/// Computes an LMDB map size: current data-file size plus available disk
/// space minus `reserve`, rounded up to the OS page size and floored at
/// `MIN_MAP_SIZE`.
///
/// All arithmetic saturates so pathological inputs (huge free space, tiny
/// reserve) cannot wrap around to a tiny map.
///
/// # Errors
/// Returns an error if free disk space cannot be queried.
fn compute_map_size(db_dir: &Path, reserve: u64) -> Result<usize> {
    let available = fs2::available_space(db_dir)
        .map_err(|e| Error::Storage(format!("Failed to query available disk space: {e}")))?;
    // Existing data must always fit inside the map, so start from the
    // current data file size (0 when the DB does not exist yet).
    let mdb_file = db_dir.join("data.mdb");
    let current_db_bytes = std::fs::metadata(&mdb_file).map_or(0, |m| m.len());
    let growth_room = available.saturating_sub(reserve);
    let target = current_db_bytes.saturating_add(growth_room);
    // Page-align upward; saturate instead of wrapping when `target` is
    // within one page of u64::MAX (plain `* page` could overflow).
    let page = page_size::get() as u64;
    let aligned = target.div_ceil(page).saturating_mul(page);
    let result = usize::try_from(aligned).unwrap_or(usize::MAX);
    Ok(result.max(MIN_MAP_SIZE))
}
fn check_disk_space(db_dir: &Path, reserve: u64) -> Result<()> {
let available = fs2::available_space(db_dir)
.map_err(|e| Error::Storage(format!("Failed to query available disk space: {e}")))?;
if available < reserve {
return Err(Error::Storage(format!(
"Insufficient disk space: {:.2} GiB available, {:.2} GiB reserve required. \
Free disk space or increase the partition to continue storing chunks.",
bytes_to_gib(available),
bytes_to_gib(reserve),
)));
}
Ok(())
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use tempfile::TempDir;
// Builds a storage instance rooted in a fresh temp dir; the TempDir must be
// kept alive by the caller or the backing files are deleted.
async fn create_test_storage() -> (LmdbStorage, TempDir) {
let temp_dir = TempDir::new().expect("create temp dir");
let config = LmdbStorageConfig {
root_dir: temp_dir.path().to_path_buf(),
..LmdbStorageConfig::test_default()
};
let storage = LmdbStorage::new(config).await.expect("create storage");
(storage, temp_dir)
}
// Round-trip: a stored chunk comes back byte-identical.
#[tokio::test]
async fn test_put_and_get() {
let (storage, _temp) = create_test_storage().await;
let content = b"hello world";
let address = LmdbStorage::compute_address(content);
let is_new = storage.put(&address, content).await.expect("put");
assert!(is_new);
let retrieved = storage.get(&address).await.expect("get");
assert_eq!(retrieved, Some(content.to_vec()));
}
// A second put of the same content returns false and is counted as a
// duplicate, not as a second stored chunk.
#[tokio::test]
async fn test_put_duplicate() {
let (storage, _temp) = create_test_storage().await;
let content = b"test data";
let address = LmdbStorage::compute_address(content);
let is_new1 = storage.put(&address, content).await.expect("put 1");
assert!(is_new1);
let is_new2 = storage.put(&address, content).await.expect("put 2");
assert!(!is_new2);
let stats = storage.stats();
assert_eq!(stats.chunks_stored, 1);
assert_eq!(stats.duplicates, 1);
}
// A missing address yields Ok(None), not an error.
#[tokio::test]
async fn test_get_not_found() {
let (storage, _temp) = create_test_storage().await;
let address = [0xAB; 32];
let result = storage.get(&address).await.expect("get");
assert!(result.is_none());
}
// exists() flips from false to true once the chunk is stored.
#[tokio::test]
async fn test_exists() {
let (storage, _temp) = create_test_storage().await;
let content = b"exists test";
let address = LmdbStorage::compute_address(content);
assert!(!storage.exists(&address).expect("exists"));
storage.put(&address, content).await.expect("put");
assert!(storage.exists(&address).expect("exists"));
}
// delete() returns true only when the chunk was present; the second delete
// of the same address reports false.
#[tokio::test]
async fn test_delete() {
let (storage, _temp) = create_test_storage().await;
let content = b"delete test";
let address = LmdbStorage::compute_address(content);
storage.put(&address, content).await.expect("put");
assert!(storage.exists(&address).expect("exists"));
let deleted = storage.delete(&address).await.expect("delete");
assert!(deleted);
assert!(!storage.exists(&address).expect("exists"));
let deleted2 = storage.delete(&address).await.expect("delete 2");
assert!(!deleted2);
}
// put() rejects content whose hash does not match the given address.
#[tokio::test]
async fn test_address_mismatch() {
let (storage, _temp) = create_test_storage().await;
let content = b"some content";
let wrong_address = [0xFF; 32];
let result = storage.put(&wrong_address, content).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("mismatch"));
}
// Pins the exact hash output so the address scheme cannot silently change.
#[test]
fn test_compute_address() {
let content = b"hello world";
let address = LmdbStorage::compute_address(content);
let expected_hex = "d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24";
assert_eq!(hex::encode(address), expected_hex);
}
// All counters (stored/retrieved bytes and chunk counts) track operations.
#[tokio::test]
async fn test_stats() {
let (storage, _temp) = create_test_storage().await;
let content1 = b"content 1";
let content2 = b"content 2";
let address1 = LmdbStorage::compute_address(content1);
let address2 = LmdbStorage::compute_address(content2);
storage.put(&address1, content1).await.expect("put 1");
storage.put(&address2, content2).await.expect("put 2");
storage.get(&address1).await.expect("get");
let stats = storage.stats();
assert_eq!(stats.chunks_stored, 2);
assert_eq!(stats.chunks_retrieved, 1);
assert_eq!(
stats.bytes_stored,
content1.len() as u64 + content2.len() as u64
);
assert_eq!(stats.bytes_retrieved, content1.len() as u64);
assert_eq!(stats.current_chunks, 2);
}
// Data written by one LmdbStorage instance is readable after reopening the
// same root dir with a fresh instance (durability across restarts).
#[tokio::test]
async fn test_persistence_across_reopen() {
let temp_dir = TempDir::new().expect("create temp dir");
let content = b"persistent data";
let address = LmdbStorage::compute_address(content);
{
let config = LmdbStorageConfig {
root_dir: temp_dir.path().to_path_buf(),
..LmdbStorageConfig::test_default()
};
let storage = LmdbStorage::new(config).await.expect("create storage");
storage.put(&address, content).await.expect("put");
}
{
let config = LmdbStorageConfig {
root_dir: temp_dir.path().to_path_buf(),
..LmdbStorageConfig::test_default()
};
let storage = LmdbStorage::new(config).await.expect("reopen storage");
assert_eq!(storage.current_chunks().expect("current_chunks"), 1);
let retrieved = storage.get(&address).await.expect("get");
assert_eq!(retrieved, Some(content.to_vec()));
}
}
// all_keys() is empty on a fresh store and returns exactly the stored
// addresses afterwards (order-insensitive; both sides sorted).
#[tokio::test]
async fn test_all_keys() {
let (storage, _temp) = create_test_storage().await;
let keys = storage.all_keys().await.expect("all_keys empty");
assert!(keys.is_empty());
let content1 = b"chunk one for keys";
let content2 = b"chunk two for keys";
let addr1 = LmdbStorage::compute_address(content1);
let addr2 = LmdbStorage::compute_address(content2);
storage.put(&addr1, content1).await.expect("put 1");
storage.put(&addr2, content2).await.expect("put 2");
let mut keys = storage.all_keys().await.expect("all_keys");
keys.sort_unstable();
let mut expected = vec![addr1, addr2];
expected.sort_unstable();
assert_eq!(keys, expected);
}
// get_raw() returns stored bytes and None for missing addresses.
#[tokio::test]
async fn test_get_raw() {
let (storage, _temp) = create_test_storage().await;
let content = b"raw test data";
let address = LmdbStorage::compute_address(content);
storage.put(&address, content).await.expect("put");
let raw = storage.get_raw(&address).await.expect("get_raw");
assert_eq!(raw, Some(content.to_vec()));
let missing = storage.get_raw(&[0xFF; 32]).await.expect("get_raw missing");
assert!(missing.is_none());
}
}