1use std::collections::HashMap;
10use std::path::Path;
11use std::sync::Mutex as StdMutex;
12
13use async_trait::async_trait;
14use chrono::{DateTime, NaiveDateTime, Utc};
15use secrecy::{ExposeSecret, SecretBox};
16
17use super::backend::{ProgressFn, RemoteFile, StorageBackend};
18use super::shell::{LocalShell, RemoteShell};
19use crate::infra::error::InfraError;
20
/// Default timeout, in seconds, for a single rclone invocation when neither an explicit
/// value nor the `VDSL_RCLONE_TIMEOUT` environment override is available.
const DEFAULT_RCLONE_TIMEOUT_SECS: u64 = 300;

/// Environment variable that overrides the default rclone timeout (whole seconds).
const RCLONE_TIMEOUT_ENV: &str = "VDSL_RCLONE_TIMEOUT";

/// Lower bound for every resolved timeout; smaller values (including zero) are raised to it.
const MIN_RCLONE_TIMEOUT_SECS: u64 = 10;

/// Extra seconds added to a batch chunk's timeout for each file in that chunk.
const BATCH_PER_FILE_TIMEOUT_SECS: u64 = 30;

/// Flags appended to every direct rclone invocation when the remote is an `:sftp` remote.
const SFTP_OPTIMIZATION_FLAGS: &[&str] = &["--sftp-set-modtime=false", "--sftp-disable-hashcheck"];

/// Maximum number of files per batch script for SFTP remotes (other remotes run the whole
/// batch as one script).
const SFTP_BATCH_CHUNK_SIZE: usize = 100;

/// Number of times a failed batch chunk is retried before its files are reported as failed.
const BATCH_CHUNK_MAX_RETRIES: u32 = 1;

