use std::fs;
use std::path::{Path, PathBuf};
use crate::helpers::option_value;
/// The only `api_version` value `parse_alert_policy_file` accepts when a policy
/// file declares one; a file declaring any other version is rejected.
const OPS_ALERT_POLICY_API_VERSION: &str = "robotrt.ops.alert-policy.v1";
/// Topic warn-utilization threshold (percent) that `parse_alert_policy` starts
/// from when neither the policy file nor the command line supplies a value.
const DEFAULT_TOPIC_WARN_UTILIZATION: f64 = 80.0;
/// Topic critical-utilization threshold (percent) used as the built-in default.
const DEFAULT_TOPIC_CRITICAL_UTILIZATION: f64 = 95.0;
/// Ack warn-coverage threshold (percent) used as the built-in default.
const DEFAULT_ACK_WARN_COVERAGE: f64 = 80.0;
/// Ack critical-coverage threshold (percent) used as the built-in default.
/// Lower than the warn default: `parse_alert_policy` requires warn >= critical
/// for the coverage pair.
const DEFAULT_ACK_CRITICAL_COVERAGE: f64 = 50.0;
/// Fully resolved alert thresholds, as returned by `parse_alert_policy`.
///
/// Values built by `parse_alert_policy` have each been checked to lie in
/// `[0, 100]`, with `topic_warn_utilization <= topic_critical_utilization` and
/// `ack_warn_coverage >= ack_critical_coverage`. Constructing the struct
/// directly bypasses those checks.
#[derive(Clone)]
pub(super) struct AlertPolicy {
/// Topic utilization warn threshold, as a percentage.
pub topic_warn_utilization: f64,
/// Topic utilization critical threshold, as a percentage.
pub topic_critical_utilization: f64,
/// Ack coverage warn threshold, as a percentage.
pub ack_warn_coverage: f64,
/// Ack coverage critical threshold, as a percentage.
pub ack_critical_coverage: f64,
}
/// A built-in ops profile template selected with `--profile-template`.
///
/// Instances are produced by `OpsProfileTemplate::parse`; every field holds
/// `'static` data, so the struct is cheap to copy.
// NOTE(review): how callers apply these defaults is not visible in this file;
// the field notes below describe only what the templates store.
#[derive(Clone, Copy)]
pub(super) struct OpsProfileTemplate {
/// Template name, identical to the string it was parsed from.
pub name: &'static str,
/// Default observability format for the template ("both", "otel" or
/// "prometheus" in the built-in templates), if the template sets one.
pub default_obs_format: Option<&'static str>,
/// Presumably requests an automatic replay from the default bag — confirm
/// against the caller. Only `perf-regression` sets this to `true`.
pub auto_replay_from_default_bag: bool,
/// Template-specific warn utilization default; `None` means no override.
pub default_warn_utilization: Option<f64>,
/// Template-specific critical utilization default; `None` means no override.
pub default_critical_utilization: Option<f64>,
/// Template-specific warn ack-coverage default; `None` means no override.
pub default_warn_ack_coverage: Option<f64>,
/// Template-specific critical ack-coverage default; `None` means no override.
pub default_critical_ack_coverage: Option<f64>,
/// Identifiers of the areas this template emphasises (e.g. "alerts").
pub focus: &'static [&'static str],
}
impl OpsProfileTemplate {
fn parse(raw: &str) -> Option<Self> {
match raw {
"perf-regression" => Some(Self {
name: "perf-regression",
default_obs_format: Some("both"),
auto_replay_from_default_bag: true,
default_warn_utilization: None,
default_critical_utilization: None,
default_warn_ack_coverage: None,
default_critical_ack_coverage: None,
focus: &["topics_by_utilization", "runtime_delta", "replay_span"],
}),
"task-stall" => Some(Self {
name: "task-stall",
default_obs_format: Some("otel"),
auto_replay_from_default_bag: false,
default_warn_utilization: None,
default_critical_utilization: None,
default_warn_ack_coverage: None,
default_critical_ack_coverage: None,
focus: &[
"missions_non_steady",
"actions_non_steady",
"health_non_healthy",
],
}),
"plugin-anomaly" => Some(Self {
name: "plugin-anomaly",
default_obs_format: Some("prometheus"),
auto_replay_from_default_bag: false,
default_warn_utilization: None,
default_critical_utilization: None,
default_warn_ack_coverage: None,
default_critical_ack_coverage: None,
focus: &["plugins", "health_non_healthy", "alerts"],
}),
"queue-backlog" => Some(Self {
name: "queue-backlog",
default_obs_format: Some("prometheus"),
auto_replay_from_default_bag: false,
default_warn_utilization: Some(60.0),
default_critical_utilization: Some(80.0),
default_warn_ack_coverage: Some(95.0),
default_critical_ack_coverage: Some(85.0),
focus: &["topics_by_utilization", "alerts", "backpressure"],
}),
_ => None,
}
}
}
/// On-disk shape of an alert policy file, deserialized from JSON by
/// `parse_alert_policy_file`.
///
/// Every field is optional; thresholds that are absent leave the corresponding
/// default in `parse_alert_policy` untouched.
#[derive(serde::Deserialize)]
struct AlertPolicyFile {
/// Declared schema version. When present it must equal
/// `OPS_ALERT_POLICY_API_VERSION`; when absent the file is accepted as-is.
api_version: Option<String>,
/// Overrides the default topic warn-utilization threshold.
topic_warn_utilization: Option<f64>,
/// Overrides the default topic critical-utilization threshold.
topic_critical_utilization: Option<f64>,
/// Overrides the default ack warn-coverage threshold.
ack_warn_coverage: Option<f64>,
/// Overrides the default ack critical-coverage threshold.
ack_critical_coverage: Option<f64>,
}
/// Builds the effective [`AlertPolicy`] from command-line arguments.
///
/// Each threshold is resolved with the following precedence (highest first):
/// 1. the dedicated CLI flag (`--topic-warn-utilization`, ...),
/// 2. the matching field of the file named by `--policy`,
/// 3. the built-in `DEFAULT_*` constant.
///
/// # Errors
///
/// Returns a message when the policy file cannot be read or parsed, when a
/// flag value is not a valid `f64`, when any resolved threshold lies outside
/// `[0, 100]`, or when the warn/critical pairs are ordered inconsistently.
/// Range and ordering errors always name the CLI flag, even if the offending
/// value came from the policy file.
pub(super) fn parse_alert_policy(args: &[String]) -> Result<AlertPolicy, String> {
    // Layer 2: thresholds from the optional policy file. The file is loaded
    // before any flag is parsed so that file errors surface first.
    let (file_topic_warn, file_topic_critical, file_ack_warn, file_ack_critical) =
        match option_value(args, "--policy") {
            Some(raw_policy_path) => {
                let policy_path = PathBuf::from(raw_policy_path);
                let policy = parse_alert_policy_file(&policy_path)?;
                (
                    policy.topic_warn_utilization,
                    policy.topic_critical_utilization,
                    policy.ack_warn_coverage,
                    policy.ack_critical_coverage,
                )
            }
            None => (None, None, None, None),
        };

    // Layer 1 over layer 2 over layer 3: flag, then file, then default.
    let topic_warn_utilization = parse_optional_f64_option(args, "--topic-warn-utilization")?
        .or(file_topic_warn)
        .unwrap_or(DEFAULT_TOPIC_WARN_UTILIZATION);
    let topic_critical_utilization =
        parse_optional_f64_option(args, "--topic-critical-utilization")?
            .or(file_topic_critical)
            .unwrap_or(DEFAULT_TOPIC_CRITICAL_UTILIZATION);
    let ack_warn_coverage = parse_optional_f64_option(args, "--ack-warn-coverage")?
        .or(file_ack_warn)
        .unwrap_or(DEFAULT_ACK_WARN_COVERAGE);
    let ack_critical_coverage = parse_optional_f64_option(args, "--ack-critical-coverage")?
        .or(file_ack_critical)
        .unwrap_or(DEFAULT_ACK_CRITICAL_COVERAGE);

    // Range checks run in flag order so the first offending flag is reported.
    for (option, value) in [
        ("--topic-warn-utilization", topic_warn_utilization),
        ("--topic-critical-utilization", topic_critical_utilization),
        ("--ack-warn-coverage", ack_warn_coverage),
        ("--ack-critical-coverage", ack_critical_coverage),
    ] {
        validate_percentage(option, value)?;
    }

    // Utilization: warn must not exceed critical.
    if topic_warn_utilization > topic_critical_utilization {
        return Err(String::from(
            "--topic-warn-utilization must be <= --topic-critical-utilization",
        ));
    }
    // Coverage runs the other way: warn must not be below critical.
    if ack_warn_coverage < ack_critical_coverage {
        return Err(String::from(
            "--ack-warn-coverage must be >= --ack-critical-coverage",
        ));
    }

    Ok(AlertPolicy {
        topic_warn_utilization,
        topic_critical_utilization,
        ack_warn_coverage,
        ack_critical_coverage,
    })
}
/// Reads the optional `--profile-template` flag and resolves it to a built-in
/// template.
///
/// Returns `Ok(None)` when the flag is absent and `Ok(Some(template))` when
/// its value names a known template.
///
/// # Errors
///
/// Returns a message listing the supported names when the flag value is not
/// recognised.
pub(super) fn parse_profile_template(
    args: &[String],
) -> Result<Option<OpsProfileTemplate>, String> {
    match option_value(args, "--profile-template") {
        None => Ok(None),
        Some(raw) => match OpsProfileTemplate::parse(&raw) {
            Some(template) => Ok(Some(template)),
            None => Err(format!(
                "unsupported --profile-template value: {raw} (expected perf-regression|task-stall|plugin-anomaly|queue-backlog)"
            )),
        },
    }
}
/// Looks up `option` in `args` and parses its value as an `f64`.
///
/// Returns `Ok(None)` when the option is absent.
///
/// # Errors
///
/// Returns a message containing the option name, the raw text and the parse
/// error when the value is not a valid floating-point literal.
fn parse_optional_f64_option(args: &[String], option: &str) -> Result<Option<f64>, String> {
    match option_value(args, option) {
        None => Ok(None),
        Some(raw) => match raw.parse::<f64>() {
            Ok(number) => Ok(Some(number)),
            Err(err) => Err(format!("invalid value for {option}: {raw} ({err})")),
        },
    }
}
/// Loads and deserializes the JSON alert policy file at `path`.
///
/// A file without `api_version` is accepted. A file that declares one must
/// match `OPS_ALERT_POLICY_API_VERSION` exactly.
///
/// # Errors
///
/// Returns a message naming the file when it cannot be read, is not valid
/// JSON for `AlertPolicyFile`, or declares an unsupported `api_version`.
fn parse_alert_policy_file(path: &Path) -> Result<AlertPolicyFile, String> {
    let content = match fs::read_to_string(path) {
        Ok(text) => text,
        Err(err) => {
            return Err(format!("read policy file {} failed: {err}", path.display()));
        }
    };
    let policy = match serde_json::from_str::<AlertPolicyFile>(&content) {
        Ok(parsed) => parsed,
        Err(err) => {
            return Err(format!("parse policy file {} failed: {err}", path.display()));
        }
    };

    // Only a version that is both present and different is an error.
    let unsupported = policy
        .api_version
        .as_deref()
        .filter(|declared| *declared != OPS_ALERT_POLICY_API_VERSION);
    if let Some(version) = unsupported {
        return Err(format!(
            "unsupported policy api_version in {}: {version} (expected {OPS_ALERT_POLICY_API_VERSION})",
            path.display()
        ));
    }

    Ok(policy)
}
/// Checks that `value` is a percentage in the closed range `[0, 100]`.
///
/// `option` is used only to label the error message.
///
/// # Errors
///
/// Returns a message naming `option` and the offending value when it lies
/// outside the range. NaN is rejected as well, because it is never contained
/// in a range.
fn validate_percentage(option: &str, value: f64) -> Result<(), String> {
    (0.0..=100.0)
        .contains(&value)
        .then_some(())
        .ok_or_else(|| format!("invalid value for {option}: {value} (must be in [0, 100])"))
}