use crate::prelude::*;
/// Segment-wise reader over a child process's buffered stdout.
///
/// `Ipif::start` builds one of these by wrapping the child's stdout in a
/// `BufReader` and splitting it on `SLIP_END`; `Ipif::next_frame` pulls
/// one segment at a time out of it.
type Tx = t_io::Split<t_io::BufReader<t_proc::ChildStdout>>;
/// Handle on a running ipif child process and the plumbing attached to it.
///
/// Created by `Ipif::start`; torn down (with diagnostics) by
/// `Ipif::quitting`.
pub struct Ipif {
/// The child's stdout, split into segments at `SLIP_END`.
// NOTE(review): "tx"/"rx" appear to be named from the ipif's point of
// view (data read here comes out of the child) -- confirm with callers.
pub tx: Tx,
/// The child's stdin; dropping it is the first thing `quitting` does.
pub rx: t_proc::ChildStdin,
/// Background task that reads the child's stderr line by line and logs
/// each line with `error!`. Resolves to an I/O error if a read fails.
stderr_task: JoinHandle<io::Result<()>>,
/// The child process itself (spawned with `kill_on_drop(true)`).
child: t_proc::Child,
/// The shell command that was run, retained for error messages.
cmd: String,
}
impl Ipif {
#[throws(AE)]
pub fn start(cmd: &str, ic_name: Option<String>) -> Self {
debug!("{}ipif: running command: {}",
OptionPrefixColon(ic_name.as_ref()),
cmd);
let mut child = tokio::process::Command::new("sh")
.args(["-c", cmd])
.stdin (process::Stdio::piped())
.stdout(process::Stdio::piped())
.stderr(process::Stdio::piped())
.kill_on_drop(true)
.spawn().context("spawn ipif")?;
let stderr = child.stderr.take().unwrap();
let stderr_task = task::spawn(async move {
let mut stderr = t_io::BufReader::new(stderr).lines();
while let Some(l) = stderr.next_line().await? {
error!("{}ipif stderr: {}",
OptionPrefixColon(ic_name.as_ref()),
l.trim_end());
}
Ok::<_,io::Error>(())
});
let tx = child.stdout.take().unwrap();
let rx = child.stdin .take().unwrap();
let tx = t_io::BufReader::new(tx).split(SLIP_END);
Ipif {
tx,
rx,
stderr_task,
child,
cmd: cmd.to_owned(),
}
}
pub async fn quitting(mut self, ic: Option<&InstanceConfig>) {
let icd = OptionPrefixColon(ic);
drop(self.rx);
error!("{}failed ipif command: {}", icd, &self.cmd);
match self.child.wait().await {
Err(e) => error!("{}also, failed to await ipif child: {}", icd, e),
Ok(st) => {
let stderr_timeout = Duration::from_millis(1000);
match tokio::time::timeout(stderr_timeout, self.stderr_task).await {
Err::<_,tokio::time::error::Elapsed>(_)
=> warn!("{}ipif stderr task continues!", icd),
Ok(Err(e)) => error!("{}ipif stderr task crashed: {}", icd, e),
Ok(Ok(Err(e))) => error!("{}ipif stderr read failed: {}", icd, e),
Ok(Ok(Ok(()))) => { },
}
if ! st.success() {
error!("{}ipif process failed: {}", icd, st);
}
}
}
drop(self.tx);
}
#[throws(AE)]
pub async fn next_frame(tx: &mut Tx) -> Vec<u8> {
let data = tx.next_segment().await;
(||{
data?.ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))
})().context("read from ipif")?
}
}