robotrt-cli 0.1.0-beta.2

RobotRT modular robotics runtime and middleware components.
use std::time::Duration;

use serde_json::Value;

use crate::helpers::{
    first_positional, has_flag, parse_u64_option, parse_usize_option, resolve_runtime_endpoint,
};
use crate::gateway::{
    STATUS_OP_TOPIC_POLL, STATUS_OP_TOPIC_SUBSCRIBE, STATUS_OP_TOPIC_UNSUBSCRIBE,
    STATUS_SERVICE_NAME, StatusServiceResponse, TOPIC_ECHO_API_VERSION, TOPIC_HZ_API_VERSION,
    build_op_payload_request, make_udp_service_client, next_request_id, validate_response,
};

const DEFAULT_DAEMON_ENDPOINT: &str = "127.0.0.1:7588";

fn parse_stream_id(result: &Value) -> Result<String, String> {
    result
        .get("stream_id")
        .and_then(Value::as_str)
        .map(|value| value.to_string())
        .ok_or_else(|| String::from("topic subscribe response missing stream_id"))
}

fn subscribe_topic_stream(
    endpoint: &str,
    timeout_ms: u64,
    topic_name: &str,
    max_batch: usize,
) -> Result<(core_api::UdpServiceClient, String), String> {
    let client = make_udp_service_client(endpoint.to_string(), timeout_ms)?;
    let request_id = next_request_id();
    let request = build_op_payload_request(
        STATUS_OP_TOPIC_SUBSCRIBE,
        serde_json::json!({
            "topic": topic_name,
            "max_batch": max_batch,
        }),
    );

    let response: StatusServiceResponse = client
        .call_json(STATUS_SERVICE_NAME, request_id, &request)
        .map_err(|err| format!("topic subscribe to {endpoint} failed: {err}"))?;
    validate_response(&response, request_id, STATUS_OP_TOPIC_SUBSCRIBE)?;
    let result = response.op_result.ok_or_else(|| {
        format!("topic subscribe response from {endpoint} missing op_result payload")
    })?;
    let stream_id = parse_stream_id(&result)?;

    Ok((client, stream_id))
}

fn poll_topic_frames(
    client: &core_api::UdpServiceClient,
    endpoint: &str,
    stream_id: &str,
    max_items: usize,
) -> Result<Vec<Value>, String> {
    let request_id = next_request_id();
    let request = build_op_payload_request(
        STATUS_OP_TOPIC_POLL,
        serde_json::json!({
            "stream_id": stream_id,
            "max_items": max_items,
        }),
    );

    let response: StatusServiceResponse = client
        .call_json(STATUS_SERVICE_NAME, request_id, &request)
        .map_err(|err| format!("topic poll to {endpoint} failed: {err}"))?;
    validate_response(&response, request_id, STATUS_OP_TOPIC_POLL)?;
    let result = response
        .op_result
        .ok_or_else(|| format!("topic poll response from {endpoint} missing op_result"))?;

    let frames = result
        .get("frames")
        .and_then(Value::as_array)
        .ok_or_else(|| String::from("topic poll response missing frames"))?
        .clone();

    Ok(frames)
}

fn unsubscribe_topic_stream(
    client: &core_api::UdpServiceClient,
    endpoint: &str,
    stream_id: &str,
) -> Result<(), String> {
    let request_id = next_request_id();
    let request = build_op_payload_request(
        STATUS_OP_TOPIC_UNSUBSCRIBE,
        serde_json::json!({
            "stream_id": stream_id,
        }),
    );
    let response: StatusServiceResponse = client
        .call_json(STATUS_SERVICE_NAME, request_id, &request)
        .map_err(|err| format!("topic unsubscribe to {endpoint} failed: {err}"))?;
    validate_response(&response, request_id, STATUS_OP_TOPIC_UNSUBSCRIBE)
}

