use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::time::Duration;
use tokio::process::{Child, Command};
use tokio::time::{sleep, timeout};
use url::Url;
use crate::{Error, Result, TapClient, TapConfig};
pub struct TapProcess {
child: Child,
url: Url,
config: TapConfig,
}
impl TapProcess {
pub async fn spawn(path: impl AsRef<Path>, config: TapConfig) -> Result<Self> {
let path = path.as_ref();
if !path.exists() {
return Err(Error::ProcessStart {
message: format!("tap binary not found at: {}", path.display()),
});
}
Self::spawn_inner(path.to_path_buf(), config).await
}
pub async fn spawn_default(config: TapConfig) -> Result<Self> {
let local_tap = PathBuf::from("./tap");
if local_tap.exists() {
return Self::spawn_inner(local_tap, config).await;
}
Self::spawn_inner(PathBuf::from("tap"), config).await
}
async fn spawn_inner(path: PathBuf, config: TapConfig) -> Result<Self> {
let bind = config.bind.clone().unwrap_or_else(|| ":2480".to_string());
let port = parse_port(&bind).unwrap_or(2480);
let url: Url = format!("http://127.0.0.1:{}", port)
.parse()
.map_err(|_| Error::InvalidUrl(format!("http://127.0.0.1:{}", port)))?;
let mut cmd = Command::new(&path);
cmd.arg("run").stdin(Stdio::null()).kill_on_drop(true);
if config.inherit_stdio() {
cmd.stdout(Stdio::inherit()).stderr(Stdio::inherit());
} else {
cmd.stdout(Stdio::null()).stderr(Stdio::null());
}
for (key, value) in config.to_env_vars() {
cmd.env(key, value);
}
let child = cmd.spawn().map_err(|e| Error::ProcessStart {
message: format!("Failed to spawn {}: {}", path.display(), e),
})?;
let mut process = Self {
child,
url: url.clone(),
config,
};
if let Err(e) = process.wait_for_healthy().await {
let _ = process.child.kill().await;
return Err(e);
}
Ok(process)
}
async fn wait_for_healthy(&self) -> Result<()> {
let startup_timeout = self.config.startup_timeout();
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(2))
.build()
.map_err(Error::Http)?;
let health_url = self.url.join("/health")?;
let result = timeout(startup_timeout, async {
loop {
match client.get(health_url.clone()).send().await {
Ok(resp) if resp.status().is_success() => return Ok(()),
_ => sleep(Duration::from_millis(100)).await,
}
}
})
.await;
match result {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => Err(e),
Err(_) => Err(Error::Timeout),
}
}
pub fn url(&self) -> &Url {
&self.url
}
pub fn client(&self) -> Result<TapClient> {
TapClient::with_config(self.url.as_str(), &self.config)
}
pub fn is_running(&mut self) -> bool {
matches!(self.child.try_wait(), Ok(None))
}
pub async fn shutdown(&mut self) -> Result<()> {
#[cfg(unix)]
{
if let Some(pid) = self.child.id() {
unsafe {
libc::kill(pid as i32, libc::SIGTERM);
}
}
let shutdown_timeout = self.config.shutdown_timeout();
match timeout(shutdown_timeout, self.child.wait()).await {
Ok(Ok(_)) => return Ok(()),
Ok(Err(e)) => return Err(Error::Io(e)),
Err(_) => {
let _ = self.child.kill().await;
}
}
}
#[cfg(not(unix))]
{
let _ = self.child.kill().await;
}
Ok(())
}
}
impl Drop for TapProcess {
fn drop(&mut self) {
#[cfg(unix)]
{
if let Some(pid) = self.child.id() {
unsafe {
libc::kill(pid as i32, libc::SIGTERM);
}
}
}
}
}
fn parse_port(bind: &str) -> Option<u16> {
bind.rsplit(':').next()?.parse().ok()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_port() {
assert_eq!(parse_port(":2480"), Some(2480));
assert_eq!(parse_port("127.0.0.1:3000"), Some(3000));
assert_eq!(parse_port("0.0.0.0:8080"), Some(8080));
assert_eq!(parse_port("[::1]:2480"), Some(2480));
assert_eq!(parse_port("[2001:db8::1]:8080"), Some(8080));
assert_eq!(parse_port("invalid"), None);
}
}