#![allow(dead_code)]
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use anyhow::{Context, Result, anyhow};
use async_nats::jetstream;
use kanade_shared::bootstrap::ensure_jetstream_resources;
use kanade_shared::kv::{BUCKET_AGENT_CONFIG, KEY_AGENT_CONFIG_GLOBAL};
use serde_json::json;
use tempfile::TempDir;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::TcpStream;
use tokio::process::{Child, Command};
/// Prefix put in front of every forwarded stdout/stderr line of the spawned agent binary.
const AGENT_LOG_PREFIX: &str = "[agent]";
/// Prefix put in front of every forwarded `nats-server` output line and of the
/// start-up retry diagnostics.
const NATS_LOG_PREFIX: &str = "[nats]";
/// How long `start_on_port` waits for the server's TCP port to accept a connection.
const NATS_READY_TIMEOUT: Duration = Duration::from_secs(5);
/// Number of different ports `NatsServer::start` tries before giving up.
const PORT_ALLOC_ATTEMPTS: u32 = 3;
/// Reports whether an executable named `nats-server` can be resolved via `PATH`.
///
/// Returns `true` when the lookup succeeds and `false` on any lookup error.
pub fn nats_server_in_path() -> bool {
    matches!(which::which("nats-server"), Ok(_))
}
/// Early-returns from the calling function when `nats-server` is not in `PATH`.
///
/// Expands to a check of `$crate::common::nats_server_in_path()`; when it is
/// `false`, a "skipping" notice is written to stderr and the caller returns.
/// Because the expansion uses a bare `return;`, the macro can only be invoked
/// inside a function (or async block) whose return type is `()`.
#[macro_export]
macro_rules! skip_if_no_nats_server {
() => {
if !$crate::common::nats_server_in_path() {
eprintln!(
"skipping: `nats-server` not in PATH. Install via \
scoop / brew / GitHub release and re-run."
);
return;
}
};
}
/// A `nats-server` child process started for the duration of a test.
///
/// The process is asked to terminate when this value is dropped.
pub struct NatsServer {
// Handle to the spawned `nats-server` process.
child: Child,
/// TCP port the server was told to listen on (`-p`).
pub port: u16,
/// Client URL of the form `nats://127.0.0.1:{port}`.
pub url: String,
// Temporary directory passed to the server as its storage dir (`-sd`). Held
// here so it outlives the process handle; fields drop in declaration order,
// so `child` is dropped before this directory.
storage_dir: TempDir,
}
impl NatsServer {
    /// Starts `nats-server` on a freshly picked free port, retrying on failure.
    ///
    /// Up to `PORT_ALLOC_ATTEMPTS` ports are tried. Every failed attempt is
    /// logged to stderr with the `NATS_LOG_PREFIX` prefix.
    ///
    /// # Errors
    ///
    /// Fails immediately if no free TCP port can be picked; otherwise returns
    /// the error of the last failed attempt.
    pub async fn start() -> Result<Self> {
        let mut last_err: Option<anyhow::Error> = None;
        for attempt in 1..=PORT_ALLOC_ATTEMPTS {
            let port = portpicker::pick_unused_port()
                .ok_or_else(|| anyhow!("no free TCP port available"))?;
            match Self::start_on_port(port).await {
                Ok(server) => return Ok(server),
                Err(e) => {
                    eprintln!(
                        "{NATS_LOG_PREFIX} attempt {attempt}/{PORT_ALLOC_ATTEMPTS} \
                         on port {port} failed: {e:#}"
                    );
                    last_err = Some(e);
                }
            }
        }
        Err(last_err.unwrap_or_else(|| anyhow!("nats-server failed to start (no attempts ran)")))
    }

    /// Starts `nats-server` with JetStream enabled on the given `port`.
    ///
    /// The server stores its data in a fresh temporary directory, and its
    /// stdout/stderr are forwarded line by line to this process's stderr.
    /// The call returns once the port accepts a TCP connection *and* the child
    /// process is still running.
    ///
    /// # Errors
    ///
    /// Fails when the temp dir cannot be created, the binary cannot be
    /// spawned, the port does not accept connections within
    /// `NATS_READY_TIMEOUT`, or the child has already exited by the time the
    /// port answers.
    pub async fn start_on_port(port: u16) -> Result<Self> {
        let storage_dir = TempDir::new().context("create nats-server storage tempdir")?;
        let mut cmd = Command::new("nats-server");
        // -js: enable JetStream, -p: listen port, -sd: JetStream storage dir.
        // NOTE(review): `-T=false` presumably turns off log timestamps —
        // confirm against `nats-server --help`.
        cmd.arg("-js")
            .arg("-p")
            .arg(port.to_string())
            .arg("-sd")
            .arg(storage_dir.path())
            .arg("-T=false")
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(true);
        let mut child = cmd.spawn().context("spawn nats-server (is it in PATH?)")?;
        if let Some(stdout) = child.stdout.take() {
            forward_lines(stdout, NATS_LOG_PREFIX);
        }
        if let Some(stderr) = child.stderr.take() {
            forward_lines(stderr, NATS_LOG_PREFIX);
        }
        let url = format!("nats://127.0.0.1:{port}");
        wait_for_tcp(port, NATS_READY_TIMEOUT)
            .await
            .with_context(|| format!("nats-server did not become ready on {url}"))?;
        // A successful connect only proves that *something* listens on the
        // port. If another process grabbed it between port picking and bind,
        // our child has exited and the connect above reached the other
        // process. Report that as a failure so `start` can retry on a new
        // port. Best effort: a child that dies later is not detected here, and
        // an error from `try_wait` itself is ignored rather than failing the
        // start.
        if let Ok(Some(status)) = child.try_wait() {
            return Err(anyhow!(
                "nats-server exited early ({status}) although {url} accepts connections \
                 (is port {port} used by another process?)"
            ));
        }
        Ok(Self {
            child,
            port,
            url,
            storage_dir,
        })
    }

