use std::collections::HashMap;
use std::path::Path;
use std::sync::Mutex as StdMutex;
use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
use secrecy::{ExposeSecret, SecretBox};
use super::backend::{ProgressFn, RemoteFile, StorageBackend};
use super::shell::{LocalShell, RemoteShell};
use crate::infra::error::InfraError;
// Default wall-clock timeout for a single rclone invocation, in seconds.
const DEFAULT_RCLONE_TIMEOUT_SECS: u64 = 300;
// Environment variable that can override the default rclone timeout.
const RCLONE_TIMEOUT_ENV: &str = "VDSL_RCLONE_TIMEOUT";
// Floor applied to every configured timeout (explicit or from the env var).
const MIN_RCLONE_TIMEOUT_SECS: u64 = 10;
// Extra timeout budget granted per file when running a batch chunk
// (see `exec_batch_chunked`).
const BATCH_PER_FILE_TIMEOUT_SECS: u64 = 30;
// Flags appended to every rclone call against an SFTP remote: skip modtime
// updates and hash checks, both of which are slow over SFTP.
const SFTP_OPTIMIZATION_FLAGS: &[&str] = &["--sftp-set-modtime=false", "--sftp-disable-hashcheck"];
// SFTP batch operations are split into chunks of this many files; other
// backends transfer the whole batch in a single rclone call.
const SFTP_BATCH_CHUNK_SIZE: usize = 100;
// A failed batch chunk is retried at most this many additional times before
// all its files are reported as failed.
const BATCH_CHUNK_MAX_RETRIES: u32 = 1;
/// Resolve the rclone timeout in seconds: an explicit value wins, then the
/// `VDSL_RCLONE_TIMEOUT` environment variable, then the compiled-in default.
/// Whatever source wins is clamped to `MIN_RCLONE_TIMEOUT_SECS`.
fn resolve_timeout(explicit: Option<u64>) -> u64 {
    // Env lookup is only consulted when no explicit value was supplied;
    // unparsable values are treated as absent.
    let from_env = || {
        std::env::var(RCLONE_TIMEOUT_ENV)
            .ok()
            .and_then(|value| value.parse::<u64>().ok())
    };
    let chosen = match explicit {
        Some(secs) => secs,
        None => from_env().unwrap_or(DEFAULT_RCLONE_TIMEOUT_SECS),
    };
    chosen.max(MIN_RCLONE_TIMEOUT_SECS)
}
/// Storage backend that shells out to the `rclone` binary.
pub struct RcloneBackend {
    // rclone remote / connection string. Held as a secret because connection
    // strings may embed credentials (e.g. `:b2,account=...,key=...:bucket`).
    remote: SecretBox<String>,
    // Shell used to run rclone — local by default, injectable for remote
    // hosts or for tests.
    shell: Box<dyn RemoteShell>,
    // Per-invocation timeout in seconds (already clamped by `resolve_timeout`).
    timeout_secs: u64,
    // Optional progress callback invoked during chunked batch transfers.
    progress: StdMutex<Option<ProgressFn>>,
}
impl RcloneBackend {
    /// Build a backend that runs rclone through the local shell.
    pub fn new(remote: impl Into<String>) -> Self {
        Self::with_shell(remote, Box::new(LocalShell))
    }

    /// Build a backend that runs rclone through the given shell
    /// (e.g. an SSH shell on a remote host, or a mock in tests).
    pub fn with_shell(remote: impl Into<String>, shell: Box<dyn RemoteShell>) -> Self {
        Self {
            remote: SecretBox::new(Box::new(remote.into())),
            shell,
            timeout_secs: resolve_timeout(None),
            progress: StdMutex::new(None),
        }
    }

