//! calloop-subproc 1.0.0
//!
//! Subprocess support for the Calloop event loop.
use std::collections::VecDeque;

use super::{Command, CommandResult, ErrorEvent, LaunchError, SrcResult};
use log::*;

/// An event source that processes a sequence of commands.
///
/// Exactly one event will ever be generated, and it will be either at the
/// successful completion of all the commands OR at the first failed subprocess.
/// If an optional cleanup command is provided, that will be run before the
/// event is generated, but only in the event of a failure.
///
/// Note that to avoid leaking file descriptors (or the memory associated with
/// the event source itself), you must either honour the [`calloop::PostAction`]
/// returned from `process_events()` yourself, or use this with
/// [`TransientSource`](calloop::transient::TransientSource) in `calloop`.
/// An event source that runs a series of subprocess commands in order.
///
/// The source fires exactly one event: either when every command has finished
/// successfully, or at the first command that fails. When a cleanup command is
/// supplied, a failure first runs that command and only then delivers the
/// event.
///
/// To avoid leaking file descriptors (and the memory of the event source
/// itself), either honour the [`calloop::PostAction`] returned from
/// `process_events()` yourself, or wrap this source in
/// [`TransientSource`](calloop::transient::TransientSource) from `calloop`.
pub struct SubprocChain {
    /// Commands still waiting to run, consumed in FIFO order.
    commands: VecDeque<Command>,

    /// Optional command to run (at most once) when one of `commands` fails.
    cleanup: Option<Command>,

    /// Holds the failed command's result while the cleanup command runs; this
    /// being `Some(_)` is what marks the source as being in its cleanup phase.
    result: Option<SrcResult>,

    /// The event source doing the actual work: it polls the command futures
    /// and hands their results back to us.
    executor: calloop::futures::Executor<CommandResult>,

    /// Sends command `Future`s over to `executor`.
    scheduler: calloop::futures::Scheduler<CommandResult>,
}

impl SubprocChain {
    /// Create a new subprocess event source from a sequence of commands and,
    /// optionally, a cleanup command to run in case of failure.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying async executor/scheduler pair could
    /// not be created.
    pub fn new<T>(commands: T, cleanup: Option<Command>) -> calloop::Result<Self>
    where
        T: IntoIterator<Item = Command>,
    {
        let (executor, scheduler) = calloop::futures::executor()?;

        Ok(SubprocChain {
            commands: commands.into_iter().collect(),
            executor,
            scheduler,
            result: None,
            cleanup,
        })
    }

    /// Schedules a single command in our async executor. This does not have to
    /// come from our internal queue (but might).
    fn schedule_command(&self, command: Command) -> std::io::Result<()> {
        // The command itself is moved into the future below, so capture its
        // debug representation up front: one copy for the future's result and
        // one for the error path if scheduling itself fails.
        let command_debug_str_for_here = format!("{:?}", command);
        let command_debug_str_for_async = command_debug_str_for_here.clone();

        let async_exec = async move {
            let mut async_command: async_process::Command = command.into();
            let async_status = async_command.status();

            match async_status.await {
                Ok(status) => {
                    if status.success() {
                        CommandResult {
                            command: command_debug_str_for_async,
                            result: Ok(()),
                        }
                    } else {
                        // The process ran to completion but exited with a
                        // non-success status.
                        CommandResult {
                            command: command_debug_str_for_async,
                            result: Err(ErrorEvent::SubprocError(status)),
                        }
                    }
                }

                // The process could not be launched or awaited at all.
                Err(error) => CommandResult {
                    command: command_debug_str_for_async,
                    result: Err(ErrorEvent::IoError(error)),
                },
            }
        };
        self.scheduler.schedule(async_exec).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                // The captured string is already a `Debug` rendering of the
                // command, so format it with `{}` here — using `{:?}` again
                // would double-escape and quote it.
                format!(
                    "Could not schedule command: {}",
                    command_debug_str_for_here
                ),
            )
        })
    }

    /// Schedules the next command in our internal queue. Returns `Ok(true)` if
    /// a command was scheduled, or `Ok(false)` if there were none left.
    fn schedule_next(&mut self) -> std::io::Result<bool> {
        match self.commands.pop_front() {
            Some(cmd) => {
                self.schedule_command(cmd)?;
                Ok(true)
            }
            None => Ok(false),
        }
    }
}