/// Resolves the effective rclone timeout in seconds.
///
/// Precedence is `explicit`, then the `VDSL_RCLONE_TIMEOUT` environment variable, then
/// [`DEFAULT_RCLONE_TIMEOUT_SECS`]. Surrounding whitespace in the environment value is
/// ignored. A value that still does not parse as `u64` is ignored and the next source is used.
/// The result is never below [`MIN_RCLONE_TIMEOUT_SECS`].
fn resolve_timeout(explicit: Option<u64>) -> u64 {
    let resolved = explicit
        .or_else(|| {
            std::env::var(RCLONE_TIMEOUT_ENV)
                .ok()
                // Trim so values such as " 600" or "600\n" (common in env files) are honoured.
                .and_then(|value| value.trim().parse::<u64>().ok())
        })
        .unwrap_or(DEFAULT_RCLONE_TIMEOUT_SECS);
    resolved.max(MIN_RCLONE_TIMEOUT_SECS)
}
86
/// Storage backend that drives the `rclone` CLI through a `RemoteShell`.
///
/// All remote paths are resolved relative to `remote` (see `remote_path`). `new` runs
/// rclone through `LocalShell`; `with_shell` accepts any other shell implementation.
pub struct RcloneBackend {
    /// rclone remote spec: a configured `name:bucket` or an on-the-fly connection string
    /// such as `:b2,account=...,key=...:bucket`. Kept in a `SecretBox` because connection
    /// strings can embed credentials.
    remote: SecretBox<String>,
    /// Shell used to run rclone and helper commands (`which`, `mkdir`, batch scripts).
    shell: Box<dyn RemoteShell>,
    /// Timeout in seconds for single rclone invocations; also the base of each batch
    /// chunk's timeout.
    timeout_secs: u64,
    /// Optional callback that receives a progress message after each batch chunk.
    progress: StdMutex<Option<ProgressFn>>,
}
112
113impl RcloneBackend {
114 pub fn new(remote: impl Into<String>) -> Self {
125 Self {
126 remote: SecretBox::new(Box::new(remote.into())),
127 shell: Box::new(LocalShell),
128 timeout_secs: resolve_timeout(None),
129 progress: StdMutex::new(None),
130 }
131 }
132
133 pub fn with_shell(remote: impl Into<String>, shell: Box<dyn RemoteShell>) -> Self {
135 Self {
136 remote: SecretBox::new(Box::new(remote.into())),
137 shell,
138 timeout_secs: resolve_timeout(None),
139 progress: StdMutex::new(None),
140 }
141 }
142
143 pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
148 self.timeout_secs = resolve_timeout(Some(timeout_secs));
149 self
150 }
151
152 fn remote_path(&self, path: &str) -> Result<String, InfraError> {
157 let path = path.trim_matches('/');
158 if path.starts_with('-') {
160 return Err(InfraError::Transfer {
161 reason: format!("invalid remote path (starts with '-'): {path}"),
162 });
163 }
164 if path.split('/').any(|seg| seg == "..") {
166 return Err(InfraError::Transfer {
167 reason: format!("invalid remote path (contains '..' traversal): {path}"),
168 });
169 }
170 let remote = self.remote.expose_secret();
171 if path.is_empty() {
172 Ok(remote.clone())
173 } else {
174 Ok(format!("{remote}/{path}"))
175 }
176 }
177
178 fn is_sftp(&self) -> bool {
183 self.remote.expose_secret().starts_with(":sftp")
184 }
185
186 async fn exec_rclone(&self, args: &[&str]) -> Result<String, InfraError> {
191 self.exec_rclone_with_timeout(args, self.timeout_secs).await
192 }
193
194 async fn exec_rclone_with_timeout(
199 &self,
200 args: &[&str],
201 timeout_secs: u64,
202 ) -> Result<String, InfraError> {
203 let mut full_args = vec!["rclone"];
204 full_args.extend_from_slice(args);
205 if self.is_sftp() {
206 full_args.extend_from_slice(SFTP_OPTIMIZATION_FLAGS);
207 }
208
209 let output = self.shell.exec(&full_args, Some(timeout_secs)).await?;
210
211 if !output.success {
212 return Err(InfraError::Transfer {
213 reason: format!(
214 "rclone failed (exit {}): {}",
215 output
216 .exit_code
217 .map_or("signal".to_string(), |c| c.to_string()),
218 output.stderr.trim()
219 ),
220 });
221 }
222
223 Ok(output.stdout)
224 }
225}
226
227#[async_trait]
228impl StorageBackend for RcloneBackend {
229 async fn push(&self, local_path: &Path, remote_path: &str) -> Result<(), InfraError> {
230 let dest = self.remote_path(remote_path)?;
231 let local_str = local_path.to_str().ok_or_else(|| -> InfraError {
232 InfraError::Transfer {
233 reason: format!(
234 "local path is not valid UTF-8: {}",
235 local_path.to_string_lossy()
236 ),
237 }
238 })?;
239 self.exec_rclone(&["copyto", local_str, &dest]).await?;
240 Ok(())
241 }
242
243 async fn pull(&self, remote_path: &str, local_path: &Path) -> Result<(), InfraError> {
244 let src = self.remote_path(remote_path)?;
245 if let Some(parent) = local_path.parent() {
247 if let Some(parent_str) = parent.to_str() {
248 if !parent_str.is_empty() {
249 let _ = self
250 .shell
251 .exec(&["mkdir", "-p", parent_str], Some(10))
252 .await;
253 }
254 }
255 }
256 let local_str = local_path.to_str().ok_or_else(|| -> InfraError {
257 InfraError::Transfer {
258 reason: format!(
259 "local path is not valid UTF-8: {}",
260 local_path.to_string_lossy()
261 ),
262 }
263 })?;
264 self.exec_rclone(&["copyto", &src, local_str]).await?;
265 Ok(())
266 }
267
268 async fn list(&self, remote_path: &str) -> Result<Vec<RemoteFile>, InfraError> {
269 let target = self.remote_path(remote_path)?;
270 let output = self
274 .exec_rclone(&["lsf", "--format", "pst", "--files-only", &target])
275 .await?;
276
277 let mut files = Vec::new();
278 for line in output.lines() {
279 let parts: Vec<&str> = line.splitn(3, ';').collect();
280 if parts.len() < 2 {
281 continue;
282 }
283 let path = parts[0];
284 let size = match parts[1].trim().parse::<u64>() {
285 Ok(s) => Some(s),
286 Err(e) => {
287 tracing::debug!(
288 path = path,
289 raw_size = parts[1].trim(),
290 error = %e,
291 "rclone lsf: size parse failed, treating as unknown"
292 );
293 None
294 }
295 };
296 let modified_at = parts.get(2).and_then(|ts| parse_rclone_timestamp(ts));
297 files.push(RemoteFile {
298 path: path.to_string(),
299 size,
300 modified_at,
301 });
302 }
303 Ok(files)
304 }
305
306 async fn exists(&self, remote_path: &str) -> Result<bool, InfraError> {
310 let target = self.remote_path(remote_path)?;
311 let result = self.exec_rclone(&["lsf", &target]).await;
312 match result {
313 Ok(output) => Ok(!output.trim().is_empty()),
314 Err(e) => {
315 tracing::debug!(
316 remote_path = remote_path,
317 error = %e,
318 "rclone exists check failed, returning false"
319 );
320 Ok(false)
321 }
322 }
323 }
324
325 async fn delete(&self, remote_path: &str) -> Result<(), InfraError> {
326 let target = self.remote_path(remote_path)?;
327 match self
334 .exec_rclone(&["deletefile", &target, "--retries", "1"])
335 .await
336 {
337 Ok(_) => Ok(()),
338 Err(e) => {
339 let msg = e.to_string();
340 if msg.contains("exit 4") || msg.contains("not found") {
343 tracing::debug!(
344 remote_path = remote_path,
345 "rclone deletefile: object already absent, treating as success"
346 );
347 Ok(())
348 } else {
349 Err(e)
350 }
351 }
352 }
353 }
354
355 async fn push_batch(
363 &self,
364 src_root: &Path,
365 dest_root: &str,
366 relative_paths: &[String],
367 ) -> HashMap<String, Result<(), InfraError>> {
368 if relative_paths.is_empty() {
369 return HashMap::new();
370 }
371
372 let dest_full = match self.remote_path(dest_root) {
373 Ok(d) => d,
374 Err(_) => {
375 let reason = format!("invalid dest_root for batch push: {dest_root}");
376 return Self::all_batch_err(relative_paths, &reason);
377 }
378 };
379
380 let src_root_str = match src_root.to_str() {
381 Some(s) => s.to_string(),
382 None => {
383 let reason = format!(
384 "src_root is not valid UTF-8: {}",
385 src_root.to_string_lossy()
386 );
387 return Self::all_batch_err(relative_paths, &reason);
388 }
389 };
390
391 self.exec_batch_chunked(
392 relative_paths,
393 "push",
394 |chunk, list_filename, sftp_flags, _chunk_timeout| {
395 let file_list = chunk.join("\n");
396 let src = &src_root_str;
397 let dest = &dest_full;
398 format!(
399 "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
400 {file_list}\n\
401 __VDSL_EOF__\n\
402 rclone copy {src} {dest} \
403 --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
404 _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
405 )
406 },
407 )
408 .await
409 }
410
411 async fn pull_batch(
415 &self,
416 src_root: &str,
417 dest_root: &Path,
418 relative_paths: &[String],
419 ) -> HashMap<String, Result<(), InfraError>> {
420 if relative_paths.is_empty() {
421 return HashMap::new();
422 }
423
424 let src_full = match self.remote_path(src_root) {
425 Ok(s) => s,
426 Err(_) => {
427 let reason = format!("invalid src_root for batch pull: {src_root}");
428 return Self::all_batch_err(relative_paths, &reason);
429 }
430 };
431
432 let dest_root_str = match dest_root.to_str() {
433 Some(s) => s.to_string(),
434 None => {
435 let reason = format!(
436 "dest_root is not valid UTF-8: {}",
437 dest_root.to_string_lossy()
438 );
439 return Self::all_batch_err(relative_paths, &reason);
440 }
441 };
442
443 self.exec_batch_chunked(
444 relative_paths,
445 "pull",
446 |chunk, list_filename, sftp_flags, _chunk_timeout| {
447 let file_list = chunk.join("\n");
448 let src = &src_full;
449 let dest = &dest_root_str;
450 format!(
451 "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
452 {file_list}\n\
453 __VDSL_EOF__\n\
454 rclone copy {src} {dest} \
455 --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
456 _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
457 )
458 },
459 )
460 .await
461 }
462
463 async fn delete_batch(
467 &self,
468 remote_root: &str,
469 relative_paths: &[String],
470 ) -> HashMap<String, Result<(), InfraError>> {
471 if relative_paths.is_empty() {
472 return HashMap::new();
473 }
474
475 let remote_full = match self.remote_path(remote_root) {
476 Ok(r) => r,
477 Err(_) => {
478 return Self::all_batch_err(
479 relative_paths,
480 &format!("invalid remote_root for batch delete: {remote_root}"),
481 );
482 }
483 };
484
485 self.exec_batch_chunked(
486 relative_paths,
487 "delete",
488 |chunk, list_filename, sftp_flags, _chunk_timeout| {
489 let file_list = chunk.join("\n");
490 let dest = &remote_full;
491 format!(
492 "cat <<'__VDSL_EOF__' > /tmp/{list_filename}\n\
493 {file_list}\n\
494 __VDSL_EOF__\n\
495 rclone delete {dest} \
496 --files-from /tmp/{list_filename} --transfers 8{sftp_flags}; \
497 _rc=$?; rm -f /tmp/{list_filename}; exit $_rc"
498 )
499 },
500 )
501 .await
502 }
503
504 fn supports_batch(&self) -> bool {
505 true
506 }
507
508 fn backend_type(&self) -> &str {
509 "rclone"
510 }
511
512 fn set_progress_callback(&self, callback: Option<ProgressFn>) {
513 if let Ok(mut guard) = self.progress.lock() {
514 *guard = callback;
515 }
516 }
517
518 async fn ensure(&self) -> Result<(), InfraError> {
519 let check = self.shell.exec(&["which", "rclone"], Some(10)).await;
521 let rclone_found = matches!(&check, Ok(out) if out.success);
522
523 if !rclone_found {
524 tracing::info!("rclone not found, attempting install via .deb package");
526 let install_script = concat!(
527 "curl -sL https://downloads.rclone.org/rclone-current-linux-amd64.deb -o /tmp/rclone.deb",
528 " && dpkg -i /tmp/rclone.deb",
529 " && rm -f /tmp/rclone.deb",
530 );
531 let install_result = self.shell.exec_script(install_script, Some(120)).await;
532
533 match &install_result {
534 Ok(out) if out.success => {
535 tracing::info!("rclone installed successfully via .deb");
536 }
537 Ok(out) => {
538 tracing::debug!(
540 exit_code = out.exit_code,
541 stderr = out.stderr.trim(),
542 "dpkg install failed, falling back to install.sh"
543 );
544 let fallback = self
545 .shell
546 .exec_script("curl -sL https://rclone.org/install.sh | bash", Some(120))
547 .await;
548 match &fallback {
549 Ok(o) if o.success => {
550 tracing::info!("rclone installed successfully via install.sh");
551 }
552 Ok(o) => {
553 return Err(InfraError::Init(format!(
554 "rclone install failed (exit {}): {}",
555 o.exit_code.unwrap_or(-1),
556 o.stderr.trim()
557 )));
558 }
559 Err(e) => {
560 return Err(InfraError::Init(format!(
561 "rclone install.sh exec failed: {e}"
562 )));
563 }
564 }
565 }
566 Err(e) => {
567 return Err(InfraError::Init(format!(
568 "rclone .deb install exec failed: {e}"
569 )));
570 }
571 }
572
573 let recheck = self.shell.exec(&["which", "rclone"], Some(10)).await;
575 match &recheck {
576 Ok(out) if out.success => {}
577 _ => {
578 return Err(InfraError::Init(
579 "rclone still not found after install attempt".to_string(),
580 ));
581 }
582 }
583 }
584
585 let remote = self.remote.expose_secret();
587 self.exec_rclone_with_timeout(&["lsf", "--max-depth", "1", remote], 30)
588 .await
589 .map_err(|e| InfraError::Init(format!("rclone connectivity test failed: {e}")))?;
590
591 Ok(())
592 }
593}
594
595impl RcloneBackend {
596 fn sftp_flags_for_script(&self) -> &'static str {
601 if self.is_sftp() {
602 " --sftp-set-modtime=false --sftp-disable-hashcheck"
603 } else {
604 ""
605 }
606 }
607
608 fn batch_chunk_size(&self) -> usize {
610 if self.is_sftp() {
611 SFTP_BATCH_CHUNK_SIZE
612 } else {
613 usize::MAX }
615 }
616
617 async fn exec_batch_chunked<F>(
626 &self,
627 relative_paths: &[String],
628 operation: &str,
629 build_script: F,
630 ) -> HashMap<String, Result<(), InfraError>>
631 where
632 F: Fn(&[String], &str, &str, u64) -> String,
633 {
634 let chunk_size = self.batch_chunk_size();
635 let sftp_flags = self.sftp_flags_for_script();
636 let total = relative_paths.len();
637 let chunks: Vec<&[String]> = relative_paths.chunks(chunk_size).collect();
638 let num_chunks = chunks.len();
639
640 if num_chunks > 1 {
641 tracing::info!(
642 operation,
643 total,
644 num_chunks,
645 chunk_size,
646 "batch_{operation}: chunked transfer start"
647 );
648 }
649
650 let mut all_results = HashMap::with_capacity(total);
651 let mut completed = 0usize;
652
653 for (i, chunk) in chunks.iter().enumerate() {
654 let chunk_num = i + 1;
655 let chunk_timeout =
656 self.timeout_secs + (chunk.len() as u64 * BATCH_PER_FILE_TIMEOUT_SECS);
657 let list_filename =
658 format!("vdsl-{operation}-{}.txt", uuid::Uuid::new_v4().as_simple());
659
660 if num_chunks > 1 {
661 tracing::info!(
662 operation,
663 chunk = chunk_num,
664 num_chunks,
665 files = chunk.len(),
666 completed,
667 total,
668 "batch_{operation}: chunk start"
669 );
670 }
671
672 let script = build_script(chunk, &list_filename, sftp_flags, chunk_timeout);
673
674 let mut attempt = 0u32;
675 let chunk_result = loop {
676 let result = self.shell.exec_script(&script, Some(chunk_timeout)).await;
677
678 match &result {
679 Ok(output) if output.success => break Ok(()),
680 Ok(output) => {
681 let err_msg = format!(
682 "rclone failed (exit {}): {}",
683 output
684 .exit_code
685 .map_or("signal".to_string(), |c| c.to_string()),
686 output.stderr.trim()
687 );
688 if attempt < BATCH_CHUNK_MAX_RETRIES {
689 attempt += 1;
690 tracing::warn!(
691 operation,
692 chunk = chunk_num,
693 attempt,
694 error = %err_msg,
695 "batch_{operation}: chunk failed, retrying"
696 );
697 continue;
698 }
699 break Err(format!("batch {operation} failed: {err_msg}"));
700 }
701 Err(e) => {
702 if attempt < BATCH_CHUNK_MAX_RETRIES {
703 attempt += 1;
704 tracing::warn!(
705 operation,
706 chunk = chunk_num,
707 attempt,
708 error = %e,
709 "batch_{operation}: chunk failed, retrying"
710 );
711 continue;
712 }
713 break Err(format!("batch {operation} failed: {e}"));
714 }
715 }
716 };
717
718 match chunk_result {
719 Ok(()) => {
720 for p in *chunk {
721 all_results.insert(p.clone(), Ok(()));
722 }
723 completed += chunk.len();
724 }
725 Err(reason) => {
726 for p in *chunk {
727 all_results.insert(
728 p.clone(),
729 Err(InfraError::Transfer {
730 reason: reason.clone(),
731 }),
732 );
733 }
734 tracing::error!(
736 operation,
737 chunk = chunk_num,
738 failed_files = chunk.len(),
739 reason = %reason,
740 "batch_{operation}: chunk failed after retries, continuing"
741 );
742 }
743 }
744
745 if let Ok(guard) = self.progress.lock() {
747 if let Some(cb) = guard.as_ref() {
748 cb(&format!(
749 "{operation}: chunk {chunk_num}/{num_chunks} ({completed}/{total})"
750 ));
751 }
752 }
753
754 if num_chunks > 1 {
755 tracing::info!(
756 operation,
757 chunk = chunk_num,
758 num_chunks,
759 completed,
760 total,
761 "batch_{operation}: chunk done"
762 );
763 }
764 }
765
766 if num_chunks > 1 {
767 let failed = total - completed;
768 tracing::info!(
769 operation,
770 total,
771 completed,
772 failed,
773 "batch_{operation}: all chunks done"
774 );
775 }
776
777 all_results
778 }
779
780 fn all_batch_err(
782 relative_paths: &[String],
783 reason: &str,
784 ) -> HashMap<String, Result<(), InfraError>> {
785 relative_paths
786 .iter()
787 .map(|p| {
788 (
789 p.clone(),
790 Err(InfraError::Transfer {
791 reason: reason.to_string(),
792 }),
793 )
794 })
795 .collect()
796 }
797}
798
799fn parse_rclone_timestamp(s: &str) -> Option<DateTime<Utc>> {
804 let trimmed = s.trim();
805 NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S%.f")
807 .or_else(|_| NaiveDateTime::parse_from_str(trimmed, "%Y-%m-%dT%H:%M:%S"))
808 .ok()
809 .map(|naive| naive.and_utc())
810}
811
#[cfg(test)]
mod tests {
    use chrono::{Datelike, Timelike};

