use super::RedshiftLocator;
use crate::common::*;
use crate::drivers::s3::find_s3_temp_dir;
use crate::tokio_glue::ConsumeWithParallelism;
/// Writes a stream of local CSV data to `dest` by staging it in temporary
/// S3 storage first.
///
/// Steps, in order:
///
/// 1. Verify a copy of `shared_args` against `RedshiftLocator::features()`
///    and use the verified arguments' temporary storage to pick an S3
///    staging location via `find_s3_temp_dir`.
/// 2. Write `data` to that staging location and wait for every resulting
///    future to complete.
/// 3. Call `dest.write_remote_data` with the staging location as its source.
///
/// On success, returns a stream holding exactly one future, which resolves
/// to `dest` as a boxed locator.
///
/// # Errors
///
/// Propagates any error from argument verification, from locating the S3
/// temporary directory, from the staging write (including any of the
/// futures it yields), or from the final `write_remote_data` call.
pub(crate) async fn write_local_data_helper(
    ctx: Context,
    dest: RedshiftLocator,
    data: BoxStream<CsvStream>,
    shared_args: SharedArguments<Unverified>,
    dest_args: DestinationArguments<Unverified>,
) -> Result<BoxStream<BoxFuture<BoxLocator>>> {
    // Only a verified copy exposes `temporary_storage()`; the unverified
    // original is still needed for the two transfer calls below.
    let verified_args = shared_args.clone().verify(RedshiftLocator::features())?;
    let staging = find_s3_temp_dir(verified_args.temporary_storage())?;

    // Both child logging contexts are tagged with the same staging location.
    let staging_label = staging.to_string();

    // Stage the local data, then drain the stream of futures it produces so
    // that all of them have finished before the load step starts.
    let upload_ctx = ctx.child(o!("to_temp" => staging_label.clone()));
    staging
        .write_local_data(
            upload_ctx,
            data,
            shared_args.clone(),
            DestinationArguments::for_temporary(),
        )
        .await?
        // NOTE(review): the parallelism of 4 is hard-coded here.
        .consume_with_parallelism(4)
        .await?;

    // Hand the staging location to the destination as its remote source.
    let load_ctx = ctx.child(o!("from_temp" => staging_label));
    dest.write_remote_data(
        load_ctx,
        Box::new(staging),
        shared_args,
        SourceArguments::for_temporary(),
        dest_args,
    )
    .await?;

    // All work is already done, so the returned stream carries one future
    // that simply yields the destination locator.
    let finished = async move { Ok(dest.boxed()) }.boxed();
    Ok(box_stream_once(Ok(finished)))
}