use std::io;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::constants::{MAX_PIPE_CHUNK_SIZE, READ_PAUSE_DURATION};
pub fn spawn_read_task<R>(
reader: R,
buf: usize,
) -> (JoinHandle<io::Result<()>>, mpsc::Receiver<Vec<u8>>)
where
R: AsyncRead + Send + Unpin + 'static,
{
let (tx, rx) = mpsc::channel(buf);
let task = tokio::spawn(read_handler(reader, tx));
(task, rx)
}
async fn read_handler<R>(mut reader: R, channel: mpsc::Sender<Vec<u8>>) -> io::Result<()>
where
R: AsyncRead + Unpin,
{
let mut buf: [u8; MAX_PIPE_CHUNK_SIZE] = [0; MAX_PIPE_CHUNK_SIZE];
loop {
match reader.read(&mut buf).await {
Ok(n) if n > 0 => {
channel.send(buf[..n].to_vec()).await.map_err(|_| {
io::Error::new(io::ErrorKind::BrokenPipe, "Output channel closed")
})?;
tokio::time::sleep(READ_PAUSE_DURATION).await;
}
Ok(_) => return Ok(()),
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
tokio::time::sleep(READ_PAUSE_DURATION).await;
}
Err(x) => return Err(x),
}
}
}
pub fn spawn_write_task<W>(
writer: W,
buf: usize,
) -> (JoinHandle<io::Result<()>>, mpsc::Sender<Vec<u8>>)
where
W: AsyncWrite + Send + Unpin + 'static,
{
let (tx, rx) = mpsc::channel(buf);
let task = tokio::spawn(write_handler(writer, rx));
(task, tx)
}
async fn write_handler<W>(mut writer: W, mut channel: mpsc::Receiver<Vec<u8>>) -> io::Result<()>
where
W: AsyncWrite + Unpin,
{
while let Some(data) = channel.recv().await {
writer.write_all(&data).await?;
}
Ok(())
}