use super::{Command, CommandResult, ErrorEvent, LaunchError};
use calloop::{transient::TransientSource, PostAction};
use futures::{io::BufReader, AsyncBufReadExt, StreamExt};
/// Event source that runs a subprocess and reports its standard output line
/// by line as [`ListenEvent`]s.
///
/// Internally it combines two sub-sources: a futures executor that drives the
/// async task talking to the subprocess, and a channel over which that task
/// sends the events that are forwarded to the user callback.
pub struct SubprocListen {
/// Executor that drives the `subproc_listener` future and yields its final
/// `CommandResult`.
executor: calloop::futures::Executor<CommandResult>,
/// Result reported by the finished listener task. It is held here until the
/// output channel has also closed, then handed out as `ListenEvent::End`.
outcome: Option<Result<(), ErrorEvent>>,
/// Receiving end of the channel the listener task sends `ListenEvent`s over.
receiver: TransientSource<calloop::channel::Channel<ListenEvent>>,
/// One-shot sender used to ask the listener task to stop. `None` once a stop
/// was requested or once the task has reported its result.
stopper: Option<futures::channel::oneshot::Sender<()>>,
/// Set once the output channel reports that it has been closed.
output_finished: bool,
}
impl SubprocListen {
    /// Creates an event source that runs `command` and listens to its output.
    ///
    /// The listener future is scheduled on a fresh executor here; it is that
    /// future which spawns the subprocess.
    ///
    /// # Errors
    ///
    /// Returns an error if the futures executor cannot be created or if the
    /// listener future cannot be scheduled on it.
    pub fn new(command: Command) -> calloop::Result<Self> {
        let (executor, scheduler) = calloop::futures::executor()?;
        let (receiver, stopper) = Self::schedule_command(scheduler, command)?;

        Ok(Self {
            executor,
            outcome: None,
            receiver: receiver.into(),
            stopper: Some(stopper),
            output_finished: false,
        })
    }

    /// Asks the listener task to stop reading output and kill the subprocess.
    ///
    /// The stop sender is consumed by the first call, so later calls (and
    /// calls made after the subprocess result has been received) do nothing.
    ///
    /// # Panics
    ///
    /// Panics if the stop message cannot be delivered to the listener task.
    pub fn kill(&mut self) {
        if let Some(stopper) = self.stopper.take() {
            stopper
                .send(())
                .expect("Could not send internal message to stop subprocess");
        }
    }

    /// Schedules the listener future for `command` on `scheduler`.
    ///
    /// Returns the receiving end of the event channel together with the
    /// one-shot sender that stops the listener.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidData` I/O error naming the command if the scheduler
    /// refuses the future.
    fn schedule_command(
        scheduler: calloop::futures::Scheduler<CommandResult>,
        command: Command,
    ) -> std::io::Result<(
        calloop::channel::Channel<ListenEvent>,
        futures::channel::oneshot::Sender<()>,
    )> {
        // `command` is moved into the future below, so render it for the
        // error message up front.
        let command_debug_str = format!("{:?}", command);

        let (sender, receiver) = calloop::channel::channel();
        let (stopper, stop_rx) = futures::channel::oneshot::channel();

        let async_exec = subproc_listener(command, sender, stop_rx);

        scheduler.schedule(async_exec).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                // The string is already `Debug` output; formatting it with
                // `{:?}` again would wrap it in quotes and escape its contents.
                format!("Could not schedule command: {}", command_debug_str),
            )
        })?;

        Ok((receiver, stopper))
    }
}
/// Event source implementation: forwards [`ListenEvent`]s from the listener
/// task to the callback.
///
/// The callback's `bool` return value is a kill request: returning `true` for
/// any forwarded event makes the source call [`SubprocListen::kill`] once the
/// pending events of this round have been delivered.
impl calloop::EventSource for SubprocListen {
    type Event = ListenEvent;
    type Metadata = ();
    type Ret = bool;
    type Error = LaunchError;

