use crate::v2::control::{ConfigUpdateRequest, ConfigUpdateResponse, ConfigUpdateType};
use crate::v2::metrics::{HistogramBucket, MetricsReport};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::time::{Duration, Instant};
/// Holds the most recent metric values reported by agents, one entry per
/// series, and renders them for export.
#[derive(Debug)]
pub struct MetricsCollector {
    /// Latest counter value per (agent, name, labels) series.
    counters: RwLock<HashMap<MetricKey, AggregatedCounter>>,
    /// Latest gauge value per (agent, name, labels) series.
    gauges: RwLock<HashMap<MetricKey, AggregatedGauge>>,
    /// Latest histogram state per (agent, name, labels) series.
    histograms: RwLock<HashMap<MetricKey, AggregatedHistogram>>,
    /// Time at which each agent id last delivered a report; read by `active_agents`.
    last_report: RwLock<HashMap<String, Instant>>,
    /// Behaviour settings (expiry age, series limit, agent-id labelling).
    config: MetricsCollectorConfig,
}
/// Settings for a [`MetricsCollector`].
#[derive(Debug, Clone)]
pub struct MetricsCollectorConfig {
    /// Series not refreshed within this window are removed by `expire_old_metrics`.
    pub max_age: Duration,
    /// Maximum number of series the collector is configured to hold.
    pub max_series: usize,
    /// When true, every stored series gets an `agent_id` label carrying the
    /// reporting agent's id.
    pub include_agent_id_label: bool,
}

impl Default for MetricsCollectorConfig {
    /// Five-minute expiry, 10 000 series, agent-id labelling enabled.
    fn default() -> Self {
        const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60);
        MetricsCollectorConfig {
            include_agent_id_label: true,
            max_series: 10_000,
            max_age: FIVE_MINUTES,
        }
    }
}
/// Identity of one stored agent series: the reporting agent, the metric name,
/// and a canonical encoding of its label set.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct MetricKey {
    agent_id: String,
    name: String,
    /// Order-independent, collision-free encoding of the labels (see `labels_to_key`).
    labels_key: String,
}

impl MetricKey {
    /// Builds the key for metric `name` reported by `agent_id` with `labels`.
    fn new(agent_id: &str, name: &str, labels: &HashMap<String, String>) -> Self {
        Self {
            agent_id: agent_id.to_string(),
            name: name.to_string(),
            labels_key: Self::labels_to_key(labels),
        }
    }

