use crate::*;
use std::io::{BufRead, Read, Write};
use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::{Mutex, MutexGuard};
#[inline(always)]
pub fn line_pipe(sz: usize) -> (LinePipeOut, LinePipeIn) {
let (sender, receiver) = std::sync::mpsc::sync_channel(sz);
(LinePipeOut::with(sender), LinePipeIn::with(receiver))
}
trait WriteString {
fn write_line(&mut self, string: String) -> Result<()>;
fn flush_line(&mut self) -> Result<()>;
}
const MSG_CHUNK_SZ: usize = 2 * 512;
#[derive(Debug)]
pub struct LinePipeIn(LockableLinePipeIn);
impl LinePipeIn {
pub fn with(a: Receiver<Vec<String>>) -> Self {
Self(LockableLinePipeIn::with(a))
}
}
impl StreamIn for LinePipeIn {
#[inline(always)]
fn lock_bufread(&self) -> Box<dyn BufRead + '_> {
unimplemented!()
}
#[inline(always)]
fn is_line_pipe(&self) -> bool {
true
}
fn lines(&self) -> Box<dyn NextLine + '_> {
let a = self.0.inner.lock().unwrap().take().unwrap();
Box::new(Lines { buf: a })
}
}
#[allow(dead_code)]
pub struct LinePipeInLock<'a>(LockableLinePipeInLock<'a>);
impl Read for LinePipeInLock<'_> {
#[inline(always)]
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
unimplemented!()
}
}
impl BufRead for LinePipeInLock<'_> {
#[inline(always)]
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
unimplemented!()
}
#[inline(always)]
fn consume(&mut self, _amt: usize) {
unimplemented!()
}
}
#[derive(Debug)]
pub struct LinePipeOut(LockableLinePipeOut);
impl LinePipeOut {
pub fn with(sender: SyncSender<Vec<String>>) -> Self {
Self(LockableLinePipeOut::with(RawLinePipeOut::with(sender)))
}
}
impl StreamOut for LinePipeOut {
#[inline(always)]
fn lock(&self) -> Box<dyn StreamOutLock + '_> {
unimplemented!()
}
#[inline(always)]
fn is_line_pipe(&self) -> bool {
true
}
fn write_line(&self, string: String) -> Result<()> {
self.0.lock().write_line(string)
}
fn flush_line(&self) -> Result<()> {
self.0.lock().flush_line()
}
}
#[derive(Debug)]
pub struct LinePipeOutLock<'a>(LockableLinePipeOutLock<'a>);
impl StreamOutLock for LinePipeOutLock<'_> {
#[inline(always)]
fn buffer(&self) -> &[u8] {
unimplemented!()
}
}
impl Write for LinePipeOutLock<'_> {
#[inline(always)]
fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
unimplemented!()
}
#[inline(always)]
fn flush(&mut self) -> std::io::Result<()> {
unimplemented!()
}
}
impl WriteString for LinePipeOutLock<'_> {
#[inline(always)]
fn write_line(&mut self, string: String) -> std::io::Result<()> {
self.0.write_line(string)
}
#[inline(always)]
fn flush_line(&mut self) -> std::io::Result<()> {
self.0.flush_line()
}
}
#[derive(Debug)]
pub struct LinePipeErr(LockableLinePipeOut);
impl LinePipeErr {
pub fn with(sender: SyncSender<Vec<String>>) -> Self {
Self(LockableLinePipeOut::with(RawLinePipeOut::with(sender)))
}
}
impl StreamErr for LinePipeErr {
#[inline(always)]
fn lock(&self) -> Box<dyn StreamErrLock + '_> {
unimplemented!()
}
#[inline(always)]
fn is_line_pipe(&self) -> bool {
true
}
fn write_line(&self, string: String) -> Result<()> {
self.0.lock().write_line(string)
}
fn flush_line(&self) -> Result<()> {
self.0.lock().flush_line()
}
}
impl std::convert::From<LinePipeOut> for LinePipeErr {
#[inline(always)]
fn from(a: LinePipeOut) -> Self {
Self(a.0)
}
}
#[allow(dead_code)]
#[derive(Debug)]
pub struct LinePipeErrLock<'a>(LockableLinePipeOutLock<'a>);
impl StreamErrLock for LinePipeErrLock<'_> {
#[inline(always)]
fn buffer(&self) -> &[u8] {
unimplemented!()
}
}
impl Write for LinePipeErrLock<'_> {
#[inline(always)]
fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
unimplemented!()
}
#[inline(always)]
fn flush(&mut self) -> std::io::Result<()> {
unimplemented!()
}
}
#[derive(Debug)]
struct LockableLinePipeIn {
inner: Mutex<Option<RawLinePipeIn>>,
}
impl LockableLinePipeIn {
pub fn with(a: Receiver<Vec<String>>) -> Self {
LockableLinePipeIn {
inner: Mutex::new(Some(RawLinePipeIn::new(a))),
}
}
}
#[derive(Debug)]
struct LockableLinePipeInLock<'a> {
_inner: MutexGuard<'a, Option<RawLinePipeIn>>,
}
impl Read for LockableLinePipeInLock<'_> {
#[inline(always)]
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
unimplemented!()
}
}
impl BufRead for LockableLinePipeInLock<'_> {
#[inline(always)]
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
unimplemented!()
}
#[inline(always)]
fn consume(&mut self, _amt: usize) {
unimplemented!()
}
}
#[derive(Debug)]
struct LockableLinePipeOut {
inner: Mutex<RawLinePipeOut>,
}
impl LockableLinePipeOut {
fn with(a: RawLinePipeOut) -> Self {
LockableLinePipeOut {
inner: Mutex::new(a),
}
}
pub fn lock(&self) -> LockableLinePipeOutLock<'_> {
LockableLinePipeOutLock {
inner: self.inner.lock().unwrap_or_else(|e| e.into_inner()),
}
}
}
#[derive(Debug)]
struct LockableLinePipeOutLock<'a> {
inner: MutexGuard<'a, RawLinePipeOut>,
}
impl Write for LockableLinePipeOutLock<'_> {
#[inline(always)]
fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
unimplemented!()
}
#[inline(always)]
fn flush(&mut self) -> std::io::Result<()> {
unimplemented!()
}
}
impl WriteString for LockableLinePipeOutLock<'_> {
#[inline(always)]
fn write_line(&mut self, string: String) -> std::io::Result<()> {
self.inner.write_line(string)
}
#[inline(always)]
fn flush_line(&mut self) -> std::io::Result<()> {
self.inner.flush_line()
}
}
pub struct Lines {
buf: RawLinePipeIn,
}
impl Iterator for Lines {
type Item = Result<String>;
fn next(&mut self) -> Option<Result<String>> {
self.buf.next()
}
}
impl NextLine for Lines {}
#[derive(Debug)]
struct RawLinePipeIn {
buf: Vec<String>,
receiver: Receiver<Vec<String>>,
}
impl RawLinePipeIn {
fn new(a: Receiver<Vec<String>>) -> Self {
Self {
buf: Vec::with_capacity(MSG_CHUNK_SZ),
receiver: a,
}
}
fn next(&mut self) -> Option<Result<String>> {
if self.buf.is_empty() {
let mut b = match self.receiver.recv() {
Ok(s) => s,
Err(_) => return None,
};
b.reverse();
self.buf = b;
}
Some(Ok(self.buf.pop().unwrap()))
}
}
impl Read for RawLinePipeIn {
#[inline(always)]
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
unimplemented!();
}
}
impl BufRead for RawLinePipeIn {
#[inline(always)]
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
unimplemented!();
}
#[inline(always)]
fn consume(&mut self, _amt: usize) {
unimplemented!();
}
}
#[derive(Debug)]
struct RawLinePipeOut {
buf: Vec<String>,
sender: SyncSender<Vec<String>>,
}
impl RawLinePipeOut {
pub fn with(a: SyncSender<Vec<String>>) -> Self {
Self {
buf: Vec::new(),
sender: a,
}
}
}
impl Write for RawLinePipeOut {
#[inline(always)]
fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
unimplemented!();
}
#[inline(always)]
fn flush(&mut self) -> std::io::Result<()> {
unimplemented!();
}
}
impl WriteString for RawLinePipeOut {
fn write_line(&mut self, string: String) -> Result<()> {
self.buf.push(string);
if self.buf.len() > MSG_CHUNK_SZ {
self.flush_line()?;
}
Ok(())
}
fn flush_line(&mut self) -> Result<()> {
let mut v = Vec::with_capacity(self.buf.len());
v.append(&mut self.buf); let r = self.sender.send(v);
if let Err(err) = r {
return Err(std::io::Error::new(std::io::ErrorKind::Other, err));
}
self.buf.clear();
Ok(())
}
}