#![allow(dead_code)] #![allow(clippy::format_push_string)]
#![allow(clippy::cast_precision_loss)]
#![allow(clippy::cast_possible_truncation)]
#![allow(clippy::cast_sign_loss)]
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// Inclusive upper bounds, in milliseconds, of the finite latency histogram buckets.
///
/// A sample is counted in the first bucket whose bound is greater than or equal to
/// its latency. Samples above the last bound go to one extra overflow bucket, which
/// is exported as `le="+Inf"`.
pub const LATENCY_BUCKETS_MS: [u64; 10] = [1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000];

/// Lock-free counters describing MATCH query execution.
///
/// Every field is an independent atomic updated with `Ordering::Relaxed`, so the
/// recording methods only need `&self`. Values read while other threads are
/// recording are individually valid but do not form a consistent snapshot.
#[derive(Debug, Default)]
pub struct MatchMetrics {
    /// Number of queries recorded (successes plus failures).
    pub total_queries: AtomicU64,
    /// Number of queries recorded through [`MatchMetrics::record_success`].
    pub successful_queries: AtomicU64,
    /// Number of queries recorded through [`MatchMetrics::record_failure`].
    pub failed_queries: AtomicU64,
    /// Sum of the result counts reported by successful queries.
    pub total_results: AtomicU64,
    /// Sum of all recorded latencies (successes and failures), in nanoseconds.
    pub latency_sum_ns: AtomicU64,
    /// Per-bucket (non-cumulative) sample counts. Slot `i` belongs to
    /// `LATENCY_BUCKETS_MS[i]`; the final slot is the overflow bucket.
    pub latency_buckets: [AtomicU64; LATENCY_BUCKETS_MS.len() + 1],
    /// Largest `max_depth` reported by any successful query.
    pub max_depth_reached: AtomicU64,
    /// Sum of the `max_depth` values reported by successful queries.
    pub depth_sum: AtomicU64,
    /// Number of calls to [`MatchMetrics::record_similarity_query`].
    pub similarity_queries: AtomicU64,
    /// Number of calls to [`MatchMetrics::record_guard_rail_hit`].
    pub guard_rail_hits: AtomicU64,
}

impl MatchMetrics {
    /// Creates a metrics set with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one successful query.
    ///
    /// `latency` feeds the latency sum and histogram, `result_count` is added to the
    /// result total, and `max_depth` updates both the depth sum and the running maximum.
    pub fn record_success(&self, latency: Duration, result_count: usize, max_depth: u32) {
        self.total_queries.fetch_add(1, Ordering::Relaxed);
        self.successful_queries.fetch_add(1, Ordering::Relaxed);
        // Saturate instead of truncating on targets where `usize` is wider than 64 bits.
        let results = u64::try_from(result_count).unwrap_or(u64::MAX);
        self.total_results.fetch_add(results, Ordering::Relaxed);
        self.record_latency(latency);
        self.record_depth(max_depth);
    }

    /// Records one failed query; only the query counters and latency are updated.
    pub fn record_failure(&self, latency: Duration) {
        self.total_queries.fetch_add(1, Ordering::Relaxed);
        self.failed_queries.fetch_add(1, Ordering::Relaxed);
        self.record_latency(latency);
    }

