use crate::config::Config;
use crate::error::{Error, Result};
use crate::local_fs::LocalFs;
use crate::transfer_commands::TransferCommands;
use crate::transfer_storage::{TransferStorage, build_s3_key, split_s3_path};
use crate::types::{RemotePath, TransferId};
use std::path::Path;
use std::time::Duration;
use tokio::time::sleep;
// Upper bound on status polls before a transfer is declared timed out.
// Combined with POLL_INTERVAL this gives roughly two minutes of waiting.
const MAX_POLLS: u32 = 120;
// Delay between successive transfer-status polls.
const POLL_INTERVAL: Duration = Duration::from_secs(1);
pub async fn get_file<TC, TS, LF>(
transfer: &TC,
storage: &TS,
local_fs: &LF,
config: &Config,
remote_file_path: &RemotePath,
local_path: &Path,
_remote_directory_path: &str,
) -> Result<()>
where
TC: TransferCommands,
TS: TransferStorage,
LF: LocalFs,
{
let local_directory_path = config.retrieve_prefix();
let (bucket, key_prefix) = split_s3_path(&local_directory_path);
let transfer_id: crate::types::TransferId = transfer
.start_file_transfer_retrieve(
&config.connector_id,
remote_file_path,
&local_directory_path,
)
.await
.map_err(|e: Error| {
e.with("remote_file_path", remote_file_path.as_str())
.with("connector_id", config.connector_id.as_str())
})?;
wait_for_transfer_complete(
transfer,
config,
&transfer_id,
Some(remote_file_path.as_str()),
None::<&Path>,
)
.await?;
let base_name = Path::new(remote_file_path.as_str())
.file_name()
.and_then(|p| p.to_str())
.unwrap_or("file");
let s3_key = build_s3_key(key_prefix.as_str(), base_name);
let bytes = storage
.get_object(bucket.as_str(), &s3_key)
.await
.map_err(|e| {
e.with("remote_file_path", remote_file_path.as_str())
.with("s3_key", &s3_key)
})?;
local_fs
.write_all(local_path, &bytes)
.await
.map_err(|e| e.with("remote_file_path", remote_file_path.as_str()))?;
Ok(())
}
pub async fn put_file<TC, TS, LF>(
transfer: &TC,
storage: &TS,
local_fs: &LF,
config: &Config,
local_path: &Path,
remote_directory_path: &str,
) -> Result<()>
where
TC: TransferCommands,
TS: TransferStorage,
LF: LocalFs,
{
let send_prefix = config.send_prefix();
let (bucket, key_prefix) = split_s3_path(&send_prefix);
let file_name = local_path
.file_name()
.and_then(|p| p.to_str())
.unwrap_or("upload");
let unique_key = format!("{}{}", uuid_simple(), file_name);
let s3_key = build_s3_key(key_prefix.as_str(), &unique_key);
let bytes = local_fs
.read_all(local_path)
.await
.map_err(|e| e.with("local_path", local_path.display().to_string()))?;
storage
.put_object(bucket.as_str(), &s3_key, bytes)
.await
.map_err(|e| {
e.with("local_path", local_path.display().to_string())
.with("s3_key", &s3_key)
})?;
let send_path = format!("{bucket}/{s3_key}");
let remote_dir = RemotePath::from(remote_directory_path.to_string());
let transfer_id: crate::types::TransferId = transfer
.start_file_transfer_send(&config.connector_id, &send_path, &remote_dir)
.await
.map_err(|e: Error| {
e.with("connector_id", config.connector_id.as_str())
.with("send_path", &send_path)
.with("remote_directory_path", remote_directory_path)
})?;
wait_for_transfer_complete(
transfer,
config,
&transfer_id,
None::<&str>,
Some(local_path),
)
.await?;
Ok(())
}
pub async fn delete_file<TC>(transfer: &TC, config: &Config, remote_path: &str) -> Result<()>
where
TC: crate::transfer_commands::TransferCommands,
{
transfer
.start_remote_delete(&config.connector_id, remote_path)
.await
.map_err(|e| e.with("remote_path", remote_path))
}
pub async fn move_file<TC>(
transfer: &TC,
config: &Config,
remote_src: &str,
remote_dest: &str,
) -> Result<()>
where
TC: crate::transfer_commands::TransferCommands,
{
transfer
.start_remote_move(&config.connector_id, remote_src, remote_dest)
.await
.map_err(|e| {
e.with("remote_src", remote_src)
.with("remote_dest", remote_dest)
})
}
/// Polls the connector until the transfer completes, fails, or times out.
///
/// Polls up to `MAX_POLLS` times with `POLL_INTERVAL` between attempts.
/// An empty result list means the transfer has not been picked up yet, so we
/// keep waiting. A result with status `FAILED` or `ERROR` fails the wait
/// immediately; the wait succeeds only when every result reports
/// `COMPLETED` or `SUCCESS`.
///
/// `remote_file_path` / `local_path` are optional extra context attached to
/// any error produced here.
///
/// # Errors
/// - An `api_permanent` error when any result reports failure, carrying that
///   result's `failure_message` when present.
/// - A `timeout` error when the transfer does not finish within
///   `MAX_POLLS * POLL_INTERVAL`.
async fn wait_for_transfer_complete<TC>(
    transfer: &TC,
    config: &Config,
    transfer_id: &TransferId,
    remote_file_path: Option<&str>,
    local_path: Option<&Path>,
) -> Result<()>
where
    TC: TransferCommands,
{
    // Treat both connector failure statuses identically.
    fn is_failure(status: &str) -> bool {
        status == "FAILED" || status == "ERROR"
    }

    for _ in 0..MAX_POLLS {
        let results = transfer
            .list_file_transfer_results(&config.connector_id, transfer_id)
            .await
            .map_err(|e| e.with("transfer_id", transfer_id.as_str()))?;

        // No results yet: the connector hasn't reported anything; keep polling.
        if results.is_empty() {
            sleep(POLL_INTERVAL).await;
            continue;
        }

        // Fail fast on the first failed result. BUG FIX: the message lookup
        // previously matched only "FAILED", so an "ERROR" result's
        // failure_message was dropped in favor of the generic fallback.
        if let Some(failed) = results.iter().find(|r| is_failure(r.status_code.as_str())) {
            let msg = failed
                .failure_message
                .clone()
                .unwrap_or_else(|| "Transfer failed".to_string());
            let err = add_transfer_context(
                Error::api_permanent(msg).with("transfer_id", transfer_id.as_str()),
                remote_file_path,
                local_path,
            );
            return Err(err);
        }

        let all_done = results.iter().all(|r| {
            let s = r.status_code.as_str();
            s == "COMPLETED" || s == "SUCCESS"
        });
        if all_done {
            return Ok(());
        }

        sleep(POLL_INTERVAL).await;
    }

    let err = add_transfer_context(
        Error::timeout(format!(
            "Transfer {} did not complete within timeout",
            transfer_id.as_str()
        ))
        .with("transfer_id", transfer_id.as_str()),
        remote_file_path,
        local_path,
    );
    Err(err)
}
fn add_transfer_context(
mut err: Error,
remote_file_path: Option<&str>,
local_path: Option<&Path>,
) -> Error {
if let Some(p) = remote_file_path {
err = err.with("remote_file_path", p);
}
if let Some(p) = local_path {
err = err.with("local_path", p.display().to_string());
}
err
}
/// Returns a freshly generated v4 UUID followed by a trailing hyphen, used
/// as a collision-avoiding prefix for staged S3 object keys.
///
/// NOTE(review): despite the name, this emits the standard hyphenated UUID
/// form, not the uuid crate's hyphen-less `simple` form — confirm whether
/// any consumer depends on the key format before renaming or changing it.
fn uuid_simple() -> String {
    let id = uuid::Uuid::new_v4();
    format!("{id}-")
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::local_fs::MemoryLocalFs;
    use crate::transfer_commands::MemoryTransferCommands;
    use crate::transfer_storage::MemoryTransferStorage;
    use std::path::Path;
    use std::sync::Arc;

    // Shared fixture config; values come from the crate-level test helper.
    fn test_config() -> Config {
        crate::config::test_config()
    }

    #[tokio::test]
    async fn get_file_with_memory_backend() {
        // The in-memory storage is shared (via Arc) between the fake transfer
        // commands and the direct assertions below.
        let storage = Arc::new(MemoryTransferStorage::new());
        let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
        let local_fs = MemoryLocalFs::new();
        let config = test_config();
        let remote_path = "/remote/foo.txt";
        let local_path = Path::new("/mem/foo.txt");
        let remote_path_rp = RemotePath::from(remote_path.to_string());
        let result = get_file(
            &transfer,
            &*storage,
            &local_fs,
            &config,
            &remote_path_rp,
            local_path,
            "/remote",
        )
        .await;
        assert!(result.is_ok());
        // The memory transfer backend stages fixed mock bytes on retrieve;
        // they should land at the requested local path.
        let bytes = local_fs.read_all(local_path).await.unwrap();
        assert_eq!(bytes, b"mock retrieved content");
    }

    #[tokio::test]
    async fn put_file_when_local_file_missing_returns_error() {
        let storage = Arc::new(MemoryTransferStorage::new());
        let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
        let local_fs = MemoryLocalFs::new();
        let config = test_config();
        // Never written to local_fs, so the read inside put_file must fail.
        let local_path = Path::new("/mem/nonexistent.txt");
        let result = put_file(
            &transfer, &*storage, &local_fs, &config, local_path, "/remote",
        )
        .await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind, crate::error::ErrorKind::NotFound);
    }

    #[tokio::test]
    async fn put_file_with_memory_backend() {
        let storage = Arc::new(MemoryTransferStorage::new());
        let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
        let local_fs = MemoryLocalFs::new();
        let config = test_config();
        let local_path = Path::new("/mem/upload.txt");
        let content = b"uploaded file content";
        local_fs.write_all(local_path, content).await.unwrap();
        let result = put_file(
            &transfer, &*storage, &local_fs, &config, local_path, "/remote",
        )
        .await;
        assert!(result.is_ok());
        // Exactly one object should have been staged under the send prefix
        // (its key carries a generated UUID, so we filter rather than build
        // the exact key), and it must hold the uploaded bytes.
        let send_prefix = config.send_prefix();
        let (bucket, key_prefix) = split_s3_path(&send_prefix);
        let keys: Vec<_> = storage
            .test_keys()
            .into_iter()
            .filter(|(b, k)| b == bucket.as_str() && k.starts_with(key_prefix.as_str()))
            .collect();
        assert_eq!(keys.len(), 1);
        let (bucket_s, key) = keys.first().expect("one key");
        let stored = storage.get_object(bucket_s, key).await.unwrap();
        assert_eq!(stored, content);
    }

    #[tokio::test]
    async fn full_flow_list_get_put_with_memory_backend() {
        use crate::listing::{DirectoryListing, ListedFile, list_directory};
        let storage = Arc::new(MemoryTransferStorage::new());
        // Seed the fake commands with a canned listing so list_directory has
        // something to return.
        let default_listing = DirectoryListing {
            files: vec![ListedFile {
                file_path: "/remote/data.bin".to_string(),
                modified_timestamp: None,
                size: Some(10),
            }],
            paths: vec![],
            truncated: false,
        };
        let transfer =
            MemoryTransferCommands::new(Arc::clone(&storage)).with_default_listing(default_listing);
        let local_fs = MemoryLocalFs::new();
        let config = test_config();
        // Step 1: listing returns the seeded file.
        let listing = list_directory(
            &transfer,
            &*storage,
            &config,
            &RemotePath::from("/remote"),
            Some(100),
        )
        .await;
        assert!(listing.is_ok());
        let listing = listing.unwrap();
        assert_eq!(listing.files.len(), 1);
        assert_eq!(
            listing.files.first().map(|f| f.file_path.as_str()),
            Some("/remote/data.bin")
        );
        // Step 2: retrieve the listed file into the memory filesystem.
        let remote_path = "/remote/data.bin";
        let local_path = Path::new("/mem/data.bin");
        get_file(
            &transfer,
            &*storage,
            &local_fs,
            &config,
            &RemotePath::from(remote_path),
            local_path,
            "/remote",
        )
        .await
        .unwrap();
        let bytes = local_fs.read_all(local_path).await.unwrap();
        assert_eq!(bytes, b"mock retrieved content");
        // Step 3: upload a new file and verify it was staged under the send
        // prefix with the expected contents.
        let put_path = Path::new("/mem/put_back.txt");
        local_fs
            .write_all(put_path, b"put back content")
            .await
            .unwrap();
        put_file(
            &transfer, &*storage, &local_fs, &config, put_path, "/remote",
        )
        .await
        .unwrap();
        let send_prefix = config.send_prefix();
        let (bucket, key_prefix) = split_s3_path(&send_prefix);
        let send_keys: Vec<_> = storage
            .test_keys()
            .into_iter()
            .filter(|(b, k)| b == bucket.as_str() && k.starts_with(key_prefix.as_str()))
            .collect();
        assert!(!send_keys.is_empty());
        let one_key = send_keys.first().expect("at least one send key");
        let stored = storage.get_object(&one_key.0, &one_key.1).await.unwrap();
        assert_eq!(stored, b"put back content");
    }
}