use std::collections::HashMap;
use std::path::{Component, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use object_store::path::Path as ObjectPath;
use object_store::ObjectStore;
use parking_lot::RwLock;
use tracing::{debug, trace};
use crate::error::{Error, Result};
use crate::storage::StorageBackendConfig;
/// Asynchronous key/value blob storage.
///
/// Keys are `/`-separated strings such as `backup/queues/q1/seg-001.zst`;
/// values are opaque byte buffers.
#[async_trait]
pub trait StorageBackend: Send + Sync {
    /// Returns the bytes stored under `key`.
    ///
    /// # Errors
    ///
    /// Fails if `key` is absent or cannot be read.
    async fn get(&self, key: &str) -> Result<Bytes>;

    /// Stores `data` under `key`, overwriting any previous value.
    async fn put(&self, key: &str, data: Bytes) -> Result<()>;

    /// Reports whether `key` is present.
    async fn exists(&self, key: &str) -> Result<bool>;

    /// Returns the keys found under `prefix`.
    ///
    /// Every implementation in this module returns the keys sorted. How
    /// `prefix` is matched is implementation-specific: the in-memory backend
    /// uses a plain string prefix, while the filesystem backend treats it as a
    /// directory.
    async fn list(&self, prefix: &str) -> Result<Vec<String>>;

    /// Removes the value stored under `key`.
    ///
    /// NOTE(review): whether deleting a missing key is an error depends on the
    /// implementation — confirm against the backend in use.
    async fn delete(&self, key: &str) -> Result<()>;
}
pub fn create_backend_from_config(
config: &StorageBackendConfig,
) -> Result<Arc<dyn StorageBackend>> {
match config {
StorageBackendConfig::Filesystem { path } => {
Ok(Arc::new(FilesystemBackend::new(path.clone())))
}
StorageBackendConfig::Memory => Ok(Arc::new(MemoryBackend::new())),
StorageBackendConfig::S3 {
bucket,
region,
endpoint,
access_key,
secret_key,
prefix,
path_style,
allow_http,
} => {
let mut builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(bucket);
if let Some(region) = region {
builder = builder.with_region(region);
}
if let Some(endpoint) = endpoint {
builder = builder.with_endpoint(endpoint);
}
if let Some(access_key) = access_key {
builder = builder.with_access_key_id(access_key);
}
if let Some(secret_key) = secret_key {
builder = builder.with_secret_access_key(secret_key);
}
if *path_style {
builder = builder.with_virtual_hosted_style_request(false);
}
if *allow_http {
builder = builder.with_allow_http(true);
}
let store = builder
.build()
.map_err(|e| Error::Storage(format!("Failed to create S3 backend: {}", e)))?;
Ok(Arc::new(ObjectStoreBackend::new(
Arc::new(store),
prefix.clone(),
)))
}
StorageBackendConfig::Azure {
account_name,
container_name,
account_key,
prefix,
endpoint,
use_workload_identity,
client_id,
tenant_id,
client_secret,
sas_token,
} => {
let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
.with_account(account_name)
.with_container_name(container_name);
if let Some(key) = account_key {
builder = builder.with_access_key(key);
}
if let Some(endpoint) = endpoint {
builder = builder.with_endpoint(endpoint.clone());
}
if let Some(true) = use_workload_identity {
builder = builder.with_use_fabric_endpoint(false);
}
if let Some(client_id) = client_id {
builder = builder.with_client_id(client_id);
}
if let Some(tenant_id) = tenant_id {
builder = builder.with_tenant_id(tenant_id);
}
if let Some(client_secret) = client_secret {
builder = builder.with_client_secret(client_secret);
}
if let Some(sas_token) = sas_token {
let pairs: Vec<(String, String)> = sas_token
.trim_start_matches('?')
.split('&')
.filter_map(|pair| {
let mut parts = pair.splitn(2, '=');
match (parts.next(), parts.next()) {
(Some(k), Some(v)) => Some((k.to_string(), v.to_string())),
_ => None,
}
})
.collect();
builder = builder.with_sas_authorization(pairs);
}
let store = builder
.build()
.map_err(|e| Error::Storage(format!("Failed to create Azure backend: {}", e)))?;
Ok(Arc::new(ObjectStoreBackend::new(
Arc::new(store),
prefix.clone(),
)))
}
StorageBackendConfig::Gcs {
bucket,
service_account_path,
prefix,
} => {
let mut builder =
object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(bucket);
if let Some(sa_path) = service_account_path {
builder = builder.with_service_account_path(sa_path);
}
let store = builder
.build()
.map_err(|e| Error::Storage(format!("Failed to create GCS backend: {}", e)))?;
Ok(Arc::new(ObjectStoreBackend::new(
Arc::new(store),
prefix.clone(),
)))
}
}
}
/// Storage backend that keeps each key as a file below `base_path`.
///
/// `/` separators in a key become nested directories.
struct FilesystemBackend {
    /// Root directory under which every key is resolved.
    base_path: PathBuf,
}

impl FilesystemBackend {
    /// Creates a backend rooted at `base_path`; the directory is not created or checked here.
    fn new(base_path: PathBuf) -> Self {
        FilesystemBackend { base_path }
    }

    /// Resolves `key` to a path under `base_path`, refusing anything that could escape it.
    ///
    /// Only plain segments and `.` are accepted. Absolute keys, `..`, root
    /// components and (on Windows) drive prefixes produce `Error::Storage`.
    fn full_path(&self, key: &str) -> Result<PathBuf> {
        let relative = PathBuf::from(key);
        if relative.is_absolute() {
            return Err(Error::Storage(format!(
                "Storage key must be relative: {}",
                key
            )));
        }
        let only_safe_parts = relative
            .components()
            .all(|part| matches!(part, Component::Normal(_) | Component::CurDir));
        if !only_safe_parts {
            return Err(Error::Storage(format!(
                "Storage key contains unsafe path component: {}",
                key
            )));
        }
        Ok(self.base_path.join(relative))
    }
}
#[async_trait]
impl StorageBackend for FilesystemBackend {
    /// Writes `data` to the file for `key`, creating parent directories as needed.
    async fn put(&self, key: &str, data: Bytes) -> Result<()> {
        let path = self.full_path(key)?;
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .map_err(|e| Error::Storage(format!("Failed to create directory: {}", e)))?;
        }
        tokio::fs::write(&path, &data)
            .await
            .map_err(|e| Error::Storage(format!("Failed to write {}: {}", key, e)))?;
        debug!("Stored {} ({} bytes)", key, data.len());
        Ok(())
    }

    /// Reads the whole file for `key`; a missing file is reported as an error.
    async fn get(&self, key: &str) -> Result<Bytes> {
        let path = self.full_path(key)?;
        let data = tokio::fs::read(&path)
            .await
            .map_err(|e| Error::Storage(format!("Failed to read {}: {}", key, e)))?;
        trace!("Read {} ({} bytes)", key, data.len());
        Ok(Bytes::from(data))
    }

    /// Recursively lists every file below the directory named by `prefix`.
    ///
    /// `prefix` is treated as a directory (not a string prefix); if it does not
    /// exist the result is empty. Keys are relative to the base path, joined
    /// with `/` on every platform so they round-trip through `get`/`put` and
    /// match the other backends, and are returned sorted.
    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
        let base = self.full_path(prefix)?;
        let mut keys = Vec::new();
        // `tokio::fs::metadata(..).is_ok()` has the same semantics as `Path::exists`
        // (follows symlinks, any error means "absent") without blocking the executor.
        if tokio::fs::metadata(&base).await.is_err() {
            return Ok(keys);
        }
        let mut stack = vec![base];
        while let Some(dir) = stack.pop() {
            let mut entries = tokio::fs::read_dir(&dir)
                .await
                .map_err(|e| Error::Storage(format!("Failed to read directory: {}", e)))?;
            while let Some(entry) = entries
                .next_entry()
                .await
                .map_err(|e| Error::Storage(format!("Failed to read entry: {}", e)))?
            {
                let path = entry.path();
                // Non-blocking equivalent of `Path::is_dir` (follows symlinks).
                let is_dir = tokio::fs::metadata(&path)
                    .await
                    .map(|meta| meta.is_dir())
                    .unwrap_or(false);
                if is_dir {
                    stack.push(path);
                } else if let Ok(relative) = path.strip_prefix(&self.base_path) {
                    // Join components with '/' instead of using the OS separator,
                    // which would yield `a\b` keys on Windows.
                    let key = relative
                        .components()
                        .map(|part| part.as_os_str().to_string_lossy())
                        .collect::<Vec<_>>()
                        .join("/");
                    keys.push(key);
                }
            }
        }
        keys.sort();
        Ok(keys)
    }

    /// Reports whether anything exists at the key's path.
    ///
    /// Like `Path::exists`, this is also true for a directory, and any metadata
    /// error counts as "absent".
    async fn exists(&self, key: &str) -> Result<bool> {
        let path = self.full_path(key)?;
        Ok(tokio::fs::metadata(&path).await.is_ok())
    }

    /// Removes the file for `key`; deleting a missing key succeeds.
    async fn delete(&self, key: &str) -> Result<()> {
        let path = self.full_path(key)?;
        // Remove directly and treat NotFound as success: idempotent without a
        // separate (blocking and racy) existence check beforehand.
        match tokio::fs::remove_file(&path).await {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(Error::Storage(format!("Failed to delete {}: {}", key, e))),
        }
    }
}
/// In-process storage backend keyed by the exact key string.
///
/// Nothing is persisted; contents live only as long as the backend does.
pub struct MemoryBackend {
    /// Key → stored bytes, guarded by a `parking_lot` read/write lock.
    data: Arc<RwLock<HashMap<String, Bytes>>>,
}

