use std::collections::VecDeque;
use super::{Command, CommandResult, ErrorEvent, LaunchError, SrcResult};
use log::*;
pub struct SubprocChain {
executor: calloop::futures::Executor<CommandResult>,
scheduler: calloop::futures::Scheduler<CommandResult>,
commands: VecDeque<Command>,
result: Option<SrcResult>,
cleanup: Option<Command>,
}
impl SubprocChain {
pub fn new<T>(commands: T, cleanup: Option<Command>) -> calloop::Result<Self>
where
T: IntoIterator<Item = Command>,
{
let (executor, scheduler) = calloop::futures::executor()?;
Ok(SubprocChain {
commands: commands.into_iter().collect(),
executor,
scheduler,
result: None,
cleanup,
})
}
fn schedule_command(&self, command: Command) -> std::io::Result<()> {
let command_debug_str_for_here = format!("{:?}", command);
let command_debug_str_for_async = command_debug_str_for_here.clone();
let async_exec = async move {
let mut async_command: async_process::Command = command.into();
let async_status = async_command.status();
match async_status.await {
Ok(status) => {
if status.success() {
CommandResult {
command: command_debug_str_for_async,
result: Ok(()),
}
} else {
CommandResult {
command: command_debug_str_for_async,
result: Err(ErrorEvent::SubprocError(status)),
}
}
}
Err(error) => CommandResult {
command: command_debug_str_for_async,
result: Err(ErrorEvent::IoError(error)),
},
}
};
self.scheduler.schedule(async_exec).map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Could not schedule command: {:?}",
command_debug_str_for_here
),
)
})
}
fn schedule_next(&mut self) -> std::io::Result<bool> {
if let Some(cmd) = self.commands.pop_front() {
self.schedule_command(cmd)?;
Ok(true)
} else {
Ok(false)
}
}
}
impl calloop::EventSource for SubprocChain {
type Event = SrcResult;
type Metadata = ();
type Ret = ();
type Error = LaunchError;
fn process_events<F>(
&mut self,
readiness: calloop::Readiness,
token: calloop::Token,
mut callback: F,
) -> Result<calloop::PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let mut this_result = None;
let mut result_for_callback = None;
self.executor
.process_events(readiness, token, |call_result, _| {
this_result = Some(call_result);
})?;
if let Some(result) = this_result {
match result {
CommandResult {
command: _,
result: Ok(()),
} => {
if !self.schedule_next()? {
result_for_callback = Some(self.result.take().unwrap_or(Ok(())));
}
}
CommandResult {
command,
result: Err(error),
} => {
error!("Error executing command: {}", command);
if let Some(res) = self.result.take() {
result_for_callback = Some(res);
} else if let Some(cmd) = self.cleanup.take() {
warn!("Invoking cleanup command: {:?}", cmd);
self.commands.clear();
self.result = Some(Err(error));
self.schedule_command(cmd)?;
} else {
result_for_callback = Some(Err(error));
}
}
}
} else {
}
if let Some(res) = result_for_callback {
callback(res, &mut ());
return Ok(calloop::PostAction::Remove);
}
Ok(calloop::PostAction::Continue)
}
fn register(
&mut self,
poll: &mut calloop::Poll,
token_factory: &mut calloop::TokenFactory,
) -> calloop::Result<()> {
self.schedule_next()?;
self.executor.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut calloop::Poll,
token_factory: &mut calloop::TokenFactory,
) -> calloop::Result<()> {
self.schedule_next()?;
self.executor.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
self.executor.unregister(poll)
}
}