    /// Override the per-invocation timeout; the value is clamped to the
    /// configured minimum.
    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
        self.timeout_secs = resolve_timeout(Some(timeout_secs));
        self
    }

    /// Join a caller-supplied path onto the remote root, rejecting inputs
    /// that look like rclone flags or contain `..` traversal segments.
    fn remote_path(&self, path: &str) -> Result<String, InfraError> {
        let path = path.trim_matches('/');

        // Guard: a leading '-' could be interpreted as an rclone flag.
        if path.starts_with('-') {
            return Err(InfraError::Transfer {
                reason: format!("invalid remote path (starts with '-'): {path}"),
            });
        }

        // Guard: any whole segment equal to ".." is path traversal.
        let has_traversal = path.split('/').any(|segment| segment == "..");
        if has_traversal {
            return Err(InfraError::Transfer {
                reason: format!("invalid remote path (contains '..' traversal): {path}"),
            });
        }

        let remote = self.remote.expose_secret();
        if path.is_empty() {
            return Ok(remote.clone());
        }
        Ok(format!("{remote}/{path}"))
    }

    /// Whether the configured remote is an SFTP connection string.
    fn is_sftp(&self) -> bool {
        let remote = self.remote.expose_secret();
        remote.starts_with(":sftp")
    }

    /// Run rclone with the default timeout.
    async fn exec_rclone(&self, args: &[&str]) -> Result<String, InfraError> {
        self.exec_rclone_with_timeout(args, self.timeout_secs).await
    }

    /// Run rclone with an explicit timeout, appending the SFTP optimization
    /// flags when applicable. Returns stdout on success, a `Transfer` error
    /// carrying the exit code and stderr otherwise.
    async fn exec_rclone_with_timeout(
        &self,
        args: &[&str],
        timeout_secs: u64,
    ) -> Result<String, InfraError> {
        let mut argv: Vec<&str> =
            Vec::with_capacity(1 + args.len() + SFTP_OPTIMIZATION_FLAGS.len());
        argv.push("rclone");
        argv.extend_from_slice(args);
        if self.is_sftp() {
            argv.extend_from_slice(SFTP_OPTIMIZATION_FLAGS);
        }

        let output = self.shell.exec(&argv, Some(timeout_secs)).await?;
        if output.success {
            return Ok(output.stdout);
        }

        // "signal" stands in for a missing exit code (process killed).
        let exit = output
            .exit_code
            .map_or("signal".to_string(), |c| c.to_string());
        Err(InfraError::Transfer {
            reason: format!("rclone failed (exit {}): {}", exit, output.stderr.trim()),
        })
    }
}
#[async_trait]
impl StorageBackend for RcloneBackend {
    /// Upload one local file to `remote_path` via `rclone copyto`.
    async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
        let dest = self.remote_path(remote_path)?;
        // rclone takes string arguments, so the local path must be valid UTF-8.
        let local_str = local_path.to_str().ok_or_else(|| -> InfraError {
            InfraError::Transfer {
                reason: format!(
                    "local path is not valid UTF-8: {}",
                    local_path.to_string_lossy()
                ),
            }
        })?;
        self.exec_rclone(&["copyto", local_str, &dest]).await?;
        Ok(())
    }

    /// Download one remote file to `local_path` via `rclone copyto`.
    async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
        let src = self.remote_path(remote_path)?;
        // Best-effort: create the destination directory first. Errors are
        // deliberately ignored — if the directory really can't be made, the
        // copy below fails with a clearer message.
        if let Some(parent) = local_path.parent() {
            if let Some(parent_str) = parent.to_str() {
                if !parent_str.is_empty() {
                    let _ = self
                        .shell
                        .exec(&["mkdir", "-p", parent_str], Some(10))
                        .await;
                }
            }
        }
        let local_str = local_path.to_str().ok_or_else(|| -> InfraError {
            InfraError::Transfer {
                reason: format!(
                    "local path is not valid UTF-8: {}",
                    local_path.to_string_lossy()
                ),
            }
        })?;
        self.exec_rclone(&["copyto", &src, local_str]).await?;
        Ok(())
    }

    /// Recursively list files under `remote_path` using `rclone lsf`.
    ///
    /// `--format pst` emits `path;size;modtime` per line (semicolon-separated,
    /// as parsed below). Lines with fewer than two fields are skipped; a
    /// malformed size or timestamp degrades to `None` rather than failing the
    /// whole listing.
    async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
        let target = self.remote_path(remote_path)?;
        let output = self
            .exec_rclone(&["lsf", "-R", "--format", "pst", "--files-only", &target])
            .await?;
        let mut files = Vec::new();
        for line in output.lines() {
            // splitn(3, ..) keeps any ';' inside the timestamp field intact.
            let parts: Vec<&str> = line.splitn(3, ';').collect();
            if parts.len() < 2 {
                continue;
            }
            let path = parts[0];
            let size = match parts[1].trim().parse::<u64>() {
                Ok(s) => Some(s),
                Err(e) => {
                    tracing::debug!(
                        path = path,
                        raw_size = parts[1].trim(),
                        error = %e,
                        "rclone lsf: size parse failed, treating as unknown"
                    );
                    None
                }
            };
            let modified_at = parts.get(2).and_then(|ts| parse_rclone_timestamp(ts));
            files.push(RemoteFile {
                path: path.to_string(),
                size,
                modified_at,
            });
        }
        Ok(files)
    }

    /// Check whether `remote_path` exists on the remote.
    ///
    /// NOTE(review): any rclone failure (network error, bad credentials, …)
    /// is deliberately mapped to `Ok(false)`, not an error — callers treat
    /// "can't tell" as "absent".
    async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
        let target = self.remote_path(remote_path)?;
        let result = self.exec_rclone(&["lsf", &target]).await;
        match result {
            // lsf prints nothing for a nonexistent object.
            Ok(output) => Ok(!output.trim().is_empty()),
            Err(e) => {
                tracing::debug!(
                    remote_path = remote_path,
                    error = %e,
                    "rclone exists check failed, returning false"
                );
                Ok(false)
            }
        }
    }

    /// Delete a single remote file; deleting an already-absent object counts
    /// as success (idempotent delete).
    async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
        let target = self.remote_path(remote_path)?;
        match self
            .exec_rclone(&["deletefile", &target, "--retries", "1"])
            .await
        {
            Ok(_) => Ok(()),
            Err(e) => {
                // Exit code 4 is rclone's "file not found" — matched via the
                // formatted error string; string matching is brittle but the
                // error type carries no structured exit code. TODO confirm
                // the exit-code convention against the rclone docs.
                let msg = e.to_string();
                if msg.contains("exit 4") || msg.contains("not found") {
                    tracing::debug!(
                        remote_path = remote_path,
                        "rclone deletefile: object already absent, treating as success"
                    );
                    Ok(())
                } else {
                    Err(e)
                }
            }
        }
    }

    /// Move one remote object to an archive location via `rclone moveto`.
    /// A missing source is treated as success (the move already happened).
    async fn archive_move(
        &self,
        src_remote_path: &str,
        archive_remote_path: &str,
    ) -> Result<(), InfraError> {
        let src = self.remote_path(src_remote_path)?;
        let dest = self.remote_path(archive_remote_path)?;
        match self
            .exec_rclone(&["moveto", &src, &dest, "--retries", "1"])
            .await
        {
            Ok(_) => Ok(()),
            Err(e) => {
                // Same absent-source handling as `delete` above.
                let msg = e.to_string();
                if msg.contains("exit 4") || msg.contains("not found") {
                    tracing::debug!(
                        src = src_remote_path,
                        dest = archive_remote_path,
                        "rclone moveto: src already absent, treating as success"
                    );
                    Ok(())
                } else {
                    Err(e)
                }
            }
        }
    }

    /// Upload many files in one (or few) rclone calls using `--files-from`.
    /// Returns a per-path result map; a failed chunk marks only its own files
    /// as failed.
    async fn push_batch(
        &self,
        src_root: &Path,
        dest_root: &str,
        relative_paths: &[String],
    ) -> HashMap<String, Result<(), InfraError>> {
        if relative_paths.is_empty() {
            return HashMap::new();
        }
        let dest_full = match self.remote_path(dest_root) {
            Ok(d) => d,
            Err(_) => {
                let reason = format!("invalid dest_root for batch push: {dest_root}");
                return Self::all_batch_err(relative_paths, &reason);
            }
        };
        let src_root_str = match src_root.to_str() {
            Some(s) => s.to_string(),
            None => {
                let reason = format!(
                    "src_root is not valid UTF-8: {}",
                    src_root.to_string_lossy()
                );
                return Self::all_batch_err(relative_paths, &reason);
            }
        };
        // The script writes the chunk's file list to /tmp via a quoted heredoc
        // (no shell expansion of file names), runs rclone with --files-from,
        // then removes the list while preserving rclone's exit code.
        self.exec_batch_chunked(
            relative_paths,
            "push",
            |chunk, list_filename, sftp_flags, _chunk_timeout| {
                let file_list = chunk.join("\n");
                let src = &src_root_str;
                let dest = &dest_full;
                format!(
                    "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
                    {file_list}\n\
                    __VDSL_EOF__\n\
                    rclone copy {src} {dest} \
                    --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
                    _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
                )
            },
        )
        .await
    }

    /// Download many files in one (or few) rclone calls using `--files-from`.
    /// Mirror image of `push_batch`.
    async fn pull_batch(
        &self,
        src_root: &str,
        dest_root: &Path,
        relative_paths: &[String],
    ) -> HashMap<String, Result<(), InfraError>> {
        if relative_paths.is_empty() {
            return HashMap::new();
        }
        let src_full = match self.remote_path(src_root) {
            Ok(s) => s,
            Err(_) => {
                let reason = format!("invalid src_root for batch pull: {src_root}");
                return Self::all_batch_err(relative_paths, &reason);
            }
        };
        let dest_root_str = match dest_root.to_str() {
            Some(s) => s.to_string(),
            None => {
                let reason = format!(
                    "dest_root is not valid UTF-8: {}",
                    dest_root.to_string_lossy()
                );
                return Self::all_batch_err(relative_paths, &reason);
            }
        };
        self.exec_batch_chunked(
            relative_paths,
            "pull",
            |chunk, list_filename, sftp_flags, _chunk_timeout| {
                let file_list = chunk.join("\n");
                let src = &src_full;
                let dest = &dest_root_str;
                format!(
                    "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
                    {file_list}\n\
                    __VDSL_EOF__\n\
                    rclone copy {src} {dest} \
                    --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
                    _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
                )
            },
        )
        .await
    }

    /// Delete many remote files in one (or few) rclone calls via
    /// `rclone delete --files-from`.
    async fn delete_batch(
        &self,
        remote_root: &str,
        relative_paths: &[String],
    ) -> HashMap<String, Result<(), InfraError>> {
        if relative_paths.is_empty() {
            return HashMap::new();
        }
        let remote_full = match self.remote_path(remote_root) {
            Ok(r) => r,
            Err(_) => {
                return Self::all_batch_err(
                    relative_paths,
                    &format!("invalid remote_root for batch delete: {remote_root}"),
                );
            }
        };
        self.exec_batch_chunked(
            relative_paths,
            "delete",
            |chunk, list_filename, sftp_flags, _chunk_timeout| {
                let file_list = chunk.join("\n");
                let dest = &remote_full;
                format!(
                    "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
                    {file_list}\n\
                    __VDSL_EOF__\n\
                    rclone delete {dest} \
                    --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
                    _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
                )
            },
        )
        .await
    }

    /// Move many remote files to an archive root in one (or few) rclone calls
    /// via `rclone move --files-from`.
    async fn archive_move_batch(
        &self,
        src_root: &str,
        archive_dest_root: &str,
        relative_paths: &[String],
    ) -> HashMap<String, Result<(), InfraError>> {
        if relative_paths.is_empty() {
            return HashMap::new();
        }
        let src_full = match self.remote_path(src_root) {
            Ok(r) => r,
            Err(_) => {
                return Self::all_batch_err(
                    relative_paths,
                    &format!("invalid src_root for batch archive_move: {src_root}"),
                );
            }
        };
        let dest_full = match self.remote_path(archive_dest_root) {
            Ok(r) => r,
            Err(_) => {
                return Self::all_batch_err(
                    relative_paths,
                    &format!(
                        "invalid archive_dest_root for batch archive_move: {archive_dest_root}"
                    ),
                );
            }
        };
        self.exec_batch_chunked(
            relative_paths,
            "archive_move",
            |chunk, list_filename, sftp_flags, _chunk_timeout| {
                let file_list = chunk.join("\n");
                let src = &src_full;
                let dest = &dest_full;
                format!(
                    "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
                    {file_list}\n\
                    __VDSL_EOF__\n\
                    rclone move {src} {dest} \
                    --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
                    _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
                )
            },
        )
        .await
    }

    /// This backend implements the batch operations above.
    fn supports_batch(&self) -> bool {
        true
    }

    /// Identifier used in logs/metrics to name this backend implementation.
    fn backend_type(&self) -> &str {
        "rclone"
    }

    /// Install or replace the progress callback. A poisoned mutex is silently
    /// ignored — progress reporting is best-effort.
    fn set_progress_callback(&self, callback: Option<ProgressFn>) {
        if let Ok(mut guard) = self.progress.lock() {
            *guard = callback;
        }
    }

    /// Make sure rclone is installed and the remote is reachable.
    ///
    /// Install strategy: try the official .deb package first, fall back to the
    /// upstream install.sh (which needs unzip), then re-check the binary.
    /// Finally runs a shallow `lsf` as a connectivity test.
    async fn ensure(&self) -> Result<(), InfraError> {
        let check = self.shell.exec(&["which", "rclone"], Some(10)).await;
        let rclone_found = matches!(&check, Ok(out) if out.success);
        if !rclone_found {
            tracing::info!("rclone not found, attempting install via .deb package");
            // NOTE(review): downloads and executes remote install artifacts
            // (curl | dpkg / curl | bash) — acceptable only on disposable
            // hosts this shell targets; verify that assumption holds.
            let install_script = concat!(
                "curl -sL https://downloads.rclone.org/rclone-current-linux-amd64.deb -o /tmp/rclone.deb",
                " && dpkg -i /tmp/rclone.deb",
                " && rm -f /tmp/rclone.deb",
            );
            let install_result = self.shell.exec_script(install_script, Some(120)).await;
            match &install_result {
                Ok(out) if out.success => {
                    tracing::info!("rclone installed successfully via .deb");
                }
                Ok(out) => {
                    tracing::debug!(
                        exit_code = out.exit_code,
                        stderr = out.stderr.trim(),
                        "dpkg install failed, falling back to install.sh"
                    );
                    // install.sh requires unzip; install it first if missing.
                    let fallback_script = concat!(
                        "(command -v unzip >/dev/null 2>&1 || ",
                        "(apt-get update -qq && apt-get install -y -qq unzip)) && ",
                        "curl -sL https://rclone.org/install.sh | bash",
                    );
                    let fallback = self.shell.exec_script(fallback_script, Some(180)).await;
                    match &fallback {
                        Ok(o) if o.success => {
                            tracing::info!("rclone installed successfully via install.sh");
                        }
                        Ok(o) => {
                            return Err(InfraError::Init(format!(
                                "rclone install failed (exit {}): {}",
                                o.exit_code.unwrap_or(-1),
                                o.stderr.trim()
                            )));
                        }
                        Err(e) => {
                            return Err(InfraError::Init(format!(
                                "rclone install.sh exec failed: {e}"
                            )));
                        }
                    }
                }
                Err(e) => {
                    return Err(InfraError::Init(format!(
                        "rclone .deb install exec failed: {e}"
                    )));
                }
            }
            // Verify the binary is actually on PATH after whichever install ran.
            let recheck = self.shell.exec(&["which", "rclone"], Some(10)).await;
            match &recheck {
                Ok(out) if out.success => {}
                _ => {
                    return Err(InfraError::Init(
                        "rclone still not found after install attempt".to_string(),
                    ));
                }
            }
        }
        // Shallow listing of the remote root as a cheap connectivity probe,
        // with a short fixed timeout independent of the transfer timeout.
        let remote = self.remote.expose_secret();
        self.exec_rclone_with_timeout(&["lsf", "--max-depth", "1", remote], 30)
            .await
            .map_err(|e| InfraError::Init(format!("rclone connectivity test failed: {e}")))?;
        Ok(())
    }
}
impl RcloneBackend {
    /// String form of `SFTP_OPTIMIZATION_FLAGS` for splicing into shell
    /// scripts; carries a leading space so it can be appended directly after
    /// other arguments. Empty for non-SFTP remotes.
    fn sftp_flags_for_script(&self) -> &'static str {
        if self.is_sftp() {
            " --sftp-set-modtime=false --sftp-disable-hashcheck"
        } else {
            ""
        }
    }

    /// Files per batch chunk: SFTP transfers are chunked to bound the runtime
    /// of a single rclone invocation; other backends take the whole batch in
    /// one call (usize::MAX => `chunks()` yields a single chunk).
    fn batch_chunk_size(&self) -> usize {
        if self.is_sftp() {
            SFTP_BATCH_CHUNK_SIZE
        } else {
            usize::MAX }
    }

    /// Run a batch operation over `relative_paths`, split into chunks.
    ///
    /// `build_script` receives (chunk, temp list filename, sftp flag string,
    /// chunk timeout) and must return a shell script that performs the
    /// operation for that chunk. Each chunk gets a timeout scaled by its file
    /// count, is retried up to `BATCH_CHUNK_MAX_RETRIES` times on failure,
    /// and failures are isolated: a failed chunk marks only its own files as
    /// failed and processing continues with the next chunk. Returns a result
    /// map covering every input path.
    async fn exec_batch_chunked<F>(
        &self,
        relative_paths: &[String],
        operation: &str,
        build_script: F,
    ) -> HashMap<String, Result<(), InfraError>>
    where
        F: Fn(&[String], &str, &str, u64) -> String,
    {
        let chunk_size = self.batch_chunk_size();
        let sftp_flags = self.sftp_flags_for_script();
        let total = relative_paths.len();
        let chunks: Vec<&[String]> = relative_paths.chunks(chunk_size).collect();
        let num_chunks = chunks.len();
        if num_chunks > 1 {
            tracing::info!(
                operation,
                total,
                num_chunks,
                chunk_size,
                "batch_{operation}: chunked transfer start"
            );
        }
        let mut all_results = HashMap::with_capacity(total);
        let mut completed = 0usize;
        for (i, chunk) in chunks.iter().enumerate() {
            let chunk_num = i + 1;
            // Timeout grows with chunk size so large chunks aren't starved.
            let chunk_timeout =
                self.timeout_secs + (chunk.len() as u64 * BATCH_PER_FILE_TIMEOUT_SECS);
            // Random filename so concurrent batches can't clobber each
            // other's /tmp file lists.
            let list_filename =
                format!("vdsl-{operation}-{}.txt", uuid::Uuid::new_v4().as_simple());
            if num_chunks > 1 {
                tracing::info!(
                    operation,
                    chunk = chunk_num,
                    num_chunks,
                    files = chunk.len(),
                    completed,
                    total,
                    "batch_{operation}: chunk start"
                );
            }
            let script = build_script(chunk, &list_filename, sftp_flags, chunk_timeout);
            // Retry loop: re-runs the same script up to BATCH_CHUNK_MAX_RETRIES
            // extra times; breaks with Ok(()) or a formatted failure reason.
            let mut attempt = 0u32;
            let chunk_result = loop {
                let result = self.shell.exec_script(&script, Some(chunk_timeout)).await;
                match &result {
                    Ok(output) if output.success => break Ok(()),
                    // rclone ran but exited non-zero (or was killed by signal).
                    Ok(output) => {
                        let err_msg = format!(
                            "rclone failed (exit {}): {}",
                            output
                                .exit_code
                                .map_or("signal".to_string(), |c| c.to_string()),
                            output.stderr.trim()
                        );
                        if attempt < BATCH_CHUNK_MAX_RETRIES {
                            attempt += 1;
                            tracing::warn!(
                                operation,
                                chunk = chunk_num,
                                attempt,
                                error = %err_msg,
                                "batch_{operation}: chunk failed, retrying"
                            );
                            continue;
                        }
                        break Err(format!("batch {operation} failed: {err_msg}"));
                    }
                    // The shell itself failed (couldn't even run the script).
                    Err(e) => {
                        if attempt < BATCH_CHUNK_MAX_RETRIES {
                            attempt += 1;
                            tracing::warn!(
                                operation,
                                chunk = chunk_num,
                                attempt,
                                error = %e,
                                "batch_{operation}: chunk failed, retrying"
                            );
                            continue;
                        }
                        break Err(format!("batch {operation} failed: {e}"));
                    }
                }
            };
            // Record a per-path outcome for every file in this chunk; a chunk
            // failure is attributed to all of its files with the same reason.
            match chunk_result {
                Ok(()) => {
                    for p in *chunk {
                        all_results.insert(p.clone(), Ok(()));
                    }
                    completed += chunk.len();
                }
                Err(reason) => {
                    for p in *chunk {
                        all_results.insert(
                            p.clone(),
                            Err(InfraError::Transfer {
                                reason: reason.clone(),
                            }),
                        );
                    }
                    tracing::error!(
                        operation,
                        chunk = chunk_num,
                        failed_files = chunk.len(),
                        reason = %reason,
                        "batch_{operation}: chunk failed after retries, continuing"
                    );
                }
            }
            // Best-effort progress report; the callback runs synchronously
            // while the (non-async) mutex guard is held.
            if let Ok(guard) = self.progress.lock() {
                if let Some(cb) = guard.as_ref() {
                    cb(&format!(
                        "{operation}: chunk {chunk_num}/{num_chunks} ({completed}/{total})"
                    ));
                }
            }
            if num_chunks > 1 {
                tracing::info!(
                    operation,
                    chunk = chunk_num,
                    num_chunks,
                    completed,
                    total,
                    "batch_{operation}: chunk done"
                );
            }
        }
        if num_chunks > 1 {
            let failed = total - completed;
            tracing::info!(
                operation,
                total,
                completed,
                failed,
                "batch_{operation}: all chunks done"
            );
        }
        all_results
    }

    /// Build a result map marking every path as failed with the same reason
    /// (used when batch-level validation fails before any transfer starts).
    fn all_batch_err(
        relative_paths: &[String],
        reason: &str,
    ) -> HashMap<String, Result<(), InfraError>> {
        relative_paths
            .iter()
            .map(|p| {
                (
                    p.clone(),
                    Err(InfraError::Transfer {
                        reason: reason.to_string(),
                    }),
                )
            })
            .collect()
    }
}
/// Parse a timestamp as printed by `rclone lsf` into a UTC datetime.
///
/// Depending on rclone version/flags the date and time are separated by
/// either `T` (ISO-8601 style) or a space (rclone's default layout,
/// `2006-01-02 15:04:05`), with optional fractional seconds. All accepted
/// layouts are tried in order; `None` is returned when nothing matches.
/// The input carries no zone information, so it is interpreted as UTC.
fn parse_rclone_timestamp(s: &str) -> Option<DateTime<Utc>> {
    // Layouts with an explicit no-fraction variant are kept in case `%.f`
    // rejects a missing fraction on some chrono versions.
    const LAYOUTS: &[&str] = &[
        "%Y-%m-%dT%H:%M:%S%.f",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S%.f",
        "%Y-%m-%d %H:%M:%S",
    ];
    let trimmed = s.trim();
    LAYOUTS
        .iter()
        .find_map(|layout| NaiveDateTime::parse_from_str(trimmed, layout).ok())
        .map(|naive| naive.and_utc())
}
#[cfg(test)]
mod tests {
    use super::*;
    // Hoisted from the bottom of the module: imports belong at the top.
    use chrono::Datelike;
    use chrono::Timelike;

