// robotrt-cli 0.1.0-beta.2
//
// RobotRT modular robotics runtime and middleware components.
pub(super) use std::fs;
pub(super) use std::io;
pub(super) use std::path::PathBuf;
pub(super) use std::process::Command;
pub(super) use std::process::Stdio;
pub(super) use std::time::Duration;

pub(super) use introspection_core::{
    ActionStatus, GraphEdge, HealthStatusItem, MissionStatus, NodeStatus, PluginStatus,
    RuntimeHealthView, RuntimeLoadReport, RuntimeMetricView, ServiceStatus, StatusSnapshot,
    TopicStatus, TopicAckSummary, write_runtime_load_report,
    write_status_snapshot,
};
pub(super) use serde_json::Value;

/// Captured result of one `robotrt-cli` invocation.
///
/// `Debug` is derived so a whole invocation can be dumped in assertion
/// failure messages (for example `assert!(out.status.success(), "{out:?}")`).
#[derive(Debug)]
pub(super) struct CliOutput {
    /// Exit status reported for the child process.
    pub status: std::process::ExitStatus,
    /// Everything the process wrote to stdout, lossily decoded as UTF-8.
    pub stdout: String,
    /// Everything the process wrote to stderr, lossily decoded as UTF-8.
    pub stderr: String,
}

/// Runs `robotrt-cli` with `args` and returns its captured output.
///
/// When `should_inject_default_endpoint` says the invocation needs a gateway,
/// a [`TestGateway`] fixture is started first and `--endpoint <fixture endpoint>`
/// is appended to the arguments. The fixture lives until the CLI call has
/// returned and is torn down when it is dropped at the end of this function.
pub(super) fn run_cli(args: &[&str]) -> CliOutput {
    let mut argv: Vec<String> = args.iter().map(|arg| arg.to_string()).collect();

    if !should_inject_default_endpoint(&argv) {
        return run_cli_owned(&argv);
    }

    // Bound to a named local (not `_`) so the gateway stays alive for the
    // duration of the CLI invocation below.
    let fixture = TestGateway::spawn();
    argv.push(String::from("--endpoint"));
    argv.push(fixture.endpoint.clone());
    run_cli_owned(&argv)
}

/// Executes the `robotrt-cli` binary built by Cargo with exactly `args`,
/// waits for it to finish, and returns its exit status plus captured output.
///
/// # Panics
/// Panics if the process cannot be started or its output cannot be collected.
fn run_cli_owned(args: &[String]) -> CliOutput {
    let mut command = Command::new(env!("CARGO_BIN_EXE_robotrt-cli"));
    command.args(args);
    let finished = command.output().expect("run robotrt-cli command");

    // Invalid UTF-8 is replaced rather than rejected, so this never fails.
    let decode = |bytes: &[u8]| String::from_utf8_lossy(bytes).to_string();

    CliOutput {
        status: finished.status,
        stdout: decode(&finished.stdout),
        stderr: decode(&finished.stderr),
    }
}

/// Returns `true` when some element of `args` is exactly equal to `option`.
///
/// Only whole-argument matches count; an argument that merely starts with
/// `option` (for example `--endpoint=...`) is not a match.
fn has_option(args: &[String], option: &str) -> bool {
    args.iter().any(|arg| arg == option)
}

fn should_inject_default_endpoint(args: &[String]) -> bool {
    if args.is_empty() {
        return false;
    }

    if has_option(args, "--reports") || has_option(args, "--baseline-reports") {
        return false;
    }

    if has_option(args, "--endpoint") || has_option(args, "--endpoints") {
        return false;
    }

    match args[0].as_str() {
        "help" | "--help" | "-h" => false,
        "gateway" => false,
        "orchestrate" | "sdk" => false,
        "snapshot" => false,
        "node" | "topic" | "service" | "action" | "mission" | "plugin" | "health"
        | "graph" | "obs" | "ops" | "runtime" | "middleware" | "bag" => true,
        _ => false,
    }
}

/// A `robotrt-cli gateway serve` child process used as a test fixture.
///
/// Dropping the value shuts the process down (see the `Drop` impl).
struct TestGateway {
    /// Address string the gateway was asked to bind (`--bind`); the same
    /// string is handed to CLI invocations as `--endpoint`.
    endpoint: String,
    /// Handle to the spawned gateway process.
    child: std::process::Child,
}

impl TestGateway {
    /// Starts `robotrt-cli gateway serve --bind <endpoint> --source demo` on an
    /// endpoint obtained from `allocate_udp_endpoint` and blocks until the
    /// gateway answers a probe.
    ///
    /// # Panics
    /// Panics if the process cannot be spawned or never becomes ready
    /// (see [`TestGateway::wait_ready`]).
    fn spawn() -> Self {
        let endpoint = allocate_udp_endpoint();
        let child = Command::new(env!("CARGO_BIN_EXE_robotrt-cli"))
            .args([
                "gateway",
                "serve",
                "--bind",
                endpoint.as_str(),
                "--source",
                "demo",
            ])
            .stdout(Stdio::null())
            // Piped so a startup failure can be reported together with the
            // gateway's own error output (read in `drain_stderr`).
            .stderr(Stdio::piped())
            .spawn()
            .expect("spawn robotrt-cli gateway serve for conformance");

        let mut gateway = Self { endpoint, child };
        gateway.wait_ready();
        gateway
    }

