commandspec/
process.rs

1// From https://raw.githubusercontent.com/watchexec/watchexec/master/src/process.rs
2#![allow(unused)]
3
4// use pathop::PathOp;
5use std::collections::{HashMap, HashSet};
6use std::path::PathBuf;
7
8// pub fn spawn(cmd: &Vec<String>, updated_paths: Vec<PathOp>, no_shell: bool) -> Process {
9//     self::imp::Process::new(cmd, updated_paths, no_shell).expect("unable to spawn process")
10// }
11
12pub use self::imp::Process;
13
14/*
15fn needs_wrapping(s: &String) -> bool {
16    s.contains(|ch| match ch {
17        ' ' | '\t' | '\'' | '"' => true,
18        _ => false,
19    })
20}
21
22#[cfg(target_family = "unix")]
23fn wrap_in_quotes(s: &String) -> String {
24    format!(
25        "'{}'",
26        if s.contains('\'') {
27            s.replace('\'', "'\"'\"'")
28        } else {
29            s.clone()
30        }
31    )
32}
33
34#[cfg(target_family = "windows")]
35fn wrap_in_quotes(s: &String) -> String {
36    format!(
37        "\"{}\"",
38        if s.contains('"') {
39            s.replace('"', "\"\"")
40        } else {
41            s.clone()
42        }
43    )
44}
45
46fn wrap_commands(cmd: &Vec<String>) -> Vec<String> {
47    cmd.iter()
48        .map(|fragment| {
49            if needs_wrapping(fragment) {
50                wrap_in_quotes(fragment)
51            } else {
52                fragment.clone()
53            }
54        })
55        .collect()
56}
57*/
58
59#[cfg(target_family = "unix")]
60mod imp {
61    //use super::wrap_commands;
62    use nix::libc::*;
63    use nix::{self, Error};
64    // use pathop::PathOp;
65    use signal::Signal;
66    use std::io::{self, Result};
67    use std::process::Command;
68    use std::sync::*;
69
70    pub struct Process {
71        pgid: pid_t,
72        lock: Mutex<bool>,
73        cvar: Condvar,
74    }
75
76    fn from_nix_error(err: nix::Error) -> io::Error {
77        match err {
78            Error::Sys(errno) => io::Error::from_raw_os_error(errno as i32),
79            Error::InvalidPath => io::Error::new(io::ErrorKind::InvalidInput, err),
80            _ => io::Error::new(io::ErrorKind::Other, err),
81        }
82    }
83
84    #[allow(unknown_lints)]
85    #[allow(mutex_atomic)]
86    impl Process {
87        pub fn new(
88            mut command: Command,
89        ) -> Result<Process> {
90            use nix::unistd::*;
91            use std::os::unix::process::CommandExt;
92
93            command
94                .before_exec(|| setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(from_nix_error))
95                .spawn()
96                .and_then(|p| {
97                    Ok(Process {
98                        pgid: p.id() as i32,
99                        lock: Mutex::new(false),
100                        cvar: Condvar::new(),
101                    })
102                })
103        }
104
105        pub fn id(&self) -> i32 {
106            self.pgid as i32
107        }
108
109        pub fn reap(&self) {
110            use nix::sys::wait::*;
111            use nix::unistd::Pid;
112
113            let mut finished = true;
114            loop {
115                match waitpid(Pid::from_raw(-self.pgid), Some(WaitPidFlag::WNOHANG)) {
116                    Ok(WaitStatus::Exited(_, _)) | Ok(WaitStatus::Signaled(_, _, _)) => {
117                        finished = finished && true
118                    }
119                    Ok(_) => {
120                        finished = false;
121                        break;
122                    }
123                    Err(_) => break,
124                }
125            }
126
127            if finished {
128                let mut done = self.lock.lock().unwrap();
129                *done = true;
130                self.cvar.notify_one();
131            }
132        }
133
134        pub fn signal(&self, signal: Signal) {
135            use signal::ConvertToLibc;
136
137            let signo = signal.convert_to_libc();
138            debug!("Sending {:?} (int: {}) to child process", signal, signo);
139            self.c_signal(signo);
140        }
141
142        fn c_signal(&self, sig: c_int) {
143            extern "C" {
144                fn killpg(pgrp: pid_t, sig: c_int) -> c_int;
145            }
146
147            unsafe {
148                killpg(self.pgid, sig);
149            }
150        }
151
152        pub fn wait(&self) {
153            let mut done = self.lock.lock().unwrap();
154            while !*done {
155                done = self.cvar.wait(done).unwrap();
156            }
157        }
158    }
159}
160
#[cfg(target_family = "windows")]
mod imp {
    //use super::wrap_commands;
    use kernel32::*;
    // use pathop::PathOp;
    use signal::Signal;
    use std::io;
    use std::io::Result;
    use std::mem;
    use std::process::Command;
    use std::ptr;
    use winapi::*;

