use std::{
fs::File,
io::{self, stdin, stdout, BufReader, BufWriter, Error, ErrorKind, Read, Stdin, Stdout, Write},
path::{Path, PathBuf},
process::{Child, ChildStdin, ChildStdout, Command, Stdio},
thread,
};
use crate::compress_type::{CompressThreads, CompressType};
use crate::filter_spec::FilterSpec;
use crate::path_utils::*;
use os_pipe::{pipe, PipeReader};
/// Selects whether data passes through an external command on its way to or
/// from a file or standard stream.
#[derive(Debug)]
pub enum Filter {
/// Data is read or written directly, without spawning any process.
NoFilter,
/// Data is piped through the external command described by the `FilterSpec`.
Filter(FilterSpec),
}
fn piped_stdin(buf: CheckBuf) -> PipeReader {
let (reader, mut wr) = pipe().expect("Couldn't create pipe");
thread::spawn(move || {
let mut tbuf = [0; 65536];
let mut rd = io::stdin();
wr.write_all(&buf).expect("Error writing to pipe");
while let Ok(n) = rd.read(&mut tbuf) {
if n > 0 {
assert!(n <= tbuf.len());
wr.write_all(&tbuf[..n]).expect("Error writing to pipe");
} else {
break;
}
}
});
reader
}
impl Filter {
    /// Opens an input stream, routing it through this filter when one is set.
    ///
    /// * `name` - file to read from; `None` selects standard input.
    /// * `buf` - bytes to replay ahead of standard input. It is only used when
    ///   `name` is `None` and the buffer is non-empty; in that case the input
    ///   is served through a pipe fed by `piped_stdin`.
    ///
    /// # Errors
    /// Fails if the file cannot be opened or the filter command cannot be
    /// spawned.
    pub fn reader<P: AsRef<Path>>(&self, name: Option<P>, buf: CheckBuf) -> io::Result<Reader> {
        // Named input: the file is either read directly or becomes the
        // filter's standard input.
        if let Some(path) = name {
            let file = File::open(path.as_ref())?;
            return Ok(match self {
                Filter::NoFilter => Reader::from_file(file),
                Filter::Filter(spec) => Reader::from_child(open_read_filter(spec, Some(file))?),
            });
        }

        // Standard input: replay any buffered bytes through a pipe.
        let replay = if buf.is_empty() {
            None
        } else {
            Some(piped_stdin(buf))
        };
        match self {
            Filter::NoFilter => Ok(match replay {
                Some(pipe) => Reader::from_pipe_reader(pipe),
                None => Reader::from_stdin(),
            }),
            Filter::Filter(spec) => Ok(Reader::from_child(open_read_filter(spec, replay)?)),
        }
    }

    /// Opens an output stream, routing it through this filter when one is set.
    ///
    /// * `name` - file to create; `None` selects standard output.
    /// * `fix_path` - when `false` and a filter is set, the path is passed
    ///   through `FilterSpec::cond_add_suffix` before the file is created;
    ///   when `true` the path is used exactly as given.
    /// * `no_wait` - forwarded to `Writer::from_child`.
    ///
    /// # Errors
    /// Fails if the file cannot be created or the filter command cannot be
    /// spawned.
    pub fn writer<P: AsRef<Path>>(
        &self,
        name: Option<P>,
        fix_path: bool,
        no_wait: bool,
    ) -> io::Result<Writer> {
        // Resolve the final output path, if any.
        let target: Option<PathBuf> = name.map(|p| {
            let given = p.as_ref();
            match self {
                Filter::Filter(spec) if !fix_path => spec.cond_add_suffix(given),
                _ => given.to_owned(),
            }
        });

        match (self, target) {
            (Filter::NoFilter, Some(path)) => Ok(Writer::from_file(File::create(&path)?)),
            (Filter::NoFilter, None) => Ok(Writer::from_stdout()),
            (Filter::Filter(spec), Some(path)) => {
                let sink = File::create(&path)?;
                let child = open_write_filter(spec, Some(sink))?;
                Ok(Writer::from_child(child, no_wait))
            }
            (Filter::Filter(spec), None) => {
                // No file: the child inherits this process's standard output.
                let child = open_write_filter(spec, None::<File>)?;
                Ok(Writer::from_child(child, no_wait))
            }
        }
    }

