use super::{prepare_as_destination_helper, S3Locator};
use crate::clouds::aws::s3;
use crate::common::*;
#[instrument(
level = "debug",
name = "s3::write_local_data",
skip_all,
fields(url = %url)
)]
pub(crate) async fn write_local_data_helper(
url: Url,
data: BoxStream<CsvStream>,
shared_args: SharedArguments<Unverified>,
dest_args: DestinationArguments<Unverified>,
) -> Result<BoxStream<BoxFuture<BoxLocator>>> {
let _shared_args = shared_args.verify(S3Locator::features())?;
let dest_args = dest_args.verify(S3Locator::features())?;
let if_exists = dest_args.if_exists().to_owned();
prepare_as_destination_helper(url.clone(), if_exists).await?;
let written = data.map_ok(move |stream| {
let url = url.clone();
async move {
let url = url.join(&format!("{}.csv", stream.name))?;
s3::upload_file(stream.data, &url)
.instrument(
debug_span!("write_stream", stream.name = %stream.name, url = %url),
)
.await?;
Ok(S3Locator { url }.boxed())
}
.boxed()
});
Ok(written.boxed())
}