#[path = "common/mod.rs"]
mod common;
use std::time::Duration;
use crate::common::{AgentHandle, NatsServer, await_heartbeat};
/// Value handed to `NatsServer::set_heartbeat_interval` in the tests below.
/// NOTE(review): presumably seconds, going by the name — confirm against the helper.
const HEARTBEAT_INTERVAL_SECS: u64 = 1;
/// Timeout argument passed to `await_heartbeat` (60 seconds).
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(60);
/// URL given to the agent in the no-broker test.
/// NOTE(review): presumably nothing listens on local TCP port 1 — confirm for CI hosts.
const BOGUS_NATS_URL: &str = "nats://127.0.0.1:1";
/// Spawns the agent against `BOGUS_NATS_URL`, waits three seconds, and
/// asserts that the agent still reports itself alive.
///
/// The assertion message ties a failure here to issue #137.
#[tokio::test]
#[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
async fn boots_with_no_broker_stays_alive() {
    skip_if_no_nats_server!();

    // No server is started in this test; the agent only ever sees the bogus URL.
    let spawned = AgentHandle::spawn(BOGUS_NATS_URL).await;
    let mut handle = spawned.expect("spawn agent");

    // Let some time pass before checking on the agent.
    let grace_period = Duration::from_secs(3);
    tokio::time::sleep(grace_period).await;

    let still_running = handle.is_alive();
    assert!(
        still_running,
        "agent exited with no broker reachable; #137 regression?"
    );
}
/// Spawns the agent first, then starts a NATS server on the port the agent
/// was pointed at, and waits for a heartbeat carrying the agent's `pc_id`.
#[tokio::test]
#[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
async fn catches_up_when_broker_starts_later() {
    skip_if_no_nats_server!();

    // Choose the port up front so the agent and the late server share it.
    let port = portpicker::pick_unused_port().expect("pick port");
    let target_url = format!("nats://127.0.0.1:{port}");

    // The agent is spawned before any server is started on that port.
    let spawned = AgentHandle::spawn(&target_url).await;
    let handle = spawned.expect("spawn agent");

    let head_start = Duration::from_secs(2);
    tokio::time::sleep(head_start).await;

    // Start the server and apply the JetStream bootstrap and heartbeat interval.
    let broker = NatsServer::start_on_port(port).await.expect("start nats");
    let bootstrapped = broker.bootstrap_jetstream().await;
    bootstrapped.expect("ensure_jetstream_resources");
    let cadence_applied = broker
        .set_heartbeat_interval(HEARTBEAT_INTERVAL_SECS)
        .await;
    cadence_applied.expect("set heartbeat cadence");

    // A heartbeat must arrive within the timeout and name this agent.
    let awaited = await_heartbeat(&broker.url, &handle.pc_id, HEARTBEAT_TIMEOUT).await;
    let beat = awaited.expect("agent should publish a heartbeat after broker comes up");
    assert_eq!(beat.pc_id, handle.pc_id);
}
/// Observes a heartbeat, kills the NATS server, restarts it on the same
/// port, and requires a second heartbeat from the same agent whose `at`
/// value is not earlier than the first one's.
#[tokio::test]
#[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
async fn recovers_after_broker_restart() {
    skip_if_no_nats_server!();

    // Phase 1: server up and configured, agent attached, first heartbeat seen.
    let mut broker = NatsServer::start().await.expect("start nats");
    let bootstrapped = broker.bootstrap_jetstream().await;
    bootstrapped.expect("ensure_jetstream_resources");
    let cadence_applied = broker
        .set_heartbeat_interval(HEARTBEAT_INTERVAL_SECS)
        .await;
    cadence_applied.expect("set heartbeat cadence");

    // Remember the port so the replacement server can reuse it.
    let reused_port = broker.port;

    let spawned = AgentHandle::spawn(&broker.url).await;
    let handle = spawned.expect("spawn agent");

    let first_wait = await_heartbeat(&broker.url, &handle.pc_id, HEARTBEAT_TIMEOUT).await;
    let first_beat = first_wait.expect("pre-restart heartbeat");
    assert_eq!(first_beat.pc_id, handle.pc_id);

    // Phase 2: take the server down and leave a two-second gap.
    broker.kill().await.expect("kill nats");
    let downtime = Duration::from_secs(2);
    tokio::time::sleep(downtime).await;

    // Phase 3: start a server on the same port and redo the bootstrap and
    // heartbeat-interval setup against it.
    let revived = NatsServer::start_on_port(reused_port)
        .await
        .expect("restart nats on same port");
    let rebootstrapped = revived.bootstrap_jetstream().await;
    rebootstrapped.expect("re-bootstrap after restart");
    let cadence_reapplied = revived
        .set_heartbeat_interval(HEARTBEAT_INTERVAL_SECS)
        .await;
    cadence_reapplied.expect("re-apply cadence");

    // The same agent must be heard from again through the restarted server.
    let second_wait = await_heartbeat(&revived.url, &handle.pc_id, HEARTBEAT_TIMEOUT).await;
    let second_beat = second_wait.expect("post-restart heartbeat");
    assert_eq!(second_beat.pc_id, handle.pc_id);
    assert!(
        second_beat.at >= first_beat.at,
        "post-restart heartbeat timestamp regressed",
    );
}