    /// Builds the filter used to decompress data of type `ctype`.
    ///
    /// `CompressType::NoFilter` yields `Filter::NoFilter`; every other type is
    /// mapped to the command line of its decompression tool, using
    /// `CompressThreads::Default` for the thread setting.
    ///
    /// # Errors
    /// Propagates the error from `CompressType::get_decompress_tool`.
    ///
    /// # Panics
    /// Panics if the tool has no known path or does not provide a
    /// decompression service for `ctype`.
    pub fn new_decompress_filter(ctype: CompressType) -> io::Result<Self> {
        if matches!(ctype, CompressType::NoFilter) {
            return Ok(Filter::NoFilter);
        }
        let tool = ctype.get_decompress_tool()?;
        let path = tool.path().expect("Unknown path for selected tool");
        let service = tool
            .get_decompress(ctype)
            .expect("tool does not support selected decompress type");
        let spec = FilterSpec::new_compress(path, service.args(CompressThreads::Default), ctype);
        Ok(Filter::Filter(spec))
    }

    /// Builds the filter used to compress data as `ctype` with the thread
    /// setting `cthreads`.
    ///
    /// `CompressType::NoFilter` yields `Filter::NoFilter`.
    ///
    /// # Errors
    /// Propagates the error from `CompressType::get_compress_tool`.
    ///
    /// # Panics
    /// Panics if the tool has no known path or does not provide a compression
    /// service for `ctype`.
    pub fn new_compress_filter(ctype: CompressType, cthreads: CompressThreads) -> io::Result<Self> {
        if matches!(ctype, CompressType::NoFilter) {
            return Ok(Filter::NoFilter);
        }
        let tool = ctype.get_compress_tool()?;
        let path = tool.path().expect("Unknown path for selected tool");
        let service = tool
            .get_compress(ctype)
            .expect("tool does not support selected compress type");
        let spec = FilterSpec::new_compress(path, service.args(cthreads), ctype);
        Ok(Filter::Filter(spec))
    }
}
impl Default for Filter {
fn default() -> Self {
Self::NoFilter
}
}
/// Spawns the filter command `f` so that its output can be read by the caller.
///
/// The child's standard input is `input` when given, otherwise it inherits
/// this process's standard input. Its standard output is always a pipe, left
/// in the returned `Child` for the caller to take.
///
/// # Errors
/// Returns an `ErrorKind::Other` error naming the command if it cannot be
/// spawned.
pub fn open_read_filter<T: Into<Stdio>>(f: &FilterSpec, input: Option<T>) -> io::Result<Child> {
    let source: Stdio = match input {
        Some(handle) => handle.into(),
        None => Stdio::inherit(),
    };
    Command::new(f.path())
        .args(f.args())
        .stdin(source)
        .stdout(Stdio::piped())
        .spawn()
        .map_err(|cause| {
            Error::new(
                ErrorKind::Other,
                format!(
                    "Error executing pipe command '{}': {}",
                    f.path().display(),
                    cause
                ),
            )
        })
}
/// Spawns the filter command `f` so that the caller can write data into it.
///
/// The child's standard output is `output` when given, otherwise it inherits
/// this process's standard output. Its standard input is always a pipe, left
/// in the returned `Child` for the caller to take.
///
/// # Errors
/// Returns an `ErrorKind::Other` error naming the command if it cannot be
/// spawned.
pub fn open_write_filter<T: Into<Stdio> + std::fmt::Debug>(
    f: &FilterSpec,
    output: Option<T>,
) -> io::Result<Child> {
    let sink: Stdio = match output {
        Some(handle) => handle.into(),
        None => Stdio::inherit(),
    };
    Command::new(f.path())
        .args(f.args())
        .stdout(sink)
        .stdin(Stdio::piped())
        .spawn()
        .map_err(|cause| {
            Error::new(
                ErrorKind::Other,
                format!(
                    "Error executing pipe command '{}': {}",
                    f.path().display(),
                    cause
                ),
            )
        })
}
/// Output sink that hides whether bytes go to a file, to standard output, or
/// into the standard input of a child process.
#[derive(Debug)]
pub enum Writer {
    /// Writes straight to a file.
    File(File),
    /// Writes to a child's standard input and keeps the child handle so the
    /// process is waited for when the writer is dropped. Both slots are
    /// `Option`s so they can be emptied individually.
    Child(Option<ChildStdin>, Option<Child>),
    /// Writes to a child's standard input without tracking the child itself.
    ChildStdin(ChildStdin),
    /// Writes to this process's standard output.
    Stdout(Stdout),
}