    /// Polls the gateway with `gateway observe` until a probe succeeds.
    ///
    /// Up to 40 probes are made (each with `--timeout-ms 500`), sleeping
    /// 100 ms between attempts.
    ///
    /// # Panics
    /// Panics as soon as the child process is seen to have exited, or after
    /// the last failed probe (the child is killed first in that case). The
    /// panic message includes whatever the gateway wrote to stderr.
    fn wait_ready(&mut self) {
        let probe_args = vec![
            "gateway".to_string(),
            "observe".to_string(),
            "--endpoint".to_string(),
            self.endpoint.clone(),
            "--timeout-ms".to_string(),
            "500".to_string(),
            "--json".to_string(),
        ];

        for _ in 0..40 {
            // A gateway that has already exited (e.g. it failed to bind) can
            // never become ready; fail now instead of burning the whole retry
            // budget on probes that cannot succeed.
            if let Ok(Some(status)) = self.child.try_wait() {
                let stderr = self.drain_stderr();
                panic!(
                    "gateway fixture did not become ready on {}: exited early with {}; stderr: {}",
                    self.endpoint, status, stderr
                );
            }

            let probe = run_cli_owned(&probe_args);
            if probe.status.success() {
                return;
            }
            std::thread::sleep(Duration::from_millis(100));
        }

        // Reap the child before reading its stderr so the read sees EOF.
        let _ = self.child.kill();
        let _ = self.child.wait();
        let stderr = self.drain_stderr();
        panic!(
            "gateway fixture did not become ready on {}; stderr: {}",
            self.endpoint, stderr
        );
    }

    /// Returns the text the gateway wrote to stderr, trimmed.
    ///
    /// Must only be called after the child has exited; otherwise the read
    /// blocks until the child closes its end of the pipe. Returns an empty
    /// string if the pipe was already taken or could not be read.
    fn drain_stderr(&mut self) -> String {
        let mut raw = Vec::new();
        if let Some(mut pipe) = self.child.stderr.take() {
            // Best effort: a read error only means less diagnostic text.
            let _ = std::io::Read::read_to_end(&mut pipe, &mut raw);
        }
        String::from_utf8_lossy(&raw).trim().to_string()
    }

    /// Asks the gateway to stop via `gateway shutdown` and waits for the child
    /// process to exit, killing it if it has not exited after 20 polls spaced
    /// 50 ms apart.
    ///
    /// The result of the shutdown command itself is ignored; only the child's
    /// exit state decides whether a kill is needed.
    ///
    /// # Panics
    /// Panics if polling the child's state fails.
    fn shutdown(&mut self) {
        let shutdown_args = vec![
            "gateway".to_string(),
            "shutdown".to_string(),
            "--endpoint".to_string(),
            self.endpoint.clone(),
            "--timeout-ms".to_string(),
            "800".to_string(),
            "--json".to_string(),
        ];
        let _ = run_cli_owned(&shutdown_args);

        for _ in 0..20 {
            if self
                .child
                .try_wait()
                .expect("poll gateway fixture child state")
                .is_some()
            {
                return;
            }
            std::thread::sleep(Duration::from_millis(50));
        }

        // Graceful shutdown did not take effect in time; force it and reap.
        let _ = self.child.kill();
        let _ = self.child.wait();
    }
}

impl Drop for TestGateway {
    fn drop(&mut self) {
        self.shutdown();
    }
}

/// Builds a fresh path inside the system temp directory, named
/// `robotrt-cli-{prefix}-{nanos}-{pid}-{seq}.{ext}`.
///
/// Nothing is created on disk. The name combines the current time in
/// nanoseconds since the Unix epoch with the process id and a per-process
/// sequence number, so two calls never return the same path even when the
/// clock reading is identical (coarse clocks, parallel tests).
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
pub(super) fn temp_path(prefix: &str, ext: &str) -> PathBuf {
    // Distinguishes calls made within one clock tick of each other.
    static SEQUENCE: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("unix epoch")
        .as_nanos();
    let pid = std::process::id();
    let seq = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    std::env::temp_dir().join(format!("robotrt-cli-{prefix}-{nanos}-{pid}-{seq}.{ext}"))
}

/// Creates a fresh directory inside the system temp directory, named
/// `robotrt-cli-{prefix}-{nanos}-{pid}-{seq}`, and returns its path.
///
/// The name combines the current time in nanoseconds since the Unix epoch
/// with the process id and a per-process sequence number. This matters
/// because `create_dir_all` succeeds silently when the directory already
/// exists: with a timestamp alone, two callers reading the same clock value
/// would end up sharing one directory.
///
/// # Errors
/// Returns the I/O error from creating the directory.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
pub(super) fn temp_dir(prefix: &str) -> io::Result<PathBuf> {
    // Distinguishes calls made within one clock tick of each other.
    static SEQUENCE: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("unix epoch")
        .as_nanos();
    let pid = std::process::id();
    let seq = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    let path = std::env::temp_dir().join(format!("robotrt-cli-{prefix}-{nanos}-{pid}-{seq}"));
    fs::create_dir_all(&path)?;
    Ok(path)
}

