use super::{prepare_as_destination_helper, GsLocator};
use crate::clouds::gcloud::storage;
use crate::common::*;
use crate::concat::concatenate_csv_streams;
/// Write a stream of CSV streams to the Google Cloud Storage location `dest`.
///
/// The shared and destination arguments are verified against
/// `GsLocator::features()`, and the destination is prepared with
/// `prepare_as_destination_helper` (honoring the `if_exists` setting) before
/// any upload future is built.
///
/// What gets written depends on the shape of `dest`:
///
/// - Directory: each incoming stream is uploaded to its own object, whose URL
///   is `"{name}.csv"` joined onto `dest.url`. One future is yielded per
///   incoming stream.
/// - CSV file: all incoming streams are merged with `concatenate_csv_streams`
///   and uploaded to `dest.url` itself. Exactly one future is yielded.
///
/// Each yielded future resolves to a boxed `GsLocator` for the object it
/// uploaded. The uploads live inside `async` blocks, so they only run once the
/// returned futures are polled.
///
/// # Errors
///
/// Fails if argument verification or destination preparation fails, if the
/// streams cannot be concatenated, or if `dest` is neither a directory nor a
/// CSV file. Upload and URL-join failures surface through the yielded futures.
#[instrument(
    level = "debug",
    name = "gs::write_local_data",
    skip_all,
    fields(dest = %dest)
)]
pub(crate) async fn write_local_data_helper(
    ctx: Context,
    dest: GsLocator,
    data: BoxStream<CsvStream>,
    shared_args: SharedArguments<Unverified>,
    dest_args: DestinationArguments<Unverified>,
) -> Result<BoxStream<BoxFuture<BoxLocator>>> {
    // Reject any argument this driver does not support. The verified shared
    // arguments are not needed afterwards; only the check itself matters.
    let _verified_shared_args = shared_args.verify(GsLocator::features())?;
    let verified_dest_args = dest_args.verify(GsLocator::features())?;

    // Apply the requested `if_exists` policy before anything is uploaded.
    let if_exists = verified_dest_args.if_exists().to_owned();
    prepare_as_destination_helper(ctx.clone(), dest.url.clone(), if_exists).await?;

    // Directory destination: one object per incoming CSV stream.
    if dest.is_directory() {
        let uploads = data.map_ok(move |csv_stream| {
            // Every future must own its own handles, because the closure is
            // invoked once per incoming stream.
            let dir_url = dest.url.clone();
            let ctx = ctx.clone();
            async move {
                let file_url = dir_url.join(&format!("{}.csv", csv_stream.name))?;
                let upload = storage::upload_file(&ctx, csv_stream.data, &file_url);
                let span = trace_span!(
                    "stream_to_gs",
                    stream.name = %csv_stream.name,
                    url = %file_url
                );
                upload.instrument(span).await?;
                Ok(GsLocator { url: file_url }.boxed())
            }
            .boxed()
        });
        return Ok(uploads.boxed());
    }

    // Anything that is neither a directory nor a CSV file is unsupported.
    if !dest.is_csv_file() {
        return Err(format_err!("do not know how to write to {}", dest));
    }

    // Single-file destination: merge all streams and upload them as one object.
    let combined = concatenate_csv_streams(ctx.clone(), data)?;
    let single_upload = async move {
        let upload = storage::upload_file(&ctx, combined.data, &dest.url);
        let span = trace_span!(
            "stream_to_gs",
            stream.name = %combined.name,
            url = %dest.url
        );
        upload.instrument(span).await?;
        // `dest` already points at the object just written, so hand it back.
        Ok(dest.boxed())
    };
    Ok(box_stream_once(Ok(single_upload.boxed())))
}