use std::{fmt, io::Cursor, str::FromStr};
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use crate::{
clouds::gcloud::bigquery,
common::*,
concat::concatenate_csv_streams,
drivers::bigquery_shared::{BqTable, Usage},
from_csv_cell::FromCsvCell,
from_json_value::FromJsonValue,
};
use super::{
bigquery::BigQueryLocator,
bigquery_shared::{
BqDataType, BqNonArrayDataType, BytesLiteral, ExpNotation, GeographyLiteral,
NumericLiteral, WriteBigQuerySql,
},
};
/// Maximum number of CSV bytes we're willing to inline into a `CREATE VIEW`
/// statement (128 KiB). Fixtures larger than this fall back to a regular
/// BigQuery load in `write_local_data_helper`.
const MAX_CSV_SIZE_FOR_VIEW: usize = 128 * 1024;
/// A locator for a BigQuery "test fixture" table. Small CSV inputs are
/// written as a SQL view over literal data (see `write_local_data_helper`)
/// instead of running a full load job; everything else delegates to the
/// wrapped [`BigQueryLocator`].
#[derive(Clone, Debug)]
pub(crate) struct BigQueryTestFixtureLocator {
    /// The equivalent plain `bigquery:` locator that we delegate to.
    bigquery: BigQueryLocator,
}
impl fmt::Display for BigQueryTestFixtureLocator {
    /// Display this locator using the `bigquery-test-fixture:` scheme,
    /// followed by the underlying table name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let table_name = self.bigquery.as_table_name();
        write!(f, "bigquery-test-fixture:{}", table_name)
    }
}
impl FromStr for BigQueryTestFixtureLocator {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
let as_bigquery = s.replace(
BigQueryTestFixtureLocator::scheme(),
BigQueryLocator::scheme(),
);
Ok(BigQueryTestFixtureLocator {
bigquery: BigQueryLocator::from_str(&as_bigquery)?,
})
}
}
impl Locator for BigQueryTestFixtureLocator {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Delegate schema lookup to the wrapped BigQuery locator.
    fn schema(&self, ctx: Context) -> BoxFuture<Option<Schema>> {
        self.bigquery.schema(ctx)
    }

    /// Delegate row counting to the wrapped BigQuery locator.
    fn count(
        &self,
        ctx: Context,
        shared_args: SharedArguments<Unverified>,
        source_args: SourceArguments<Unverified>,
    ) -> BoxFuture<usize> {
        self.bigquery.count(ctx, shared_args, source_args)
    }

    /// Delegate reading local data to the wrapped BigQuery locator.
    fn local_data(
        &self,
        ctx: Context,
        shared_args: SharedArguments<Unverified>,
        source_args: SourceArguments<Unverified>,
    ) -> BoxFuture<Option<BoxStream<CsvStream>>> {
        self.bigquery.local_data(ctx, shared_args, source_args)
    }

    fn display_output_locators(&self) -> DisplayOutputLocators {
        self.bigquery.display_output_locators()
    }

    /// Write CSV data, using the view-based fast path for small fixtures.
    fn write_local_data(
        &self,
        ctx: Context,
        data: BoxStream<CsvStream>,
        shared_args: SharedArguments<Unverified>,
        dest_args: DestinationArguments<Unverified>,
    ) -> BoxFuture<BoxStream<BoxFuture<BoxLocator>>> {
        let helper =
            write_local_data_helper(ctx, self.clone(), data, shared_args, dest_args);
        helper.boxed()
    }

    fn supports_write_remote_data(&self, source: &dyn Locator) -> bool {
        self.bigquery.supports_write_remote_data(source)
    }