    /// A spawned child command contained in a Windows job object.
    ///
    /// Both handles are owned by this value and closed by its `Drop` impl.
    pub struct Process {
        /// Job object the child process is assigned to.
        job: HANDLE,
        /// I/O completion port associated with `job`; `wait` reads job
        /// notifications from it.
        completion_port: HANDLE,
    }

    /// Payload passed to `SetInformationJobObject` together with
    /// `JobObjectAssociateCompletionPortInformation`.
    #[repr(C)]
    struct JOBOBJECT_ASSOCIATE_COMPLETION_PORT {
        completion_key: PVOID,
        completion_port: HANDLE,
    }

    impl Process {
        /// Spawns `command` suspended, assigns it to a fresh job object that
        /// kills its processes when the job handle is closed, then resumes it.
        ///
        /// # Errors
        ///
        /// Returns the `io::Error` from `Command::spawn`. The job object and
        /// completion port created beforehand are closed in that case.
        ///
        /// # Panics
        ///
        /// Panics if the job object or completion port cannot be created or
        /// configured, or if the child cannot be assigned to the job.
        pub fn new(
            mut command: Command,
        ) -> Result<Process> {
            use std::os::windows::io::IntoRawHandle;
            use std::os::windows::process::CommandExt;

            fn last_err() -> io::Error {
                io::Error::last_os_error()
            }

            let job = unsafe { CreateJobObjectW(ptr::null_mut(), ptr::null()) };
            if job.is_null() {
                panic!("failed to create job object: {}", last_err());
            }

            let completion_port =
                unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, ptr::null_mut(), 0, 1) };
            // Check the handle that was just created (not `job` a second time).
            if completion_port.is_null() {
                // Capture the error before CloseHandle can overwrite it.
                let err = last_err();
                unsafe {
                    let _ = CloseHandle(job);
                }
                panic!("unable to create IO completion port for job: {}", err);
            }

            // Hand ownership of both handles to a `Process` right away so that
            // `Drop` closes them on every early return or unwind below.
            let process = Process {
                job: job,
                completion_port: completion_port,
            };

            let mut associate_completion: JOBOBJECT_ASSOCIATE_COMPLETION_PORT =
                unsafe { mem::zeroed() };
            // The job handle doubles as the completion key so `wait` can tell
            // which job a notification belongs to.
            associate_completion.completion_key = job;
            associate_completion.completion_port = completion_port;
            let associated = unsafe {
                SetInformationJobObject(
                    job,
                    JobObjectAssociateCompletionPortInformation,
                    &mut associate_completion as *mut _ as LPVOID,
                    mem::size_of_val(&associate_completion) as DWORD,
                )
            };
            if associated == 0 {
                panic!(
                    "failed to associate completion port with job: {}",
                    last_err()
                );
            }

            let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { mem::zeroed() };
            info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
            let limited = unsafe {
                SetInformationJobObject(
                    job,
                    JobObjectExtendedLimitInformation,
                    &mut info as *mut _ as LPVOID,
                    mem::size_of_val(&info) as DWORD,
                )
            };
            if limited == 0 {
                panic!("failed to set job info: {}", last_err());
            }

            // Start suspended so the child cannot run (or spawn grandchildren)
            // before it has been placed inside the job.
            command.creation_flags(CREATE_SUSPENDED);
            let child = command.spawn()?;

            let handle = child.into_raw_handle();
            let assigned = unsafe { AssignProcessToJobObject(job, handle) };
            if assigned == 0 {
                panic!("failed to add to job object: {}", last_err());
            }

            resume_threads(handle);

            // `into_raw_handle` transferred ownership of the process handle to
            // us; it is not needed any more, so close it instead of leaking it.
            unsafe {
                let _ = CloseHandle(handle);
            }

