use super::support::validate_local_blob_id;
use super::{
BlobGetResult, BlobPrefixDeleteResult, BlobPutOptions, BlobPutResult, BlobReadOptions,
BlobStore,
};
use crate::features::storage::api::{Result, StorageError};
/// Blob store backed by the local filesystem.
///
/// Each blob is kept as a single file directly under `root`, with the blob id
/// used as the file name.
#[derive(Debug)]
pub struct LocalBlobStore {
/// Directory that holds one file per blob; created by `LocalBlobStore::new`.
root: std::path::PathBuf,
}
impl LocalBlobStore {
    /// Creates a store rooted at `root`, creating the directory (and any
    /// missing parents) if it does not exist yet.
    ///
    /// # Errors
    /// Propagates the failure reported by `std::fs::create_dir_all`.
    pub fn new(root: std::path::PathBuf) -> Result<Self> {
        std::fs::create_dir_all(&root)?;
        Ok(Self { root })
    }

    /// Maps `blob_id` to the path of its backing file under the root.
    ///
    /// # Errors
    /// Fails when `validate_local_blob_id` rejects the id.
    fn blob_path(&self, blob_id: &str) -> Result<std::path::PathBuf> {
        validate_local_blob_id(blob_id)?;
        Ok(self.root.join(blob_id))
    }

    /// Returns the ids of stored blobs whose file name starts with
    /// `namespace` immediately followed by `prefix`, in ascending order.
    ///
    /// A `limit` of `0` means "no limit"; otherwise at most `limit` ids are
    /// returned, and they are always the lexicographically smallest matches.
    /// Entries that are not regular files or whose names are not valid UTF-8
    /// are skipped.
    ///
    /// # Errors
    /// Propagates I/O failures from reading the root directory.
    fn prefixed_ids(&self, namespace: &str, prefix: &str, limit: usize) -> Result<Vec<String>> {
        let combined = format!("{namespace}{prefix}");
        let mut matches = Vec::new();
        for entry_res in std::fs::read_dir(&self.root)? {
            let entry = entry_res?;
            if !entry.path().is_file() {
                continue;
            }
            let Ok(name) = entry.file_name().into_string() else {
                continue;
            };
            if name.starts_with(&combined) {
                matches.push(name);
            }
        }
        // Directory iteration order is platform-dependent, so the limit may
        // only be applied after sorting; cutting the scan short would return
        // an arbitrary subset that differs between calls.
        matches.sort_unstable();
        if limit > 0 {
            matches.truncate(limit);
        }
        Ok(matches)
    }
}
impl BlobStore for LocalBlobStore {
    /// Writes `bytes` to the file backing `blob_id`, honouring `options`.
    ///
    /// The current content is fetched first so the result can report whether
    /// the write inserted a new blob or overwrote an existing one.
    ///
    /// # Errors
    /// * `StorageError::InvalidInput` when `verify_content_hash` is requested
    ///   (unsupported by this backend), or when `deny_if_exists` is set and
    ///   the blob is already present.
    /// * Any error from looking up the existing blob, resolving the blob path,
    ///   or writing the file.
    fn put_blob_with_options(
        &mut self,
        blob_id: &str,
        bytes: &[u8],
        options: BlobPutOptions,
    ) -> Result<BlobPutResult> {
        if options.verify_content_hash {
            return Err(StorageError::InvalidInput(
                "local blob backend does not support verify_content_hash".to_string(),
            ));
        }
        let existing = self.get_blob(blob_id)?;
        if options.deny_if_exists && existing.is_some() {
            return Err(StorageError::InvalidInput(
                "blob already exists and deny_if_exists is enabled".to_string(),
            ));
        }
        // Identical content under an idempotent put: leave the file untouched.
        if options.idempotent && existing.as_deref() == Some(bytes) {
            return Ok(BlobPutResult {
                inserted: false,
                overwritten: false,
                idempotent_noop: true,
            });
        }
        let path = self.blob_path(blob_id)?;
        std::fs::write(path, bytes)?;
        Ok(BlobPutResult {
            inserted: existing.is_none(),
            overwritten: existing.is_some(),
            idempotent_noop: false,
        })
    }