    #[test]
    fn remote_path_construction() {
        let b = RcloneBackend::new(":b2,account=kid,key=k:bucket");
        assert_eq!(
            b.remote_path("models/ckpt.safetensors").unwrap(),
            ":b2,account=kid,key=k:bucket/models/ckpt.safetensors"
        );
        // Leading slashes are trimmed before joining.
        assert_eq!(
            b.remote_path("/leading/slash").unwrap(),
            ":b2,account=kid,key=k:bucket/leading/slash"
        );
        // Empty path resolves to the remote root itself.
        assert_eq!(b.remote_path("").unwrap(), ":b2,account=kid,key=k:bucket");
    }

    #[test]
    fn remote_path_rejects_flag_like_input() {
        let b = RcloneBackend::new("remote:bucket");
        assert!(b.remote_path("--config=/etc/rclone.conf").is_err());
        assert!(b.remote_path("-v").is_err());
    }

    #[test]
    fn remote_path_rejects_traversal() {
        let b = RcloneBackend::new("remote:bucket");
        assert!(b.remote_path("../../etc/passwd").is_err());
        assert!(b.remote_path("foo/../bar").is_err());
        assert!(b.remote_path("..").is_err());
        // Only whole ".." segments are traversal; these are legal names.
        assert!(b.remote_path("./valid").is_ok());
        assert!(b.remote_path("a/.../b").is_ok());
    }

