use crate::common::*;
use crate::tokio_glue::{SyncStreamReader, SyncStreamWriter};
#[instrument(level = "debug", skip(ctx, input, transform))]
pub(crate) fn spawn_sync_transform<T>(
ctx: Context,
name: String,
input: BoxStream<BytesMut>,
transform: T,
) -> Result<BoxStream<BytesMut>>
where
T: (FnOnce(
Context,
Box<dyn Read + Send + 'static>,
Box<dyn Write + Send + 'static>,
) -> Result<()>)
+ Send
+ 'static,
{
let rdr = SyncStreamReader::new(input);
let (wtr, output) = SyncStreamWriter::pipe();
let transform_ctx = ctx.clone();
let transform_fut = spawn_blocking(move || -> Result<()> {
transform(transform_ctx, Box::new(rdr), Box::new(wtr))
})
.instrument(debug_span!("sync_transform", name = ?name));
ctx.spawn_worker(transform_fut.boxed());
Ok(output.boxed())
}