            Ok(process)
        }

        /// Returns the job handle value cast to `i32`.
        ///
        /// NOTE(review): this is a handle value, not a process id, and the
        /// cast truncates on 64-bit targets.
        pub fn id(&self) -> i32 {
            self.job as i32
        }

        /// No-op on Windows.
        pub fn reap(&self) {}

        /// Terminates every process in the job with exit code 1.
        ///
        /// The requested `_signal` is ignored.
        pub fn signal(&self, _signal: Signal) {
            unsafe {
                let _ = TerminateJobObject(self.job, 1);
            }
        }

        /// Blocks until the job reports that it has no active processes left.
        ///
        /// Also returns if the completion port itself fails (no packet could
        /// be dequeued), because retrying would otherwise spin forever.
        pub fn wait(&self) {
            loop {
                let mut code: DWORD = 0;
                let mut key: ULONG_PTR = 0;
                let mut overlapped: LPOVERLAPPED = ptr::null_mut();
                let dequeued = unsafe {
                    GetQueuedCompletionStatus(
                        self.completion_port,
                        &mut code,
                        &mut key,
                        &mut overlapped,
                        INFINITE,
                    )
                };

                // Zero return with a null OVERLAPPED pointer means nothing was
                // dequeued; with an INFINITE timeout that is an error.
                if dequeued == 0 && overlapped.is_null() {
                    break;
                }

                if code == JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO && (key as HANDLE) == self.job {
                    break;
                }
            }
        }
    }

    impl Drop for Process {
        /// Closes the job handle and the completion port handle.
        fn drop(&mut self) {
            unsafe {
                let _ = CloseHandle(self.job);
                let _ = CloseHandle(self.completion_port);
            }
        }
    }

    // NOTE(review): `HANDLE` is a raw pointer, hence the manual impls; this
    // presumes the job and completion-port handles may be used from any thread.
    unsafe impl Send for Process {}
    unsafe impl Sync for Process {}

    // This is pretty terrible, but it's either this or we re-implement all of Rust's std::process just to get at PROCESS_INFORMATION
    /// Resumes every thread owned by `child_process`.
    ///
    /// Walks a system-wide thread snapshot and calls `ResumeThread` on each
    /// thread whose owner pid matches the child. Failures are skipped
    /// silently, as before.
    fn resume_threads(child_process: HANDLE) {
        // Access right requested from `OpenThread` (THREAD_SUSPEND_RESUME).
        const SUSPEND_RESUME_ACCESS: DWORD = 0x0002;

        unsafe {
            let child_id = GetProcessId(child_process);

            let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0);
            if snapshot == INVALID_HANDLE_VALUE {
                // Nothing to enumerate and no handle to close.
                return;
            }

            let mut entry = THREADENTRY32 {
                // The API requires the structure size to be filled in up front.
                dwSize: mem::size_of::<THREADENTRY32>() as DWORD,
                cntUsage: 0,
                th32ThreadID: 0,
                th32OwnerProcessID: 0,
                tpBasePri: 0,
                tpDeltaPri: 0,
                dwFlags: 0,
            };

            let mut more = Thread32First(snapshot, &mut entry);
            while more != 0 {
                if entry.th32OwnerProcessID == child_id {
                    let thread_handle = OpenThread(SUSPEND_RESUME_ACCESS, 0, entry.th32ThreadID);
                    if !thread_handle.is_null() {
                        ResumeThread(thread_handle);
                        CloseHandle(thread_handle);
                    }
                }

                more = Thread32Next(snapshot, &mut entry);
            }

            CloseHandle(snapshot);
        }
    }
}
337
// Documentation for the disabled `collect_path_env_vars` below. Kept as plain
// `//` comments so rustdoc does not attach it to the next live item.
//
// Collect `PathOp` details into op-categories to pass on to the exec'd command as env-vars.
//
// WRITTEN -> `notify::ops::WRITE`, `notify::ops::CLOSE_WRITE`
// META_CHANGED -> `notify::ops::CHMOD`
// REMOVED -> `notify::ops::REMOVE`
// CREATED -> `notify::ops::CREATE`
// RENAMED -> `notify::ops::RENAME`
345// fn collect_path_env_vars(pathops: &[PathOp]) -> Vec<(String, String)> {
346//     #[cfg(target_family = "unix")]
347//     const ENV_SEP: &'static str = ":";
348//     #[cfg(not(target_family = "unix"))]
349//     const ENV_SEP: &'static str = ";";
350
351//     let mut by_op = HashMap::new(); // Paths as `String`s collected by `notify::op`
352//     let mut all_pathbufs = HashSet::new(); // All unique `PathBuf`s
353//     for pathop in pathops {
354//         if let Some(op) = pathop.op {
355//             // ignore pathops that don't have a `notify::op` set
356//             if let Some(s) = pathop.path.to_str() {
357//                 // ignore invalid utf8 paths
358//                 all_pathbufs.insert(pathop.path.clone());
359//                 let e = by_op.entry(op).or_insert_with(Vec::new);
360//                 e.push(s.to_owned());
361//             }
362//         }
363//     }
364
365//     let mut vars = vec![];
366//     // Only break off a common path if we have more than one unique path,
367//     // otherwise we end up with a `COMMON_PATH` being set and other vars
368//     // being present but empty.
369//     let common_path = if all_pathbufs.len() > 1 {
370//         let all_pathbufs: Vec<PathBuf> = all_pathbufs.into_iter().collect();
371//         get_longest_common_path(&all_pathbufs)
372//     } else {
373//         None
374//     };
375//     if let Some(ref common_path) = common_path {
376//         vars.push(("WATCHEXEC_COMMON_PATH".to_string(), common_path.to_string()));
377//     }
378//     for (op, paths) in by_op {
379//         let key = match op {
380//             op if PathOp::is_create(op) => "WATCHEXEC_CREATED_PATH",
381//             op if PathOp::is_remove(op) => "WATCHEXEC_REMOVED_PATH",
382//             op if PathOp::is_rename(op) => "WATCHEXEC_RENAMED_PATH",
383//             op if PathOp::is_write(op) => "WATCHEXEC_WRITTEN_PATH",
384//             op if PathOp::is_meta(op) => "WATCHEXEC_META_CHANGED_PATH",
385//             _ => continue, // ignore `notify::op::RESCAN`s
386//         };
387
388//         let paths = if let Some(ref common_path) = common_path {
389//             paths
390//                 .iter()
391//                 .map(|path_str| path_str.trim_left_matches(common_path).to_string())
392//                 .collect::<Vec<_>>()
393//         } else {
394//             paths
395//         };
396//         vars.push((key.to_string(), paths.as_slice().join(ENV_SEP)));
397//     }
398//     vars
399// }
400
/// Returns the longest run of leading path components shared by all `paths`.
///
/// * An empty slice yields `None`.
/// * A single path is returned unchanged (as written, trailing separator included).
/// * Otherwise the shared leading components are joined back into a path;
///   this is the empty string when the paths have nothing in common.
///
/// `None` is also returned when the result is not valid UTF-8.
fn get_longest_common_path(paths: &[PathBuf]) -> Option<String> {
    let (first, rest) = paths.split_first()?;
    if rest.is_empty() {
        return first.to_str().map(str::to_owned);
    }

    // Start with every component of the first path and shrink the prefix as
    // each further path is compared against it.
    let mut shared: Vec<_> = first.components().collect();
    for candidate in rest {
        let matching = candidate
            .components()
            .zip(shared.iter())
            .take_while(|pair| pair.0 == *pair.1)
            .count();
        shared.truncate(matching);
    }

    let joined: PathBuf = shared
        .iter()
        .map(|component| component.as_os_str())
        .collect();
    joined.to_str().map(str::to_owned)
}
432
433#[cfg(test)]
434#[cfg(target_family = "unix")]
435mod tests {
436    // use notify;
437    // use pathop::PathOp;
438    use std::collections::HashSet;
439    use std::path::PathBuf;
440
441    // use super::collect_path_env_vars;
442    use super::get_longest_common_path;
443    // use super::spawn;
444    // //use super::wrap_commands;
445
446    // #[test]
447    // fn test_start() {
448    //     let _ = spawn(&vec!["echo".into(), "hi".into()], vec![], true);
449    // }
450
451    /*
452    #[test]
453    fn wrap_commands_that_have_whitespace() {
454        assert_eq!(
455            wrap_commands(&vec!["echo".into(), "hello world".into()]),
456            vec!["echo".into(), "'hello world'".into()] as Vec<String>
457        );
458    }
459
460    #[test]
461    fn wrap_commands_that_have_long_whitespace() {
462        assert_eq!(
463            wrap_commands(&vec!["echo".into(), "hello    world".into()]),
464            vec!["echo".into(), "'hello    world'".into()] as Vec<String>
465        );
466    }
467
468    #[test]
469    fn wrap_commands_that_have_single_quotes() {
470        assert_eq!(
471            wrap_commands(&vec!["echo".into(), "hello ' world".into()]),
472            vec!["echo".into(), "'hello '\"'\"' world'".into()] as Vec<String>
473        );
474        assert_eq!(
475            wrap_commands(&vec!["echo".into(), "hello'world".into()]),
476            vec!["echo".into(), "'hello'\"'\"'world'".into()] as Vec<String>
477        );
478    }
479
480    #[test]
481    fn wrap_commands_that_have_double_quotes() {
482        assert_eq!(
483            wrap_commands(&vec!["echo".into(), "hello \" world".into()]),
484            vec!["echo".into(), "'hello \" world'".into()] as Vec<String>
485        );
486        assert_eq!(
487            wrap_commands(&vec!["echo".into(), "hello\"world".into()]),
488            vec!["echo".into(), "'hello\"world'".into()] as Vec<String>
489        );
490    }
491    */
492
493    #[test]
494    fn longest_common_path_should_return_correct_value() {
495        let single_path = vec![PathBuf::from("/tmp/random/")];
496        let single_result = get_longest_common_path(&single_path).unwrap();
497        assert_eq!(single_result, "/tmp/random/");
498
499        let common_paths = vec![
500            PathBuf::from("/tmp/logs/hi"),
501            PathBuf::from("/tmp/logs/bye"),
502            PathBuf::from("/tmp/logs/bye"),
503            PathBuf::from("/tmp/logs/fly"),
504        ];
505
506        let common_result = get_longest_common_path(&common_paths).unwrap();
507        assert_eq!(common_result, "/tmp/logs");
508
509        let diverging_paths = vec![PathBuf::from("/tmp/logs/hi"), PathBuf::from("/var/logs/hi")];
510
511        let diverging_result = get_longest_common_path(&diverging_paths).unwrap();
512        assert_eq!(diverging_result, "/");
513
514        let uneven_paths = vec![
515            PathBuf::from("/tmp/logs/hi"),
516            PathBuf::from("/tmp/logs/"),
517            PathBuf::from("/tmp/logs/bye"),
518        ];
519
520        let uneven_result = get_longest_common_path(&uneven_paths).unwrap();
521        assert_eq!(uneven_result, "/tmp/logs");
522    }
523
524    // #[test]
525    // fn pathops_collect_to_env_vars() {
526    //     let pathops = vec![
527    //         PathOp::new(
528    //             &PathBuf::from("/tmp/logs/hi"),
529    //             Some(notify::op::CREATE),
530    //             None,
531    //         ),
532    //         PathOp::new(
533    //             &PathBuf::from("/tmp/logs/hey/there"),
534    //             Some(notify::op::CREATE),
535    //             None,
536    //         ),
537    //         PathOp::new(
538    //             &PathBuf::from("/tmp/logs/bye"),
539    //             Some(notify::op::REMOVE),
540    //             None,
541    //         ),
542    //     ];
543    //     let expected_vars = vec![
544    //         ("WATCHEXEC_COMMON_PATH".to_string(), "/tmp/logs".to_string()),
545    //         ("WATCHEXEC_REMOVED_PATH".to_string(), "/bye".to_string()),
546    //         (
547    //             "WATCHEXEC_CREATED_PATH".to_string(),
548    //             "/hi:/hey/there".to_string(),
549    //         ),
550    //     ];
551    //     let vars = collect_path_env_vars(&pathops);
552    //     assert_eq!(
553    //         vars.iter().collect::<HashSet<_>>(),
554    //         expected_vars.iter().collect::<HashSet<_>>()
555    //     );
556    // }
557}
558
559#[cfg(test)]
560#[cfg(target_family = "windows")]
561mod tests {
562    // use super::spawn;
563    //use super::wrap_commands;
564
565    // #[test]
566    // fn test_start() {
567    //     let _ = spawn(&vec!["echo".into(), "hi".into()], vec![], true);
568    // }
569
570    /*
571    #[test]
572    fn wrap_commands_that_have_whitespace() {
573        assert_eq!(
574            wrap_commands(&vec!["echo".into(), "hello world".into()]),
575            vec!["echo".into(), "\"hello world\"".into()] as Vec<String>
576        );
577    }
578
579    #[test]
580    fn wrap_commands_that_have_long_whitespace() {
581        assert_eq!(
582            wrap_commands(&vec!["echo".into(), "hello    world".into()]),
583            vec!["echo".into(), "\"hello    world\"".into()] as Vec<String>
584        );
585    }
586
587    #[test]
588    fn wrap_commands_that_have_single_quotes() {
589        assert_eq!(
590            wrap_commands(&vec!["echo".into(), "hello ' world".into()]),
591            vec!["echo".into(), "\"hello ' world\"".into()] as Vec<String>
592        );
593        assert_eq!(
594            wrap_commands(&vec!["echo".into(), "hello'world".into()]),
595            vec!["echo".into(), "\"hello'world\"".into()] as Vec<String>
596        );
597    }
598
599    #[test]
600    fn wrap_commands_that_have_double_quotes() {
601        assert_eq!(
602            wrap_commands(&vec!["echo".into(), "hello \" world".into()]),
603            vec!["echo".into(), "\"hello \"\" world\"".into()] as Vec<String>
604        );
605        assert_eq!(
606            wrap_commands(&vec!["echo".into(), "hello\"world".into()]),
607            vec!["echo".into(), "\"hello\"\"world\"".into()] as Vec<String>
608        );
609    }
610    */
611}