// transfer_family_cli 0.1.0
//
// TUI to browse and transfer files via an AWS Transfer Family connector.
//! File transfer (get/put) via AWS Transfer Family connector.

use crate::config::Config;
use crate::error::{Error, Result};
use crate::local_fs::LocalFs;
use crate::transfer_commands::TransferCommands;
use crate::transfer_storage::{TransferStorage, split_s3_path};
use std::path::Path;
use std::time::Duration;
use tokio::time::sleep;

// Polling budget for transfer completion: MAX_POLLS * POLL_INTERVAL = ~120 s total.
const MAX_POLLS: u32 = 120;
// Delay between successive `list_file_transfer_results` polls.
const POLL_INTERVAL: Duration = Duration::from_secs(1);

/// Starts a retrieve (get) transfer, polls until complete, then downloads from S3 to local path.
pub async fn get_file<TC, TS, LF>(
    transfer: &TC,
    storage: &TS,
    local_fs: &LF,
    config: &Config,
    remote_file_path: &str,
    local_path: &Path,
    _remote_directory_path: &str,
) -> Result<()>
where
    TC: TransferCommands,
    TS: TransferStorage,
    LF: LocalFs,
{
    let local_directory_path = config.retrieve_prefix();
    let (bucket, key_prefix) = split_s3_path(&local_directory_path);

    let transfer_id = transfer
        .start_file_transfer_retrieve(
            &config.connector_id,
            remote_file_path,
            &local_directory_path,
        )
        .await
        .map_err(|e| {
            e.with("remote_file_path", remote_file_path)
                .with("connector_id", &config.connector_id)
        })?;

    if transfer_id.is_empty() {
        return Err(
            Error::invalid_input("Missing TransferId").with("remote_file_path", remote_file_path)
        );
    }

    wait_for_transfer_complete(
        transfer,
        config,
        &transfer_id,
        Some(remote_file_path),
        None::<&Path>,
    )
    .await?;

    let base_name = Path::new(remote_file_path)
        .file_name()
        .and_then(|p| p.to_str())
        .unwrap_or("file");
    let s3_key = format!("{key_prefix}{base_name}");

    let bytes = storage.get_object(bucket, &s3_key).await.map_err(|e| {
        e.with("remote_file_path", remote_file_path)
            .with("s3_key", &s3_key)
    })?;

    local_fs
        .write_all(local_path, &bytes)
        .await
        .map_err(|e| e.with("remote_file_path", remote_file_path))?;

    Ok(())
}

/// Uploads a local file to S3 staging, then starts a send (put) transfer and waits for completion.
/// Uploads a local file to S3 staging, then starts a send (put) transfer and waits for completion.
///
/// The file is staged under `config.send_prefix()` with a time-based unique
/// prefix on its name so repeated uploads of the same file do not collide.
///
/// # Errors
/// Fails if the local file cannot be read, the S3 staging upload fails, the
/// connector returns no `TransferId`, or the transfer does not complete
/// within the polling budget.
pub async fn put_file<TC, TS, LF>(
    transfer: &TC,
    storage: &TS,
    local_fs: &LF,
    config: &Config,
    local_path: &Path,
    remote_directory_path: &str,
) -> Result<()>
where
    TC: TransferCommands,
    TS: TransferStorage,
    LF: LocalFs,
{
    let send_prefix = config.send_prefix();
    let (bucket, key_prefix) = split_s3_path(&send_prefix);

    let file_name = local_path
        .file_name()
        .and_then(|p| p.to_str())
        .unwrap_or("upload");
    // Time-based token makes staging keys unique across repeated uploads.
    let unique_key = format!("{}{}", uuid_simple(), file_name);
    // Join prefix and key without producing a double slash: `split_s3_path`
    // may yield a prefix that already ends with '/' (matching the plain
    // concatenation `get_file` uses for its staging keys).
    let s3_key = if key_prefix.is_empty() {
        unique_key
    } else if key_prefix.ends_with('/') {
        format!("{key_prefix}{unique_key}")
    } else {
        format!("{key_prefix}/{unique_key}")
    };

    let bytes = local_fs
        .read_all(local_path)
        .await
        .map_err(|e| e.with("local_path", local_path.display().to_string()))?;

    storage
        .put_object(bucket, &s3_key, bytes)
        .await
        .map_err(|e| {
            e.with("local_path", local_path.display().to_string())
                .with("s3_key", &s3_key)
        })?;

    // Connector send source is expressed as "bucket/key".
    let send_path = format!("{bucket}/{s3_key}");

    let transfer_id = transfer
        .start_file_transfer_send(&config.connector_id, &send_path, remote_directory_path)
        .await
        .map_err(|e| {
            e.with("connector_id", &config.connector_id)
                .with("send_path", &send_path)
                .with("remote_directory_path", remote_directory_path)
        })?;

    if transfer_id.is_empty() {
        return Err(Error::invalid_input("Missing TransferId")
            .with("local_path", local_path.display().to_string())
            .with("remote_directory_path", remote_directory_path));
    }

    wait_for_transfer_complete(
        transfer,
        config,
        &transfer_id,
        None::<&str>,
        Some(local_path),
    )
    .await?;

    Ok(())
}

