fclones/
transform.rs

1use std::cell::RefCell;
2use std::ffi::OsString;
3use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
4use std::io;
5use std::io::Read;
6use std::path::PathBuf;
7use std::process::{Child, Command, Stdio};
8use std::sync::{Arc, Mutex};
9use std::thread::JoinHandle;
10
11use nom::branch::alt;
12use nom::bytes::complete::tag;
13use nom::character::complete::{none_of, one_of};
14use nom::combinator::map;
15use nom::error::{ErrorKind, ParseError};
16use nom::multi::{many1, separated_list0};
17use nom::sequence::tuple;
18use nom::IResult;
19use regex::Regex;
20use uuid::Uuid;
21
22use crate::path::Path;
23
/// Describes how the file to be processed reaches the child process.
/// By default the file is streamed to the standard input of the child.
/// Programs that cannot read their input from stdin can be pointed to a file through
/// a command-line parameter instead - this is what the `Named` and `Copied` variants are for.
enum Input {
    /// Stream the file at the given path to the stdin of the child
    StdIn(PathBuf),
    /// Pass the original path of the file as the $IN param
    Named(PathBuf),
    /// Copy the original file (first path) to a temporary location (second path)
    /// and pass the location of the copy as the $IN param
    Copied(PathBuf, PathBuf),
}

impl Input {
    /// Returns the path of the file the child process reads:
    /// the original file, or the temporary copy for `Copied`.
    fn input_path(&self) -> &PathBuf {
        match self {
            Input::StdIn(original) | Input::Named(original) => original,
            Input::Copied(_, copy) => copy,
        }
    }

    /// Creates the temporary copy for the `Copied` variant; does nothing for the others.
    /// Fails if the file cannot be copied.
    fn prepare_input_file(&self) -> io::Result<()> {
        if let Input::Copied(original, copy) = self {
            std::fs::copy(original, copy)?;
        }
        Ok(())
    }
}

impl Drop for Input {
    /// Removes the temporary copy, if this input owns one.
    /// Failures (e.g. the copy was never created) are ignored.
    fn drop(&mut self) {
        if let Input::Copied(_, copy) = self {
            let _ = std::fs::remove_file(copy);
        }
    }
}
68
/// Controls how we read data out from the child process.
/// By default we read output directly from the standard output of the child process.
/// If the preprocessor program can't output data to its stdout, but supports only writing
/// to files, it can be configured to write to a named pipe, and we read from that named pipe.
/// A program that rewrites its input can be read back from that same file (`InPlace`).
enum Output {
    /// Read data directly from the stdout of the child
    StdOut,
    /// Read data through the named pipe at the given path
    Named(PathBuf),
    /// Read data from the same file as the input
    InPlace(PathBuf),
}

impl Output {
    /// Returns the path to the named pipe if the process is configured to write to a pipe.
    /// Returns None if the process is configured to write to stdout or to modify the input file.
    pub fn pipe_path(&self) -> Option<PathBuf> {
        match self {
            Output::Named(pipe) => Some(pipe.clone()),
            Output::StdOut | Output::InPlace(_) => None,
        }
    }
}

