use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::PathBuf;
#[cfg(feature = "rns-hooks-builtin")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use rns_crypto::identity::Identity;
use rns_crypto::OsRng;
#[cfg(feature = "rns-hooks-builtin")]
use rns_hooks_crate::{BuiltinHookCall, BuiltinHookHost, HookError, HookResult};
use rusqlite::Connection;
use rns_net::{
InterfaceConfig, InterfaceId, NodeConfig, RnsNode, TcpClientConfig, TcpServerConfig, MODE_FULL,
};
use rns_ctl::api::NodeHandle;
use rns_ctl::bridge::CtlCallbacks;
use rns_ctl::config::CtlConfig;
use rns_ctl::server::{self, ServerContext};
use rns_ctl::state::{
ensure_process, push_process_log, CtlState, LaunchProcessSnapshot, ManagedProcessState,
ServerConfigApplyPlan, ServerConfigChange, ServerConfigFieldSchema, ServerConfigMutationResult,
ServerConfigSchemaSnapshot, ServerConfigSnapshot, ServerConfigStatusState,
ServerConfigValidationSnapshot, ServerHttpConfigSnapshot, SharedState, WsBroadcast,
};
#[cfg(feature = "rns-hooks-builtin")]
static BUILTIN_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
#[cfg(feature = "rns-hooks-builtin")]
fn register_builtin_continue_hook(label: &str) -> String {
let id = format!(
"test.rns_ctl.{}.{}.{}",
label,
std::process::id(),
BUILTIN_ID_COUNTER.fetch_add(1, Ordering::SeqCst)
);
rns_hooks_crate::register_builtin_hook(id.clone(), builtin_continue_hook)
.expect("register built-in test hook");
id
}
#[cfg(feature = "rns-hooks-builtin")]
fn builtin_continue_hook(
_call: BuiltinHookCall<'_>,
_host: &mut BuiltinHookHost,
) -> Result<HookResult, HookError> {
Ok(HookResult::continue_result())
}
struct TestServer {
ctx: Arc<ServerContext>,
port: u16,
_thread: JoinHandle<()>,
}
impl TestServer {
fn shutdown(&self) {
if let Some(node) = self.ctx.node.lock().unwrap().take() {
node.shutdown();
}
}
}
fn find_free_port() -> u16 {
use std::sync::atomic::{AtomicU16, Ordering};
static NEXT_PORT: AtomicU16 = AtomicU16::new(0);
let pid = std::process::id() as u16;
let base = 20_000 + (pid % 250) * 160;
let _ = NEXT_PORT.compare_exchange(0, base, Ordering::SeqCst, Ordering::SeqCst);
loop {
let port = NEXT_PORT.fetch_add(1, Ordering::SeqCst);
if TcpListener::bind(("127.0.0.1", port)).is_ok() {
return port;
}
}
}
fn bind_test_http_listener() -> TcpListener {
TcpListener::bind(("127.0.0.1", 0)).expect("Failed to bind test HTTP listener")
}
fn start_test_server() -> TestServer {
start_test_server_with_config(
CtlConfig {
host: "127.0.0.1".into(),
port: 0, auth_token: None,
disable_auth: true,
config_path: None,
daemon_mode: false,
tls_cert: None,
tls_key: None,
},
vec![],
)
}
fn start_test_server_with_auth(token: &str) -> TestServer {
start_test_server_with_config(
CtlConfig {
host: "127.0.0.1".into(),
port: 0,
auth_token: Some(token.to_string()),
disable_auth: false,
config_path: None,
daemon_mode: false,
tls_cert: None,
tls_key: None,
},
vec![],
)
}
fn start_test_server_with_config(
mut cfg: CtlConfig,
interfaces: Vec<InterfaceConfig>,
) -> TestServer {
let listener = bind_test_http_listener();
let port = listener.local_addr().unwrap().port();
cfg.port = port;
cfg.host = "127.0.0.1".into();
let shared_state: SharedState = Arc::new(RwLock::new(CtlState::new()));
let ws_broadcast: WsBroadcast = Arc::new(Mutex::new(Vec::new()));
let callbacks = Box::new(CtlCallbacks::new(
shared_state.clone(),
ws_broadcast.clone(),
));
let identity = Identity::new(&mut OsRng);
let node = RnsNode::start(
NodeConfig {
transport_enabled: false,
identity: Some(Identity::from_private_key(
&identity.get_private_key().unwrap(),
)),
interfaces,
..NodeConfig::default()
},
callbacks,
)
.expect("Failed to start test node");
{
let mut s = shared_state.write().unwrap();
s.identity_hash = Some(*identity.hash());
if let Some(prv) = identity.get_private_key() {
s.identity = Some(Identity::from_private_key(&prv));
}
}
let node_handle: NodeHandle = Arc::new(Mutex::new(Some(node)));
let ctx = Arc::new(ServerContext {
node: node_handle,
state: shared_state,
ws_broadcast,
config: Arc::new(RwLock::new(cfg)),
#[cfg(feature = "tls")]
tls_config: None,
});
let ctx2 = ctx.clone();
let thread = thread::Builder::new()
.name("test-server".into())
.spawn(move || {
server::run_server_with_listener(listener, ctx2).expect("test HTTP server failed");
})
.expect("Failed to spawn server thread");
TestServer {
ctx,
port,
_thread: thread,
}
}
struct HttpResult {
status: u16,
body: String,
}
impl HttpResult {
fn json(&self) -> serde_json::Value {
serde_json::from_str(&self.body)
.unwrap_or_else(|e| panic!("Failed to parse JSON: {} body={}", e, self.body))
}
}
fn http_get(port: u16, path: &str) -> HttpResult {
http_request(port, "GET", path, None, None)
}
fn http_get_auth(port: u16, path: &str, token: &str) -> HttpResult {
http_request(port, "GET", path, None, Some(token))
}
fn http_post(port: u16, path: &str, body: &str) -> HttpResult {
http_request(port, "POST", path, Some(body), None)
}
#[allow(dead_code)]
fn http_post_auth(port: u16, path: &str, body: &str, token: &str) -> HttpResult {
http_request(port, "POST", path, Some(body), Some(token))
}
fn http_request(
port: u16,
method: &str,
path: &str,
body: Option<&str>,
token: Option<&str>,
) -> HttpResult {
let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).expect("Failed to connect");
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();
let mut request = format!(
"{} {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n",
method, path
);
if let Some(token) = token {
request.push_str(&format!("Authorization: Bearer {}\r\n", token));
}
if let Some(body) = body {
request.push_str(&format!("Content-Length: {}\r\n", body.len()));
request.push_str("Content-Type: application/json\r\n");
request.push_str("\r\n");
request.push_str(body);
} else {
request.push_str("\r\n");
}
stream
.write_all(request.as_bytes())
.expect("Failed to write request");
let mut response = Vec::new();
loop {
let mut buf = [0u8; 4096];
match stream.read(&mut buf) {
Ok(0) => break,
Ok(n) => response.extend_from_slice(&buf[..n]),
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => break,
Err(e) => panic!("Read error: {}", e),
}
}
let response_str = String::from_utf8_lossy(&response);
let status_line = response_str.lines().next().unwrap_or("");
let status: u16 = status_line
.split_whitespace()
.nth(1)
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let body = if let Some(pos) = response_str.find("\r\n\r\n") {
response_str[pos + 4..].to_string()
} else {
String::new()
};
HttpResult { status, body }
}
fn sample_server_config_snapshot() -> ServerConfigSnapshot {
ServerConfigSnapshot {
config_path: Some("/tmp/rns".into()),
resolved_config_dir: "/tmp/rns".into(),
server_config_file_path: "/tmp/rns/rns-server.json".into(),
server_config_file_present: true,
server_config_file_json: "{\n \"http\": {\n \"port\": 8080\n }\n}".into(),
stats_db_path: "/tmp/rns/stats.db".into(),
rnsd_bin: "rnsd".into(),
sentineld_bin: "rns-sentineld".into(),
statsd_bin: "rns-statsd".into(),
http: ServerHttpConfigSnapshot {
enabled: true,
host: "127.0.0.1".into(),
port: 8080,
auth_mode: "disabled".into(),
token_configured: false,
daemon_mode: true,
},
launch_plan: vec![LaunchProcessSnapshot {
name: "rnsd".into(),
bin: "rnsd".into(),
args: vec!["--config".into(), "/tmp/rns".into()],
command_line: "rnsd --config /tmp/rns".into(),
}],
}
}
fn unique_temp_path(name: &str) -> PathBuf {
std::env::temp_dir().join(format!(
"rns-ctl-{}-{}-{}",
name,
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()
))
}
fn seed_stats_db(path: &PathBuf) {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).unwrap();
}
let conn = Connection::open(path).unwrap();
conn.execute_batch(
"CREATE TABLE packet_counters (
interface_key TEXT NOT NULL,
interface_id INTEGER NULL,
direction TEXT NOT NULL,
packet_type TEXT NOT NULL,
packets INTEGER NOT NULL,
bytes INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL,
PRIMARY KEY (interface_key, direction, packet_type)
);
CREATE TABLE packet_samples (
ts_ms INTEGER NOT NULL,
interface_key TEXT NOT NULL,
interface_id INTEGER NULL,
direction TEXT NOT NULL,
packet_type TEXT NOT NULL,
packets INTEGER NOT NULL,
bytes INTEGER NOT NULL
);
CREATE TABLE seen_announces (
destination_hash BLOB NOT NULL,
random_hash BLOB NOT NULL,
identity_hash BLOB NOT NULL,
name_hash BLOB NOT NULL,
hops INTEGER NOT NULL,
interface_id INTEGER NULL,
seen_at_ms INTEGER NOT NULL,
PRIMARY KEY (destination_hash, random_hash)
);
CREATE TABLE seen_destinations (
destination_hash BLOB NOT NULL PRIMARY KEY,
identity_hash BLOB NOT NULL,
name_hash BLOB NOT NULL,
first_seen_ms INTEGER NOT NULL,
last_seen_ms INTEGER NOT NULL,
announce_count INTEGER NOT NULL DEFAULT 1,
last_hops INTEGER NOT NULL,
last_interface_id INTEGER NULL
);
CREATE TABLE process_samples (
ts_ms INTEGER NOT NULL PRIMARY KEY,
pid INTEGER NOT NULL,
rss_bytes INTEGER NOT NULL,
cpu_user_ms INTEGER NOT NULL,
cpu_system_ms INTEGER NOT NULL,
threads INTEGER NOT NULL,
fds INTEGER NOT NULL
);
CREATE TABLE provider_drop_samples (
ts_ms INTEGER NOT NULL PRIMARY KEY,
dropped_events INTEGER NOT NULL
);
CREATE TABLE link_event_samples (
ts_ms INTEGER NOT NULL,
link_id BLOB NOT NULL,
interface_id INTEGER NULL,
event_type TEXT NOT NULL
);",
)
.unwrap();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
let bucket_2h = now_ms - 2 * 60 * 60 * 1000;
let bucket_1h = now_ms - 60 * 60 * 1000;
let d1 = [0x11u8; 16];
let d2 = [0x22u8; 16];
let r1 = [0x31u8; 16];
let r2 = [0x32u8; 16];
let r3 = [0x33u8; 16];
let i1 = [0x41u8; 16];
let i2 = [0x42u8; 16];
let n1 = [0x51u8; 10];
let n2 = [0x52u8; 10];
let l1 = [0x61u8; 16];
let l2 = [0x62u8; 16];
conn.execute(
"INSERT INTO seen_announces (
destination_hash, random_hash, identity_hash, name_hash, hops, interface_id, seen_at_ms
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
(&d1[..], &r1[..], &i1[..], &n1[..], 1, 7, bucket_2h),
)
.unwrap();
conn.execute(
"INSERT INTO seen_announces (
destination_hash, random_hash, identity_hash, name_hash, hops, interface_id, seen_at_ms
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
(&d1[..], &r2[..], &i1[..], &n1[..], 1, 7, bucket_1h),
)
.unwrap();
conn.execute(
"INSERT INTO seen_announces (
destination_hash, random_hash, identity_hash, name_hash, hops, interface_id, seen_at_ms
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
(&d2[..], &r3[..], &i2[..], &n2[..], 2, 8, bucket_1h + 1000),
)
.unwrap();
conn.execute(
"INSERT INTO seen_destinations (
destination_hash, identity_hash, name_hash, first_seen_ms, last_seen_ms,
announce_count, last_hops, last_interface_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
(&d1[..], &i1[..], &n1[..], bucket_2h, bucket_1h, 2, 1, 7),
)
.unwrap();
conn.execute(
"INSERT INTO seen_destinations (
destination_hash, identity_hash, name_hash, first_seen_ms, last_seen_ms,
announce_count, last_hops, last_interface_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
(
&d2[..],
&i2[..],
&n2[..],
bucket_1h,
bucket_1h + 1000,
1,
2,
8,
),
)
.unwrap();
conn.execute(
"INSERT INTO packet_counters (
interface_key, interface_id, direction, packet_type, packets, bytes, updated_at_ms
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
("iface:7", 7, "in", "announce", 12, 1200, bucket_1h),
)
.unwrap();
conn.execute(
"INSERT INTO packet_counters (
interface_key, interface_id, direction, packet_type, packets, bytes, updated_at_ms
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
("iface:8", 8, "out", "data", 5, 900, bucket_1h + 1000),
)
.unwrap();
conn.execute(
"INSERT INTO packet_samples (
ts_ms, interface_key, interface_id, direction, packet_type, packets, bytes
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
(bucket_2h, "iface:7", 7, "rx", "announce", 4, 400),
)
.unwrap();
conn.execute(
"INSERT INTO packet_samples (
ts_ms, interface_key, interface_id, direction, packet_type, packets, bytes
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
(bucket_1h, "iface:7", 7, "rx", "announce", 8, 800),
)
.unwrap();
conn.execute(
"INSERT INTO packet_samples (
ts_ms, interface_key, interface_id, direction, packet_type, packets, bytes
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
(bucket_1h + 1000, "iface:8", 8, "tx", "data", 5, 900),
)
.unwrap();
conn.execute(
"INSERT INTO process_samples (
ts_ms, pid, rss_bytes, cpu_user_ms, cpu_system_ms, threads, fds
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
(bucket_2h, 999, 10_000_000i64, 100i64, 50i64, 4i64, 12i64),
)
.unwrap();
conn.execute(
"INSERT INTO process_samples (
ts_ms, pid, rss_bytes, cpu_user_ms, cpu_system_ms, threads, fds
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
(bucket_1h, 999, 12_000_000i64, 180i64, 70i64, 5i64, 14i64),
)
.unwrap();
conn.execute(
"INSERT INTO provider_drop_samples (ts_ms, dropped_events) VALUES (?1, ?2)",
(bucket_1h, 3i64),
)
.unwrap();
conn.execute(
"INSERT INTO link_event_samples (ts_ms, link_id, interface_id, event_type) VALUES (?1, ?2, ?3, ?4)",
(bucket_2h, &l1[..], 7, "requested"),
)
.unwrap();
conn.execute(
"INSERT INTO link_event_samples (ts_ms, link_id, interface_id, event_type) VALUES (?1, ?2, ?3, ?4)",
(bucket_1h, &l1[..], 7, "established"),
)
.unwrap();
conn.execute(
"INSERT INTO link_event_samples (ts_ms, link_id, interface_id, event_type) VALUES (?1, ?2, ?3, ?4)",
(bucket_1h + 500, &l2[..], 8, "closed"),
)
.unwrap();
}
fn configure_stats_db(server: &TestServer, stats_db_path: &PathBuf) {
let mut snapshot = sample_server_config_snapshot();
snapshot.stats_db_path = stats_db_path.display().to_string();
let mut state = server.ctx.state.write().unwrap();
state.server_config = Some(snapshot);
}
fn seed_legacy_stats_db(path: &PathBuf) {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).unwrap();
}
let conn = Connection::open(path).unwrap();
conn.execute_batch(
"CREATE TABLE packet_counters (
interface_key TEXT NOT NULL,
interface_id INTEGER NULL,
direction TEXT NOT NULL,
packet_type TEXT NOT NULL,
packets INTEGER NOT NULL,
bytes INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL,
PRIMARY KEY (interface_key, direction, packet_type)
);
CREATE TABLE seen_announces (
destination_hash BLOB NOT NULL,
random_hash BLOB NOT NULL,
identity_hash BLOB NOT NULL,
name_hash BLOB NOT NULL,
hops INTEGER NOT NULL,
interface_id INTEGER NULL,
seen_at_ms INTEGER NOT NULL,
PRIMARY KEY (destination_hash, random_hash)
);
CREATE TABLE seen_destinations (
destination_hash BLOB NOT NULL PRIMARY KEY,
identity_hash BLOB NOT NULL,
name_hash BLOB NOT NULL,
first_seen_ms INTEGER NOT NULL,
last_seen_ms INTEGER NOT NULL,
announce_count INTEGER NOT NULL DEFAULT 1,
last_hops INTEGER NOT NULL,
last_interface_id INTEGER NULL
);
CREATE TABLE process_samples (
ts_ms INTEGER NOT NULL PRIMARY KEY,
pid INTEGER NOT NULL,
rss_bytes INTEGER NOT NULL,
cpu_user_ms INTEGER NOT NULL,
cpu_system_ms INTEGER NOT NULL,
threads INTEGER NOT NULL,
fds INTEGER NOT NULL
);
CREATE TABLE provider_drop_samples (
ts_ms INTEGER NOT NULL PRIMARY KEY,
dropped_events INTEGER NOT NULL
);",
)
.unwrap();
}
fn sample_server_config_schema() -> ServerConfigSchemaSnapshot {
ServerConfigSchemaSnapshot {
format: "rns-server.json".into(),
example_config_json: "{\n \"http\": {\n \"port\": 8080\n }\n}".into(),
notes: vec!["Config note".into()],
fields: vec![ServerConfigFieldSchema {
field: "http.port".into(),
field_type: "u16".into(),
required: false,
default_value: "8080".into(),
description: "HTTP port".into(),
effect: "restart rns-server".into(),
}],
}
}
fn sample_apply_plan() -> ServerConfigApplyPlan {
ServerConfigApplyPlan {
overall_action: "restart_children".into(),
processes_to_restart: vec!["rns-statsd".into()],
control_plane_reload_required: false,
control_plane_restart_required: false,
notes: vec!["Restart required for: rns-statsd.".into()],
changes: vec![ServerConfigChange {
field: "stats_db_path".into(),
before: "/tmp/rns/stats.db".into(),
after: "/tmp/rns/other.db".into(),
effect: "restart rns-statsd".into(),
}],
}
}
#[test]
fn test_health_endpoint() {
let server = start_test_server();
let res = http_get(server.port, "/health");
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["status"], "healthy");
server.shutdown();
}
#[test]
fn test_no_auth_ui_serves_app_shell() {
let server = start_test_server();
let res = http_get(server.port, "/");
assert_eq!(res.status, 200);
assert!(res.body.contains("<title>RNS Server</title>"));
assert!(res.body.contains("/assets/app.js"));
assert!(!res.body.contains("location.replace"));
server.shutdown();
}
#[test]
fn test_auth_required() {
let server = start_test_server_with_auth("test-secret-token");
let res = http_get(server.port, "/api/info");
assert_eq!(res.status, 401);
server.shutdown();
}
#[test]
fn test_auth_valid_token() {
let server = start_test_server_with_auth("test-secret-token");
let res = http_get_auth(server.port, "/api/info", "test-secret-token");
assert_eq!(res.status, 200);
server.shutdown();
}
#[test]
fn test_auth_invalid_token() {
let server = start_test_server_with_auth("test-secret-token");
let res = http_get_auth(server.port, "/api/info", "wrong-token");
assert_eq!(res.status, 401);
server.shutdown();
}
#[test]
fn test_health_no_auth() {
let server = start_test_server_with_auth("test-secret-token");
let res = http_get(server.port, "/health");
assert_eq!(res.status, 200);
assert_eq!(res.json()["status"], "healthy");
server.shutdown();
}
#[test]
fn test_get_info() {
let server = start_test_server();
let res = http_get(server.port, "/api/info");
assert_eq!(res.status, 200);
let json = res.json();
let identity_hash = json["identity_hash"].as_str().unwrap();
assert_eq!(identity_hash.len(), 32);
let uptime = json["uptime_seconds"].as_f64().unwrap();
assert!(uptime < 30.0);
server.shutdown();
}
#[test]
#[cfg(feature = "rns-hooks-builtin")]
fn test_hook_api_loads_reloads_lists_and_unloads_builtin_hook() {
let server = start_test_server();
let first_builtin_id = register_builtin_continue_hook("api_load");
let second_builtin_id = register_builtin_continue_hook("api_reload");
let load_body = format!(
r#"{{"name":"api_builtin","path":"{0}","type":"builtin","builtin_id":"{0}","attach_point":"Tick","priority":13}}"#,
first_builtin_id
);
let res = http_post(server.port, "/api/hook/load", &load_body);
assert_eq!(res.status, 200);
assert_eq!(res.json()["status"], "loaded");
let res = http_get(server.port, "/api/hooks");
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["hooks"].as_array().unwrap().len(), 1);
assert_eq!(json["hooks"][0]["name"], "api_builtin");
assert_eq!(json["hooks"][0]["type"], "builtin");
assert_eq!(json["hooks"][0]["attach_point"], "Tick");
assert_eq!(json["hooks"][0]["priority"], 13);
let reload_body = format!(
r#"{{"name":"api_builtin","path":"{0}","type":"builtin","builtin_id":"{0}","attach_point":"Tick"}}"#,
second_builtin_id
);
let res = http_post(server.port, "/api/hook/reload", &reload_body);
assert_eq!(res.status, 200);
assert_eq!(res.json()["status"], "reloaded");
let unload_body = r#"{"name":"api_builtin","attach_point":"Tick"}"#;
let res = http_post(server.port, "/api/hook/unload", unload_body);
assert_eq!(res.status, 200);
assert_eq!(res.json()["status"], "unloaded");
let res = http_get(server.port, "/api/hooks");
assert_eq!(res.status, 200);
assert!(res.json()["hooks"].as_array().unwrap().is_empty());
server.shutdown();
}
#[test]
fn test_get_node_reports_drain_status() {
let server = start_test_server();
{
let guard = server.ctx.node.lock().unwrap();
let node = guard.as_ref().unwrap();
node.begin_drain(Duration::from_secs(5)).unwrap();
}
let res = http_get(server.port, "/api/node");
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["drain"]["state"], "draining");
assert_eq!(json["drain"]["drain_complete"], true);
assert!(json["drain"]["drain_age_seconds"].as_f64().unwrap() >= 0.0);
assert!(
json["drain"]["deadline_remaining_seconds"]
.as_f64()
.unwrap()
> 0.0
);
assert_eq!(json["drain"]["interface_writer_queued_frames"], 0);
assert_eq!(json["drain"]["provider_backlog_events"], 0);
assert_eq!(json["drain"]["provider_consumer_queued_events"], 0);
assert!(json["drain"]["detail"]
.as_str()
.unwrap()
.contains("no active links"));
server.shutdown();
}
#[test]
fn test_send_returns_conflict_while_draining() {
let server = start_test_server();
let register = http_post(
server.port,
"/api/destination",
r#"{"type":"plain","app_name":"drain-test","aspects":["send"]}"#,
);
assert_eq!(register.status, 201);
let dest_hash = register.json()["dest_hash"].as_str().unwrap().to_string();
{
let guard = server.ctx.node.lock().unwrap();
let node = guard.as_ref().unwrap();
node.begin_drain(Duration::from_secs(5)).unwrap();
}
let res = http_post(
server.port,
"/api/send",
&format!(r#"{{"dest_hash":"{dest_hash}","data":"aGVsbG8="}}"#),
);
assert_eq!(res.status, 409);
assert!(res.json()["error"]
.as_str()
.unwrap()
.contains("draining existing work"));
server.shutdown();
}
#[test]
fn test_get_interfaces_empty() {
let server = start_test_server();
let res = http_get(server.port, "/api/interfaces");
assert_eq!(res.status, 200);
let json = res.json();
let ifaces = json["interfaces"].as_array().unwrap();
assert!(ifaces.is_empty());
server.shutdown();
}
#[test]
fn test_get_destinations_initial() {
let server = start_test_server();
let res = http_get(server.port, "/api/destinations");
assert_eq!(res.status, 200);
let json = res.json();
assert!(json["destinations"].as_array().is_some());
server.shutdown();
}
#[test]
fn test_get_paths_empty() {
let server = start_test_server();
let res = http_get(server.port, "/api/paths");
assert_eq!(res.status, 200);
let json = res.json();
let paths = json["paths"].as_array().unwrap();
assert!(paths.is_empty());
server.shutdown();
}
#[test]
fn test_get_links_empty() {
let server = start_test_server();
let res = http_get(server.port, "/api/links");
assert_eq!(res.status, 200);
let json = res.json();
let links = json["links"].as_array().unwrap();
assert!(links.is_empty());
server.shutdown();
}
#[test]
fn test_get_resources_empty() {
let server = start_test_server();
let res = http_get(server.port, "/api/resources");
assert_eq!(res.status, 200);
let json = res.json();
let resources = json["resources"].as_array().unwrap();
assert!(resources.is_empty());
server.shutdown();
}
#[test]
fn test_get_announces_empty() {
let server = start_test_server();
let res = http_get(server.port, "/api/announces");
assert_eq!(res.status, 200);
let json = res.json();
let announces = json["announces"].as_array().unwrap();
assert!(announces.is_empty());
server.shutdown();
}
#[test]
fn test_clear_announce_queues() {
let server = start_test_server();
let res = http_post(server.port, "/api/announce_queues/clear", "{}");
assert_eq!(res.status, 200);
assert_eq!(res.json()["status"], "ok");
server.shutdown();
}
#[test]
fn test_get_packets_empty() {
let server = start_test_server();
let res = http_get(server.port, "/api/packets");
assert_eq!(res.status, 200);
let json = res.json();
let packets = json["packets"].as_array().unwrap();
assert!(packets.is_empty());
server.shutdown();
}
#[test]
fn test_stats_summary_and_rankings() {
let server = start_test_server();
let stats_db_path = unique_temp_path("stats-summary.db");
seed_stats_db(&stats_db_path);
configure_stats_db(&server, &stats_db_path);
let summary = http_get(server.port, "/api/stats/summary?window=24h");
assert_eq!(summary.status, 200);
let summary_json = summary.json();
assert_eq!(summary_json["announces"]["total"], 3);
assert_eq!(summary_json["announces"]["unique_destinations"], 2);
assert_eq!(summary_json["announces"]["unique_identities"], 2);
assert_eq!(summary_json["packets"]["rx_packets"], 12);
assert_eq!(summary_json["packets"]["tx_packets"], 5);
assert_eq!(summary_json["system"]["provider_dropped_events"], 3);
assert_eq!(summary_json["system"]["latest_process_sample"]["pid"], 999);
let interfaces = http_get(server.port, "/api/stats/interfaces?window=24h&limit=5");
assert_eq!(interfaces.status, 200);
let interfaces_json = interfaces.json();
let interface_rows = interfaces_json["interfaces"].as_array().unwrap();
assert_eq!(interface_rows.len(), 2);
assert_eq!(interface_rows[0]["interface_id"], 7);
assert_eq!(interface_rows[0]["announce_count"], 2);
let destinations = http_get(server.port, "/api/stats/destinations?window=24h&limit=1");
assert_eq!(destinations.status, 200);
let destinations_json = destinations.json();
let destination_rows = destinations_json["destinations"].as_array().unwrap();
assert_eq!(destination_rows.len(), 1);
assert_eq!(destination_rows[0]["announce_count"], 2);
assert_eq!(
destination_rows[0]["destination_hash"].as_str().unwrap(),
"11111111111111111111111111111111"
);
let packets = http_get(server.port, "/api/stats/packets?window=24h&limit=10");
assert_eq!(packets.status, 200);
let packets_json = packets.json();
let counters = packets_json["counters"].as_array().unwrap();
assert_eq!(counters.len(), 2);
server.shutdown();
let _ = std::fs::remove_file(stats_db_path);
}
#[test]
fn test_stats_timeseries_and_system_anomalies() {
let server = start_test_server();
let stats_db_path = unique_temp_path("stats-series.db");
seed_stats_db(&stats_db_path);
configure_stats_db(&server, &stats_db_path);
let announces = http_get(server.port, "/api/stats/announces?window=6h&bucket=1h");
assert_eq!(announces.status, 200);
let announces_json = announces.json();
let series = announces_json["series"].as_array().unwrap();
assert!(series.len() >= 6);
assert!(
series
.iter()
.map(|bucket| bucket["announce_count"].as_i64().unwrap_or(0))
.sum::<i64>()
>= 3
);
let system = http_get(server.port, "/api/stats/system?window=6h&bucket=1h");
assert_eq!(system.status, 200);
let system_json = system.json();
assert_eq!(system_json["latest_process_sample"]["fds"], 14);
let anomaly_buckets = system_json["anomalies"]["provider_drop_buckets"]
.as_array()
.unwrap();
assert_eq!(anomaly_buckets.len(), 1);
assert_eq!(anomaly_buckets[0]["provider_dropped_events"], 3);
let packet_series = http_get(server.port, "/api/stats/packets/series?window=6h&bucket=1h");
assert_eq!(packet_series.status, 200);
let packet_series_json = packet_series.json();
let packet_buckets = packet_series_json["series"].as_array().unwrap();
assert!(packet_buckets.len() >= 6);
assert!(
packet_buckets
.iter()
.map(|bucket| bucket["total_packets"].as_i64().unwrap_or(0))
.sum::<i64>()
>= 17
);
let links = http_get(server.port, "/api/stats/links?window=6h&bucket=1h&limit=5");
assert_eq!(links.status, 200);
let links_json = links.json();
let link_buckets = links_json["series"].as_array().unwrap();
assert!(link_buckets.len() >= 6);
let link_interfaces = links_json["interfaces"].as_array().unwrap();
assert_eq!(link_interfaces.len(), 2);
assert_eq!(link_interfaces[0]["interface_id"], 7);
let close_buckets = links_json["anomalies"]["close_buckets"].as_array().unwrap();
assert_eq!(close_buckets.len(), 1);
assert_eq!(close_buckets[0]["closed"], 1);
server.shutdown();
let _ = std::fs::remove_file(stats_db_path);
}
#[test]
fn test_stats_history_endpoints_are_backward_compatible_with_legacy_db() {
let server = start_test_server();
let stats_db_path = unique_temp_path("stats-legacy.db");
seed_legacy_stats_db(&stats_db_path);
configure_stats_db(&server, &stats_db_path);
let packet_series = http_get(server.port, "/api/stats/packets/series?window=6h&bucket=1h");
assert_eq!(packet_series.status, 200);
let packet_series_json = packet_series.json();
let packet_buckets = packet_series_json["series"].as_array().unwrap();
assert!(packet_buckets.len() >= 6);
assert!(packet_buckets
.iter()
.all(|bucket| bucket["total_packets"].as_i64().unwrap_or(-1) == 0));
assert_eq!(
packet_series_json["anomalies"]["busy_buckets"]
.as_array()
.unwrap()
.len(),
0
);
let links = http_get(server.port, "/api/stats/links?window=6h&bucket=1h&limit=5");
assert_eq!(links.status, 200);
let links_json = links.json();
let link_buckets = links_json["series"].as_array().unwrap();
assert!(link_buckets.len() >= 6);
assert!(link_buckets
.iter()
.all(|bucket| bucket["closed"].as_i64().unwrap_or(-1) == 0));
assert!(links_json["interfaces"].as_array().unwrap().is_empty());
server.shutdown();
let _ = std::fs::remove_file(stats_db_path);
}
#[test]
fn test_get_proofs_empty() {
let server = start_test_server();
let res = http_get(server.port, "/api/proofs");
assert_eq!(res.status, 200);
let json = res.json();
let proofs = json["proofs"].as_array().unwrap();
assert!(proofs.is_empty());
server.shutdown();
}
#[test]
fn test_get_config_snapshot() {
let server = start_test_server();
{
let mut state = server.ctx.state.write().unwrap();
state.server_config = Some(sample_server_config_snapshot());
}
let res = http_get(server.port, "/api/config");
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["config"]["stats_db_path"], "/tmp/rns/stats.db");
assert_eq!(json["config"]["rnsd_bin"], "rnsd");
assert_eq!(json["config"]["launch_plan"][0]["name"], "rnsd");
server.shutdown();
}
#[test]
fn test_get_config_schema() {
let server = start_test_server();
{
let mut state = server.ctx.state.write().unwrap();
state.server_config_schema = Some(sample_server_config_schema());
}
let res = http_get(server.port, "/api/config/schema");
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["schema"]["format"], "rns-server.json");
assert_eq!(json["schema"]["fields"][0]["field"], "http.port");
server.shutdown();
}
#[test]
fn test_get_config_status() {
let server = start_test_server();
{
let mut state = server.ctx.state.write().unwrap();
state.server_config_status = ServerConfigStatusState {
last_action: Some("save".into()),
control_plane_reload_required: true,
runtime_differs_from_saved: true,
last_apply_plan: Some(ServerConfigApplyPlan {
overall_action: "reload_control_plane".into(),
processes_to_restart: Vec::new(),
control_plane_reload_required: true,
control_plane_restart_required: false,
notes: vec![
"Embedded control-plane auth settings will be reloaded in place.".into(),
],
changes: vec![ServerConfigChange {
field: "http.auth_token".into(),
before: "unset".into(),
after: "set(10 chars)".into(),
effect: "reload embedded HTTP auth".into(),
}],
}),
..ServerConfigStatusState::default()
};
}
let res = http_get(server.port, "/api/config/status");
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["status"]["last_action"], "save");
assert_eq!(json["status"]["converged"], false);
assert_eq!(json["status"]["pending_action"], "reload_control_plane");
assert_eq!(json["status"]["pending_targets"][0], "embedded-http-auth");
assert!(json["status"]["blocking_reason"]
.as_str()
.unwrap()
.contains("Apply config"));
server.shutdown();
}
#[test]
fn test_get_processes_exposes_log_metadata() {
let server = start_test_server();
{
let mut state = server.ctx.state.write().unwrap();
state.processes.insert(
"rns-statsd".into(),
ManagedProcessState {
name: "rns-statsd".into(),
status: "running".into(),
ready: true,
ready_state: "ready".into(),
pid: Some(4242),
last_exit_code: None,
restart_count: 2,
drain_ack_count: 1,
forced_kill_count: 0,
last_error: None,
status_detail: Some("stats pipeline active".into()),
durable_log_path: Some("/tmp/rns/logs/rns-statsd.log".into()),
last_log_at: Some(std::time::Instant::now()),
recent_log_lines: 3,
started_at: Some(std::time::Instant::now()),
last_transition_at: Some(std::time::Instant::now()),
},
);
}
let res = http_get(server.port, "/api/processes");
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(
json["processes"][0]["durable_log_path"],
"/tmp/rns/logs/rns-statsd.log"
);
assert_eq!(json["processes"][0]["drain_ack_count"], 1);
assert_eq!(json["processes"][0]["forced_kill_count"], 0);
assert_eq!(json["processes"][0]["recent_log_lines"], 3);
assert!(json["processes"][0]["last_log_age_seconds"]
.as_f64()
.is_some());
server.shutdown();
}
#[test]
fn test_get_processes_exposes_readiness_and_failure_detail() {
let server = start_test_server();
{
let mut state = server.ctx.state.write().unwrap();
state.processes.insert(
"rns-sentineld".into(),
ManagedProcessState {
name: "rns-sentineld".into(),
status: "failed".into(),
ready: false,
ready_state: "not-ready".into(),
pid: None,
last_exit_code: Some(17),
restart_count: 4,
drain_ack_count: 2,
forced_kill_count: 1,
last_error: Some("hook registration timed out".into()),
status_detail: Some("waiting for provider bridge".into()),
durable_log_path: Some("/tmp/rns/logs/rns-sentineld.log".into()),
last_log_at: Some(std::time::Instant::now()),
recent_log_lines: 12,
started_at: None,
last_transition_at: Some(std::time::Instant::now()),
},
);
}
let res = http_get(server.port, "/api/processes");
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["processes"][0]["name"], "rns-sentineld");
assert_eq!(json["processes"][0]["ready_state"], "not-ready");
assert_eq!(json["processes"][0]["restart_count"], 4);
assert_eq!(json["processes"][0]["drain_ack_count"], 2);
assert_eq!(json["processes"][0]["forced_kill_count"], 1);
assert_eq!(json["processes"][0]["last_exit_code"], 17);
assert_eq!(
json["processes"][0]["last_error"],
"hook registration timed out"
);
assert_eq!(
json["processes"][0]["status_detail"],
"waiting for provider bridge"
);
server.shutdown();
}
#[test]
fn test_get_process_logs_exposes_log_metadata() {
let server = start_test_server();
ensure_process(&server.ctx.state, "rns-statsd");
{
let mut state = server.ctx.state.write().unwrap();
let process = state.processes.get_mut("rns-statsd").unwrap();
process.durable_log_path = Some("/tmp/rns/logs/rns-statsd.log".into());
}
push_process_log(&server.ctx.state, "rns-statsd", "stdout", "statsd started");
let res = http_get(server.port, "/api/processes/rns-statsd/logs");
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["process"], "rns-statsd");
assert_eq!(json["durable_log_path"], "/tmp/rns/logs/rns-statsd.log");
assert_eq!(json["recent_log_lines"], 1);
assert_eq!(json["lines"][0]["line"], "statsd started");
assert!(json["last_log_age_seconds"].as_f64().is_some());
server.shutdown();
}
#[test]
fn test_get_process_logs_limit_and_missing_process() {
let server = start_test_server();
ensure_process(&server.ctx.state, "rns-statsd");
push_process_log(&server.ctx.state, "rns-statsd", "stdout", "line one");
push_process_log(&server.ctx.state, "rns-statsd", "stdout", "line two");
push_process_log(&server.ctx.state, "rns-statsd", "stderr", "line three");
let limited = http_get(server.port, "/api/processes/rns-statsd/logs?limit=2");
assert_eq!(limited.status, 200);
let limited_json = limited.json();
let lines = limited_json["lines"].as_array().unwrap();
assert_eq!(lines.len(), 2);
assert_eq!(lines[0]["line"], "line three");
assert_eq!(lines[1]["line"], "line two");
let missing = http_get(server.port, "/api/processes/missing/logs");
assert_eq!(missing.status, 404);
server.shutdown();
}
#[test]
fn test_config_validate_endpoint_uses_validator() {
let server = start_test_server();
{
let mut state = server.ctx.state.write().unwrap();
state.server_config_validator = Some(Arc::new(|body| {
let parsed: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(parsed["http"]["port"], 9090);
Ok(ServerConfigValidationSnapshot {
valid: true,
config: sample_server_config_snapshot(),
warnings: vec!["validation warning".into()],
})
}));
}
let res = http_post(
server.port,
"/api/config/validate",
r#"{"http":{"port":9090}}"#,
);
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["result"]["valid"], true);
assert_eq!(json["result"]["warnings"][0], "validation warning");
server.shutdown();
}
#[test]
fn test_config_save_endpoint_uses_mutator() {
let server = start_test_server();
{
let mut state = server.ctx.state.write().unwrap();
state.server_config_mutator = Some(Arc::new(|mode, body| {
match mode {
rns_ctl::state::ServerConfigMutationMode::Save => {}
_ => panic!("expected save mode"),
}
let parsed: serde_json::Value = serde_json::from_slice(body).unwrap();
assert_eq!(parsed["stats_db_path"], "/tmp/rns/other.db");
Ok(ServerConfigMutationResult {
action: "save".into(),
config: sample_server_config_snapshot(),
apply_plan: sample_apply_plan(),
warnings: vec!["save warning".into()],
})
}));
}
let res = http_post(
server.port,
"/api/config",
r#"{"stats_db_path":"/tmp/rns/other.db"}"#,
);
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["result"]["action"], "save");
assert_eq!(
json["result"]["apply_plan"]["overall_action"],
"restart_children"
);
assert_eq!(json["result"]["warnings"][0], "save warning");
assert_eq!(
json["result"]["apply_plan"]["processes_to_restart"][0],
"rns-statsd"
);
server.shutdown();
}
#[test]
fn test_config_apply_endpoint_uses_mutator() {
let server = start_test_server();
{
let mut state = server.ctx.state.write().unwrap();
state.server_config_mutator = Some(Arc::new(|mode, _body| {
match mode {
rns_ctl::state::ServerConfigMutationMode::Apply => {}
_ => panic!("expected apply mode"),
}
Ok(ServerConfigMutationResult {
action: "apply".into(),
config: sample_server_config_snapshot(),
apply_plan: sample_apply_plan(),
warnings: Vec::new(),
})
}));
}
let res = http_post(
server.port,
"/api/config/apply",
r#"{"stats_db_path":"/tmp/rns/other.db"}"#,
);
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["result"]["action"], "apply");
assert_eq!(
json["result"]["apply_plan"]["overall_action"],
"restart_children"
);
assert_eq!(
json["result"]["apply_plan"]["changes"][0]["field"],
"stats_db_path"
);
server.shutdown();
}
#[test]
fn test_config_validate_endpoint_returns_bad_request_on_validation_error() {
let server = start_test_server();
{
let mut state = server.ctx.state.write().unwrap();
state.server_config_validator = Some(Arc::new(|_body| {
Err("stats_db_path must be absolute".into())
}));
}
let res = http_post(
server.port,
"/api/config/validate",
r#"{"stats_db_path":"relative.db"}"#,
);
assert_eq!(res.status, 400);
assert_eq!(res.json()["error"], "stats_db_path must be absolute");
server.shutdown();
}
#[test]
fn test_get_config_status_reports_restart_pending_targets() {
let server = start_test_server();
{
let mut state = server.ctx.state.write().unwrap();
state.server_config_status = ServerConfigStatusState {
last_action: Some("apply".into()),
runtime_differs_from_saved: true,
pending_process_restarts: vec!["rns-statsd".into()],
control_plane_restart_required: true,
last_apply_plan: Some(ServerConfigApplyPlan {
overall_action: "restart_children_and_server".into(),
processes_to_restart: vec!["rns-statsd".into()],
control_plane_reload_required: false,
control_plane_restart_required: true,
notes: vec!["Restart required for: rns-statsd and embedded HTTP bind.".into()],
changes: vec![
ServerConfigChange {
field: "stats_db_path".into(),
before: "/tmp/rns/stats.db".into(),
after: "/tmp/rns/other.db".into(),
effect: "restart rns-statsd".into(),
},
ServerConfigChange {
field: "http.port".into(),
before: "8080".into(),
after: "9090".into(),
effect: "restart rns-server".into(),
},
],
}),
..ServerConfigStatusState::default()
};
}
let res = http_get(server.port, "/api/config/status");
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(
json["status"]["pending_action"],
"restart_children_and_server"
);
assert_eq!(json["status"]["pending_targets"][0], "rns-statsd");
assert_eq!(json["status"]["pending_targets"][1], "rns-server");
assert!(json["status"]["blocking_reason"]
.as_str()
.unwrap()
.contains("Restart rns-server"));
server.shutdown();
}
#[test]
fn test_process_control_returns_internal_error_without_supervisor() {
let server = start_test_server();
let res = http_post(server.port, "/api/processes/rns-statsd/restart", "{}");
assert_eq!(res.status, 500);
assert_eq!(res.json()["error"], "Process control is not enabled");
server.shutdown();
}
#[test]
fn test_process_control_queues_commands_when_supervision_enabled() {
let server = start_test_server();
let (tx, rx) = mpsc::channel();
{
let mut state = server.ctx.state.write().unwrap();
state.control_tx = Some(tx);
}
let res = http_post(server.port, "/api/processes/rns-statsd/restart", "{}");
assert_eq!(res.status, 200);
assert_eq!(res.json()["queued"], true);
assert_eq!(res.json()["action"], "restart");
assert_eq!(res.json()["process"], "rns-statsd");
match rx.recv_timeout(Duration::from_secs(1)).unwrap() {
rns_ctl::state::ProcessControlCommand::Restart(name) => {
assert_eq!(name, "rns-statsd");
}
_ => panic!("unexpected command variant"),
}
server.shutdown();
}
#[test]
fn test_register_single_destination() {
let server = start_test_server();
let body = r#"{"type":"single","app_name":"test_app","aspects":["echo"]}"#;
let res = http_post(server.port, "/api/destination", body);
assert_eq!(res.status, 201);
let json = res.json();
assert_eq!(json["type"], "single");
assert_eq!(json["name"], "test_app.echo");
let dh = json["dest_hash"].as_str().unwrap();
assert_eq!(dh.len(), 32);
server.shutdown();
}
#[test]
fn test_register_plain_destination() {
let server = start_test_server();
let body = r#"{"type":"plain","app_name":"test_app","aspects":["broadcast"]}"#;
let res = http_post(server.port, "/api/destination", body);
assert_eq!(res.status, 201);
let json = res.json();
assert_eq!(json["type"], "plain");
assert_eq!(json["name"], "test_app.broadcast");
server.shutdown();
}
#[test]
fn test_register_group_destination() {
let server = start_test_server();
let body = r#"{"type":"group","app_name":"test_app","aspects":["group"]}"#;
let res = http_post(server.port, "/api/destination", body);
assert_eq!(res.status, 201);
let json = res.json();
assert_eq!(json["type"], "group");
let gk = json["group_key"].as_str().unwrap();
assert!(!gk.is_empty());
server.shutdown();
}
#[test]
fn test_destinations_after_register() {
let server = start_test_server();
let body = r#"{"type":"plain","app_name":"myapp","aspects":["test"]}"#;
let reg = http_post(server.port, "/api/destination", body);
assert_eq!(reg.status, 201);
let dest_hash = reg.json()["dest_hash"].as_str().unwrap().to_string();
let res = http_get(server.port, "/api/destinations");
assert_eq!(res.status, 200);
let json = res.json();
let dests = json["destinations"].as_array().unwrap();
assert!(dests.iter().any(|d| d["hash"].as_str() == Some(&dest_hash)));
server.shutdown();
}
#[test]
fn test_announce_destination() {
let server = start_test_server();
let body = r#"{"type":"single","app_name":"test_app","aspects":["ann"]}"#;
let reg = http_post(server.port, "/api/destination", body);
assert_eq!(reg.status, 201);
let dest_hash = reg.json()["dest_hash"].as_str().unwrap().to_string();
let ann_body = format!(r#"{{"dest_hash":"{}"}}"#, dest_hash);
let res = http_post(server.port, "/api/announce", &ann_body);
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["status"], "announced");
server.shutdown();
}
#[test]
fn test_register_bad_type() {
let server = start_test_server();
let body = r#"{"type":"invalid","app_name":"test_app","aspects":["echo"]}"#;
let res = http_post(server.port, "/api/destination", body);
assert_eq!(res.status, 400);
server.shutdown();
}
#[test]
fn test_send_packet_no_dest() {
let server = start_test_server();
let body = r#"{"dest_hash":"00000000000000000000000000000000","data":"aGVsbG8="}"#;
let res = http_post(server.port, "/api/send", body);
assert_eq!(res.status, 400); server.shutdown();
}
#[test]
fn test_path_request() {
let server = start_test_server();
let body = r#"{"dest_hash":"00000000000000000000000000000000"}"#;
let res = http_post(server.port, "/api/path/request", body);
assert!(res.status == 200 || res.status == 500);
server.shutdown();
}
#[test]
fn test_not_found() {
let server = start_test_server();
let res = http_get(server.port, "/nonexistent");
assert_eq!(res.status, 404);
server.shutdown();
}
#[test]
fn test_bad_json() {
let server = start_test_server();
let res = http_post(server.port, "/api/destination", "not-json");
assert_eq!(res.status, 400);
let json = res.json();
assert!(json["error"].as_str().unwrap().contains("Invalid JSON"));
server.shutdown();
}
#[test]
fn test_missing_fields() {
let server = start_test_server();
let body = r#"{"type":"single","aspects":["echo"]}"#;
let res = http_post(server.port, "/api/destination", body);
assert_eq!(res.status, 400);
let json = res.json();
assert!(json["error"].as_str().unwrap().contains("app_name"));
server.shutdown();
}
struct TestPair {
server_a: TestServer,
server_b: TestServer,
}
impl TestPair {
fn shutdown(&self) {
self.server_b.shutdown();
self.server_a.shutdown();
}
}
fn start_test_pair() -> TestPair {
let tcp_port = find_free_port();
let http_port_a = find_free_port();
let http_port_b = find_free_port();
let cfg_a = CtlConfig {
host: "127.0.0.1".into(),
port: http_port_a,
auth_token: None,
disable_auth: true,
config_path: None,
daemon_mode: false,
tls_cert: None,
tls_key: None,
};
let ifaces_a = vec![InterfaceConfig {
name: String::new(),
type_name: "TCPServerInterface".to_string(),
config_data: Box::new(TcpServerConfig {
name: "Test TCP Server".into(),
listen_ip: "127.0.0.1".into(),
listen_port: tcp_port,
interface_id: InterfaceId(1),
max_connections: None,
..TcpServerConfig::default()
}),
mode: MODE_FULL,
ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
ifac: None,
discovery: None,
}];
let server_a = start_test_server_with_config(cfg_a, ifaces_a);
let cfg_b = CtlConfig {
host: "127.0.0.1".into(),
port: http_port_b,
auth_token: None,
disable_auth: true,
config_path: None,
daemon_mode: false,
tls_cert: None,
tls_key: None,
};
let ifaces_b = vec![InterfaceConfig {
name: String::new(),
type_name: "TCPClientInterface".to_string(),
config_data: Box::new(TcpClientConfig {
name: "Test TCP Client".into(),
target_host: "127.0.0.1".into(),
target_port: tcp_port,
interface_id: InterfaceId(1),
..Default::default()
}),
mode: MODE_FULL,
ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
ifac: None,
discovery: None,
}];
let server_b = start_test_server_with_config(cfg_b, ifaces_b);
thread::sleep(Duration::from_secs(1));
TestPair { server_a, server_b }
}
#[test]
fn test_announce_propagation() {
let pair = start_test_pair();
let body = r#"{"type":"single","app_name":"test_prop","aspects":["echo"]}"#;
let reg = http_post(pair.server_a.port, "/api/destination", body);
assert_eq!(reg.status, 201);
let dest_hash = reg.json()["dest_hash"].as_str().unwrap().to_string();
let ann_body = format!(r#"{{"dest_hash":"{}"}}"#, dest_hash);
let ann = http_post(pair.server_a.port, "/api/announce", &ann_body);
assert_eq!(ann.status, 200);
let mut found = false;
for _ in 0..20 {
thread::sleep(Duration::from_millis(500));
let res = http_get(pair.server_b.port, "/api/announces");
if res.status == 200 {
let json = res.json();
if let Some(announces) = json["announces"].as_array() {
if announces
.iter()
.any(|a| a["dest_hash"].as_str() == Some(&dest_hash))
{
found = true;
break;
}
}
}
}
assert!(
found,
"Node B should have received the announce from Node A within 10s"
);
pair.shutdown();
}
#[test]
fn test_identity_recall() {
let pair = start_test_pair();
let body = r#"{"type":"single","app_name":"test_recall","aspects":["id"]}"#;
let reg = http_post(pair.server_a.port, "/api/destination", body);
assert_eq!(reg.status, 201);
let dest_hash = reg.json()["dest_hash"].as_str().unwrap().to_string();
let ann_body = format!(r#"{{"dest_hash":"{}"}}"#, dest_hash);
let ann = http_post(pair.server_a.port, "/api/announce", &ann_body);
assert_eq!(ann.status, 200);
let mut recalled = false;
for _ in 0..20 {
thread::sleep(Duration::from_millis(500));
let res = http_get(pair.server_b.port, &format!("/api/identity/{}", dest_hash));
if res.status == 200 {
let json = res.json();
assert_eq!(json["dest_hash"].as_str().unwrap(), dest_hash);
let ih = json["identity_hash"].as_str().unwrap();
assert_eq!(ih.len(), 32);
let pk = json["public_key"].as_str().unwrap();
assert_eq!(pk.len(), 128);
recalled = true;
break;
}
}
assert!(
recalled,
"Node B should have recalled the identity from Node A within 10s"
);
pair.shutdown();
}
#[test]
fn test_packet_delivery() {
let pair = start_test_pair();
let reg_body =
r#"{"type":"single","app_name":"test_delivery","aspects":["pkt"],"proof_strategy":"all"}"#;
let reg = http_post(pair.server_a.port, "/api/destination", reg_body);
assert_eq!(reg.status, 201);
let dest_hash = reg.json()["dest_hash"].as_str().unwrap().to_string();
let ann_body = format!(r#"{{"dest_hash":"{}"}}"#, dest_hash);
let ann = http_post(pair.server_a.port, "/api/announce", &ann_body);
assert_eq!(ann.status, 200);
let mut ready = false;
for _ in 0..20 {
thread::sleep(Duration::from_millis(500));
let res = http_get(pair.server_b.port, &format!("/api/identity/{}", dest_hash));
if res.status == 200 {
ready = true;
break;
}
}
assert!(ready, "Announce should propagate to Node B within 10s");
let out_body = format!(
r#"{{"type":"single","app_name":"test_delivery","aspects":["pkt"],"direction":"out","dest_hash":"{}"}}"#,
dest_hash
);
let out_reg = http_post(pair.server_b.port, "/api/destination", &out_body);
assert_eq!(
out_reg.status, 201,
"Failed to register outbound destination: {}",
out_reg.body
);
let out_hash = out_reg.json()["dest_hash"].as_str().unwrap().to_string();
let send_body = format!(
r#"{{"dest_hash":"{}","data":"aGVsbG8gd29ybGQ="}}"#,
out_hash
);
let send = http_post(pair.server_b.port, "/api/send", &send_body);
assert_eq!(send.status, 200, "Failed to send packet: {}", send.body);
let mut delivered = false;
for _ in 0..20 {
thread::sleep(Duration::from_millis(500));
let res = http_get(pair.server_a.port, "/api/packets");
if res.status == 200 {
let json = res.json();
if let Some(packets) = json["packets"].as_array() {
if !packets.is_empty() {
delivered = true;
break;
}
}
}
}
assert!(
delivered,
"Node A should have received at least one packet within 10s"
);
pair.shutdown();
}
#[cfg(feature = "tls")]
mod tls_tests {
use super::*;
use std::io::{Read, Write};
use std::sync::Arc;
fn start_tls_test_server() -> (TestServer, Arc<rustls::RootCertStore>) {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap();
let cert_pem = cert.cert.pem();
let key_pem = cert.key_pair.serialize_pem();
let id = COUNTER.fetch_add(1, Ordering::SeqCst);
let tmp_dir =
std::env::temp_dir().join(format!("rns-ctl-tls-test-{}-{}", std::process::id(), id));
std::fs::create_dir_all(&tmp_dir).unwrap();
let cert_path = tmp_dir.join("cert.pem");
let key_path = tmp_dir.join("key.pem");
std::fs::write(&cert_path, &cert_pem).unwrap();
std::fs::write(&key_path, &key_pem).unwrap();
let tls_config =
rns_ctl::tls::load_tls_config(cert_path.to_str().unwrap(), key_path.to_str().unwrap())
.expect("Failed to load TLS config");
let listener = bind_test_http_listener();
let port = listener.local_addr().unwrap().port();
let cfg = Arc::new(RwLock::new(CtlConfig {
host: "127.0.0.1".into(),
port,
auth_token: None,
disable_auth: true,
config_path: None,
daemon_mode: false,
tls_cert: Some(cert_path.to_str().unwrap().into()),
tls_key: Some(key_path.to_str().unwrap().into()),
}));
let shared_state: SharedState = Arc::new(RwLock::new(CtlState::new()));
let ws_broadcast: WsBroadcast = Arc::new(Mutex::new(Vec::new()));
let callbacks = Box::new(CtlCallbacks::new(
shared_state.clone(),
ws_broadcast.clone(),
));
let identity = rns_crypto::identity::Identity::new(&mut rns_crypto::OsRng);
let node = RnsNode::start(
NodeConfig {
transport_enabled: false,
identity: Some(rns_crypto::identity::Identity::from_private_key(
&identity.get_private_key().unwrap(),
)),
interfaces: vec![],
..NodeConfig::default()
},
callbacks,
)
.expect("Failed to start test node");
{
let mut s = shared_state.write().unwrap();
s.identity_hash = Some(*identity.hash());
if let Some(prv) = identity.get_private_key() {
s.identity = Some(rns_crypto::identity::Identity::from_private_key(&prv));
}
}
let node_handle: NodeHandle = Arc::new(Mutex::new(Some(node)));
let ctx = Arc::new(ServerContext {
node: node_handle,
state: shared_state,
ws_broadcast,
config: cfg,
tls_config: Some(tls_config),
});
let ctx2 = ctx.clone();
let handle = thread::Builder::new()
.name("test-tls-server".into())
.spawn(move || {
server::run_server_with_listener(listener, ctx2).expect("test TLS server failed");
})
.expect("Failed to spawn TLS server thread");
let mut root_store = rustls::RootCertStore::empty();
let der_cert = rustls_pemfile::certs(&mut cert_pem.as_bytes())
.collect::<Result<Vec<_>, _>>()
.unwrap();
for c in der_cert {
root_store.add(c).unwrap();
}
let server = TestServer {
ctx,
port,
_thread: handle,
};
(server, Arc::new(root_store))
}
fn tls_http_get(
port: u16,
path: &str,
root_store: &Arc<rustls::RootCertStore>,
) -> super::HttpResult {
let tls_config = rustls::ClientConfig::builder()
.with_root_certificates(root_store.clone())
.with_no_client_auth();
let server_name: rustls::pki_types::ServerName = "localhost".try_into().unwrap();
let mut conn = rustls::ClientConnection::new(Arc::new(tls_config), server_name).unwrap();
let mut tcp = TcpStream::connect(format!("127.0.0.1:{}", port)).expect("Failed to connect");
tcp.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
let mut tls = rustls::Stream::new(&mut conn, &mut tcp);
let request = format!(
"GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n",
path
);
tls.write_all(request.as_bytes())
.expect("Failed to write request");
let mut response = Vec::new();
loop {
let mut buf = [0u8; 4096];
match tls.read(&mut buf) {
Ok(0) => break,
Ok(n) => response.extend_from_slice(&buf[..n]),
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => break,
Err(ref e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => panic!("TLS read error: {}", e),
}
}
let response_str = String::from_utf8_lossy(&response);
let status_line = response_str.lines().next().unwrap_or("");
let status: u16 = status_line
.split_whitespace()
.nth(1)
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let body = if let Some(pos) = response_str.find("\r\n\r\n") {
response_str[pos + 4..].to_string()
} else {
String::new()
};
super::HttpResult { status, body }
}
#[test]
fn test_tls_health_endpoint() {
let (server, root_store) = start_tls_test_server();
let res = tls_http_get(server.port, "/health", &root_store);
assert_eq!(res.status, 200);
let json = res.json();
assert_eq!(json["status"], "healthy");
server.shutdown();
}
#[test]
fn test_tls_api_info() {
let (server, root_store) = start_tls_test_server();
let res = tls_http_get(server.port, "/api/info", &root_store);
assert_eq!(res.status, 200);
let json = res.json();
let identity_hash = json["identity_hash"].as_str().unwrap();
assert_eq!(identity_hash.len(), 32);
server.shutdown();
}
#[test]
fn test_tls_plain_connection_rejected() {
let (server, _root_store) = start_tls_test_server();
let mut tcp =
TcpStream::connect(format!("127.0.0.1:{}", server.port)).expect("Failed to connect");
tcp.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
let request = "GET /health HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n";
let _ = tcp.write_all(request.as_bytes());
let mut response = Vec::new();
loop {
let mut buf = [0u8; 4096];
match tcp.read(&mut buf) {
Ok(0) => break,
Ok(n) => response.extend_from_slice(&buf[..n]),
Err(_) => break,
}
}
let response_str = String::from_utf8_lossy(&response);
assert!(
!response_str.contains("HTTP/1.1 200"),
"Plain HTTP should not get a valid response from TLS server"
);
server.shutdown();
}
}