use super::*;
use std::{
io,
process::{Child, Command, Stdio},
thread,
};
use anyhow::{Result, ensure};
pub struct BackendFilter {
pub filter: String,
pub unfilter: String,
pub raw: Box<dyn super::Backend + Send + Sync>,
}
struct UnfilterRead {
from: String,
unfilter: String,
copy_thread: Option<thread::JoinHandle<Result<()>>>,
child: Child,
}
impl Drop for UnfilterRead {
fn drop(&mut self) {
self.copy_thread
.take()
.unwrap()
.join()
.unwrap()
.unwrap_or_else(|e| {
panic!(
"Piping {} through {} failed: {:#?}",
self.from, self.unfilter, e
)
});
if !self.child.wait().unwrap().success() {
panic!("{} < {} failed", self.unfilter, self.from)
}
}
}
impl Read for UnfilterRead {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.child.stdout.as_mut().unwrap().read(buf)
}
}
impl Backend for BackendFilter {
fn read(&self, from: &str) -> Result<Box<dyn Read + Send + 'static>> {
debug!("{} < {from}", self.unfilter);
let mut inner_read = self.raw.read(from)?;
let mut uf = Command::new("sh")
.arg("-c")
.arg(&self.unfilter)
.stdout(Stdio::piped())
.stdin(Stdio::piped())
.spawn()
.with_context(|| format!("Couldn't run {}", self.unfilter))?;
let mut to_unfilter = uf.stdin.take().unwrap();
let copy_thread = thread::Builder::new()
.name("unfilter-copy".to_string())
.spawn(move || -> anyhow::Result<()> {
io::copy(&mut inner_read, &mut to_unfilter)?;
Ok(())
})
.unwrap();
Ok(Box::new(UnfilterRead {
from: from.to_string(),
unfilter: self.unfilter.clone(),
copy_thread: Some(copy_thread),
child: uf,
}))
}
fn write(&self, _len: u64, from: &mut (dyn Read + Send), to: &str) -> Result<()> {
debug!("{} > {to}", self.filter);
let mut f = Command::new("sh")
.arg("-c")
.arg(&self.filter)
.stdout(Stdio::piped())
.stdin(Stdio::piped())
.spawn()
.with_context(|| format!("Couldn't run {}", self.filter))?;
let mut to_filter = f.stdin.take().unwrap();
let mut from_filter = f.stdout.take().unwrap();
let mut filtered = tempfile::tempfile_in(".")?;
thread::scope(|s| -> anyhow::Result<()> {
let copy_to = thread::Builder::new()
.name("filter-copy".to_string())
.spawn_scoped(s, move || -> anyhow::Result<()> {
io::copy(from, &mut to_filter)?;
Ok(())
})
.unwrap();
io::copy(&mut from_filter, &mut filtered)?;
copy_to.join().unwrap()?;
Ok(())
})
.with_context(|| format!("Piping {to} through {} failed", self.filter))?;
ensure!(
f.wait().unwrap().success(),
format!("{} > {to} failed", self.filter)
);
let len = filtered.stream_position()?;
filtered.seek(io::SeekFrom::Start(0))?;
self.raw.write(len, &mut filtered, to)?;
Ok(())
}
fn remove(&self, which: &str) -> Result<()> {
self.raw.remove(which)
}
fn list(&self, prefix: &str) -> Result<Vec<(String, u64)>> {
self.raw.list(prefix)
}
}
#[cfg(test)]
mod test {
use super::*;
use std::io::Cursor;
#[test]
fn smoke() -> Result<()> {
let f = BackendFilter {
filter: "cat".to_string(),
unfilter: "cat".to_string(),
raw: Box::new(crate::backend::memory::MemoryBackend::new()),
};
let epitaph = "Everything was beautiful and nothing hurt";
f.write(epitaph.len() as u64, &mut Cursor::new(epitaph), "epitaph")?;
let mut so_it_goes = String::new();
f.read("epitaph")?.read_to_string(&mut so_it_goes)?;
assert_eq!(so_it_goes, "Everything was beautiful and nothing hurt");
Ok(())
}
}