impl Writer {
    /// Wraps a spawned child whose standard input is piped.
    ///
    /// With `no_wait` set only the pipe is kept and the `Child` handle is
    /// discarded; otherwise the handle is retained so that dropping the
    /// writer waits for the process.
    ///
    /// # Panics
    /// Panics if the child's standard input is not available.
    pub fn from_child(mut child: Child, no_wait: bool) -> Self {
        let pipe_in = child.stdin.take().expect("Pipe error");
        if no_wait {
            return Self::ChildStdin(pipe_in);
        }
        Self::Child(Some(pipe_in), Some(child))
    }

    /// Removes and returns the tracked child process, if there is one.
    ///
    /// After this call, dropping the writer no longer waits for the process.
    pub fn take_child(&mut self) -> Option<Child> {
        if let Self::Child(_, slot) = self {
            slot.take()
        } else {
            None
        }
    }

    /// Wraps an already opened file.
    pub fn from_file(file: File) -> Self {
        Writer::File(file)
    }

    /// Wraps this process's standard output.
    pub fn from_stdout() -> Self {
        Writer::Stdout(stdout())
    }
}

impl Write for Writer {
    /// Forwards the write to the underlying sink. A `Child` variant whose
    /// pipe has already been taken reports zero bytes written.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self {
            Self::File(file) => file.write(buf),
            Self::Stdout(out) => out.write(buf),
            Self::ChildStdin(pipe) | Self::Child(Some(pipe), _) => pipe.write(buf),
            Self::Child(None, _) => Ok(0),
        }
    }

    /// Flushes the underlying sink; a `Child` variant without a pipe has
    /// nothing to flush.
    fn flush(&mut self) -> io::Result<()> {
        match self {
            Self::File(file) => file.flush(),
            Self::Stdout(out) => out.flush(),
            Self::ChildStdin(pipe) | Self::Child(Some(pipe), _) => pipe.flush(),
            Self::Child(None, _) => Ok(()),
        }
    }
}

impl Drop for Writer {
    /// For a tracked child, closes the pipe and then waits for the process.
    fn drop(&mut self) {
        match self {
            Self::Child(stdin_slot, proc_slot) => {
                if let Some(mut proc) = proc_slot.take() {
                    // Close our end of the child's stdin first so the child
                    // sees end-of-input before we block waiting for it.
                    drop(stdin_slot.take());
                    // The wait result is ignored: drop cannot report errors.
                    let _ = proc.wait();
                }
            }
            Self::File(_) | Self::ChildStdin(_) | Self::Stdout(_) => {}
        }
    }
}
/// Input source that hides whether bytes come from a file, from standard
/// input, from a pipe, or from the standard output of a child process.
#[derive(Debug)]
pub enum Reader {
/// Reads straight from a file.
File(File),
/// Reads from a child's standard output; the child handle is kept alongside
/// so the process can be killed and reaped when the reader is dropped.
Child(Child, ChildStdout),
/// Reads from this process's standard input.
Stdin(Stdin),
/// Reads from the read end of a pipe.
PipeReader(PipeReader),
}
impl Drop for Reader {
    /// Kills and reaps the child process when the reader wraps one; the other
    /// variants need no cleanup here.
    fn drop(&mut self) {
        match self {
            Self::Child(proc, _) => {
                // Both results are ignored: the process may already have
                // exited, and drop cannot report errors.
                let _ = proc.kill();
                let _ = proc.wait();
            }
            Self::File(_) | Self::Stdin(_) | Self::PipeReader(_) => {}
        }
    }
}
impl Reader {
pub fn from_file(file: File) -> Self {
Self::File(file)
}
pub fn from_stdin() -> Self {
Self::Stdin(stdin())
}
pub fn from_child(mut c: Child) -> Self {
let cs = c.stdout.take().expect("Erro getting child stdout");
Self::Child(c, cs)
}
pub fn from_pipe_reader(pr: PipeReader) -> Self {
Self::PipeReader(pr)
}
}
impl Read for Reader {
    /// Forwards the read to whichever source this reader wraps.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Pick the concrete source once, then issue a single read on it.
        let source: &mut dyn Read = match self {
            Self::File(file) => file,
            Self::Child(_, child_out) => child_out,
            Self::Stdin(input) => input,
            Self::PipeReader(pipe) => pipe,
        };
        source.read(buf)
    }
}
/// Builder that collects the options for opening a possibly compressed input
/// or output stream.
#[derive(Default, Debug)]
pub struct CompressIo {
// File to open; `None` means standard input (reading) or standard output (writing).
path: Option<PathBuf>,
// Requested compression type; `writer` replaces `CompressType::Unknown` by a
// type derived from the path suffix, or by `NoFilter` when there is no path.
ctype: CompressType,
// Thread setting handed to the compression filter when writing.
cthreads: CompressThreads,
// Forwarded to `Filter::writer`: when set, the output path is used exactly as given.
fix_path: bool,
// Forwarded to `Filter::writer`: when set, the writer does not keep the child handle.
no_wait: bool,
}
impl CompressIo {
    /// Creates a builder with every option at its default value.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the file to read from or write to.
    pub fn path<P: AsRef<Path>>(&mut self, path: P) -> &mut Self {
        self.path = Some(path.as_ref().to_path_buf());
        self
    }