    use super::*;

    /// On-the-fly B2 connection string used by several tests.
    const B2_REMOTE: &str = ":b2,account=kid,key=k:bucket";
    /// Plain configured-remote spec.
    const PLAIN_REMOTE: &str = "remote:bucket";

    #[test]
    fn remote_path_construction() {
        let backend = RcloneBackend::new(B2_REMOTE);
        let cases = [
            (
                "models/ckpt.safetensors",
                ":b2,account=kid,key=k:bucket/models/ckpt.safetensors",
            ),
            ("/leading/slash", ":b2,account=kid,key=k:bucket/leading/slash"),
            ("", ":b2,account=kid,key=k:bucket"),
        ];
        for (input, expected) in cases {
            assert_eq!(backend.remote_path(input).unwrap(), expected, "input {input:?}");
        }
    }

    #[test]
    fn remote_path_rejects_flag_like_input() {
        let backend = RcloneBackend::new(PLAIN_REMOTE);
        for input in ["--config=/etc/rclone.conf", "-v"] {
            assert!(
                backend.remote_path(input).is_err(),
                "expected {input:?} to be rejected"
            );
        }
    }

    #[test]
    fn remote_path_rejects_traversal() {
        let backend = RcloneBackend::new(PLAIN_REMOTE);
        for input in ["../../etc/passwd", "foo/../bar", ".."] {
            assert!(
                backend.remote_path(input).is_err(),
                "expected {input:?} to be rejected"
            );
        }
        // A single `.` and dot runs other than `..` are ordinary segments, not traversal.
        for input in ["./valid", "a/.../b"] {
            assert!(
                backend.remote_path(input).is_ok(),
                "expected {input:?} to be accepted"
            );
        }
    }

