use std::process::{Command, Stdio};
use tokio_process::CommandExt;
use super::{prepare_as_destination_helper, GsLocator};
use crate::common::*;
use crate::tokio_glue::copy_stream_to_writer;
pub(crate) async fn write_local_data_helper(
ctx: Context,
url: Url,
data: BoxStream<CsvStream>,
shared_args: SharedArguments<Unverified>,
dest_args: DestinationArguments<Unverified>,
) -> Result<BoxStream<BoxFuture<BoxLocator>>> {
let _shared_args = shared_args.verify(GsLocator::features())?;
let dest_args = dest_args.verify(GsLocator::features())?;
let if_exists = dest_args.if_exists().to_owned();
prepare_as_destination_helper(ctx.clone(), url.clone(), if_exists).await?;
let written = data.map(move |stream| {
let url = url.clone();
let ctx = ctx.clone();
async move {
let url = url.join(&format!("{}.csv", stream.name))?;
let ctx = ctx
.child(o!("stream" => stream.name.clone(), "url" => url.to_string()));
debug!(ctx.log(), "uploading stream to gsutil");
let mut child = Command::new("gsutil")
.args(&["cp", "-", url.as_str()])
.stdin(Stdio::piped())
.stdout(Stdio::null())
.spawn_async()
.context("error running gsutil")?;
let child_stdin = child.stdin().take().expect("child should have stdin");
copy_stream_to_writer(ctx.clone(), stream.data, child_stdin)
.await
.context("error copying data to gsutil")?;
let status = child
.compat()
.await
.with_context(|_| format!("error finishing upload to {}", url))?;
if status.success() {
Ok(GsLocator { url }.boxed())
} else {
Err(format_err!("gsutil returned error: {}", status))
}
}
.boxed()
});
Ok(Box::new(written) as BoxStream<BoxFuture<BoxLocator>>)
}