impl Drop for Output {
    /// Removes the named pipe if one was configured. Failures are ignored.
    ///
    /// The `InPlace` file is deliberately left alone: it is the input file of the transform.
    /// When that input is a temporary copy, the `Input` that created the copy removes it;
    /// when it is the original file passed without copying, it must never be deleted.
    fn drop(&mut self) {
        match self {
            Output::Named(pipe) => {
                let _ = std::fs::remove_file(pipe);
            }
            Output::StdOut | Output::InPlace(_) => {}
        }
    }
}
103
/// Transforms files through an external program.
/// The `command_str` field contains a path to a program and its space separated arguments.
/// The command takes a file given in the `$IN` variable and produces an `$OUT` file.
///
/// `Debug` is derived so that values (and `Result`s holding them) can be printed
/// in diagnostics and test failures.
#[derive(Clone, Debug)]
pub struct Transform {
    /// a path to a program and its space separated arguments;
    /// may reference the `$IN` and `$OUT` parameters
    pub command_str: String,
    /// temporary directory for storing files and named pipes
    pub tmp_dir: PathBuf,
    /// copy the file into temporary directory before running the transform on it;
    /// only takes effect when the command references `$IN`
    pub copy: bool,
    /// read output from the same location as the original
    pub in_place: bool,
    /// will be set to the name of the program, extracted from the command_str
    pub program: String,
}
120
121impl Transform {
122    pub fn new(command_str: String, in_place: bool) -> io::Result<Transform> {
123        let has_in = RefCell::new(false);
124        let has_out = RefCell::new(false);
125
126        let parsed = parse_command(&command_str, |s: &str| {
127            match s {
128                "OUT" if cfg!(windows) => *has_out.borrow_mut() = true,
129                "IN" => *has_in.borrow_mut() = true,
130                _ => {}
131            };
132            OsString::from(s)
133        });
134
135        let has_in = has_in.into_inner();
136        let has_out = has_out.into_inner();
137
138        if cfg!(windows) && has_out {
139            return Err(io::Error::new(
140                io::ErrorKind::Other,
141                "$OUT not supported on Windows yet",
142            ));
143        }
144        if in_place && has_out {
145            return Err(io::Error::new(
146                io::ErrorKind::Other,
147                "$OUT conflicts with --in-place",
148            ));
149        }
150        if in_place && !has_in {
151            return Err(io::Error::new(
152                io::ErrorKind::Other,
153                "$IN required with --in-place",
154            ));
155        }
156
157        let program = parsed
158            .first()
159            .and_then(|p| PathBuf::from(p).file_name().map(|s| s.to_os_string()));
160        let program = match program {
161            Some(p) => p.into_string().unwrap(),
162            None => {
163                return Err(io::Error::new(
164                    io::ErrorKind::Other,
165                    "Command cannot be empty",
166                ))
167            }
168        };
169
170        // Check if the program is runnable, fail fast if it is not.
171        match Command::new(&program).spawn() {
172            Ok(mut child) => {
173                let _ignore = child.kill();
174            }
175            Err(e) => {
176                return Err(io::Error::new(
177                    e.kind(),
178                    format!("Cannot launch {program}: {e}"),
179                ))
180            }
181        }
182
183        Ok(Transform {
184            command_str,
185            program,
186            tmp_dir: Transform::create_temp_dir()?,
187            copy: has_in,
188            in_place,
189        })
190    }
191
192    /// Creates the directory where preprocessed files will be stored
193    fn create_temp_dir() -> io::Result<PathBuf> {
194        let tmp = std::env::temp_dir().join(format!("fclones-{:032x}", Uuid::new_v4().as_u128()));
195        match create_dir_all(&tmp) {
196            Ok(()) => Ok(tmp),
197            Err(e) => Err(io::Error::new(
198                e.kind(),
199                format!(
200                    "Failed to create temporary directory {}: {}",
201                    tmp.display(),
202                    e
203                ),
204            )),
205        }
206    }
207
208    /// Creates a new unique random file name in the temporary directory
209    fn random_tmp_file_name(&self) -> PathBuf {
210        self.tmp_dir
211            .join(format!("{:032x}", Uuid::new_v4().as_u128()))
212    }
213
214    /// Returns the output file path for the given input file path
215    pub fn output(&self, input: &Path) -> PathBuf {
216        self.tmp_dir.join(format!("{:x}", input.hash128()))
217    }
218
219    /// Processes the input file and returns its output and err as stream
220    pub fn run(&self, input: &Path) -> io::Result<Execution> {
221        let (args, input_conf, output_conf) = self.make_args(input);
222        let mut command = build_command(&args, &input_conf, &output_conf)?;
223        let result = execute(&mut command, input_conf, output_conf)?;
224        Ok(result)
225    }
226
227    /// Creates arguments, input and output configuration for processing given input path.
228    /// The first element of the argument vector contains the program name.
229    fn make_args(&self, input: &Path) -> (Vec<OsString>, Input, Output) {
230        let input_conf = RefCell::<Input>::new(Input::StdIn(input.to_path_buf()));
231        let output_conf = RefCell::<Output>::new(Output::StdOut);
232
233        let args = parse_command(self.command_str.as_str(), |arg| match arg {
234            "IN" if self.copy => {
235                let tmp_target = self.random_tmp_file_name();
236                input_conf.replace(Input::Copied(input.to_path_buf(), tmp_target.clone()));
237                tmp_target.into_os_string()
238            }
239            "IN" => {
240                let input = input.to_path_buf();
241                input_conf.replace(Input::Named(input.clone()));
242                input.into_os_string()
243            }
244            "OUT" => {
245                let output = self.output(input);
246                output_conf.replace(Output::Named(output.clone()));
247                output.into_os_string()
248            }
249            _ => OsString::from(arg),
250        });
251
252        let input_conf = input_conf.into_inner();
253        let mut output_conf = output_conf.into_inner();
254
255        if self.in_place {
256            output_conf = Output::InPlace(input_conf.input_path().clone())
257        }
258
259        (args, input_conf, output_conf)
260    }
261}
262
263/// Cleans up temporary files
264impl Drop for Transform {
265    fn drop(&mut self) {
266        let _ = remove_dir_all(&self.tmp_dir);
267    }
268}
269
/// Keeps the results of the transform program execution.
///
/// Owns the running child process together with the streams used to read its results,
/// and keeps the temporary input/output files alive for as long as they may be needed.
pub struct Execution {
    /// The spawned child process; shared because the thread capturing stderr
    /// may also need to wait on it
    pub(crate) child: Arc<Mutex<Child>>,
    /// The transformed data: the stdout of the child, the named pipe it writes to,
    /// or the file it modified in place
    pub(crate) out_stream: Box<dyn Read>,
    /// Handle of the background thread that collects the stderr of the child into a string
    pub(crate) err_stream: Option<JoinHandle<String>>,
    _input: Input,   // holds the temporary input file(s) until execution is done
    _output: Output, // holds the temporary output file(s) until execution is done
}
278
279impl Drop for Execution {
280    fn drop(&mut self) {
281        let mut buf = [0; 4096];
282        while let Ok(1..) = self.out_stream.read(&mut buf) {}
283        let _ = self.child.lock().unwrap().wait();
284    }
285}
286
287/// Builds the `Command` struct from the parsed arguments
288fn build_command(
289    args: &[OsString],
290    input_conf: &Input,
291    output_conf: &Output,
292) -> io::Result<Command> {
293    let mut args = args.iter();
294    let mut command = Command::new(args.next().unwrap());
295    command.args(args);
296    command.stderr(Stdio::piped());
297
298    input_conf.prepare_input_file()?;
299    if let Input::StdIn(_) = input_conf {
300        command.stdin(File::open(input_conf.input_path())?);
301    } else {
302        command.stdin(Stdio::null());
303    }
304
305    if let Output::Named(output) = output_conf {
306        command.stdout(Stdio::null());
307        create_named_pipe(output)?;
308    } else {
309        command.stdout(Stdio::piped());
310    }
311
312    Ok(command)
313}
314
315#[cfg(unix)]
316fn create_named_pipe(path: &std::path::Path) -> io::Result<()> {
317    use nix::sys::stat;
318    use nix::unistd::mkfifo;
319    if let Err(e) = mkfifo(path, stat::Mode::S_IRWXU) {
320        let io_err: io::Error = e.into();
321        return Err(io::Error::new(
322            io_err.kind(),
323            format!("Failed to create named pipe {}: {}", path.display(), io_err),
324        ));
325    }
326    Ok(())
327}
328
/// Named pipes are not supported on Windows yet.
/// Always returns an error of kind [`io::ErrorKind::Unsupported`] instead of panicking,
/// so that callers can report the problem like any other I/O failure.
#[cfg(windows)]
fn create_named_pipe(path: &std::path::Path) -> io::Result<()> {
    Err(io::Error::new(
        io::ErrorKind::Unsupported,
        format!(
            "Failed to create named pipe {}: named pipes are not supported on Windows yet",
            path.display()
        ),
    ))
}
333
334/// Spawns the command process, and returns its output as a stream.
335/// The standard error is captured by a background thread and read to a string.
336fn execute(command: &mut Command, input: Input, output: Output) -> io::Result<Execution> {
337    let child = Arc::new(Mutex::new(command.spawn()?));
338
339    // We call 'take' to avoid borrowing `child` for longer than a single line.
340    // We can't reference stdout/stderr directly, because a mutable borrow of a field
341    // creates a mutable borrow of the containing struct, but later we need to mutably
342    // borrow `child` again to wait on it.
343    let child_out = child.lock().unwrap().stdout.take();
344    let child_err = child.lock().unwrap().stderr.take();
345
346    let output_pipe = output.pipe_path();
347    let child_ref = child.clone();
348
349    // Capture the stderr in background in order to avoid a deadlock when the child process
350    // would block on writing to stdout, and this process would block on reading stderr
351    // (or the other way round).
352    // The other solution could be to use non-blocking I/O, but threads look simpler.
353    let stderr_reaper = std::thread::spawn(move || {
354        let mut str = String::new();
355        if let Some(mut stream) = child_err {
356            let _ = stream.read_to_string(&mut str);
357        }
358        // If the child is supposed to communicate its output through a named pipe,
359        // ensure the pipe gets closed and the reader at the other end receives an EOF.
360        // It is possible that due to a misconfiguration
361        // (e.g. wrong arguments given by the user) the child would never open the output file
362        // and the reader at the other end would block forever.
363        if let Some(output_pipe) = output_pipe {
364            // If those fail, we have no way to report the failure.
365            // However if waiting fails here, the child process likely doesn't run, so that's not
366            // a problem.
367            let _ignore = child_ref.lock().unwrap().wait();
368            let _ignore = OpenOptions::new().write(true).open(output_pipe);
369        }
370        str
371    });
372
373    let child_out: Box<dyn Read> = match &output {
374        Output::StdOut => Box::new(child_out.unwrap()),
375        Output::Named(output) => Box::new(File::open(output)?),
376        Output::InPlace(output) => {
377            child.lock().unwrap().wait()?;
378            Box::new(File::open(output)?)
379        }
380    };
381
382    Ok(Execution {
383        child,
384        out_stream: child_out,
385        err_stream: Some(stderr_reaper),
386        _input: input,
387        _output: output,
388    })
389}
390
391/// Compares the input with a regular expression and returns the first match.
392/// Backported from nom 6.0.0-alpha1. We can't use nom 6.0.0-alpha1 directly,
393/// because it had some issues with our use of functions in pattern.rs.
394fn re_find<'s, E>(re: Regex) -> impl Fn(&'s str) -> IResult<&'s str, &'s str, E>
395where
396    E: ParseError<&'s str>,
397{
398    move |i| {
399        if let Some(m) = re.find(i) {
400            Ok((&i[m.end()..], &i[m.start()..m.end()]))
401        } else {
402            Err(nom::Err::Error(E::from_error_kind(
403                i,
404                ErrorKind::RegexpMatch,
405            )))
406        }
407    }
408}
409
410/// Splits the command string into separate arguments and substitutes $params
411fn parse_command<F>(command: &str, substitute: F) -> Vec<OsString>
412where
413    F: Fn(&str) -> OsString,
414{
415    fn join_chars(chars: Vec<char>) -> OsString {
416        let mut result = OsString::new();
417        for c in chars {
418            result.push(c.to_string())
419        }
420        result
421    }
422
423    fn join_str(strings: Vec<OsString>) -> OsString {
424        let mut result = OsString::new();
425        for c in strings {
426            result.push(c)
427        }
428        result
429    }
430
431    let r_var = Regex::new(r"^([[:alnum:]]|_)+").unwrap();
432    let p_var = map(tuple((tag("$"), re_find(r_var))), |(_, str)| {
433        (substitute)(str)
434    });
435    let p_non_var = map(many1(none_of(" $")), join_chars);
436    let p_arg = map(many1(alt((p_var, p_non_var))), join_str);
437    let p_whitespace = many1(one_of(" \t"));
438    let p_args = |s| separated_list0(p_whitespace, p_arg)(s);
439    let result: IResult<&str, Vec<OsString>> = (p_args)(command);
440    result.expect("Parse error").1
441}
442
#[cfg(test)]
mod test {
    use std::io::Write;

