use crate::*;
use std::io::{BufRead, BufReader, Read, Write};
use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::{Mutex, MutexGuard};
/// Creates a connected pair of in-memory pipe ends.
///
/// `sz` is handed unchanged to [`std::sync::mpsc::sync_channel`] as the
/// channel bound. The returned tuple is `(writing end, reading end)`.
#[inline]
pub fn pipe(sz: usize) -> (PipeOut, PipeIn) {
    let (tx, rx) = std::sync::mpsc::sync_channel(sz);
    let writer = PipeOut::with(tx);
    let reader = PipeIn::with(rx);
    (writer, reader)
}
/// Reading end of an in-memory pipe, exposed through the `StreamIn` trait.
#[derive(Debug)]
pub struct PipeIn(LockablePipeIn);

impl PipeIn {
    /// Builds a reading end on top of `a`, the receiving half of a channel
    /// that carries byte chunks.
    pub fn with(a: Receiver<Vec<u8>>) -> Self {
        let lockable = LockablePipeIn::with(a);
        PipeIn(lockable)
    }
}

impl StreamIn for PipeIn {
    /// Locks the pipe and returns the guard as a boxed buffered reader.
    fn lock_bufread(&self) -> Box<dyn BufRead + '_> {
        let guard = self.0.lock();
        Box::new(PipeInLock(guard))
    }

    /// Always `false` for this stream type.
    fn is_line_pipe(&self) -> bool {
        false
    }

    /// Locks the pipe and returns an iterator over its lines.
    fn lines(&self) -> Box<dyn NextLine + '_> {
        let buf = self.lock_bufread().lines();
        Box::new(Lines { buf })
    }
}
/// Locked view of a [`PipeIn`]; every operation is forwarded to the wrapped
/// `LockablePipeInLock`.
#[derive(Debug)]
pub struct PipeInLock<'a>(LockablePipeInLock<'a>);

impl Read for PipeInLock<'_> {
    /// Forwards to the wrapped guard's `read`.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        Read::read(&mut self.0, buf)
    }
}

impl BufRead for PipeInLock<'_> {
    /// Forwards to the wrapped guard's `fill_buf`.
    #[inline]
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
        BufRead::fill_buf(&mut self.0)
    }

    /// Forwards to the wrapped guard's `consume`.
    #[inline]
    fn consume(&mut self, amt: usize) {
        BufRead::consume(&mut self.0, amt);
    }
}
/// Writing end of an in-memory pipe, exposed through the `StreamOut` trait.
#[derive(Debug)]
pub struct PipeOut(LockablePipeOut);

impl PipeOut {
    /// Builds a writing end on top of `sender`, the sending half of a channel
    /// that carries byte chunks.
    pub fn with(sender: SyncSender<Vec<u8>>) -> Self {
        let raw = RawPipeOut::with(sender);
        let lockable = LockablePipeOut::with(raw);
        PipeOut(lockable)
    }
}

