use crate::common::*;
use crate::tokio_glue::{
run_sync_fn_in_background, SyncStreamReader, SyncStreamWriter,
};
/// Wire a synchronous `transform` between two byte streams.
///
/// The `input` stream is wrapped in a `SyncStreamReader`, and
/// `SyncStreamWriter::pipe` supplies both the writer given to `transform` and
/// the stream returned to the caller. `transform` itself is handed to
/// `run_sync_fn_in_background`, and the resulting future is registered with
/// `spawn_worker` on the transform's context.
///
/// # Arguments
///
/// * `ctx` - Parent context. A child tagged `"transform" => name` is derived
///   from it, and that child is what `transform` receives.
/// * `name` - Label used for the child context and passed on to
///   `run_sync_fn_in_background`.
/// * `input` - Stream of byte buffers exposed to `transform` as a boxed `Read`.
/// * `transform` - Callback receiving the context, a boxed reader and a boxed
///   writer.
///
/// # Errors
///
/// As written, this function always returns `Ok`. The `Result<()>` produced by
/// `transform` is not returned from here; it travels inside the future passed
/// to `spawn_worker`.
pub(crate) fn spawn_sync_transform<T>(
    ctx: Context,
    name: String,
    input: BoxStream<BytesMut>,
    transform: T,
) -> Result<BoxStream<BytesMut>>
where
    T: (FnOnce(
            Context,
            Box<dyn Read + Send + 'static>,
            Box<dyn Write + Send + 'static>,
        ) -> Result<()>)
        + Send
        + 'static,
{
    // Every context used below descends from one tagged with the transform's
    // name. `name` is cloned because it is moved into the background call
    // further down.
    let base_ctx = ctx.child(o!("transform" => name.clone()));

    // Input side: a reader over `input`, with its own "mode" tag.
    let reader = SyncStreamReader::new(base_ctx.child(o!("mode" => "input")), input);

    // Output side: `writer` goes to the transform, `output` goes to our caller.
    let (writer, output) = SyncStreamWriter::pipe(base_ctx.child(o!("mode" => "output")));

    // The closure takes ownership of its own copy of the context so that
    // `base_ctx` is still available for `spawn_worker` afterwards.
    let worker_ctx = base_ctx.clone();
    let worker = run_sync_fn_in_background(name, move || {
        transform(worker_ctx, Box::new(reader), Box::new(writer))
    });
    base_ctx.spawn_worker(tokio_fut(worker));

    Ok(Box::new(output))
}