use super::common::*;
use core_types::{Timestamp, TransportDomain};
use data_model::{ControlEnvelope, Packet, PacketHeader, SchemaId, SchemaVersion};
use replay_core::{
file::FilePacketRecorder,
recorder::RecordedEntry,
};
fn write_demo_replay_bag(path: &PathBuf, topic: &str, entries: u64) {
let mut writer = FilePacketRecorder::create(path).expect("create replay bag fixture");
for seq in 1..=entries {
let header = PacketHeader {
version: 1,
domain: TransportDomain::Local,
session_id: None,
stream_id: None,
sequence: seq,
ack: None,
timestamp: Timestamp(seq as u128 * 1_000_000),
schema_id: SchemaId::new("demo.profile"),
schema_version: SchemaVersion(1),
};
let packet = Packet::Control(ControlEnvelope {
header,
label: "demo.profile.sample".to_string(),
payload: vec![seq as u8],
});
let entry = RecordedEntry::new(Timestamp(seq as u128 * 1_000_000), topic, packet);
writer.append(&entry).expect("append replay bag fixture entry");
}
writer.flush().expect("flush replay bag fixture");
}
#[test]
fn ops_snapshot_json_contract_has_stable_shape() {
let status = temp_path("ops-snapshot-contract", "json");
let status_arg = status.to_str().expect("utf-8 path");
let output = run_cli(&[
"ops",
"snapshot",
"--report",
status_arg,
"--refresh-demo",
"--json",
]);
assert!(output.status.success(), "stderr: {}", output.stderr);
let payload: Value = serde_json::from_str(&output.stdout).expect("parse ops snapshot json");
assert_exact_object_keys(
&payload,
&[
"api_version",
"kind",
"captured_at_unix_nanos",
"source",
"result",
],
);
assert_eq!(
payload.get("api_version").and_then(Value::as_str),
Some("robotrt.ops.snapshot.v1")
);
assert_exact_object_keys(
payload.get("result").expect("result object"),
&[
"summary", "runtime", "topology", "missions", "plugins", "alerts",
],
);
assert_exact_object_keys(
payload["result"].get("summary").expect("summary object"),
&[
"overall_health",
"alerts",
"loaded_plugins",
"pending_replay_sessions",
"topic_reliable_ack",
],
);
assert_exact_object_keys(
payload["result"]["summary"]
.get("topic_reliable_ack")
.expect("topic reliable ack summary object"),
&["local", "remote"],
);
}
#[test]
fn ops_console_writes_html_and_returns_json() {
let status = temp_path("ops-console-contract", "json");
let html = temp_path("ops-console", "html");
let status_arg = status.to_str().expect("utf-8 path");
let html_arg = html.to_str().expect("utf-8 path");
let output = run_cli(&[
"ops",
"console",
"--report",
status_arg,
"--refresh-demo",
"--output",
html_arg,
"--json",
]);
assert!(output.status.success(), "stderr: {}", output.stderr);
let payload: Value = serde_json::from_str(&output.stdout).expect("parse ops console json");
assert_exact_object_keys(
&payload,
&[
"api_version",
"kind",
"captured_at_unix_nanos",
"source",
"query",
"result",
],
);
assert_eq!(
payload.get("api_version").and_then(Value::as_str),
Some("robotrt.ops.console.v1")
);
let html_body = fs::read_to_string(&html).expect("read generated ops console html");
assert!(html_body.contains("RobotRT Ops Console"));
assert!(html_body.contains("Topology Graph"));
}
#[test]
fn ops_diff_json_contract_reports_changes() {
let baseline = temp_path("ops-diff-baseline", "json");
let current = temp_path("ops-diff-current", "json");
let baseline_arg = baseline.to_str().expect("utf-8 path");
let current_arg = current.to_str().expect("utf-8 path");
write_demo_status_snapshot(&baseline).expect("write baseline status snapshot");
fs::copy(&baseline, ¤t).expect("copy baseline report to current report");
let mut current_json: Value =
serde_json::from_str(&fs::read_to_string(¤t).expect("read current report"))
.expect("parse current report");
if let Some(missions) = current_json
.get_mut("missions")
.and_then(Value::as_array_mut)
&& let Some(first) = missions.first_mut()
{
first["state"] = Value::String("failed".to_string());
}
if let Some(plugins) = current_json
.get_mut("plugins")
.and_then(Value::as_array_mut)
&& let Some(first) = plugins.first_mut()
{
first["loaded"] = Value::Bool(false);
}
fs::write(
¤t,
serde_json::to_string_pretty(¤t_json).expect("serialize current report"),
)
.expect("write mutated current report");
let output = run_cli(&[
"ops",
"diff",
"--report",
current_arg,
"--baseline-report",
baseline_arg,
"--json",
]);
assert!(output.status.success(), "stderr: {}", output.stderr);
let payload: Value = serde_json::from_str(&output.stdout).expect("parse ops diff json");
assert_exact_object_keys(
&payload,
&[
"api_version",
"kind",
"captured_at_unix_nanos",
"source",
"result",
],
);
assert_eq!(
payload.get("api_version").and_then(Value::as_str),
Some("robotrt.ops.diff.v1")
);
assert_exact_object_keys(
payload.get("result").expect("result object"),
&[
"summary", "delta", "topology", "missions", "plugins", "health",
],
);
let changed_missions = payload["result"]["missions"]["changed"]
.as_array()
.expect("changed missions array");
assert!(
!changed_missions.is_empty(),
"expected mission state changes"
);
}
#[test]
fn ops_diff_writes_report_file_when_output_is_set() {
let baseline = temp_path("ops-diff-output-baseline", "json");
let current = temp_path("ops-diff-output-current", "json");
let output_report = temp_path("ops-diff-output-report", "json");
let baseline_arg = baseline.to_str().expect("utf-8 path");
let current_arg = current.to_str().expect("utf-8 path");
let output_arg = output_report.to_str().expect("utf-8 path");
write_demo_status_snapshot(&baseline).expect("write baseline status snapshot");
fs::copy(&baseline, ¤t).expect("copy baseline to current");
let output = run_cli(&[
"ops",
"diff",
"--report",
current_arg,
"--baseline-report",
baseline_arg,
"--output",
output_arg,
"--json",
]);
assert!(output.status.success(), "stderr: {}", output.stderr);
let report_body = fs::read_to_string(&output_report).expect("read ops diff output report");
let report_payload: Value =
serde_json::from_str(&report_body).expect("parse ops diff output report json");
assert_eq!(
report_payload.get("api_version").and_then(Value::as_str),
Some("robotrt.ops.diff.v1")
);
assert_eq!(
report_payload.get("kind").and_then(Value::as_str),
Some("ops_diff")
);
}
#[test]
fn ops_snapshot_rejects_invalid_alert_thresholds() {
let output = run_cli(&[
"ops",
"snapshot",
"--topic-warn-utilization",
"90",
"--topic-critical-utilization",
"80",
"--json",
]);
assert!(!output.status.success(), "stdout: {}", output.stdout);
assert!(
output
.stderr
.contains("--topic-warn-utilization must be <= --topic-critical-utilization"),
"stderr: {}",
output.stderr
);
}
#[test]
fn ops_fleet_rejects_invalid_ack_alert_thresholds() {
let output = run_cli(&[
"ops",
"fleet",
"--ack-warn-coverage",
"40",
"--ack-critical-coverage",
"50",
"--json",
]);
assert!(!output.status.success(), "stdout: {}", output.stdout);
assert!(
output
.stderr
.contains("--ack-warn-coverage must be >= --ack-critical-coverage"),
"stderr: {}",
output.stderr
);
}
#[test]
fn ops_snapshot_policy_file_overrides_thresholds() {
let status = temp_path("ops-policy-status", "json");
let policy = temp_path("ops-policy", "json");
let status_arg = status.to_str().expect("utf-8 path");
let policy_arg = policy.to_str().expect("utf-8 path");
let seed = run_cli(&[
"ops",
"snapshot",
"--report",
status_arg,
"--refresh-demo",
"--json",
]);
assert!(seed.status.success(), "stderr: {}", seed.stderr);
let mut status_json: Value =
serde_json::from_str(&fs::read_to_string(&status).expect("read status snapshot file"))
.expect("parse status snapshot file");
let topics = status_json
.get_mut("topics")
.and_then(Value::as_array_mut)
.expect("topics array in status snapshot");
assert!(!topics.is_empty(), "demo status must contain topics");
for topic in topics {
topic["pending"] = Value::from(75u64);
topic["max_depth"] = Value::from(100u64);
}
fs::write(
&status,
serde_json::to_string_pretty(&status_json).expect("serialize updated status snapshot"),
)
.expect("write updated status snapshot");
let baseline_output = run_cli(&["ops", "snapshot", "--report", status_arg, "--json"]);
assert!(
baseline_output.status.success(),
"stderr: {}",
baseline_output.stderr
);
let baseline_payload: Value =
serde_json::from_str(&baseline_output.stdout).expect("parse baseline ops snapshot json");
let baseline_alerts = baseline_payload["result"]["alerts"]
.as_array()
.expect("baseline alerts array")
.len();
let policy_payload = serde_json::json!({
"api_version": "robotrt.ops.alert-policy.v1",
"topic_warn_utilization": 70.0,
"topic_critical_utilization": 95.0
});
fs::write(
&policy,
serde_json::to_string_pretty(&policy_payload).expect("serialize policy json"),
)
.expect("write policy file");
let policy_output = run_cli(&[
"ops", "snapshot", "--report", status_arg, "--policy", policy_arg, "--json",
]);
assert!(
policy_output.status.success(),
"stderr: {}",
policy_output.stderr
);
let policy_snapshot: Value =
serde_json::from_str(&policy_output.stdout).expect("parse policy ops snapshot json");
let policy_alerts = policy_snapshot["result"]["alerts"]
.as_array()
.expect("policy alerts array")
.len();
assert!(
policy_alerts > baseline_alerts,
"policy thresholds should increase alert count (baseline={baseline_alerts}, policy={policy_alerts})"
);
}
#[test]
fn ops_fleet_policy_file_overrides_ack_thresholds() {
let report = temp_path("ops-ack-policy-report", "json");
let policy = temp_path("ops-ack-policy", "json");
let report_arg = report.to_str().expect("utf-8 path");
let policy_arg = policy.to_str().expect("utf-8 path");
write_demo_status_snapshot(&report).expect("write status fixture");
let mut status_json: Value =
serde_json::from_str(&fs::read_to_string(&report).expect("read status fixture"))
.expect("parse status fixture");
let topics = status_json
.get_mut("topics")
.and_then(Value::as_array_mut)
.expect("topics array");
assert!(!topics.is_empty(), "status fixture must have topics");
if let Some(first) = topics.first_mut() {
first["reliable_local_ack"] = serde_json::json!({
"subscribers": 10,
"acked_subscribers": 9,
"min_acked_seq": 1,
"max_acked_seq": 9,
});
first["reliable_remote_ack"] = serde_json::json!({
"subscribers": 0,
"acked_subscribers": 0,
"min_acked_seq": null,
"max_acked_seq": null,
});
}
fs::write(
&report,
serde_json::to_string_pretty(&status_json).expect("serialize status fixture"),
)
.expect("write updated status fixture");
let baseline_output = run_cli(&["ops", "fleet", "--reports", report_arg, "--json"]);
assert!(
baseline_output.status.success(),
"stderr: {}",
baseline_output.stderr
);
let baseline_payload: Value =
serde_json::from_str(&baseline_output.stdout).expect("parse baseline fleet payload");
let baseline_topic_ack_alerts = baseline_payload["result"]["hotspots"]["alerts"]
.as_array()
.expect("baseline alerts array")
.iter()
.filter(|item| item["component"].as_str() == Some("topic_ack"))
.count();
let policy_payload = serde_json::json!({
"api_version": "robotrt.ops.alert-policy.v1",
"ack_warn_coverage": 95.0,
"ack_critical_coverage": 92.0
});
fs::write(
&policy,
serde_json::to_string_pretty(&policy_payload).expect("serialize policy json"),
)
.expect("write policy file");
let policy_output = run_cli(&[
"ops",
"fleet",
"--reports",
report_arg,
"--policy",
policy_arg,
"--json",
]);
assert!(
policy_output.status.success(),
"stderr: {}",
policy_output.stderr
);
let policy_payload: Value =
serde_json::from_str(&policy_output.stdout).expect("parse policy fleet payload");
let policy_topic_ack_alerts = policy_payload["result"]["hotspots"]["alerts"]
.as_array()
.expect("policy alerts array")
.iter()
.filter(|item| item["component"].as_str() == Some("topic_ack"))
.count();
assert!(
policy_topic_ack_alerts > baseline_topic_ack_alerts,
"policy thresholds should increase topic_ack alerts (baseline={baseline_topic_ack_alerts}, policy={policy_topic_ack_alerts})"
);
}
#[test]
fn ops_replay_json_contract_from_bag() {
let bag = temp_path("ops-replay-bag", "rrbag");
let bag_arg = bag.to_str().expect("utf-8 path");
write_demo_replay_bag(&bag, "/demo/ops-replay", 6);
let replay = run_cli(&["ops", "replay", "--input", bag_arg, "--json"]);
assert!(replay.status.success(), "stderr: {}", replay.stderr);
let payload: Value = serde_json::from_str(&replay.stdout).expect("parse ops replay json");
assert_exact_object_keys(
&payload,
&["api_version", "kind", "source", "query", "result"],
);
assert_eq!(
payload.get("api_version").and_then(Value::as_str),
Some("robotrt.ops.replay.v1")
);
assert_eq!(
payload["result"]["total_entries"].as_u64(),
Some(6),
"ops replay should report full bag entry count"
);
}
#[test]
fn ops_fleet_json_contract_reports_hotspots_and_diff() {
let current_a = temp_path("ops-fleet-current-a", "json");
let current_b = temp_path("ops-fleet-current-b", "json");
let baseline_a = temp_path("ops-fleet-baseline-a", "json");
write_demo_status_snapshot(¤t_a).expect("write current_a status snapshot");
fs::copy(¤t_a, ¤t_b).expect("copy current report");
fs::copy(¤t_a, &baseline_a).expect("copy baseline report");
let mut current_b_json: Value =
serde_json::from_str(&fs::read_to_string(¤t_b).expect("read current_b"))
.expect("parse current_b");
if let Some(topics) = current_b_json
.get_mut("topics")
.and_then(Value::as_array_mut)
&& let Some(first) = topics.first_mut()
{
first["pending"] = Value::from(15u64);
first["max_depth"] = Value::from(16u64);
first["reliable_local_ack"] = serde_json::json!({
"subscribers": 2,
"acked_subscribers": 1,
"min_acked_seq": 3,
"max_acked_seq": 4,
});
first["reliable_remote_ack"] = serde_json::json!({
"subscribers": 1,
"acked_subscribers": 0,
"min_acked_seq": null,
"max_acked_seq": null,
});
}
fs::write(
¤t_b,
serde_json::to_string_pretty(¤t_b_json).expect("serialize current_b"),
)
.expect("write current_b");
let reports = format!(
"{},{}",
current_a.to_str().expect("utf-8 path"),
current_b.to_str().expect("utf-8 path")
);
let baseline_reports = baseline_a.to_str().expect("utf-8 path");
let output = run_cli(&[
"ops",
"fleet",
"--reports",
reports.as_str(),
"--baseline-reports",
baseline_reports,
"--json",
]);
assert!(output.status.success(), "stderr: {}", output.stderr);
let payload: Value = serde_json::from_str(&output.stdout).expect("parse ops fleet json");
assert_eq!(
payload.get("api_version").and_then(Value::as_str),
Some("robotrt.ops.fleet.v1")
);
assert_exact_object_keys(
payload.get("result").expect("result object"),
&["summary", "hotspots", "tasks", "governance", "diff"],
);
assert_exact_object_keys(
payload["result"].get("summary").expect("summary object"),
&["instances", "overall_health", "alerts", "totals", "topic_reliable_ack", "health_counts"],
);
assert_exact_object_keys(
payload["result"]["summary"]
.get("topic_reliable_ack")
.expect("topic_reliable_ack object"),
&["local", "remote"],
);
assert!(
!payload["result"]["hotspots"]["topics_by_utilization"]
.as_array()
.expect("topics_by_utilization array")
.is_empty()
);
assert!(
!payload["result"]["hotspots"]["topics_low_reliable_ack_coverage"]
.as_array()
.expect("topics_low_reliable_ack_coverage array")
.is_empty()
);
}
#[test]
fn ops_profile_json_contract_supports_replay_and_obs_export() {
let report = temp_path("ops-profile-report", "json");
let bag = temp_path("ops-profile-bag", "rrbag");
write_demo_status_snapshot(&report).expect("write ops profile status fixture");
write_demo_replay_bag(&bag, "/demo/profile", 4);
let output = run_cli(&[
"ops",
"profile",
"--reports",
report.to_str().expect("utf-8 path"),
"--input-bag",
bag.to_str().expect("utf-8 path"),
"--obs-format",
"both",
"--json",
]);
assert!(output.status.success(), "stderr: {}", output.stderr);
let payload: Value = serde_json::from_str(&output.stdout).expect("parse ops profile json");
assert_eq!(
payload.get("api_version").and_then(Value::as_str),
Some("robotrt.ops.profile.v1")
);
assert_exact_object_keys(
payload.get("result").expect("result object"),
&["fleet", "tasks", "replay", "obs_export"],
);
assert_eq!(
payload["result"]["replay"]["api_version"].as_str(),
Some("robotrt.ops.replay.v1")
);
assert_eq!(
payload["result"]["obs_export"]["format"].as_str(),
Some("both")
);
}
#[test]
fn ops_fleet_includes_gateway_observation_from_remote_endpoint() {
let endpoint = allocate_udp_endpoint();
let mut server = Command::new(env!("CARGO_BIN_EXE_robotrt-cli"))
.args([
"gateway",
"serve",
"--bind",
endpoint.as_str(),
"--source",
"demo",
])
.stdout(Stdio::null())
.stderr(Stdio::piped())
.spawn()
.expect("spawn robotrt-cli gateway serve");
let mut ready = false;
let mut last_probe_stderr = String::new();
for _ in 0..20 {
let probe = run_cli(&[
"gateway",
"observe",
"--endpoint",
endpoint.as_str(),
"--timeout-ms",
"250",
"--json",
]);
if probe.status.success() {
ready = true;
break;
}
last_probe_stderr = probe.stderr;
std::thread::sleep(Duration::from_millis(50));
}
assert!(
ready,
"gateway serve did not become ready for endpoint {}: {}",
endpoint,
last_probe_stderr
);
let output = run_cli(&[
"ops",
"fleet",
"--endpoints",
endpoint.as_str(),
"--timeout-ms",
"3000",
"--json",
]);
assert!(output.status.success(), "stderr: {}", output.stderr);
let payload: Value = serde_json::from_str(&output.stdout).expect("parse ops fleet json");
assert_eq!(
payload["result"]["governance"]["gateway"]["instances_reporting"].as_u64(),
Some(1)
);
let shutdown = run_cli(&[
"gateway",
"shutdown",
"--endpoint",
endpoint.as_str(),
"--json",
]);
assert!(shutdown.status.success(), "stderr: {}", shutdown.stderr);
let status = server.wait().expect("wait gateway serve child");
assert!(status.success(), "gateway serve child exited with {status:?}");
}
#[test]
fn ops_profile_template_applies_defaults_and_metadata() {
let report = temp_path("ops-profile-template-report", "json");
let seed = run_cli(&[
"ops",
"snapshot",
"--report",
report.to_str().expect("utf-8 path"),
"--refresh-demo",
"--json",
]);
assert!(seed.status.success(), "stderr: {}", seed.stderr);
let output = run_cli(&[
"ops",
"profile",
"--reports",
report.to_str().expect("utf-8 path"),
"--profile-template",
"task-stall",
"--json",
]);
assert!(output.status.success(), "stderr: {}", output.stderr);
let payload: Value =
serde_json::from_str(&output.stdout).expect("parse ops profile template json");
assert_eq!(
payload["query"]["profile_template"].as_str(),
Some("task-stall")
);
assert_eq!(payload["query"]["obs_format"].as_str(), Some("otel"));
assert_eq!(
payload["result"]["obs_export"]["format"].as_str(),
Some("otel")
);
assert_eq!(
payload["result"]["profile_template"]["name"].as_str(),
Some("task-stall")
);
assert_eq!(
payload["result"]["profile_template"]["applied_defaults"]["obs_format_from_template"]
.as_bool(),
Some(true)
);
}
#[test]
fn ops_profile_queue_backlog_template_applies_ack_threshold_defaults() {
let report = temp_path("ops-profile-queue-backlog-ack", "json");
let report_arg = report.to_str().expect("utf-8 path");
write_demo_status_snapshot(&report).expect("write status fixture");
let mut status_json: Value =
serde_json::from_str(&fs::read_to_string(&report).expect("read status fixture"))
.expect("parse status fixture");
let topics = status_json
.get_mut("topics")
.and_then(Value::as_array_mut)
.expect("topics array");
assert!(!topics.is_empty(), "status fixture must have topics");
if let Some(first) = topics.first_mut() {
first["pending"] = Value::from(10u64);
first["max_depth"] = Value::from(100u64);
first["reliable_local_ack"] = serde_json::json!({
"subscribers": 10,
"acked_subscribers": 9,
"min_acked_seq": 1,
"max_acked_seq": 9,
});
first["reliable_remote_ack"] = serde_json::json!({
"subscribers": 0,
"acked_subscribers": 0,
"min_acked_seq": null,
"max_acked_seq": null,
});
}
fs::write(
&report,
serde_json::to_string_pretty(&status_json).expect("serialize status fixture"),
)
.expect("write updated status fixture");
let baseline = run_cli(&["ops", "profile", "--reports", report_arg, "--json"]);
assert!(baseline.status.success(), "stderr: {}", baseline.stderr);
let baseline_payload: Value =
serde_json::from_str(&baseline.stdout).expect("parse baseline profile payload");
let baseline_topic_ack_alerts = baseline_payload["result"]["fleet"]["hotspots"]["alerts"]
.as_array()
.expect("baseline alerts array")
.iter()
.filter(|item| item["component"].as_str() == Some("topic_ack"))
.count();
let templated = run_cli(&[
"ops",
"profile",
"--reports",
report_arg,
"--profile-template",
"queue-backlog",
"--json",
]);
assert!(templated.status.success(), "stderr: {}", templated.stderr);
let templated_payload: Value =
serde_json::from_str(&templated.stdout).expect("parse templated profile payload");
let templated_topic_ack_alerts = templated_payload["result"]["fleet"]["hotspots"]["alerts"]
.as_array()
.expect("templated alerts array")
.iter()
.filter(|item| item["component"].as_str() == Some("topic_ack"))
.count();
assert!(
templated_topic_ack_alerts > baseline_topic_ack_alerts,
"queue-backlog template should increase topic_ack alerts (baseline={baseline_topic_ack_alerts}, templated={templated_topic_ack_alerts})"
);
assert_eq!(
templated_payload["result"]["profile_template"]["applied_defaults"]
["policy_thresholds_from_template"]
.as_bool(),
Some(true)
);
}