    /// Sets or clears the file path; `None` selects the standard stream.
    pub fn opt_path<P: AsRef<Path>>(&mut self, path: Option<P>) -> &mut Self {
        self.path = path.map(|given| given.as_ref().to_path_buf());
        self
    }

    /// Sets the compression type.
    pub fn ctype(&mut self, ctype: CompressType) -> &mut Self {
        self.ctype = ctype;
        self
    }

    /// Sets the thread setting used by the compression filter.
    pub fn cthreads(&mut self, cthreads: CompressThreads) -> &mut Self {
        self.cthreads = cthreads;
        self
    }

    /// Requests that the output path be used exactly as given.
    pub fn fix_path(&mut self) -> &mut Self {
        self.fix_path = true;
        self
    }

    /// Requests a writer that does not wait for its filter process on drop.
    pub fn no_wait(&mut self) -> &mut Self {
        self.no_wait = true;
        self
    }

    /// Opens the configured input.
    ///
    /// The compression type comes from `check_read_ctype`, which is also
    /// given a scratch buffer; that buffer is then passed on to
    /// `Filter::reader` together with the path.
    ///
    /// # Errors
    /// Propagates errors from `check_read_ctype`, from building the
    /// decompression filter, and from opening the stream.
    pub fn reader(&self) -> io::Result<Reader> {
        let mut probe = CheckBuf::default();
        let detected = check_read_ctype(self.path.as_ref(), self.ctype, Some(&mut probe))?;
        let filter = Filter::new_decompress_filter(detected)?;
        filter.reader(self.path.as_ref(), probe)
    }

    /// Opens the configured input wrapped in a `BufReader`.
    pub fn bufreader(&self) -> io::Result<BufReader<Reader>> {
        Ok(BufReader::new(self.reader()?))
    }

    /// Opens the configured output.
    ///
    /// When the compression type is `CompressType::Unknown` it is derived
    /// from the path suffix, or set to `CompressType::NoFilter` if no path
    /// was given.
    ///
    /// # Errors
    /// Propagates errors from building the compression filter and from
    /// opening the stream.
    pub fn writer(&self) -> io::Result<Writer> {
        let target = self.path.as_ref();
        let ctype = if self.ctype != CompressType::Unknown {
            self.ctype
        } else {
            target.map_or(CompressType::NoFilter, |p| CompressType::from_suffix(p))
        };
        Filter::new_compress_filter(ctype, self.cthreads)?.writer(
            target,
            self.fix_path,
            self.no_wait,
        )
    }

    /// Opens the configured output wrapped in a `BufWriter`.
    pub fn bufwriter(&self) -> io::Result<BufWriter<Writer>> {
        Ok(BufWriter::new(self.writer()?))
    }
}