use super::RedshiftLocator;
use crate::common::*;
use crate::drivers::s3::find_s3_temp_dir;
use crate::tokio_glue::ConsumeWithParallelism;
#[instrument(
level = "debug",
name = "redshift::write_local_data",
skip_all,
fields(dest = %dest)
)]
pub(crate) async fn write_local_data_helper(
ctx: Context,
dest: RedshiftLocator,
data: BoxStream<CsvStream>,
shared_args: SharedArguments<Unverified>,
dest_args: DestinationArguments<Unverified>,
) -> Result<BoxStream<BoxFuture<BoxLocator>>> {
let shared_args_v = shared_args.clone().verify(RedshiftLocator::features())?;
let s3_temp = find_s3_temp_dir(shared_args_v.temporary_storage())?;
let s3_dest_args = DestinationArguments::for_temporary();
let s3_source_args = SourceArguments::for_temporary();
let result_stream = s3_temp
.write_local_data(ctx.clone(), data, shared_args.clone(), s3_dest_args)
.instrument(debug_span!("stream_to_s3_temp", url = %s3_temp))
.await?;
result_stream
.consume_with_parallelism(shared_args_v.max_streams())
.await?;
dest.write_remote_data(
ctx,
Box::new(s3_temp.clone()),
shared_args,
s3_source_args,
dest_args,
)
.instrument(trace_span!("load_from_s3_temp", url = %s3_temp))
.await?;
let fut = async { Ok(dest.boxed()) }.boxed();
Ok(box_stream_once(Ok(fut)))
}