hippotat 1.1.7

Asinine HTTP-over-IP
Documentation
// Copyright 2021-2022 Ian Jackson and contributors to Hippotat
// SPDX-License-Identifier: GPL-3.0-or-later WITH LicenseRef-Hippotat-OpenSSL-Exception
// There is NO WARRANTY.

use crate::prelude::*;

/// Buffered reader over the child process's stdout, wrapped in a `Split`
/// so that it is consumed one delimiter-separated segment at a time.
type Tx = t_io::Split<t_io::BufReader<t_proc::ChildStdout>>;

/// Handle on a running ipif child process and its associated pipes.
///
/// Built by `Ipif::start`, which runs the command through `sh -c`.
pub struct Ipif {
  /// Segments read from the child's stdout, delimited by `SLIP_END`.
  pub tx: Tx,
  /// Write end of the child's stdin.
  pub rx: t_proc::ChildStdin,
  /// Background task which relays each line of the child's stderr to
  /// the log at error level; it finishes when stderr reaches EOF or a
  /// read fails.
  stderr_task: JoinHandle<io::Result<()>>,
  /// The child process itself (spawned with `kill_on_drop`).
  child: t_proc::Child,
  /// The shell command string, kept for reporting in `quitting`.
  cmd: String,
}

impl Ipif {
  #[throws(AE)]
  pub fn start(cmd: &str, ic_name: Option<String>) -> Self {
    debug!("{}ipif: running command: {}",
           OptionPrefixColon(ic_name.as_ref()),
           cmd);

    let mut child = tokio::process::Command::new("sh")
      .args(["-c", cmd])
      .stdin (process::Stdio::piped())
      .stdout(process::Stdio::piped())
      .stderr(process::Stdio::piped())
      .kill_on_drop(true)
      .spawn().context("spawn ipif")?;

    let stderr = child.stderr.take().unwrap();

    let stderr_task = task::spawn(async move {
      let mut stderr = t_io::BufReader::new(stderr).lines();
      while let Some(l) = stderr.next_line().await? {
        error!("{}ipif stderr: {}",
               OptionPrefixColon(ic_name.as_ref()),
               l.trim_end());
      }
      Ok::<_,io::Error>(())
    });
 
    let tx = child.stdout.take().unwrap();
    let rx = child.stdin .take().unwrap();
    let tx = t_io::BufReader::new(tx).split(SLIP_END);

    Ipif {
      tx,
      rx,
      stderr_task,
      child,
      cmd: cmd.to_owned(),
    }
  }

  pub async fn quitting(mut self, ic: Option<&InstanceConfig>) {
    let icd = OptionPrefixColon(ic);
    drop(self.rx);

    error!("{}failed ipif command: {}", icd, &self.cmd);

    match self.child.wait().await {
      Err(e) => error!("{}also, failed to await ipif child: {}", icd, e),
      Ok(st) => {
        let stderr_timeout = Duration::from_millis(1000);
        match tokio::time::timeout(stderr_timeout, self.stderr_task).await {
          Err::<_,tokio::time::error::Elapsed>(_)
            => warn!("{}ipif stderr task continues!", icd),
          Ok(Err(e)) => error!("{}ipif stderr task crashed: {}", icd, e),
          Ok(Ok(Err(e))) => error!("{}ipif stderr read failed: {}", icd, e),
          Ok(Ok(Ok(()))) => { },
        }
        if ! st.success() {
          error!("{}ipif process failed: {}", icd, st);
        }
      }
    }

    drop(self.tx);
  }

  #[throws(AE)]
  pub async fn next_frame(tx: &mut Tx) -> Vec<u8> {
    let data = tx.next_segment().await;
    (||{
      data?.ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))
    })().context("read from ipif")?
  }
}