use super::*;
/// Builds the `ops_fleet` JSON document for a set of current snapshots and an
/// optional baseline set.
///
/// `current_items` is always analyzed with `analyze_fleet`. When
/// `baseline_items` is non-empty it is analyzed with the same `policy` and the
/// totals of both analyses are compared via `build_fleet_diff`.
///
/// With an empty baseline, `source.baseline` serializes as an empty array and
/// `result.diff` serializes as `null`.
///
/// # Errors
///
/// The visible implementation always returns `Ok`; the `Result` return type is
/// kept so callers do not have to change.
pub(in crate::commands::ops) fn build_fleet_payload(
    current_items: &[(StatusSnapshot, OpsSource)],
    baseline_items: &[(StatusSnapshot, OpsSource)],
    policy: &AlertPolicy,
) -> Result<serde_json::Value, String> {
    let current = analyze_fleet(current_items, policy);

    // An empty baseline means "nothing to compare against", not "a fleet of
    // zero instances", so it is represented as `None` rather than analyzed.
    let baseline = if baseline_items.is_empty() {
        None
    } else {
        Some(analyze_fleet(baseline_items, policy))
    };

    // Only computed when a baseline exists; `None` becomes JSON `null`.
    let diff = baseline
        .as_ref()
        .map(|base| build_fleet_diff(&current.totals, &base.totals));

    Ok(serde_json::json!({
        "api_version": OPS_FLEET_API_VERSION,
        "kind": "ops_fleet",
        "captured_at_unix_nanos": current.captured_at_unix_nanos,
        "source": {
            "current": current.sources,
            "baseline": baseline.as_ref().map(|value| value.sources.clone()).unwrap_or_default(),
        },
        "result": {
            "summary": {
                "instances": current.totals.instances,
                "overall_health": current.overall_health,
                "alerts": current.totals.alerts,
                "totals": {
                    "nodes": current.totals.nodes,
                    "topics": current.totals.topics,
                    "services": current.totals.services,
                    "actions": current.totals.actions,
                    "missions": current.totals.missions,
                    "plugins_loaded": current.totals.plugins_loaded,
                },
                "health_counts": {
                    "healthy": current.totals.healthy_instances,
                    "degraded": current.totals.degraded_instances,
                    "unhealthy": current.totals.unhealthy_instances,
                },
            },
            "hotspots": current.hotspots,
            "tasks": current.tasks,
            "governance": current.governance,
            "diff": diff,
        }
    }))
}
/// Aggregated result of analyzing a group of status snapshots, as produced by
/// `analyze_fleet` and consumed by `build_fleet_payload`.
pub(in crate::commands::ops) struct FleetAnalysis {
/// Counters summed across every analyzed snapshot (instances, nodes, topics,
/// services, actions, missions, loaded plugins, alerts, per-health instance counts).
totals: FleetTotals,
/// Fleet-wide health label: `"unhealthy"` if any instance is unhealthy,
/// otherwise `"degraded"` if any instance is degraded, otherwise `"healthy"`.
overall_health: String,
/// Largest `captured_at_unix_nanos` seen among the analyzed snapshots
/// (`0` when no snapshots were analyzed).
captured_at_unix_nanos: u128,
/// The `json` value of each analyzed source, in input order.
sources: Vec<serde_json::Value>,
/// JSON object with the keys `topics_by_utilization`, `missions_non_steady`,
/// `actions_non_steady`, `health_non_healthy` and `alerts`.
hotspots: serde_json::Value,
/// JSON object with the count maps `mission_states`, `action_states` and
/// `action_health`.
tasks: serde_json::Value,
/// JSON object with the count maps `tenants` and `namespaces`, plus the
/// `cross_tenant_edges` list.
governance: serde_json::Value,
}
/// Folds a list of `(snapshot, source)` pairs into a single [`FleetAnalysis`].
///
/// For every snapshot this:
/// * adds its node/topic/service/action/mission/loaded-plugin counts to the totals,
/// * classifies the instance via `overall_health` and bumps the matching counter,
/// * runs `detect_alerts` with `policy`, counting all alerts and sampling at
///   most 8 of them per source,
/// * records hotspots: topics with a non-zero `max_depth` (with utilization),
///   missions that are neither `running` nor `succeeded`, actions whose health
///   is neither `active` nor `completed`, and health items that are not `healthy`,
/// * tallies mission states, action states and action health values,
/// * tallies nodes per tenant and per namespace, and records graph edges whose
///   endpoints resolve to two different, non-empty tenants.
///
/// After the pass, topic hotspots are ordered by descending utilization and
/// every sample list is capped (10 for hotspots, 20 for alerts and edges).
pub(in crate::commands::ops) fn analyze_fleet(
    items: &[(StatusSnapshot, OpsSource)],
    policy: &AlertPolicy,
) -> FleetAnalysis {
    let mut totals = FleetTotals {
        instances: items.len(),
        ..Default::default()
    };
    let mut latest_capture = 0u128;
    let mut sources: Vec<serde_json::Value> = Vec::new();

    // Sample lists that end up under `hotspots` / `governance`.
    let mut busy_topics: Vec<serde_json::Value> = Vec::new();
    let mut unsettled_missions: Vec<serde_json::Value> = Vec::new();
    let mut unsettled_actions: Vec<serde_json::Value> = Vec::new();
    let mut failing_health: Vec<serde_json::Value> = Vec::new();
    let mut sampled_alerts: Vec<serde_json::Value> = Vec::new();
    let mut tenant_crossings: Vec<serde_json::Value> = Vec::new();

    // Occurrence counters keyed by state / tenant / namespace.
    let mut missions_by_state: HashMap<String, usize> = HashMap::new();
    let mut actions_by_state: HashMap<String, usize> = HashMap::new();
    let mut actions_by_health: HashMap<String, usize> = HashMap::new();
    let mut nodes_by_tenant: HashMap<String, usize> = HashMap::new();
    let mut nodes_by_namespace: HashMap<String, usize> = HashMap::new();

    for (snapshot, source) in items {
        if snapshot.captured_at_unix_nanos > latest_capture {
            latest_capture = snapshot.captured_at_unix_nanos;
        }
        sources.push(source.json.clone());

        totals.nodes += snapshot.nodes.len();
        totals.topics += snapshot.topics.len();
        totals.services += snapshot.services.len();
        totals.actions += snapshot.actions.len();
        totals.missions += snapshot.missions.len();
        totals.plugins_loaded += snapshot
            .plugins
            .iter()
            .filter(|plugin| plugin.loaded)
            .count();

        // Anything that is not explicitly healthy/degraded counts as unhealthy.
        match overall_health(snapshot).as_str() {
            "healthy" => totals.healthy_instances += 1,
            "degraded" => totals.degraded_instances += 1,
            _ => totals.unhealthy_instances += 1,
        }

        // Count every alert, but keep only the first 8 per source as samples.
        let raised = detect_alerts(snapshot, policy);
        totals.alerts += raised.len();
        sampled_alerts.extend(raised.into_iter().take(8).map(|alert| {
            serde_json::json!({
                "source": source.label,
                "severity": alert.severity,
                "component": alert.component,
                "message": alert.message,
            })
        }));

        for topic in &snapshot.topics {
            // A zero depth would make the utilization ratio meaningless.
            if topic.max_depth != 0 {
                let utilization = (topic.pending as f64 / topic.max_depth as f64) * 100.0;
                busy_topics.push(serde_json::json!({
                    "source": source.label,
                    "topic": topic.name,
                    "pending": topic.pending,
                    "max_depth": topic.max_depth,
                    "utilization_percent": utilization,
                }));
            }
        }

        for mission in &snapshot.missions {
            *missions_by_state.entry(mission.state.clone()).or_default() += 1;

            let steady = mission.state.eq_ignore_ascii_case("running")
                || mission.state.eq_ignore_ascii_case("succeeded");
            if steady {
                continue;
            }
            unsettled_missions.push(serde_json::json!({
                "source": source.label,
                "name": mission.name,
                "state": mission.state,
                "last_checkpoint": mission.last_checkpoint,
            }));
        }

        for action in &snapshot.actions {
            // Missing values are tallied under the literal key "unknown".
            let state = action.current_state.as_deref().unwrap_or("unknown");
            let health = action.health_state.as_deref().unwrap_or("unknown");
            *actions_by_state.entry(state.to_string()).or_default() += 1;
            *actions_by_health.entry(health.to_string()).or_default() += 1;

            let steady = health.eq_ignore_ascii_case("active")
                || health.eq_ignore_ascii_case("completed");
            if steady {
                continue;
            }
            unsettled_actions.push(serde_json::json!({
                "source": source.label,
                "name": action.name,
                "current_state": action.current_state,
                "health_state": action.health_state,
                "active_goals": action.active_goals,
                "last_heartbeat_at_unix_nanos": action.last_heartbeat_at_unix_nanos,
            }));
        }

        for entry in &snapshot.health {
            if entry.status.eq_ignore_ascii_case("healthy") {
                continue;
            }
            failing_health.push(serde_json::json!({
                "source": source.label,
                "component": entry.component,
                "status": entry.status,
                "reason": entry.reason,
            }));
        }

        // Per-snapshot lookup from node name to tenant, used to classify edges.
        let mut tenant_by_node = HashMap::new();
        for node in &snapshot.nodes {
            // Nodes without a namespace are accounted under the root namespace.
            let namespace: &str = if node.namespace.is_empty() {
                "/"
            } else {
                &node.namespace
            };
            let tenant = tenant_from_namespace(namespace);
            *nodes_by_tenant.entry(tenant.clone()).or_default() += 1;
            *nodes_by_namespace.entry(namespace.to_string()).or_default() += 1;
            tenant_by_node.insert(node.name.clone(), tenant);
        }

        for edge in &snapshot.edges {
            // Endpoints that are not known nodes fall back to their label prefix.
            let from_tenant = match tenant_by_node.get(&edge.from) {
                Some(tenant) => tenant.clone(),
                None => tenant_from_label(&edge.from),
            };
            let to_tenant = match tenant_by_node.get(&edge.to) {
                Some(tenant) => tenant.clone(),
                None => tenant_from_label(&edge.to),
            };
            if from_tenant.is_empty() || to_tenant.is_empty() || from_tenant == to_tenant {
                continue;
            }
            tenant_crossings.push(serde_json::json!({
                "source": source.label,
                "from": edge.from,
                "to": edge.to,
                "relation": edge.relation,
                "from_tenant": from_tenant,
                "to_tenant": to_tenant,
            }));
        }
    }

    // Highest utilization first; incomparable values keep their relative order.
    busy_topics.sort_by(|a, b| {
        let a_util = a["utilization_percent"].as_f64();
        let b_util = b["utilization_percent"].as_f64();
        b_util
            .partial_cmp(&a_util)
            .unwrap_or(std::cmp::Ordering::Equal)
    });

    // Cap every sample list so the payload stays bounded.
    busy_topics.truncate(10);
    unsettled_missions.truncate(10);
    unsettled_actions.truncate(10);
    failing_health.truncate(10);
    sampled_alerts.truncate(20);
    tenant_crossings.truncate(20);

    let fleet_status = String::from(if totals.unhealthy_instances > 0 {
        "unhealthy"
    } else if totals.degraded_instances > 0 {
        "degraded"
    } else {
        "healthy"
    });

    let hotspots = serde_json::json!({
        "topics_by_utilization": busy_topics,
        "missions_non_steady": unsettled_missions,
        "actions_non_steady": unsettled_actions,
        "health_non_healthy": failing_health,
        "alerts": sampled_alerts,
    });
    let tasks = serde_json::json!({
        "mission_states": missions_by_state,
        "action_states": actions_by_state,
        "action_health": actions_by_health,
    });
    let governance = serde_json::json!({
        "tenants": nodes_by_tenant,
        "namespaces": nodes_by_namespace,
        "cross_tenant_edges": tenant_crossings,
    });

    FleetAnalysis {
        totals,
        overall_health: fleet_status,
        captured_at_unix_nanos: latest_capture,
        sources,
        hotspots,
        tasks,
        governance,
    }
}
/// Returns the tenant portion of a namespace: the text between any leading
/// `/` characters and the next `/` (or the end of the string).
///
/// Yields an empty string when nothing precedes that next `/`, e.g. for `""`
/// or `"/"`.
pub(in crate::commands::ops) fn tenant_from_namespace(namespace: &str) -> String {
    let trimmed = namespace.trim_start_matches('/');
    let head = match trimmed.find('/') {
        Some(cut) => &trimmed[..cut],
        None => trimmed,
    };
    head.to_owned()
}
/// Derives a tenant name from a node/edge label by dropping every leading `/`
/// and keeping the text up to (not including) the next `/`.
///
/// A label with no text before that separator yields an empty string.
pub(in crate::commands::ops) fn tenant_from_label(label: &str) -> String {
    let stripped = label.trim_start_matches('/');
    let end = stripped.find('/').unwrap_or(stripped.len());
    String::from(&stripped[..end])
}
pub(in crate::commands::ops) fn build_fleet_diff(
current: &FleetTotals,
baseline: &FleetTotals,
) -> serde_json::Value {
serde_json::json!({
"instances": count_delta(current.instances, baseline.instances),
"nodes": count_delta(current.nodes, baseline.nodes),
"topics": count_delta(current.topics, baseline.topics),
"services": count_delta(current.services, baseline.services),
"actions": count_delta(current.actions, baseline.actions),
"missions": count_delta(current.missions, baseline.missions),
"plugins_loaded": count_delta(current.plugins_loaded, baseline.plugins_loaded),
"alerts": count_delta(current.alerts, baseline.alerts),
"healthy_instances": count_delta(current.healthy_instances, baseline.healthy_instances),
"degraded_instances": count_delta(current.degraded_instances, baseline.degraded_instances),
"unhealthy_instances": count_delta(current.unhealthy_instances, baseline.unhealthy_instances),
})
}
/// Collapses a snapshot's health items into one label.
///
/// Returns `"unhealthy"` as soon as any item reports that status, otherwise
/// `"degraded"` if at least one item is degraded, otherwise `"healthy"`.
/// Status comparison ignores ASCII case; any other status value is ignored.
pub(in crate::commands::ops) fn overall_health(snapshot: &StatusSnapshot) -> String {
    let mut verdict = "healthy";
    for entry in &snapshot.health {
        if entry.status.eq_ignore_ascii_case("unhealthy") {
            // Nothing can outrank unhealthy, so stop scanning.
            return "unhealthy".to_string();
        }
        if entry.status.eq_ignore_ascii_case("degraded") {
            verdict = "degraded";
        }
    }
    verdict.to_string()
}
/// Derives alerts from a single snapshot, in this order:
///
/// 1. Health items whose status is not `healthy` (ASCII case-insensitive):
///    `critical` for `unhealthy`, `warning` for anything else. The message is
///    the item's reason, or `status=<status>` when no reason is present.
/// 2. Topics with a non-zero `max_depth` whose `pending / max_depth`
///    percentage reaches `policy.topic_critical_utilization` (`critical`) or
///    `policy.topic_warn_utilization` (`warning`).
/// 3. Missions in state `failed` (`critical`) or `recovering` (`warning`),
///    compared ASCII case-insensitively.
pub(in crate::commands::ops) fn detect_alerts(
    snapshot: &StatusSnapshot,
    policy: &AlertPolicy,
) -> Vec<OpsAlert> {
    let mut found = Vec::new();

    for entry in &snapshot.health {
        if entry.status.eq_ignore_ascii_case("healthy") {
            continue;
        }
        let severity = if entry.status.eq_ignore_ascii_case("unhealthy") {
            "critical"
        } else {
            "warning"
        };
        let message = match &entry.reason {
            Some(reason) => reason.clone(),
            None => format!("status={}", entry.status),
        };
        found.push(OpsAlert {
            severity: severity.to_string(),
            component: entry.component.clone(),
            message,
        });
    }

    for topic in &snapshot.topics {
        // Topics without a depth limit have no meaningful utilization.
        if topic.max_depth == 0 {
            continue;
        }
        let utilization = (topic.pending as f64 / topic.max_depth as f64) * 100.0;
        let severity = if utilization >= policy.topic_critical_utilization {
            "critical"
        } else if utilization >= policy.topic_warn_utilization {
            "warning"
        } else {
            continue;
        };
        found.push(OpsAlert {
            severity: severity.to_string(),
            component: "topic".to_string(),
            message: format!(
                "{} queue utilization is high ({:.1}%: {}/{})",
                topic.name, utilization, topic.pending, topic.max_depth
            ),
        });
    }

    for mission in &snapshot.missions {
        let (severity, message) = match mission.state.to_ascii_lowercase().as_str() {
            "failed" => ("critical", format!("mission {} is failed", mission.name)),
            "recovering" => (
                "warning",
                format!("mission {} is recovering", mission.name),
            ),
            _ => continue,
        };
        found.push(OpsAlert {
            severity: severity.to_string(),
            component: "mission".to_string(),
            message,
        });
    }

    found
}