calloop_subproc/
chain.rs

1use std::collections::VecDeque;
2
3use super::{Command, CommandResult, ErrorEvent, LaunchError, SrcResult};
4use log::*;
5
6/// An event source that processes a sequence of commands.
7///
8/// Exactly one event will ever be generated, and it will be either at the
9/// successful completion of all the commands OR at the first failed subprocess.
10/// If an optional cleanup command is provided, that will be run before the
11/// event is generated, but only in the event of a failure.
12///
13/// Note that to avoid leaking file descriptors (or the memory associated with
14/// the event source itself), you must either honour the [`calloop::PostAction`]
15/// returned from `process_events()` yourself, or use this with
16/// [`TransientSource`](calloop::transient::TransientSource) in `calloop`.
17pub struct SubprocChain {
18    /// The executor (event source) that actually runs commands and generates
19    /// events for us.
20    executor: calloop::futures::Executor<CommandResult>,
21
22    /// The scheduler that sends `Future`s to the executor.
23    scheduler: calloop::futures::Scheduler<CommandResult>,
24
25    /// A FIFO of `Command`s to run.
26    commands: VecDeque<Command>,
27
28    /// If we have to abort the requested series of commands due to an error, we
29    /// store the result from the failed command here. This being `Some(_)`
30    /// implies that we are in a cleanup phase.
31    result: Option<SrcResult>,
32
33    /// If provided, this is a cleanup command to run on failure.
34    cleanup: Option<Command>,
35}
36
37impl SubprocChain {
38    /// Create a new subprocess event source from a sequence of commands and,
39    /// optionally, a cleanup command to run in case of failure.
40    pub fn new<T>(commands: T, cleanup: Option<Command>) -> calloop::Result<Self>
41    where
42        T: IntoIterator<Item = Command>,
43    {
44        let (executor, scheduler) = calloop::futures::executor()?;
45
46        Ok(SubprocChain {
47            commands: commands.into_iter().collect(),
48            executor,
49            scheduler,
50            result: None,
51            cleanup,
52        })
53    }
54
55    /// Schedules a single command in our async executor. This does not have to
56    /// come from our internal queue (but might).
57    fn schedule_command(&self, command: Command) -> std::io::Result<()> {
58        let command_debug_str_for_here = format!("{:?}", command);
59        let command_debug_str_for_async = command_debug_str_for_here.clone();
60
61        let async_exec = async move {
62            let mut async_command: async_process::Command = command.into();
63            let async_status = async_command.status();
64
65            match async_status.await {
66                Ok(status) => {
67                    if status.success() {
68                        CommandResult {
69                            command: command_debug_str_for_async,
70                            result: Ok(()),
71                        }
72                    } else {
73                        CommandResult {
74                            command: command_debug_str_for_async,
75                            result: Err(ErrorEvent::SubprocError(status)),
76                        }
77                    }
78                }
79
80                Err(error) => CommandResult {
81                    command: command_debug_str_for_async,
82                    result: Err(ErrorEvent::IoError(error)),
83                },
84            }
85        };
86        self.scheduler.schedule(async_exec).map_err(|_| {
87            std::io::Error::new(
88                std::io::ErrorKind::InvalidData,
89                format!(
90                    "Could not schedule command: {:?}",
91                    command_debug_str_for_here
92                ),
93            )
94        })
95    }
96
97    /// Schedules the next command in our internal queue. Returns `Ok(true)` if
98    /// a command was scheduled, or `Ok(false)` if there were none left.
99    fn schedule_next(&mut self) -> std::io::Result<bool> {
100        if let Some(cmd) = self.commands.pop_front() {
101            self.schedule_command(cmd)?;
102            Ok(true)
103        } else {
104            Ok(false)
105        }
106    }
107}
108
109impl calloop::EventSource for SubprocChain {
110    type Event = SrcResult;
111    type Metadata = ();
112    type Ret = ();
113    type Error = LaunchError;
114
115    /// This event source is designed to fire exactly once, and the callback
116    /// will receive the result. See the `SubprocChain` docs for more detail.
117    ///
118    /// Even though the callback will only be called once, the API for
119    /// `calloop::EventSource` requires it to be a `FnMut`. This can be worked
120    /// around by wrapping a `FnOnce` in an `Option` type if necessary.
121    fn process_events<F>(
122        &mut self,
123        readiness: calloop::Readiness,
124        token: calloop::Token,
125        mut callback: F,
126    ) -> Result<calloop::PostAction, Self::Error>
127    where
128        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
129    {
130        // Basic approach:
131        // - get the result of the most recent command from self.executor via
132        //   this_result
133        // - look at that to decide if we need to call the callback
134        // - if we do, the thing to pass to the callback is stored in
135        //   result_for_callback (if it's None, don't call it)
136
137        let mut this_result = None;
138        let mut result_for_callback = None;
139
140        self.executor
141            .process_events(readiness, token, |call_result, _| {
142                this_result = Some(call_result);
143            })?;
144
145        if let Some(result) = this_result {
146            match result {
147                CommandResult {
148                    command: _,
149                    result: Ok(()),
150                } => {
151                    if !self.schedule_next()? {
152                        // We're all done! Check the main result in case we're
153                        // here after cleanup.
154                        result_for_callback = Some(self.result.take().unwrap_or(Ok(())));
155                    }
156                }
157
158                CommandResult {
159                    command,
160                    result: Err(error),
161                } => {
162                    error!("Error executing command: {}", command);
163
164                    // If we already have a "main" result it means we're in the
165                    // cleanup phase. Finish up by passing the main result.
166                    // Otherwise do cleanup.
167                    if let Some(res) = self.result.take() {
168                        result_for_callback = Some(res);
169                    } else if let Some(cmd) = self.cleanup.take() {
170                        warn!("Invoking cleanup command: {:?}", cmd);
171                        self.commands.clear();
172                        self.result = Some(Err(error));
173                        self.schedule_command(cmd)?;
174                    } else {
175                        result_for_callback = Some(Err(error));
176                    }
177                }
178            }
179        } else {
180            // We don't expect spurious wakeups from the async executor, but
181            // apparently it happens quite a bit.
182        }
183
184        if let Some(res) = result_for_callback {
185            // We're done, fire the callback now.
186            callback(res, &mut ());
187            return Ok(calloop::PostAction::Remove);
188        }
189
190        Ok(calloop::PostAction::Continue)
191    }
192
193    fn register(
194        &mut self,
195        poll: &mut calloop::Poll,
196        token_factory: &mut calloop::TokenFactory,
197    ) -> calloop::Result<()> {
198        // Kickstart the events.
199        self.schedule_next()?;
200        self.executor.register(poll, token_factory)
201    }
202
203    fn reregister(
204        &mut self,
205        poll: &mut calloop::Poll,
206        token_factory: &mut calloop::TokenFactory,
207    ) -> calloop::Result<()> {
208        // Kickstart the events.
209        self.schedule_next()?;
210        self.executor.reregister(poll, token_factory)
211    }
212
213    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
214        self.executor.unregister(poll)
215    }
216}