robotrt-cli 0.1.0-beta.1

RobotRT modular robotics runtime and middleware components.
use super::*;

/// Builds a JSON summary of a recorded bag file.
///
/// Opens `input_path` with `FileReplaySession::open` and returns an object with:
/// - `api_version`: the `OPS_REPLAY_API_VERSION` constant,
/// - `input`: the input path rendered as a string,
/// - `total_entries`: the number of entries in the session,
/// - `topic_count`: the number of distinct `topic` values across all entries,
/// - `capture_span_ns`: `captured_at` of the last entry minus that of the first
///   (saturating), or `0` when the session has no entries.
///
/// # Errors
/// Returns a message naming the path when the bag file cannot be opened.
pub(in crate::commands::ops) fn build_replay_profile(
    input_path: PathBuf,
) -> Result<serde_json::Value, String> {
    let replay = FileReplaySession::open(&input_path)
        .map_err(|err| format!("open bag file {} failed: {err}", input_path.display()))?;
    let entries = replay.entries();

    let total_entries = entries.len();
    // Deduplicate on the borrowed topic name so counting does not allocate per entry.
    let topic_count = entries
        .iter()
        .map(|entry| entry.topic.as_str())
        .collect::<HashSet<_>>()
        .len();
    // NOTE(review): the span is taken between the first and last entry, which presumably
    // relies on entries being stored in capture order — confirm against the recorder.
    // NOTE(review): the `as u64` cast depends on the timestamp's integer type, which is
    // not visible here; verify it cannot truncate or reinterpret the value.
    let capture_span_ns = match (entries.first(), entries.last()) {
        (Some(first), Some(last)) => {
            (last.captured_at.0.saturating_sub(first.captured_at.0)) as u64
        }
        _ => 0,
    };

    // Serializing a `PathBuf` directly fails for non-UTF-8 paths, and `json!` unwraps that
    // failure into a panic. A lossy string yields the same output for UTF-8 paths and
    // keeps the command alive for the rest.
    let input = input_path.to_string_lossy();

    Ok(serde_json::json!({
        "api_version": OPS_REPLAY_API_VERSION,
        "input": input,
        "total_entries": total_entries,
        "topic_count": topic_count,
        "capture_span_ns": capture_span_ns,
    }))
}

/// Renders the observability metrics derived from `snapshot` as a JSON document.
///
/// `format` selects the exporter: `"prometheus"`, `"otel"`, or `"both"`. The result is an
/// object whose `format` key echoes the selection and whose `metrics` key holds the
/// exporter output; for `"both"` that is an object with `prometheus` and `otel` entries.
///
/// # Errors
/// Returns a message listing the accepted values for any other `format`.
pub(in crate::commands::ops) fn build_obs_profile(
    snapshot: &StatusSnapshot,
    format: &str,
) -> Result<serde_json::Value, String> {
    let aggregator = build_obs_aggregator(snapshot);

    // Only the payload differs between formats; the envelope is assembled once below.
    let metrics = match format {
        "prometheus" => serde_json::json!(export_prometheus_text(&aggregator)),
        "otel" => serde_json::json!(export_otel_json_value(&aggregator)),
        "both" => serde_json::json!({
            "prometheus": export_prometheus_text(&aggregator),
            "otel": export_otel_json_value(&aggregator),
        }),
        other => {
            return Err(format!(
                "unsupported --obs-format value: {other} (expected prometheus|otel|both)"
            ));
        }
    };

    // Past the match, `format` is known to be one of the three accepted literals.
    Ok(serde_json::json!({
        "format": format,
        "metrics": metrics,
    }))
}