impl MemoryBackend {
    /// Creates an empty backend.
    pub fn new() -> Self {
        let empty: HashMap<String, Bytes> = HashMap::new();
        let data = Arc::new(RwLock::new(empty));
        MemoryBackend { data }
    }
}

impl Default for MemoryBackend {
    /// Equivalent to `MemoryBackend::new()`.
    fn default() -> Self {
        MemoryBackend::new()
    }
}
#[async_trait]
impl StorageBackend for MemoryBackend {
async fn put(&self, key: &str, data: Bytes) -> Result<()> {
self.data.write().insert(key.to_string(), data);
Ok(())
}
async fn get(&self, key: &str) -> Result<Bytes> {
self.data
.read()
.get(key)
.cloned()
.ok_or_else(|| Error::Storage(format!("Key not found: {}", key)))
}
async fn list(&self, prefix: &str) -> Result<Vec<String>> {
let data = self.data.read();
let mut keys: Vec<String> = data
.keys()
.filter(|k| k.starts_with(prefix))
.cloned()
.collect();
keys.sort();
Ok(keys)
}
async fn exists(&self, key: &str) -> Result<bool> {
Ok(self.data.read().contains_key(key))
}
async fn delete(&self, key: &str) -> Result<()> {
self.data.write().remove(key);
Ok(())
}
}
/// Storage backend on top of an `object_store` implementation (S3, Azure, GCS).
///
/// Every key is placed under an optional prefix, which is stored normalized.
struct ObjectStoreBackend {
    store: Arc<dyn ObjectStore>,
    /// Key prefix without leading/trailing `/`; `None` instead of an empty string.
    prefix: Option<String>,
}

