use crate::common::*;
use crate::drivers::{bigquery::BigQueryLocator, gs::find_gs_temp_dir};
use crate::tokio_glue::ConsumeWithParallelism;
/// Write a stream of local CSV `data` to the BigQuery locator `dest` by
/// staging it in a temporary location first.
///
/// The work happens in two sequential stages:
///
/// 1. `data` is written to the temporary directory returned by
///    `find_gs_temp_dir` (presumably a Google Cloud Storage location, going
///    by the helper's name), and every resulting future is driven to
///    completion.
/// 2. `dest.write_remote_data` is called with that temporary location as its
///    source, using the caller's `shared_args` and `dest_args`.
///
/// Both stages have fully completed by the time this function returns. The
/// returned stream contains exactly one future, which resolves to `dest` as
/// a boxed locator.
///
/// # Errors
///
/// Propagates any error from verifying `shared_args` against
/// `BigQueryLocator::features()`, from locating the temporary directory, from
/// staging the data, or from the final `write_remote_data` call.
pub(crate) async fn write_local_data_helper(
    ctx: Context,
    dest: BigQueryLocator,
    data: BoxStream<CsvStream>,
    shared_args: SharedArguments<Unverified>,
    dest_args: DestinationArguments<Unverified>,
) -> Result<BoxStream<BoxFuture<BoxLocator>>> {
    // Only a verified copy of the shared arguments exposes the temporary
    // storage settings; the unverified originals are still handed on to the
    // drivers below, so we verify a clone.
    let verified_shared_args = shared_args.clone().verify(BigQueryLocator::features())?;
    let staging = find_gs_temp_dir(verified_shared_args.temporary_storage())?;

    // Stage 1: local data -> temporary location.
    let upload_ctx = ctx.child(o!("to_temp" => staging.to_string()));
    let uploads = staging
        .write_local_data(
            upload_ctx,
            data,
            shared_args.clone(),
            DestinationArguments::for_temporary(),
        )
        .await?;
    // Drive every staging write to completion before loading from the
    // temporary location. The parallelism is hard-coded to 4 here.
    uploads.consume_with_parallelism(4).await?;

    // Stage 2: temporary location -> `dest`.
    let load_ctx = ctx.child(o!("from_temp" => staging.to_string()));
    dest.write_remote_data(
        load_ctx,
        Box::new(staging),
        shared_args,
        SourceArguments::for_temporary(),
        dest_args,
    )
    .await?;

    // All the work is already done, so the single future we hand back simply
    // yields the destination locator.
    let finished = async { Ok(dest.boxed()) }.boxed();
    Ok(box_stream_once(Ok(finished)))
}