async fn wait_for_transfer_complete<TC>(
    transfer: &TC,
    config: &Config,
    transfer_id: &str,
    remote_file_path: Option<&str>,
    local_path: Option<&Path>,
) -> Result<()>
where
    TC: TransferCommands,
{
    for _ in 0..MAX_POLLS {
        let results = transfer
            .list_file_transfer_results(&config.connector_id, transfer_id)
            .await
            .map_err(|e| e.with("transfer_id", transfer_id))?;

        if results.is_empty() {
            sleep(POLL_INTERVAL).await;
            continue;
        }
        let all_done = results.iter().all(|r| {
            let s = r.status_code.as_str();
            s == "COMPLETED" || s == "SUCCESS"
        });
        let any_failed = results
            .iter()
            .any(|r| r.status_code == "FAILED" || r.status_code == "ERROR");
        if any_failed {
            let msg = results
                .iter()
                .find(|r| r.status_code == "FAILED")
                .and_then(|r| r.failure_message.clone())
                .unwrap_or_else(|| "Transfer failed".to_string());
            let mut err = Error::api_permanent(msg).with("transfer_id", transfer_id);
            if let Some(p) = remote_file_path {
                err = err.with("remote_file_path", p);
            }
            if let Some(p) = local_path {
                err = err.with("local_path", p.display().to_string());
            }
            return Err(err);
        }
        if all_done {
            return Ok(());
        }

        sleep(POLL_INTERVAL).await;
    }

    let mut err = Error::timeout(format!(
        "Transfer {transfer_id} did not complete within timeout"
    ))
    .with("transfer_id", transfer_id);
    if let Some(p) = remote_file_path {
        err = err.with("remote_file_path", p);
    }
    if let Some(p) = local_path {
        err = err.with("local_path", p.display().to_string());
    }
    Err(err)
}

