// tapped 0.3.1
//
// Rust wrapper for the tap ATProto utility.
// Documentation
//! Subprocess management for spawning and managing a tap process.

use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::time::Duration;

use tokio::process::{Child, Command};
use tokio::time::{sleep, timeout};
use url::Url;

use crate::{Error, Result, TapClient, TapConfig};

/// A running tap process.
///
/// On Unix, the process receives SIGTERM when this struct is dropped. For graceful
/// shutdown with timeout handling, call [`shutdown()`](TapProcess::shutdown) explicitly.
///
/// NOTE(review): the child is spawned with `kill_on_drop(true)`, so tokio will
/// also kill it when the inner handle is dropped. A plain drop therefore does
/// not appear to leave the process time to exit gracefully — confirm this is
/// the intended behavior.
pub struct TapProcess {
    /// Handle to the spawned `tap run` child process.
    child: Child,
    /// Base URL used for health checks and for clients created via `client()`.
    url: Url,
    /// Configuration the process was started with; also supplies the startup
    /// and shutdown timeouts.
    config: TapConfig,
}

impl TapProcess {
    /// Spawn a tap process at the given path with the given configuration.
    ///
    /// The path should point to the `tap` binary. The process will be started
    /// with the `run` subcommand and configuration passed as environment
    /// variables.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # async fn example() -> tapped::Result<()> {
    /// use tapped::{TapProcess, TapConfig};
    ///
    /// let config = TapConfig::builder()
    ///     .database_url("sqlite://./my-tap.db")
    ///     .build();
    ///
    /// let process = TapProcess::spawn("./tap", config).await?;
    /// let client = process.client()?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn spawn(path: impl AsRef<Path>, config: TapConfig) -> Result<Self> {
        let path = path.as_ref();

        if !path.exists() {
            return Err(Error::ProcessStart {
                message: format!("tap binary not found at: {}", path.display()),
            });
        }

        Self::spawn_inner(path.to_path_buf(), config).await
    }

    /// Launch a tap process, locating the binary automatically.
    ///
    /// `./tap` (relative to the current working directory) is used when it
    /// exists; otherwise the bare name `tap` is spawned, which leaves the
    /// lookup to the system `PATH`.
    ///
    /// # Errors
    ///
    /// Passes through any error raised while spawning the process or waiting
    /// for it to become healthy.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # async fn example() -> tapped::Result<()> {
    /// use tapped::{TapProcess, TapConfig};
    ///
    /// let config = TapConfig::builder()
    ///     .database_url("sqlite://./my-tap.db")
    ///     .build();
    ///
    /// let process = TapProcess::spawn_default(config).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn spawn_default(config: TapConfig) -> Result<Self> {
        let candidate = PathBuf::from("./tap");

        // Prefer a binary sitting next to the caller; fall back to PATH lookup.
        let binary = if candidate.exists() {
            candidate
        } else {
            PathBuf::from("tap")
        };

        Self::spawn_inner(binary, config).await
    }

    async fn spawn_inner(path: PathBuf, config: TapConfig) -> Result<Self> {
        let bind = config.bind.clone().unwrap_or_else(|| ":2480".to_string());
        let port = parse_port(&bind).unwrap_or(2480);

        // Spawned processes are always local - connect to localhost regardless
        // of what interface tap binds to.
        let url: Url = format!("http://127.0.0.1:{}", port)
            .parse()
            .map_err(|_| Error::InvalidUrl(format!("http://127.0.0.1:{}", port)))?;

        let mut cmd = Command::new(&path);
        cmd.arg("run").stdin(Stdio::null()).kill_on_drop(true);

        if config.inherit_stdio() {
            cmd.stdout(Stdio::inherit()).stderr(Stdio::inherit());
        } else {
            cmd.stdout(Stdio::null()).stderr(Stdio::null());
        }

        for (key, value) in config.to_env_vars() {
            cmd.env(key, value);
        }

        let child = cmd.spawn().map_err(|e| Error::ProcessStart {
            message: format!("Failed to spawn {}: {}", path.display(), e),
        })?;

        let mut process = Self {
            child,
            url: url.clone(),
            config,
        };

        if let Err(e) = process.wait_for_healthy().await {
            // Kill the process if health check fails
            let _ = process.child.kill().await;
            return Err(e);
        }

        Ok(process)
    }

    /// Wait for the tap process to become healthy.
    async fn wait_for_healthy(&self) -> Result<()> {
        let startup_timeout = self.config.startup_timeout();
        let client = reqwest::Client::builder()
            .timeout(Duration::from_secs(2))
            .build()
            .map_err(Error::Http)?;

        let health_url = self.url.join("/health")?;

        let result = timeout(startup_timeout, async {
            loop {
                match client.get(health_url.clone()).send().await {
                    Ok(resp) if resp.status().is_success() => return Ok(()),
                    _ => sleep(Duration::from_millis(100)).await,
                }
            }
        })
        .await;

        match result {
            Ok(Ok(())) => Ok(()),
            Ok(Err(e)) => Err(e),
            Err(_) => Err(Error::Timeout),
        }
    }

    /// Get the URL of the running tap instance.
    ///
    /// For processes created by the spawn functions this is always
    /// `http://127.0.0.1:<port>`, where the port is taken from the configured
    /// bind address (falling back to 2480).
    pub fn url(&self) -> &Url {
        &self.url
    }

    /// Create a client connected to this tap process.
    ///
    /// The client is built by `TapClient::with_config` from this process's URL
    /// and the same configuration the process was spawned with.
    ///
    /// # Errors
    ///
    /// Returns whatever error `TapClient::with_config` reports.
    pub fn client(&self) -> Result<TapClient> {
        TapClient::with_config(self.url.as_str(), &self.config)
    }

    /// Report whether the child process is still alive.
    ///
    /// Performs a non-blocking status check. Returns `true` only when the
    /// check succeeds and no exit status is available yet; a failed status
    /// query is treated as "not running".
    pub fn is_running(&mut self) -> bool {
        match self.child.try_wait() {
            Ok(exit_status) => exit_status.is_none(),
            Err(_) => false,
        }
    }

    /// Gracefully shut down the tap process.
    ///
    /// Sends SIGTERM and waits up to `shutdown_timeout` before sending SIGKILL.
    pub async fn shutdown(&mut self) -> Result<()> {
        #[cfg(unix)]
        {
            // Send SIGTERM
            if let Some(pid) = self.child.id() {
                unsafe {
                    libc::kill(pid as i32, libc::SIGTERM);
                }
            }

            // Wait for graceful shutdown
            let shutdown_timeout = self.config.shutdown_timeout();
            match timeout(shutdown_timeout, self.child.wait()).await {
                Ok(Ok(_)) => return Ok(()),
                Ok(Err(e)) => return Err(Error::Io(e)),
                Err(_) => {
                    // Timeout, send SIGKILL
                    let _ = self.child.kill().await;
                }
            }
        }

        #[cfg(not(unix))]
        {
            let _ = self.child.kill().await;
        }

        Ok(())
    }
}