/// Returns a `127.0.0.1:<port>` string for a UDP port the OS just handed out.
///
/// The port is discovered by binding a socket to port 0 on loopback and
/// reading back the assigned port. The socket is closed before returning, so
/// the port is not reserved: another process may claim it before the caller
/// binds it.
///
/// # Panics
/// Panics if the loopback socket cannot be bound or its address cannot be read.
pub(super) fn allocate_udp_endpoint() -> String {
    let port = std::net::UdpSocket::bind("127.0.0.1:0")
        .expect("bind loopback udp port")
        .local_addr()
        .expect("read local addr")
        .port();
    format!("127.0.0.1:{port}")
}

/// Asserts that `value` is a JSON object whose key set is exactly
/// `expected_keys` (order is irrelevant).
///
/// Duplicate entries in `expected_keys` are counted once. Without that, a
/// repeated name would inflate the expected count and let an object carrying
/// an extra, unlisted key pass the check.
///
/// # Panics
/// Panics if `value` is not an object, if the number of keys differs from the
/// number of distinct expected keys, or if an expected key is absent.
pub(super) fn assert_exact_object_keys(value: &Value, expected_keys: &[&str]) {
    let obj = value.as_object().expect("json object expected");

    let distinct_expected: std::collections::BTreeSet<&str> =
        expected_keys.iter().copied().collect();

    // Equal counts plus "every expected key is present" implies set equality.
    assert_eq!(
        obj.len(),
        distinct_expected.len(),
        "unexpected key count: {obj:?}"
    );
    for key in expected_keys {
        assert!(obj.contains_key(*key), "missing expected key: {key}");
    }
}

/// Writes a fixed demo [`StatusSnapshot`] to `path` via `write_status_snapshot`.
///
/// The snapshot holds one node, one topic, one service, one action, one
/// mission, four healthy health items, two plugins (one loaded, one not) and
/// two graph edges, all with hard-coded values.
///
/// # Errors
/// Returns whatever error `write_status_snapshot` reports.
pub(super) fn write_demo_status_snapshot(path: &PathBuf) -> io::Result<()> {
    let nodes = vec![NodeStatus {
        name: String::from("demo_node"),
        namespace: String::from("/demo"),
        capabilities: vec![
            String::from("pubsub"),
            String::from("service"),
            String::from("mission"),
        ],
    }];

    let topics = vec![TopicStatus {
        name: String::from("/demo/camera/raw"),
        schema: String::from("sample.image"),
        reliable: true,
        depth: 16,
        pending: 3,
        max_depth: 16,
        publishers: 1,
        subscribers: 1,
        reliable_local_ack: TopicAckSummary::default(),
        reliable_remote_ack: TopicAckSummary::default(),
    }];

    let services = vec![ServiceStatus {
        name: String::from("/demo/echo"),
        pending_requests: 1,
        pending_responses: 0,
        clients: 1,
        servers: 1,
    }];

    let actions = vec![ActionStatus {
        name: String::from("/demo/calibrate"),
        clients: 1,
        servers: 1,
        current_state: Some(String::from("executing")),
        active_goals: Some(1),
        health_state: Some(String::from("active")),
        heartbeat_timeout_ms: Some(5000),
        last_heartbeat_at_unix_nanos: Some(1),
        last_feedback_at_unix_nanos: Some(1),
        last_result_at_unix_nanos: None,
    }];

    let missions = vec![MissionStatus {
        name: String::from("demo_mission"),
        state: String::from("running"),
        last_checkpoint: Some(String::from("cp-demo-1")),
    }];

    // Every component in the demo reports the same healthy state with no reason.
    let healthy = |component: &str| HealthStatusItem {
        component: String::from(component),
        status: String::from("healthy"),
        reason: None,
    };
    let health = vec![
        healthy("runtime"),
        healthy("device"),
        healthy("pipeline"),
        healthy("plugin"),
    ];

    let plugins = vec![
        PluginStatus {
            name: String::from("sample-device-plugin"),
            kind: String::from("device"),
            loaded: true,
        },
        PluginStatus {
            name: String::from("sample-obs-plugin"),
            kind: String::from("observability"),
            loaded: false,
        },
    ];

    let edges = vec![
        GraphEdge {
            from: String::from("demo_node"),
            to: String::from("/demo/camera/raw"),
            relation: String::from("publishes"),
        },
        GraphEdge {
            from: String::from("demo_node"),
            to: String::from("/demo/echo"),
            relation: String::from("provides"),
        },
    ];

    let snapshot = StatusSnapshot::new(
        nodes, topics, services, actions, missions, health, plugins, edges,
    );

    write_status_snapshot(path, &snapshot)
}