pub(super) use std::fs;
pub(super) use std::io;
pub(super) use std::path::PathBuf;
pub(super) use std::process::Command;
pub(super) use std::process::Stdio;
pub(super) use std::time::Duration;
pub(super) use introspection_core::{
ActionStatus, GraphEdge, HealthStatusItem, MissionStatus, NodeStatus, PluginStatus,
RuntimeHealthView, RuntimeLoadReport, RuntimeMetricView, ServiceStatus, StatusSnapshot,
TopicStatus, TopicAckSummary, write_runtime_load_report,
write_status_snapshot,
};
pub(super) use serde_json::Value;
pub(super) struct CliOutput {
pub status: std::process::ExitStatus,
pub stdout: String,
pub stderr: String,
}
pub(super) fn run_cli(args: &[&str]) -> CliOutput {
let mut owned_args = args.iter().map(|item| item.to_string()).collect::<Vec<_>>();
if should_inject_default_endpoint(&owned_args) {
let gateway = TestGateway::spawn();
owned_args.push("--endpoint".to_string());
owned_args.push(gateway.endpoint.clone());
return run_cli_owned(&owned_args);
}
run_cli_owned(&owned_args)
}
fn run_cli_owned(args: &[String]) -> CliOutput {
let output = Command::new(env!("CARGO_BIN_EXE_robotrt-cli"))
.args(args)
.output()
.expect("run robotrt-cli command");
CliOutput {
status: output.status,
stdout: String::from_utf8_lossy(&output.stdout).to_string(),
stderr: String::from_utf8_lossy(&output.stderr).to_string(),
}
}
fn has_option(args: &[String], option: &str) -> bool {
args.windows(1).any(|item| item[0] == option)
}
fn should_inject_default_endpoint(args: &[String]) -> bool {
if args.is_empty() {
return false;
}
if has_option(args, "--reports") || has_option(args, "--baseline-reports") {
return false;
}
if has_option(args, "--endpoint") || has_option(args, "--endpoints") {
return false;
}
match args[0].as_str() {
"help" | "--help" | "-h" => false,
"gateway" => false,
"orchestrate" | "sdk" => false,
"snapshot" => false,
"node" | "topic" | "service" | "action" | "mission" | "plugin" | "health"
| "graph" | "obs" | "ops" | "runtime" | "middleware" | "bag" => true,
_ => false,
}
}
struct TestGateway {
endpoint: String,
child: std::process::Child,
}
impl TestGateway {
fn spawn() -> Self {
let endpoint = allocate_udp_endpoint();
let child = Command::new(env!("CARGO_BIN_EXE_robotrt-cli"))
.args([
"gateway",
"serve",
"--bind",
endpoint.as_str(),
"--source",
"demo",
])
.stdout(Stdio::null())
.stderr(Stdio::piped())
.spawn()
.expect("spawn robotrt-cli gateway serve for conformance");
let mut gateway = Self { endpoint, child };
gateway.wait_ready();
gateway
}
fn wait_ready(&mut self) {
let probe_args = vec![
"gateway".to_string(),
"observe".to_string(),
"--endpoint".to_string(),
self.endpoint.clone(),
"--timeout-ms".to_string(),
"500".to_string(),
"--json".to_string(),
];
for _ in 0..40 {
let probe = run_cli_owned(&probe_args);
if probe.status.success() {
return;
}
std::thread::sleep(Duration::from_millis(100));
}
let _ = self.child.kill();
let _ = self.child.wait();
panic!("gateway fixture did not become ready on {}", self.endpoint);
}
fn shutdown(&mut self) {
let shutdown_args = vec![
"gateway".to_string(),
"shutdown".to_string(),
"--endpoint".to_string(),
self.endpoint.clone(),
"--timeout-ms".to_string(),
"800".to_string(),
"--json".to_string(),
];
let _ = run_cli_owned(&shutdown_args);
for _ in 0..20 {
if self
.child
.try_wait()
.expect("poll gateway fixture child state")
.is_some()
{
return;
}
std::thread::sleep(Duration::from_millis(50));
}
let _ = self.child.kill();
let _ = self.child.wait();
}
}
impl Drop for TestGateway {
fn drop(&mut self) {
self.shutdown();
}
}
pub(super) fn temp_path(prefix: &str, ext: &str) -> PathBuf {
let mut path = std::env::temp_dir();
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("unix epoch")
.as_nanos();
path.push(format!("robotrt-cli-{prefix}-{nanos}.{ext}"));
path
}
pub(super) fn temp_dir(prefix: &str) -> io::Result<PathBuf> {
let mut path = std::env::temp_dir();
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("unix epoch")
.as_nanos();
path.push(format!("robotrt-cli-{prefix}-{nanos}"));
fs::create_dir_all(&path)?;
Ok(path)
}
pub(super) fn allocate_udp_endpoint() -> String {
let socket = std::net::UdpSocket::bind("127.0.0.1:0").expect("bind loopback udp port");
let addr = socket.local_addr().expect("read local addr");
format!("127.0.0.1:{}", addr.port())
}
pub(super) fn assert_exact_object_keys(value: &Value, expected_keys: &[&str]) {
let obj = value.as_object().expect("json object expected");
assert_eq!(
obj.len(),
expected_keys.len(),
"unexpected key count: {obj:?}"
);
for key in expected_keys {
assert!(obj.contains_key(*key), "missing expected key: {key}");
}
}
pub(super) fn write_demo_status_snapshot(path: &PathBuf) -> io::Result<()> {
let snapshot = StatusSnapshot::new(
vec![NodeStatus {
name: "demo_node".to_string(),
namespace: "/demo".to_string(),
capabilities: vec![
"pubsub".to_string(),
"service".to_string(),
"mission".to_string(),
],
}],
vec![TopicStatus {
name: "/demo/camera/raw".to_string(),
schema: "sample.image".to_string(),
reliable: true,
depth: 16,
pending: 3,
max_depth: 16,
publishers: 1,
subscribers: 1,
reliable_local_ack: TopicAckSummary::default(),
reliable_remote_ack: TopicAckSummary::default(),
}],
vec![ServiceStatus {
name: "/demo/echo".to_string(),
pending_requests: 1,
pending_responses: 0,
clients: 1,
servers: 1,
}],
vec![ActionStatus {
name: "/demo/calibrate".to_string(),
clients: 1,
servers: 1,
current_state: Some("executing".to_string()),
active_goals: Some(1),
health_state: Some("active".to_string()),
heartbeat_timeout_ms: Some(5000),
last_heartbeat_at_unix_nanos: Some(1),
last_feedback_at_unix_nanos: Some(1),
last_result_at_unix_nanos: None,
}],
vec![MissionStatus {
name: "demo_mission".to_string(),
state: "running".to_string(),
last_checkpoint: Some("cp-demo-1".to_string()),
}],
vec![
HealthStatusItem {
component: "runtime".to_string(),
status: "healthy".to_string(),
reason: None,
},
HealthStatusItem {
component: "device".to_string(),
status: "healthy".to_string(),
reason: None,
},
HealthStatusItem {
component: "pipeline".to_string(),
status: "healthy".to_string(),
reason: None,
},
HealthStatusItem {
component: "plugin".to_string(),
status: "healthy".to_string(),
reason: None,
},
],
vec![
PluginStatus {
name: "sample-device-plugin".to_string(),
kind: "device".to_string(),
loaded: true,
},
PluginStatus {
name: "sample-obs-plugin".to_string(),
kind: "observability".to_string(),
loaded: false,
},
],
vec![
GraphEdge {
from: "demo_node".to_string(),
to: "/demo/camera/raw".to_string(),
relation: "publishes".to_string(),
},
GraphEdge {
from: "demo_node".to_string(),
to: "/demo/echo".to_string(),
relation: "provides".to_string(),
},
],
);
write_status_snapshot(path, &snapshot)
}