    #[test]
    fn backend_type() {
        let backend = RcloneBackend::new(PLAIN_REMOTE);
        assert_eq!(backend.backend_type(), "rclone");
    }

    #[test]
    fn parse_rclone_timestamp_nanoseconds() {
        let parsed = parse_rclone_timestamp("2024-01-15T10:30:00.123456789")
            .expect("timestamp with fractional seconds should parse");
        assert_eq!((parsed.year(), parsed.month(), parsed.day()), (2024, 1, 15));
        assert_eq!((parsed.hour(), parsed.minute()), (10, 30));
    }

    #[test]
    fn parse_rclone_timestamp_no_fraction() {
        assert!(parse_rclone_timestamp("2024-01-15T10:30:00").is_some());
    }

    #[test]
    fn parse_rclone_timestamp_invalid() {
        for input in ["not-a-date", ""] {
            assert!(
                parse_rclone_timestamp(input).is_none(),
                "expected {input:?} to be rejected"
            );
        }
    }

    #[test]
    fn is_sftp_detection() {
        let sftp = RcloneBackend::new(":sftp,host=1.2.3.4,port=22,user=root:");
        assert!(sftp.is_sftp());
        assert_eq!(
            sftp.sftp_flags_for_script(),
            " --sftp-set-modtime=false --sftp-disable-hashcheck"
        );

        let b2 = RcloneBackend::new(B2_REMOTE);
        assert!(!b2.is_sftp());
        assert!(b2.sftp_flags_for_script().is_empty());
    }
}