use crate::config::Config;
use crate::error::{Error, Result};
use crate::local_fs::LocalFs;
use crate::transfer_commands::TransferCommands;
use crate::transfer_storage::{TransferStorage, split_s3_path};
use std::path::Path;
use std::time::Duration;
use tokio::time::sleep;
const MAX_POLLS: u32 = 120;
const POLL_INTERVAL: Duration = Duration::from_secs(1);
pub async fn get_file<TC, TS, LF>(
transfer: &TC,
storage: &TS,
local_fs: &LF,
config: &Config,
remote_file_path: &str,
local_path: &Path,
_remote_directory_path: &str,
) -> Result<()>
where
TC: TransferCommands,
TS: TransferStorage,
LF: LocalFs,
{
let local_directory_path = config.retrieve_prefix();
let (bucket, key_prefix) = split_s3_path(&local_directory_path);
let transfer_id = transfer
.start_file_transfer_retrieve(
&config.connector_id,
remote_file_path,
&local_directory_path,
)
.await
.map_err(|e| {
e.with("remote_file_path", remote_file_path)
.with("connector_id", &config.connector_id)
})?;
if transfer_id.is_empty() {
return Err(
Error::invalid_input("Missing TransferId").with("remote_file_path", remote_file_path)
);
}
wait_for_transfer_complete(
transfer,
config,
&transfer_id,
Some(remote_file_path),
None::<&Path>,
)
.await?;
let base_name = Path::new(remote_file_path)
.file_name()
.and_then(|p| p.to_str())
.unwrap_or("file");
let s3_key = format!("{key_prefix}{base_name}");
let bytes = storage.get_object(bucket, &s3_key).await.map_err(|e| {
e.with("remote_file_path", remote_file_path)
.with("s3_key", &s3_key)
})?;
local_fs
.write_all(local_path, &bytes)
.await
.map_err(|e| e.with("remote_file_path", remote_file_path))?;
Ok(())
}
pub async fn put_file<TC, TS, LF>(
transfer: &TC,
storage: &TS,
local_fs: &LF,
config: &Config,
local_path: &Path,
remote_directory_path: &str,
) -> Result<()>
where
TC: TransferCommands,
TS: TransferStorage,
LF: LocalFs,
{
let send_prefix = config.send_prefix();
let (bucket, key_prefix) = split_s3_path(&send_prefix);
let file_name = local_path
.file_name()
.and_then(|p| p.to_str())
.unwrap_or("upload");
let unique_key = format!("{}{}", uuid_simple(), file_name);
let s3_key = if key_prefix.is_empty() {
unique_key
} else {
format!("{key_prefix}/{unique_key}")
};
let bytes = local_fs
.read_all(local_path)
.await
.map_err(|e| e.with("local_path", local_path.display().to_string()))?;
storage
.put_object(bucket, &s3_key, bytes)
.await
.map_err(|e| {
e.with("local_path", local_path.display().to_string())
.with("s3_key", &s3_key)
})?;
let send_path = format!("{bucket}/{s3_key}");
let transfer_id = transfer
.start_file_transfer_send(&config.connector_id, &send_path, remote_directory_path)
.await
.map_err(|e| {
e.with("connector_id", &config.connector_id)
.with("send_path", &send_path)
.with("remote_directory_path", remote_directory_path)
})?;
if transfer_id.is_empty() {
return Err(Error::invalid_input("Missing TransferId")
.with("local_path", local_path.display().to_string())
.with("remote_directory_path", remote_directory_path));
}
wait_for_transfer_complete(
transfer,
config,
&transfer_id,
None::<&str>,
Some(local_path),
)
.await?;
Ok(())
}
async fn wait_for_transfer_complete<TC>(
transfer: &TC,
config: &Config,
transfer_id: &str,
remote_file_path: Option<&str>,
local_path: Option<&Path>,
) -> Result<()>
where
TC: TransferCommands,
{
for _ in 0..MAX_POLLS {
let results = transfer
.list_file_transfer_results(&config.connector_id, transfer_id)
.await
.map_err(|e| e.with("transfer_id", transfer_id))?;
if results.is_empty() {
sleep(POLL_INTERVAL).await;
continue;
}
let all_done = results.iter().all(|r| {
let s = r.status_code.as_str();
s == "COMPLETED" || s == "SUCCESS"
});
let any_failed = results
.iter()
.any(|r| r.status_code == "FAILED" || r.status_code == "ERROR");
if any_failed {
let msg = results
.iter()
.find(|r| r.status_code == "FAILED")
.and_then(|r| r.failure_message.clone())
.unwrap_or_else(|| "Transfer failed".to_string());
let mut err = Error::api_permanent(msg).with("transfer_id", transfer_id);
if let Some(p) = remote_file_path {
err = err.with("remote_file_path", p);
}
if let Some(p) = local_path {
err = err.with("local_path", p.display().to_string());
}
return Err(err);
}
if all_done {
return Ok(());
}
sleep(POLL_INTERVAL).await;
}
let mut err = Error::timeout(format!(
"Transfer {transfer_id} did not complete within timeout"
))
.with("transfer_id", transfer_id);
if let Some(p) = remote_file_path {
err = err.with("remote_file_path", p);
}
if let Some(p) = local_path {
err = err.with("local_path", p.display().to_string());
}
Err(err)
}
fn uuid_simple() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let t = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
format!("{t:x}-")
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::Config;
use crate::local_fs::MemoryLocalFs;
use crate::transfer_commands::MemoryTransferCommands;
use crate::transfer_storage::MemoryTransferStorage;
use std::path::Path;
use std::sync::Arc;
fn test_config() -> Config {
Config::new(
"c-test".to_string(),
"bucket/transfer-cli/".to_string(),
None,
None,
std::path::PathBuf::from("/mem"),
)
}
#[tokio::test]
async fn get_file_with_memory_backend() {
let storage = Arc::new(MemoryTransferStorage::new());
let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
let local_fs = MemoryLocalFs::new();
let config = test_config();
let remote_path = "/remote/foo.txt";
let local_path = Path::new("/mem/foo.txt");
let result = get_file(
&transfer,
&*storage,
&local_fs,
&config,
remote_path,
local_path,
"/remote",
)
.await;
assert!(result.is_ok());
let bytes = local_fs.read_all(local_path).await.unwrap();
assert_eq!(bytes, b"mock retrieved content");
}
#[tokio::test]
async fn put_file_when_local_file_missing_returns_error() {
let storage = Arc::new(MemoryTransferStorage::new());
let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
let local_fs = MemoryLocalFs::new();
let config = test_config();
let local_path = Path::new("/mem/nonexistent.txt");
let result = put_file(
&transfer, &*storage, &local_fs, &config, local_path, "/remote",
)
.await;
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind, crate::error::ErrorKind::NotFound);
}
#[tokio::test]
async fn put_file_with_memory_backend() {
let storage = Arc::new(MemoryTransferStorage::new());
let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
let local_fs = MemoryLocalFs::new();
let config = test_config();
let local_path = Path::new("/mem/upload.txt");
let content = b"uploaded file content";
local_fs.write_all(local_path, content).await.unwrap();
let result = put_file(
&transfer, &*storage, &local_fs, &config, local_path, "/remote",
)
.await;
assert!(result.is_ok());
let send_prefix = config.send_prefix();
let (bucket, key_prefix) = split_s3_path(&send_prefix);
let keys: Vec<_> = storage
.test_keys()
.into_iter()
.filter(|(b, k)| b == bucket && k.starts_with(key_prefix))
.collect();
assert_eq!(keys.len(), 1);
let (bucket, key) = keys.first().expect("one key");
let stored = storage.get_object(bucket, key).await.unwrap();
assert_eq!(stored, content);
}
#[tokio::test]
async fn full_flow_list_get_put_with_memory_backend() {
use crate::listing::{DirectoryListing, ListedFile, list_directory};
let storage = Arc::new(MemoryTransferStorage::new());
let default_listing = DirectoryListing {
files: vec![ListedFile {
file_path: "/remote/data.bin".to_string(),
modified_timestamp: None,
size: Some(10),
}],
paths: vec![],
truncated: false,
};
let transfer =
MemoryTransferCommands::new(Arc::clone(&storage)).with_default_listing(default_listing);
let local_fs = MemoryLocalFs::new();
let config = test_config();
let listing = list_directory(&transfer, &*storage, &config, "/remote", Some(100)).await;
assert!(listing.is_ok());
let listing = listing.unwrap();
assert_eq!(listing.files.len(), 1);
assert_eq!(
listing.files.first().map(|f| f.file_path.as_str()),
Some("/remote/data.bin")
);
let remote_path = "/remote/data.bin";
let local_path = Path::new("/mem/data.bin");
get_file(
&transfer,
&*storage,
&local_fs,
&config,
remote_path,
local_path,
"/remote",
)
.await
.unwrap();
let bytes = local_fs.read_all(local_path).await.unwrap();
assert_eq!(bytes, b"mock retrieved content");
let put_path = Path::new("/mem/put_back.txt");
local_fs
.write_all(put_path, b"put back content")
.await
.unwrap();
put_file(
&transfer, &*storage, &local_fs, &config, put_path, "/remote",
)
.await
.unwrap();
let send_prefix = config.send_prefix();
let (bucket, key_prefix) = split_s3_path(&send_prefix);
let send_keys: Vec<_> = storage
.test_keys()
.into_iter()
.filter(|(b, k)| b == bucket && k.starts_with(key_prefix))
.collect();
assert!(!send_keys.is_empty());
let one_key = send_keys.first().expect("at least one send key");
let stored = storage.get_object(&one_key.0, &one_key.1).await.unwrap();
assert_eq!(stored, b"put back content");
}
}