#![allow(clippy::cast_precision_loss)]
#![allow(clippy::cast_possible_truncation)]
#[cfg(test)]
mod tests;
use std::fmt::Write;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
const BUCKET_BOUNDS_MS: [u64; 9] = [1, 5, 10, 50, 100, 500, 1000, 5000, 10000];
#[derive(Debug, Default)]
pub struct LatencyHistogram {
buckets: [AtomicU64; 10],
sum_ns: AtomicU64,
count: AtomicU64,
}
impl LatencyHistogram {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn observe(&self, duration: Duration) {
let ns_u128 = duration.as_nanos();
let ns = if ns_u128 > u128::from(u64::MAX) {
u64::MAX
} else {
ns_u128 as u64
};
self.sum_ns.fetch_add(ns, Ordering::Relaxed);
self.count.fetch_add(1, Ordering::Relaxed);
let ms_u128 = duration.as_millis();
let ms = if ms_u128 > u128::from(u64::MAX) {
u64::MAX
} else {
ms_u128 as u64
};
let bucket_idx = BUCKET_BOUNDS_MS
.iter()
.position(|&bound| ms < bound)
.unwrap_or(9);
self.buckets[bucket_idx].fetch_add(1, Ordering::Relaxed);
}
#[must_use]
pub fn count(&self) -> u64 {
self.count.load(Ordering::Relaxed)
}
#[must_use]
pub fn sum_ns(&self) -> u64 {
self.sum_ns.load(Ordering::Relaxed)
}
#[must_use]
pub fn avg_ns(&self) -> f64 {
let count = self.count();
if count == 0 {
0.0
} else {
self.sum_ns() as f64 / count as f64
}
}
#[must_use]
pub fn bucket_counts(&self) -> [u64; 10] {
let mut counts = [0u64; 10];
for (i, bucket) in self.buckets.iter().enumerate() {
counts[i] = bucket.load(Ordering::Relaxed);
}
counts
}
pub fn reset(&self) {
self.sum_ns.store(0, Ordering::Relaxed);
self.count.store(0, Ordering::Relaxed);
for bucket in &self.buckets {
bucket.store(0, Ordering::Relaxed);
}
}
}
#[derive(Debug, Default)]
pub struct GraphMetrics {
nodes_total: AtomicU64,
node_inserts_total: AtomicU64,
node_deletes_total: AtomicU64,
edges_total: AtomicU64,
edge_inserts_total: AtomicU64,
edge_deletes_total: AtomicU64,
traversals_total: AtomicU64,
traversal_nodes_visited: AtomicU64,
pub edge_insert_latency: LatencyHistogram,
pub edge_delete_latency: LatencyHistogram,
pub traversal_latency: LatencyHistogram,
pub query_latency: LatencyHistogram,
}
impl GraphMetrics {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn record_node_insert(&self) {
self.node_inserts_total.fetch_add(1, Ordering::Relaxed);
self.nodes_total.fetch_add(1, Ordering::Relaxed);
}
pub fn record_node_delete(&self) {
self.node_deletes_total.fetch_add(1, Ordering::Relaxed);
loop {
let current = self.nodes_total.load(Ordering::Relaxed);
let new_val = current.saturating_sub(1);
if self
.nodes_total
.compare_exchange_weak(current, new_val, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
break;
}
}
}
#[must_use]
pub fn nodes_total(&self) -> u64 {
self.nodes_total.load(Ordering::Relaxed)
}
#[must_use]
pub fn node_inserts_total(&self) -> u64 {
self.node_inserts_total.load(Ordering::Relaxed)
}
pub fn record_edge_insert(&self, latency: Duration) {
self.edge_inserts_total.fetch_add(1, Ordering::Relaxed);
self.edges_total.fetch_add(1, Ordering::Relaxed);
self.edge_insert_latency.observe(latency);
}
pub fn record_edge_delete(&self, latency: Duration) {
self.edge_deletes_total.fetch_add(1, Ordering::Relaxed);
loop {
let current = self.edges_total.load(Ordering::Relaxed);
let new_val = current.saturating_sub(1);
if self
.edges_total
.compare_exchange_weak(current, new_val, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
break;
}
}
self.edge_delete_latency.observe(latency);
}
#[must_use]
pub fn edges_total(&self) -> u64 {
self.edges_total.load(Ordering::Relaxed)
}
#[must_use]
pub fn edge_inserts_total(&self) -> u64 {
self.edge_inserts_total.load(Ordering::Relaxed)
}
#[must_use]
pub fn edge_deletes_total(&self) -> u64 {
self.edge_deletes_total.load(Ordering::Relaxed)
}
pub fn record_traversal(&self, latency: Duration, nodes_visited: u64) {
self.traversals_total.fetch_add(1, Ordering::Relaxed);
self.traversal_nodes_visited
.fetch_add(nodes_visited, Ordering::Relaxed);
self.traversal_latency.observe(latency);
}
#[must_use]
pub fn traversals_total(&self) -> u64 {
self.traversals_total.load(Ordering::Relaxed)
}
#[must_use]
pub fn traversal_nodes_visited(&self) -> u64 {
self.traversal_nodes_visited.load(Ordering::Relaxed)
}
pub fn record_query(&self, latency: Duration) {
self.query_latency.observe(latency);
}
#[must_use]
pub fn to_prometheus(&self) -> String {
let mut output = String::with_capacity(2048);
output.push_str("# HELP velesdb_graph_nodes_total Current number of nodes\n");
output.push_str("# TYPE velesdb_graph_nodes_total gauge\n");
let _ = writeln!(output, "velesdb_graph_nodes_total {}\n", self.nodes_total());
output.push_str("# HELP velesdb_graph_node_inserts_total Total node insertions\n");
output.push_str("# TYPE velesdb_graph_node_inserts_total counter\n");
let _ = writeln!(
output,
"velesdb_graph_node_inserts_total {}\n",
self.node_inserts_total()
);
output.push_str("# HELP velesdb_graph_edges_total Current number of edges\n");
output.push_str("# TYPE velesdb_graph_edges_total gauge\n");
let _ = writeln!(output, "velesdb_graph_edges_total {}\n", self.edges_total());
output.push_str("# HELP velesdb_graph_edge_inserts_total Total edge insertions\n");
output.push_str("# TYPE velesdb_graph_edge_inserts_total counter\n");
let _ = writeln!(
output,
"velesdb_graph_edge_inserts_total {}\n",
self.edge_inserts_total()
);
Self::append_histogram_prometheus(&mut output, "edge_insert", &self.edge_insert_latency);
Self::append_histogram_prometheus(&mut output, "traversal", &self.traversal_latency);
output.push_str("# HELP velesdb_graph_traversals_total Total traversals executed\n");
output.push_str("# TYPE velesdb_graph_traversals_total counter\n");
let _ = writeln!(
output,
"velesdb_graph_traversals_total {}\n",
self.traversals_total()
);
output
}
fn append_histogram_prometheus(output: &mut String, name: &str, histogram: &LatencyHistogram) {
let bucket_bounds = [
"0.001", "0.005", "0.01", "0.05", "0.1", "0.5", "1", "5", "10", "+Inf",
];
let counts = histogram.bucket_counts();
let mut cumulative = 0u64;
let _ = writeln!(
output,
"# HELP velesdb_graph_{}_duration_seconds {} latency histogram",
name,
name.replace('_', " ")
);
let _ = writeln!(
output,
"# TYPE velesdb_graph_{name}_duration_seconds histogram"
);
for (i, &bound) in bucket_bounds.iter().enumerate() {
cumulative += counts[i];
let _ = writeln!(
output,
"velesdb_graph_{name}_duration_seconds_bucket{{le=\"{bound}\"}} {cumulative}",
);
}
let _ = writeln!(
output,
"velesdb_graph_{}_duration_seconds_sum {}",
name,
histogram.sum_ns() as f64 / 1_000_000_000.0
);
let _ = writeln!(
output,
"velesdb_graph_{}_duration_seconds_count {}\n",
name,
histogram.count()
);
}
pub fn reset(&self) {
self.nodes_total.store(0, Ordering::Relaxed);
self.node_inserts_total.store(0, Ordering::Relaxed);
self.node_deletes_total.store(0, Ordering::Relaxed);
self.edges_total.store(0, Ordering::Relaxed);
self.edge_inserts_total.store(0, Ordering::Relaxed);
self.edge_deletes_total.store(0, Ordering::Relaxed);
self.traversals_total.store(0, Ordering::Relaxed);
self.traversal_nodes_visited.store(0, Ordering::Relaxed);
self.edge_insert_latency.reset();
self.edge_delete_latency.reset();
self.traversal_latency.reset();
self.query_latency.reset();
}
}