usereport/
runner.rs

1use crate::command::{Command, CommandResult};
2
3use snafu::{ResultExt, Snafu};
4use std::fmt::Debug;
5
6/// Error type
7#[derive(Debug, Snafu)]
8#[allow(missing_docs)]
9pub enum Error {
10    /// Command execution failed
11    #[snafu(display("failed to run command {}: {}", name, source))]
12    ExecuteCommandFailed { name: String, source: std::io::Error },
13}
14
15/// Result type
16pub type Result<T, E = Error> = std::result::Result<T, E>;
17
18/// Runner Interface
19pub trait Runner<'a, I: IntoIterator<Item = &'a Command>>: Debug {
20    /// Execute all commands and wait until all commands return
21    fn run(&self, commands: I, max_parallel_commands: usize) -> Result<Vec<CommandResult>>;
22}
23
24pub use thread::ThreadRunner;
25
26/// Thread based runner
27pub mod thread {
28    use super::*;
29
30    use std::{
31        sync::mpsc::{self, Receiver, Sender},
32        thread,
33        thread::JoinHandle,
34    };
35
36    /// Ensures that results are in same order as commands
37    #[derive(Default, Debug, Clone)]
38    pub struct ThreadRunner {
39        progress_tx: Option<Sender<usize>>,
40    }
41
42    impl<'a, I: IntoIterator<Item = &'a Command>> super::Runner<'a, I> for ThreadRunner {
43        fn run(&self, commands: I, max_parallel_commands: usize) -> Result<Vec<CommandResult>> {
44            let mut results = Vec::new();
45
46            let commands: Vec<&Command> = commands.into_iter().collect();
47            for chunk in commands.chunks(max_parallel_commands).map(|x| x.to_vec()) {
48                // Create child threads and run commands
49                let (children, rx) = ThreadRunner::create_children(chunk, &self.progress_tx)?;
50                // Wait for results
51                let mut chunk_results = ThreadRunner::wait_for_results(children, rx);
52                results.append(&mut chunk_results);
53            }
54
55            Ok(results)
56        }
57    }
58
59    type ChildResult = (usize, CommandResult);
60    type ChildrenSupervision = (Vec<JoinHandle<()>>, Receiver<ChildResult>);
61
62    impl ThreadRunner {
63        pub fn new() -> Self { ThreadRunner::default() }
64
65        pub fn with_progress<T: Into<Option<Sender<usize>>>>(self, progress_tx: T) -> Self {
66            ThreadRunner {
67                progress_tx: progress_tx.into(),
68            }
69        }
70
71        fn create_children<'a, I: IntoIterator<Item = &'a Command>>(
72            commands: I,
73            progress_tx: &Option<Sender<usize>>,
74        ) -> Result<ChildrenSupervision> {
75            let (tx, rx): (Sender<ChildResult>, Receiver<ChildResult>) = mpsc::channel();
76            let mut children = Vec::new();
77
78            for (seq, command) in commands.into_iter().enumerate() {
79                let command = command.clone();
80                let child = ThreadRunner::create_child(seq, command, tx.clone(), progress_tx.clone())?;
81                children.push(child);
82            }
83
84            Ok((children, rx))
85        }
86
87        fn create_child(
88            seq: usize,
89            command: Command,
90            tx: Sender<ChildResult>,
91            progress_tx: Option<Sender<usize>>,
92        ) -> Result<JoinHandle<()>> {
93            let name = command.name.clone();
94            thread::Builder::new()
95                .name(command.name.clone())
96                .spawn(move || {
97                    let res = command.exec();
98                    // This should not happen as long as the parent is alive; if it happens, this is a valid reason to
99                    // panic
100                    tx.send((seq, res)).expect("Thread failed to send result via channel");
101                    if let Some(progress_tx) = progress_tx {
102                        progress_tx.send(1).expect("Thread failed to send progress via channel");
103                    }
104                })
105                .context(ExecuteCommandFailed { name })
106        }
107
108        fn wait_for_results(children: Vec<JoinHandle<()>>, rx: Receiver<ChildResult>) -> Vec<CommandResult> {
109            let mut results = Vec::with_capacity(children.len());
110            // Get results
111            for _ in 0..children.len() {
112                // This should not happen as long as the child's tx is alive; if it happens, this is a valid reason
113                // to panic
114                let (seq, result) = rx.recv().expect("Failed to receive from child");
115                results.push((seq, result));
116            }
117
118            // Ensure all child threads have completed execution
119            for child in children {
120                // This should not happen as long as the parent is alive; if it happens, this is a valid reason to
121                // panic
122                child.join().expect("Parent failed to wait for child");
123            }
124
125            results.sort_by_key(|(seq, _)| *seq);
126            results.into_iter().map(|(_, result)| result).collect()
127        }
128    }
129
130    #[cfg(test)]
131    mod tests {
132        use super::*;
133        use crate::{runner::Runner, tests::*};
134
135        use spectral::prelude::*;
136
137        #[test]
138        fn run_ok() {
139            let mut commands = Vec::new();
140            #[cfg(target_os = "macos")]
141            commands.push(Command::new("uname", r#"/usr/bin/uname -a"#));
142            #[cfg(target_os = "macos")]
143            commands.push(Command::new("uname", r#"/usr/bin/uname -a"#));
144            #[cfg(target_os = "macos")]
145            let expected = "Darwin";
146            #[cfg(target_os = "linux")]
147            commands.push(Command::new("cat-uname", r#"/bin/cat /proc/version"#));
148            #[cfg(target_os = "linux")]
149            commands.push(Command::new("cat-uname", r#"/bin/cat /proc/version"#));
150            #[cfg(target_os = "linux")]
151            let expected = "Linux";
152
153            let r = ThreadRunner::new();
154            let results = r.run(&commands, 64);
155
156            asserting("Command run").that(&results).is_ok().has_length(2);
157
158            let results = results.unwrap();
159            asserting("First command result is success")
160                .that(&results[0])
161                .is_success_contains(expected);
162        }
163    }
164}