    /// Increments the guard-rail hit counter.
    pub fn record_guard_rail_hit(&self) {
        self.guard_rail_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the similarity query counter.
    pub fn record_similarity_query(&self) {
        self.similarity_queries.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `latency` to the nanosecond sum and to its histogram bucket.
    fn record_latency(&self, latency: Duration) {
        // A duration too large for a u64 of nanoseconds is clamped rather than truncated.
        let nanos = u64::try_from(latency.as_nanos()).unwrap_or(u64::MAX);
        self.latency_sum_ns.fetch_add(nanos, Ordering::Relaxed);

        // Buckets are exported with `le` (less-than-or-equal) labels, so the bound is
        // inclusive. Comparing the full-resolution duration keeps a sample of exactly
        // 5 ms in the 5 ms bucket while 5.000001 ms still falls into the next one.
        let bucket_idx = LATENCY_BUCKETS_MS
            .iter()
            .position(|&bound_ms| latency <= Duration::from_millis(bound_ms))
            .unwrap_or(LATENCY_BUCKETS_MS.len());
        self.latency_buckets[bucket_idx].fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `depth` to the depth sum and raises the recorded maximum if needed.
    fn record_depth(&self, depth: u32) {
        let depth = u64::from(depth);
        self.depth_sum.fetch_add(depth, Ordering::Relaxed);
        self.max_depth_reached.fetch_max(depth, Ordering::Relaxed);
    }

    /// Mean latency over all recorded queries, in milliseconds.
    ///
    /// Returns `0.0` when no query has been recorded.
    #[must_use]
    pub fn avg_latency_ms(&self) -> f64 {
        let total = self.total_queries.load(Ordering::Relaxed);
        if total == 0 {
            return 0.0;
        }
        let sum_ns = self.latency_sum_ns.load(Ordering::Relaxed);
        (sum_ns as f64) / (total as f64) / 1_000_000.0
    }

    /// Fraction of recorded queries that succeeded, in `0.0..=1.0`.
    ///
    /// Returns `1.0` when no query has been recorded.
    #[must_use]
    pub fn success_rate(&self) -> f64 {
        let total = self.total_queries.load(Ordering::Relaxed);
        if total == 0 {
            return 1.0;
        }
        let success = self.successful_queries.load(Ordering::Relaxed);
        (success as f64) / (total as f64)
    }

    /// Mean number of results per successful query, or `0.0` without any success.
    #[must_use]
    pub fn avg_results_per_query(&self) -> f64 {
        let success = self.successful_queries.load(Ordering::Relaxed);
        if success == 0 {
            return 0.0;
        }
        let total_results = self.total_results.load(Ordering::Relaxed);
        (total_results as f64) / (success as f64)
    }

    /// Mean reported depth per successful query, or `0.0` without any success.
    #[must_use]
    pub fn avg_depth(&self) -> f64 {
        let success = self.successful_queries.load(Ordering::Relaxed);
        if success == 0 {
            return 0.0;
        }
        let depth_sum = self.depth_sum.load(Ordering::Relaxed);
        (depth_sum as f64) / (success as f64)
    }

    /// Renders the metrics in the Prometheus text exposition format.
    ///
    /// The output contains the query counters, the latency histogram (buckets,
    /// `_sum` and `_count`), and the result, guard-rail and similarity counters.
    #[must_use]
    pub fn to_prometheus(&self) -> String {
        let mut output = String::new();
        Self::write_counter(
            &mut output,
            "velesdb_match_queries_total",
            "Total MATCH queries executed",
            self.total_queries.load(Ordering::Relaxed),
        );
        Self::write_counter(
            &mut output,
            "velesdb_match_queries_success_total",
            "Successful MATCH queries",
            self.successful_queries.load(Ordering::Relaxed),
        );
        Self::write_counter(
            &mut output,
            "velesdb_match_queries_failed_total",
            "Failed MATCH queries",
            self.failed_queries.load(Ordering::Relaxed),
        );
        self.write_latency_histogram(&mut output);
        Self::write_counter(
            &mut output,
            "velesdb_match_results_total",
            "Total results returned",
            self.total_results.load(Ordering::Relaxed),
        );
        Self::write_counter(
            &mut output,
            "velesdb_match_guardrail_hits_total",
            "Guard-rail violations",
            self.guard_rail_hits.load(Ordering::Relaxed),
        );
        Self::write_counter(
            &mut output,
            "velesdb_match_similarity_queries_total",
            "Queries with similarity",
            self.similarity_queries.load(Ordering::Relaxed),
        );
        output
    }

    /// Appends the `# HELP`, `# TYPE` and sample lines of one counter to `output`.
    fn write_counter(output: &mut String, name: &str, help: &str, value: u64) {
        output.push_str(&format!(
            "# HELP {name} {help}\n# TYPE {name} counter\n{name} {value}\n"
        ));
    }

    /// Appends the latency histogram to `output`, with bounds expressed in seconds.
    ///
    /// Bucket values are cumulative. `_count` reuses the `+Inf` bucket value so the
    /// two always agree, and `_sum` is the nanosecond sum converted to seconds.
    fn write_latency_histogram(&self, output: &mut String) {
        output.push_str("# HELP velesdb_match_latency_seconds MATCH query latency histogram\n");
        output.push_str("# TYPE velesdb_match_latency_seconds histogram\n");

        let mut cumulative = 0u64;
        // `zip` stops after the finite bounds; the overflow slot is handled below.
        for (bucket, &bound_ms) in self.latency_buckets.iter().zip(LATENCY_BUCKETS_MS.iter()) {
            cumulative += bucket.load(Ordering::Relaxed);
            output.push_str(&format!(
                "velesdb_match_latency_seconds_bucket{{le=\"{}\"}} {}\n",
                bound_ms as f64 / 1000.0,
                cumulative
            ));
        }
        cumulative += self.latency_buckets[LATENCY_BUCKETS_MS.len()].load(Ordering::Relaxed);
        output.push_str(&format!(
            "velesdb_match_latency_seconds_bucket{{le=\"+Inf\"}} {cumulative}\n",
        ));

        let sum_seconds = self.latency_sum_ns.load(Ordering::Relaxed) as f64 / 1_000_000_000.0;
        output.push_str(&format!(
            "velesdb_match_latency_seconds_sum {sum_seconds}\n"
        ));
        output.push_str(&format!(
            "velesdb_match_latency_seconds_count {cumulative}\n"
        ));
    }

    /// Sets every counter back to zero.
    ///
    /// Each field is cleared with its own store, so a concurrent reader or recorder
    /// may observe a partially reset state.
    pub fn reset(&self) {
        self.total_queries.store(0, Ordering::Relaxed);
        self.successful_queries.store(0, Ordering::Relaxed);
        self.failed_queries.store(0, Ordering::Relaxed);
        self.total_results.store(0, Ordering::Relaxed);
        self.latency_sum_ns.store(0, Ordering::Relaxed);
        for bucket in &self.latency_buckets {
            bucket.store(0, Ordering::Relaxed);
        }
        self.max_depth_reached.store(0, Ordering::Relaxed);
        self.depth_sum.store(0, Ordering::Relaxed);
        self.similarity_queries.store(0, Ordering::Relaxed);
        self.guard_rail_hits.store(0, Ordering::Relaxed);
    }
}
/// Guard that measures how long a query takes and reports it to a [`MatchMetrics`].
///
/// The clock starts in [`QueryTimer::new`]. Consume the guard with
/// [`QueryTimer::success`] or [`QueryTimer::failure`] to record the outcome; a guard
/// dropped without either call is recorded as a failure.
pub struct QueryTimer<'a> {
    /// Metrics sink that receives the measurement.
    metrics: &'a MatchMetrics,
    /// Instant captured when the guard was created.
    start: Instant,
    /// Set once an outcome has been reported, so `Drop` does not report again.
    recorded: bool,
}

impl<'a> QueryTimer<'a> {
    /// Starts timing a query whose outcome will be reported to `metrics`.
    #[must_use]
    pub fn new(metrics: &'a MatchMetrics) -> Self {
        let start = Instant::now();
        Self {
            metrics,
            start,
            recorded: false,
        }
    }

    /// Reports a successful query with the elapsed time, `result_count` and `max_depth`.
    pub fn success(mut self, result_count: usize, max_depth: u32) {
        let elapsed = self.start.elapsed();
        self.metrics.record_success(elapsed, result_count, max_depth);
        // Mark as reported before the guard is dropped at the end of this call.
        self.recorded = true;
    }

    /// Reports a failed query with the elapsed time.
    pub fn failure(mut self) {
        let elapsed = self.start.elapsed();
        self.metrics.record_failure(elapsed);
        // Mark as reported before the guard is dropped at the end of this call.
        self.recorded = true;
    }
}

impl Drop for QueryTimer<'_> {
    /// Records a failure when the guard goes away without an explicit outcome.
    fn drop(&mut self) {
        if self.recorded {
            return;
        }
        self.metrics.record_failure(self.start.elapsed());
    }
}