    /// Encodes a label set as a string that does not depend on `HashMap`
    /// iteration order.
    ///
    /// Pairs are sorted by label name, and both name and value are written with
    /// `{:?}`, which quotes them and escapes `"` and `\`. Label values may
    /// legitimately contain `,` or `=`. A plain `k=v,k=v` join would let
    /// `{a: "1,b=2"}` and `{a: "1", b: "2"}` encode identically, so two
    /// distinct series would overwrite each other.
    fn labels_to_key(labels: &HashMap<String, String>) -> String {
        let mut pairs: Vec<(&String, &String)> = labels.iter().collect();
        pairs.sort_by_key(|(k, _)| *k);
        pairs
            .into_iter()
            .map(|(k, v)| format!("{:?}={:?}", k, v))
            .collect::<Vec<_>>()
            .join(",")
    }
}
/// Stored state of one agent counter series: the value from the latest report.
#[derive(Debug, Clone)]
struct AggregatedCounter {
    /// Metric name as reported.
    name: String,
    /// HELP text, if the agent supplied one.
    help: Option<String>,
    /// Labels as stored, including the `agent_id` label when configured.
    labels: HashMap<String, String>,
    /// Value from the most recent report (replaced, not summed, on each report).
    value: u64,
    /// When this series was last written; used for expiry.
    last_updated: Instant,
}
/// Stored state of one agent gauge series: the value from the latest report.
#[derive(Debug, Clone)]
struct AggregatedGauge {
    /// Metric name as reported.
    name: String,
    /// HELP text, if the agent supplied one.
    help: Option<String>,
    /// Labels as stored, including the `agent_id` label when configured.
    labels: HashMap<String, String>,
    /// Value from the most recent report.
    value: f64,
    /// When this series was last written; used for expiry.
    last_updated: Instant,
}
/// Stored state of one agent histogram series, copied from the latest report.
#[derive(Debug, Clone)]
struct AggregatedHistogram {
    /// Metric name as reported (suffixes `_bucket`/`_sum`/`_count` are added on export).
    name: String,
    /// HELP text, if the agent supplied one.
    help: Option<String>,
    /// Labels as stored, including the `agent_id` label when configured.
    labels: HashMap<String, String>,
    /// Sum of observed values, as reported.
    sum: f64,
    /// Number of observations, as reported.
    count: u64,
    /// Buckets exactly as the agent reported them; exported in this order.
    buckets: Vec<HistogramBucket>,
    /// When this series was last written; used for expiry.
    last_updated: Instant,
}
impl MetricsCollector {
pub fn new() -> Self {
Self::with_config(MetricsCollectorConfig::default())
}
pub fn with_config(config: MetricsCollectorConfig) -> Self {
Self {
counters: RwLock::new(HashMap::new()),
gauges: RwLock::new(HashMap::new()),
histograms: RwLock::new(HashMap::new()),
last_report: RwLock::new(HashMap::new()),
config,
}
}
pub fn record(&self, report: &MetricsReport) {
let now = Instant::now();
self.last_report
.write()
.insert(report.agent_id.clone(), now);
for counter in &report.counters {
let mut labels = counter.labels.clone();
if self.config.include_agent_id_label {
labels.insert("agent_id".to_string(), report.agent_id.clone());
}
let key = MetricKey::new(&report.agent_id, &counter.name, &labels);
let mut counters = self.counters.write();
counters.insert(
key,
AggregatedCounter {
name: counter.name.clone(),
help: counter.help.clone(),
labels,
value: counter.value,
last_updated: now,
},
);
}
for gauge in &report.gauges {
let mut labels = gauge.labels.clone();
if self.config.include_agent_id_label {
labels.insert("agent_id".to_string(), report.agent_id.clone());
}
let key = MetricKey::new(&report.agent_id, &gauge.name, &labels);
let mut gauges = self.gauges.write();
gauges.insert(
key,
AggregatedGauge {
name: gauge.name.clone(),
help: gauge.help.clone(),
labels,
value: gauge.value,
last_updated: now,
},
);
}
for histogram in &report.histograms {
let mut labels = histogram.labels.clone();
if self.config.include_agent_id_label {
labels.insert("agent_id".to_string(), report.agent_id.clone());
}
let key = MetricKey::new(&report.agent_id, &histogram.name, &labels);
let mut histograms = self.histograms.write();
histograms.insert(
key,
AggregatedHistogram {
name: histogram.name.clone(),
help: histogram.help.clone(),
labels,
sum: histogram.sum,
count: histogram.count,
buckets: histogram.buckets.clone(),
last_updated: now,
},
);
}
}
pub fn expire_old_metrics(&self) {
let now = Instant::now();
let max_age = self.config.max_age;
self.counters
.write()
.retain(|_, v| now.duration_since(v.last_updated) < max_age);
self.gauges
.write()
.retain(|_, v| now.duration_since(v.last_updated) < max_age);
self.histograms
.write()
.retain(|_, v| now.duration_since(v.last_updated) < max_age);
}
pub fn series_count(&self) -> usize {
self.counters.read().len() + self.gauges.read().len() + self.histograms.read().len()
}
pub fn active_agents(&self) -> Vec<String> {
self.last_report.read().keys().cloned().collect()
}
pub fn export_prometheus(&self) -> String {
let mut output = String::new();
let counters = self.counters.read();
let mut counter_names: Vec<_> = counters.values().map(|c| &c.name).collect();
counter_names.sort();
counter_names.dedup();
for name in counter_names {
let metrics: Vec<_> = counters.values().filter(|c| &c.name == name).collect();
if let Some(first) = metrics.first() {
if let Some(help) = &first.help {
output.push_str(&format!("# HELP {} {}\n", name, help));
}
output.push_str(&format!("# TYPE {} counter\n", name));
}
for metric in metrics {
output.push_str(&format_metric_line(
name,
&metric.labels,
metric.value as f64,
));
}
}
let gauges = self.gauges.read();
let mut gauge_names: Vec<_> = gauges.values().map(|g| &g.name).collect();
gauge_names.sort();
gauge_names.dedup();
for name in gauge_names {
let metrics: Vec<_> = gauges.values().filter(|g| &g.name == name).collect();
if let Some(first) = metrics.first() {
if let Some(help) = &first.help {
output.push_str(&format!("# HELP {} {}\n", name, help));
}
output.push_str(&format!("# TYPE {} gauge\n", name));
}
for metric in metrics {
output.push_str(&format_metric_line(name, &metric.labels, metric.value));
}
}
let histograms = self.histograms.read();
let mut histogram_names: Vec<_> = histograms.values().map(|h| &h.name).collect();
histogram_names.sort();
histogram_names.dedup();
for name in histogram_names {
let metrics: Vec<_> = histograms.values().filter(|h| &h.name == name).collect();
if let Some(first) = metrics.first() {
if let Some(help) = &first.help {
output.push_str(&format!("# HELP {} {}\n", name, help));
}
output.push_str(&format!("# TYPE {} histogram\n", name));
}
for metric in metrics {
for bucket in &metric.buckets {
let mut labels = metric.labels.clone();
labels.insert(
"le".to_string(),
if bucket.le.is_infinite() {
"+Inf".to_string()
} else {
bucket.le.to_string()
},
);
output.push_str(&format_metric_line(
&format!("{}_bucket", name),
&labels,
bucket.count as f64,
));
}
output.push_str(&format_metric_line(
&format!("{}_sum", name),
&metric.labels,
metric.sum,
));
output.push_str(&format_metric_line(
&format!("{}_count", name),
&metric.labels,
metric.count as f64,
));
}
}
output
}
pub fn snapshot(&self) -> MetricsSnapshot {
MetricsSnapshot {
counters: self.counters.read().values().cloned().collect(),
gauges: self.gauges.read().values().cloned().collect(),
histograms: self.histograms.read().values().cloned().collect(),
timestamp: Instant::now(),
}
}
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
/// Point-in-time copy of every series held by a [`MetricsCollector`].
#[derive(Debug)]
pub struct MetricsSnapshot {
    counters: Vec<AggregatedCounter>,
    gauges: Vec<AggregatedGauge>,
    histograms: Vec<AggregatedHistogram>,
    /// When the snapshot was taken.
    timestamp: Instant,
}

impl MetricsSnapshot {
    /// Number of counter series in the snapshot.
    pub fn counter_count(&self) -> usize {
        self.counters.len()
    }

    /// Number of gauge series in the snapshot.
    pub fn gauge_count(&self) -> usize {
        self.gauges.len()
    }

    /// Number of histogram series in the snapshot.
    pub fn histogram_count(&self) -> usize {
        self.histograms.len()
    }

    /// Total number of series (counters + gauges + histograms) in the snapshot.
    pub fn series_count(&self) -> usize {
        self.counters.len() + self.gauges.len() + self.histograms.len()
    }

