use crate::StorageBackend;
use async_trait::async_trait;
use std::fmt;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::AsyncWriteExt;
/// Bytes of a stored object, held either as a memory map of the object's
/// file or as an owned in-memory buffer.
///
/// Produced by `LocalBackend::get_adaptive`; both variants expose their
/// contents through the `AsRef<[u8]>` implementation.
#[derive(Debug)]
pub enum MmapOrVec {
/// The object's file mapped into memory via `memmap2`.
Mmap(memmap2::Mmap),
/// The object's contents read fully into a heap-allocated buffer.
Vec(Vec<u8>),
}
impl AsRef<[u8]> for MmapOrVec {
fn as_ref(&self) -> &[u8] {
match self {
MmapOrVec::Mmap(mmap) => mmap.as_ref(),
MmapOrVec::Vec(vec) => vec.as_ref(),
}
}
}
/// Storage backend that keeps every object as a file beneath one root
/// directory on the local filesystem.
///
/// Cloning copies only the root path; clones operate on the same directory.
#[derive(Clone)]
pub struct LocalBackend {
/// Directory under which the `objects/` and `packs/` trees are stored.
root: PathBuf,
}
impl LocalBackend {
/// Creates a backend rooted at `root`, creating the directory (and any
/// missing parents) when it does not exist yet.
///
/// # Errors
/// Returns an error when `root` exists but is not a directory, or when the
/// directory cannot be created.
pub async fn new<P: AsRef<Path>>(root: P) -> anyhow::Result<Self> {
    let root = root.as_ref().to_path_buf();
    // Probe with the async metadata call: `Path::exists` / `Path::is_dir`
    // perform blocking syscalls on the runtime thread.
    match fs::metadata(&root).await {
        Ok(meta) if meta.is_dir() => {}
        Ok(_) => {
            return Err(anyhow::anyhow!(
                "path exists but is not a directory: {}",
                root.display()
            ));
        }
        // Any probe failure counts as "missing" (the same rule `Path::exists`
        // applies); `create_dir_all` surfaces the real problem if there is one.
        Err(_) => fs::create_dir_all(&root).await?,
    }
    Ok(LocalBackend { root })
}
pub fn new_sync<P: AsRef<Path>>(root: P) -> anyhow::Result<Self> {
let root = root.as_ref().to_path_buf();
if !root.exists() {
std::fs::create_dir_all(&root)?;
} else if !root.is_dir() {
return Err(anyhow::anyhow!(
"path exists but is not a directory: {}",
root.display()
));
}
Ok(LocalBackend { root })
}
/// Returns the directory this backend stores its files under.
pub fn root(&self) -> &Path {
    self.root.as_path()
}
fn object_path(&self, key: &str) -> PathBuf {
if key.starts_with("packs/") {
return self.root.join(key);
}
let encoded_key = key.replace('/', "__");
if key.len() >= 4 {
let shard1 = &key[0..2];
let shard2 = &key[2..4];
self.root
.join("objects")
.join(shard1)
.join(shard2)
.join(&encoded_key)
} else if key.len() >= 2 {
let shard1 = &key[0..2];
self.root.join("objects").join(shard1).join(&encoded_key)
} else {
self.root.join("objects").join(&encoded_key)
}
}
/// Creates the directory that will contain `path` (including missing
/// ancestors). Does nothing when `path` has no parent.
async fn ensure_parent_dir(&self, path: &Path) -> anyhow::Result<()> {
    match path.parent() {
        Some(dir) => Ok(fs::create_dir_all(dir).await?),
        None => Ok(()),
    }
}
/// Memory-maps the file that stores `key` and returns the mapping.
///
/// This is a synchronous call: the file is opened with `std::fs`.
///
/// # Errors
/// Returns an error for an empty key, or when the file cannot be opened or
/// mapped.
pub fn get_mmap(&self, key: &str) -> anyhow::Result<memmap2::Mmap> {
    anyhow::ensure!(!key.is_empty(), "key cannot be empty");
    let file = std::fs::File::open(self.object_path(key))?;
    // SAFETY: a file-backed map is only sound while the file is not truncated
    // or rewritten in place. `put` in this file replaces objects by renaming a
    // freshly written file over them rather than modifying them.
    // NOTE(review): writers outside this backend are not guarded against.
    let mapped = unsafe { memmap2::Mmap::map(&file) }?;
    Ok(mapped)
}
/// Returns the size in bytes of the file that stores `key`.
///
/// # Errors
/// Returns an error for an empty key or when the file's metadata cannot be
/// read (for instance because the object does not exist).
pub async fn get_size(&self, key: &str) -> anyhow::Result<u64> {
    anyhow::ensure!(!key.is_empty(), "key cannot be empty");
    let meta = fs::metadata(self.object_path(key)).await?;
    Ok(meta.len())
}
/// Loads `key`, choosing the loading strategy by file size: objects larger
/// than 10 MiB are memory-mapped, everything else is read into a `Vec`.
///
/// # Errors
/// Propagates failures from the size lookup (including the empty-key
/// check), from reading the file, or from mapping it.
pub async fn get_adaptive(&self, key: &str) -> anyhow::Result<MmapOrVec> {
    const MMAP_THRESHOLD: u64 = 10 * 1024 * 1024;

    let size = self.get_size(key).await?;
    if size <= MMAP_THRESHOLD {
        let bytes = fs::read(self.object_path(key)).await?;
        return Ok(MmapOrVec::Vec(bytes));
    }

    tracing::debug!(key = %key, size = size, "Using mmap for large file");
    let mapped = self.get_mmap(key)?;
    Ok(MmapOrVec::Mmap(mapped))
}
}
/// Diagnostic formatting that prints the type name and its `root` field.
impl fmt::Debug for LocalBackend {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("LocalBackend");
        out.field("root", &self.root);
        out.finish()
    }
}
#[async_trait]
impl StorageBackend for LocalBackend {
async fn get(&self, key: &str) -> anyhow::Result<Vec<u8>> {
if key.is_empty() {
return Err(anyhow::anyhow!("key cannot be empty"));
}
let path = self.object_path(key);
match fs::read(&path).await {
Ok(data) => Ok(data),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
Err(anyhow::anyhow!("object not found: {}", key))
}
Err(e) => Err(e.into()),
}
}
async fn put(&self, key: &str, data: &[u8]) -> anyhow::Result<()> {
if key.is_empty() {
return Err(anyhow::anyhow!("key cannot be empty"));
}
let path = self.object_path(key);
const MAX_RETRIES: u32 = 3;
let mut last_error = None;
for attempt in 0..=MAX_RETRIES {
if attempt > 0 {
let delay_ms = 10 * (5u64.pow(attempt - 1));
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
let _ = self.ensure_parent_dir(&path).await;
} else {
self.ensure_parent_dir(&path).await?;
}
let temp_path = path.with_extension("tmp");
let _ = fs::remove_file(&temp_path).await;
let file_result = fs::File::create(&temp_path).await;
let mut file = match file_result {
Ok(f) => f,
Err(e) if attempt < MAX_RETRIES && e.raw_os_error() == Some(2) => {
last_error = Some(e);
continue;
}
Err(e) => return Err(e.into()),
};
if let Err(e) = file.write_all(data).await {
let _ = fs::remove_file(&temp_path).await;
return Err(e.into());
}
if let Err(e) = file.sync_all().await {
let _ = fs::remove_file(&temp_path).await;
return Err(e.into());
}
drop(file);
match fs::rename(&temp_path, &path).await {
Ok(()) => return Ok(()),
Err(e) if attempt < MAX_RETRIES && e.raw_os_error() == Some(2) => {
let _ = fs::remove_file(&temp_path).await;
last_error = Some(e);
continue;
}
Err(e) => {
let _ = fs::remove_file(&temp_path).await;
return Err(e.into());
}
}
}
Err(anyhow::anyhow!(
"Failed to write object after {} retries: {}",
MAX_RETRIES,
last_error.map(|e| e.to_string()).unwrap_or_default()
))
}
/// Reports whether something exists at the path that stores `key`.
///
/// # Errors
/// Returns an error for an empty key or when the existence check itself
/// fails.
async fn exists(&self, key: &str) -> anyhow::Result<bool> {
    anyhow::ensure!(!key.is_empty(), "key cannot be empty");
    let found = fs::try_exists(self.object_path(key)).await?;
    Ok(found)
}
/// Removes the object stored under `key`. Deleting a key that does not
/// exist succeeds.
///
/// # Errors
/// Returns an error for an empty key or for any removal failure other than
/// "not found".
async fn delete(&self, key: &str) -> anyhow::Result<()> {
    anyhow::ensure!(!key.is_empty(), "key cannot be empty");
    let outcome = fs::remove_file(self.object_path(key)).await;
    match outcome {
        Err(err) if err.kind() != std::io::ErrorKind::NotFound => Err(err.into()),
        // Either removed, or it was already absent.
        _ => Ok(()),
    }
}
async fn list_objects(&self, prefix: &str) -> anyhow::Result<Vec<String>> {
let mut results = Vec::new();
if prefix.starts_with("packs/") {
let packs_dir = self.root.join("packs");
if !packs_dir.exists() {
return Ok(Vec::new());
}
Self::walk_dir_flat(&packs_dir, prefix, &mut results).await?;
} else {
let objects_dir = self.root.join("objects");
if !objects_dir.exists() {
return Ok(Vec::new());
}
Self::walk_dir_iterative(&objects_dir, &objects_dir, prefix, &mut results).await?;
}
results.sort();
Ok(results)
}
}
impl LocalBackend {
/// Appends `packs/<file name>` to `results` for every regular file directly
/// inside `dir` whose resulting key starts with `prefix`.
///
/// Subdirectories are not descended into and files with non-UTF-8 names are
/// skipped. A directory that cannot be opened is treated as empty.
///
/// # Errors
/// Propagates errors raised while advancing the directory iterator.
async fn walk_dir_flat(
    dir: &Path,
    prefix: &str,
    results: &mut Vec<String>,
) -> anyhow::Result<()> {
    let mut entries = match fs::read_dir(dir).await {
        Ok(entries) => entries,
        // Best effort: an unreadable directory contributes nothing.
        Err(_) => return Ok(()),
    };
    while let Some(entry) = entries.next_entry().await? {
        // Use the async entry type instead of the blocking `Path::is_file`.
        // Symlinks are resolved through metadata so they are still followed.
        let is_file = match entry.file_type().await {
            Ok(kind) if kind.is_symlink() => fs::metadata(entry.path())
                .await
                .map(|meta| meta.is_file())
                .unwrap_or(false),
            Ok(kind) => kind.is_file(),
            Err(_) => false,
        };
        if !is_file {
            continue;
        }
        if let Some(name) = entry.file_name().to_str() {
            let key = format!("packs/{}", name);
            if key.starts_with(prefix) {
                results.push(key);
            }
        }
    }
    Ok(())
}
async fn walk_dir_iterative(
objects_dir: &Path,
objects_base: &Path,
prefix: &str,
results: &mut Vec<String>,
) -> anyhow::Result<()> {
let mut work_queue = vec![objects_dir.to_path_buf()];
while let Some(current_path) = work_queue.pop() {
let mut entries = match fs::read_dir(¤t_path).await {
Ok(entries) => entries,
Err(_) => continue, };
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.is_dir() {
work_queue.push(path);
} else {
if let Ok(relative_path) = path.strip_prefix(objects_base) {
let components: Vec<_> = relative_path
.components()
.map(|c| c.as_os_str().to_string_lossy().to_string())
.collect();
let mut key = if components.is_empty() {
continue;
} else if components.len() == 1 {
components[0].clone()
} else {
components.last().unwrap().clone()
};
key = key.replace("__", "/");
if key.starts_with(prefix) {
results.push(key);
}
}
}
}
}
Ok(())
}
}
// Integration-style tests for `LocalBackend`. Every test works inside its own
// temporary directory created with `tempfile::TempDir`.
#[cfg(test)]
mod tests {
use super::*;
// Explicit import so that `fs` below refers to the blocking `std::fs`.
use std::fs;
use tempfile::TempDir;
// `new` creates a root directory that does not exist yet.
#[tokio::test]
async fn test_new_creates_root_directory() {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("storage");
assert!(!path.exists());
let backend = LocalBackend::new(&path).await.unwrap();
assert!(path.exists());
assert_eq!(backend.root(), &path);
}
// `new` accepts a directory that already exists.
#[tokio::test]
async fn test_new_with_existing_directory() {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path();
let backend = LocalBackend::new(path).await.unwrap();
assert_eq!(backend.root(), path);
}
// `new` rejects a root path that points at a regular file.
#[tokio::test]
async fn test_new_fails_with_file_path() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("file.txt");
fs::write(&file_path, b"content").unwrap();
let result = LocalBackend::new(&file_path).await;
assert!(result.is_err());
}
// Data written with `put` is returned unchanged by `get`.
#[tokio::test]
async fn test_put_and_get() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let key = "test_key";
let data = b"test data content";
backend.put(key, data).await.unwrap();
let retrieved = backend.get(key).await.unwrap();
assert_eq!(retrieved, data);
}
// A key of four or more bytes lands in `objects/<bytes 0..2>/<bytes 2..4>/<key>`.
#[tokio::test]
async fn test_sharding_creates_correct_path() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let key = "abcd1234567890";
let expected_path = temp_dir.path().join("objects/ab/cd/abcd1234567890");
backend.put(key, b"data").await.unwrap();
assert!(expected_path.exists());
}
// Reading a missing key fails with an "object not found" error.
#[tokio::test]
async fn test_get_nonexistent() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let result = backend.get("nonexistent").await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("object not found"));
}
// `exists` is false before a `put` and true afterwards.
#[tokio::test]
async fn test_exists() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let key = "exists_test";
assert!(!backend.exists(key).await.unwrap());
backend.put(key, b"data").await.unwrap();
assert!(backend.exists(key).await.unwrap());
}
// `delete` removes an object that was previously stored.
#[tokio::test]
async fn test_delete_existing() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let key = "delete_test";
backend.put(key, b"data").await.unwrap();
assert!(backend.exists(key).await.unwrap());
backend.delete(key).await.unwrap();
assert!(!backend.exists(key).await.unwrap());
}
// Deleting a missing key succeeds, even when repeated.
#[tokio::test]
async fn test_delete_nonexistent_is_idempotent() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
backend.delete("nonexistent").await.unwrap();
backend.delete("nonexistent").await.unwrap();
}
// Every trait operation rejects the empty key.
#[tokio::test]
async fn test_empty_key_operations() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
assert!(backend.put("", b"data").await.is_err());
assert!(backend.get("").await.is_err());
assert!(backend.exists("").await.is_err());
assert!(backend.delete("").await.is_err());
}
// Listing a freshly created store returns no keys.
#[tokio::test]
async fn test_list_objects_empty() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let objects = backend.list_objects("").await.unwrap();
assert_eq!(objects.len(), 0);
}
// Listing returns only the keys that start with the requested prefix,
// including keys that contain `/`.
#[tokio::test]
async fn test_list_objects_with_prefix() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
backend.put("images/photo1.jpg", b"data1").await.unwrap();
backend.put("images/photo2.jpg", b"data2").await.unwrap();
backend.put("videos/video1.mp4", b"data3").await.unwrap();
backend.put("audio/song1.mp3", b"data4").await.unwrap();
let images = backend.list_objects("images/").await.unwrap();
assert_eq!(images.len(), 2);
assert!(images.iter().all(|k| k.starts_with("images/")));
let videos = backend.list_objects("videos/").await.unwrap();
assert_eq!(videos.len(), 1);
let empty = backend.list_objects("nonexistent/").await.unwrap();
assert_eq!(empty.len(), 0);
}
// Listed keys come back in sorted order regardless of insertion order.
#[tokio::test]
async fn test_list_objects_sorted() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
backend.put("zebra", b"data").await.unwrap();
backend.put("apple", b"data").await.unwrap();
backend.put("monkey", b"data").await.unwrap();
let objects = backend.list_objects("").await.unwrap();
assert_eq!(objects, vec!["apple", "monkey", "zebra"]);
}
// The empty prefix lists every stored object key.
#[tokio::test]
async fn test_list_objects_all_prefixes() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
backend.put("data/file1.bin", b"data").await.unwrap();
backend.put("data/file2.bin", b"data").await.unwrap();
backend.put("config/settings.json", b"data").await.unwrap();
let all = backend.list_objects("").await.unwrap();
assert_eq!(all.len(), 3);
}
// After a successful `put` the object exists and no `.tmp` sibling is left behind.
#[tokio::test]
async fn test_atomic_write() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let key = "atomic_test";
let path = backend.object_path(key);
backend.put(key, b"atomic data").await.unwrap();
assert!(path.exists());
assert!(!path.with_extension("tmp").exists());
assert_eq!(backend.get(key).await.unwrap(), b"atomic data");
}
// A second `put` to the same key replaces the stored data.
#[tokio::test]
async fn test_overwrite() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let key = "overwrite_test";
backend.put(key, b"old data").await.unwrap();
assert_eq!(backend.get(key).await.unwrap(), b"old data");
backend.put(key, b"new data").await.unwrap();
assert_eq!(backend.get(key).await.unwrap(), b"new data");
}
// Ten tasks writing distinct keys concurrently all succeed and are all listed.
#[tokio::test]
async fn test_concurrent_writes() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let mut handles = vec![];
for i in 0..10 {
let backend_clone = backend.clone();
let handle = tokio::spawn(async move {
let key = format!("concurrent_test_{}", i);
let data = format!("data_{}", i);
backend_clone.put(&key, data.as_bytes()).await.unwrap();
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
let objects = backend.list_objects("concurrent_test_").await.unwrap();
assert_eq!(objects.len(), 10);
}
// A reader task and a writer task hit the same key concurrently. Read results
// are ignored; the test only requires that the writes succeed and that the
// key holds non-empty data at the end.
#[tokio::test]
async fn test_concurrent_reads_writes() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
backend.put("shared_key", b"initial").await.unwrap();
let backend_read = backend.clone();
let read_handle = tokio::spawn(async move {
for _ in 0..5 {
let _ = backend_read.get("shared_key").await;
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
});
let backend_write = backend.clone();
let write_handle = tokio::spawn(async move {
for i in 0..5 {
let data = format!("data_{}", i);
backend_write
.put("shared_key", data.as_bytes())
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(15)).await;
}
});
read_handle.await.unwrap();
write_handle.await.unwrap();
let final_data = backend.get("shared_key").await.unwrap();
assert!(!final_data.is_empty());
}
// A 10 MiB payload round-trips through `put` and `get` unchanged.
#[tokio::test]
async fn test_large_data() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let large_data = vec![0xFF; 10 * 1024 * 1024]; let key = "large_file";
backend.put(key, &large_data).await.unwrap();
let retrieved = backend.get(key).await.unwrap();
assert_eq!(retrieved.len(), 10 * 1024 * 1024);
assert_eq!(retrieved, large_data);
}
// The `Debug` output mentions the type name and the `root` field.
#[tokio::test]
async fn test_debug_impl() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let debug_str = format!("{:?}", backend);
assert!(debug_str.contains("LocalBackend"));
assert!(debug_str.contains("root"));
}
// Clones share the same on-disk store: data written through one handle is
// readable through the other.
#[tokio::test]
async fn test_clone_independence() {
let temp_dir = TempDir::new().unwrap();
let backend1 = LocalBackend::new(temp_dir.path()).await.unwrap();
let backend2 = backend1.clone();
backend1.put("key1", b"data1").await.unwrap();
assert_eq!(backend2.get("key1").await.unwrap(), b"data1");
backend2.put("key2", b"data2").await.unwrap();
assert_eq!(backend1.get("key2").await.unwrap(), b"data2");
}
// Keys shorter than four bytes (which get fewer shard levels) still round-trip.
#[tokio::test]
async fn test_short_keys() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
backend.put("a", b"data").await.unwrap();
assert_eq!(backend.get("a").await.unwrap(), b"data");
backend.put("ab", b"data").await.unwrap();
assert_eq!(backend.get("ab").await.unwrap(), b"data");
backend.put("abc", b"data").await.unwrap();
assert_eq!(backend.get("abc").await.unwrap(), b"data");
}
// The blocking constructor also creates a missing root directory.
#[tokio::test]
async fn test_new_sync() {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("sync_storage");
assert!(!path.exists());
let backend = LocalBackend::new_sync(&path).unwrap();
assert!(path.exists());
assert_eq!(backend.root(), &path);
}
// `get_mmap` exposes exactly the bytes that were stored.
#[tokio::test]
async fn test_mmap_read() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let test_data = b"Memory-mapped test data for reading";
backend.put("mmap_test", test_data).await.unwrap();
let mmap = backend.get_mmap("mmap_test").unwrap();
assert_eq!(mmap.as_ref(), test_data);
}
// Mapping a 5 MiB object: checks the length plus the first and last 100 bytes.
#[tokio::test]
async fn test_mmap_large_file() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let large_data = vec![0xABu8; 5 * 1024 * 1024];
backend.put("large_mmap_test", &large_data).await.unwrap();
let mmap = backend.get_mmap("large_mmap_test").unwrap();
assert_eq!(mmap.len(), 5 * 1024 * 1024);
assert_eq!(&mmap[..100], &large_data[..100]);
assert_eq!(
&mmap[mmap.len() - 100..],
&large_data[large_data.len() - 100..]
);
}
// `get_size` reports the stored byte count.
#[tokio::test]
async fn test_get_size() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let test_data = vec![0u8; 12345];
backend.put("size_test", &test_data).await.unwrap();
let size = backend.get_size("size_test").await.unwrap();
assert_eq!(size, 12345);
}
// A 1 MiB object is below the mmap threshold, so `get_adaptive` returns the `Vec` variant.
#[tokio::test]
async fn test_adaptive_loading_small() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let small_data = vec![1u8; 1024 * 1024]; backend.put("small_adaptive", &small_data).await.unwrap();
let result = backend.get_adaptive("small_adaptive").await.unwrap();
match &result {
super::MmapOrVec::Vec(v) => assert_eq!(v.len(), 1024 * 1024),
super::MmapOrVec::Mmap(_) => panic!("Expected Vec for small file"),
}
assert_eq!(result.as_ref(), &small_data[..]);
}
// An 11 MiB object exceeds the 10 MiB threshold, so `get_adaptive` returns the `Mmap` variant.
#[tokio::test]
async fn test_adaptive_loading_large() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let large_data = vec![2u8; 11 * 1024 * 1024]; backend.put("large_adaptive", &large_data).await.unwrap();
let result = backend.get_adaptive("large_adaptive").await.unwrap();
match &result {
super::MmapOrVec::Mmap(m) => assert_eq!(m.len(), 11 * 1024 * 1024),
super::MmapOrVec::Vec(_) => panic!("Expected Mmap for large file"),
}
assert_eq!(result.as_ref().len(), large_data.len());
}
// `MmapOrVec::as_ref` yields the stored bytes as a slice.
#[tokio::test]
async fn test_mmap_or_vec_as_ref() {
let temp_dir = TempDir::new().unwrap();
let backend = LocalBackend::new(temp_dir.path()).await.unwrap();
let data = b"test data for as_ref";
backend.put("asref_test", data).await.unwrap();
let result = backend.get_adaptive("asref_test").await.unwrap();
let slice: &[u8] = result.as_ref();
assert_eq!(slice, data);
}
}