use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use crate::infra::error::InfraError;
pub type ProgressFn = Arc<dyn Fn(&str) + Send + Sync>;
#[derive(Debug, Clone)]
pub struct RemoteFile {
pub path: String,
pub size: Option<u64>,
pub modified_at: Option<DateTime<Utc>>,
}
#[async_trait]
pub trait StorageBackend: Send + Sync {
async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError>;
async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError>;
async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError>;
async fn exists(&self, remote_path: &str) -> Result<bool, InfraError>;
async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
Err(InfraError::Transfer {
reason: format!(
"delete not supported by {} backend for path: {remote_path}",
self.backend_type()
),
})
}
async fn archive_move(
&self,
src_remote_path: &str,
archive_remote_path: &str,
) -> Result<(), InfraError> {
Err(InfraError::Transfer {
reason: format!(
"archive_move not supported by {} backend (src={src_remote_path}, dest={archive_remote_path})",
self.backend_type()
),
})
}
async fn archive_move_batch(
&self,
src_root: &str,
archive_dest_root: &str,
relative_paths: &[String],
) -> HashMap<String, Result<(), InfraError>> {
let mut results = HashMap::with_capacity(relative_paths.len());
for rel in relative_paths {
let src = if src_root.is_empty() {
rel.clone()
} else {
format!("{src_root}/{rel}")
};
let dest = if archive_dest_root.is_empty() {
rel.clone()
} else {
format!("{archive_dest_root}/{rel}")
};
let result = self.archive_move(&src, &dest).await;
results.insert(rel.clone(), result);
}
results
}
async fn push_batch(
&self,
src_root: &Path,
dest_root: &str,
relative_paths: &[String],
) -> HashMap<String, Result<(), InfraError>> {
let mut results = HashMap::with_capacity(relative_paths.len());
for rel in relative_paths {
let local_path = src_root.join(rel);
let remote_path = if dest_root.is_empty() {
rel.clone()
} else {
format!("{dest_root}/{rel}")
};
let result = self.push(&local_path, &remote_path).await;
results.insert(rel.clone(), result);
}
results
}
async fn pull_batch(
&self,
src_root: &str,
dest_root: &Path,
relative_paths: &[String],
) -> HashMap<String, Result<(), InfraError>> {
let mut results = HashMap::with_capacity(relative_paths.len());
for rel in relative_paths {
let remote_path = if src_root.is_empty() {
rel.clone()
} else {
format!("{src_root}/{rel}")
};
let local_path = dest_root.join(rel);
let result = self.pull(&remote_path, &local_path).await;
results.insert(rel.clone(), result);
}
results
}
async fn delete_batch(
&self,
remote_root: &str,
relative_paths: &[String],
) -> HashMap<String, Result<(), InfraError>> {
let mut results = HashMap::with_capacity(relative_paths.len());
for rel in relative_paths {
let remote_path = if remote_root.is_empty() {
rel.clone()
} else {
format!("{remote_root}/{rel}")
};
let result = self.delete(&remote_path).await;
results.insert(rel.clone(), result);
}
results
}
fn supports_batch(&self) -> bool {
false
}
fn backend_type(&self) -> &str;
fn set_progress_callback(&self, _callback: Option<ProgressFn>) {}
async fn ensure(&self) -> Result<(), InfraError> {
self.list("").await.map(|_| ())
}
}
#[cfg(any(test, feature = "test-utils"))]
pub mod memory {
use super::*;
use std::collections::HashMap;
use tokio::sync::Mutex;
pub struct InMemoryBackend {
pub log: Mutex<Vec<Op>>,
pub fail_next: Mutex<bool>,
pub files: Mutex<HashMap<String, Vec<u8>>>,
}
impl Default for InMemoryBackend {
fn default() -> Self {
Self {
log: Mutex::new(Vec::new()),
fail_next: Mutex::new(false),
files: Mutex::new(HashMap::new()),
}
}
}
#[derive(Debug, Clone)]
pub enum Op {
Push { local: String, remote: String },
Pull { remote: String, local: String },
List { path: String },
Exists { path: String },
Delete { path: String },
}
#[async_trait]
impl StorageBackend for InMemoryBackend {
async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
self.log.lock().await.push(Op::Push {
local: local_path.display().to_string(),
remote: remote_path.into(),
});
let mut guard = self.fail_next.lock().await;
if *guard {
*guard = false;
return Err(InfraError::Transfer {
reason: "mock push error".into(),
});
}
Ok(())
}
async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
self.log.lock().await.push(Op::Pull {
remote: remote_path.into(),
local: local_path.display().to_string(),
});
let mut guard = self.fail_next.lock().await;
if *guard {
*guard = false;
return Err(InfraError::Transfer {
reason: "mock pull error".into(),
});
}
Ok(())
}
async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
self.log.lock().await.push(Op::List {
path: remote_path.into(),
});
let files = self.files.lock().await;
Ok(files
.iter()
.map(|(path, data)| RemoteFile {
path: path.clone(),
size: Some(data.len() as u64),
modified_at: None,
})
.collect())
}
async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
self.log.lock().await.push(Op::Exists {
path: remote_path.into(),
});
Ok(self.files.lock().await.contains_key(remote_path))
}
async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
self.log.lock().await.push(Op::Delete {
path: remote_path.into(),
});
let mut guard = self.fail_next.lock().await;
if *guard {
*guard = false;
return Err(InfraError::Transfer {
reason: "mock delete error".into(),
});
}
self.files.lock().await.remove(remote_path);
Ok(())
}
fn backend_type(&self) -> &str {
"memory"
}
}
#[async_trait]
impl StorageBackend for std::sync::Arc<InMemoryBackend> {
async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
(**self).push(local_path, remote_path).await
}
async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
(**self).pull(remote_path, local_path).await
}
async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
(**self).list(remote_path).await
}
async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
(**self).exists(remote_path).await
}
async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
(**self).delete(remote_path).await
}
async fn push_batch(
&self,
src_root: &Path,
dest_root: &str,
relative_paths: &[String],
) -> HashMap<String, Result<(), InfraError>> {
(**self)
.push_batch(src_root, dest_root, relative_paths)
.await
}
async fn delete_batch(
&self,
remote_root: &str,
relative_paths: &[String],
) -> HashMap<String, Result<(), InfraError>> {
(**self).delete_batch(remote_root, relative_paths).await
}
fn supports_batch(&self) -> bool {
(**self).supports_batch()
}
fn backend_type(&self) -> &str {
(**self).backend_type()
}
async fn ensure(&self) -> Result<(), InfraError> {
(**self).ensure().await
}
fn set_progress_callback(&self, callback: Option<ProgressFn>) {
(**self).set_progress_callback(callback);
}
}
}