/// Assembles a `MetricsAggregator` from a status snapshot.
///
/// A `"runtime"` provider is always registered. It carries gauges for the node, topic,
/// service, action, mission and graph-edge counts, the summed topic and service queue
/// figures, and the number of loaded plugins. Its health status comes from the snapshot's
/// `"runtime"` health item, defaulting to `HealthStatus::Healthy` when there is none.
///
/// Every other health item is registered under its own component name with its reported
/// status and no metrics.
pub(in crate::commands::ops) fn build_obs_aggregator(
    snapshot: &StatusSnapshot,
) -> MetricsAggregator {
    // Health of the runtime itself; a missing entry is treated as healthy.
    let runtime_status = snapshot
        .health
        .iter()
        .find(|entry| entry.component == "runtime")
        .map_or(HealthStatus::Healthy, ops_health_status_from_item);

    // Queue totals across all topics.
    let topics = &snapshot.topics;
    let pending_across_topics: f64 = topics.iter().map(|topic| topic.pending as f64).sum();
    let depth_across_topics: f64 = topics.iter().map(|topic| topic.max_depth as f64).sum();

    // Outstanding request/response totals across all services.
    let services = &snapshot.services;
    let pending_requests: f64 = services
        .iter()
        .map(|service| service.pending_requests as f64)
        .sum();
    let pending_responses: f64 = services
        .iter()
        .map(|service| service.pending_responses as f64)
        .sum();

    let loaded_plugin_count = snapshot.plugins.iter().filter(|p| p.loaded).count() as f64;

    // Every gauge passes an empty string as its third argument.
    let runtime_metrics = vec![
        MetricsSnapshot::gauge("nodes.count", snapshot.nodes.len() as f64, ""),
        MetricsSnapshot::gauge("topics.count", topics.len() as f64, ""),
        MetricsSnapshot::gauge("topics.pending_total", pending_across_topics, ""),
        MetricsSnapshot::gauge("topics.max_depth_total", depth_across_topics, ""),
        MetricsSnapshot::gauge("services.count", services.len() as f64, ""),
        MetricsSnapshot::gauge("services.pending_requests_total", pending_requests, ""),
        MetricsSnapshot::gauge("services.pending_responses_total", pending_responses, ""),
        MetricsSnapshot::gauge("actions.count", snapshot.actions.len() as f64, ""),
        MetricsSnapshot::gauge("missions.count", snapshot.missions.len() as f64, ""),
        MetricsSnapshot::gauge("plugins.loaded", loaded_plugin_count, ""),
        MetricsSnapshot::gauge("graph.edges", snapshot.edges.len() as f64, ""),
    ];

    let mut aggregator = MetricsAggregator::new();
    aggregator.register(
        "runtime",
        Box::new(SnapshotProvider {
            status: runtime_status,
            metrics: runtime_metrics,
        }),
    );

    // The runtime entry is already registered above, so only the remaining components
    // are added here, each as a status-only provider.
    let other_components = snapshot
        .health
        .iter()
        .filter(|entry| entry.component != "runtime");
    for entry in other_components {
        let provider = SnapshotProvider {
            status: ops_health_status_from_item(entry),
            metrics: Vec::new(),
        };
        aggregator.register(entry.component.clone(), Box::new(provider));
    }

    aggregator
}

/// Converts a reported health item into a `HealthStatus`.
///
/// The item's `status` text is compared ASCII-case-insensitively:
/// - `"healthy"` becomes `HealthStatus::Healthy`;
/// - `"degraded"` becomes `HealthStatus::Degraded`;
/// - anything else becomes `HealthStatus::Unhealthy`.
///
/// For the latter two the item's `reason` is used when present; otherwise a fallback
/// message is generated, which for the unhealthy case includes the raw status text.
pub(in crate::commands::ops) fn ops_health_status_from_item(
    item: &introspection_core::HealthStatusItem,
) -> HealthStatus {
    if item.status.eq_ignore_ascii_case("healthy") {
        return HealthStatus::Healthy;
    }

    // Both remaining variants carry a reason; they differ only in the fallback text.
    let reported_reason = item.reason.clone();

    if item.status.eq_ignore_ascii_case("degraded") {
        let reason = match reported_reason {
            Some(text) => text,
            None => "reported degraded status".to_string(),
        };
        return HealthStatus::Degraded { reason };
    }

    // Unrecognised status strings are treated as unhealthy.
    let reason = match reported_reason {
        Some(text) => text,
        None => format!("reported status={}", item.status),
    };
    HealthStatus::Unhealthy { reason }
}