impl calloop::EventSource for SubprocChain {
    /// The single event delivered to the callback: `Ok(())` on full success,
    /// or the error from the first failed command.
    type Event = SrcResult;
    type Metadata = ();
    type Ret = ();
    type Error = LaunchError;

    /// This event source is designed to fire exactly once, and the callback
    /// will receive the result. See the `SubprocChain` docs for more detail.
    ///
    /// Even though the callback will only be called once, the API for
    /// `calloop::EventSource` requires it to be a `FnMut`. This can be worked
    /// around by wrapping a `FnOnce` in an `Option` type if necessary.
    fn process_events<F>(
        &mut self,
        readiness: calloop::Readiness,
        token: calloop::Token,
        mut callback: F,
    ) -> Result<calloop::PostAction, Self::Error>
    where
        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        // Basic approach:
        // - get the result of the most recent command from self.executor via
        //   this_result
        // - look at that to decide if we need to call the callback
        // - if we do, the thing to pass to the callback is stored in
        //   result_for_callback (if it's None, don't call it)

        let mut this_result = None;
        let mut result_for_callback = None;

        // Drain the executor. Since `schedule_next()` schedules one command at
        // a time, we expect at most one completed command result per wakeup;
        // the closure keeps only the latest if that assumption is violated.
        self.executor
            .process_events(readiness, token, |call_result, _| {
                this_result = Some(call_result);
            })?;

        if let Some(result) = this_result {
            match result {
                // The command succeeded: either advance to the next one, or —
                // if the queue is empty — we are finished.
                CommandResult {
                    command: _,
                    result: Ok(()),
                } => {
                    if !self.schedule_next()? {
                        // We're all done! Check the main result in case we're
                        // here after cleanup (a successful cleanup run still
                        // reports the original failure stored in self.result).
                        result_for_callback = Some(self.result.take().unwrap_or(Ok(())));
                    }
                }

                // The command failed (non-zero exit or launch error).
                CommandResult {
                    command,
                    result: Err(error),
                } => {
                    error!("Error executing command: {}", command);

                    // If we already have a "main" result it means we're in the
                    // cleanup phase. Finish up by passing the main result.
                    // Otherwise do cleanup.
                    if let Some(res) = self.result.take() {
                        // Cleanup itself failed; the original failure still
                        // wins and `error` from the cleanup run is dropped.
                        result_for_callback = Some(res);
                    } else if let Some(cmd) = self.cleanup.take() {
                        warn!("Invoking cleanup command: {:?}", cmd);
                        // Abandon the remaining queue, stash the failure, and
                        // run the cleanup command before reporting.
                        self.commands.clear();
                        self.result = Some(Err(error));
                        self.schedule_command(cmd)?;
                    } else {
                        // No cleanup configured: report the failure directly.
                        result_for_callback = Some(Err(error));
                    }
                }
            }
        } else {
            // We don't expect spurious wakeups from the async executor, but
            // apparently it happens quite a bit.
        }

        if let Some(res) = result_for_callback {
            // We're done, fire the callback now. Removing ourselves from the
            // loop enforces the fire-exactly-once contract and frees the
            // executor's file descriptors (see the struct docs).
            callback(res, &mut ());
            return Ok(calloop::PostAction::Remove);
        }

        Ok(calloop::PostAction::Continue)
    }

    fn register(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> calloop::Result<()> {
        // Kickstart the events.
        self.schedule_next()?;
        self.executor.register(poll, token_factory)
    }

    fn reregister(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> calloop::Result<()> {
        // Kickstart the events.
        // NOTE(review): this pops and schedules an extra command from the
        // queue even if one is already in flight from register(); presumably
        // reregister() is only expected before any command completes — confirm
        // against calloop's reregistration semantics.
        self.schedule_next()?;
        self.executor.reregister(poll, token_factory)
    }

    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
        self.executor.unregister(poll)
    }
}