use anyhow::{anyhow, Context};
use log::{debug, error};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::mpsc;
pub mod http_server;
pub mod http_upstream;
pub const CHANNEL_DEPTH: usize = 64;
pub struct UpstreamHandle {
pub tx: mpsc::Sender<String>,
pub rx: mpsc::Receiver<String>,
pub label: String,
pub child: Option<Child>,
}
pub fn spawn_stdio_upstream(cmd: &[String]) -> anyhow::Result<UpstreamHandle> {
let (program, args) = cmd
.split_first()
.ok_or_else(|| anyhow!("empty upstream command"))?;
let mut child = Command::new(program)
.args(args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit())
.kill_on_drop(true)
.spawn()
.with_context(|| format!("failed to spawn upstream '{}'", program))?;
let mut child_in = child.stdin.take().ok_or_else(|| anyhow!("missing child stdin"))?;
let child_out = child.stdout.take().ok_or_else(|| anyhow!("missing child stdout"))?;
let (to_tx, mut to_rx) = mpsc::channel::<String>(CHANNEL_DEPTH);
let (from_tx, from_rx) = mpsc::channel::<String>(CHANNEL_DEPTH);
tokio::spawn(async move {
while let Some(frame) = to_rx.recv().await {
if let Err(e) = child_in.write_all(frame.as_bytes()).await {
error!("[shield] child stdin write error: {}", e);
break;
}
if let Err(e) = child_in.write_all(b"\n").await {
error!("[shield] child stdin write error: {}", e);
break;
}
let _ = child_in.flush().await;
}
let _ = child_in.shutdown().await;
});
tokio::spawn(async move {
let mut reader = BufReader::new(child_out);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => {
debug!("[shield] child EOF");
break;
}
Ok(_) => {}
Err(e) => {
error!("[shield] child read error: {}", e);
break;
}
}
let frame = line.trim_end();
if frame.is_empty() {
continue;
}
if from_tx.send(frame.to_string()).await.is_err() {
break;
}
}
});
Ok(UpstreamHandle {
tx: to_tx,
rx: from_rx,
label: cmd.join(" "),
child: Some(child),
})
}