    /// Drives the executor, forwards queued output events, and emits
    /// `ListenEvent::End` (requesting removal of the source) once both the
    /// subprocess result is known and the output channel has closed.
    fn process_events<F>(
        &mut self,
        readiness: calloop::Readiness,
        token: calloop::Token,
        mut callback: F,
    ) -> Result<calloop::PostAction, Self::Error>
    where
        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        // Step 1: let the executor make progress; record the result if the
        // listener task has completed.
        let stopper = &mut self.stopper;
        let outcome = &mut self.outcome;
        self.executor
            .process_events(readiness, token, |CommandResult { command, result }, _| {
                log::trace!("Subprocess ended: {}", command);
                // The task is gone, so there is nothing left to stop.
                *stopper = None;
                *outcome = Some(result);
            })?;

        // Step 2: hand every queued event to the callback, remembering whether
        // any invocation asked for the subprocess to be killed.
        let mut kill_requested = false;
        let output_finished = &mut self.output_finished;
        let channel_action = self
            .receiver
            .process_events(readiness, token, |message, _| match message {
                calloop::channel::Event::Closed => *output_finished = true,
                calloop::channel::Event::Msg(event) => {
                    if callback(event, &mut ()) {
                        kill_requested = true;
                    }
                }
            })?;

        if kill_requested {
            self.kill();
        }

        // Step 3: the end event is only sent after all output was delivered.
        if !self.output_finished {
            return Ok(channel_action);
        }

        match self.outcome.take() {
            Some(outcome) => {
                // The return value is irrelevant here: the subprocess is over.
                let _ = callback(ListenEvent::End(outcome), &mut ());
                Ok(PostAction::Remove)
            }
            None => Ok(channel_action),
        }
    }

    /// Registers both sub-sources (executor and channel) with the poll.
    fn register(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> calloop::Result<()> {
        calloop::batch_register!(poll, token_factory, self.executor, self.receiver)
    }

    /// Re-registers both sub-sources (executor and channel) with the poll.
    fn reregister(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> calloop::Result<()> {
        calloop::batch_reregister!(poll, token_factory, self.executor, self.receiver)
    }

    /// Unregisters both sub-sources (executor and channel) from the poll.
    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
        calloop::batch_unregister!(poll, self.executor, self.receiver)
    }
}
/// Events delivered to the callback of a [`SubprocListen`] source.
#[derive(Debug)]
pub enum ListenEvent {
/// The subprocess was spawned successfully.
Start,
/// One line read from the subprocess's standard output.
Line(String),
/// The subprocess has ended and all of its output has been delivered.
/// Carries `Ok(())` if the exit status indicated success, otherwise the
/// error describing what went wrong.
End(Result<(), ErrorEvent>),
}
async fn subproc_listener(
command: Command,
sender: calloop::channel::Sender<ListenEvent>,
stopper: futures::channel::oneshot::Receiver<()>,
) -> CommandResult {
let command_debug_str = format!("{:?}", command);
let mut async_command: async_process::Command = command.into();
let async_child = async_command.stdout(async_process::Stdio::piped()).spawn();
let mut async_child = match async_child {
Ok(child) => child,
Err(error) => {
return CommandResult {
command: command_debug_str,
result: Err(ErrorEvent::IoError(error)),
}
}
};
sender
.send(ListenEvent::Start)
.expect("Could not send start message over internal channel");
let lines = BufReader::new(
async_child
.stdout
.take()
.expect("Cannot access subprocess stdout"),
)
.lines();
let mut lines_or_stop = lines.take_until(stopper);
while let Some(line) = lines_or_stop.next().await {
match line {
Ok(line) => {
sender
.send(ListenEvent::Line(line))
.expect("Could not send data over internal channel");
}
Err(error) => {
log::warn!(
"Error in output stream for subprocess: {}",
command_debug_str
);
log::warn!("Error: {:#?}", error);
break;
}
}
}
if let Err(error) = async_child.kill() {
log::warn!("Error killing subprocess: {}", command_debug_str);
log::warn!("Error: {:#?}", error);
} else {
log::trace!("Killed subprocess: {}", command_debug_str);
}
match async_child.status().await {
Ok(status) => {
if status.success() {
CommandResult {
command: command_debug_str,
result: Ok(()),
}
} else {
CommandResult {
command: command_debug_str,
result: Err(ErrorEvent::SubprocError(status)),
}
}
}
Err(error) => CommandResult {
command: command_debug_str,
result: Err(ErrorEvent::IoError(error)),
},
}
}