use super::*;
pub(in crate::commands::ops) fn build_replay_profile(
input_path: PathBuf,
) -> Result<serde_json::Value, String> {
let replay = FileReplaySession::open(&input_path)
.map_err(|err| format!("open bag file {} failed: {err}", input_path.display()))?;
let entries = replay.entries();
let total_entries = entries.len();
let topic_count = entries
.iter()
.map(|entry| entry.topic.as_str())
.collect::<HashSet<_>>()
.len();
let capture_span_ns = match (entries.first(), entries.last()) {
(Some(first), Some(last)) => {
(last.captured_at.0.saturating_sub(first.captured_at.0)) as u64
}
_ => 0,
};
Ok(serde_json::json!({
"api_version": OPS_REPLAY_API_VERSION,
"input": input_path,
"total_entries": total_entries,
"topic_count": topic_count,
"capture_span_ns": capture_span_ns,
}))
}
pub(in crate::commands::ops) fn build_obs_profile(
snapshot: &StatusSnapshot,
format: &str,
) -> Result<serde_json::Value, String> {
let aggregator = build_obs_aggregator(snapshot);
match format {
"prometheus" => Ok(serde_json::json!({
"format": "prometheus",
"metrics": export_prometheus_text(&aggregator),
})),
"otel" => Ok(serde_json::json!({
"format": "otel",
"metrics": export_otel_json_value(&aggregator),
})),
"both" => Ok(serde_json::json!({
"format": "both",
"metrics": {
"prometheus": export_prometheus_text(&aggregator),
"otel": export_otel_json_value(&aggregator),
}
})),
other => Err(format!(
"unsupported --obs-format value: {other} (expected prometheus|otel|both)"
)),
}
}
pub(in crate::commands::ops) fn build_obs_aggregator(
snapshot: &StatusSnapshot,
) -> MetricsAggregator {
let mut aggregator = MetricsAggregator::new();
let topic_pending_total = snapshot.topics.iter().map(|item| item.pending as f64).sum();
let topic_depth_total = snapshot
.topics
.iter()
.map(|item| item.max_depth as f64)
.sum();
let service_pending_requests_total = snapshot
.services
.iter()
.map(|item| item.pending_requests as f64)
.sum();
let service_pending_responses_total = snapshot
.services
.iter()
.map(|item| item.pending_responses as f64)
.sum();
let loaded_plugins = snapshot
.plugins
.iter()
.filter(|plugin| plugin.loaded)
.count() as f64;
let runtime_status = snapshot
.health
.iter()
.find(|item| item.component == "runtime")
.map(ops_health_status_from_item)
.unwrap_or(HealthStatus::Healthy);
let runtime_metrics = vec![
MetricsSnapshot::gauge("nodes.count", snapshot.nodes.len() as f64, ""),
MetricsSnapshot::gauge("topics.count", snapshot.topics.len() as f64, ""),
MetricsSnapshot::gauge("topics.pending_total", topic_pending_total, ""),
MetricsSnapshot::gauge("topics.max_depth_total", topic_depth_total, ""),
MetricsSnapshot::gauge("services.count", snapshot.services.len() as f64, ""),
MetricsSnapshot::gauge(
"services.pending_requests_total",
service_pending_requests_total,
"",
),
MetricsSnapshot::gauge(
"services.pending_responses_total",
service_pending_responses_total,
"",
),
MetricsSnapshot::gauge("actions.count", snapshot.actions.len() as f64, ""),
MetricsSnapshot::gauge("missions.count", snapshot.missions.len() as f64, ""),
MetricsSnapshot::gauge("plugins.loaded", loaded_plugins, ""),
MetricsSnapshot::gauge("graph.edges", snapshot.edges.len() as f64, ""),
];
aggregator.register(
"runtime",
Box::new(SnapshotProvider {
status: runtime_status,
metrics: runtime_metrics,
}),
);
for item in &snapshot.health {
if item.component == "runtime" {
continue;
}
aggregator.register(
item.component.clone(),
Box::new(SnapshotProvider {
status: ops_health_status_from_item(item),
metrics: Vec::new(),
}),
);
}
aggregator
}
pub(in crate::commands::ops) fn ops_health_status_from_item(
item: &introspection_core::HealthStatusItem,
) -> HealthStatus {
match item.status.to_ascii_lowercase().as_str() {
"healthy" => HealthStatus::Healthy,
"degraded" => HealthStatus::Degraded {
reason: item
.reason
.clone()
.unwrap_or_else(|| "reported degraded status".to_string()),
},
_ => HealthStatus::Unhealthy {
reason: item
.reason
.clone()
.unwrap_or_else(|| format!("reported status={}", item.status)),
},
}
}