use super::S3Locator;
use crate::clouds::aws::s3;
use crate::common::*;
use crate::csv_stream::csv_stream_name;
/// Build a stream of `CsvStream` values for the files listed under an S3 URL.
///
/// Both argument sets are verified against `S3Locator::features()`; the
/// verified values are not used afterwards, so verification acts purely as a
/// validation step. The files under `url` are then listed with `s3::ls`, and
/// each listed file URL is turned into a `CsvStream` by naming it with
/// `csv_stream_name` (relative to `url`) and opening it with
/// `s3::download_file`.
///
/// # Returns
///
/// On success this always returns `Ok(Some(stream))`; the function never
/// produces `Ok(None)`.
///
/// # Errors
///
/// Returns an error right away if argument verification or the `s3::ls` call
/// fails. Failures while naming or downloading an individual file happen
/// inside the per-file future, so they are reported through the returned
/// stream rather than by this function's own `Result`.
#[instrument(
    level = "trace",
    name = "s3::local_data",
    skip(ctx, shared_args, source_args)
)]
pub(crate) async fn local_data_helper(
    ctx: Context,
    url: Url,
    shared_args: SharedArguments<Unverified>,
    source_args: SourceArguments<Unverified>,
) -> Result<Option<BoxStream<CsvStream>>> {
    // Verification only; the verified arguments themselves are unused here.
    let _shared_args = shared_args.verify(S3Locator::features())?;
    let _source_args = source_args.verify(S3Locator::features())?;

    debug!("getting CSV files from {}", url);
    let listing = s3::ls(&ctx, &url).await?;

    let streams = listing.and_then(move |file_url| {
        // The closure runs once per listed file, so every future it produces
        // needs its own owned copies of the context and the base URL.
        let task_ctx = ctx.clone();
        let base_url = url.clone();
        async move {
            let name = csv_stream_name(base_url.as_str(), file_url.as_str())?.to_owned();
            let download = s3::download_file(&task_ctx, &file_url);
            let span = debug_span!("read_stream", stream.name = %name, url = %file_url);
            let data = download.instrument(span).await?;
            Ok(CsvStream { name, data })
        }
        .boxed()
    });

    Ok(Some(streams.boxed()))
}