use bigml::{
self,
resource::{dataset, source, source::Optype, Resource, Source},
};
use chrono::{Duration, Utc};
use serde::Deserialize;
use super::{source::SourceExt, BigMlCredentials, BigMlLocator, CreateOptions};
use crate::common::*;
use crate::concat::concatenate_csv_streams;
use crate::drivers::s3::{find_s3_temp_dir, sign_s3_url, AwsCredentials};
/// Driver-specific destination arguments for BigML, deserialized from the
/// values passed via `--to-arg`.
///
/// Unknown fields are rejected at deserialization time
/// (`deny_unknown_fields`), so a misspelled argument is reported as an error
/// rather than being silently ignored.
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct BigMlDestinationArguments {
/// Optional name to assign to the BigML resources we create. When present,
/// it is applied to sources created from S3 objects and to datasets
/// created from those sources.
name: Option<String>,
/// The BigML optype to use when computing the column type fix for a
/// source. The writer falls back to `Optype::Text` when this is `None`.
optype_for_text: Option<Optype>,
}
pub(crate) async fn write_local_data_helper(
ctx: Context,
dest: BigMlLocator,
mut data: BoxStream<CsvStream>,
shared_args: SharedArguments<Unverified>,
dest_args: DestinationArguments<Unverified>,
) -> Result<BoxStream<BoxFuture<BoxLocator>>> {
let shared_args_v = shared_args.clone().verify(BigMlLocator::features())?;
let dest_args = dest_args.verify(BigMlLocator::features())?;
let schema = shared_args_v.schema().to_owned();
let bigml_dest_args = dest_args
.driver_args()
.deserialize::<BigMlDestinationArguments>()
.context("could not parse --to-arg")?;
let creds = BigMlCredentials::try_default()?;
let CreateOptions {
concat_csv_streams,
convert_to_dataset,
} = dest
.to_create_options()
.ok_or_else(|| format_err!("cannot to write to {}", dest))?;
if concat_csv_streams {
data = box_stream_once(Ok(concatenate_csv_streams(ctx.clone(), data)?));
}
let s3_temp = find_s3_temp_dir(shared_args_v.temporary_storage()).ok();
let sources: BoxStream<BoxFuture<(Context, Source)>> =
if let Some(s3_temp) = s3_temp {
let s3_dest_args = DestinationArguments::for_temporary();
let s3_locator_stream: BoxStream<BoxFuture<BoxLocator>> = s3_temp
.write_local_data(ctx.clone(), data, shared_args, s3_dest_args)
.await?;
let ctx = ctx.clone();
let creds = creds.clone();
let bigml_dest_args = bigml_dest_args.clone();
let bigml_source_stream = s3_locator_stream.map(move |locator_fut| {
let ctx = ctx.clone();
let creds = creds.clone();
let bigml_dest_args = bigml_dest_args.clone();
let fut = async move {
let locator = locator_fut.await?.to_string();
if !locator.starts_with("s3://") {
return Err(format_err!(
"expected S3 driver to output s3:// URL, found {}",
locator,
));
}
let ctx = ctx.child(o!("s3_object" => locator.clone()));
debug!(ctx.log(), "creating BigML source from S3 object");
let aws_creds = AwsCredentials::try_default()?;
let url = locator
.parse::<Url>()
.context("could not parse S3 temporary URL")?;
let expires = Utc::now() + Duration::hours(1);
let (signed_url, x_amz_security_token) =
sign_s3_url(&aws_creds, "GET", expires, &url)?;
if x_amz_security_token.is_some() {
return Err(format_err!(
"BigML does not support AWS_SESSION_TOKEN"
));
}
let mut args = source::Args::remote(signed_url.into_string());
args.disable_datetime = Some(true);
if let Some(name) = &bigml_dest_args.name {
args.name = Some(name.to_owned());
}
let client = creds.client()?;
let source = client.create(&args).await?;
let ctx = ctx.child(o!("bigml_source" => source.id().to_string()));
debug!(ctx.log(), "created source from S3 object");
Ok((ctx, source))
};
fut.boxed()
});
Box::new(bigml_source_stream)
} else {
eprintln!("WARNING: You must pass --temporary=s3://... for BigML");
let ctx = ctx.clone();
let creds = creds.clone();
#[allow(deprecated)]
Box::new(data.map(move |stream| {
let ctx = ctx.clone();
let creds = creds.clone();
let fut = async move {
let ctx = ctx.child(o!("stream" => stream.name.clone()));
debug!(ctx.log(), "uploading CSV stream to BigML");
let (name, data) = stream.into_name_and_portable_stream();
let client = creds.client()?;
let source = client.create_source_from_stream(&name, data).await?;
let ctx = ctx.child(o!("bigml_source" => source.id().to_string()));
debug!(ctx.log(), "uploaded CSV stream to BigML");
Ok((ctx, source))
};
fut.boxed()
}))
};
let written = sources.map(move |ctx_source_fut| {
let creds = creds.clone();
let schema = schema.clone(); let bigml_dest_args = bigml_dest_args.clone();
let fut = async move {
let (ctx, mut source) = ctx_source_fut.await?;
trace!(ctx.log(), "waiting for source to be ready");
let client = creds.client()?;
source = client.wait(source.id()).await?;
let optype_for_text =
bigml_dest_args.optype_for_text.unwrap_or(Optype::Text);
let update = source.calculate_column_type_fix(&schema, optype_for_text)?;
trace!(ctx.log(), "updating source with {:?}", update);
client.update(&source.id(), &update).await?;
trace!(ctx.log(), "waiting for source to be ready (again)");
source = client.wait(source.id()).await?;
if !convert_to_dataset {
Ok(BigMlLocator::output_source(source.id().to_owned()).boxed())
} else {
trace!(ctx.log(), "converting to dataset");
let mut args = dataset::Args::from_source(source.id().to_owned());
if let Some(name) = &bigml_dest_args.name {
args.name = Some(name.to_owned());
}
let dataset = client.create_and_wait(&args).await?;
debug!(ctx.log(), "converted to {}", dataset.id().to_owned());
Ok(BigMlLocator::read_dataset(dataset.id().to_owned()).boxed())
}
};
fut.boxed()
});
Ok(Box::new(written) as BoxStream<_>)
}