    /// Kills the server process and waits for it to exit.
    ///
    /// Both steps are best effort: their errors are ignored and the method
    /// always returns `Ok(())`.
    pub async fn kill(&mut self) -> Result<()> {
        let _ = self.child.start_kill();
        let _ = self.child.wait().await;
        Ok(())
    }

    /// Opens a new client connection to this server.
    ///
    /// # Errors
    ///
    /// Returns the connection error, annotated with the server URL.
    pub async fn client(&self) -> Result<async_nats::Client> {
        async_nats::connect(&self.url)
            .await
            .with_context(|| format!("connect to {}", self.url))
    }

    /// Runs `ensure_jetstream_resources` against this server over a new
    /// client connection.
    pub async fn bootstrap_jetstream(&self) -> Result<()> {
        let client = self.client().await?;
        let js = jetstream::new(client);
        ensure_jetstream_resources(&js)
            .await
            .context("ensure_jetstream_resources")
    }

    /// Writes `{"heartbeat_interval": "<secs>s"}` under
    /// `KEY_AGENT_CONFIG_GLOBAL` in the `BUCKET_AGENT_CONFIG` key-value bucket.
    ///
    /// # Errors
    ///
    /// Fails when the bucket does not exist (call
    /// [`bootstrap_jetstream`](Self::bootstrap_jetstream) first), or when
    /// serializing or storing the payload fails.
    pub async fn set_heartbeat_interval(&self, secs: u64) -> Result<()> {
        let client = self.client().await?;
        let js = jetstream::new(client);
        let kv = js
            .get_key_value(BUCKET_AGENT_CONFIG)
            .await
            .context("get_key_value(agent_config) — did you forget bootstrap_jetstream()?")?;
        let payload = json!({ "heartbeat_interval": format!("{secs}s") });
        kv.put(
            KEY_AGENT_CONFIG_GLOBAL,
            serde_json::to_vec(&payload)?.into(),
        )
        .await
        .context("kv.put(agent_config/global)")?;
        Ok(())
    }
}
impl Drop for NatsServer {
    /// Requests termination of the server process when the handle goes away.
    fn drop(&mut self) {
        // Best effort: the outcome of the kill request is discarded on purpose.
        std::mem::drop(self.child.start_kill());
    }
}
/// A spawned `kanade-agent` process under test.
///
/// The process is asked to terminate when this value is dropped.
pub struct AgentHandle {
// Handle to the spawned agent process.
child: Child,
/// Agent id written to the generated config (`itest-<uuid>`).
pub pc_id: String,
// Temporary directory containing the generated `agent.toml`; kept only so
// the directory lives as long as this handle.
_config_dir: TempDir,
}
impl AgentHandle {
    /// Launches the `kanade-agent` binary against the NATS server at `nats_url`.
    ///
    /// A temporary directory receives a generated `agent.toml` carrying a
    /// unique `itest-<uuid>` agent id. The binary is started with
    /// `--config <that file>`, with `KANADE_NATS_TOKEN` removed from its
    /// environment and `RUST_LOG` set. Its stdout and stderr are forwarded to
    /// this process's stderr with the `AGENT_LOG_PREFIX` prefix.
    ///
    /// # Errors
    ///
    /// Fails when the temp dir cannot be created, the config file cannot be
    /// written, or the binary cannot be spawned.
    pub async fn spawn(nats_url: &str) -> Result<Self> {
        let config_dir = TempDir::new().context("create agent config tempdir")?;
        let pc_id = format!("itest-{}", uuid::Uuid::new_v4().simple());

        // Both the config file and the configured log path live in the temp dir.
        let scratch = config_dir.path();
        let agent_log = scratch.join("agent.log");
        let config_path = scratch.join("agent.toml");

        let rendered = format!(
            r#"[agent]
id = "{pc_id}"
nats_url = "{nats_url}"
[log]
path = {log_path:?}
level = "info"
# 0 disables the rolling file appender so the test doesn't race
# with the tracing-appender background flusher when the tempdir
# gets cleaned up.
keep_days = 0
"#,
            log_path = agent_log,
        );
        std::fs::write(&config_path, rendered).context("write temp agent.toml")?;

        let agent_bin = PathBuf::from(env!("CARGO_BIN_EXE_kanade-agent"));
        let mut command = Command::new(agent_bin);
        command.arg("--config").arg(&config_path);
        command
            .env_remove("KANADE_NATS_TOKEN")
            .env("RUST_LOG", "info,kanade_agent=debug");
        command
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(true);

        let mut child = command.spawn().context("spawn kanade-agent binary")?;
        if let Some(out_pipe) = child.stdout.take() {
            forward_lines(out_pipe, AGENT_LOG_PREFIX);
        }
        if let Some(err_pipe) = child.stderr.take() {
            forward_lines(err_pipe, AGENT_LOG_PREFIX);
        }

        Ok(Self {
            child,
            pc_id,
            _config_dir: config_dir,
        })
    }

