use crate::*;
use std::io::{BufRead, BufReader, Read, Write};
use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::{Mutex, MutexGuard};
#[inline(always)]
pub fn pipe(sz: usize) -> (PipeOut, PipeIn) {
let (sender, receiver) = std::sync::mpsc::sync_channel(sz);
(PipeOut::with(sender), PipeIn::with(receiver))
}
#[derive(Debug)]
pub struct PipeIn(LockablePipeIn);
impl PipeIn {
pub fn with(a: Receiver<Vec<u8>>) -> Self {
Self(LockablePipeIn::with(a))
}
}
impl StreamIn for PipeIn {
fn lock_bufread(&self) -> Box<dyn BufRead + '_> {
Box::new(PipeInLock(self.0.lock()))
}
fn is_line_pipe(&self) -> bool {
false
}
fn lines(&self) -> Box<dyn NextLine + '_> {
let a = self.0.inner.lock().unwrap().take().unwrap();
let b = a.lines();
Box::new(Lines { buf: b })
}
}
#[derive(Debug)]
pub struct PipeInLock<'a>(LockablePipeInLock<'a>);
impl Read for PipeInLock<'_> {
#[inline(always)]
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.0.read(buf)
}
}
impl BufRead for PipeInLock<'_> {
#[inline(always)]
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
self.0.fill_buf()
}
#[inline(always)]
fn consume(&mut self, amt: usize) {
self.0.consume(amt)
}
}
#[derive(Debug)]
pub struct PipeOut(LockablePipeOut);
impl PipeOut {
pub fn with(sender: SyncSender<Vec<u8>>) -> Self {
Self(LockablePipeOut::with(RawPipeOut::with(sender)))
}
}
impl StreamOut for PipeOut {
#[inline(always)]
fn lock(&self) -> Box<dyn StreamOutLock + '_> {
Box::new(PipeOutLock(self.0.lock()))
}
fn is_line_pipe(&self) -> bool {
false
}
fn write_line(&self, string: String) -> Result<()> {
self.lock().write_fmt(format_args!("{}\n", string))
}
fn flush_line(&self) -> Result<()> {
self.lock().flush()
}
}
#[derive(Debug)]
pub struct PipeOutLock<'a>(LockablePipeOutLock<'a>);
impl StreamOutLock for PipeOutLock<'_> {
#[inline(always)]
fn buffer(&self) -> &[u8] {
self.0.buffer()
}
}
impl Write for PipeOutLock<'_> {
#[inline(always)]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.write(buf)
}
#[inline(always)]
fn flush(&mut self) -> std::io::Result<()> {
self.0.flush()
}
}
#[derive(Debug)]
pub struct PipeErr(LockablePipeOut);
impl PipeErr {
pub fn with(sender: SyncSender<Vec<u8>>) -> Self {
Self(LockablePipeOut::with(RawPipeOut::with(sender)))
}
}
impl StreamErr for PipeErr {
#[inline(always)]
fn lock(&self) -> Box<dyn StreamErrLock + '_> {
Box::new(PipeErrLock(self.0.lock()))
}
fn is_line_pipe(&self) -> bool {
false
}
fn write_line(&self, string: String) -> Result<()> {
self.lock().write_fmt(format_args!("{}\n", string))
}
fn flush_line(&self) -> Result<()> {
self.lock().flush()
}
}
impl std::convert::From<PipeOut> for PipeErr {
#[inline(always)]
fn from(a: PipeOut) -> Self {
Self(a.0)
}
}
#[derive(Debug)]
pub struct PipeErrLock<'a>(LockablePipeOutLock<'a>);
impl StreamErrLock for PipeErrLock<'_> {
#[inline(always)]
fn buffer(&self) -> &[u8] {
self.0.buffer()
}
}
impl Write for PipeErrLock<'_> {
#[inline(always)]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.write(buf)
}
#[inline(always)]
fn flush(&mut self) -> std::io::Result<()> {
self.0.flush()
}
}
const LINE_BUF_SIZE: usize = 1024;
#[derive(Debug)]
struct LockablePipeIn {
inner: Mutex<Option<BufReader<RawPipeIn>>>,
}
impl LockablePipeIn {
pub fn with(a: Receiver<Vec<u8>>) -> Self {
LockablePipeIn {
inner: Mutex::new(Some(BufReader::with_capacity(
LINE_BUF_SIZE,
RawPipeIn::new(a),
))),
}
}
pub fn lock(&self) -> LockablePipeInLock<'_> {
LockablePipeInLock {
inner: self.inner.lock().unwrap_or_else(|e| e.into_inner()),
}
}
}
#[derive(Debug)]
struct LockablePipeInLock<'a> {
inner: MutexGuard<'a, Option<BufReader<RawPipeIn>>>,
}
impl Read for LockablePipeInLock<'_> {
#[inline(always)]
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.inner.as_mut().unwrap().read(buf)
}
}
impl BufRead for LockablePipeInLock<'_> {
#[inline(always)]
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
self.inner.as_mut().unwrap().fill_buf()
}
#[inline(always)]
fn consume(&mut self, amt: usize) {
self.inner.as_mut().unwrap().consume(amt)
}
}
#[derive(Debug)]
struct LockablePipeOut {
inner: Mutex<RawPipeOut>,
}
impl LockablePipeOut {
fn with(a: RawPipeOut) -> Self {
LockablePipeOut {
inner: Mutex::new(a),
}
}
pub fn lock(&self) -> LockablePipeOutLock<'_> {
LockablePipeOutLock {
inner: self.inner.lock().unwrap_or_else(|e| e.into_inner()),
}
}
}
#[derive(Debug)]
struct LockablePipeOutLock<'a> {
inner: MutexGuard<'a, RawPipeOut>,
}
impl LockablePipeOutLock<'_> {
#[inline(always)]
pub fn buffer(&self) -> &[u8] {
self.inner.buffer()
}
}
impl Write for LockablePipeOutLock<'_> {
#[inline(always)]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.inner.write(buf)
}
#[inline(always)]
fn flush(&mut self) -> std::io::Result<()> {
self.inner.flush()
}
}
pub struct Lines {
buf: std::io::Lines<BufReader<RawPipeIn>>,
}
impl Iterator for Lines {
type Item = Result<String>;
fn next(&mut self) -> Option<Result<String>> {
self.buf.next()
}
}
impl NextLine for Lines {}
#[derive(Debug)]
struct RawPipeIn {
buf: Vec<u8>,
pos: usize,
amt: usize,
reciever: Receiver<Vec<u8>>,
}
impl RawPipeIn {
fn new(a: Receiver<Vec<u8>>) -> Self {
Self {
buf: Vec::new(),
pos: 0,
amt: 0,
reciever: a,
}
}
}
impl Read for RawPipeIn {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.buf.is_empty() {
self.buf = match self.reciever.recv() {
Ok(s) => s,
Err(_) => return Ok(0),
};
}
let len = {
let src = self.buf.as_slice();
let src_len = src.len() - self.pos;
let dst_len = buf.len();
let (len, dst, src) = if src_len >= dst_len {
let len = dst_len;
(len, buf, &src[self.pos..(self.pos + len)])
} else {
let len = src_len;
(len, &mut buf[0..len], &src[self.pos..(self.pos + len)])
};
dst.copy_from_slice(src);
self.pos += len;
len
};
if self.pos >= self.buf.as_slice().len() {
self.buf.clear();
self.pos = 0;
self.amt = 0;
}
Ok(len)
}
}
impl BufRead for RawPipeIn {
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
if self.pos >= self.buf.as_slice().len() {
self.buf.clear();
self.pos = 0;
self.amt = 0;
}
if self.buf.is_empty() {
self.buf = self.reciever.recv().unwrap();
}
let src = {
let src = self.buf.as_slice();
let src_len = src.len() - self.pos;
let dst_len = self.amt;
let (len, src) = if src_len >= dst_len {
let len = dst_len;
(len, &src[self.pos..(self.pos + len)])
} else {
let len = src_len;
(len, &src[self.pos..(self.pos + len)])
};
self.pos += len;
src
};
Ok(src)
}
#[inline(always)]
fn consume(&mut self, amt: usize) {
self.amt = amt;
}
}
#[derive(Debug)]
struct RawPipeOut {
buf: Vec<u8>,
sender: SyncSender<Vec<u8>>,
}
impl RawPipeOut {
pub fn with(a: SyncSender<Vec<u8>>) -> Self {
Self {
buf: Vec::new(),
sender: a,
}
}
#[inline(always)]
pub fn buffer(&self) -> &[u8] {
self.buf.as_slice()
}
}
impl Write for RawPipeOut {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let src_len = buf.len();
const BUF_SZ: usize = 4 * 4 * 1024;
if self.buf.len() >= BUF_SZ {
self.flush()?;
}
self.buf.extend_from_slice(buf);
Ok(src_len)
}
fn flush(&mut self) -> std::io::Result<()> {
let r = self.sender.send(self.buf.clone());
if let Err(err) = r {
return Err(std::io::Error::new(std::io::ErrorKind::Other, err));
}
self.buf.clear();
Ok(())
}
}