use super::common::*;
use core_types::{Timestamp, TransportDomain};
use data_model::{ControlEnvelope, Packet, PacketHeader, SchemaId, SchemaVersion};
use replay_core::{file::FilePacketRecorder, recorder::RecordedEntry};
/// Writes a replay-bag fixture at `path` holding `entries` control packets
/// recorded under `topic`.
///
/// Packets are numbered `1..=entries`. Each one uses `sequence * 1_000_000` as
/// both its header timestamp and its recorded-entry timestamp, and carries the
/// sequence number (truncated to a single byte) as its payload.
///
/// # Panics
///
/// Panics if the recorder cannot be created, appended to, or flushed.
fn write_demo_replay_bag(path: &PathBuf, topic: &str, entries: u64) {
    let mut recorder = FilePacketRecorder::create(path).expect("create replay bag fixture");

    for sequence in 1..=entries {
        // The header and the recorded entry share the same timestamp value.
        let stamp = u128::from(sequence) * 1_000_000;

        let envelope = ControlEnvelope {
            header: PacketHeader {
                version: 1,
                domain: TransportDomain::Local,
                session_id: None,
                stream_id: None,
                sequence,
                ack: None,
                timestamp: Timestamp(stamp),
                schema_id: SchemaId::new("demo.profile"),
                schema_version: SchemaVersion(1),
            },
            label: "demo.profile.sample".to_string(),
            // `as u8` keeps only the low byte, so payloads wrap past 255 entries.
            payload: vec![sequence as u8],
        };

        let recorded = RecordedEntry::new(Timestamp(stamp), topic, Packet::Control(envelope));
        recorder
            .append(&recorded)
            .expect("append replay bag fixture entry");
    }

    recorder.flush().expect("flush replay bag fixture");
}
/// `topic list --refresh-demo --json` exits successfully and its output
/// mentions the `topic` kind and the `/demo/camera/raw` topic.
#[test]
fn topic_list_json_refresh_demo_contains_expected_fields() {
    let report_path = temp_path("resource-report", "json");
    let report_arg = report_path.to_str().expect("utf-8 path");

    let cli_args = [
        "topic",
        "list",
        "--report",
        report_arg,
        "--refresh-demo",
        "--json",
    ];
    let result = run_cli(&cli_args);

    assert!(result.status.success(), "stderr: {}", result.stderr);
    assert!(result.stdout.contains(r#""kind": "topic""#));
    assert!(result.stdout.contains("/demo/camera/raw"));
}
/// `middleware load --refresh-demo --json` exits successfully and its output
/// contains the topic and service pending-total keys.
#[test]
fn middleware_load_refresh_demo_json_contains_totals() {
    let report_path = temp_path("middleware-load", "json");
    let report_arg = report_path.to_str().expect("utf-8 path");

    let cli_args = [
        "middleware",
        "load",
        "--report",
        report_arg,
        "--refresh-demo",
        "--json",
    ];
    let result = run_cli(&cli_args);

    assert!(result.status.success(), "stderr: {}", result.stderr);
    assert!(result.stdout.contains(r#""topic_pending_total""#));
    assert!(result.stdout.contains(r#""service_pending_requests_total""#));
}
/// Both the `help` subcommand and the `--help` flag exit successfully and
/// print usage text.
#[test]
fn help_command_and_flag_show_usage() {
    // Subcommand form.
    let via_subcommand = run_cli(&["help"]);
    assert!(
        via_subcommand.status.success(),
        "stderr: {}",
        via_subcommand.stderr
    );
    assert!(via_subcommand.stdout.contains("robotrt-cli commands:"));
    assert!(via_subcommand.stdout.contains("topic hz <name>"));

    // Flag form.
    let via_flag = run_cli(&["--help"]);
    assert!(via_flag.status.success(), "stderr: {}", via_flag.stderr);
    assert!(via_flag.stdout.contains("plugin list"));
}
/// `topic hz` and `topic echo` both exit successfully when reading a recorded
/// bag passed through `--input`.
///
/// The bag fixture is written by `write_demo_replay_bag` and removed again
/// (best effort) once both commands have succeeded, so repeated runs do not
/// pile up fixtures in the temp location.
#[test]
fn bag_record_and_play_json_roundtrip() {
    let bag = temp_path("demo-bag", "rrbag");
    write_demo_replay_bag(&bag, "/demo/test", 5);
    let bag_arg = bag.to_str().expect("utf-8 path");

    let hz = run_cli(&[
        "topic",
        "hz",
        "/demo/test",
        "--input",
        bag_arg,
        "--timeout-ms",
        "3000",
        "--json",
    ]);
    assert!(hz.status.success(), "stderr: {}", hz.stderr);

    let echo = run_cli(&[
        "topic",
        "echo",
        "/demo/test",
        "--input",
        bag_arg,
        "--limit",
        "2",
        "--timeout-ms",
        "3000",
        "--json",
    ]);
    assert!(echo.status.success(), "stderr: {}", echo.stderr);

    // Best-effort cleanup; on an assertion failure the bag is left behind,
    // which keeps it available for debugging.
    let _ = fs::remove_file(&bag);
}
/// Runs the report-backed commands (`node info`, `topic info`, `health`,
/// `graph`, `plugin list`, `mission watch`, `action info`, `action watch`)
/// against one shared status report and checks a few expected JSON fragments
/// in each output.
///
/// Only the first command passes `--refresh-demo`; every later command is
/// pointed at the same report path without that flag.
#[test]
fn node_topic_health_graph_plugin_and_mission_commands_work_with_demo_refresh() {
    let status_report = temp_path("status-report", "json");
    let report_arg = status_report.to_str().expect("utf-8 path");

    // node info (the only invocation that passes --refresh-demo)
    let node_info_args = [
        "node",
        "info",
        "demo_node",
        "--report",
        report_arg,
        "--refresh-demo",
        "--json",
    ];
    let node_info = run_cli(&node_info_args);
    assert!(node_info.status.success(), "stderr: {}", node_info.stderr);
    assert!(node_info
        .stdout
        .contains(r#""api_version": "robotrt.node.info.v1""#));
    assert!(node_info.stdout.contains(r#""kind": "node_info""#));
    assert!(node_info.stdout.contains(r#""node_name": "demo_node""#));
    assert!(node_info.stdout.contains(r#""name": "demo_node""#));

    // topic info
    let topic_info_args = [
        "topic",
        "info",
        "/demo/camera/raw",
        "--report",
        report_arg,
        "--json",
    ];
    let topic_info = run_cli(&topic_info_args);
    assert!(topic_info.status.success(), "stderr: {}", topic_info.stderr);
    assert!(topic_info
        .stdout
        .contains(r#""api_version": "robotrt.topic.info.v1""#));
    assert!(topic_info.stdout.contains(r#""kind": "topic_info""#));
    assert!(topic_info
        .stdout
        .contains(r#""topic_name": "/demo/camera/raw""#));
    assert!(topic_info.stdout.contains(r#""schema": "sample.image""#));

    // health
    let health = run_cli(&["health", "--report", report_arg, "--json"]);
    assert!(health.status.success(), "stderr: {}", health.stderr);
    assert!(health.stdout.contains(r#""component": "runtime""#));

    // graph
    let graph = run_cli(&["graph", "--report", report_arg, "--json"]);
    assert!(graph.status.success(), "stderr: {}", graph.stderr);
    assert!(graph.stdout.contains(r#""relation": "publishes""#));

    // plugin list --loaded
    let plugin_args = [
        "plugin", "list", "--report", report_arg, "--loaded", "--json",
    ];
    let plugins = run_cli(&plugin_args);
    assert!(plugins.status.success(), "stderr: {}", plugins.stderr);
    assert!(plugins.stdout.contains(r#""loaded_only": true"#));
    assert!(plugins
        .stdout
        .contains(r#""name": "sample-device-plugin""#));

    // mission watch (single iteration)
    let mission_watch_args = [
        "mission",
        "watch",
        "demo_mission",
        "--report",
        report_arg,
        "--iterations",
        "1",
        "--json",
    ];
    let mission_watch = run_cli(&mission_watch_args);
    assert!(
        mission_watch.status.success(),
        "stderr: {}",
        mission_watch.stderr
    );
    assert!(mission_watch.stdout.contains(r#""name": "demo_mission""#));

    // action info
    let action_info_args = [
        "action",
        "info",
        "/demo/calibrate",
        "--report",
        report_arg,
        "--json",
    ];
    let action_info = run_cli(&action_info_args);
    assert!(
        action_info.status.success(),
        "stderr: {}",
        action_info.stderr
    );
    assert!(action_info
        .stdout
        .contains(r#""api_version": "robotrt.action.info.v1""#));
    assert!(action_info.stdout.contains(r#""health_state": "active""#));

    // action watch (single iteration)
    let action_watch_args = [
        "action",
        "watch",
        "/demo/calibrate",
        "--report",
        report_arg,
        "--iterations",
        "1",
        "--json",
    ];
    let action_watch = run_cli(&action_watch_args);
    assert!(
        action_watch.status.success(),
        "stderr: {}",
        action_watch.stderr
    );
    assert!(action_watch
        .stdout
        .contains(r#""api_version": "robotrt.action.watch.v1""#));
}
/// `node info --endpoint …` gets its answer from a gateway started with
/// `gateway serve --once`, and the gateway child then exits successfully.
///
/// The query is attempted up to five times (one initial try plus four retries
/// 150 ms apart). The gateway child is wrapped in a guard that kills and reaps
/// it on drop, so a failed assertion cannot leave a stray server process.
#[test]
fn node_info_supports_remote_status_query_endpoint() {
    /// Kills and reaps the wrapped child process when dropped.
    struct ChildGuard(std::process::Child);

    impl Drop for ChildGuard {
        fn drop(&mut self) {
            // Best effort: errors are ignored because the child may already
            // have exited or been waited on by the test body.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    let endpoint = allocate_udp_endpoint();
    let mut server = ChildGuard(
        Command::new(env!("CARGO_BIN_EXE_robotrt-cli"))
            .args([
                "gateway",
                "serve",
                "--bind",
                endpoint.as_str(),
                "--source",
                "demo",
                "--once",
            ])
            .stdout(Stdio::null())
            .stderr(Stdio::piped())
            .spawn()
            .expect("spawn robotrt-cli gateway serve"),
    );
    // Brief pause before the first query (presumably to let the gateway bind).
    std::thread::sleep(Duration::from_millis(120));

    let query_args = [
        "node",
        "info",
        "demo_node",
        "--endpoint",
        endpoint.as_str(),
        "--timeout-ms",
        "2000",
        "--json",
    ];
    let mut output = run_cli(&query_args);
    for _ in 0..4 {
        if output.status.success() {
            break;
        }
        std::thread::sleep(Duration::from_millis(150));
        output = run_cli(&query_args);
    }

    assert!(output.status.success(), "stderr: {}", output.stderr);
    assert!(output.stdout.contains("\"mode\": \"remote_service\""));
    assert!(
        output
            .stdout
            .contains("\"service\": \"/robotrt/gateway/query\"")
    );
    assert!(output.stdout.contains("\"endpoint\": \""));
    assert!(
        output
            .stdout
            .contains("\"api_version\": \"robotrt.node.info.v1\"")
    );

    let status = server.0.wait().expect("wait gateway server child");
    assert!(
        status.success(),
        "gateway serve child exited with {status:?}"
    );
}
/// `node info` and `service list` both work with `--mode daemon --endpoint …`
/// against a gateway started with `gateway serve --once`.
///
/// A fresh gateway is spawned on the same endpoint for each of the two
/// commands, and each gateway child is expected to exit successfully. Every
/// command is attempted up to five times (one initial try plus four retries
/// 150 ms apart). Gateway children are wrapped in a guard that kills and reaps
/// them on drop, so a failed assertion cannot leave a stray server process.
#[test]
fn daemon_mode_switch_works_for_info_and_list_commands() {
    /// Kills and reaps the wrapped child process when dropped.
    struct ChildGuard(std::process::Child);

    impl Drop for ChildGuard {
        fn drop(&mut self) {
            // Best effort: errors are ignored because the child may already
            // have exited or been waited on by the test body.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    /// Spawns `gateway serve --once` bound to `endpoint`; `spawn_context` is
    /// the panic message used if the process cannot be started.
    fn spawn_once_gateway(endpoint: &str, spawn_context: &str) -> ChildGuard {
        ChildGuard(
            Command::new(env!("CARGO_BIN_EXE_robotrt-cli"))
                .args([
                    "gateway",
                    "serve",
                    "--bind",
                    endpoint,
                    "--source",
                    "demo",
                    "--once",
                ])
                .stdout(Stdio::null())
                .stderr(Stdio::piped())
                .spawn()
                .expect(spawn_context),
        )
    }

    let endpoint = allocate_udp_endpoint();

    // --- node info in daemon mode -------------------------------------------
    let mut server_info = spawn_once_gateway(
        endpoint.as_str(),
        "spawn robotrt-cli gateway serve for info",
    );
    std::thread::sleep(Duration::from_millis(120));

    let info_args = [
        "node",
        "info",
        "demo_node",
        "--mode",
        "daemon",
        "--endpoint",
        endpoint.as_str(),
        "--json",
    ];
    let mut info_output = run_cli(&info_args);
    for _ in 0..4 {
        if info_output.status.success() {
            break;
        }
        std::thread::sleep(Duration::from_millis(150));
        info_output = run_cli(&info_args);
    }
    assert!(
        info_output.status.success(),
        "stderr: {}",
        info_output.stderr
    );
    assert!(info_output.stdout.contains("\"mode\": \"remote_service\""));

    let status_info = server_info
        .0
        .wait()
        .expect("wait gateway serve info child");
    assert!(
        status_info.success(),
        "gateway serve info exited with {status_info:?}"
    );

    // --- service list in daemon mode ----------------------------------------
    let mut server_list = spawn_once_gateway(
        endpoint.as_str(),
        "spawn robotrt-cli gateway serve for list",
    );
    std::thread::sleep(Duration::from_millis(120));

    let list_args = [
        "service",
        "list",
        "--mode",
        "daemon",
        "--endpoint",
        endpoint.as_str(),
        "--json",
    ];
    let mut list_output = run_cli(&list_args);
    for _ in 0..4 {
        if list_output.status.success() {
            break;
        }
        std::thread::sleep(Duration::from_millis(150));
        list_output = run_cli(&list_args);
    }
    assert!(
        list_output.status.success(),
        "stderr: {}",
        list_output.stderr
    );
    assert!(list_output.stdout.contains("\"mode\": \"remote_service\""));
    assert!(list_output.stdout.contains("\"/demo/echo\""));

    let status_list = server_list
        .0
        .wait()
        .expect("wait gateway serve list child");
    assert!(
        status_list.success(),
        "gateway serve list exited with {status_list:?}"
    );
}
/// `snapshot config --file … --json` exits successfully, echoes the requested
/// settings in its JSON output, and writes them to the given file.
///
/// The written config file is removed again (best effort) once all checks
/// have passed, so repeated runs do not pile up files in the temp location.
#[test]
fn snapshot_config_writes_expected_fields() {
    let config = temp_path("snapshot-config", "json");
    let config_arg = config.to_str().expect("utf-8 path");

    let output = run_cli(&[
        "snapshot",
        "config",
        "--file",
        config_arg,
        "--enable",
        "--interval-ms",
        "250",
        "--rate-max",
        "10",
        "--rate-burst",
        "20",
        "--atomic-write",
        "true",
        "--include",
        "all",
        "--status-report",
        "artifacts/introspection/custom-status.json",
        "--runtime-report",
        "artifacts/introspection/custom-runtime.json",
        "--middleware-report",
        "artifacts/introspection/custom-load.json",
        "--resource-report",
        "artifacts/introspection/custom-resources.json",
        "--json",
    ]);
    assert!(output.status.success(), "stderr: {}", output.stderr);

    // Settings as echoed on stdout.
    assert!(
        output
            .stdout
            .contains("\"api_version\": \"robotrt.snapshot.config.v1\"")
    );
    assert!(output.stdout.contains("\"enabled\": true"));
    assert!(output.stdout.contains("\"interval_ms\": 250"));
    assert!(output.stdout.contains("\"max_writes_per_sec\": 10"));
    assert!(output.stdout.contains("\"burst\": 20"));
    assert!(output.stdout.contains("\"atomic_write\": true"));

    // Settings as persisted to the config file.
    let written = fs::read_to_string(&config).expect("read written config");
    assert!(written.contains("\"enabled\": true"));
    assert!(written.contains("\"interval_ms\": 250"));

    // Best-effort cleanup; on an assertion failure the file is left behind,
    // which keeps it available for debugging.
    let _ = fs::remove_file(&config);
}
/// After `snapshot config --endpoint … --include runtime,middleware,resource`
/// is applied to a running gateway, a `node info` query against that gateway
/// fails and its stderr mentions `include_flags.status`.
///
/// The apply command is attempted up to five times (one initial try plus four
/// retries 150 ms apart). The gateway is started without `--once`, so it is
/// wrapped in a guard that kills and reaps it on drop: a failed assertion
/// would otherwise skip the explicit kill at the end and leave it running.
#[test]
fn snapshot_config_remote_apply_takes_effect_immediately() {
    /// Kills and reaps the wrapped child process when dropped.
    struct ChildGuard(std::process::Child);

    impl Drop for ChildGuard {
        fn drop(&mut self) {
            // Best effort: errors are ignored because the child may already
            // have been killed and waited on by the test body.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    let endpoint = allocate_udp_endpoint();
    let mut server = ChildGuard(
        Command::new(env!("CARGO_BIN_EXE_robotrt-cli"))
            .args([
                "gateway",
                "serve",
                "--bind",
                endpoint.as_str(),
                "--source",
                "demo",
            ])
            .stdout(Stdio::null())
            .stderr(Stdio::piped())
            .spawn()
            .expect("spawn gateway serve"),
    );
    // Brief pause before the first request (presumably to let the gateway bind).
    std::thread::sleep(Duration::from_millis(150));

    let apply_args = [
        "snapshot",
        "config",
        "--endpoint",
        endpoint.as_str(),
        "--include",
        "runtime,middleware,resource",
        "--json",
    ];
    let mut apply = run_cli(&apply_args);
    for _ in 0..4 {
        if apply.status.success() {
            break;
        }
        std::thread::sleep(Duration::from_millis(150));
        apply = run_cli(&apply_args);
    }
    assert!(apply.status.success(), "stderr: {}", apply.stderr);
    assert!(apply.stdout.contains("\"mode\": \"remote_service\""));
    assert!(apply.stdout.contains("\"applied\": true"));

    // The include list above omits `status`, so this query is expected to fail.
    let node_info = run_cli(&[
        "node",
        "info",
        "demo_node",
        "--mode",
        "daemon",
        "--endpoint",
        endpoint.as_str(),
        "--json",
    ]);
    assert!(
        !node_info.status.success(),
        "node info should fail after status include flag disabled"
    );
    assert!(
        node_info.stderr.contains("include_flags.status"),
        "stderr: {}",
        node_info.stderr
    );

    server.0.kill().expect("kill gateway serve child");
    let _ = server.0.wait();
}
/// Exercises `gateway policy-set`, `gateway observe`, `gateway audit` and
/// `gateway shutdown` against one running gateway.
///
/// The policy is set first, then `observe` is checked for several of the
/// values just set, `audit` is checked for its list kind, and `shutdown` is
/// issued; afterwards the gateway child is expected to exit successfully.
/// `policy-set` and `observe` are each attempted up to five times (one initial
/// try plus four retries 120 ms apart).
///
/// The gateway is started without `--once` and only stops through the final
/// `shutdown` command, so it is wrapped in a guard that kills and reaps it on
/// drop: a failed assertion would otherwise leave it running.
#[test]
fn gateway_observe_policy_audit_and_shutdown_commands_work() {
    /// Kills and reaps the wrapped child process when dropped.
    struct ChildGuard(std::process::Child);

    impl Drop for ChildGuard {
        fn drop(&mut self) {
            // Best effort: errors are ignored because the child may already
            // have exited or been waited on by the test body.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    let endpoint = allocate_udp_endpoint();
    let mut server = ChildGuard(
        Command::new(env!("CARGO_BIN_EXE_robotrt-cli"))
            .args([
                "gateway",
                "serve",
                "--bind",
                endpoint.as_str(),
                "--source",
                "demo",
            ])
            .stdout(Stdio::null())
            .stderr(Stdio::piped())
            .spawn()
            .expect("spawn robotrt-cli gateway serve"),
    );
    // Brief pause before the first request (presumably to let the gateway bind).
    std::thread::sleep(Duration::from_millis(150));

    // --- policy-set ----------------------------------------------------------
    let policy_args = [
        "gateway",
        "policy-set",
        "--endpoint",
        endpoint.as_str(),
        "--route-preference",
        "network_first",
        "--rate-limit",
        "120",
        "--namespace-isolation",
        "tenant-a",
        "--rollback-enabled",
        "true",
        "--topic-max-retry",
        "4",
        "--topic-retry-timeout-ms",
        "250",
        "--service-retry-timeout-ms",
        "320",
        "--action-retry-timeout-ms",
        "350",
        "--mission-retry-timeout-ms",
        "380",
        "--topic-max-inflight",
        "24",
        "--topic-replay-window",
        "32",
        "--topic-dedupe-window",
        "96",
        "--topic-replay-strategy",
        "drop_oldest",
        "--json",
    ];
    let mut policy_set = run_cli(&policy_args);
    for _ in 0..4 {
        if policy_set.status.success() {
            break;
        }
        std::thread::sleep(Duration::from_millis(120));
        policy_set = run_cli(&policy_args);
    }
    assert!(policy_set.status.success(), "stderr: {}", policy_set.stderr);
    assert!(policy_set.stdout.contains("\"kind\": \"gateway_policy_set\""));

    // --- observe -------------------------------------------------------------
    let observe_args = [
        "gateway",
        "observe",
        "--endpoint",
        endpoint.as_str(),
        "--audit-limit",
        "10",
        "--json",
    ];
    let mut observe = run_cli(&observe_args);
    for _ in 0..4 {
        if observe.status.success() {
            break;
        }
        std::thread::sleep(Duration::from_millis(120));
        observe = run_cli(&observe_args);
    }
    assert!(observe.status.success(), "stderr: {}", observe.stderr);
    assert!(observe.stdout.contains("\"kind\": \"gateway_observe\""));
    assert!(
        observe
            .stdout
            .contains("\"route_preference\": \"network_first\"")
    );
    assert!(
        observe
            .stdout
            .contains("\"topic_replay_strategy\": \"drop_oldest\""),
        "stdout: {}",
        observe.stdout
    );
    assert!(
        observe
            .stdout
            .contains("\"service_retry_timeout_ms\": 320"),
        "stdout: {}",
        observe.stdout
    );
    assert!(
        observe.stdout.contains("\"topic_dedupe_window\": 96"),
        "stdout: {}",
        observe.stdout
    );

    // --- audit ---------------------------------------------------------------
    let audit = run_cli(&[
        "gateway",
        "audit",
        "--endpoint",
        endpoint.as_str(),
        "--limit",
        "20",
        "--category",
        "control",
        "--json",
    ]);
    assert!(audit.status.success(), "stderr: {}", audit.stderr);
    assert!(audit.stdout.contains("\"kind\": \"gateway_audit_list\""));

    // --- shutdown ------------------------------------------------------------
    let shutdown = run_cli(&[
        "gateway",
        "shutdown",
        "--endpoint",
        endpoint.as_str(),
        "--json",
    ]);
    assert!(shutdown.status.success(), "stderr: {}", shutdown.stderr);
    assert!(shutdown.stdout.contains("\"kind\": \"gateway_shutdown\""));

    let status = server.0.wait().expect("wait gateway serve child");
    assert!(
        status.success(),
        "gateway serve child exited with {status:?}"
    );
}
/// When `gateway policy-set` receives both `--policy <file>` and explicit
/// flags, the flags win over the file and file-only values are kept.
///
/// Checked on the JSON result of `policy-set`:
/// - `route_preference` is `network_first` (flag) rather than `local_first` (file);
/// - `service_retry_timeout_ms` is `360` (flag) rather than `260` (file);
/// - `topic_retry_timeout_ms` stays `240` (file only);
/// - `topic_dedupe_window` is null after passing `--topic-dedupe-window none`.
///
/// `policy-set` is attempted up to five times (one initial try plus four
/// retries 120 ms apart). The gateway is started without `--once` and only
/// stops through the final `shutdown` command, so it is wrapped in a guard
/// that kills and reaps it on drop: a failed assertion would otherwise leave
/// it running.
#[test]
fn gateway_policy_set_respects_cli_over_policy_file_precedence() {
    /// Kills and reaps the wrapped child process when dropped.
    struct ChildGuard(std::process::Child);

    impl Drop for ChildGuard {
        fn drop(&mut self) {
            // Best effort: errors are ignored because the child may already
            // have exited or been waited on by the test body.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    let endpoint = allocate_udp_endpoint();
    let mut server = ChildGuard(
        Command::new(env!("CARGO_BIN_EXE_robotrt-cli"))
            .args([
                "gateway",
                "serve",
                "--bind",
                endpoint.as_str(),
                "--source",
                "demo",
            ])
            .stdout(Stdio::null())
            .stderr(Stdio::piped())
            .spawn()
            .expect("spawn robotrt-cli gateway serve"),
    );
    // Brief pause before the first request (presumably to let the gateway bind).
    std::thread::sleep(Duration::from_millis(150));

    // Policy file whose values are partly overridden by the flags below.
    let policy_file = temp_path("gateway-policy-precedence", "json");
    fs::write(
        &policy_file,
        serde_json::to_vec_pretty(&serde_json::json!({
            "api_version": "robotrt.gateway.policy.v1",
            "route_preference": "local_first",
            "topic_retry_timeout_ms": 240,
            "service_retry_timeout_ms": 260,
            "topic_dedupe_window": 64,
            "topic_replay_strategy": "drop_oldest"
        }))
        .expect("serialize policy file"),
    )
    .expect("write policy file");

    let policy_args = [
        "gateway",
        "policy-set",
        "--endpoint",
        endpoint.as_str(),
        "--policy",
        policy_file.to_str().expect("utf-8 policy path"),
        "--route-preference",
        "network_first",
        "--service-retry-timeout-ms",
        "360",
        "--topic-dedupe-window",
        "none",
        "--json",
    ];
    let mut policy_set = run_cli(&policy_args);
    for _ in 0..4 {
        if policy_set.status.success() {
            break;
        }
        std::thread::sleep(Duration::from_millis(120));
        policy_set = run_cli(&policy_args);
    }
    assert!(policy_set.status.success(), "stderr: {}", policy_set.stderr);

    let payload: Value =
        serde_json::from_str(&policy_set.stdout).expect("parse policy set json");
    assert_eq!(payload["kind"].as_str(), Some("gateway_policy_set"));
    assert_eq!(
        payload["result"]["policy"]["route_preference"].as_str(),
        Some("network_first")
    );
    assert_eq!(
        payload["result"]["policy"]["topic_retry_timeout_ms"].as_u64(),
        Some(240)
    );
    assert_eq!(
        payload["result"]["policy"]["service_retry_timeout_ms"].as_u64(),
        Some(360)
    );
    assert!(payload["result"]["policy"]["topic_dedupe_window"].is_null());

    let shutdown = run_cli(&[
        "gateway",
        "shutdown",
        "--endpoint",
        endpoint.as_str(),
        "--json",
    ]);
    assert!(shutdown.status.success(), "stderr: {}", shutdown.stderr);

    let status = server.0.wait().expect("wait gateway serve child");
    assert!(
        status.success(),
        "gateway serve child exited with {status:?}"
    );

    // Best-effort cleanup of the temporary policy file.
    let _ = fs::remove_file(policy_file);
}