impl ObjectStoreBackend {
    /// Wraps `store`, normalizing `prefix`.
    ///
    /// `ObjectPath` drops empty segments, so a configured prefix such as `/data/`
    /// produces object paths `data/...`. Trimming the slashes here keeps
    /// `strip_prefix` in sync with the paths the store actually reports. Without
    /// it, listed keys would keep the prefix and fail to round-trip through `get`.
    fn new(store: Arc<dyn ObjectStore>, prefix: Option<String>) -> Self {
        let prefix = prefix
            .map(|raw| raw.trim_matches('/').to_string())
            .filter(|trimmed| !trimmed.is_empty());
        Self { store, prefix }
    }

    /// Maps a caller key to its object path, `<prefix>/<key>` when a prefix is set.
    fn full_key(&self, key: &str) -> ObjectPath {
        match &self.prefix {
            Some(prefix) => ObjectPath::from(format!("{}/{}", prefix, key)),
            None => ObjectPath::from(key),
        }
    }

    /// Inverse of `full_key` for listed paths: removes a leading `<prefix>/`.
    ///
    /// Paths that do not carry the prefix are returned unchanged.
    fn strip_prefix(&self, path: &ObjectPath) -> String {
        let path_str = path.to_string();
        match &self.prefix {
            Some(prefix) => match path_str
                .strip_prefix(prefix.as_str())
                .and_then(|rest| rest.strip_prefix('/'))
            {
                Some(rest) => rest.to_string(),
                None => path_str,
            },
            None => path_str,
        }
    }
}
#[async_trait]
impl StorageBackend for ObjectStoreBackend {
async fn put(&self, key: &str, data: Bytes) -> Result<()> {
let path = self.full_key(key);
let payload = object_store::PutPayload::from(data);
self.store
.put(&path, payload)
.await
.map_err(|e| Error::Storage(format!("Failed to put {}: {}", key, e)))?;
Ok(())
}
async fn get(&self, key: &str) -> Result<Bytes> {
let path = self.full_key(key);
let result = self
.store
.get(&path)
.await
.map_err(|e| Error::Storage(format!("Failed to get {}: {}", key, e)))?;
let bytes = result
.bytes()
.await
.map_err(|e| Error::Storage(format!("Failed to read bytes for {}: {}", key, e)))?;
Ok(bytes)
}
async fn list(&self, prefix: &str) -> Result<Vec<String>> {
let full_prefix = match &self.prefix {
Some(p) => ObjectPath::from(format!("{}/{}", p.trim_end_matches('/'), prefix)),
None => ObjectPath::from(prefix),
};
let mut keys = Vec::new();
let mut stream = self.store.list(Some(&full_prefix));
use futures::StreamExt;
while let Some(result) = stream.next().await {
let meta =
result.map_err(|e| Error::Storage(format!("Failed to list {}: {}", prefix, e)))?;
keys.push(self.strip_prefix(&meta.location));
}
keys.sort();
Ok(keys)
}
async fn exists(&self, key: &str) -> Result<bool> {
let path = self.full_key(key);
match self.store.head(&path).await {
Ok(_) => Ok(true),
Err(object_store::Error::NotFound { .. }) => Ok(false),
Err(e) => Err(Error::Storage(format!(
"Failed to check existence of {}: {}",
key, e
))),
}
}
async fn delete(&self, key: &str) -> Result<()> {
let path = self.full_key(key);
self.store
.delete(&path)
.await
.map_err(|e| Error::Storage(format!("Failed to delete {}: {}", key, e)))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A value written to the memory backend can be read back unchanged.
    #[tokio::test]
    async fn test_memory_backend_put_get() {
        let store = MemoryBackend::new();
        let payload = Bytes::from("hello world");
        store.put("test/key1", payload.clone()).await.unwrap();
        let fetched = store.get("test/key1").await.unwrap();
        assert_eq!(fetched, payload);
    }

    /// `list` on the memory backend only returns keys under the given prefix.
    #[tokio::test]
    async fn test_memory_backend_list() {
        let store = MemoryBackend::new();
        let fixtures = [
            ("backup/manifest.json", "{}"),
            ("backup/queues/q1/seg-001.zst", "data"),
            ("other/file.txt", "other"),
        ];
        for &(key, body) in fixtures.iter() {
            store.put(key, Bytes::from(body)).await.unwrap();
        }

        let listed = store.list("backup/").await.unwrap();
        assert_eq!(listed.len(), 2);
        assert!(listed.iter().any(|k| k == "backup/manifest.json"));
        assert!(listed.iter().any(|k| k == "backup/queues/q1/seg-001.zst"));
    }

    /// `exists` tracks `put` and `delete` on the memory backend.
    #[tokio::test]
    async fn test_memory_backend_exists_delete() {
        let store = MemoryBackend::new();
        let key = "test/key";
        assert!(!store.exists(key).await.unwrap());
        store.put(key, Bytes::from("data")).await.unwrap();
        assert!(store.exists(key).await.unwrap());
        store.delete(key).await.unwrap();
        assert!(!store.exists(key).await.unwrap());
    }

    /// Reading an absent key from the memory backend is an error.
    #[tokio::test]
    async fn test_memory_backend_get_not_found() {
        let store = MemoryBackend::new();
        assert!(store.get("nonexistent").await.is_err());
    }

    /// Full put/get/exists/list/delete cycle on the filesystem backend.
    #[tokio::test]
    async fn test_filesystem_backend_roundtrip() {
        let root = tempfile::tempdir().unwrap();
        let fs_backend = FilesystemBackend::new(root.path().to_path_buf());
        let key = "sub/dir/file.txt";
        let payload = Bytes::from("hello filesystem");

        fs_backend.put(key, payload.clone()).await.unwrap();
        assert_eq!(fs_backend.get(key).await.unwrap(), payload);
        assert!(fs_backend.exists(key).await.unwrap());
        assert!(!fs_backend.exists("nonexistent").await.unwrap());

        let listed = fs_backend.list("sub/").await.unwrap();
        assert_eq!(listed, vec![key.to_string()]);

        fs_backend.delete(key).await.unwrap();
        assert!(!fs_backend.exists(key).await.unwrap());
    }

    /// Keys that would escape the base directory are rejected.
    #[tokio::test]
    async fn test_filesystem_backend_rejects_path_traversal() {
        let root = tempfile::tempdir().unwrap();
        let fs_backend = FilesystemBackend::new(root.path().to_path_buf());
        assert!(fs_backend
            .put("../escape.txt", Bytes::from("bad"))
            .await
            .is_err());
        assert!(fs_backend.get("/absolute/path").await.is_err());
    }

    /// The factory builds a working filesystem backend.
    #[tokio::test]
    async fn test_create_backend_filesystem() {
        let root = tempfile::tempdir().unwrap();
        let config = StorageBackendConfig::Filesystem {
            path: root.path().to_path_buf(),
        };
        let store = create_backend_from_config(&config).unwrap();
        store.put("test.txt", Bytes::from("works")).await.unwrap();
        assert_eq!(store.get("test.txt").await.unwrap(), Bytes::from("works"));
    }

    /// The factory builds a working memory backend.
    #[tokio::test]
    async fn test_create_backend_memory() {
        let store = create_backend_from_config(&StorageBackendConfig::Memory).unwrap();
        store.put("test.txt", Bytes::from("works")).await.unwrap();
        assert_eq!(store.get("test.txt").await.unwrap(), Bytes::from("works"));
    }
}