    /// Returns `false` only when the process is known to have exited.
    ///
    /// A failure to query the process status is reported as "alive".
    pub fn is_alive(&mut self) -> bool {
        !matches!(self.child.try_wait(), Ok(Some(_)))
    }
}
impl Drop for AgentHandle {
    /// Requests termination of the agent process when the handle goes away.
    fn drop(&mut self) {
        // Best effort: the outcome of the kill request is discarded on purpose.
        std::mem::drop(self.child.start_kill());
    }
}
/// Polls `127.0.0.1:{port}` until a TCP connection succeeds or `timeout` elapses.
///
/// Failed attempts are retried after a 50 ms pause. Each connection attempt is
/// itself bounded by the overall deadline, so a connect that stalls cannot
/// keep the call running past `timeout`.
///
/// # Errors
///
/// Returns an error naming the port and the timeout when no connection was
/// established before the deadline.
async fn wait_for_tcp(port: u16, timeout: Duration) -> Result<()> {
    let deadline = tokio::time::Instant::now() + timeout;
    loop {
        let attempt =
            tokio::time::timeout_at(deadline, TcpStream::connect(("127.0.0.1", port))).await;
        match attempt {
            // Connected: the port is accepting connections.
            Ok(Ok(_)) => return Ok(()),
            // Refused (or otherwise failed) with time left: pause, then retry.
            Ok(Err(_)) if tokio::time::Instant::now() < deadline => {
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
            // Either the attempt failed after the deadline or it was cut off by it.
            _ => {
                return Err(anyhow!(
                    "nats-server did not accept TCP on 127.0.0.1:{port} within {timeout:?}"
                ));
            }
        }
    }
}
/// Spawns a background task that copies `stream` to stderr line by line,
/// putting `prefix` and a space in front of each line.
///
/// Lines are read as raw bytes and decoded lossily, so output that is not
/// valid UTF-8 is still forwarded (invalid sequences become U+FFFD) instead of
/// ending the task and closing the pipe. A trailing `\n` or `\r\n` is removed
/// from each line. The task finishes at end of stream or on the first read
/// error.
fn forward_lines<R>(stream: R, prefix: &'static str)
where
    R: tokio::io::AsyncRead + Unpin + Send + 'static,
{
    tokio::spawn(async move {
        let mut reader = BufReader::new(stream);
        // One buffer reused for every line.
        let mut raw = Vec::new();
        loop {
            raw.clear();
            match reader.read_until(b'\n', &mut raw).await {
                // End of stream or I/O failure: nothing more to forward.
                Ok(0) | Err(_) => break,
                Ok(_) => {
                    if raw.last() == Some(&b'\n') {
                        raw.pop();
                        if raw.last() == Some(&b'\r') {
                            raw.pop();
                        }
                    }
                    eprintln!("{prefix} {}", String::from_utf8_lossy(&raw));
                }
            }
        }
    });
}
use futures::StreamExt;
use kanade_shared::subject;
use kanade_shared::wire::Heartbeat;
/// Waits for the next heartbeat message of agent `pc_id` and decodes it.
///
/// Opens its own connection to `url`, subscribes to the subject returned by
/// `subject::heartbeat(pc_id)`, and waits at most `timeout` for one message.
///
/// # Errors
///
/// Fails when connecting or subscribing fails, when no message arrives within
/// `timeout`, when the subscription ends without a message, or when the
/// payload is not a valid JSON `Heartbeat`.
pub async fn await_heartbeat(url: &str, pc_id: &str, timeout: Duration) -> Result<Heartbeat> {
    let connection = async_nats::connect(url)
        .await
        .context("connect for subscribe")?;
    let mut heartbeats = connection
        .subscribe(subject::heartbeat(pc_id))
        .await
        .with_context(|| format!("subscribe heartbeat.{pc_id}"))?;

    let message = match tokio::time::timeout(timeout, heartbeats.next()).await {
        Ok(Some(received)) => received,
        Ok(None) => return Err(anyhow!("heartbeat subscription closed")),
        Err(_) => return Err(anyhow!("no heartbeat from {pc_id} within {timeout:?}")),
    };

    serde_json::from_slice(&message.payload).context("decode Heartbeat payload")
}