impl StreamOut for PipeOut {
    /// Locks the pipe and returns the guard as a boxed writer.
    #[inline]
    fn lock(&self) -> Box<dyn StreamOutLock + '_> {
        let guard = self.0.lock();
        Box::new(PipeOutLock(guard))
    }

    /// Always `false` for this stream type.
    fn is_line_pipe(&self) -> bool {
        false
    }

    /// Locks the pipe and writes `string` followed by a newline.
    fn write_line(&self, string: String) -> Result<()> {
        let mut guard = self.lock();
        guard.write_fmt(format_args!("{}\n", string))
    }

    /// Locks the pipe and flushes it.
    fn flush_line(&self) -> Result<()> {
        let mut guard = self.lock();
        guard.flush()
    }
}
#[derive(Debug)]
pub struct PipeOutLock<'a>(LockablePipeOutLock<'a>);
impl StreamOutLock for PipeOutLock<'_> {
#[inline]
fn buffer(&self) -> &[u8] {
self.0.buffer()
}
}
impl Write for PipeOutLock<'_> {
#[inline]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.write(buf)
}
#[inline]
fn flush(&mut self) -> std::io::Result<()> {
self.0.flush()
}
}
#[derive(Debug)]
pub struct PipeErr(LockablePipeOut);
impl PipeErr {
pub fn with(sender: SyncSender<Vec<u8>>) -> Self {
Self(LockablePipeOut::with(RawPipeOut::with(sender)))
}
}
impl StreamErr for PipeErr {
#[inline]
fn lock(&self) -> Box<dyn StreamErrLock + '_> {
Box::new(PipeErrLock(self.0.lock()))
}
fn is_line_pipe(&self) -> bool {
false
}
fn write_line(&self, string: String) -> Result<()> {
self.lock().write_fmt(format_args!("{}\n", string))
}
fn flush_line(&self) -> Result<()> {
self.lock().flush()
}
}
impl std::convert::From<PipeOut> for PipeErr {
#[inline]
fn from(a: PipeOut) -> Self {
Self(a.0)
}
}
#[derive(Debug)]
pub struct PipeErrLock<'a>(LockablePipeOutLock<'a>);
impl StreamErrLock for PipeErrLock<'_> {
#[inline]
fn buffer(&self) -> &[u8] {
self.0.buffer()
}
}
impl Write for PipeErrLock<'_> {
#[inline]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.write(buf)
}
#[inline]
fn flush(&mut self) -> std::io::Result<()> {
self.0.flush()
}
}
/// Capacity, in bytes, of the `BufReader` that `LockablePipeIn::with` places
/// in front of the raw receiving end of a pipe.
const LINE_BUF_SIZE: usize = 1024;
/// Mutex-protected, buffered reader over the receiving end of a pipe.
#[derive(Debug)]
struct LockablePipeIn {
    inner: Mutex<BufReader<RawPipeIn>>,
}

impl LockablePipeIn {
    /// Wraps the receiver `a` in a `RawPipeIn`, then in a `BufReader` with
    /// `LINE_BUF_SIZE` capacity, and finally in a `Mutex`.
    pub fn with(a: Receiver<Vec<u8>>) -> Self {
        let raw = RawPipeIn::new(a);
        let buffered = BufReader::with_capacity(LINE_BUF_SIZE, raw);
        Self {
            inner: Mutex::new(buffered),
        }
    }

    /// Acquires the mutex through the crate's `lock_any` helper and wraps the
    /// resulting guard.
    pub fn lock(&self) -> LockablePipeInLock<'_> {
        let inner = self.inner.lock_any();
        LockablePipeInLock { inner }
    }
}
#[derive(Debug)]
struct LockablePipeInLock<'a> {
inner: MutexGuard<'a, BufReader<RawPipeIn>>,
}
impl Read for LockablePipeInLock<'_> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.inner.read(buf)
}
}
impl BufRead for LockablePipeInLock<'_> {
#[inline]
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
self.inner.fill_buf()
}
#[inline]
fn consume(&mut self, amt: usize) {
self.inner.consume(amt)
}
}
/// Mutex-protected writer over the sending end of a pipe.
#[derive(Debug)]
struct LockablePipeOut {
    inner: Mutex<RawPipeOut>,
}

impl LockablePipeOut {
    /// Places the raw writer `a` behind a `Mutex`.
    fn with(a: RawPipeOut) -> Self {
        let inner = Mutex::new(a);
        Self { inner }
    }

    /// Acquires the mutex through the crate's `lock_any` helper and wraps the
    /// resulting guard.
    pub fn lock(&self) -> LockablePipeOutLock<'_> {
        let inner = self.inner.lock_any();
        LockablePipeOutLock { inner }
    }
}
#[derive(Debug)]
struct LockablePipeOutLock<'a> {
inner: MutexGuard<'a, RawPipeOut>,
}
impl LockablePipeOutLock<'_> {
#[inline]
pub fn buffer(&self) -> &[u8] {
self.inner.buffer()
}
}
impl Write for LockablePipeOutLock<'_> {
#[inline]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.inner.write(buf)
}
#[inline]
fn flush(&mut self) -> std::io::Result<()> {
self.inner.flush()
}
}
/// Line iterator over a boxed buffered reader.
///
/// It is a thin adapter around [`std::io::Lines`] that also implements the
/// crate's `NextLine` marker trait.
pub struct Lines<'a> {
    buf: std::io::Lines<Box<dyn BufRead + 'a>>,
}