/// Best-effort termination when a [`TapProcess`] goes out of scope.
impl Drop for TapProcess {
    /// On Unix, sends SIGTERM to the child if it still has a process ID.
    /// Does nothing on other platforms.
    ///
    /// NOTE(review): the child was spawned with `kill_on_drop(true)`, so tokio
    /// will also kill it when the inner handle is dropped right after this
    /// runs — confirm the SIGTERM has the intended effect.
    fn drop(&mut self) {
        #[cfg(unix)]
        {
            let live_pid = self.child.id().map(|pid| pid as libc::pid_t);

            if let Some(pid) = live_pid {
                // SAFETY: `kill` only takes plain integer arguments and does
                // not access any memory owned by this program.
                unsafe {
                    libc::kill(pid, libc::SIGTERM);
                }
            }
        }
    }
}

/// Pull the port number out of a bind address.
///
/// The bind string itself is handed to tap unchanged (it uses Go's
/// `net.Listen` format); only the port is needed here, to build the localhost
/// URL used for health checks.
///
/// Returns `None` when the text after the last `:` (or the whole string, if
/// there is no `:`) is not a valid `u16`.
fn parse_port(bind: &str) -> Option<u16> {
    // The port always follows the final colon, which also holds for
    // bracketed IPv6 forms such as `[::1]:2480`.
    let port_text = match bind.rfind(':') {
        Some(colon) => &bind[colon + 1..],
        None => bind,
    };

    port_text.parse::<u16>().ok()
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `parse_port` must handle empty-host, IPv4 and bracketed IPv6 bind
    /// addresses, and reject input without a numeric port.
    #[test]
    fn test_parse_port() {
        let cases: [(&str, Option<u16>); 6] = [
            (":2480", Some(2480)),
            ("127.0.0.1:3000", Some(3000)),
            ("0.0.0.0:8080", Some(8080)),
            ("[::1]:2480", Some(2480)),
            ("[2001:db8::1]:8080", Some(8080)),
            ("invalid", None),
        ];

        for &(bind, expected) in cases.iter() {
            assert_eq!(parse_port(bind), expected, "bind address: {:?}", bind);
        }
    }
}