calloop_subproc/chain.rs
1use std::collections::VecDeque;
2
3use super::{Command, CommandResult, ErrorEvent, LaunchError, SrcResult};
4use log::*;
5
6/// An event source that processes a sequence of commands.
7///
8/// Exactly one event will ever be generated, and it will be either at the
9/// successful completion of all the commands OR at the first failed subprocess.
10/// If an optional cleanup command is provided, that will be run before the
11/// event is generated, but only in the event of a failure.
12///
13/// Note that to avoid leaking file descriptors (or the memory associated with
14/// the event source itself), you must either honour the [`calloop::PostAction`]
15/// returned from `process_events()` yourself, or use this with
16/// [`TransientSource`](calloop::transient::TransientSource) in `calloop`.
17pub struct SubprocChain {
18 /// The executor (event source) that actually runs commands and generates
19 /// events for us.
20 executor: calloop::futures::Executor<CommandResult>,
21
22 /// The scheduler that sends `Future`s to the executor.
23 scheduler: calloop::futures::Scheduler<CommandResult>,
24
25 /// A FIFO of `Command`s to run.
26 commands: VecDeque<Command>,
27
28 /// If we have to abort the requested series of commands due to an error, we
29 /// store the result from the failed command here. This being `Some(_)`
30 /// implies that we are in a cleanup phase.
31 result: Option<SrcResult>,
32
33 /// If provided, this is a cleanup command to run on failure.
34 cleanup: Option<Command>,
35}
36
37impl SubprocChain {
38 /// Create a new subprocess event source from a sequence of commands and,
39 /// optionally, a cleanup command to run in case of failure.
40 pub fn new<T>(commands: T, cleanup: Option<Command>) -> calloop::Result<Self>
41 where
42 T: IntoIterator<Item = Command>,
43 {
44 let (executor, scheduler) = calloop::futures::executor()?;
45
46 Ok(SubprocChain {
47 commands: commands.into_iter().collect(),
48 executor,
49 scheduler,
50 result: None,
51 cleanup,
52 })
53 }
54
55 /// Schedules a single command in our async executor. This does not have to
56 /// come from our internal queue (but might).
57 fn schedule_command(&self, command: Command) -> std::io::Result<()> {
58 let command_debug_str_for_here = format!("{:?}", command);
59 let command_debug_str_for_async = command_debug_str_for_here.clone();
60
61 let async_exec = async move {
62 let mut async_command: async_process::Command = command.into();
63 let async_status = async_command.status();
64
65 match async_status.await {
66 Ok(status) => {
67 if status.success() {
68 CommandResult {
69 command: command_debug_str_for_async,
70 result: Ok(()),
71 }
72 } else {
73 CommandResult {
74 command: command_debug_str_for_async,
75 result: Err(ErrorEvent::SubprocError(status)),
76 }
77 }
78 }
79
80 Err(error) => CommandResult {
81 command: command_debug_str_for_async,
82 result: Err(ErrorEvent::IoError(error)),
83 },
84 }
85 };
86 self.scheduler.schedule(async_exec).map_err(|_| {
87 std::io::Error::new(
88 std::io::ErrorKind::InvalidData,
89 format!(
90 "Could not schedule command: {:?}",
91 command_debug_str_for_here
92 ),
93 )
94 })
95 }
96
97 /// Schedules the next command in our internal queue. Returns `Ok(true)` if
98 /// a command was scheduled, or `Ok(false)` if there were none left.
99 fn schedule_next(&mut self) -> std::io::Result<bool> {
100 if let Some(cmd) = self.commands.pop_front() {
101 self.schedule_command(cmd)?;
102 Ok(true)
103 } else {
104 Ok(false)
105 }
106 }
107}
108
109impl calloop::EventSource for SubprocChain {
110 type Event = SrcResult;
111 type Metadata = ();
112 type Ret = ();
113 type Error = LaunchError;
114
115 /// This event source is designed to fire exactly once, and the callback
116 /// will receive the result. See the `SubprocChain` docs for more detail.
117 ///
118 /// Even though the callback will only be called once, the API for
119 /// `calloop::EventSource` requires it to be a `FnMut`. This can be worked
120 /// around by wrapping a `FnOnce` in an `Option` type if necessary.
121 fn process_events<F>(
122 &mut self,
123 readiness: calloop::Readiness,
124 token: calloop::Token,
125 mut callback: F,
126 ) -> Result<calloop::PostAction, Self::Error>
127 where
128 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
129 {
130 // Basic approach:
131 // - get the result of the most recent command from self.executor via
132 // this_result
133 // - look at that to decide if we need to call the callback
134 // - if we do, the thing to pass to the callback is stored in
135 // result_for_callback (if it's None, don't call it)
136
137 let mut this_result = None;
138 let mut result_for_callback = None;
139
140 self.executor
141 .process_events(readiness, token, |call_result, _| {
142 this_result = Some(call_result);
143 })?;
144
145 if let Some(result) = this_result {
146 match result {
147 CommandResult {
148 command: _,
149 result: Ok(()),
150 } => {
151 if !self.schedule_next()? {
152 // We're all done! Check the main result in case we're
153 // here after cleanup.
154 result_for_callback = Some(self.result.take().unwrap_or(Ok(())));
155 }
156 }
157
158 CommandResult {
159 command,
160 result: Err(error),
161 } => {
162 error!("Error executing command: {}", command);
163
164 // If we already have a "main" result it means we're in the
165 // cleanup phase. Finish up by passing the main result.
166 // Otherwise do cleanup.
167 if let Some(res) = self.result.take() {
168 result_for_callback = Some(res);
169 } else if let Some(cmd) = self.cleanup.take() {
170 warn!("Invoking cleanup command: {:?}", cmd);
171 self.commands.clear();
172 self.result = Some(Err(error));
173 self.schedule_command(cmd)?;
174 } else {
175 result_for_callback = Some(Err(error));
176 }
177 }
178 }
179 } else {
180 // We don't expect spurious wakeups from the async executor, but
181 // apparently it happens quite a bit.
182 }
183
184 if let Some(res) = result_for_callback {
185 // We're done, fire the callback now.
186 callback(res, &mut ());
187 return Ok(calloop::PostAction::Remove);
188 }
189
190 Ok(calloop::PostAction::Continue)
191 }
192
193 fn register(
194 &mut self,
195 poll: &mut calloop::Poll,
196 token_factory: &mut calloop::TokenFactory,
197 ) -> calloop::Result<()> {
198 // Kickstart the events.
199 self.schedule_next()?;
200 self.executor.register(poll, token_factory)
201 }
202
203 fn reregister(
204 &mut self,
205 poll: &mut calloop::Poll,
206 token_factory: &mut calloop::TokenFactory,
207 ) -> calloop::Result<()> {
208 // Kickstart the events.
209 self.schedule_next()?;
210 self.executor.reregister(poll, token_factory)
211 }
212
213 fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
214 self.executor.unregister(poll)
215 }
216}