use std::time::Duration;
use crate::helpers::{
first_positional, has_flag, option_value, parse_u64_option, parse_usize_option,
resolve_runtime_endpoint,
};
use crate::gateway::{
STATUS_OP_MISSION_REPLY, STATUS_OP_MISSION_REQUEST, STATUS_OP_MISSION_WATCH,
STATUS_SERVICE_NAME, StatusServiceResponse, build_op_payload_request, make_udp_service_client,
next_request_id, validate_response,
};
const DEFAULT_DAEMON_ENDPOINT: &str = "127.0.0.1:7588";
pub fn mission_request(args: &[String]) -> Result<(), String> {
let mission = first_positional(args).ok_or_else(|| String::from("missing mission name"))?;
let message = option_value(args, "--message").ok_or_else(|| String::from("missing --message"))?;
let timeout_ms = parse_u64_option(args, "--timeout-ms", 1000)?;
let endpoint = resolve_runtime_endpoint(
args,
DEFAULT_DAEMON_ENDPOINT,
"mission control requires --endpoint in embedded mode",
)?;
let json = has_flag(args, "--json");
let client = make_udp_service_client(endpoint.clone(), timeout_ms)?;
let request_id = next_request_id();
let request = build_op_payload_request(
STATUS_OP_MISSION_REQUEST,
serde_json::json!({
"mission": mission,
"message": message,
"timeout_ms": timeout_ms,
}),
);
let response: StatusServiceResponse = client
.call_json(STATUS_SERVICE_NAME, request_id, &request)
.map_err(|err| format!("mission request to {endpoint} failed: {err}"))?;
validate_response(&response, request_id, STATUS_OP_MISSION_REQUEST)?;
let result = response
.op_result
.ok_or_else(|| format!("mission request response from {endpoint} missing op_result"))?;
if json {
let payload = serde_json::json!({
"api_version": "robotrt.mission.request.v1",
"source": {
"mode": "remote_service",
"service": STATUS_SERVICE_NAME,
"endpoint": endpoint,
"timeout_ms": timeout_ms,
},
"result": result,
});
println!(
"{}",
serde_json::to_string_pretty(&payload)
.map_err(|err| format!("serialize mission request json failed: {err}"))?
);
return Ok(());
}
println!("RobotRT Mission Request");
println!("endpoint: {endpoint}");
println!("mission: {}", result.get("mission").and_then(serde_json::Value::as_str).unwrap_or("-"));
println!("accepted: {}", result.get("accepted").and_then(serde_json::Value::as_bool).unwrap_or(false));
println!("response: {}", result.get("response").and_then(serde_json::Value::as_str).unwrap_or("-"));
Ok(())
}
pub fn mission_reply(args: &[String]) -> Result<(), String> {
let mission = first_positional(args).ok_or_else(|| String::from("missing mission name"))?;
let message = option_value(args, "--message").ok_or_else(|| String::from("missing --message"))?;
let timeout_ms = parse_u64_option(args, "--timeout-ms", 1000)?;
let endpoint = resolve_runtime_endpoint(
args,
DEFAULT_DAEMON_ENDPOINT,
"mission control requires --endpoint in embedded mode",
)?;
let json = has_flag(args, "--json");
let client = make_udp_service_client(endpoint.clone(), timeout_ms)?;
let request_id = next_request_id();
let request = build_op_payload_request(
STATUS_OP_MISSION_REPLY,
serde_json::json!({
"mission": mission,
"message": message,
}),
);
let response: StatusServiceResponse = client
.call_json(STATUS_SERVICE_NAME, request_id, &request)
.map_err(|err| format!("mission reply to {endpoint} failed: {err}"))?;
validate_response(&response, request_id, STATUS_OP_MISSION_REPLY)?;
let result = response
.op_result
.ok_or_else(|| format!("mission reply response from {endpoint} missing op_result"))?;
if json {
let payload = serde_json::json!({
"api_version": "robotrt.mission.reply.v1",
"source": {
"mode": "remote_service",
"service": STATUS_SERVICE_NAME,
"endpoint": endpoint,
"timeout_ms": timeout_ms,
},
"result": result,
});
println!(
"{}",
serde_json::to_string_pretty(&payload)
.map_err(|err| format!("serialize mission reply json failed: {err}"))?
);
return Ok(());
}
println!("RobotRT Mission Reply");
println!("endpoint: {endpoint}");
println!("mission: {}", result.get("mission").and_then(serde_json::Value::as_str).unwrap_or("-"));
println!("accepted: {}", result.get("accepted").and_then(serde_json::Value::as_bool).unwrap_or(false));
Ok(())
}
pub fn mission_stream(args: &[String]) -> Result<(), String> {
let mission = first_positional(args).ok_or_else(|| String::from("missing mission name"))?;
let timeout_ms = parse_u64_option(args, "--timeout-ms", 1000)?;
let iterations = parse_usize_option(args, "--iterations", 5)?;
let interval_ms = parse_u64_option(args, "--interval-ms", 500)?;
let max_items = parse_usize_option(args, "--max-items", 8)?.clamp(1, 256);
let endpoint = resolve_runtime_endpoint(
args,
DEFAULT_DAEMON_ENDPOINT,
"mission control requires --endpoint in embedded mode",
)?;
let json = has_flag(args, "--json");
let client = make_udp_service_client(endpoint.clone(), timeout_ms)?;
let mut entries = Vec::new();
for idx in 0..iterations {
let request_id = next_request_id();
let request = build_op_payload_request(
STATUS_OP_MISSION_WATCH,
serde_json::json!({
"mission": mission,
"max_items": max_items,
}),
);
let response: StatusServiceResponse = client
.call_json(STATUS_SERVICE_NAME, request_id, &request)
.map_err(|err| format!("mission stream to {endpoint} failed: {err}"))?;
validate_response(&response, request_id, STATUS_OP_MISSION_WATCH)?;
let result = response
.op_result
.ok_or_else(|| format!("mission stream response from {endpoint} missing op_result"))?;
let Some(batch) = result.get("entries").and_then(serde_json::Value::as_array) else {
return Err(String::from("mission stream response missing entries"));
};
for entry in batch {
entries.push(serde_json::json!({
"sample": idx,
"direction": entry.get("direction").cloned().unwrap_or(serde_json::Value::Null),
"message": entry.get("message").cloned().unwrap_or(serde_json::Value::Null),
"captured_at_unix_nanos": entry.get("captured_at_unix_nanos").cloned().unwrap_or(serde_json::Value::Null),
}));
}
if idx + 1 < iterations {
std::thread::sleep(Duration::from_millis(interval_ms));
}
}
if json {
let payload = serde_json::json!({
"api_version": "robotrt.mission.stream.v1",
"source": {
"mode": "remote_service",
"service": STATUS_SERVICE_NAME,
"endpoint": endpoint,
"timeout_ms": timeout_ms,
"data_plane": "mission_watch",
},
"query": {
"mission": mission,
"iterations": iterations,
"interval_ms": interval_ms,
"max_items": max_items,
},
"entries": entries,
});
println!(
"{}",
serde_json::to_string_pretty(&payload)
.map_err(|err| format!("serialize mission stream json failed: {err}"))?
);
return Ok(());
}
println!("RobotRT Mission Stream");
println!("endpoint: {endpoint}");
println!("mission: {mission}");
println!("iterations: {iterations} interval_ms: {interval_ms}");
for entry in entries {
println!("{}", entry);
}
Ok(())
}