    use crate::file::{FileChunk, FileLen, FilePos};
    use crate::hasher::{FileHasher, HashFn};
    use crate::log::StdLog;
    use crate::util::test::with_dir;

    use super::*;

    /// A command made of whitespace only names no program and must be rejected.
    #[test]
    fn empty() {
        let transform = Transform::new(String::from(" "), false);
        assert!(transform.is_err());
    }

    /// `dd` without arguments copies stdin to stdout, so hashing the transformed
    /// stream must give the same result as hashing the file directly.
    #[test]
    #[cfg(unix)]
    fn piped() {
        with_dir("target/test/transform/piped/", |root| {
            let transform = Transform::new(String::from("dd"), false).unwrap();

            let content = b"content";
            let input_path = root.join("input.txt");
            {
                // The file is closed at the end of this scope, before it gets hashed.
                let mut file = File::create(&input_path).unwrap();
                file.write_all(content).unwrap();
            }

            let log = StdLog::default();
            let hasher = FileHasher::new(HashFn::default(), Some(transform), &log);
            let input_path = Path::from(input_path);
            let chunk = FileChunk::new(&input_path, FilePos(0), FileLen::MAX);

            let expected_hash = hasher.hash_file(&chunk, |_| {}).unwrap();
            let transformed = hasher.hash_transformed(&chunk, |_| {}).unwrap();
            assert_eq!(transformed.0, FileLen(content.len() as u64));
            assert_eq!(transformed.1, expected_hash);
        })
    }

