robotrt-cli 0.1.0-beta.1

RobotRT modular robotics runtime and middleware components.
use super::*;

pub(in crate::commands::ops) fn build_fleet_payload(
    current_items: &[(StatusSnapshot, OpsSource)],
    baseline_items: &[(StatusSnapshot, OpsSource)],
    policy: &AlertPolicy,
) -> Result<serde_json::Value, String> {
    let current = analyze_fleet(current_items, policy);
    let baseline = if baseline_items.is_empty() {
        None
    } else {
        Some(analyze_fleet(baseline_items, policy))
    };

    let diff = baseline
        .as_ref()
        .map(|base| build_fleet_diff(&current.totals, &base.totals));

    Ok(serde_json::json!({
        "api_version": OPS_FLEET_API_VERSION,
        "kind": "ops_fleet",
        "captured_at_unix_nanos": current.captured_at_unix_nanos,
        "source": {
            "current": current.sources,
            "baseline": baseline.as_ref().map(|value| value.sources.clone()).unwrap_or_default(),
        },
        "result": {
            "summary": {
                "instances": current.totals.instances,
                "overall_health": current.overall_health,
                "alerts": current.totals.alerts,
                "totals": {
                    "nodes": current.totals.nodes,
                    "topics": current.totals.topics,
                    "services": current.totals.services,
                    "actions": current.totals.actions,
                    "missions": current.totals.missions,
                    "plugins_loaded": current.totals.plugins_loaded,
                },
                "health_counts": {
                    "healthy": current.totals.healthy_instances,
                    "degraded": current.totals.degraded_instances,
                    "unhealthy": current.totals.unhealthy_instances,
                },
            },
            "hotspots": current.hotspots,
            "tasks": current.tasks,
            "governance": current.governance,
            "diff": diff,
        }
    }))
}

pub(in crate::commands::ops) struct FleetAnalysis {
    totals: FleetTotals,
    overall_health: String,
    captured_at_unix_nanos: u128,
    sources: Vec<serde_json::Value>,
    hotspots: serde_json::Value,
    tasks: serde_json::Value,
    governance: serde_json::Value,
}

