use tempfile::TempDir;
use x0x::{network::NetworkConfig, Agent};
/// Network configuration for agent A in the loopback-only tests.
///
/// Binds to 127.0.0.1 on port 0 and lists no bootstrap nodes; every other
/// field keeps its `Default` value.
///
/// `_port_offset` is accepted but never read.
fn cfg_a_local(_port_offset: u16) -> NetworkConfig {
    let loopback_any_port = std::net::SocketAddr::from(([127, 0, 0, 1], 0));
    NetworkConfig {
        bind_addr: Some(loopback_any_port),
        bootstrap_nodes: Vec::new(),
        ..Default::default()
    }
}
/// Network configuration for agent A in the VPS-backed tests.
///
/// Binds to 127.0.0.1 on port 0 and bootstraps from every entry of
/// `DEFAULT_BOOTSTRAP_PEERS` that parses as a socket address. Entries that do
/// not parse are skipped without any report. All other fields keep their
/// `Default` values.
///
/// `_port_offset` is accepted but never read.
///
/// NOTE(review): the bind address is loopback while the bootstrap list is the
/// crate's default peer set — confirm those peers are reachable from a
/// loopback-bound socket.
fn cfg_a_vps(_port_offset: u16) -> NetworkConfig {
    use x0x::network::DEFAULT_BOOTSTRAP_PEERS;

    let mut public_peers: Vec<std::net::SocketAddr> = Vec::new();
    for peer in DEFAULT_BOOTSTRAP_PEERS.iter() {
        if let Ok(addr) = peer.parse::<std::net::SocketAddr>() {
            public_peers.push(addr);
        }
    }

    NetworkConfig {
        bind_addr: Some(std::net::SocketAddr::from(([127, 0, 0, 1], 0))),
        bootstrap_nodes: public_peers,
        ..Default::default()
    }
}
/// Network configuration for agent B when it should dial agent A directly.
///
/// The bootstrap list always ends with `a_addr`. When `vps` is `true`, the
/// parseable entries of `DEFAULT_BOOTSTRAP_PEERS` are placed ahead of it;
/// entries that fail to parse are skipped. The bind address is 127.0.0.1 on
/// port 0 and all remaining fields keep their `Default` values.
fn cfg_b(a_addr: std::net::SocketAddr, vps: bool) -> NetworkConfig {
    use x0x::network::DEFAULT_BOOTSTRAP_PEERS;

    let mut bootstrap: Vec<std::net::SocketAddr> = Vec::new();
    if vps {
        bootstrap.extend(
            DEFAULT_BOOTSTRAP_PEERS
                .iter()
                .filter_map(|peer| peer.parse::<std::net::SocketAddr>().ok()),
        );
    }
    // A's own address goes last, after any public peers.
    bootstrap.push(a_addr);

    let bind = std::net::SocketAddr::from(([127, 0, 0, 1], 0));
    NetworkConfig {
        bind_addr: Some(bind),
        bootstrap_nodes: bootstrap,
        ..Default::default()
    }
}
/// Network configuration for agent B when it must NOT be told A's address.
///
/// The bootstrap list holds only the entries of `DEFAULT_BOOTSTRAP_PEERS`
/// that parse as socket addresses (unparseable entries are skipped). The bind
/// address is 127.0.0.1 on port 0; everything else keeps its `Default` value.
///
/// NOTE(review): loopback bind combined with the crate's default peers —
/// confirm those peers are reachable from a loopback-bound socket.
fn cfg_b_vps_only() -> NetworkConfig {
    use x0x::network::DEFAULT_BOOTSTRAP_PEERS;

    let public_peers: Vec<std::net::SocketAddr> = DEFAULT_BOOTSTRAP_PEERS
        .iter()
        .filter_map(|entry| entry.parse::<std::net::SocketAddr>().ok())
        .collect();
    let bind = std::net::SocketAddr::new(std::net::Ipv4Addr::LOCALHOST.into(), 0);

    NetworkConfig {
        bind_addr: Some(bind),
        bootstrap_nodes: public_peers,
        ..Default::default()
    }
}
/// Validity value handed to `advertise_identity` in the rendezvous test.
/// Going by the `_MS` suffix this is one hour expressed in milliseconds
/// (60 min × 60 s × 1000 ms = 3 600 000).
const RENDEZVOUS_VALIDITY_MS: u64 = 60 * 60 * 1_000;
/// Polls `observer`'s discovered-agent list until `target_id` appears or
/// `timeout` runs out.
///
/// Returns `true` as soon as an entry whose `agent_id` equals `target_id` is
/// seen, and `false` once `timeout` has elapsed without a match.
///
/// A failing `discovered_agents()` call is treated as an empty list, so a
/// transient error just leads to another poll rather than aborting the wait.
///
/// The pause between polls is at most two seconds and is shortened to the
/// time still left. That way the final poll happens at the deadline instead
/// of up to one full interval after it, and a match that only shows up after
/// `timeout` is not reported as a success.
async fn wait_for_discovery(
    observer: &Agent,
    target_id: x0x::identity::AgentId,
    timeout: std::time::Duration,
) -> bool {
    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2);

    let start = tokio::time::Instant::now();
    loop {
        let agents = observer.discovered_agents().await.unwrap_or_default();
        if agents.iter().any(|a| a.agent_id == target_id) {
            return true;
        }

        // Time left before the deadline; zero once `timeout` has elapsed.
        let remaining = timeout.saturating_sub(start.elapsed());
        if remaining.is_zero() {
            return false;
        }

        // Never sleep past the deadline.
        tokio::time::sleep(remaining.min(POLL_INTERVAL)).await;
    }
}
/// Loopback scenario: agent A announces its identity and agent B, whose only
/// bootstrap node is A's bound address, must list A among its discovered
/// agents within ten seconds. The discovered entry is then compared with A.
#[ignore = "requires real QUIC loopback connections — timing-sensitive on macOS dual-stack"]
#[tokio::test(flavor = "multi_thread")]
async fn test_local_identity_announcement_and_discovery() {
    let discovery_window = std::time::Duration::from_secs(10);

    // Separate scratch directories keep the two agents' key files apart.
    let keys_a = TempDir::new().unwrap();
    let keys_b = TempDir::new().unwrap();

    let announcer = Agent::builder()
        .with_machine_key(keys_a.path().join("machine.key"))
        .with_agent_key_path(keys_a.path().join("agent.key"))
        .with_network_config(cfg_a_local(0))
        .build()
        .await
        .unwrap();
    announcer.join_network().await.unwrap();

    // A was configured with port 0, so ask it which address it ended up on.
    let announcer_addr = announcer
        .bound_addr()
        .await
        .expect("agent A must have a bound address");

    let observer = Agent::builder()
        .with_machine_key(keys_b.path().join("machine.key"))
        .with_agent_key_path(keys_b.path().join("agent.key"))
        .with_network_config(cfg_b(announcer_addr, false))
        .build()
        .await
        .unwrap();
    observer.join_network().await.unwrap();

    announcer.announce_identity(false, false).await.unwrap();

    assert!(
        wait_for_discovery(&observer, announcer.agent_id(), discovery_window).await,
        "Agent B should discover Agent A within 10s"
    );

    // Inspect what B actually recorded about A.
    let roster = observer.discovered_agents().await.unwrap();
    let record = roster
        .iter()
        .find(|candidate| candidate.agent_id == announcer.agent_id())
        .expect("entry must be present");

    assert_eq!(record.machine_id, announcer.machine_id());
    assert!(record.user_id.is_none());
    assert!(record.announced_at > 0);
    assert!(
        !record.machine_public_key.is_empty(),
        "machine public key must be populated"
    );
}
/// Loopback scenario for a late joiner: agent A is built with a heartbeat
/// interval of 5 and runs alone for eight seconds before agent B joins with
/// A's address as its only bootstrap node. This test never calls
/// `announce_identity`, so B has to learn about A some other way — per the
/// assertion message, through A's heartbeat.
#[ignore = "requires real QUIC loopback connections — timing-sensitive on macOS dual-stack"]
#[tokio::test(flavor = "multi_thread")]
async fn test_local_late_join_heartbeat_discovery() {
    // How long A is on the network before B is even created.
    let head_start = std::time::Duration::from_secs(8);
    let discovery_window = std::time::Duration::from_secs(10);

    let keys_a = TempDir::new().unwrap();
    let keys_b = TempDir::new().unwrap();

    let early_bird = Agent::builder()
        .with_machine_key(keys_a.path().join("machine.key"))
        .with_agent_key_path(keys_a.path().join("agent.key"))
        .with_network_config(cfg_a_local(1))
        .with_heartbeat_interval(5)
        .build()
        .await
        .unwrap();
    early_bird.join_network().await.unwrap();
    let early_bird_addr = early_bird
        .bound_addr()
        .await
        .expect("agent A must have a bound address");

    tokio::time::sleep(head_start).await;

    let late_joiner = Agent::builder()
        .with_machine_key(keys_b.path().join("machine.key"))
        .with_agent_key_path(keys_b.path().join("agent.key"))
        .with_network_config(cfg_b(early_bird_addr, false))
        .build()
        .await
        .unwrap();
    late_joiner.join_network().await.unwrap();

    assert!(
        wait_for_discovery(&late_joiner, early_bird.agent_id(), discovery_window).await,
        "Late-joining Agent B should discover Agent A within 10s via heartbeat"
    );
}
/// Loopback scenario for `find_agent`: once B has discovered A through A's
/// announcement, `find_agent` on B must succeed, return `Some`, and the
/// returned address collection must not be empty.
#[ignore = "requires real QUIC loopback connections — timing-sensitive on macOS dual-stack"]
#[tokio::test(flavor = "multi_thread")]
async fn test_local_find_agent_returns_cached_result() {
    let discovery_window = std::time::Duration::from_secs(10);

    let keys_a = TempDir::new().unwrap();
    let keys_b = TempDir::new().unwrap();

    let target = Agent::builder()
        .with_machine_key(keys_a.path().join("machine.key"))
        .with_agent_key_path(keys_a.path().join("agent.key"))
        .with_network_config(cfg_a_local(2))
        .build()
        .await
        .unwrap();
    target.join_network().await.unwrap();
    let target_addr = target
        .bound_addr()
        .await
        .expect("agent A must have a bound address");

    let searcher = Agent::builder()
        .with_machine_key(keys_b.path().join("machine.key"))
        .with_agent_key_path(keys_b.path().join("agent.key"))
        .with_network_config(cfg_b(target_addr, false))
        .build()
        .await
        .unwrap();
    searcher.join_network().await.unwrap();

    target.announce_identity(false, false).await.unwrap();

    // Make sure the announcement has landed before exercising find_agent.
    assert!(
        wait_for_discovery(&searcher, target.agent_id(), discovery_window).await,
        "identity listener must populate B's cache from A's announcement"
    );

    let addrs = searcher
        .find_agent(target.agent_id())
        .await
        .expect("find_agent must not error")
        .expect("find_agent must return Some when target is already in cache");
    assert!(
        !addrs.is_empty(),
        "cached entry must include at least one address"
    );
}
/// Loopback scenario with a user key: agent A is built with a freshly
/// generated `UserKeypair` and announces with both flags set to `true`, once
/// before B exists and once after B has joined. B must then discover A and
/// `find_agents_by_user` on B must return A first, carrying that user id.
///
/// NOTE(review): the two `announce_identity` flags presumably control whether
/// the user identity is included — confirm against the `Agent` API.
#[ignore = "requires real QUIC loopback connections — timing-sensitive on macOS dual-stack"]
#[tokio::test(flavor = "multi_thread")]
async fn test_local_user_identity_discovery() {
    let discovery_window = std::time::Duration::from_secs(10);

    let keys_a = TempDir::new().unwrap();
    let keys_b = TempDir::new().unwrap();

    // Remember the user id before the keypair is moved into the builder.
    let owner_keys = x0x::identity::UserKeypair::generate().unwrap();
    let owner_id = owner_keys.user_id();

    let owned_agent = Agent::builder()
        .with_machine_key(keys_a.path().join("machine.key"))
        .with_agent_key_path(keys_a.path().join("agent.key"))
        .with_network_config(cfg_a_local(3))
        .with_user_key(owner_keys)
        .build()
        .await
        .unwrap();
    owned_agent.join_network().await.unwrap();
    owned_agent.announce_identity(true, true).await.unwrap();
    let owned_agent_addr = owned_agent
        .bound_addr()
        .await
        .expect("agent A must have a bound address");

    let observer = Agent::builder()
        .with_machine_key(keys_b.path().join("machine.key"))
        .with_agent_key_path(keys_b.path().join("agent.key"))
        .with_network_config(cfg_b(owned_agent_addr, false))
        .build()
        .await
        .unwrap();
    observer.join_network().await.unwrap();

    // Second announcement, issued now that B is on the network.
    owned_agent.announce_identity(true, true).await.unwrap();

    assert!(
        wait_for_discovery(&observer, owned_agent.agent_id(), discovery_window).await,
        "Agent B should discover Agent A within 10s"
    );

    let matches = observer
        .find_agents_by_user(owner_id)
        .await
        .expect("find_agents_by_user must not error");
    assert!(
        !matches.is_empty(),
        "find_agents_by_user should return Agent A"
    );
    assert_eq!(matches[0].agent_id, owned_agent.agent_id());
    assert_eq!(matches[0].user_id, Some(owner_id));
}
/// VPS-backed variant of the announcement test: both agents bootstrap from
/// the crate's default peers, and B additionally gets A's bound address.
/// After A announces, B must discover it within twenty seconds and the
/// recorded entry must carry A's machine id and a non-empty machine key.
#[ignore = "requires live VPS bootstrap nodes (UDP port 5483 must be accessible)"]
#[tokio::test(flavor = "multi_thread")]
async fn test_vps_identity_announcement_and_discovery() {
    let discovery_window = std::time::Duration::from_secs(20);

    let keys_a = TempDir::new().unwrap();
    let keys_b = TempDir::new().unwrap();

    let announcer = Agent::builder()
        .with_machine_key(keys_a.path().join("machine.key"))
        .with_agent_key_path(keys_a.path().join("agent.key"))
        .with_network_config(cfg_a_vps(10))
        .build()
        .await
        .unwrap();
    announcer.join_network().await.unwrap();
    let announcer_addr = announcer
        .bound_addr()
        .await
        .expect("agent A must have a bound address");

    let observer = Agent::builder()
        .with_machine_key(keys_b.path().join("machine.key"))
        .with_agent_key_path(keys_b.path().join("agent.key"))
        .with_network_config(cfg_b(announcer_addr, true))
        .build()
        .await
        .unwrap();
    observer.join_network().await.unwrap();

    announcer.announce_identity(false, false).await.unwrap();

    assert!(
        wait_for_discovery(&observer, announcer.agent_id(), discovery_window).await,
        "Agent B should discover Agent A within 20s via VPS gossip"
    );

    // Check the entry B stored for A.
    let roster = observer.discovered_agents().await.unwrap();
    let record = roster
        .iter()
        .find(|candidate| candidate.agent_id == announcer.agent_id())
        .expect("entry must be present");
    assert_eq!(record.machine_id, announcer.machine_id());
    assert!(!record.machine_public_key.is_empty());
}
/// VPS-backed late-join test: agent A is built with a heartbeat interval of
/// 10 and runs for fifteen seconds before agent B joins (default peers plus
/// A's bound address). No explicit `announce_identity` call is made, so B's
/// discovery of A within fifteen seconds must come from elsewhere — per the
/// assertion message, from A's heartbeat.
#[ignore = "requires live VPS bootstrap nodes (UDP port 5483 must be accessible)"]
#[tokio::test(flavor = "multi_thread")]
async fn test_vps_late_join_heartbeat_discovery() {
    // Both the head start for A and the discovery window for B are 15 s.
    let head_start = std::time::Duration::from_secs(15);
    let discovery_window = std::time::Duration::from_secs(15);

    let keys_a = TempDir::new().unwrap();
    let keys_b = TempDir::new().unwrap();

    let early_bird = Agent::builder()
        .with_machine_key(keys_a.path().join("machine.key"))
        .with_agent_key_path(keys_a.path().join("agent.key"))
        .with_network_config(cfg_a_vps(11))
        .with_heartbeat_interval(10)
        .build()
        .await
        .unwrap();
    early_bird.join_network().await.unwrap();
    let early_bird_addr = early_bird
        .bound_addr()
        .await
        .expect("agent A must have a bound address");

    tokio::time::sleep(head_start).await;

    let late_joiner = Agent::builder()
        .with_machine_key(keys_b.path().join("machine.key"))
        .with_agent_key_path(keys_b.path().join("agent.key"))
        .with_network_config(cfg_b(early_bird_addr, true))
        .build()
        .await
        .unwrap();
    late_joiner.join_network().await.unwrap();

    assert!(
        wait_for_discovery(&late_joiner, early_bird.agent_id(), discovery_window).await,
        "Late-joining Agent B should discover Agent A via heartbeat"
    );
}
/// VPS-backed rendezvous lookup: agent B bootstraps only from the crate's
/// default peers (it is never given A's address) and calls
/// `find_agent_rendezvous` for A while A, one second later, calls
/// `advertise_identity`. Both futures are driven together with `try_join!`;
/// the lookup must come back with `Some`.
///
/// Any setup or network error is propagated to the test harness via `?`.
#[ignore = "requires live VPS bootstrap nodes (UDP port 5483 must be accessible)"]
#[tokio::test(flavor = "multi_thread")]
async fn test_vps_rendezvous_find_agent() -> Result<(), Box<dyn std::error::Error>> {
    let keys_a = TempDir::new()?;
    let keys_b = TempDir::new()?;

    let advertiser = Agent::builder()
        .with_machine_key(keys_a.path().join("machine.key"))
        .with_agent_key_path(keys_a.path().join("agent.key"))
        .with_network_config(cfg_a_vps(12))
        .with_heartbeat_interval(4)
        .build()
        .await?;
    advertiser.join_network().await?;

    let seeker = Agent::builder()
        .with_machine_key(keys_b.path().join("machine.key"))
        .with_agent_key_path(keys_b.path().join("agent.key"))
        .with_network_config(cfg_b_vps_only())
        .build()
        .await?;
    seeker.join_network().await?;

    let wanted = advertiser.agent_id();

    // Neither future does anything until `try_join!` polls it below.
    let search = seeker.find_agent_rendezvous(wanted, 10);
    let delayed_advert = async {
        // Start advertising one second after the join begins.
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        advertiser.advertise_identity(RENDEZVOUS_VALIDITY_MS).await
    };

    let (located, ()) = tokio::try_join!(search, delayed_advert)?;
    assert!(
        located.is_some(),
        "find_agent_rendezvous should locate Agent A within 10s"
    );
    Ok(())
}
/// VPS-backed variant of the user-identity test: agent A is built with a
/// freshly generated `UserKeypair` and announces with both flags `true`, once
/// before B exists and once after B has joined (default peers plus A's bound
/// address). B must discover A within twenty seconds and
/// `find_agents_by_user` on B must return A first, carrying that user id.
///
/// NOTE(review): the two `announce_identity` flags presumably control whether
/// the user identity is included — confirm against the `Agent` API.
#[ignore = "requires live VPS bootstrap nodes (UDP port 5483 must be accessible)"]
#[tokio::test(flavor = "multi_thread")]
async fn test_vps_user_identity_discovery() {
    let discovery_window = std::time::Duration::from_secs(20);

    let keys_a = TempDir::new().unwrap();
    let keys_b = TempDir::new().unwrap();

    // Remember the user id before the keypair is moved into the builder.
    let owner_keys = x0x::identity::UserKeypair::generate().unwrap();
    let owner_id = owner_keys.user_id();

    let owned_agent = Agent::builder()
        .with_machine_key(keys_a.path().join("machine.key"))
        .with_agent_key_path(keys_a.path().join("agent.key"))
        .with_network_config(cfg_a_vps(13))
        .with_user_key(owner_keys)
        .build()
        .await
        .unwrap();
    owned_agent.join_network().await.unwrap();
    owned_agent.announce_identity(true, true).await.unwrap();
    let owned_agent_addr = owned_agent
        .bound_addr()
        .await
        .expect("agent A must have a bound address");

    let observer = Agent::builder()
        .with_machine_key(keys_b.path().join("machine.key"))
        .with_agent_key_path(keys_b.path().join("agent.key"))
        .with_network_config(cfg_b(owned_agent_addr, true))
        .build()
        .await
        .unwrap();
    observer.join_network().await.unwrap();

    // Second announcement, issued now that B is on the network.
    owned_agent.announce_identity(true, true).await.unwrap();

    assert!(
        wait_for_discovery(&observer, owned_agent.agent_id(), discovery_window).await,
        "Agent B should discover Agent A within 20s"
    );

    let matches = observer
        .find_agents_by_user(owner_id)
        .await
        .expect("find_agents_by_user must not error");
    assert!(
        !matches.is_empty(),
        "find_agents_by_user must return Agent A"
    );
    assert_eq!(matches[0].agent_id, owned_agent.agent_id());
    assert_eq!(matches[0].user_id, Some(owner_id));
}