    /// Same as `piped`, but the data goes through the `$IN` and `$OUT` parameters
    /// instead of the standard streams.
    #[test]
    #[cfg(unix)]
    fn parameterized() {
        with_dir("target/test/transform/param/", |root| {
            let transform = Transform::new(String::from("dd if=$IN of=$OUT"), false).unwrap();

            let content = b"content";
            let input_path = root.join("input.txt");
            {
                // The file is closed at the end of this scope, before it gets hashed.
                let mut file = File::create(&input_path).unwrap();
                file.write_all(content).unwrap();
            }

            let log = StdLog::default();
            let hasher = FileHasher::new(HashFn::default(), Some(transform), &log);
            let input_path = Path::from(input_path);
            let chunk = FileChunk::new(&input_path, FilePos(0), FileLen::MAX);

            let expected_hash = hasher.hash_file(&chunk, |_| {}).unwrap();
            let transformed = hasher.hash_transformed(&chunk, |_| {}).unwrap();
            assert_eq!(transformed.0, FileLen(content.len() as u64));
            assert_eq!(transformed.1, expected_hash);
        })
    }

    /// Plain words are split on the space between them.
    #[test]
    fn parse_command() {
        let expected = vec![OsString::from("foo"), OsString::from("bar")];
        let actual = super::parse_command("foo bar", |s| OsString::from(s));
        assert_eq!(actual, expected)
    }

    /// A `$IN` embedded in an argument is replaced without splitting the argument.
    #[test]
    fn parse_command_substitute() {
        let substitute = |s: &str| match s {
            "IN" => OsString::from("/input"),
            _ => OsString::from(s),
        };
        let actual = super::parse_command("foo bar in=$IN", substitute);

        let expected = vec![
            OsString::from("foo"),
            OsString::from("bar"),
            OsString::from("in=/input"),
        ];
        assert_eq!(actual, expected)
    }
}