impl Iterator for Lines<'_> {
    type Item = Result<String>;

    /// Yields whatever the wrapped [`std::io::Lines`] yields next.
    fn next(&mut self) -> Option<Self::Item> {
        Iterator::next(&mut self.buf)
    }
}

impl NextLine for Lines<'_> {}
/// Unbuffered reading end of the pipe.
///
/// Byte chunks arrive over a channel. The most recently received chunk is
/// kept in `buf`, and `pos` is the offset of the first byte of that chunk
/// that has not been consumed yet.
#[derive(Debug)]
struct RawPipeIn {
    buf: Vec<u8>,
    pos: usize,
    reciever: Receiver<Vec<u8>>,
}

impl RawPipeIn {
    /// Creates a reader with no pending data on top of the receiver `a`.
    fn new(a: Receiver<Vec<u8>>) -> Self {
        Self {
            buf: Vec::new(),
            pos: 0,
            reciever: a,
        }
    }
}

impl Read for RawPipeIn {
    /// Copies as many pending bytes as fit into `buf`, blocking for the next
    /// chunk when nothing is pending.
    ///
    /// Returns `Ok(0)` only when `buf` is empty or every sender has been
    /// dropped and all data has been consumed.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut available = self.fill_buf()?;
        let n = available.read(buf)?;
        self.consume(n);
        Ok(n)
    }
}

impl BufRead for RawPipeIn {
    /// Returns the unconsumed part of the current chunk, receiving a new
    /// chunk (blocking) when the current one is exhausted.
    ///
    /// An empty slice is returned only once the channel is disconnected.
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
        // An empty slice means end-of-stream to `Read`/`BufRead` consumers,
        // but the sending side can legitimately deliver empty chunks (for
        // example by flushing while nothing is buffered). Keep receiving
        // until there is real data or the channel has been closed.
        while self.pos >= self.buf.len() {
            match self.reciever.recv() {
                Ok(chunk) => {
                    self.buf = chunk;
                    self.pos = 0;
                }
                Err(_) => {
                    // All senders are gone: report end-of-stream.
                    self.buf.clear();
                    self.pos = 0;
                    break;
                }
            }
        }
        Ok(&self.buf[self.pos..])
    }

    /// Marks `amt` bytes as consumed, clamped to the end of the current chunk.
    #[inline]
    fn consume(&mut self, amt: usize) {
        // `saturating_add` keeps an oversized `amt` from overflowing.
        self.pos = self.pos.saturating_add(amt).min(self.buf.len());
    }
}
/// Writing end of the pipe.
///
/// Written bytes accumulate in `buf` and are handed to the channel as a
/// single chunk when the writer is flushed.
#[derive(Debug)]
struct RawPipeOut {
    buf: Vec<u8>,
    sender: SyncSender<Vec<u8>>,
}

impl RawPipeOut {
    /// Creates a writer with an empty buffer on top of the sender `a`.
    pub fn with(a: SyncSender<Vec<u8>>) -> Self {
        let buf = Vec::new();
        Self { buf, sender: a }
    }

    /// Returns the bytes written since the last flush.
    #[inline]
    pub fn buffer(&self) -> &[u8] {
        &self.buf
    }
}

impl Write for RawPipeOut {
    /// Appends all of `buf` to the pending buffer and reports its full length.
    ///
    /// If the pending buffer had already reached 16 KiB before this call, it
    /// is flushed to the channel first.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        const BUF_SZ: usize = 4 * 4 * 1024;
        let pending_is_full = self.buf.len() >= BUF_SZ;
        if pending_is_full {
            self.flush()?;
        }
        self.buf.extend_from_slice(buf);
        Ok(buf.len())
    }

    /// Sends the pending buffer over the channel as one chunk, leaving the
    /// buffer empty.
    ///
    /// A failed send is reported as an `ErrorKind::Other` I/O error.
    fn flush(&mut self) -> std::io::Result<()> {
        let chunk = std::mem::take(&mut self.buf);
        self.sender
            .send(chunk)
            .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
    }
}