robotrt-cli 0.1.0-beta.2

RobotRT modular robotics runtime and middleware components.
use core_types::{HealthStatus, MetricsProvider, MetricsSnapshot};
use introspection_core::{HealthStatusItem, StatusSnapshot};
use obs_core::MetricsAggregator;

use crate::helpers::{parse_u64_option, resolve_runtime_endpoint_from_hint};
use crate::gateway::{STATUS_SERVICE_NAME, fetch_snapshot_from_endpoint};

const DEFAULT_DAEMON_ENDPOINT: &str = "127.0.0.1:7588";

#[derive(Clone)]
pub(crate) struct SnapshotSource {
    pub(crate) label: String,
    pub(crate) json: serde_json::Value,
}

struct SnapshotProvider {
    status: HealthStatus,
    metrics: Vec<MetricsSnapshot>,
}

impl SnapshotProvider {
    fn new(status: HealthStatus, metrics: Vec<MetricsSnapshot>) -> Self {
        Self { status, metrics }
    }
}

impl MetricsProvider for SnapshotProvider {
    fn collect(&self) -> Vec<MetricsSnapshot> {
        self.metrics.clone()
    }

    fn health(&self) -> HealthStatus {
        self.status.clone()
    }
}

pub(crate) fn load_status_snapshot(
    args: &[String],
    endpoint: Option<String>,
    timeout_option: &str,
    _default_report_path: &str,
) -> Result<(StatusSnapshot, SnapshotSource), String> {
    let raw_endpoint = resolve_runtime_endpoint_from_hint(args, endpoint, DEFAULT_DAEMON_ENDPOINT)?;
    let timeout_ms = parse_u64_option(args, timeout_option, 1000)?;
    let (snapshot, endpoint) = fetch_remote_status_snapshot(&raw_endpoint, timeout_ms)?;

    let source = SnapshotSource {
        label: format!("remote:{endpoint}"),
        json: serde_json::json!({
            "mode": "remote_service",
            "service": STATUS_SERVICE_NAME,
            "endpoint": endpoint,
            "timeout_ms": timeout_ms,
        }),
    };
    Ok((snapshot, source))
}

pub(crate) fn fetch_remote_status_snapshot(
    endpoint: &str,
    timeout_ms: u64,
) -> Result<(StatusSnapshot, String), String> {
    fetch_snapshot_from_endpoint(endpoint, timeout_ms)
}

pub(crate) fn build_snapshot_aggregator(snapshot: &StatusSnapshot) -> MetricsAggregator {
    let mut aggregator = MetricsAggregator::new();

    let topic_pending_total = snapshot.topics.iter().map(|item| item.pending as f64).sum();
    let topic_depth_total = snapshot
        .topics
        .iter()
        .map(|item| item.max_depth as f64)
        .sum();
    let service_pending_requests_total = snapshot
        .services
        .iter()
        .map(|item| item.pending_requests as f64)
        .sum();
    let service_pending_responses_total = snapshot
        .services
        .iter()
        .map(|item| item.pending_responses as f64)
        .sum();
    let loaded_plugins = snapshot
        .plugins
        .iter()
        .filter(|plugin| plugin.loaded)
        .count() as f64;

    let runtime_status = snapshot
        .health
        .iter()
        .find(|item| item.component == "runtime")
        .map(health_status_from_item)
        .unwrap_or(HealthStatus::Healthy);

    let runtime_metrics = vec![
        MetricsSnapshot::gauge("nodes.count", snapshot.nodes.len() as f64, ""),
        MetricsSnapshot::gauge("topics.count", snapshot.topics.len() as f64, ""),
        MetricsSnapshot::gauge("topics.pending_total", topic_pending_total, ""),
        MetricsSnapshot::gauge("topics.max_depth_total", topic_depth_total, ""),
        MetricsSnapshot::gauge("services.count", snapshot.services.len() as f64, ""),
        MetricsSnapshot::gauge(
            "services.pending_requests_total",
            service_pending_requests_total,
            "",
        ),
        MetricsSnapshot::gauge(
            "services.pending_responses_total",
            service_pending_responses_total,
            "",
        ),
        MetricsSnapshot::gauge("actions.count", snapshot.actions.len() as f64, ""),
        MetricsSnapshot::gauge("missions.count", snapshot.missions.len() as f64, ""),
        MetricsSnapshot::gauge("plugins.loaded", loaded_plugins, ""),
        MetricsSnapshot::gauge("graph.edges", snapshot.edges.len() as f64, ""),
    ];
    aggregator.register(
        "runtime",
        Box::new(SnapshotProvider::new(runtime_status, runtime_metrics)),
    );

    for item in &snapshot.health {
        if item.component == "runtime" {
            continue;
        }
        aggregator.register(
            item.component.clone(),
            Box::new(SnapshotProvider::new(health_status_from_item(item), Vec::new())),
        );
    }

    aggregator
}

fn health_status_from_item(item: &HealthStatusItem) -> HealthStatus {
    match item.status.to_ascii_lowercase().as_str() {
        "healthy" => HealthStatus::Healthy,
        "degraded" => HealthStatus::Degraded {
            reason: item
                .reason
                .clone()
                .unwrap_or_else(|| "reported degraded status".to_string()),
        },
        _ => HealthStatus::Unhealthy {
            reason: item
                .reason
                .clone()
                .unwrap_or_else(|| format!("reported status={}", item.status)),
        },
    }
}