use super::*;
/// Builds the `ops_fleet` JSON document for the current fleet snapshots and an
/// optional baseline set.
///
/// * `current_items` – one `(snapshot, source)` pair per current instance.
/// * `baseline_items` – pairs for the baseline; when empty, no baseline is
///   analysed, `source.baseline` is an empty array and `result.diff` is `null`.
/// * `policy` – alert thresholds forwarded to the fleet analysis.
///
/// When a baseline is present, `result.diff` holds the per-counter deltas
/// produced by `build_fleet_diff(current_totals, baseline_totals)`.
///
/// # Errors
/// The current implementation never returns `Err`; the `Result` return type is
/// kept for callers.
pub(in crate::commands::ops) fn build_fleet_payload(
    current_items: &[(StatusSnapshot, OpsSource)],
    baseline_items: &[(StatusSnapshot, OpsSource)],
    policy: &AlertPolicy,
) -> Result<serde_json::Value, String> {
    let current = analyze_fleet(current_items, policy);
    // An empty baseline slice is treated as "no baseline" rather than as an
    // empty fleet, so no diff is emitted in that case.
    let baseline = (!baseline_items.is_empty()).then(|| analyze_fleet(baseline_items, policy));
    let diff = baseline
        .as_ref()
        .map(|base| build_fleet_diff(&current.totals, &base.totals));
    // Borrow the baseline sources instead of cloning them; `json!` serializes
    // by reference, and an absent baseline serializes as an empty array.
    let baseline_sources: &[serde_json::Value] = baseline
        .as_ref()
        .map(|base| base.sources.as_slice())
        .unwrap_or_default();
    Ok(serde_json::json!({
        "api_version": OPS_FLEET_API_VERSION,
        "kind": "ops_fleet",
        "captured_at_unix_nanos": current.captured_at_unix_nanos,
        "source": {
            "current": current.sources,
            "baseline": baseline_sources,
        },
        "result": {
            "summary": {
                "instances": current.totals.instances,
                "overall_health": current.overall_health,
                "alerts": current.totals.alerts,
                "totals": {
                    "nodes": current.totals.nodes,
                    "topics": current.totals.topics,
                    "services": current.totals.services,
                    "actions": current.totals.actions,
                    "missions": current.totals.missions,
                    "plugins_loaded": current.totals.plugins_loaded,
                },
                "topic_reliable_ack": {
                    "local": {
                        "subscribers": current.totals.local_reliable_subscribers,
                        "acked_subscribers": current.totals.local_reliable_acked_subscribers,
                    },
                    "remote": {
                        "subscribers": current.totals.remote_reliable_subscribers,
                        "acked_subscribers": current.totals.remote_reliable_acked_subscribers,
                    },
                },
                "health_counts": {
                    "healthy": current.totals.healthy_instances,
                    "degraded": current.totals.degraded_instances,
                    "unhealthy": current.totals.unhealthy_instances,
                },
            },
            "hotspots": current.hotspots,
            "tasks": current.tasks,
            "governance": current.governance,
            "diff": diff,
        }
    }))
}
/// Fleet-wide aggregate computed by `analyze_fleet` for one set of snapshots.
pub(in crate::commands::ops) struct FleetAnalysis {
    /// Largest `captured_at_unix_nanos` among the analysed snapshots (0 when none).
    captured_at_unix_nanos: u128,
    /// Worst per-instance health: `"unhealthy"`, `"degraded"` or `"healthy"`.
    overall_health: String,
    /// Counters summed across every analysed instance.
    totals: FleetTotals,
    /// Each instance's `source.json`, in input order.
    sources: Vec<serde_json::Value>,
    /// JSON object holding the capped hotspot and alert-sample lists.
    hotspots: serde_json::Value,
    /// JSON object with mission-state, action-state and action-health histograms.
    tasks: serde_json::Value,
    /// JSON object with tenant/namespace counts, cross-tenant edges and gateway stats.
    governance: serde_json::Value,
}
/// Aggregates per-instance status snapshots into fleet-wide totals, capped
/// hotspot lists, task-state histograms and governance summaries.
///
/// * `items` – one `(snapshot, source)` pair per instance. `source.json` is
///   copied into `sources` and is also probed for an optional
///   `gateway_observation` object.
/// * `policy` – alert thresholds forwarded to `detect_alerts`.
///
/// Hotspot lists are capped at 10 entries and sample lists at 20, so the result
/// stays bounded regardless of fleet size. Topic utilization hotspots are sorted
/// highest-first; ack-coverage hotspots are sorted lowest-coverage-first.
pub(in crate::commands::ops) fn analyze_fleet(
    items: &[(StatusSnapshot, OpsSource)],
    policy: &AlertPolicy,
) -> FleetAnalysis {
    // Percentage of reliable subscribers that acked; 100% when there are none,
    // so topics without reliable subscribers never read as a coverage gap.
    fn coverage_percent(subscribers: usize, acked_subscribers: usize) -> f64 {
        if subscribers == 0 {
            100.0
        } else {
            (acked_subscribers as f64 / subscribers as f64) * 100.0
        }
    }

    let mut totals = FleetTotals {
        instances: items.len(),
        ..FleetTotals::default()
    };
    let mut captured_at_unix_nanos = 0u128;
    let mut sources = Vec::new();
    // Hotspot / sample buffers; sorted and/or truncated after the loop.
    let mut topic_hotspots = Vec::new();
    let mut topic_ack_hotspots = Vec::new();
    let mut mission_hotspots = Vec::new();
    let mut action_hotspots = Vec::new();
    let mut health_hotspots = Vec::new();
    let mut alert_samples = Vec::new();
    // Histograms keyed by the raw strings reported by each instance.
    let mut mission_state_counts: HashMap<String, usize> = HashMap::new();
    let mut action_state_counts: HashMap<String, usize> = HashMap::new();
    let mut action_health_counts: HashMap<String, usize> = HashMap::new();
    let mut tenant_counts: HashMap<String, usize> = HashMap::new();
    let mut namespace_counts: HashMap<String, usize> = HashMap::new();
    let mut cross_tenant_edges = Vec::new();
    let mut gateway_observed_instances = 0usize;
    let mut gateway_policy_route_preferences: HashMap<String, usize> = HashMap::new();
    let mut gateway_audit_total = 0usize;
    let mut gateway_alerts = Vec::new();

    for (snapshot, source) in items {
        captured_at_unix_nanos = captured_at_unix_nanos.max(snapshot.captured_at_unix_nanos);
        sources.push(source.json.clone());
        totals.nodes += snapshot.nodes.len();
        totals.topics += snapshot.topics.len();
        totals.services += snapshot.services.len();
        totals.actions += snapshot.actions.len();
        totals.missions += snapshot.missions.len();
        totals.plugins_loaded += snapshot
            .plugins
            .iter()
            .filter(|plugin| plugin.loaded)
            .count();

        let fleet_health = overall_health(snapshot);
        match fleet_health.as_str() {
            "healthy" => totals.healthy_instances += 1,
            "degraded" => totals.degraded_instances += 1,
            _ => totals.unhealthy_instances += 1,
        }

        let alerts = detect_alerts(snapshot, policy);
        totals.alerts += alerts.len();
        // At most 8 samples per instance so one noisy instance cannot crowd
        // out the others before the global cap of 20 is applied.
        alert_samples.extend(alerts.into_iter().take(8).map(|alert| {
            serde_json::json!({
                "source": source.label,
                "severity": alert.severity,
                "component": alert.component,
                "message": alert.message,
            })
        }));

        for topic in &snapshot.topics {
            let local = &topic.reliable_local_ack;
            let remote = &topic.reliable_remote_ack;
            totals.local_reliable_subscribers += local.subscribers;
            totals.local_reliable_acked_subscribers += local.acked_subscribers;
            totals.remote_reliable_subscribers += remote.subscribers;
            totals.remote_reliable_acked_subscribers += remote.acked_subscribers;

            let local_coverage = coverage_percent(local.subscribers, local.acked_subscribers);
            let remote_coverage = coverage_percent(remote.subscribers, remote.acked_subscribers);
            let has_local_gap =
                local.subscribers > 0 && local.acked_subscribers < local.subscribers;
            let has_remote_gap =
                remote.subscribers > 0 && remote.acked_subscribers < remote.subscribers;
            if has_local_gap || has_remote_gap {
                topic_ack_hotspots.push(serde_json::json!({
                    "source": source.label,
                    "topic": topic.name,
                    "lowest_coverage_percent": local_coverage.min(remote_coverage),
                    "local": {
                        "subscribers": local.subscribers,
                        "acked_subscribers": local.acked_subscribers,
                        "coverage_percent": local_coverage,
                    },
                    "remote": {
                        "subscribers": remote.subscribers,
                        "acked_subscribers": remote.acked_subscribers,
                        "coverage_percent": remote_coverage,
                    },
                }));
            }

            // Queue utilization is undefined for a zero max_depth.
            if topic.max_depth == 0 {
                continue;
            }
            let utilization = (topic.pending as f64 / topic.max_depth as f64) * 100.0;
            topic_hotspots.push(serde_json::json!({
                "source": source.label,
                "topic": topic.name,
                "pending": topic.pending,
                "max_depth": topic.max_depth,
                "utilization_percent": utilization,
            }));
        }

        for mission in &snapshot.missions {
            *mission_state_counts
                .entry(mission.state.clone())
                .or_default() += 1;
            if !mission.state.eq_ignore_ascii_case("running")
                && !mission.state.eq_ignore_ascii_case("succeeded")
            {
                mission_hotspots.push(serde_json::json!({
                    "source": source.label,
                    "name": mission.name,
                    "state": mission.state,
                    "last_checkpoint": mission.last_checkpoint,
                }));
            }
        }

        for action in &snapshot.actions {
            let state = action
                .current_state
                .clone()
                .unwrap_or_else(|| "unknown".to_string());
            let health = action
                .health_state
                .clone()
                .unwrap_or_else(|| "unknown".to_string());
            *action_state_counts.entry(state).or_default() += 1;
            let healthy =
                health.eq_ignore_ascii_case("active") || health.eq_ignore_ascii_case("completed");
            *action_health_counts.entry(health).or_default() += 1;
            if !healthy {
                action_hotspots.push(serde_json::json!({
                    "source": source.label,
                    "name": action.name,
                    "current_state": action.current_state,
                    "health_state": action.health_state,
                    "active_goals": action.active_goals,
                    "last_heartbeat_at_unix_nanos": action.last_heartbeat_at_unix_nanos,
                }));
            }
        }

        for item in &snapshot.health {
            if !item.status.eq_ignore_ascii_case("healthy") {
                health_hotspots.push(serde_json::json!({
                    "source": source.label,
                    "component": item.component,
                    "status": item.status,
                    "reason": item.reason,
                }));
            }
        }

        // Map node name -> tenant so edges can be attributed; an empty
        // namespace is treated as the root namespace "/".
        let mut node_tenant = HashMap::new();
        for node in &snapshot.nodes {
            let namespace = if node.namespace.is_empty() {
                "/".to_string()
            } else {
                node.namespace.clone()
            };
            let tenant = tenant_from_namespace(&namespace);
            *tenant_counts.entry(tenant.clone()).or_default() += 1;
            *namespace_counts.entry(namespace).or_default() += 1;
            node_tenant.insert(node.name.clone(), tenant);
        }
        for edge in &snapshot.edges {
            let from_tenant = node_tenant
                .get(&edge.from)
                .cloned()
                .unwrap_or_else(|| tenant_from_label(&edge.from));
            let to_tenant = node_tenant
                .get(&edge.to)
                .cloned()
                .unwrap_or_else(|| tenant_from_label(&edge.to));
            // Edges touching an unattributed (empty) tenant are not reported.
            if !from_tenant.is_empty() && !to_tenant.is_empty() && from_tenant != to_tenant {
                cross_tenant_edges.push(serde_json::json!({
                    "source": source.label,
                    "from": edge.from,
                    "to": edge.to,
                    "relation": edge.relation,
                    "from_tenant": from_tenant,
                    "to_tenant": to_tenant,
                }));
            }
        }

        if let Some(gateway_observation) = source.json.get("gateway_observation")
            && !gateway_observation.is_null()
        {
            gateway_observed_instances += 1;
            if let Some(route_preference) =
                gateway_observation["policy"]["route_preference"].as_str()
            {
                *gateway_policy_route_preferences
                    .entry(route_preference.to_string())
                    .or_default() += 1;
            }
            gateway_audit_total += gateway_observation["audit"]["total"]
                .as_u64()
                .unwrap_or(0) as usize;
            // Named distinctly so it does not shadow the `items` parameter.
            if let Some(observed_alerts) = gateway_observation["alerts"].as_array() {
                gateway_alerts.extend(observed_alerts.iter().take(6).map(|alert| {
                    serde_json::json!({
                        "source": source.label,
                        "severity": alert["severity"],
                        "component": alert["component"],
                        "message": alert["message"],
                    })
                }));
            }
        }
    }

    // Most utilized queues first.
    topic_hotspots.sort_by(|left, right| {
        right["utilization_percent"]
            .as_f64()
            .partial_cmp(&left["utilization_percent"].as_f64())
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    // Worst ack coverage first.
    topic_ack_hotspots.sort_by(|left, right| {
        left["lowest_coverage_percent"]
            .as_f64()
            .partial_cmp(&right["lowest_coverage_percent"].as_f64())
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    topic_hotspots.truncate(10);
    topic_ack_hotspots.truncate(10);
    mission_hotspots.truncate(10);
    action_hotspots.truncate(10);
    health_hotspots.truncate(10);
    alert_samples.truncate(20);
    cross_tenant_edges.truncate(20);
    gateway_alerts.truncate(20);

    let overall_health = if totals.unhealthy_instances > 0 {
        "unhealthy"
    } else if totals.degraded_instances > 0 {
        "degraded"
    } else {
        "healthy"
    }
    .to_string();

    FleetAnalysis {
        totals,
        overall_health,
        captured_at_unix_nanos,
        sources,
        hotspots: serde_json::json!({
            "topics_by_utilization": topic_hotspots,
            "topics_low_reliable_ack_coverage": topic_ack_hotspots,
            "missions_non_steady": mission_hotspots,
            "actions_non_steady": action_hotspots,
            "health_non_healthy": health_hotspots,
            "alerts": alert_samples,
        }),
        tasks: serde_json::json!({
            "mission_states": mission_state_counts,
            "action_states": action_state_counts,
            "action_health": action_health_counts,
        }),
        governance: serde_json::json!({
            "tenants": tenant_counts,
            "namespaces": namespace_counts,
            "cross_tenant_edges": cross_tenant_edges,
            "gateway": {
                "instances_reporting": gateway_observed_instances,
                "policy_route_preferences": gateway_policy_route_preferences,
                "audit_event_total": gateway_audit_total,
                "alerts": gateway_alerts,
            }
        }),
    }
}
/// Returns the tenant of a namespace: its first path segment after any leading
/// slashes (`"/acme/robots"` → `"acme"`). The root namespace `"/"` and the empty
/// string yield `""`.
pub(in crate::commands::ops) fn tenant_from_namespace(namespace: &str) -> String {
    let trimmed = namespace.trim_start_matches('/');
    // `split_once` states the intent directly. The old `split().next()` always
    // yielded an item, so its `unwrap_or("")` fallback was dead code.
    trimmed
        .split_once('/')
        .map_or(trimmed, |(tenant, _rest)| tenant)
        .to_string()
}
/// Returns the tenant encoded in a fully qualified label such as
/// `"/acme/robot_a"` → `"acme"`. The rule is identical to
/// [`tenant_from_namespace`], so this delegates to keep the two in lockstep.
pub(in crate::commands::ops) fn tenant_from_label(label: &str) -> String {
    tenant_from_namespace(label)
}
pub(in crate::commands::ops) fn build_fleet_diff(
current: &FleetTotals,
baseline: &FleetTotals,
) -> serde_json::Value {
serde_json::json!({
"instances": count_delta(current.instances, baseline.instances),
"nodes": count_delta(current.nodes, baseline.nodes),
"topics": count_delta(current.topics, baseline.topics),
"services": count_delta(current.services, baseline.services),
"actions": count_delta(current.actions, baseline.actions),
"missions": count_delta(current.missions, baseline.missions),
"plugins_loaded": count_delta(current.plugins_loaded, baseline.plugins_loaded),
"alerts": count_delta(current.alerts, baseline.alerts),
"healthy_instances": count_delta(current.healthy_instances, baseline.healthy_instances),
"degraded_instances": count_delta(current.degraded_instances, baseline.degraded_instances),
"unhealthy_instances": count_delta(current.unhealthy_instances, baseline.unhealthy_instances),
})
}
/// Classifies one snapshot by its worst health entry (case-insensitive):
/// `"unhealthy"` if any entry is unhealthy, else `"degraded"` if any entry is
/// degraded, else `"healthy"`. Other status strings do not affect the result.
pub(in crate::commands::ops) fn overall_health(snapshot: &StatusSnapshot) -> String {
    let any_status = |wanted: &str| {
        snapshot
            .health
            .iter()
            .any(|entry| entry.status.eq_ignore_ascii_case(wanted))
    };
    let verdict = if any_status("unhealthy") {
        "unhealthy"
    } else if any_status("degraded") {
        "degraded"
    } else {
        "healthy"
    };
    verdict.to_string()
}
/// Derives operator alerts from a single status snapshot.
///
/// Alerts are emitted in this order:
/// * each health entry whose status is not `healthy` (case-insensitive):
///   `critical` when the status is `unhealthy`, otherwise `warning`, with the
///   entry's reason (or `status=<status>`) as the message;
/// * per topic, queue utilization (`pending / max_depth`) at or above the
///   policy's critical/warn thresholds; this check is skipped when
///   `max_depth == 0`;
/// * per topic, local then remote reliable-ack coverage at or below the
///   policy's critical/warn coverage. This runs for every topic with reliable
///   subscribers, regardless of queue depth;
/// * missions in `failed` (critical) or `recovering` (warning) state.
pub(in crate::commands::ops) fn detect_alerts(
    snapshot: &StatusSnapshot,
    policy: &AlertPolicy,
) -> Vec<OpsAlert> {
    let mut alerts = Vec::new();
    // Percentage of reliable subscribers that acked; 100% when there are none.
    fn ack_coverage_percent(subscribers: usize, acked_subscribers: usize) -> f64 {
        if subscribers == 0 {
            100.0
        } else {
            (acked_subscribers as f64 / subscribers as f64) * 100.0
        }
    }
    // Pushes a `topic_ack` alert when coverage for one origin ("local" or
    // "remote") is at or below the policy thresholds. Topics with no reliable
    // subscribers for that origin never alert.
    fn append_ack_coverage_alert(
        alerts: &mut Vec<OpsAlert>,
        topic_name: &str,
        origin: &str,
        subscribers: usize,
        acked_subscribers: usize,
        policy: &AlertPolicy,
    ) {
        if subscribers == 0 {
            return;
        }
        let coverage = ack_coverage_percent(subscribers, acked_subscribers);
        let severity = if coverage <= policy.ack_critical_coverage {
            Some("critical")
        } else if coverage <= policy.ack_warn_coverage {
            Some("warning")
        } else {
            None
        };
        if let Some(severity) = severity {
            alerts.push(OpsAlert {
                severity: severity.to_string(),
                component: "topic_ack".to_string(),
                message: format!(
                    "{} reliable ack coverage ({origin}) is low ({:.1}%: {}/{})",
                    topic_name, coverage, acked_subscribers, subscribers
                ),
            });
        }
    }
    for item in &snapshot.health {
        let status = item.status.to_ascii_lowercase();
        if status == "healthy" {
            continue;
        }
        alerts.push(OpsAlert {
            severity: if status == "unhealthy" {
                "critical".to_string()
            } else {
                "warning".to_string()
            },
            component: item.component.clone(),
            message: item
                .reason
                .clone()
                .unwrap_or_else(|| format!("status={}", item.status)),
        });
    }
    for topic in &snapshot.topics {
        // Utilization is undefined for a zero max_depth, but that must only
        // skip the queue check. The ack-coverage checks below still apply.
        if topic.max_depth > 0 {
            let utilization = (topic.pending as f64 / topic.max_depth as f64) * 100.0;
            let severity = if utilization >= policy.topic_critical_utilization {
                Some("critical")
            } else if utilization >= policy.topic_warn_utilization {
                Some("warning")
            } else {
                None
            };
            if let Some(severity) = severity {
                alerts.push(OpsAlert {
                    severity: severity.to_string(),
                    component: "topic".to_string(),
                    message: format!(
                        "{} queue utilization is high ({:.1}%: {}/{})",
                        topic.name, utilization, topic.pending, topic.max_depth
                    ),
                });
            }
        }
        append_ack_coverage_alert(
            &mut alerts,
            &topic.name,
            "local",
            topic.reliable_local_ack.subscribers,
            topic.reliable_local_ack.acked_subscribers,
            policy,
        );
        append_ack_coverage_alert(
            &mut alerts,
            &topic.name,
            "remote",
            topic.reliable_remote_ack.subscribers,
            topic.reliable_remote_ack.acked_subscribers,
            policy,
        );
    }
    for mission in &snapshot.missions {
        let state = mission.state.to_ascii_lowercase();
        if state == "failed" {
            alerts.push(OpsAlert {
                severity: "critical".to_string(),
                component: "mission".to_string(),
                message: format!("mission {} is failed", mission.name),
            });
        } else if state == "recovering" {
            alerts.push(OpsAlert {
                severity: "warning".to_string(),
                component: "mission".to_string(),
                message: format!("mission {} is recovering", mission.name),
            });
        }
    }
    alerts
}