transfer_family_cli 0.4.0

TUI to browse and transfer files via AWS Transfer Family connector
Documentation
//! Directory listing via AWS Transfer Family connector (`StartDirectoryListing` + S3).

use crate::config::Config;
use crate::error::{Error, Result};
use crate::transfer_commands::TransferCommands;
use crate::transfer_storage::{TransferStorage, build_s3_key, split_s3_path};
use crate::types::RemotePath;
use serde::Deserialize;
use std::time::Duration;
use tokio::time::sleep;

/// One file entry in a directory listing result.
#[non_exhaustive]
#[derive(Clone, Debug, Deserialize, serde::Serialize)]
pub struct ListedFile {
    /// Full path of the file on the remote server.
    #[serde(rename = "filePath")]
    pub file_path: String,
    /// Last modified time (UTC), if available.
    #[serde(rename = "modifiedTimestamp")]
    pub modified_timestamp: Option<String>,
    /// Size in bytes, if available.
    pub size: Option<i64>,
}

/// One directory entry in a directory listing result.
#[non_exhaustive]
#[derive(Clone, Debug, Deserialize, serde::Serialize)]
pub struct ListedPath {
    /// Full path of the directory on the remote server.
    pub path: String,
}

/// Parsed directory listing from S3 JSON (connector listing result).
#[non_exhaustive]
#[derive(Clone, Debug, Deserialize, serde::Serialize)]
pub struct DirectoryListing {
    pub files: Vec<ListedFile>,
    pub paths: Vec<ListedPath>,
    pub truncated: bool,
}

/// Returns a sample directory listing for the in-memory demo binary.
#[must_use]
pub fn sample_listing_for_demo() -> DirectoryListing {
    DirectoryListing {
        paths: vec![
            ListedPath {
                path: "/documents/".to_string(),
            },
            ListedPath {
                path: "/uploads/".to_string(),
            },
            ListedPath {
                path: "/backup/".to_string(),
            },
        ],
        files: vec![
            ListedFile {
                file_path: "/readme.txt".to_string(),
                modified_timestamp: None,
                size: Some(1024),
            },
            ListedFile {
                file_path: "/data.csv".to_string(),
                modified_timestamp: None,
                size: Some(2048),
            },
        ],
        truncated: false,
    }
}