pub(in crate::commands::ops) fn analyze_fleet(
    items: &[(StatusSnapshot, OpsSource)],
    policy: &AlertPolicy,
) -> FleetAnalysis {
    let mut totals = FleetTotals {
        instances: items.len(),
        ..FleetTotals::default()
    };

    let mut captured_at_unix_nanos = 0u128;
    let mut sources = Vec::new();

    let mut topic_hotspots = Vec::new();
    let mut mission_hotspots = Vec::new();
    let mut action_hotspots = Vec::new();
    let mut health_hotspots = Vec::new();
    let mut alert_samples = Vec::new();

    let mut mission_state_counts: HashMap<String, usize> = HashMap::new();
    let mut action_state_counts: HashMap<String, usize> = HashMap::new();
    let mut action_health_counts: HashMap<String, usize> = HashMap::new();

    let mut tenant_counts: HashMap<String, usize> = HashMap::new();
    let mut namespace_counts: HashMap<String, usize> = HashMap::new();
    let mut cross_tenant_edges = Vec::new();

    for (snapshot, source) in items {
        captured_at_unix_nanos = captured_at_unix_nanos.max(snapshot.captured_at_unix_nanos);
        sources.push(source.json.clone());

        totals.nodes += snapshot.nodes.len();
        totals.topics += snapshot.topics.len();
        totals.services += snapshot.services.len();
        totals.actions += snapshot.actions.len();
        totals.missions += snapshot.missions.len();
        totals.plugins_loaded += snapshot
            .plugins
            .iter()
            .filter(|plugin| plugin.loaded)
            .count();

        let fleet_health = overall_health(snapshot);
        match fleet_health.as_str() {
            "healthy" => totals.healthy_instances += 1,
            "degraded" => totals.degraded_instances += 1,
            _ => totals.unhealthy_instances += 1,
        }

        let alerts = detect_alerts(snapshot, policy);
        totals.alerts += alerts.len();
        for alert in alerts.into_iter().take(8) {
            alert_samples.push(serde_json::json!({
                "source": source.label,
                "severity": alert.severity,
                "component": alert.component,
                "message": alert.message,
            }));
        }

        for topic in &snapshot.topics {
            if topic.max_depth == 0 {
                continue;
            }
            let utilization = (topic.pending as f64 / topic.max_depth as f64) * 100.0;
            topic_hotspots.push(serde_json::json!({
                "source": source.label,
                "topic": topic.name,
                "pending": topic.pending,
                "max_depth": topic.max_depth,
                "utilization_percent": utilization,
            }));
        }

        for mission in &snapshot.missions {
            *mission_state_counts
                .entry(mission.state.clone())
                .or_insert(0) += 1;
            if !mission.state.eq_ignore_ascii_case("running")
                && !mission.state.eq_ignore_ascii_case("succeeded")
            {
                mission_hotspots.push(serde_json::json!({
                    "source": source.label,
                    "name": mission.name,
                    "state": mission.state,
                    "last_checkpoint": mission.last_checkpoint,
                }));
            }
        }

        for action in &snapshot.actions {
            let state = action
                .current_state
                .clone()
                .unwrap_or_else(|| "unknown".to_string());
            let health = action
                .health_state
                .clone()
                .unwrap_or_else(|| "unknown".to_string());
            *action_state_counts.entry(state.clone()).or_insert(0) += 1;
            *action_health_counts.entry(health.clone()).or_insert(0) += 1;

            let healthy =
                health.eq_ignore_ascii_case("active") || health.eq_ignore_ascii_case("completed");
            if !healthy {
                action_hotspots.push(serde_json::json!({
                    "source": source.label,
                    "name": action.name,
                    "current_state": action.current_state,
                    "health_state": action.health_state,
                    "active_goals": action.active_goals,
                    "last_heartbeat_at_unix_nanos": action.last_heartbeat_at_unix_nanos,
                }));
            }
        }

        for item in &snapshot.health {
            if !item.status.eq_ignore_ascii_case("healthy") {
                health_hotspots.push(serde_json::json!({
                    "source": source.label,
                    "component": item.component,
                    "status": item.status,
                    "reason": item.reason,
                }));
            }
        }

        let mut node_tenant = HashMap::new();
        for node in &snapshot.nodes {
            let namespace = if node.namespace.is_empty() {
                "/".to_string()
            } else {
                node.namespace.clone()
            };
            let tenant = tenant_from_namespace(&namespace);
            *tenant_counts.entry(tenant.clone()).or_insert(0) += 1;
            *namespace_counts.entry(namespace.clone()).or_insert(0) += 1;
            node_tenant.insert(node.name.clone(), tenant);
        }

        for edge in &snapshot.edges {
            let from_tenant = node_tenant
                .get(&edge.from)
                .cloned()
                .unwrap_or_else(|| tenant_from_label(&edge.from));
            let to_tenant = node_tenant
                .get(&edge.to)
                .cloned()
                .unwrap_or_else(|| tenant_from_label(&edge.to));
            if !from_tenant.is_empty() && !to_tenant.is_empty() && from_tenant != to_tenant {
                cross_tenant_edges.push(serde_json::json!({
                    "source": source.label,
                    "from": edge.from,
                    "to": edge.to,
                    "relation": edge.relation,
                    "from_tenant": from_tenant,
                    "to_tenant": to_tenant,
                }));
            }
        }
    }

    topic_hotspots.sort_by(|left, right| {
        right["utilization_percent"]
            .as_f64()
            .partial_cmp(&left["utilization_percent"].as_f64())
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    topic_hotspots.truncate(10);
    mission_hotspots.truncate(10);
    action_hotspots.truncate(10);
    health_hotspots.truncate(10);
    alert_samples.truncate(20);
    cross_tenant_edges.truncate(20);

    let overall_health = if totals.unhealthy_instances > 0 {
        "unhealthy"
    } else if totals.degraded_instances > 0 {
        "degraded"
    } else {
        "healthy"
    }
    .to_string();

    FleetAnalysis {
        totals,
        overall_health,
        captured_at_unix_nanos,
        sources,
        hotspots: serde_json::json!({
            "topics_by_utilization": topic_hotspots,
            "missions_non_steady": mission_hotspots,
            "actions_non_steady": action_hotspots,
            "health_non_healthy": health_hotspots,
            "alerts": alert_samples,
        }),
        tasks: serde_json::json!({
            "mission_states": mission_state_counts,
            "action_states": action_state_counts,
            "action_health": action_health_counts,
        }),
        governance: serde_json::json!({
            "tenants": tenant_counts,
            "namespaces": namespace_counts,
            "cross_tenant_edges": cross_tenant_edges,
        }),
    }
}

pub(in crate::commands::ops) fn tenant_from_namespace(namespace: &str) -> String {
    namespace
        .trim_start_matches('/')
        .split('/')
        .next()
        .unwrap_or("")
        .to_string()
}

pub(in crate::commands::ops) fn tenant_from_label(label: &str) -> String {
    label
        .trim_start_matches('/')
        .split('/')
        .next()
        .unwrap_or("")
        .to_string()
}

pub(in crate::commands::ops) fn build_fleet_diff(
    current: &FleetTotals,
    baseline: &FleetTotals,
) -> serde_json::Value {
    serde_json::json!({
        "instances": count_delta(current.instances, baseline.instances),
        "nodes": count_delta(current.nodes, baseline.nodes),
        "topics": count_delta(current.topics, baseline.topics),
        "services": count_delta(current.services, baseline.services),
        "actions": count_delta(current.actions, baseline.actions),
        "missions": count_delta(current.missions, baseline.missions),
        "plugins_loaded": count_delta(current.plugins_loaded, baseline.plugins_loaded),
        "alerts": count_delta(current.alerts, baseline.alerts),
        "healthy_instances": count_delta(current.healthy_instances, baseline.healthy_instances),
        "degraded_instances": count_delta(current.degraded_instances, baseline.degraded_instances),
        "unhealthy_instances": count_delta(current.unhealthy_instances, baseline.unhealthy_instances),
    })
}

pub(in crate::commands::ops) fn overall_health(snapshot: &StatusSnapshot) -> String {
    let mut has_degraded = false;
    for item in &snapshot.health {
        match item.status.to_ascii_lowercase().as_str() {
            "unhealthy" => return "unhealthy".to_string(),
            "degraded" => has_degraded = true,
            _ => {}
        }
    }

    if has_degraded {
        "degraded".to_string()
    } else {
        "healthy".to_string()
    }
}

pub(in crate::commands::ops) fn detect_alerts(
    snapshot: &StatusSnapshot,
    policy: &AlertPolicy,
) -> Vec<OpsAlert> {
    let mut alerts = Vec::new();

    for item in &snapshot.health {
        let status = item.status.to_ascii_lowercase();
        if status == "healthy" {
            continue;
        }

        alerts.push(OpsAlert {
            severity: if status == "unhealthy" {
                "critical".to_string()
            } else {
                "warning".to_string()
            },
            component: item.component.clone(),
            message: item
                .reason
                .clone()
                .unwrap_or_else(|| format!("status={}", item.status)),
        });
    }

    for topic in &snapshot.topics {
        if topic.max_depth == 0 {
            continue;
        }
        let utilization = (topic.pending as f64 / topic.max_depth as f64) * 100.0;
        let severity = if utilization >= policy.topic_critical_utilization {
            Some("critical")
        } else if utilization >= policy.topic_warn_utilization {
            Some("warning")
        } else {
            None
        };

        if let Some(severity) = severity {
            alerts.push(OpsAlert {
                severity: severity.to_string(),
                component: "topic".to_string(),
                message: format!(
                    "{} queue utilization is high ({:.1}%: {}/{})",
                    topic.name, utilization, topic.pending, topic.max_depth
                ),
            });
        }
    }

    for mission in &snapshot.missions {
        let state = mission.state.to_ascii_lowercase();
        if state == "failed" {
            alerts.push(OpsAlert {
                severity: "critical".to_string(),
                component: "mission".to_string(),
                message: format!("mission {} is failed", mission.name),
            });
        } else if state == "recovering" {
            alerts.push(OpsAlert {
                severity: "warning".to_string(),
                component: "mission".to_string(),
                message: format!("mission {} is recovering", mission.name),
            });
        }
    }

    alerts
}