    /// Delegate remote-to-remote copies to the wrapped BigQuery locator.
    fn write_remote_data(
        &self,
        ctx: Context,
        source: BoxLocator,
        shared_args: SharedArguments<Unverified>,
        source_args: SourceArguments<Unverified>,
        dest_args: DestinationArguments<Unverified>,
    ) -> BoxFuture<Vec<BoxLocator>> {
        self.bigquery
            .write_remote_data(ctx, source, shared_args, source_args, dest_args)
    }
}
/// The per-column type information needed when emitting BigQuery literals:
/// nullability plus the resolved BigQuery data type.
struct BqColumnTypeInfo {
    /// True if the column is declared NOT NULL (empty CSV cells then go to
    /// the type-specific parser instead of becoming SQL `NULL`).
    is_not_null: bool,
    /// The resolved BigQuery data type of the column.
    bq_data_type: BqDataType,
}
/// Write local CSV data to a BigQuery "test fixture" destination.
///
/// Small inputs (≤ `MAX_CSV_SIZE_FOR_VIEW` bytes after concatenation) are
/// turned into a view whose SQL embeds every row as a literal:
/// `SELECT * FROM UNNEST(ARRAY<STRUCT<...>>[STRUCT(...), ...])`.
/// Larger inputs fall back to the regular `BigQueryLocator` load path.
#[instrument(
    level = "debug",
    name = "bigquery_test_fixture::write_local_data",
    skip_all,
    fields(dest = %dest)
)]
async fn write_local_data_helper(
    ctx: Context,
    dest: BigQueryTestFixtureLocator,
    data: BoxStream<CsvStream>,
    shared_args: SharedArguments<Unverified>,
    dest_args: DestinationArguments<Unverified>,
) -> Result<BoxStream<BoxFuture<BoxLocator>>> {
    // Merge all input CSV streams and buffer the result fully in memory so
    // we can both measure its size and (maybe) parse it.
    let csv_stream = concatenate_csv_streams(ctx.clone(), data)?;
    let csv_data = csv_stream.into_bytes().await?;
    if csv_data.len() > MAX_CSV_SIZE_FOR_VIEW {
        debug!(
            "bigquery-test-fixture data is too big ({} bytes), loading the slow way",
            csv_data.len()
        );
        // Re-wrap the buffered bytes as a stream and hand the whole job to
        // the regular BigQuery driver.
        let csv_stream = CsvStream::from_bytes(csv_data).await;
        let data = box_stream_once(Ok(csv_stream));
        return dest
            .bigquery
            .write_local_data(ctx, data, shared_args, dest_args)
            .await;
    }
    let shared_args = shared_args.verify(BigQueryTestFixtureLocator::features())?;
    let dest_args = dest_args.verify(BigQueryTestFixtureLocator::features())?;
    let if_exists = dest_args.if_exists();
    let schema = shared_args.schema();
    let bq_table = BqTable::for_table_name_and_columns(
        schema,
        dest.bigquery.as_table_name().to_owned(),
        &schema.table.columns,
        Usage::FinalTable,
    )?;
    // Pre-resolve nullability and BigQuery type for every column, failing
    // early if any column's type can't be represented.
    let bq_col_type_infos = bq_table
        .columns
        .iter()
        .map(|c| {
            Ok(BqColumnTypeInfo {
                is_not_null: c.is_not_null(),
                bq_data_type: c.bq_data_type()?,
            })
        })
        .collect::<Result<Vec<BqColumnTypeInfo>>>()?;
    // Build the view SQL into a byte buffer. First the STRUCT element type:
    // `SELECT * FROM UNNEST(ARRAY<STRUCT<col1 TY1, col2 TY2, ...>>[`.
    let mut sql = vec![];
    writeln!(&mut sql, "SELECT * FROM UNNEST(ARRAY<STRUCT<")?;
    for (idx, (col, bq_col_type_info)) in
        bq_table.columns.iter().zip(&bq_col_type_infos).enumerate()
    {
        separator_comma(&mut sql, idx)?;
        writeln!(
            &mut sql,
            "{} {}",
            col.name.quoted(),
            bq_col_type_info.bq_data_type
        )?;
    }
    writeln!(&mut sql, ">>[")?;
    // Emit one `STRUCT(...)` literal per CSV record. `csv::Reader` treats
    // the first row as a header by default, so `records()` yields data rows
    // only.
    let mut rdr = csv::Reader::from_reader(Cursor::new(&*csv_data));
    for (row_idx, row) in rdr.records().enumerate() {
        let row = row?;
        separator_comma(&mut sql, row_idx)?;
        write!(&mut sql, "STRUCT(")?;
        // NOTE(review): `zip` stops at the shorter side, so a row with fewer
        // fields than the schema would silently drop trailing columns —
        // confirm upstream guarantees rectangular CSV data.
        for (col_idx, (bq_col_type_info, cell)) in
            bq_col_type_infos.iter().zip(row.into_iter()).enumerate()
        {
            separator_comma(&mut sql, col_idx)?;
            write_csv_cell_as_bigquery_literal(&mut sql, bq_col_type_info, cell)?;
        }
        writeln!(&mut sql, ")")?;
    }
    writeln!(&mut sql, "])")?;
    let sql =
        String::from_utf8(sql).context("CREATE VIEW SQL contained non-UTF-8 data")?;
    debug!("import sql: {}", sql);
    if if_exists == &IfExists::Overwrite {
        // Presumably the `true` flag means "ignore a missing table" — TODO
        // confirm against `bigquery::delete_table`'s signature.
        bigquery::delete_table(dest.bigquery.as_table_name(), true).await?;
    }
    bigquery::create_view(dest.bigquery.as_table_name(), &sql).await?;
    // Return a stream containing a single already-complete destination.
    let fut = async { Ok(dest.boxed()) }.boxed();
    Ok(box_stream_once(Ok(fut)))
}
/// Write a `,` separator before every element except the first (`idx == 0`).
fn separator_comma<W: Write>(wtr: &mut W, idx: usize) -> Result<(), io::Error> {
    match idx {
        0 => Ok(()),
        _ => write!(wtr, ","),
    }
}
/// Write a single CSV cell to `sql` as a BigQuery SQL literal matching the
/// column's declared type.
///
/// Empty cells in nullable columns become SQL `NULL`; array cells are parsed
/// as JSON arrays and emitted element-by-element. `TIME` and `STRUCT`
/// columns are rejected, since this driver can't represent them as literals.
fn write_csv_cell_as_bigquery_literal<W: Write>(
    sql: &mut W,
    bq_col_type_info: &BqColumnTypeInfo,
    cell: &str,
) -> Result<()> {
    match &bq_col_type_info.bq_data_type {
        // Array cells arrive as JSON-encoded arrays inside the CSV cell.
        BqDataType::Array(elem_ty) => {
            let json = serde_json::Value::from_csv_cell(cell)?;
            if let serde_json::Value::Array(arr) = json {
                write!(sql, "ARRAY<{}>[", elem_ty)?;
                for (idx, json) in arr.into_iter().enumerate() {
                    separator_comma(sql, idx)?;
                    write_json_value_as_bigquery_literal(elem_ty, &json, sql)?;
                }
                write!(sql, "]")?;
            } else {
                return Err(format_err!("expected JSON array, found {:?}", cell));
            }
        }
        BqDataType::NonArray(ty) => {
            // An empty cell in a nullable column means NULL. For NOT NULL
            // columns an empty cell falls through to the type parser below.
            if !bq_col_type_info.is_not_null && cell.is_empty() {
                write!(sql, "NULL")?;
                return Ok(());
            }
            match ty {
                BqNonArrayDataType::Bool => {
                    let value: bool = FromCsvCell::from_csv_cell(cell)?;
                    value.write_bigquery_sql(sql)?;
                }
                // Presumably `BytesLiteral` handles quoting/escaping of the
                // raw cell text — see `bigquery_shared` for details.
                BqNonArrayDataType::Bytes => {
                    BytesLiteral(cell).write_bigquery_sql(sql)?;
                }
                BqNonArrayDataType::Date => {
                    let value: NaiveDate = FromCsvCell::from_csv_cell(cell)?;
                    value.write_bigquery_sql(sql)?;
                }
                BqNonArrayDataType::Datetime => {
                    let value: NaiveDateTime = FromCsvCell::from_csv_cell(cell)?;
                    value.write_bigquery_sql(sql)?;
                }
                // Floats are emitted in exponential notation via the
                // `ExpNotation` wrapper.
                BqNonArrayDataType::Float64 => {
                    let value: f64 = FromCsvCell::from_csv_cell(cell)?;
                    ExpNotation(value).write_bigquery_sql(sql)?;
                }
                BqNonArrayDataType::Geography => {
                    GeographyLiteral(cell).write_bigquery_sql(sql)?;
                }
                BqNonArrayDataType::Int64 => {
                    let value: i64 = FromCsvCell::from_csv_cell(cell)?;
                    value.write_bigquery_sql(sql)?;
                }
                BqNonArrayDataType::Numeric => {
                    NumericLiteral(cell).write_bigquery_sql(sql)?;
                }
                BqNonArrayDataType::String | BqNonArrayDataType::Stringified(_) => {
                    cell.write_bigquery_sql(sql)?;
                }
                BqNonArrayDataType::Timestamp => {
                    let value: DateTime<Utc> = FromCsvCell::from_csv_cell(cell)?;
                    value.write_bigquery_sql(sql)?;
                }
                // These types have no literal representation here; callers
                // must use the plain `bigquery:` driver instead.
                BqNonArrayDataType::Time | BqNonArrayDataType::Struct(_) => {
                    return Err(format_err!(
                        "cannot ingest data of type {} using {}, use {} instead",
                        ty,
                        BigQueryTestFixtureLocator::scheme(),
                        BigQueryLocator::scheme(),
                    ));
                }
            }
        }
    }
    Ok(())
}
/// Write a JSON array element to `sql` as a BigQuery SQL literal of type
/// `elem_ty`.
///
/// This is the JSON-valued counterpart of
/// `write_csv_cell_as_bigquery_literal`, used for the elements of array
/// cells. `TIME` and `STRUCT` element types are rejected. Note there is no
/// NULL handling here — array elements are converted unconditionally.
fn write_json_value_as_bigquery_literal<W: Write>(
    elem_ty: &BqNonArrayDataType,
    json: &serde_json::Value,
    sql: &mut W,
) -> Result<()> {
    match elem_ty {
        BqNonArrayDataType::Bool => {
            let value: bool = FromJsonValue::from_json_value(json)?;
            value.write_bigquery_sql(sql)?;
        }
        BqNonArrayDataType::Bytes => {
            let value: String = FromJsonValue::from_json_value(json)?;
            BytesLiteral(&value).write_bigquery_sql(sql)?;
        }
        BqNonArrayDataType::Date => {
            let value: NaiveDate = FromJsonValue::from_json_value(json)?;
            value.write_bigquery_sql(sql)?;
        }
        BqNonArrayDataType::Datetime => {
            let value: NaiveDateTime = FromJsonValue::from_json_value(json)?;
            value.write_bigquery_sql(sql)?;
        }
        // Floats are emitted in exponential notation, matching the CSV path.
        BqNonArrayDataType::Float64 => {
            let value: f64 = FromJsonValue::from_json_value(json)?;
            ExpNotation(value).write_bigquery_sql(sql)?;
        }
        BqNonArrayDataType::Geography => {
            let value: String = FromJsonValue::from_json_value(json)?;
            GeographyLiteral(&value).write_bigquery_sql(sql)?;
        }
        BqNonArrayDataType::Int64 => {
            let value: i64 = FromJsonValue::from_json_value(json)?;
            value.write_bigquery_sql(sql)?;
        }
        BqNonArrayDataType::Numeric => {
            let value: String = FromJsonValue::from_json_value(json)?;
            NumericLiteral(&value).write_bigquery_sql(sql)?;
        }
        BqNonArrayDataType::String | BqNonArrayDataType::Stringified(_) => {
            let value: String = FromJsonValue::from_json_value(json)?;
            (&value[..]).write_bigquery_sql(sql)?;
        }
        BqNonArrayDataType::Timestamp => {
            let value: DateTime<Utc> = FromJsonValue::from_json_value(json)?;
            value.write_bigquery_sql(sql)?;
        }
        // No literal representation; callers must use the plain `bigquery:`
        // driver instead.
        BqNonArrayDataType::Time | BqNonArrayDataType::Struct(_) => {
            return Err(format_err!(
                "cannot ingest data of type {} using {}, use {} instead",
                elem_ty,
                BigQueryTestFixtureLocator::scheme(),
                BigQueryLocator::scheme(),
            ));
        }
    };
    Ok(())
}
impl LocatorStatic for BigQueryTestFixtureLocator {
fn scheme() -> &'static str {
"bigquery-test-fixture:"
}
fn features() -> Features {
let mut result = BigQueryLocator::features();
result.dest_if_exists = IfExistsFeatures::Overwrite | IfExistsFeatures::Error;
result
}
}