use core_types::{HealthStatus, MetricsProvider, MetricsSnapshot};
use introspection_core::{HealthStatusItem, StatusSnapshot};
use obs_core::MetricsAggregator;
use crate::helpers::{parse_u64_option, resolve_runtime_endpoint_from_hint};
use crate::gateway::{STATUS_SERVICE_NAME, fetch_snapshot_from_endpoint};
/// Fallback endpoint handed to `resolve_runtime_endpoint_from_hint` by
/// `load_status_snapshot`.
const DEFAULT_DAEMON_ENDPOINT: &str = "127.0.0.1:7588";
/// Describes where a status snapshot was obtained from.
///
/// `load_status_snapshot` fills this with a `remote:<endpoint>` label and a
/// JSON object holding the mode, service name, endpoint and timeout.
#[derive(Clone)]
pub(crate) struct SnapshotSource {
/// Short textual identifier of the source (e.g. `remote:<endpoint>`).
pub(crate) label: String,
/// Structured description of the source as a JSON value.
pub(crate) json: serde_json::Value,
}
/// Holds a health status and a list of metrics so they can be served
/// through the `MetricsProvider` trait.
struct SnapshotProvider {
// Health value returned (cloned) by `MetricsProvider::health`.
status: HealthStatus,
// Metrics returned (cloned) by `MetricsProvider::collect`.
metrics: Vec<MetricsSnapshot>,
}
impl SnapshotProvider {
fn new(status: HealthStatus, metrics: Vec<MetricsSnapshot>) -> Self {
Self { status, metrics }
}
}
impl MetricsProvider for SnapshotProvider {
    /// Returns a copy of the stored metrics.
    fn collect(&self) -> Vec<MetricsSnapshot> {
        let Self { metrics, .. } = self;
        metrics.clone()
    }