/// Starts a directory listing and waits for the result file in S3, then parses and returns it.
pub async fn list_directory<TC, TS>(
    transfer: &TC,
    storage: &TS,
    config: &Config,
    remote_path: &RemotePath,
    max_items: Option<i32>,
) -> Result<DirectoryListing>
where
    TC: TransferCommands,
    TS: TransferStorage,
{
    let output_directory_path = config.listings_prefix();
    let (bucket, key_prefix) = split_s3_path(&output_directory_path);

    let started = transfer
        .start_directory_listing(
            &config.connector_id,
            remote_path,
            &output_directory_path,
            max_items,
        )
        .await
        .map_err(|e| {
            e.with("remote_directory_path", remote_path.as_str())
                .with("connector_id", config.connector_id.as_str())
        })?;

    let listing_id = &started.listing_id;
    let output_file_name = &started.output_file_name;

    if listing_id.as_str().is_empty() {
        return Err(
            Error::invalid_input("Missing ListingId in StartDirectoryListing response")
                .with("remote_directory_path", remote_path.as_str()),
        );
    }
    if output_file_name.as_str().is_empty() {
        return Err(Error::invalid_input(
            "Missing OutputFileName in StartDirectoryListing response",
        )
        .with("remote_directory_path", remote_path.as_str()));
    }

    let key = build_s3_key(key_prefix.as_str(), output_file_name.as_str());

    const MAX_POLLS: u32 = 60;
    const POLL_INTERVAL: Duration = Duration::from_millis(500);

    for _ in 0..MAX_POLLS {
        match storage.get_object(bucket.as_str(), &key).await {
            Ok(bytes) => {
                let listing: DirectoryListing = serde_json::from_slice(&bytes).map_err(|e| {
                    Error::parse("listing parse error", e)
                        .with("remote_directory_path", remote_path.as_str())
                        .with("key", &key)
                })?;
                return Ok(listing);
            }
            Err(e) => {
                if e.kind == crate::error::ErrorKind::NotFound {
                    sleep(POLL_INTERVAL).await;
                    continue;
                }
                return Err(e.with("key", &key).with("bucket", bucket.as_str()));
            }
        }
    }

    Err(
        Error::timeout("Directory listing result not found in S3 within timeout")
            .with("remote_directory_path", remote_path.as_str())
            .with("listing_id", listing_id.as_str())
            .with("output_file_name", output_file_name.as_str()),
    )
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn split_s3_path_bucket_and_prefix() {
        let (b, p) = split_s3_path("my-bucket/staging/listings/");
        assert_eq!(b.as_str(), "my-bucket");
        assert_eq!(p.as_str(), "staging/listings/");
    }

    #[test]
    fn split_s3_path_bucket_only() {
        let (b, p) = split_s3_path("my-bucket");
        assert_eq!(b.as_str(), "my-bucket");
        assert_eq!(p.as_str(), "");
    }

    #[test]
    fn directory_listing_json_parse() {
        let json = r#"{"files":[{"filePath":"/home/a.txt","size":100}],"paths":[{"path":"/home/dir"}],"truncated":false}"#;
        let listing: DirectoryListing = serde_json::from_str(json).unwrap();
        assert_eq!(listing.files.len(), 1);
        assert_eq!(
            listing.files.first().map(|f| f.file_path.as_str()),
            Some("/home/a.txt")
        );
        assert_eq!(listing.paths.len(), 1);
        assert_eq!(
            listing.paths.first().map(|p| p.path.as_str()),
            Some("/home/dir")
        );
        assert!(!listing.truncated);
    }

    #[async_test_macros::async_test]
    async fn list_directory_with_memory_backend() {
        use crate::listing::{DirectoryListing, ListedFile, ListedPath, list_directory};
        use crate::transfer_commands::MemoryTransferCommands;
        use std::sync::Arc;

        let storage = Arc::new(crate::transfer_storage::MemoryTransferStorage::new());
        let listing_content = DirectoryListing {
            files: vec![ListedFile {
                file_path: "/remote/foo.txt".to_string(),
                modified_timestamp: None,
                size: Some(42),
            }],
            paths: vec![ListedPath {
                path: "/remote/subdir".to_string(),
            }],
            truncated: false,
        };
        let transfer = MemoryTransferCommands::new(Arc::clone(&storage))
            .with_default_listing(listing_content.clone());

        let config = crate::config::test_config();

        let result = list_directory(
            &transfer,
            &*storage,
            &config,
            &RemotePath::from("/remote"),
            Some(100),
        )
        .await;
        assert!(result.is_ok());
        let listing = result.unwrap();
        assert_eq!(listing.files.len(), 1);
        assert_eq!(
            listing.files.first().map(|f| f.file_path.as_str()),
            Some("/remote/foo.txt")
        );
        assert_eq!(listing.files.first().and_then(|f| f.size), Some(42));
        assert_eq!(listing.paths.len(), 1);
        assert_eq!(
            listing.paths.first().map(|p| p.path.as_str()),
            Some("/remote/subdir")
        );
        assert!(!listing.truncated);
    }

    #[async_test_macros::async_test]
    async fn list_directory_invalid_json_returns_parse_error() {
        use crate::transfer_commands::MemoryTransferCommands;
        use std::sync::Arc;

        let storage = Arc::new(crate::transfer_storage::MemoryTransferStorage::new());
        let transfer = MemoryTransferCommands::new(Arc::clone(&storage))
            .with_listing_body(b"not valid json".to_vec());

        let config = crate::config::test_config();

        let result = list_directory(
            &transfer,
            &*storage,
            &config,
            &RemotePath::from("/remote"),
            Some(100),
        )
        .await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind, crate::error::ErrorKind::Parse);
    }

    #[async_test_macros::async_test]
    async fn list_directory_empty_listing_response_returns_invalid_input() {
        use crate::transfer_commands::MemoryTransferCommands;
        use std::sync::Arc;

        let storage = Arc::new(crate::transfer_storage::MemoryTransferStorage::new());
        let transfer =
            MemoryTransferCommands::new(Arc::clone(&storage)).with_empty_listing_response(true);

        let config = crate::config::test_config();

        let result = list_directory(
            &transfer,
            &*storage,
            &config,
            &RemotePath::from("/remote"),
            Some(100),
        )
        .await;
        assert!(result.is_err());
        assert_eq!(
            result.unwrap_err().kind,
            crate::error::ErrorKind::InvalidInput
        );
    }
}