use crate::config::Config;
use crate::error::{Error, Result};
use crate::transfer_commands::TransferCommands;
use crate::transfer_storage::{TransferStorage, build_s3_key, split_s3_path};
use crate::types::RemotePath;
use serde::Deserialize;
use std::time::Duration;
use tokio::time::sleep;
/// A single file entry inside a [`DirectoryListing`].
///
/// Fields are (de)serialized with camelCase keys: `filePath`,
/// `modifiedTimestamp` and `size`.
#[derive(Debug, Clone, serde::Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ListedFile {
    /// Path of the file as reported in the listing (JSON key `filePath`).
    pub file_path: String,
    /// Last-modified timestamp as an opaque string, if present
    /// (JSON key `modifiedTimestamp`).
    // NOTE(review): the timestamp format is not established here — confirm
    // against the producer of the listing file before parsing it.
    pub modified_timestamp: Option<String>,
    /// Size of the file, if present (presumably in bytes — TODO confirm).
    pub size: Option<i64>,
}
/// A single path entry inside a [`DirectoryListing`]'s `paths` list.
#[derive(Debug, Clone, serde::Serialize, Deserialize)]
#[non_exhaustive]
pub struct ListedPath {
    /// The listed path (JSON key `path`).
    pub path: String,
}
/// Parsed contents of a directory-listing result document.
///
/// All three keys (`files`, `paths`, `truncated`) are required when
/// deserializing; none of them carries a serde default.
#[derive(Debug, Clone, serde::Serialize, Deserialize)]
#[non_exhaustive]
pub struct DirectoryListing {
    /// File entries contained in the listing.
    pub files: Vec<ListedFile>,
    /// Path entries contained in the listing.
    pub paths: Vec<ListedPath>,
    /// Truncation flag copied from the listing document.
    // NOTE(review): presumably `true` means the listing was cut short (for
    // example by a `max_items` limit) — confirm against the service docs.
    pub truncated: bool,
}
/// Builds a fixed, hard-coded [`DirectoryListing`] for demo purposes.
///
/// The result always contains three path entries and two file entries (with
/// sizes but no modification timestamps) and is marked as not truncated.
#[must_use]
pub fn sample_listing_for_demo() -> DirectoryListing {
    // Table of demo directories, in the order they appear in the listing.
    const DEMO_PATHS: [&str; 3] = ["/documents/", "/uploads/", "/backup/"];
    // Table of demo files as (path, size) pairs.
    const DEMO_FILES: [(&str, i64); 2] = [("/readme.txt", 1024), ("/data.csv", 2048)];

    let paths = DEMO_PATHS
        .iter()
        .map(|&dir| ListedPath {
            path: dir.to_string(),
        })
        .collect();

    let files = DEMO_FILES
        .iter()
        .map(|&(name, bytes)| ListedFile {
            file_path: name.to_string(),
            modified_timestamp: None,
            size: Some(bytes),
        })
        .collect();

    DirectoryListing {
        files,
        paths,
        truncated: false,
    }
}
pub async fn list_directory<TC, TS>(
transfer: &TC,
storage: &TS,
config: &Config,
remote_path: &RemotePath,
max_items: Option<i32>,
) -> Result<DirectoryListing>
where
TC: TransferCommands,
TS: TransferStorage,
{
let output_directory_path = config.listings_prefix();
let (bucket, key_prefix) = split_s3_path(&output_directory_path);
let started = transfer
.start_directory_listing(
&config.connector_id,
remote_path,
&output_directory_path,
max_items,
)
.await
.map_err(|e| {
e.with("remote_directory_path", remote_path.as_str())
.with("connector_id", config.connector_id.as_str())
})?;
let listing_id = &started.listing_id;
let output_file_name = &started.output_file_name;
if listing_id.as_str().is_empty() {
return Err(
Error::invalid_input("Missing ListingId in StartDirectoryListing response")
.with("remote_directory_path", remote_path.as_str()),
);
}
if output_file_name.as_str().is_empty() {
return Err(Error::invalid_input(
"Missing OutputFileName in StartDirectoryListing response",
)
.with("remote_directory_path", remote_path.as_str()));
}
let key = build_s3_key(key_prefix.as_str(), output_file_name.as_str());
const MAX_POLLS: u32 = 60;
const POLL_INTERVAL: Duration = Duration::from_millis(500);
for _ in 0..MAX_POLLS {
match storage.get_object(bucket.as_str(), &key).await {
Ok(bytes) => {
let listing: DirectoryListing = serde_json::from_slice(&bytes).map_err(|e| {
Error::parse("listing parse error", e)
.with("remote_directory_path", remote_path.as_str())
.with("key", &key)
})?;
return Ok(listing);
}
Err(e) => {
if e.kind == crate::error::ErrorKind::NotFound {
sleep(POLL_INTERVAL).await;
continue;
}
return Err(e.with("key", &key).with("bucket", bucket.as_str()));
}
}
}
Err(
Error::timeout("Directory listing result not found in S3 within timeout")
.with("remote_directory_path", remote_path.as_str())
.with("listing_id", listing_id.as_str())
.with("output_file_name", output_file_name.as_str()),
)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::ErrorKind;
    use crate::transfer_commands::MemoryTransferCommands;
    use crate::transfer_storage::MemoryTransferStorage;
    use std::sync::Arc;

    /// Calls `list_directory` for `/remote` with a limit of 100 items, using
    /// the crate's test configuration. Shared by the in-memory backend tests.
    async fn list_remote<TC, TS>(
        transfer: &TC,
        storage: &TS,
    ) -> crate::error::Result<DirectoryListing>
    where
        TC: TransferCommands,
        TS: TransferStorage,
    {
        let config = crate::config::test_config();
        let remote = RemotePath::from("/remote");
        list_directory(transfer, storage, &config, &remote, Some(100)).await
    }

    // A path with a slash splits into the bucket and the remaining prefix.
    #[test]
    fn split_s3_path_bucket_and_prefix() {
        let (bucket, prefix) = split_s3_path("my-bucket/staging/listings/");
        assert_eq!(bucket.as_str(), "my-bucket");
        assert_eq!(prefix.as_str(), "staging/listings/");
    }

    // A bare bucket name yields an empty prefix.
    #[test]
    fn split_s3_path_bucket_only() {
        let (bucket, prefix) = split_s3_path("my-bucket");
        assert_eq!(bucket.as_str(), "my-bucket");
        assert_eq!(prefix.as_str(), "");
    }

    // The camelCase wire format deserializes; `modifiedTimestamp` may be absent.
    #[test]
    fn directory_listing_json_parse() {
        let json = r#"{"files":[{"filePath":"/home/a.txt","size":100}],"paths":[{"path":"/home/dir"}],"truncated":false}"#;
        let listing: DirectoryListing = serde_json::from_str(json).unwrap();

        assert_eq!(listing.files.len(), 1);
        let first_file = listing.files.first();
        assert_eq!(
            first_file.map(|file| file.file_path.as_str()),
            Some("/home/a.txt")
        );

        assert_eq!(listing.paths.len(), 1);
        let first_path = listing.paths.first();
        assert_eq!(
            first_path.map(|entry| entry.path.as_str()),
            Some("/home/dir")
        );

        assert!(!listing.truncated);
    }

    // Happy path: the in-memory backend serves the configured listing.
    #[async_test_macros::async_test]
    async fn list_directory_with_memory_backend() {
        let storage = Arc::new(MemoryTransferStorage::new());
        let expected = DirectoryListing {
            files: vec![ListedFile {
                file_path: "/remote/foo.txt".to_string(),
                modified_timestamp: None,
                size: Some(42),
            }],
            paths: vec![ListedPath {
                path: "/remote/subdir".to_string(),
            }],
            truncated: false,
        };
        let transfer =
            MemoryTransferCommands::new(Arc::clone(&storage)).with_default_listing(expected);

        let listing = list_remote(&transfer, &*storage).await.unwrap();

        assert_eq!(listing.files.len(), 1);
        let first_file = listing.files.first();
        assert_eq!(
            first_file.map(|file| file.file_path.as_str()),
            Some("/remote/foo.txt")
        );
        assert_eq!(first_file.and_then(|file| file.size), Some(42));

        assert_eq!(listing.paths.len(), 1);
        let first_path = listing.paths.first();
        assert_eq!(
            first_path.map(|entry| entry.path.as_str()),
            Some("/remote/subdir")
        );

        assert!(!listing.truncated);
    }

    // A result object that is not valid JSON surfaces as a parse error.
    #[async_test_macros::async_test]
    async fn list_directory_invalid_json_returns_parse_error() {
        let storage = Arc::new(MemoryTransferStorage::new());
        let transfer = MemoryTransferCommands::new(Arc::clone(&storage))
            .with_listing_body(b"not valid json".to_vec());

        let err = list_remote(&transfer, &*storage).await.unwrap_err();

        assert_eq!(err.kind, ErrorKind::Parse);
    }

    // An empty start response is rejected before any polling happens.
    #[async_test_macros::async_test]
    async fn list_directory_empty_listing_response_returns_invalid_input() {
        let storage = Arc::new(MemoryTransferStorage::new());
        let transfer =
            MemoryTransferCommands::new(Arc::clone(&storage)).with_empty_listing_response(true);

        let err = list_remote(&transfer, &*storage).await.unwrap_err();

        assert_eq!(err.kind, ErrorKind::InvalidInput);
    }
}