use bigml::{
self,
resource::{dataset, source, source::Optype, Resource, Source},
};
use chrono::{Duration, Utc};
use serde::Deserialize;
use tracing::{field, Span};
use super::{source::SourceExt, BigMlLocator, CreateOptions};
use crate::clouds::aws::{sign_s3_url, AwsCredentials};
use crate::common::*;
use crate::concat::concatenate_csv_streams;
use crate::drivers::s3::find_s3_temp_dir;
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct BigMlDestinationArguments {
name: Option<String>,
optype_for_text: Option<Optype>,
#[serde(default)]
tags: Vec<String>,
}
#[instrument(
level = "debug",
name = "bigml::write_local_data",
skip_all,
fields(dest = %dest)
)]
pub(crate) async fn write_local_data_helper(
ctx: Context,
dest: BigMlLocator,
mut data: BoxStream<CsvStream>,
shared_args: SharedArguments<Unverified>,
dest_args: DestinationArguments<Unverified>,
) -> Result<BoxStream<BoxFuture<BoxLocator>>> {
let shared_args_v = shared_args.clone().verify(BigMlLocator::features())?;
let dest_args = dest_args.verify(BigMlLocator::features())?;
let schema = shared_args_v.schema().to_owned();
let bigml_dest_args = dest_args
.driver_args()
.deserialize::<BigMlDestinationArguments>()
.context("could not parse --to-arg")?;
let CreateOptions {
concat_csv_streams,
convert_to_dataset,
} = dest
.to_create_options()
.ok_or_else(|| format_err!("cannot to write to {}", dest))?;
if concat_csv_streams {
data = box_stream_once(Ok(concatenate_csv_streams(ctx.clone(), data)?));
}
let s3_temp = find_s3_temp_dir(shared_args_v.temporary_storage()).ok();
let sources: BoxStream<BoxFuture<Source>> = if let Some(s3_temp) = s3_temp {
let s3_dest_args = DestinationArguments::for_temporary();
let s3_locator_stream: BoxStream<BoxFuture<BoxLocator>> = s3_temp
.write_local_data(ctx.clone(), data, shared_args, s3_dest_args)
.await?;
let bigml_dest_args = bigml_dest_args.clone();
let bigml_source_stream = s3_locator_stream.map_ok(move |locator_fut| {
let bigml_dest_args = bigml_dest_args.clone();
let fut = async move {
let locator = locator_fut.await?.to_string();
if !locator.starts_with("s3://") {
return Err(format_err!(
"expected S3 driver to output s3:// URL, found {}",
locator,
));
}
Span::current().record("s3_object", &field::display(&locator));
debug!("creating BigML source from S3 object");
let aws_creds = AwsCredentials::try_default().await?;
let url = locator
.parse::<Url>()
.context("could not parse S3 temporary URL")?;
let expires = Utc::now() + Duration::hours(1);
let (signed_url, x_amz_security_token) =
sign_s3_url(&aws_creds, "GET", expires, &url)?;
if x_amz_security_token.is_some() {
return Err(format_err!(
"BigML does not support AWS_SESSION_TOKEN"
));
}
let mut args = source::Args::remote(String::from(signed_url));
args.disable_datetime = Some(true);
if let Some(name) = &bigml_dest_args.name {
args.name = Some(name.to_owned());
}
args.tags = bigml_dest_args.tags.clone();
let client = bigml::Client::new_from_env()?;
let source = client.create(&args).await?;
Span::current().record("bigml_source", &field::display(source.id()));
debug!("created source from S3 object");
Ok(source)
}
.instrument(debug_span!(
"create_source",
s3_object = field::Empty,
bigml_source = field::Empty
));
fut.boxed()
});
bigml_source_stream.boxed()
} else {
return Err(format_err!(
"WARNING: You must pass --temporary=s3://... for BigML"
));
};
let written = sources.map_ok(move |ctx_source_fut| {
let schema = schema.clone(); let bigml_dest_args = bigml_dest_args.clone();
let fut = async move {
let mut source = ctx_source_fut.await?;
Span::current().record("bigml_source", &field::display(source.id()));
trace!("waiting for source to be ready");
let client = bigml::Client::new_from_env()?;
source = client.wait(source.id()).await?;
let optype_for_text =
bigml_dest_args.optype_for_text.unwrap_or(Optype::Text);
let update = source.calculate_column_type_fix(&schema, optype_for_text)?;
trace!("updating source with {:?}", update);
client.update(source.id(), &update).await?;
trace!("waiting for source to be ready (again)");
source = client.wait(source.id()).await?;
if !convert_to_dataset {
Ok(BigMlLocator::output_source(source.id().to_owned()).boxed())
} else {
trace!("converting to dataset");
let mut args = dataset::Args::from_source(source.id().to_owned());
if let Some(name) = &bigml_dest_args.name {
args.name = Some(name.to_owned());
}
args.tags = bigml_dest_args.tags.clone();
let dataset = client.create_and_wait(&args).await?;
debug!("converted to {}", dataset.id().to_owned());
Ok(BigMlLocator::read_dataset(dataset.id().to_owned()).boxed())
}
}
.instrument(debug_span!(
"finish_bigml_import",
bigml_source = field::Empty
));
fut.boxed()
});
Ok(written.boxed())
}