use anyhow::{format_err, Context as _, Result};
use dbcrossbarlib::{
config::Configuration, rechunk::rechunk_csvs, tokio_glue::try_forward, Context,
DestinationArguments, DisplayOutputLocators, DriverArguments, IfExists,
SharedArguments, SourceArguments, TemporaryStorage, UnparsedLocator,
};
use futures::{pin_mut, stream, FutureExt, StreamExt, TryStreamExt};
use humanize_rs::bytes::Bytes as HumanizedBytes;
use structopt::{self, StructOpt};
use tokio::io;
use tokio_util::codec::{FramedWrite, LinesCodec};
use tracing::{debug, field, instrument, Span};
// Command-line options for the `cp` subcommand. Field doc comments below are
// rendered by structopt as `--help` text.
#[derive(Debug, StructOpt)]
pub(crate) struct Opt {
    /// What to do if the destination already exists.
    #[structopt(long = "if-exists", default_value = "error")]
    if_exists: IfExists,

    /// Locator to read the schema from (defaults to the source locator).
    #[structopt(long = "schema")]
    schema: Option<UnparsedLocator>,

    /// Temporary storage location to use during the transfer (may be repeated).
    #[structopt(long = "temporary")]
    temporaries: Vec<String>,

    /// Approximate size of each CSV data stream, as a human-readable byte
    /// size. Setting this re-chunks the data and forces a local transfer.
    #[structopt(long = "stream-size")]
    stream_size: Option<HumanizedBytes>,

    /// Extra argument to pass to the source driver (may be repeated).
    #[structopt(long = "from-arg")]
    from_args: Vec<String>,

    /// Extra argument to pass to the destination driver (may be repeated).
    #[structopt(long = "to-arg")]
    to_args: Vec<String>,

    /// Where clause used to filter the rows read from the source.
    #[structopt(long = "where")]
    where_clause: Option<String>,

    /// Maximum number of data streams to copy in parallel.
    #[structopt(long = "max-streams", short = "J", default_value = "4")]
    max_streams: usize,

    /// Print the locator of each output to standard output, one per line.
    #[structopt(long = "display-output-locators")]
    display_output_locators: bool,

    /// Locator of the data to copy from.
    from_locator: UnparsedLocator,

    /// Locator of the destination to copy to.
    to_locator: UnparsedLocator,
}
/// Copy data from `opt.from_locator` to `opt.to_locator`.
///
/// The schema is read from `--schema` if given, otherwise from the source.
/// When `--stream-size` is not set and the destination reports that it can
/// read directly from the source, the copy is delegated to the destination's
/// `write_remote_data`. Otherwise the source is streamed through this process,
/// optionally re-chunked into streams of roughly `--stream-size` bytes, and up
/// to `--max-streams` destination writes are driven concurrently.
///
/// # Errors
///
/// Fails if a locator or driver argument cannot be parsed, or if the schema
/// cannot be read. Also fails if `--display-output-locators` is requested for
/// a destination that never displays output locators, or if an output
/// locator contains a newline. Any error from the transfer itself is
/// propagated.
#[instrument(level = "debug", name = "cp", skip_all, fields(from, to))]
pub(crate) async fn run(
    ctx: Context,
    config: Configuration,
    enable_unstable: bool,
    opt: Opt,
) -> Result<()> {
    let schema_opt = opt.schema.map(|s| s.parse(enable_unstable)).transpose()?;
    let from_locator = opt.from_locator.parse(enable_unstable)?;
    let to_locator = opt.to_locator.parse(enable_unstable)?;

    // Fill in the `from`/`to` fields declared (empty) by `#[instrument]`.
    let span = Span::current();
    span.record("from", &field::display(&from_locator));
    span.record("to", &field::display(&to_locator));

    // Validate `--display-output-locators` before doing any I/O. The remote
    // path awaits `write_remote_data` to completion and gets back the final
    // destination list, so checking afterwards would reject the option only
    // after the data had already been copied.
    let display_output_locators = match (
        opt.display_output_locators,
        to_locator.display_output_locators(),
    ) {
        (true, DisplayOutputLocators::Never) => {
            return Err(format_err!(
                "cannot use --display-output-locators with {}",
                to_locator
            ));
        }
        (true, _) | (false, DisplayOutputLocators::ByDefault) => true,
        (false, _) => false,
    };

    // Read the schema from `--schema` if given, otherwise from the source.
    let schema = {
        let schema_locator = schema_opt.as_ref().unwrap_or(&from_locator);
        schema_locator
            .schema(ctx.clone())
            .await
            .with_context(|| format!("error reading schema from {}", schema_locator))?
            .ok_or_else(|| {
                format_err!("don't know how to read schema from {}", schema_locator)
            })
    }?;

    // `opt` is owned, so its remaining fields can be moved out without cloning.
    let temporary_storage = TemporaryStorage::with_config(opt.temporaries, &config)?;
    let shared_args = SharedArguments::new(schema, temporary_storage, opt.max_streams);
    let from_args = DriverArguments::from_cli_args(&opt.from_args)?;
    let source_args = SourceArguments::new(from_args, opt.where_clause);
    let to_args = DriverArguments::from_cli_args(&opt.to_args)?;
    let dest_args = DestinationArguments::new(to_args, opt.if_exists);

    // Re-chunking is only implemented on the local path, so `--stream-size`
    // always forces a local transfer.
    let should_use_remote = opt.stream_size.is_none()
        && to_locator.supports_write_remote_data(from_locator.as_ref());
    let dests = if should_use_remote {
        debug!("performing remote data transfer");
        let dests = to_locator
            .write_remote_data(ctx, from_locator, shared_args, source_args, dest_args)
            .await?;
        stream::iter(dests).map(Ok).boxed()
    } else {
        debug!("performing local data transfer");
        let mut data = from_locator
            .local_data(ctx.clone(), shared_args.clone(), source_args)
            .await?
            .ok_or_else(|| {
                format_err!("don't know how to read data from {}", from_locator)
            })?;
        if let Some(stream_size) = opt.stream_size {
            data = rechunk_csvs(ctx.clone(), stream_size.size(), data)?;
        }
        let result_stream = to_locator
            .write_local_data(ctx.clone(), data, shared_args.clone(), dest_args)
            .await?;
        // `write_local_data` yields futures; drive up to `max_streams` of them
        // at a time.
        result_stream
            .try_buffer_unordered(shared_args.max_streams())
            .boxed()
    };

    if display_output_locators {
        // One locator per line on stdout. A locator containing a line break
        // would corrupt that format, so it is rejected instead.
        let stdout_sink = FramedWrite::new(io::stdout(), LinesCodec::new());
        let dest_strings = dests.and_then(|dest| async move {
            let dest_str = dest.to_string();
            if dest_str.contains('\n') || dest_str.contains('\r') {
                Err(format_err!(
                    "cannot output locator with newline: {:?}",
                    dest_str
                ))
            } else {
                Ok(dest_str)
            }
        });
        pin_mut!(dest_strings);
        try_forward(dest_strings, stdout_sink).await?;
    } else {
        // The stream must still be driven to completion so that every write
        // runs and any error is propagated.
        let dests = dests.try_collect::<Vec<_>>().boxed().await?;
        debug!("destination locators: {:?}", dests);
    }
    Ok(())
}