use crate::error::{Error, Result};
use crate::listing::DirectoryListing;
use crate::retry;
use crate::transfer_storage::{TransferStorage, build_s3_key};
use crate::types::{ConnectorId, ListingId, OutputFileName, RemotePath, TransferId};
use std::future::Future;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
/// Outcome of one file within a transfer, as reported by the
/// `ListFileTransferResults` call.
#[non_exhaustive]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TransferResultStatus {
// Status code string as returned by the service (e.g. "COMPLETED").
pub status_code: String,
// Human-readable failure detail; populated only for failed transfers.
pub failure_message: Option<String>,
}
/// Identifiers returned when a directory listing is started: the listing id
/// and the name of the output file the results will be written to.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct DirectoryListingStarted {
// Id assigned to the in-progress listing.
pub listing_id: ListingId,
// File name (within the requested output directory) that will hold the listing.
pub output_file_name: OutputFileName,
}
/// Async commands against a file-transfer connector. Implemented by the
/// AWS-backed [`AwsTransferCommands`] and the in-memory
/// [`MemoryTransferCommands`] used in tests.
#[allow(clippy::module_name_repetitions)]
pub trait TransferCommands: Send + Sync {
/// Start retrieving `remote_file_path` into `local_directory_path`;
/// returns the id of the started transfer.
#[allow(clippy::manual_async_fn)]
fn start_file_transfer_retrieve(
&self,
connector_id: &ConnectorId,
remote_file_path: &RemotePath,
local_directory_path: &str,
) -> impl Future<Output = Result<TransferId>> + Send;
/// Start sending the file at `send_path` into `remote_directory_path`;
/// returns the id of the started transfer.
#[allow(clippy::manual_async_fn)]
fn start_file_transfer_send(
&self,
connector_id: &ConnectorId,
send_path: &str,
remote_directory_path: &RemotePath,
) -> impl Future<Output = Result<TransferId>> + Send;
/// Fetch per-file status results for a previously started transfer.
#[allow(clippy::manual_async_fn)]
fn list_file_transfer_results(
&self,
connector_id: &ConnectorId,
transfer_id: &TransferId,
) -> impl Future<Output = Result<Vec<TransferResultStatus>>> + Send;
/// Start listing `remote_directory_path`; the service writes the results
/// into `output_directory_path`. `max_items`, when given, is forwarded to
/// the service as the listing limit.
#[allow(clippy::manual_async_fn)]
fn start_directory_listing(
&self,
connector_id: &ConnectorId,
remote_directory_path: &RemotePath,
output_directory_path: &str,
max_items: Option<i32>,
) -> impl Future<Output = Result<DirectoryListingStarted>> + Send;
/// Start deleting `delete_path` on the remote side.
#[allow(clippy::manual_async_fn)]
fn start_remote_delete(
&self,
connector_id: &ConnectorId,
delete_path: &str,
) -> impl Future<Output = Result<()>> + Send;
/// Start moving/renaming `source_path` to `target_path` on the remote side.
#[allow(clippy::manual_async_fn)]
fn start_remote_move(
&self,
connector_id: &ConnectorId,
source_path: &str,
target_path: &str,
) -> impl Future<Output = Result<()>> + Send;
}
/// [`TransferCommands`] implementation backed by the AWS Transfer Family SDK.
#[derive(Clone)]
pub struct AwsTransferCommands {
// Pre-configured SDK client; cloning it is how per-retry futures own it.
client: aws_sdk_transfer::Client,
}
impl AwsTransferCommands {
/// Wrap an already-configured AWS Transfer Family client.
#[must_use]
pub const fn new(client: aws_sdk_transfer::Client) -> Self {
Self { client }
}
}
// AWS-backed implementation. Every operation runs inside
// `retry::with_retry_and_timeout`; the retry closure clones the client and
// takes owned copies of the string arguments so each attempt builds a fresh
// future that does not borrow from `self` or the parameters.
impl TransferCommands for AwsTransferCommands {
// Start a retrieve (remote -> local directory) transfer.
async fn start_file_transfer_retrieve(
&self,
connector_id: &ConnectorId,
remote_file_path: &RemotePath,
local_directory_path: &str,
) -> Result<TransferId> {
retry::with_retry_and_timeout(|| {
// Owned copies for the `async move` block below.
let client = self.client.clone();
let connector_id = connector_id.as_str().to_string();
let remote_file_path = remote_file_path.as_str().to_string();
let local_directory_path = local_directory_path.to_string();
async move {
let output = client
.start_file_transfer()
.connector_id(&connector_id)
.retrieve_file_paths(&remote_file_path)
.local_directory_path(&local_directory_path)
.send()
.await
.map_err(|e| {
// Preserve the raw HTTP status (when present) so the error
// can be classified from it; attach request context.
let status = e.raw_response().map(|r| r.status().as_u16());
let msg = e.to_string();
Error::from_transfer_sdk_status(msg, status)
.with("remote_file_path", &remote_file_path)
.with("connector_id", &connector_id)
.with_source(e)
})?;
let transfer_id = output.transfer_id().to_string();
// Defensive: a blank id would be unusable downstream, so treat it
// as an invalid response.
if transfer_id.is_empty() {
return Err(Error::invalid_input("Missing TransferId")
.with("remote_file_path", &remote_file_path));
}
Ok(TransferId::from(transfer_id))
}
})
.await
}
// Start a send (local path -> remote directory) transfer.
async fn start_file_transfer_send(
&self,
connector_id: &ConnectorId,
send_path: &str,
remote_directory_path: &RemotePath,
) -> Result<TransferId> {
retry::with_retry_and_timeout(|| {
let client = self.client.clone();
let connector_id = connector_id.as_str().to_string();
let send_path = send_path.to_string();
let remote_directory_path = remote_directory_path.as_str().to_string();
async move {
let output = client
.start_file_transfer()
.connector_id(&connector_id)
.send_file_paths(&send_path)
.remote_directory_path(&remote_directory_path)
.send()
.await
.map_err(|e| {
let status = e.raw_response().map(|r| r.status().as_u16());
let msg = e.to_string();
Error::from_transfer_sdk_status(msg, status)
.with("connector_id", &connector_id)
.with("send_path", &send_path)
.with_source(e)
})?;
let transfer_id = output.transfer_id().to_string();
// Defensive empty-id check, as in the retrieve path.
if transfer_id.is_empty() {
return Err(Error::invalid_input("Missing TransferId")
.with("remote_directory_path", &remote_directory_path));
}
Ok(TransferId::from(transfer_id))
}
})
.await
}
// Fetch per-file results for a started transfer and map them into the
// crate's `TransferResultStatus` type.
async fn list_file_transfer_results(
&self,
connector_id: &ConnectorId,
transfer_id: &TransferId,
) -> Result<Vec<TransferResultStatus>> {
retry::with_retry_and_timeout(|| {
let client = self.client.clone();
let connector_id = connector_id.as_str().to_string();
let transfer_id = transfer_id.as_str().to_string();
async move {
let resp = client
.list_file_transfer_results()
.connector_id(&connector_id)
.transfer_id(&transfer_id)
.send()
.await
.map_err(|e| {
let status = e.raw_response().map(|r| r.status().as_u16());
let msg = e.to_string();
Error::from_transfer_sdk_status(msg, status)
.with("transfer_id", &transfer_id)
.with_source(e)
})?;
let results = resp
.file_transfer_results()
.iter()
.map(|r| TransferResultStatus {
status_code: r.status_code().as_str().to_string(),
failure_message: r.failure_message().map(String::from),
})
.collect();
Ok(results)
}
})
.await
}
// Start a directory listing; the service writes results to
// `output_directory_path` and we return the listing id and file name.
async fn start_directory_listing(
&self,
connector_id: &ConnectorId,
remote_directory_path: &RemotePath,
output_directory_path: &str,
max_items: Option<i32>,
) -> Result<DirectoryListingStarted> {
retry::with_retry_and_timeout(|| {
let client = self.client.clone();
let connector_id = connector_id.as_str().to_string();
let remote_directory_path = remote_directory_path.as_str().to_string();
let output_directory_path = output_directory_path.to_string();
async move {
let output = client
.start_directory_listing()
.connector_id(&connector_id)
.remote_directory_path(&remote_directory_path)
.output_directory_path(&output_directory_path)
// `set_max_items` accepts the Option directly: None leaves the
// service default in place.
.set_max_items(max_items)
.send()
.await
.map_err(|e| {
let status = e.raw_response().map(|r| r.status().as_u16());
let msg = e.to_string();
Error::from_transfer_sdk_status(msg, status)
.with("remote_directory_path", &remote_directory_path)
.with("connector_id", &connector_id)
.with_source(e)
})?;
let listing_id = output.listing_id().to_string();
let output_file_name = output.output_file_name().to_string();
// Defensive: both identifiers are required to poll/fetch the
// listing later, so reject blank values.
if listing_id.is_empty() {
return Err(Error::invalid_input(
"Missing ListingId in StartDirectoryListing response",
)
.with("remote_directory_path", &remote_directory_path));
}
if output_file_name.is_empty() {
return Err(Error::invalid_input(
"Missing OutputFileName in StartDirectoryListing response",
)
.with("remote_directory_path", &remote_directory_path));
}
Ok(DirectoryListingStarted {
listing_id: ListingId::from(listing_id),
output_file_name: OutputFileName::from(output_file_name),
})
}
})
.await
}
// Start a remote delete; success means the operation was accepted.
async fn start_remote_delete(
&self,
connector_id: &ConnectorId,
delete_path: &str,
) -> Result<()> {
retry::with_retry_and_timeout(|| {
let client = self.client.clone();
let connector_id = connector_id.as_str().to_string();
let delete_path = delete_path.to_string();
async move {
client
.start_remote_delete()
.connector_id(&connector_id)
.delete_path(&delete_path)
.send()
.await
.map_err(|e| {
let status = e.raw_response().map(|r| r.status().as_u16());
let msg = e.to_string();
Error::from_transfer_sdk_status(msg, status)
.with("delete_path", &delete_path)
.with("connector_id", &connector_id)
.with_source(e)
})?;
Ok(())
}
})
.await
}
// Start a remote move/rename; success means the operation was accepted.
async fn start_remote_move(
&self,
connector_id: &ConnectorId,
source_path: &str,
target_path: &str,
) -> Result<()> {
retry::with_retry_and_timeout(|| {
let client = self.client.clone();
let connector_id = connector_id.as_str().to_string();
let source_path = source_path.to_string();
let target_path = target_path.to_string();
async move {
client
.start_remote_move()
.connector_id(&connector_id)
.source_path(&source_path)
.target_path(&target_path)
.send()
.await
.map_err(|e| {
let status = e.raw_response().map(|r| r.status().as_u16());
let msg = e.to_string();
Error::from_transfer_sdk_status(msg, status)
.with("source_path", &source_path)
.with("target_path", &target_path)
.with("connector_id", &connector_id)
.with_source(e)
})?;
Ok(())
}
})
.await
}
}
/// In-memory mock of [`TransferCommands`] for tests: instead of calling AWS,
/// it writes canned payloads into a shared `MemoryTransferStorage`.
pub struct MemoryTransferCommands {
// Shared fake object store that "retrieved" files and listings land in.
storage: std::sync::Arc<crate::transfer_storage::MemoryTransferStorage>,
// Monotonic counters used to mint unique transfer / listing ids.
transfer_counter: AtomicU64,
listing_counter: AtomicU64,
// Listing serialized by `start_directory_listing` unless overridden below.
default_listing: DirectoryListing,
#[allow(dead_code)]
// Test hook: raw bytes stored verbatim instead of serializing `default_listing`.
listing_body_override: Option<Vec<u8>>,
#[allow(dead_code)]
// Test hook: when true, `start_directory_listing` returns blank identifiers.
empty_listing_response: bool,
}
impl MemoryTransferCommands {
    /// Build a mock that records objects into `storage`. The default
    /// directory listing is empty and no test overrides are active.
    #[allow(clippy::missing_const_for_fn)]
    #[must_use]
    pub fn new(storage: std::sync::Arc<crate::transfer_storage::MemoryTransferStorage>) -> Self {
        let empty_listing = DirectoryListing {
            files: vec![],
            paths: vec![],
            truncated: false,
        };
        Self {
            storage,
            transfer_counter: AtomicU64::new(0),
            listing_counter: AtomicU64::new(0),
            default_listing: empty_listing,
            listing_body_override: None,
            empty_listing_response: false,
        }
    }
    /// Replace the listing served by `start_directory_listing`.
    #[must_use]
    pub fn with_default_listing(mut self, listing: DirectoryListing) -> Self {
        self.default_listing = listing;
        self
    }
    /// Test-only: store `body` verbatim instead of serializing the default listing.
    #[cfg(test)]
    #[must_use]
    pub fn with_listing_body(mut self, body: Vec<u8>) -> Self {
        self.listing_body_override = Some(body);
        self
    }
    /// Test-only: when `empty` is true, listings return blank identifiers.
    #[cfg(test)]
    #[must_use]
    pub const fn with_empty_listing_response(mut self, empty: bool) -> Self {
        self.empty_listing_response = empty;
        self
    }
}
// Hand-written because `AtomicU64` is not `Clone`: the clone snapshots the
// current counter values into fresh atomics.
impl Clone for MemoryTransferCommands {
    fn clone(&self) -> Self {
        let transfers_snapshot = self.transfer_counter.load(Ordering::SeqCst);
        let listings_snapshot = self.listing_counter.load(Ordering::SeqCst);
        Self {
            storage: std::sync::Arc::clone(&self.storage),
            transfer_counter: AtomicU64::new(transfers_snapshot),
            listing_counter: AtomicU64::new(listings_snapshot),
            default_listing: self.default_listing.clone(),
            listing_body_override: self.listing_body_override.clone(),
            empty_listing_response: self.empty_listing_response,
        }
    }
}
impl TransferCommands for MemoryTransferCommands {
async fn start_file_transfer_retrieve(
&self,
_connector_id: &ConnectorId,
remote_file_path: &RemotePath,
local_directory_path: &str,
) -> Result<TransferId> {
let storage = std::sync::Arc::clone(&self.storage);
let transfer_id = format!(
"mem-{}",
self.transfer_counter.fetch_add(1, Ordering::SeqCst)
);
let (bucket, key_prefix) = crate::transfer_storage::split_s3_path(local_directory_path);
let base_name = Path::new(remote_file_path.as_str())
.file_name()
.and_then(|p| p.to_str())
.unwrap_or("file");
let s3_key = build_s3_key(key_prefix.as_str(), base_name);
storage
.put_object(bucket.as_str(), &s3_key, b"mock retrieved content".to_vec())
.await?;
Ok(TransferId::from(transfer_id))
}
async fn start_file_transfer_send(
&self,
_connector_id: &ConnectorId,
_send_path: &str,
_remote_directory_path: &RemotePath,
) -> Result<TransferId> {
let transfer_id = format!(
"mem-send-{}",
self.transfer_counter.fetch_add(1, Ordering::SeqCst)
);
Ok(TransferId::from(transfer_id))
}
async fn list_file_transfer_results(
&self,
_connector_id: &ConnectorId,
_transfer_id: &TransferId,
) -> Result<Vec<TransferResultStatus>> {
Ok(vec![TransferResultStatus {
status_code: "COMPLETED".to_string(),
failure_message: None,
}])
}
async fn start_directory_listing(
&self,
_connector_id: &ConnectorId,
_remote_directory_path: &RemotePath,
output_directory_path: &str,
_max_items: Option<i32>,
) -> Result<DirectoryListingStarted> {
let storage = std::sync::Arc::clone(&self.storage);
let listing_id = format!(
"mem-listing-{}",
self.listing_counter.fetch_add(1, Ordering::SeqCst)
);
let output_file_name = format!("listing-{}.json", listing_id);
let (bucket, key_prefix) = crate::transfer_storage::split_s3_path(output_directory_path);
let key = build_s3_key(key_prefix.as_str(), &output_file_name);
if self.empty_listing_response {
return Ok(DirectoryListingStarted {
listing_id: ListingId::from(String::new()),
output_file_name: OutputFileName::from(String::new()),
});
}
let json_bytes = match self.listing_body_override.clone() {
Some(bytes) => bytes,
None => serde_json::to_vec(&self.default_listing)
.map_err(|e| Error::parse("listing serialize", e))?,
};
storage
.put_object(bucket.as_str(), &key, json_bytes)
.await?;
Ok(DirectoryListingStarted {
listing_id: ListingId::from(listing_id),
output_file_name: OutputFileName::from(output_file_name),
})
}
async fn start_remote_delete(
&self,
_connector_id: &ConnectorId,
_delete_path: &str,
) -> Result<()> {
Ok(())
}
async fn start_remote_move(
&self,
_connector_id: &ConnectorId,
_source_path: &str,
_target_path: &str,
) -> Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::listing::{DirectoryListing, ListedFile, ListedPath};
use crate::transfer_storage::MemoryTransferStorage;
use std::sync::Arc;
// Shared fixture config (connector id, storage prefixes) for all tests.
fn test_config() -> crate::config::Config {
crate::config::test_config()
}
// A started listing reports non-empty ids, and the default (empty) listing
// JSON is written under the expected bucket/key.
#[async_test_macros::async_test]
async fn start_directory_listing_writes_to_storage() {
let storage = Arc::new(MemoryTransferStorage::new());
let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
let config = test_config();
let output_path = config.listings_prefix();
let remote = RemotePath::from("/remote");
let started = transfer
.start_directory_listing(&config.connector_id, &remote, &output_path, Some(100))
.await
.unwrap();
assert!(!started.listing_id.as_str().is_empty());
assert!(!started.output_file_name.as_str().is_empty());
assert!(started.output_file_name.as_str().ends_with(".json"));
// Recompute the key the mock should have used and read the object back.
let (bucket, key_prefix) = crate::transfer_storage::split_s3_path(&output_path);
let key = build_s3_key(key_prefix.as_str(), started.output_file_name.as_str());
let bytes = storage.get_object(bucket.as_str(), &key).await.unwrap();
let listing: DirectoryListing = serde_json::from_slice(&bytes).unwrap();
assert_eq!(listing.files.len(), 0);
assert_eq!(listing.paths.len(), 0);
assert!(!listing.truncated);
}
// `with_default_listing` replaces the serialized listing body.
#[async_test_macros::async_test]
async fn with_default_listing_stores_custom_listing() {
let storage = Arc::new(MemoryTransferStorage::new());
let custom_listing = DirectoryListing {
files: vec![ListedFile {
file_path: "/remote/custom.txt".to_string(),
modified_timestamp: None,
size: Some(99),
}],
paths: vec![ListedPath {
path: "/remote/sub".to_string(),
}],
truncated: true,
};
let transfer = MemoryTransferCommands::new(Arc::clone(&storage))
.with_default_listing(custom_listing.clone());
let config = test_config();
let output_path = config.listings_prefix();
let remote = RemotePath::from("/remote");
let started = transfer
.start_directory_listing(&config.connector_id, &remote, &output_path, Some(50))
.await
.unwrap();
let (bucket, key_prefix) = crate::transfer_storage::split_s3_path(&output_path);
let key = build_s3_key(key_prefix.as_str(), started.output_file_name.as_str());
let bytes = storage.get_object(bucket.as_str(), &key).await.unwrap();
// Round-trip: the stored JSON must deserialize to the custom listing.
let listing: DirectoryListing = serde_json::from_slice(&bytes).unwrap();
assert_eq!(listing.files.len(), 1);
assert_eq!(
listing.files.first().unwrap().file_path,
"/remote/custom.txt"
);
assert_eq!(listing.files.first().unwrap().size, Some(99));
assert_eq!(listing.paths.len(), 1);
assert_eq!(listing.paths.first().unwrap().path, "/remote/sub");
assert!(listing.truncated);
}
// Retrieve mints a `mem-` id and writes the canned payload under the
// remote file's base name.
#[async_test_macros::async_test]
async fn start_file_transfer_retrieve_returns_id_and_writes_storage() {
let storage = Arc::new(MemoryTransferStorage::new());
let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
let config = test_config();
let local_directory_path = config.retrieve_prefix();
let remote = RemotePath::from("/remote/data.bin");
let transfer_id = transfer
.start_file_transfer_retrieve(&config.connector_id, &remote, &local_directory_path)
.await
.unwrap();
assert!(!transfer_id.as_str().is_empty());
assert!(transfer_id.as_str().starts_with("mem-"));
let (bucket, key_prefix) = crate::transfer_storage::split_s3_path(&local_directory_path);
let key = build_s3_key(key_prefix.as_str(), "data.bin");
let bytes = storage.get_object(bucket.as_str(), &key).await.unwrap();
assert_eq!(bytes, b"mock retrieved content");
}
// Send mints a distinct `mem-send-` id (no storage writes expected).
#[async_test_macros::async_test]
async fn start_file_transfer_send_returns_non_empty_id() {
let storage = Arc::new(MemoryTransferStorage::new());
let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
let config = test_config();
let remote = RemotePath::from("/remote");
let transfer_id = transfer
.start_file_transfer_send(&config.connector_id, "bucket/send/xyz", &remote)
.await
.unwrap();
assert!(!transfer_id.as_str().is_empty());
assert!(transfer_id.as_str().starts_with("mem-send-"));
}
// The mock always reports exactly one COMPLETED result.
#[async_test_macros::async_test]
async fn list_file_transfer_results_returns_completed() {
let storage = Arc::new(MemoryTransferStorage::new());
let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
let connector_id = ConnectorId::from("conn".to_string());
let transfer_id = TransferId::from("mem-0".to_string());
let results = transfer
.list_file_transfer_results(&connector_id, &transfer_id)
.await
.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results.first().unwrap().status_code, "COMPLETED");
assert!(results.first().unwrap().failure_message.is_none());
}
// Successive listings get distinct ids, file names, and storage keys.
#[async_test_macros::async_test]
async fn multiple_listing_calls_different_ids_and_keys() {
let storage = Arc::new(MemoryTransferStorage::new());
let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
let config = test_config();
let output_path = config.listings_prefix();
let remote = RemotePath::from("/remote");
let first = transfer
.start_directory_listing(&config.connector_id, &remote, &output_path, None)
.await
.unwrap();
let second = transfer
.start_directory_listing(&config.connector_id, &remote, &output_path, None)
.await
.unwrap();
assert_ne!(first.listing_id.as_str(), second.listing_id.as_str());
assert_ne!(
first.output_file_name.as_str(),
second.output_file_name.as_str()
);
let keys = storage.test_keys();
assert_eq!(keys.len(), 2);
assert_ne!(keys.first().unwrap(), keys.get(1).unwrap());
}
}