    /// Instant at which the snapshot was captured.
    ///
    /// This field was previously stored but never readable.
    pub fn timestamp(&self) -> Instant {
        self.timestamp
    }
}
/// Combines metrics recorded directly by the proxy with metrics reported by
/// agents, and exports both as a single Prometheus text document.
#[derive(Debug)]
pub struct UnifiedMetricsAggregator {
    /// Proxy counters keyed by `metric_key(name, labels)`.
    proxy_counters: RwLock<HashMap<String, ProxyCounter>>,
    /// Proxy gauges keyed by `metric_key(name, labels)`.
    proxy_gauges: RwLock<HashMap<String, ProxyGauge>>,
    /// Proxy histograms keyed by `metric_key(name, labels)`.
    proxy_histograms: RwLock<HashMap<String, ProxyHistogram>>,
    /// Receives agent reports via `record_agent_metrics`; appended to the export.
    agent_collector: MetricsCollector,
    /// Written as the `service` label of `zentinel_info`.
    service_name: String,
    /// Written as the `instance` label of `zentinel_info`.
    instance_id: String,
}
/// A proxy-side counter series; `value` accumulates increments.
#[derive(Debug, Clone)]
struct ProxyCounter {
    name: String,
    /// HELP text captured when the series was first created.
    help: String,
    labels: HashMap<String, String>,
    value: u64,
}
/// A proxy-side gauge series holding the last value set.
#[derive(Debug, Clone)]
struct ProxyGauge {
    name: String,
    help: String,
    labels: HashMap<String, String>,
    value: f64,
}
/// A proxy-side histogram series accumulated from individual observations.
#[derive(Debug, Clone)]
struct ProxyHistogram {
    name: String,
    /// HELP text captured when the series was first created.
    help: String,
    labels: HashMap<String, String>,
    /// Sum of all observed values.
    sum: f64,
    /// Number of observations.
    count: u64,
    /// `(upper bound, cumulative count of observations <= bound)`, ending with
    /// an `f64::INFINITY` bucket.
    buckets: Vec<(f64, u64)>,
}
impl UnifiedMetricsAggregator {
pub fn new(service_name: impl Into<String>, instance_id: impl Into<String>) -> Self {
Self {
proxy_counters: RwLock::new(HashMap::new()),
proxy_gauges: RwLock::new(HashMap::new()),
proxy_histograms: RwLock::new(HashMap::new()),
agent_collector: MetricsCollector::new(),
service_name: service_name.into(),
instance_id: instance_id.into(),
}
}
pub fn with_agent_config(
service_name: impl Into<String>,
instance_id: impl Into<String>,
agent_config: MetricsCollectorConfig,
) -> Self {
Self {
proxy_counters: RwLock::new(HashMap::new()),
proxy_gauges: RwLock::new(HashMap::new()),
proxy_histograms: RwLock::new(HashMap::new()),
agent_collector: MetricsCollector::with_config(agent_config),
service_name: service_name.into(),
instance_id: instance_id.into(),
}
}
pub fn agent_collector(&self) -> &MetricsCollector {
&self.agent_collector
}
pub fn increment_counter(
&self,
name: &str,
help: &str,
labels: HashMap<String, String>,
delta: u64,
) {
let key = Self::metric_key(name, &labels);
let mut counters = self.proxy_counters.write();
if let Some(counter) = counters.get_mut(&key) {
counter.value += delta;
} else {
counters.insert(
key,
ProxyCounter {
name: name.to_string(),
help: help.to_string(),
labels,
value: delta,
},
);
}
}
pub fn set_gauge(&self, name: &str, help: &str, labels: HashMap<String, String>, value: f64) {
let key = Self::metric_key(name, &labels);
self.proxy_gauges.write().insert(
key,
ProxyGauge {
name: name.to_string(),
help: help.to_string(),
labels,
value,
},
);
}
pub fn observe_histogram(
&self,
name: &str,
help: &str,
labels: HashMap<String, String>,
bucket_boundaries: &[f64],
value: f64,
) {
let key = Self::metric_key(name, &labels);
let mut histograms = self.proxy_histograms.write();
if let Some(histogram) = histograms.get_mut(&key) {
histogram.sum += value;
histogram.count += 1;
for (boundary, count) in histogram.buckets.iter_mut() {
if value <= *boundary {
*count += 1;
}
}
} else {
let mut buckets: Vec<(f64, u64)> = bucket_boundaries
.iter()
.map(|&b| (b, if value <= b { 1 } else { 0 }))
.collect();
buckets.push((f64::INFINITY, 1));
histograms.insert(
key,
ProxyHistogram {
name: name.to_string(),
help: help.to_string(),
labels,
sum: value,
count: 1,
buckets,
},
);
}
}
pub fn record_agent_metrics(&self, report: &MetricsReport) {
self.agent_collector.record(report);
}
pub fn export_prometheus(&self) -> String {
let mut output = String::new();
output.push_str(
"# HELP zentinel_info Zentinel proxy information\n# TYPE zentinel_info gauge\n",
);
output.push_str(&format!(
"zentinel_info{{service=\"{}\",instance=\"{}\"}} 1\n",
escape_label_value(&self.service_name),
escape_label_value(&self.instance_id)
));
let counters = self.proxy_counters.read();
let mut counter_names: Vec<_> = counters.values().map(|c| &c.name).collect();
counter_names.sort();
counter_names.dedup();
for name in counter_names {
let metrics: Vec<_> = counters.values().filter(|c| &c.name == name).collect();
if let Some(first) = metrics.first() {
output.push_str(&format!("# HELP {} {}\n", name, first.help));
output.push_str(&format!("# TYPE {} counter\n", name));
}
for metric in metrics {
output.push_str(&format_metric_line(
name,
&metric.labels,
metric.value as f64,
));
}
}
let gauges = self.proxy_gauges.read();
let mut gauge_names: Vec<_> = gauges.values().map(|g| &g.name).collect();
gauge_names.sort();
gauge_names.dedup();
for name in gauge_names {
let metrics: Vec<_> = gauges.values().filter(|g| &g.name == name).collect();
if let Some(first) = metrics.first() {
output.push_str(&format!("# HELP {} {}\n", name, first.help));
output.push_str(&format!("# TYPE {} gauge\n", name));
}
for metric in metrics {
output.push_str(&format_metric_line(name, &metric.labels, metric.value));
}
}
let histograms = self.proxy_histograms.read();
let mut histogram_names: Vec<_> = histograms.values().map(|h| &h.name).collect();
histogram_names.sort();
histogram_names.dedup();
for name in histogram_names {
let metrics: Vec<_> = histograms.values().filter(|h| &h.name == name).collect();
if let Some(first) = metrics.first() {
output.push_str(&format!("# HELP {} {}\n", name, first.help));
output.push_str(&format!("# TYPE {} histogram\n", name));
}
for metric in metrics {
for (le, count) in &metric.buckets {
let mut labels = metric.labels.clone();
labels.insert(
"le".to_string(),
if le.is_infinite() {
"+Inf".to_string()
} else {
le.to_string()
},
);
output.push_str(&format_metric_line(
&format!("{}_bucket", name),
&labels,
*count as f64,
));
}
output.push_str(&format_metric_line(
&format!("{}_sum", name),
&metric.labels,
metric.sum,
));
output.push_str(&format_metric_line(
&format!("{}_count", name),
&metric.labels,
metric.count as f64,
));
}
}
output.push_str("\n# Agent metrics\n");
output.push_str(&self.agent_collector.export_prometheus());
output
}
pub fn series_count(&self) -> usize {
self.proxy_counters.read().len()
+ self.proxy_gauges.read().len()
+ self.proxy_histograms.read().len()
+ self.agent_collector.series_count()
}
fn metric_key(name: &str, labels: &HashMap<String, String>) -> String {
let mut pairs: Vec<_> = labels.iter().collect();
pairs.sort_by_key(|(k, _)| *k);
let labels_str = pairs
.into_iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join(",");
format!("{}|{}", name, labels_str)
}
}
impl Default for UnifiedMetricsAggregator {
fn default() -> Self {
Self::new("zentinel", "default")
}
}
/// Formats one Prometheus sample line, `name{k="v",...} value\n`.
///
/// Labels are sorted by name and their values escaped. With no labels the
/// braces are omitted entirely.
fn format_metric_line(name: &str, labels: &HashMap<String, String>, value: f64) -> String {
    let rendered_value = format_value(value);
    if labels.is_empty() {
        return format!("{} {}\n", name, rendered_value);
    }
    let mut sorted: Vec<(&String, &String)> = labels.iter().collect();
    sorted.sort_by(|left, right| left.0.cmp(right.0));
    let rendered_labels: Vec<String> = sorted
        .iter()
        .map(|(key, val)| format!("{}=\"{}\"", key, escape_label_value(val)))
        .collect();
    format!("{}{{{}}} {}\n", name, rendered_labels.join(","), rendered_value)
}
fn format_value(v: f64) -> String {
if v.is_infinite() {
if v.is_sign_positive() {
"+Inf".to_string()
} else {
"-Inf".to_string()
}
} else if v.is_nan() {
"NaN".to_string()
} else if v.fract() == 0.0 {
format!("{}", v as i64)
} else {
format!("{}", v)
}
}
/// Escapes a Prometheus label value: `\` → `\\`, `"` → `\"`, line feed → `\n`.
fn escape_label_value(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
/// Handles configuration-update messages from agents.
///
/// Rule and list updates are dispatched to optional callbacks, and reload
/// requests are stored.
pub struct ConfigUpdateHandler {
    /// Reload requests received, keyed by request id; pruned by `clear_old_pending`.
    pending: RwLock<HashMap<String, PendingUpdate>>,
    // The boxed callback type is inherently long; clippy's type_complexity lint is silenced here.
    /// Callback for rule updates: `(rule_set, rules, remove_rules)`, returns whether accepted.
    #[allow(clippy::type_complexity)]
    on_rule_update: Option<
        Box<dyn Fn(&str, &[crate::v2::control::RuleDefinition], &[String]) -> bool + Send + Sync>,
    >,
    // Same lint suppression, for the boxed list-update callback type.
    /// Callback for list updates: `(list_id, add, remove)`, returns whether accepted.
    #[allow(clippy::type_complexity)]
    on_list_update: Option<Box<dyn Fn(&str, &[String], &[String]) -> bool + Send + Sync>>,
}
/// A stored reload request together with the time it was received.
struct PendingUpdate {
    request: ConfigUpdateRequest,
    received_at: Instant,
}
impl ConfigUpdateHandler {
pub fn new() -> Self {
Self {
pending: RwLock::new(HashMap::new()),
on_rule_update: None,
on_list_update: None,
}
}
pub fn on_rule_update<F>(mut self, f: F) -> Self
where
F: Fn(&str, &[crate::v2::control::RuleDefinition], &[String]) -> bool
+ Send
+ Sync
+ 'static,
{
self.on_rule_update = Some(Box::new(f));
self
}
pub fn on_list_update<F>(mut self, f: F) -> Self
where
F: Fn(&str, &[String], &[String]) -> bool + Send + Sync + 'static,
{
self.on_list_update = Some(Box::new(f));
self
}
pub fn handle(&self, request: ConfigUpdateRequest) -> ConfigUpdateResponse {
let request_id = request.request_id.clone();
match &request.update_type {
ConfigUpdateType::RequestReload => {
self.pending.write().insert(
request_id.clone(),
PendingUpdate {
request,
received_at: Instant::now(),
},
);
ConfigUpdateResponse::success(request_id)
}
ConfigUpdateType::RuleUpdate {
rule_set,
rules,
remove_rules,
} => {
if let Some(ref callback) = self.on_rule_update {
if callback(rule_set, rules, remove_rules) {
ConfigUpdateResponse::success(request_id)
} else {
ConfigUpdateResponse::failure(request_id, "Rule update rejected")
}
} else {
ConfigUpdateResponse::failure(request_id, "Rule updates not supported")
}
}
ConfigUpdateType::ListUpdate {
list_id,
add,
remove,
} => {
if let Some(ref callback) = self.on_list_update {
if callback(list_id, add, remove) {
ConfigUpdateResponse::success(request_id)
} else {
ConfigUpdateResponse::failure(request_id, "List update rejected")
}
} else {
ConfigUpdateResponse::failure(request_id, "List updates not supported")
}
}
ConfigUpdateType::RestartRequired {
reason,
grace_period_ms,
} => {
tracing::warn!(
reason = reason,
grace_period_ms = grace_period_ms,
"Agent requested restart"
);
ConfigUpdateResponse::success(request_id)
}
ConfigUpdateType::ConfigError { error, field } => {
tracing::error!(
error = error,
field = ?field,
"Agent reported configuration error"
);
ConfigUpdateResponse::success(request_id)
}
}
}
pub fn pending_count(&self) -> usize {
self.pending.read().len()
}
pub fn clear_old_pending(&self, max_age: Duration) {
let now = Instant::now();
self.pending
.write()
.retain(|_, v| now.duration_since(v.received_at) < max_age);
}
}
impl Default for ConfigUpdateHandler {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for ConfigUpdateHandler {
    /// Prints the pending-request count and which callbacks are registered.
    /// The callbacks themselves are opaque closures and are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let pending_count = self.pending.read().len();
        let has_rule_callback = self.on_rule_update.is_some();
        let has_list_callback = self.on_list_update.is_some();
        let mut builder = f.debug_struct("ConfigUpdateHandler");
        builder.field("pending_count", &pending_count);
        builder.field("has_rule_callback", &has_rule_callback);
        builder.field("has_list_callback", &has_list_callback);
        builder.finish()
    }
}
/// Tracks connected agents and the configuration pushes sent to them,
/// including retry and acknowledgement state.
#[derive(Debug)]
pub struct ConfigPusher {
    /// Registered agents keyed by agent id.
    agents: RwLock<HashMap<String, AgentConnection>>,
    /// Pushes keyed by push id (also used as the pushed update's `request_id`).
    pending_pushes: RwLock<HashMap<String, PendingPush>>,
    /// Timeouts, retry limits and related settings.
    config: ConfigPusherConfig,
    /// Source of push ids (`push-<n>`), incremented per push and starting at 1.
    sequence: std::sync::atomic::AtomicU64,
}
/// Settings for a [`ConfigPusher`].
#[derive(Debug, Clone)]
pub struct ConfigPusherConfig {
    /// How long after creation an unacknowledged push is considered expired.
    pub ack_timeout: Duration,
    /// Maximum number of send attempts per push, counting the initial send.
    pub max_retries: usize,
    /// Minimum time between attempts of the same push.
    pub retry_interval: Duration,
    /// Limit on outstanding pushes per agent.
    pub max_pending_per_agent: usize,
}

impl Default for ConfigPusherConfig {
    /// 10 s ack timeout, 3 attempts, 2 s between attempts, 100 outstanding pushes per agent.
    fn default() -> Self {
        let ack_timeout = Duration::from_secs(10);
        let retry_interval = Duration::from_secs(2);
        Self {
            max_pending_per_agent: 100,
            retry_interval,
            max_retries: 3,
            ack_timeout,
        }
    }
}
/// Bookkeeping for one registered agent.
#[derive(Debug, Clone)]
pub struct AgentConnection {
    pub agent_id: String,
    /// Human-readable agent name given at registration.
    pub name: String,
    /// When the agent was registered.
    pub connected_at: Instant,
    /// Last registration or `touch_agent` call.
    pub last_seen: Instant,
    /// Pushes this agent acknowledged as accepted.
    pub successful_pushes: u64,
    /// Pushes this agent rejected, or that expired without acknowledgement.
    pub failed_pushes: u64,
    /// Whether configuration may be pushed to this agent.
    pub supports_push: bool,
}
/// Internal state of one configuration push.
#[derive(Debug)]
struct PendingPush {
    push_id: String,
    /// Agent the push targets.
    agent_id: String,
    /// The request sent (and re-sent on retry).
    update: ConfigUpdateRequest,
    /// When the push was first created.
    created_at: Instant,
    /// When the most recent attempt was made.
    last_attempt: Instant,
    /// Number of send attempts so far, including the first.
    attempts: usize,
    status: PushStatus,
}
/// Lifecycle state of a configuration push.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PushStatus {
    /// Created but not yet sent. NOTE(review): no code visible in this file
    /// assigns it; pushes start as `Sent`.
    Pending,
    /// Sent and awaiting acknowledgement.
    Sent,
    /// The agent accepted the update.
    Acknowledged,
    /// The agent rejected the update.
    Failed { reason: String },
    /// No acknowledgement arrived in time.
    Expired,
}
/// Externally visible summary of one push, returned by `get_results`.
#[derive(Debug)]
pub struct PushResult {
    pub push_id: String,
    pub agent_id: String,
    pub status: PushStatus,
    pub attempts: usize,
}
impl ConfigPusher {
pub fn new() -> Self {
Self::with_config(ConfigPusherConfig::default())
}
pub fn with_config(config: ConfigPusherConfig) -> Self {
Self {
agents: RwLock::new(HashMap::new()),
pending_pushes: RwLock::new(HashMap::new()),
config,
sequence: std::sync::atomic::AtomicU64::new(1),
}
}
pub fn register_agent(
&self,
agent_id: impl Into<String>,
name: impl Into<String>,
supports_push: bool,
) {
let agent_id = agent_id.into();
let now = Instant::now();
self.agents.write().insert(
agent_id.clone(),
AgentConnection {
agent_id,
name: name.into(),
connected_at: now,
last_seen: now,
successful_pushes: 0,
failed_pushes: 0,
supports_push,
},
);
}
pub fn unregister_agent(&self, agent_id: &str) {
self.agents.write().remove(agent_id);
self.pending_pushes
.write()
.retain(|_, p| p.agent_id != agent_id);
}
pub fn touch_agent(&self, agent_id: &str) {
if let Some(agent) = self.agents.write().get_mut(agent_id) {
agent.last_seen = Instant::now();
}
}
pub fn connected_agents(&self) -> Vec<AgentConnection> {
self.agents.read().values().cloned().collect()
}
pub fn pushable_agents(&self) -> Vec<AgentConnection> {
self.agents
.read()
.values()
.filter(|a| a.supports_push)
.cloned()
.collect()
}
pub fn push_to_agent(&self, agent_id: &str, update_type: ConfigUpdateType) -> Option<String> {
let agents = self.agents.read();
let agent = agents.get(agent_id)?;
if !agent.supports_push {
return None;
}
let push_id = self.next_push_id();
let now = Instant::now();
let update = ConfigUpdateRequest {
update_type,
request_id: push_id.clone(),
timestamp_ms: now_ms(),
};
self.pending_pushes.write().insert(
push_id.clone(),
PendingPush {
push_id: push_id.clone(),
agent_id: agent_id.to_string(),
update,
created_at: now,
last_attempt: now,
attempts: 1,
status: PushStatus::Sent,
},
);
Some(push_id)
}
pub fn push_to_all(&self, update_type: ConfigUpdateType) -> Vec<String> {
let pushable = self.pushable_agents();
let mut push_ids = Vec::with_capacity(pushable.len());
for agent in pushable {
if let Some(push_id) = self.push_to_agent(&agent.agent_id, update_type.clone()) {
push_ids.push(push_id);
}
}
push_ids
}
pub fn acknowledge(&self, push_id: &str, accepted: bool, error: Option<String>) {
let mut pending = self.pending_pushes.write();
if let Some(push) = pending.get_mut(push_id) {
if accepted {
push.status = PushStatus::Acknowledged;
if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
agent.successful_pushes += 1;
}
} else {
push.status = PushStatus::Failed {
reason: error.unwrap_or_else(|| "Unknown error".to_string()),
};
if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
agent.failed_pushes += 1;
}
}
}
}
pub fn get_retryable(&self) -> Vec<(String, ConfigUpdateRequest)> {
let now = Instant::now();
let mut retryable = Vec::new();
let mut pending = self.pending_pushes.write();
for push in pending.values_mut() {
if push.status == PushStatus::Sent
&& now.duration_since(push.last_attempt) >= self.config.retry_interval
&& push.attempts < self.config.max_retries
{
push.attempts += 1;
push.last_attempt = now;
retryable.push((push.agent_id.clone(), push.update.clone()));
}
}
retryable
}
pub fn expire_old(&self) {
let now = Instant::now();
let mut pending = self.pending_pushes.write();
for push in pending.values_mut() {
if push.status == PushStatus::Sent
&& (now.duration_since(push.created_at) >= self.config.ack_timeout
|| push.attempts >= self.config.max_retries)
{
push.status = PushStatus::Expired;
if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
agent.failed_pushes += 1;
}
}
}
let cleanup_age = Duration::from_secs(60);
pending.retain(|_, p| {
now.duration_since(p.created_at) < cleanup_age
|| matches!(p.status, PushStatus::Pending | PushStatus::Sent)
});
}
pub fn get_results(&self) -> Vec<PushResult> {
self.pending_pushes
.read()
.values()
.map(|p| PushResult {
push_id: p.push_id.clone(),
agent_id: p.agent_id.clone(),
status: p.status.clone(),
attempts: p.attempts,
})
.collect()
}
pub fn pending_count(&self) -> usize {
self.pending_pushes
.read()
.values()
.filter(|p| matches!(p.status, PushStatus::Pending | PushStatus::Sent))
.count()
}
fn next_push_id(&self) -> String {
let seq = self
.sequence
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
format!("push-{}", seq)
}
}
impl Default for ConfigPusher {
fn default() -> Self {
Self::new()
}
}
/// Milliseconds since the Unix epoch by the system clock.
///
/// Returns 0 if the clock reads earlier than the epoch.
fn now_ms() -> u64 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
// Unit tests for the agent metrics collector, the unified proxy/agent
// aggregator, the config-update handler and the config pusher.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::v2::metrics::{standard, CounterMetric, GaugeMetric, HistogramMetric};
    // One counter and one gauge from one agent yield two series and one active agent.
    #[test]
    fn test_metrics_collector_basic() {
        let collector = MetricsCollector::new();
        let mut report = MetricsReport::new("test-agent", 10_000);
        report
            .counters
            .push(CounterMetric::new(standard::REQUESTS_TOTAL, 100));
        report
            .gauges
            .push(GaugeMetric::new(standard::IN_FLIGHT_REQUESTS, 5.0));
        collector.record(&report);
        assert_eq!(collector.series_count(), 2);
        assert_eq!(collector.active_agents(), vec!["test-agent"]);
    }
    // Agent-supplied labels and the injected `agent_id` label both appear in the export.
    #[test]
    fn test_metrics_collector_with_labels() {
        let collector = MetricsCollector::new();
        let mut report = MetricsReport::new("agent-1", 10_000);
        let mut counter = CounterMetric::new(standard::REQUESTS_TOTAL, 50);
        counter
            .labels
            .insert("route".to_string(), "/api".to_string());
        report.counters.push(counter);
        collector.record(&report);
        let prometheus = collector.export_prometheus();
        assert!(prometheus.contains("agent_requests_total"));
        assert!(prometheus.contains("route=\"/api\""));
        assert!(prometheus.contains("agent_id=\"agent-1\""));
    }
    // HELP and TYPE header lines are emitted for a counter with help text.
    #[test]
    fn test_prometheus_export() {
        let collector = MetricsCollector::new();
        let mut report = MetricsReport::new("test", 10_000);
        let mut counter = CounterMetric::new("http_requests_total", 123);
        counter.help = Some("Total HTTP requests".to_string());
        report.counters.push(counter);
        collector.record(&report);
        let output = collector.export_prometheus();
        assert!(output.contains("# HELP http_requests_total Total HTTP requests"));
        assert!(output.contains("# TYPE http_requests_total counter"));
        assert!(output.contains("123"));
    }
    // Histogram buckets, `+Inf`, `_sum` and `_count` lines. The agent_id label is
    // disabled so the `_sum`/`_count` lines carry no labels.
    #[test]
    fn test_histogram_export() {
        let config = MetricsCollectorConfig {
            include_agent_id_label: false,
            ..MetricsCollectorConfig::default()
        };
        let collector = MetricsCollector::with_config(config);
        let mut report = MetricsReport::new("test", 10_000);
        report.histograms.push(HistogramMetric {
            name: "request_duration_seconds".to_string(),
            help: Some("Request duration".to_string()),
            labels: HashMap::new(),
            sum: 10.5,
            count: 100,
            buckets: vec![
                HistogramBucket { le: 0.1, count: 50 },
                HistogramBucket { le: 0.5, count: 80 },
                HistogramBucket { le: 1.0, count: 95 },
                HistogramBucket::infinity(),
            ],
        });
        collector.record(&report);
        let output = collector.export_prometheus();
        assert!(output.contains("request_duration_seconds_bucket"));
        assert!(output.contains("le=\"0.1\""));
        assert!(output.contains("le=\"+Inf\""));
        assert!(output.contains("request_duration_seconds_sum 10.5"));
        assert!(output.contains("request_duration_seconds_count 100"));
    }
    // A reload request is accepted and stored as pending.
    #[test]
    fn test_config_update_handler() {
        let handler = ConfigUpdateHandler::new();
        let request = ConfigUpdateRequest {
            update_type: ConfigUpdateType::RequestReload,
            request_id: "req-1".to_string(),
            timestamp_ms: 0,
        };
        let response = handler.handle(request);
        assert!(response.accepted);
        assert_eq!(handler.pending_count(), 1);
    }
    // Label-value escaping of quotes, backslashes and newlines.
    #[test]
    fn test_escape_label_value() {
        assert_eq!(escape_label_value("simple"), "simple");
        assert_eq!(escape_label_value("with\"quotes"), "with\\\"quotes");
        assert_eq!(escape_label_value("with\\backslash"), "with\\\\backslash");
        assert_eq!(escape_label_value("with\nnewline"), "with\\nnewline");
    }
    // A registered agent is listed with its push capability.
    #[test]
    fn test_config_pusher_basic() {
        let pusher = ConfigPusher::new();
        pusher.register_agent("agent-1", "Test Agent", true);
        let agents = pusher.connected_agents();
        assert_eq!(agents.len(), 1);
        assert_eq!(agents[0].agent_id, "agent-1");
        assert!(agents[0].supports_push);
    }
    // Pushing to a push-capable agent yields a `push-` id and one in-flight push.
    #[test]
    fn test_config_pusher_push_to_agent() {
        let pusher = ConfigPusher::new();
        pusher.register_agent("agent-1", "Test Agent", true);
        let update_type = ConfigUpdateType::RuleUpdate {
            rule_set: "default".to_string(),
            rules: vec![],
            remove_rules: vec![],
        };
        let push_id = pusher.push_to_agent("agent-1", update_type);
        assert!(push_id.is_some());
        let push_id = push_id.unwrap();
        assert!(push_id.starts_with("push-"));
        assert_eq!(pusher.pending_count(), 1);
    }
    // An accepted ack marks the push Acknowledged and bumps the agent's success count.
    #[test]
    fn test_config_pusher_acknowledge() {
        let pusher = ConfigPusher::new();
        pusher.register_agent("agent-1", "Test Agent", true);
        let push_id = pusher
            .push_to_agent("agent-1", ConfigUpdateType::RequestReload)
            .unwrap();
        pusher.acknowledge(&push_id, true, None);
        let results = pusher.get_results();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].status, PushStatus::Acknowledged);
        let agents = pusher.connected_agents();
        assert_eq!(agents[0].successful_pushes, 1);
    }
    // Agents registered without push support are never pushed to.
    #[test]
    fn test_config_pusher_push_to_non_pushable() {
        let pusher = ConfigPusher::new();
        pusher.register_agent("agent-1", "Test Agent", false);
        let push_id = pusher.push_to_agent("agent-1", ConfigUpdateType::RequestReload);
        assert!(push_id.is_none());
    }
    // Broadcast skips the non-pushable agent.
    #[test]
    fn test_config_pusher_push_to_all() {
        let pusher = ConfigPusher::new();
        pusher.register_agent("agent-1", "Agent 1", true);
        pusher.register_agent("agent-2", "Agent 2", true);
        pusher.register_agent("agent-3", "Agent 3", false);
        let push_ids = pusher.push_to_all(ConfigUpdateType::RequestReload);
        assert_eq!(push_ids.len(), 2);
        assert_eq!(pusher.pending_count(), 2);
    }
    // Unregistering an agent also drops its pushes.
    #[test]
    fn test_config_pusher_unregister() {
        let pusher = ConfigPusher::new();
        pusher.register_agent("agent-1", "Test Agent", true);
        let _push_id = pusher.push_to_agent("agent-1", ConfigUpdateType::RequestReload);
        assert_eq!(pusher.pending_count(), 1);
        pusher.unregister_agent("agent-1");
        assert_eq!(pusher.connected_agents().len(), 0);
        assert_eq!(pusher.pending_count(), 0);
    }
    // A snapshot reports per-kind series counts.
    #[test]
    fn test_metrics_snapshot() {
        let collector = MetricsCollector::new();
        let mut report = MetricsReport::new("test", 10_000);
        report
            .counters
            .push(CounterMetric::new("requests_total", 100));
        report.gauges.push(GaugeMetric::new("connections", 5.0));
        collector.record(&report);
        let snapshot = collector.snapshot();
        assert_eq!(snapshot.counter_count(), 1);
        assert_eq!(snapshot.gauge_count(), 1);
    }
    // Proxy-side counter and gauge each count as one series.
    #[test]
    fn test_unified_aggregator_basic() {
        let aggregator = UnifiedMetricsAggregator::new("test-service", "instance-1");
        aggregator.increment_counter(
            "http_requests_total",
            "Total HTTP requests",
            HashMap::new(),
            100,
        );
        aggregator.set_gauge(
            "active_connections",
            "Active connections",
            HashMap::new(),
            42.0,
        );
        assert_eq!(aggregator.series_count(), 2);
    }
    // Increments to the same unlabeled counter accumulate (10 + 5).
    #[test]
    fn test_unified_aggregator_counter_increment() {
        let aggregator = UnifiedMetricsAggregator::new("test", "1");
        aggregator.increment_counter("requests", "Total requests", HashMap::new(), 10);
        aggregator.increment_counter("requests", "Total requests", HashMap::new(), 5);
        let output = aggregator.export_prometheus();
        assert!(output.contains("requests 15"));
    }
    // Distinct label values produce distinct series.
    #[test]
    fn test_unified_aggregator_labeled_metrics() {
        let aggregator = UnifiedMetricsAggregator::new("test", "1");
        let mut labels = HashMap::new();
        labels.insert("method".to_string(), "GET".to_string());
        aggregator.increment_counter("requests", "Total requests", labels.clone(), 100);
        let mut labels2 = HashMap::new();
        labels2.insert("method".to_string(), "POST".to_string());
        aggregator.increment_counter("requests", "Total requests", labels2, 50);
        let output = aggregator.export_prometheus();
        assert!(output.contains("method=\"GET\""));
        assert!(output.contains("method=\"POST\""));
    }
    // Three observations produce bucket, sum and count lines.
    #[test]
    fn test_unified_aggregator_histogram() {
        let aggregator = UnifiedMetricsAggregator::new("test", "1");
        let buckets = vec![
            0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
        ];
        aggregator.observe_histogram(
            "request_duration",
            "Request duration",
            HashMap::new(),
            &buckets,
            0.05,
        );
        aggregator.observe_histogram(
            "request_duration",
            "Request duration",
            HashMap::new(),
            &buckets,
            0.2,
        );
        aggregator.observe_histogram(
            "request_duration",
            "Request duration",
            HashMap::new(),
            &buckets,
            1.5,
        );
        let output = aggregator.export_prometheus();
        assert!(output.contains("request_duration_bucket"));
        assert!(output.contains("request_duration_sum"));
        assert!(output.contains("request_duration_count 3"));
    }
    // Proxy and agent metrics appear together in one export.
    #[test]
    fn test_unified_aggregator_with_agent_metrics() {
        let aggregator = UnifiedMetricsAggregator::new("test", "1");
        aggregator.increment_counter("proxy_requests", "Proxy requests", HashMap::new(), 1000);
        let mut report = MetricsReport::new("waf-agent", 5_000);
        report.counters.push(CounterMetric::new("waf_blocked", 50));
        aggregator.record_agent_metrics(&report);
        let output = aggregator.export_prometheus();
        assert!(output.contains("proxy_requests 1000"));
        assert!(output.contains("waf_blocked"));
        assert!(output.contains("Agent metrics"));
    }
    // The `zentinel_info` series carries the service and instance labels.
    #[test]
    fn test_unified_aggregator_service_info() {
        let aggregator = UnifiedMetricsAggregator::new("my-service", "node-42");
        let output = aggregator.export_prometheus();
        assert!(output.contains("zentinel_info"));
        assert!(output.contains("service=\"my-service\""));
        assert!(output.contains("instance=\"node-42\""));
    }
}