1use crate::command::{Command, CommandResult};
2
3use snafu::{ResultExt, Snafu};
4use std::fmt::Debug;
5
6#[derive(Debug, Snafu)]
8#[allow(missing_docs)]
9pub enum Error {
10 #[snafu(display("failed to run command {}: {}", name, source))]
12 ExecuteCommandFailed { name: String, source: std::io::Error },
13}
14
15pub type Result<T, E = Error> = std::result::Result<T, E>;
17
18pub trait Runner<'a, I: IntoIterator<Item = &'a Command>>: Debug {
20 fn run(&self, commands: I, max_parallel_commands: usize) -> Result<Vec<CommandResult>>;
22}
23
24pub use thread::ThreadRunner;
25
26pub mod thread {
28 use super::*;
29
30 use std::{
31 sync::mpsc::{self, Receiver, Sender},
32 thread,
33 thread::JoinHandle,
34 };
35
36 #[derive(Default, Debug, Clone)]
38 pub struct ThreadRunner {
39 progress_tx: Option<Sender<usize>>,
40 }
41
42 impl<'a, I: IntoIterator<Item = &'a Command>> super::Runner<'a, I> for ThreadRunner {
43 fn run(&self, commands: I, max_parallel_commands: usize) -> Result<Vec<CommandResult>> {
44 let mut results = Vec::new();
45
46 let commands: Vec<&Command> = commands.into_iter().collect();
47 for chunk in commands.chunks(max_parallel_commands).map(|x| x.to_vec()) {
48 let (children, rx) = ThreadRunner::create_children(chunk, &self.progress_tx)?;
50 let mut chunk_results = ThreadRunner::wait_for_results(children, rx);
52 results.append(&mut chunk_results);
53 }
54
55 Ok(results)
56 }
57 }
58
59 type ChildResult = (usize, CommandResult);
60 type ChildrenSupervision = (Vec<JoinHandle<()>>, Receiver<ChildResult>);
61
62 impl ThreadRunner {
63 pub fn new() -> Self { ThreadRunner::default() }
64
65 pub fn with_progress<T: Into<Option<Sender<usize>>>>(self, progress_tx: T) -> Self {
66 ThreadRunner {
67 progress_tx: progress_tx.into(),
68 }
69 }
70
71 fn create_children<'a, I: IntoIterator<Item = &'a Command>>(
72 commands: I,
73 progress_tx: &Option<Sender<usize>>,
74 ) -> Result<ChildrenSupervision> {
75 let (tx, rx): (Sender<ChildResult>, Receiver<ChildResult>) = mpsc::channel();
76 let mut children = Vec::new();
77
78 for (seq, command) in commands.into_iter().enumerate() {
79 let command = command.clone();
80 let child = ThreadRunner::create_child(seq, command, tx.clone(), progress_tx.clone())?;
81 children.push(child);
82 }
83
84 Ok((children, rx))
85 }
86
87 fn create_child(
88 seq: usize,
89 command: Command,
90 tx: Sender<ChildResult>,
91 progress_tx: Option<Sender<usize>>,
92 ) -> Result<JoinHandle<()>> {
93 let name = command.name.clone();
94 thread::Builder::new()
95 .name(command.name.clone())
96 .spawn(move || {
97 let res = command.exec();
98 tx.send((seq, res)).expect("Thread failed to send result via channel");
101 if let Some(progress_tx) = progress_tx {
102 progress_tx.send(1).expect("Thread failed to send progress via channel");
103 }
104 })
105 .context(ExecuteCommandFailed { name })
106 }
107
108 fn wait_for_results(children: Vec<JoinHandle<()>>, rx: Receiver<ChildResult>) -> Vec<CommandResult> {
109 let mut results = Vec::with_capacity(children.len());
110 for _ in 0..children.len() {
112 let (seq, result) = rx.recv().expect("Failed to receive from child");
115 results.push((seq, result));
116 }
117
118 for child in children {
120 child.join().expect("Parent failed to wait for child");
123 }
124
125 results.sort_by_key(|(seq, _)| *seq);
126 results.into_iter().map(|(_, result)| result).collect()
127 }
128 }
129
130 #[cfg(test)]
131 mod tests {
132 use super::*;
133 use crate::{runner::Runner, tests::*};
134
135 use spectral::prelude::*;
136
137 #[test]
138 fn run_ok() {
139 let mut commands = Vec::new();
140 #[cfg(target_os = "macos")]
141 commands.push(Command::new("uname", r#"/usr/bin/uname -a"#));
142 #[cfg(target_os = "macos")]
143 commands.push(Command::new("uname", r#"/usr/bin/uname -a"#));
144 #[cfg(target_os = "macos")]
145 let expected = "Darwin";
146 #[cfg(target_os = "linux")]
147 commands.push(Command::new("cat-uname", r#"/bin/cat /proc/version"#));
148 #[cfg(target_os = "linux")]
149 commands.push(Command::new("cat-uname", r#"/bin/cat /proc/version"#));
150 #[cfg(target_os = "linux")]
151 let expected = "Linux";
152
153 let r = ThreadRunner::new();
154 let results = r.run(&commands, 64);
155
156 asserting("Command run").that(&results).is_ok().has_length(2);
157
158 let results = results.unwrap();
159 asserting("First command result is success")
160 .that(&results[0])
161 .is_success_contains(expected);
162 }
163 }
164}