pub fn topic_hz(args: &[String]) -> Result<(), String> {
    let topic_name = first_positional(args).ok_or_else(|| String::from("missing topic name"))?;
    let json = has_flag(args, "--json");
    let timeout_ms = parse_u64_option(args, "--timeout-ms", 1000)?;
    let iterations = parse_usize_option(args, "--iterations", 6)?;
    let interval_ms = parse_u64_option(args, "--interval-ms", 500)?;
    let max_items = parse_usize_option(args, "--max-items", 8)?.clamp(1, 256);

    let endpoint = resolve_runtime_endpoint(
        args,
        DEFAULT_DAEMON_ENDPOINT,
        "topic commands require --endpoint in embedded mode",
    )?;
    let (client, stream_id) = subscribe_topic_stream(&endpoint, timeout_ms, &topic_name, max_items)?;

    let mut frame_count = 0usize;
    let mut first_ts: Option<u64> = None;
    let mut last_ts: Option<u64> = None;

    for idx in 0..iterations {
        let frames = poll_topic_frames(&client, &endpoint, &stream_id, max_items)?;

        frame_count += frames.len();
        for frame in frames {
            if let Some(ts) = frame.get("captured_at_unix_nanos").and_then(Value::as_u64) {
                if first_ts.is_none() {
                    first_ts = Some(ts);
                }
                last_ts = Some(ts);
            }
        }

        if idx + 1 < iterations {
            std::thread::sleep(Duration::from_millis(interval_ms));
        }
    }

    let _ = unsubscribe_topic_stream(&client, &endpoint, &stream_id);

    let elapsed_ns = match (first_ts, last_ts) {
        (Some(first), Some(last)) if last > first => last - first,
        _ => 0,
    };
    let hz = if frame_count < 2 || elapsed_ns == 0 {
        0.0
    } else {
        (frame_count as f64 - 1.0) / (elapsed_ns as f64 / 1_000_000_000.0)
    };

    if json {
        let payload = serde_json::json!({
            "api_version": TOPIC_HZ_API_VERSION,
            "topic": topic_name,
            "source": {
                "mode": "payload_stream",
                "endpoint": endpoint,
                "stream_id": stream_id,
                "timeout_ms": timeout_ms,
                "control_plane": "topic_subscribe/topic_unsubscribe",
                "data_plane": "topic_poll",
            },
            "sampling": {
                "iterations": iterations,
                "interval_ms": interval_ms,
                "max_items": max_items,
                "frames": frame_count,
                "elapsed_ns": elapsed_ns,
            },
            "result": {
                "payload_hz": hz,
            }
        });
        println!(
            "{}",
            serde_json::to_string_pretty(&payload)
                .map_err(|err| format!("serialize topic hz payload failed: {err}"))?
        );
    } else {
        println!("RobotRT Topic Hz");
        println!("topic: {topic_name}");
        println!("endpoint: {endpoint}");
        println!("iterations: {iterations} interval_ms: {interval_ms}");
        println!("frames: {frame_count}");
        println!("payload_hz: {:.3}", hz);
    }

    Ok(())
}

pub fn topic_echo(args: &[String]) -> Result<(), String> {
    let topic_name = first_positional(args).ok_or_else(|| String::from("missing topic name"))?;
    let json = has_flag(args, "--json");
    let timeout_ms = parse_u64_option(args, "--timeout-ms", 1000)?;
    let iterations = parse_usize_option(args, "--iterations", 5)?;
    let interval_ms = parse_u64_option(args, "--interval-ms", 500)?;
    let limit = parse_usize_option(args, "--limit", 5)?;
    let max_items = parse_usize_option(args, "--max-items", 8)?.clamp(1, 256);

    let endpoint = resolve_runtime_endpoint(
        args,
        DEFAULT_DAEMON_ENDPOINT,
        "topic commands require --endpoint in embedded mode",
    )?;
    let (client, stream_id) = subscribe_topic_stream(&endpoint, timeout_ms, &topic_name, max_items)?;

    let mut rows = Vec::new();
    for idx in 0..iterations {
        let frames = poll_topic_frames(&client, &endpoint, &stream_id, max_items)?;

        for frame in frames {
            rows.push(serde_json::json!({
                "sample": idx,
                "topic": topic_name,
                "sequence": frame.get("sequence").cloned().unwrap_or(Value::Null),
                "captured_at_unix_nanos": frame.get("captured_at_unix_nanos").cloned().unwrap_or(Value::Null),
                "transport": frame.get("transport").cloned().unwrap_or(Value::Null),
                "payload": frame.get("payload").cloned().unwrap_or(Value::Null),
                "external_ref": frame.get("external_ref").cloned().unwrap_or(Value::Null),
            }));
            if rows.len() >= limit {
                break;
            }
        }

        if rows.len() >= limit {
            break;
        }

        if idx + 1 < iterations {
            std::thread::sleep(Duration::from_millis(interval_ms));
        }
    }

    let _ = unsubscribe_topic_stream(&client, &endpoint, &stream_id);

    if json {
        let payload = serde_json::json!({
            "api_version": TOPIC_ECHO_API_VERSION,
            "topic": topic_name,
            "source": {
                "mode": "payload_stream",
                "endpoint": endpoint,
                "stream_id": stream_id,
                "timeout_ms": timeout_ms,
                "control_plane": "topic_subscribe/topic_unsubscribe",
                "data_plane": "topic_poll",
            },
            "sampling": {
                "iterations": iterations,
                "interval_ms": interval_ms,
                "limit": limit,
                "max_items": max_items,
            },
            "entries": rows,
        });
        println!(
            "{}",
            serde_json::to_string_pretty(&payload)
                .map_err(|err| format!("serialize topic echo payload failed: {err}"))?
        );
    } else {
        println!("RobotRT Topic Echo");
        println!("topic: {topic_name}");
        println!("endpoint: {endpoint}");
        println!("iterations: {iterations} interval_ms: {interval_ms}");
        for row in rows {
            println!("{}", row);
        }
    }

    Ok(())
}