    /// Reads the blob stored under `blob_id`; the read options are ignored by
    /// this backend.
    ///
    /// Returns `Ok(None)` when no file exists for the id. A successful read is
    /// always reported with `served_tier` set to `"local"` and
    /// `rehydrated_local` set to `false`.
    ///
    /// # Errors
    /// Fails when the blob path cannot be resolved, or with
    /// `StorageError::Io` for any read failure other than "not found".
    fn get_blob_with_options(
        &self,
        blob_id: &str,
        _options: BlobReadOptions,
    ) -> Result<Option<BlobGetResult>> {
        let path = self.blob_path(blob_id)?;
        match std::fs::read(path) {
            Ok(bytes) => Ok(Some(BlobGetResult {
                data: bytes,
                served_tier: Some("local".to_string()),
                rehydrated_local: false,
            })),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(StorageError::Io(err)),
        }
    }

    /// Reports whether a filesystem entry exists at the path for `blob_id`.
    ///
    /// # Errors
    /// Fails when the blob path cannot be resolved.
    fn has_blob(&self, blob_id: &str) -> Result<bool> {
        let path = self.blob_path(blob_id)?;
        Ok(path.exists())
    }

    /// Removes the file backing `blob_id`; a missing file is not an error.
    ///
    /// # Errors
    /// Fails when the blob path cannot be resolved, or with
    /// `StorageError::Io` for any removal failure other than "not found".
    fn delete_blob(&mut self, blob_id: &str) -> Result<()> {
        let path = self.blob_path(blob_id)?;
        match std::fs::remove_file(path) {
            Ok(()) => Ok(()),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(err) => Err(StorageError::Io(err)),
        }
    }

    /// Removes every listed blob and returns how many files were actually
    /// removed by this call.
    ///
    /// Ids with no backing file are skipped and not counted. Processing stops
    /// at the first error, so earlier ids may already have been removed.
    ///
    /// # Errors
    /// Fails when a blob path cannot be resolved, or with `StorageError::Io`
    /// for any removal failure other than "not found".
    fn delete_blobs(&mut self, blob_ids: &[String]) -> Result<usize> {
        let mut deleted = 0usize;
        for blob_id in blob_ids {
            let path = self.blob_path(blob_id)?;
            // Count from the removal outcome itself rather than a prior
            // existence check: a separate check can go stale before the
            // removal and would then inflate the count.
            match std::fs::remove_file(path) {
                Ok(()) => deleted += 1,
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
                Err(err) => return Err(StorageError::Io(err)),
            }
        }
        Ok(deleted)
    }

    /// Lists blob ids starting with `namespace` followed by `prefix`.
    ///
    /// A `limit` of `0` disables the cap. Delegates to `prefixed_ids`.
    fn list_prefix(&self, namespace: &str, prefix: &str, limit: usize) -> Result<Vec<String>> {
        self.prefixed_ids(namespace, prefix, limit)
    }

    /// Deletes blobs whose id starts with `namespace` followed by `prefix`.
    ///
    /// With `batch_limit == 0` every match is deleted. Otherwise at most
    /// `batch_limit` blobs are deleted and `truncated` is `true` when at least
    /// one further match was left behind.
    ///
    /// # Errors
    /// Propagates failures from listing the matches or deleting them.
    fn delete_prefix(
        &mut self,
        namespace: &str,
        prefix: &str,
        batch_limit: usize,
    ) -> Result<BlobPrefixDeleteResult> {
        // Ask for one id beyond the batch so leftover matches can be detected
        // without a second listing.
        let probe_limit = if batch_limit == 0 {
            0
        } else {
            batch_limit.saturating_add(1)
        };
        let mut ids = self.prefixed_ids(namespace, prefix, probe_limit)?;
        let truncated = batch_limit > 0 && ids.len() > batch_limit;
        if truncated {
            ids.truncate(batch_limit);
        }
        let deleted = self.delete_blobs(&ids)?;
        Ok(BlobPrefixDeleteResult { deleted, truncated })
    }
}