    #[test]
    fn backend_type() {
        let b = RcloneBackend::new("remote:bucket");
        assert_eq!(b.backend_type(), "rclone");
    }

    #[test]
    fn parse_rclone_timestamp_nanoseconds() {
        let ts = parse_rclone_timestamp("2024-01-15T10:30:00.123456789");
        assert!(ts.is_some());
        let dt = ts.unwrap();
        assert_eq!(dt.year(), 2024);
        assert_eq!(dt.month(), 1);
        assert_eq!(dt.day(), 15);
        assert_eq!(dt.hour(), 10);
        assert_eq!(dt.minute(), 30);
    }

    #[test]
    fn parse_rclone_timestamp_no_fraction() {
        let ts = parse_rclone_timestamp("2024-01-15T10:30:00");
        assert!(ts.is_some());
    }

    #[test]
    fn parse_rclone_timestamp_invalid() {
        assert!(parse_rclone_timestamp("not-a-date").is_none());
        assert!(parse_rclone_timestamp("").is_none());
    }

    #[test]
    fn is_sftp_detection() {
        let sftp = RcloneBackend::new(":sftp,host=1.2.3.4,port=22,user=root:");
        assert!(sftp.is_sftp());
        assert_eq!(
            sftp.sftp_flags_for_script(),
            " --sftp-set-modtime=false --sftp-disable-hashcheck"
        );
        let b2 = RcloneBackend::new(":b2,account=kid,key=k:bucket");
        assert!(!b2.is_sftp());
        assert_eq!(b2.sftp_flags_for_script(), "");
    }
}