use std::time::Duration;
use serde_json::Value;
use crate::helpers::{
first_positional, has_flag, parse_u64_option, parse_usize_option, resolve_runtime_endpoint,
};
use crate::gateway::{
STATUS_OP_TOPIC_POLL, STATUS_OP_TOPIC_SUBSCRIBE, STATUS_OP_TOPIC_UNSUBSCRIBE,
STATUS_SERVICE_NAME, StatusServiceResponse, TOPIC_ECHO_API_VERSION, TOPIC_HZ_API_VERSION,
build_op_payload_request, make_udp_service_client, next_request_id, validate_response,
};
const DEFAULT_DAEMON_ENDPOINT: &str = "127.0.0.1:7588";
fn parse_stream_id(result: &Value) -> Result<String, String> {
result
.get("stream_id")
.and_then(Value::as_str)
.map(|value| value.to_string())
.ok_or_else(|| String::from("topic subscribe response missing stream_id"))
}
fn subscribe_topic_stream(
endpoint: &str,
timeout_ms: u64,
topic_name: &str,
max_batch: usize,
) -> Result<(core_api::UdpServiceClient, String), String> {
let client = make_udp_service_client(endpoint.to_string(), timeout_ms)?;
let request_id = next_request_id();
let request = build_op_payload_request(
STATUS_OP_TOPIC_SUBSCRIBE,
serde_json::json!({
"topic": topic_name,
"max_batch": max_batch,
}),
);
let response: StatusServiceResponse = client
.call_json(STATUS_SERVICE_NAME, request_id, &request)
.map_err(|err| format!("topic subscribe to {endpoint} failed: {err}"))?;
validate_response(&response, request_id, STATUS_OP_TOPIC_SUBSCRIBE)?;
let result = response.op_result.ok_or_else(|| {
format!("topic subscribe response from {endpoint} missing op_result payload")
})?;
let stream_id = parse_stream_id(&result)?;
Ok((client, stream_id))
}
fn poll_topic_frames(
client: &core_api::UdpServiceClient,
endpoint: &str,
stream_id: &str,
max_items: usize,
) -> Result<Vec<Value>, String> {
let request_id = next_request_id();
let request = build_op_payload_request(
STATUS_OP_TOPIC_POLL,
serde_json::json!({
"stream_id": stream_id,
"max_items": max_items,
}),
);
let response: StatusServiceResponse = client
.call_json(STATUS_SERVICE_NAME, request_id, &request)
.map_err(|err| format!("topic poll to {endpoint} failed: {err}"))?;
validate_response(&response, request_id, STATUS_OP_TOPIC_POLL)?;
let result = response
.op_result
.ok_or_else(|| format!("topic poll response from {endpoint} missing op_result"))?;
let frames = result
.get("frames")
.and_then(Value::as_array)
.ok_or_else(|| String::from("topic poll response missing frames"))?
.clone();
Ok(frames)
}
fn unsubscribe_topic_stream(
client: &core_api::UdpServiceClient,
endpoint: &str,
stream_id: &str,
) -> Result<(), String> {
let request_id = next_request_id();
let request = build_op_payload_request(
STATUS_OP_TOPIC_UNSUBSCRIBE,
serde_json::json!({
"stream_id": stream_id,
}),
);
let response: StatusServiceResponse = client
.call_json(STATUS_SERVICE_NAME, request_id, &request)
.map_err(|err| format!("topic unsubscribe to {endpoint} failed: {err}"))?;
validate_response(&response, request_id, STATUS_OP_TOPIC_UNSUBSCRIBE)
}
pub fn topic_hz(args: &[String]) -> Result<(), String> {
let topic_name = first_positional(args).ok_or_else(|| String::from("missing topic name"))?;
let json = has_flag(args, "--json");
let timeout_ms = parse_u64_option(args, "--timeout-ms", 1000)?;
let iterations = parse_usize_option(args, "--iterations", 6)?;
let interval_ms = parse_u64_option(args, "--interval-ms", 500)?;
let max_items = parse_usize_option(args, "--max-items", 8)?.clamp(1, 256);
let endpoint = resolve_runtime_endpoint(
args,
DEFAULT_DAEMON_ENDPOINT,
"topic commands require --endpoint in embedded mode",
)?;
let (client, stream_id) = subscribe_topic_stream(&endpoint, timeout_ms, &topic_name, max_items)?;
let mut frame_count = 0usize;
let mut first_ts: Option<u64> = None;
let mut last_ts: Option<u64> = None;
for idx in 0..iterations {
let frames = poll_topic_frames(&client, &endpoint, &stream_id, max_items)?;
frame_count += frames.len();
for frame in frames {
if let Some(ts) = frame.get("captured_at_unix_nanos").and_then(Value::as_u64) {
if first_ts.is_none() {
first_ts = Some(ts);
}
last_ts = Some(ts);
}
}
if idx + 1 < iterations {
std::thread::sleep(Duration::from_millis(interval_ms));
}
}
let _ = unsubscribe_topic_stream(&client, &endpoint, &stream_id);
let elapsed_ns = match (first_ts, last_ts) {
(Some(first), Some(last)) if last > first => last - first,
_ => 0,
};
let hz = if frame_count < 2 || elapsed_ns == 0 {
0.0
} else {
(frame_count as f64 - 1.0) / (elapsed_ns as f64 / 1_000_000_000.0)
};
if json {
let payload = serde_json::json!({
"api_version": TOPIC_HZ_API_VERSION,
"topic": topic_name,
"source": {
"mode": "payload_stream",
"endpoint": endpoint,
"stream_id": stream_id,
"timeout_ms": timeout_ms,
"control_plane": "topic_subscribe/topic_unsubscribe",
"data_plane": "topic_poll",
},
"sampling": {
"iterations": iterations,
"interval_ms": interval_ms,
"max_items": max_items,
"frames": frame_count,
"elapsed_ns": elapsed_ns,
},
"result": {
"payload_hz": hz,
}
});
println!(
"{}",
serde_json::to_string_pretty(&payload)
.map_err(|err| format!("serialize topic hz payload failed: {err}"))?
);
} else {
println!("RobotRT Topic Hz");
println!("topic: {topic_name}");
println!("endpoint: {endpoint}");
println!("iterations: {iterations} interval_ms: {interval_ms}");
println!("frames: {frame_count}");
println!("payload_hz: {:.3}", hz);
}
Ok(())
}
pub fn topic_echo(args: &[String]) -> Result<(), String> {
let topic_name = first_positional(args).ok_or_else(|| String::from("missing topic name"))?;
let json = has_flag(args, "--json");
let timeout_ms = parse_u64_option(args, "--timeout-ms", 1000)?;
let iterations = parse_usize_option(args, "--iterations", 5)?;
let interval_ms = parse_u64_option(args, "--interval-ms", 500)?;
let limit = parse_usize_option(args, "--limit", 5)?;
let max_items = parse_usize_option(args, "--max-items", 8)?.clamp(1, 256);
let endpoint = resolve_runtime_endpoint(
args,
DEFAULT_DAEMON_ENDPOINT,
"topic commands require --endpoint in embedded mode",
)?;
let (client, stream_id) = subscribe_topic_stream(&endpoint, timeout_ms, &topic_name, max_items)?;
let mut rows = Vec::new();
for idx in 0..iterations {
let frames = poll_topic_frames(&client, &endpoint, &stream_id, max_items)?;
for frame in frames {
rows.push(serde_json::json!({
"sample": idx,
"topic": topic_name,
"sequence": frame.get("sequence").cloned().unwrap_or(Value::Null),
"captured_at_unix_nanos": frame.get("captured_at_unix_nanos").cloned().unwrap_or(Value::Null),
"transport": frame.get("transport").cloned().unwrap_or(Value::Null),
"payload": frame.get("payload").cloned().unwrap_or(Value::Null),
"external_ref": frame.get("external_ref").cloned().unwrap_or(Value::Null),
}));
if rows.len() >= limit {
break;
}
}
if rows.len() >= limit {
break;
}
if idx + 1 < iterations {
std::thread::sleep(Duration::from_millis(interval_ms));
}
}
let _ = unsubscribe_topic_stream(&client, &endpoint, &stream_id);
if json {
let payload = serde_json::json!({
"api_version": TOPIC_ECHO_API_VERSION,
"topic": topic_name,
"source": {
"mode": "payload_stream",
"endpoint": endpoint,
"stream_id": stream_id,
"timeout_ms": timeout_ms,
"control_plane": "topic_subscribe/topic_unsubscribe",
"data_plane": "topic_poll",
},
"sampling": {
"iterations": iterations,
"interval_ms": interval_ms,
"limit": limit,
"max_items": max_items,
},
"entries": rows,
});
println!(
"{}",
serde_json::to_string_pretty(&payload)
.map_err(|err| format!("serialize topic echo payload failed: {err}"))?
);
} else {
println!("RobotRT Topic Echo");
println!("topic: {topic_name}");
println!("endpoint: {endpoint}");
println!("iterations: {iterations} interval_ms: {interval_ms}");
for row in rows {
println!("{}", row);
}
}
Ok(())
}