    /// Returns a copy of the stored health status.
    fn health(&self) -> HealthStatus {
        let Self { status, .. } = self;
        status.clone()
    }
}
/// Fetches a status snapshot from a remote endpoint and describes its origin.
///
/// The endpoint is resolved through `resolve_runtime_endpoint_from_hint`
/// using `args`, the optional `endpoint` hint and `DEFAULT_DAEMON_ENDPOINT`.
/// The timeout in milliseconds is read through `parse_u64_option` using
/// `timeout_option`, with `1000` passed as the default value.
/// `_default_report_path` is accepted but not used.
///
/// # Errors
///
/// Returns the `String` error of whichever step fails first: endpoint
/// resolution, timeout parsing, or the remote fetch.
pub(crate) fn load_status_snapshot(
    args: &[String],
    endpoint: Option<String>,
    timeout_option: &str,
    _default_report_path: &str,
) -> Result<(StatusSnapshot, SnapshotSource), String> {
    // Keep this order: the endpoint error takes precedence over a bad timeout.
    let requested = resolve_runtime_endpoint_from_hint(args, endpoint, DEFAULT_DAEMON_ENDPOINT)?;
    let timeout_ms = parse_u64_option(args, timeout_option, 1000)?;

    // The fetch hands back the endpoint string to report alongside the snapshot.
    let (snapshot, endpoint) = fetch_remote_status_snapshot(&requested, timeout_ms)?;

    let label = format!("remote:{endpoint}");
    let json = serde_json::json!({
        "mode": "remote_service",
        "service": STATUS_SERVICE_NAME,
        "endpoint": endpoint,
        "timeout_ms": timeout_ms,
    });

    Ok((snapshot, SnapshotSource { label, json }))
}
/// Fetches a status snapshot from `endpoint` with a timeout of `timeout_ms`.
///
/// Delegates directly to `fetch_snapshot_from_endpoint` and returns its
/// result unchanged: the snapshot paired with a `String`, or a `String` error.
pub(crate) fn fetch_remote_status_snapshot(
endpoint: &str,
timeout_ms: u64,
) -> Result<(StatusSnapshot, String), String> {
fetch_snapshot_from_endpoint(endpoint, timeout_ms)
}
/// Builds a `MetricsAggregator` populated from a `StatusSnapshot`.
///
/// A provider named `"runtime"` is registered with gauges derived from the
/// snapshot (entity counts and summed pending/depth figures). Its health is
/// taken from the first health item whose component is `"runtime"`, or
/// `HealthStatus::Healthy` when there is none. Every other health item is
/// registered under its own component name with an empty metric list.
pub(crate) fn build_snapshot_aggregator(snapshot: &StatusSnapshot) -> MetricsAggregator {
    const RUNTIME: &str = "runtime";

    // All gauges produced here carry an empty third argument.
    let gauge = |name: &'static str, value: f64| MetricsSnapshot::gauge(name, value, "");

    let topics = &snapshot.topics;
    let services = &snapshot.services;

    let pending_on_topics = topics.iter().map(|t| t.pending as f64).sum::<f64>();
    let depth_on_topics = topics.iter().map(|t| t.max_depth as f64).sum::<f64>();
    let requests_on_services = services
        .iter()
        .map(|s| s.pending_requests as f64)
        .sum::<f64>();
    let responses_on_services = services
        .iter()
        .map(|s| s.pending_responses as f64)
        .sum::<f64>();
    let plugins_loaded = snapshot.plugins.iter().filter(|p| p.loaded).count() as f64;

    let runtime_health = match snapshot.health.iter().find(|h| h.component == RUNTIME) {
        Some(entry) => health_status_from_item(entry),
        None => HealthStatus::Healthy,
    };

    let runtime_gauges = vec![
        gauge("nodes.count", snapshot.nodes.len() as f64),
        gauge("topics.count", topics.len() as f64),
        gauge("topics.pending_total", pending_on_topics),
        gauge("topics.max_depth_total", depth_on_topics),
        gauge("services.count", services.len() as f64),
        gauge("services.pending_requests_total", requests_on_services),
        gauge("services.pending_responses_total", responses_on_services),
        gauge("actions.count", snapshot.actions.len() as f64),
        gauge("missions.count", snapshot.missions.len() as f64),
        gauge("plugins.loaded", plugins_loaded),
        gauge("graph.edges", snapshot.edges.len() as f64),
    ];

    let mut aggregator = MetricsAggregator::new();
    aggregator.register(
        RUNTIME,
        Box::new(SnapshotProvider::new(runtime_health, runtime_gauges)),
    );

    // The runtime component is already registered above, so skip it here.
    for entry in snapshot.health.iter().filter(|h| h.component != RUNTIME) {
        let provider = SnapshotProvider::new(health_status_from_item(entry), Vec::new());
        aggregator.register(entry.component.clone(), Box::new(provider));
    }

    aggregator
}
/// Converts a reported health item into a `HealthStatus`.
///
/// The status text is compared ASCII case-insensitively:
/// * `"healthy"` maps to `HealthStatus::Healthy`;
/// * `"degraded"` maps to `HealthStatus::Degraded`;
/// * anything else maps to `HealthStatus::Unhealthy`.
///
/// For the latter two, the item's own reason is used when present; otherwise
/// a default message is generated.
fn health_status_from_item(item: &HealthStatusItem) -> HealthStatus {
    if item.status.eq_ignore_ascii_case("healthy") {
        return HealthStatus::Healthy;
    }

    let degraded = item.status.eq_ignore_ascii_case("degraded");
    let reason = match (&item.reason, degraded) {
        (Some(text), _) => text.clone(),
        (None, true) => "reported degraded status".to_string(),
        // Unknown statuses echo the raw reported text for diagnosis.
        (None, false) => format!("reported status={}", item.status),
    };

    if degraded {
        HealthStatus::Degraded { reason }
    } else {
        HealthStatus::Unhealthy { reason }
    }
}