/// Returns a cheap, time-based token: the current nanoseconds since the Unix
/// epoch in lowercase hex, followed by a trailing `-` separator.
///
/// This is not a real UUID — two calls landing on the same nanosecond would
/// collide; it only needs to be unique enough for S3 staging keys.
fn uuid_simple() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before Unix epoch")
        .as_nanos();
    format!("{nanos:x}-")
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::local_fs::MemoryLocalFs;
    use crate::transfer_commands::MemoryTransferCommands;
    use crate::transfer_storage::MemoryTransferStorage;
    use std::path::Path;
    use std::sync::Arc;

    // Shared fixture: connector id "c-test", S3 staging under
    // "bucket/transfer-cli/", and an in-memory local root at "/mem".
    fn test_config() -> Config {
        Config::new(
            "c-test".to_string(),
            "bucket/transfer-cli/".to_string(),
            None,
            None,
            std::path::PathBuf::from("/mem"),
        )
    }

    // Happy path: get_file completes against the in-memory backends and the
    // retrieved bytes land at the requested local path.
    #[tokio::test]
    async fn get_file_with_memory_backend() {
        let storage = Arc::new(MemoryTransferStorage::new());
        let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
        let local_fs = MemoryLocalFs::new();

        let config = test_config();
        let remote_path = "/remote/foo.txt";
        let local_path = Path::new("/mem/foo.txt");

        let result = get_file(
            &transfer,
            &*storage,
            &local_fs,
            &config,
            remote_path,
            local_path,
            "/remote",
        )
        .await;
        assert!(result.is_ok());

        // "mock retrieved content" is what the memory backend stages for gets.
        let bytes = local_fs.read_all(local_path).await.unwrap();
        assert_eq!(bytes, b"mock retrieved content");
    }

    // put_file surfaces a NotFound error when the local source file is absent,
    // before any transfer is started.
    #[tokio::test]
    async fn put_file_when_local_file_missing_returns_error() {
        let storage = Arc::new(MemoryTransferStorage::new());
        let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
        let local_fs = MemoryLocalFs::new();

        let config = test_config();
        let local_path = Path::new("/mem/nonexistent.txt");

        let result = put_file(
            &transfer, &*storage, &local_fs, &config, local_path, "/remote",
        )
        .await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind, crate::error::ErrorKind::NotFound);
    }

    // Happy path: put_file stages exactly one object under the send prefix and
    // the staged bytes match the local file's content.
    #[tokio::test]
    async fn put_file_with_memory_backend() {
        let storage = Arc::new(MemoryTransferStorage::new());
        let transfer = MemoryTransferCommands::new(Arc::clone(&storage));
        let local_fs = MemoryLocalFs::new();

        let config = test_config();
        let local_path = Path::new("/mem/upload.txt");
        let content = b"uploaded file content";
        local_fs.write_all(local_path, content).await.unwrap();

        let result = put_file(
            &transfer, &*storage, &local_fs, &config, local_path, "/remote",
        )
        .await;
        assert!(result.is_ok());

        // Exactly one staged object is expected under the send prefix; the
        // key itself contains a time-based token, so match by prefix only.
        let send_prefix = config.send_prefix();
        let (bucket, key_prefix) = split_s3_path(&send_prefix);
        let keys: Vec<_> = storage
            .test_keys()
            .into_iter()
            .filter(|(b, k)| b == bucket && k.starts_with(key_prefix))
            .collect();
        assert_eq!(keys.len(), 1);
        let (bucket, key) = keys.first().expect("one key");
        let stored = storage.get_object(bucket, key).await.unwrap();
        assert_eq!(stored, content);
    }

    // End-to-end flow: list a directory, get a listed file, then put a new
    // file back — all against the in-memory backends.
    #[tokio::test]
    async fn full_flow_list_get_put_with_memory_backend() {
        use crate::listing::{DirectoryListing, ListedFile, list_directory};

        let storage = Arc::new(MemoryTransferStorage::new());
        // Seed the mock transfer commands with a single listed remote file.
        let default_listing = DirectoryListing {
            files: vec![ListedFile {
                file_path: "/remote/data.bin".to_string(),
                modified_timestamp: None,
                size: Some(10),
            }],
            paths: vec![],
            truncated: false,
        };
        let transfer =
            MemoryTransferCommands::new(Arc::clone(&storage)).with_default_listing(default_listing);
        let local_fs = MemoryLocalFs::new();
        let config = test_config();

        // Step 1: listing returns the seeded file.
        let listing = list_directory(&transfer, &*storage, &config, "/remote", Some(100)).await;
        assert!(listing.is_ok());
        let listing = listing.unwrap();
        assert_eq!(listing.files.len(), 1);
        assert_eq!(
            listing.files.first().map(|f| f.file_path.as_str()),
            Some("/remote/data.bin")
        );

        // Step 2: get the listed file down to the in-memory local fs.
        let remote_path = "/remote/data.bin";
        let local_path = Path::new("/mem/data.bin");
        get_file(
            &transfer,
            &*storage,
            &local_fs,
            &config,
            remote_path,
            local_path,
            "/remote",
        )
        .await
        .unwrap();
        let bytes = local_fs.read_all(local_path).await.unwrap();
        assert_eq!(bytes, b"mock retrieved content");

        // Step 3: put a new local file back to the remote directory.
        let put_path = Path::new("/mem/put_back.txt");
        local_fs
            .write_all(put_path, b"put back content")
            .await
            .unwrap();
        put_file(
            &transfer, &*storage, &local_fs, &config, put_path, "/remote",
        )
        .await
        .unwrap();

        // The put must have staged at least one object with the exact content.
        let send_prefix = config.send_prefix();
        let (bucket, key_prefix) = split_s3_path(&send_prefix);
        let send_keys: Vec<_> = storage
            .test_keys()
            .into_iter()
            .filter(|(b, k)| b == bucket && k.starts_with(key_prefix))
            .collect();
        assert!(!send_keys.is_empty());
        let one_key = send_keys.first().expect("at least one send key");
        let stored = storage.get_object(&one_key.0, &one_key.1).await.unwrap();
        assert_eq!(stored, b"put back content");
    }
}
}