use std::collections::HashMap;
use std::time::Instant;
use parking_lot::Mutex;
/// Cumulative histogram rendered in the Prometheus text exposition format.
struct HistogramData {
    /// Sum of all observed values.
    sum: f64,
    /// Number of observations; also the count of the implicit `+Inf` bucket.
    count: u64,
    /// `(upper_bound, cumulative_count)` pairs in ascending bound order. Each count is the
    /// number of observations `<= upper_bound`.
    buckets: Vec<(f64, u64)>,
}

impl HistogramData {
    /// Bucket upper bounds shared by every histogram created with [`HistogramData::new`]
    /// (0.1 ms .. 60 s when observations are in seconds).
    const BOUNDARIES: [f64; 16] = [
        0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
        30.0, 60.0,
    ];

    /// Creates an empty histogram over [`HistogramData::BOUNDARIES`].
    fn new() -> Self {
        Self {
            sum: 0.0,
            count: 0,
            buckets: Self::BOUNDARIES.iter().map(|&bound| (bound, 0)).collect(),
        }
    }

    /// Records one observation, bumping every bucket whose upper bound is `>= value`.
    fn observe(&mut self, value: f64) {
        self.sum += value;
        self.count += 1;
        for (bound, cumulative) in &mut self.buckets {
            if value <= *bound {
                *cumulative += 1;
            }
        }
    }

    /// Renders this histogram as a complete single-series metric family: `# HELP`, `# TYPE`,
    /// then the sample lines.
    ///
    /// `labels` is a pre-rendered, comma-separated `key="value"` list and may be empty.
    fn render(&self, name: &str, labels: &str, help: &str) -> String {
        let mut out = String::new();
        write_family_header(&mut out, name, "histogram", help);
        self.write_samples(&mut out, name, labels);
        out
    }

    /// Appends the `_bucket`, `_sum` and `_count` sample lines of one series (no header).
    ///
    /// `labels` is a pre-rendered, comma-separated `key="value"` list and may be empty.
    fn write_samples(&self, out: &mut String, name: &str, labels: &str) {
        // `le` is appended after the caller's labels. Without caller labels there must be no
        // separator: `{,le="..."}` is rejected by Prometheus parsers.
        let bucket_prefix = if labels.is_empty() {
            String::new()
        } else {
            format!("{},", labels)
        };
        for (bound, cumulative) in &self.buckets {
            out.push_str(&format!(
                "{}_bucket{{{}le=\"{}\"}} {}\n",
                name, bucket_prefix, bound, cumulative
            ));
        }
        out.push_str(&format!(
            "{}_bucket{{{}le=\"+Inf\"}} {}\n",
            name, bucket_prefix, self.count
        ));
        // Unlabeled series are written without braces (`name_sum 1.5`).
        let series_labels = if labels.is_empty() {
            String::new()
        } else {
            format!("{{{}}}", labels)
        };
        out.push_str(&format!("{}_sum{} {}\n", name, series_labels, self.sum));
        out.push_str(&format!("{}_count{} {}\n", name, series_labels, self.count));
    }
}

/// Appends the `# HELP` and `# TYPE` lines for a metric family. The exposition format allows
/// each of these only once per metric name, before all of the family's samples.
fn write_family_header(out: &mut String, name: &str, kind: &str, help: &str) {
    out.push_str(&format!("# HELP {} {}\n", name, help));
    out.push_str(&format!("# TYPE {} {}\n", name, kind));
}

/// Escapes a label value for the text exposition format (backslash, double quote, newline).
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for c in value.chars() {
        match c {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}

/// Appends a single-sample, unlabeled family (header plus `name value`).
fn write_scalar(out: &mut String, name: &str, kind: &str, help: &str, value: impl std::fmt::Display) {
    write_family_header(out, name, kind, help);
    out.push_str(&format!("{} {}\n", name, value));
}

/// Appends a counter family with one label, writing the header once and the series sorted by
/// label value. Writes nothing when `series` is empty.
fn write_counter_family<'a>(
    out: &mut String,
    name: &str,
    help: &str,
    label_name: &str,
    series: impl Iterator<Item = (&'a str, u64)>,
) {
    let mut series: Vec<_> = series.collect();
    if series.is_empty() {
        return;
    }
    // HashMap order is arbitrary; sort so consecutive scrapes list series in the same order.
    series.sort_unstable_by_key(|&(label_value, _)| label_value);
    write_family_header(out, name, "counter", help);
    for (label_value, count) in series {
        out.push_str(&format!(
            "{}{{{}=\"{}\"}} {}\n",
            name,
            label_name,
            escape_label_value(label_value),
            count
        ));
    }
}

/// Appends a histogram family with one label, writing the header once and each series' samples
/// sorted by label value. Writes nothing when `series` is empty.
fn write_histogram_family<'a>(
    out: &mut String,
    name: &str,
    help: &str,
    label_name: &str,
    series: impl Iterator<Item = (&'a str, &'a HistogramData)>,
) {
    let mut series: Vec<_> = series.collect();
    if series.is_empty() {
        return;
    }
    series.sort_unstable_by_key(|&(label_value, _)| label_value);
    write_family_header(out, name, "histogram", help);
    for (label_value, hist) in series {
        let labels = format!("{}=\"{}\"", label_name, escape_label_value(label_value));
        hist.write_samples(out, name, &labels);
    }
}

/// Records `value` in the histogram stored under `key`, creating it on first use.
///
/// Looks the key up by `&str` first, so a `String` is only allocated for a new series (this
/// runs on every request).
fn observe_keyed(map: &mut HashMap<String, HistogramData>, key: &str, value: f64) {
    if let Some(hist) = map.get_mut(key) {
        hist.observe(value);
        return;
    }
    let mut hist = HistogramData::new();
    hist.observe(value);
    map.insert(key.to_owned(), hist);
}

/// In-process metrics registry, exported through [`MetricsStore::render_prometheus`].
///
/// Index-like `openraft_*` gauges use `-1` to mean "none" (their initial value).
pub struct MetricsStore {
    handler_durations: Mutex<HashMap<String, HistogramData>>,
    lock_waits: Mutex<HashMap<String, HistogramData>>,
    request_counts: Mutex<HashMap<String, u64>>,
    recall_rejected_counts: Mutex<HashMap<&'static str, u64>>,
    recall_in_flight_gauge: std::sync::atomic::AtomicI64,
    expansion_concurrent_gauge: std::sync::atomic::AtomicI64,
    raft_term_changes: std::sync::atomic::AtomicU64,
    raft_elections: Mutex<HashMap<&'static str, u64>>,
    raft_heartbeat_lag: Mutex<HistogramData>,
    raft_task_poll_latency: Mutex<HistogramData>,
    openraft_current_term: std::sync::atomic::AtomicU64,
    openraft_is_leader: std::sync::atomic::AtomicI64,
    openraft_last_log_index: std::sync::atomic::AtomicI64,
    openraft_last_applied_index: std::sync::atomic::AtomicI64,
    openraft_snapshot_index: std::sync::atomic::AtomicI64,
    openraft_purged_index: std::sync::atomic::AtomicI64,
    openraft_quorum_ack_lag_ms: std::sync::atomic::AtomicI64,
    openraft_running_state_healthy: std::sync::atomic::AtomicI64,
    openraft_voters: std::sync::atomic::AtomicU64,
    openraft_learners: std::sync::atomic::AtomicU64,
    /// Keyed by `(api_version, expand)`.
    recall_request_counts: Mutex<HashMap<(&'static str, bool), u64>>,
    /// Keyed by `api_version`.
    recall_request_top_k: Mutex<HashMap<&'static str, HistogramData>>,
}

impl MetricsStore {
    /// Creates an empty registry. Counters and most gauges start at 0; index gauges start at -1.
    pub fn new() -> Self {
        Self {
            handler_durations: Mutex::new(HashMap::new()),
            lock_waits: Mutex::new(HashMap::new()),
            request_counts: Mutex::new(HashMap::new()),
            recall_rejected_counts: Mutex::new(HashMap::new()),
            recall_in_flight_gauge: std::sync::atomic::AtomicI64::new(0),
            expansion_concurrent_gauge: std::sync::atomic::AtomicI64::new(0),
            raft_term_changes: std::sync::atomic::AtomicU64::new(0),
            raft_elections: Mutex::new(HashMap::new()),
            raft_heartbeat_lag: Mutex::new(HistogramData::new()),
            raft_task_poll_latency: Mutex::new(HistogramData::new()),
            openraft_current_term: std::sync::atomic::AtomicU64::new(0),
            openraft_is_leader: std::sync::atomic::AtomicI64::new(0),
            openraft_last_log_index: std::sync::atomic::AtomicI64::new(-1),
            openraft_last_applied_index: std::sync::atomic::AtomicI64::new(-1),
            openraft_snapshot_index: std::sync::atomic::AtomicI64::new(-1),
            openraft_purged_index: std::sync::atomic::AtomicI64::new(-1),
            openraft_quorum_ack_lag_ms: std::sync::atomic::AtomicI64::new(-1),
            openraft_running_state_healthy: std::sync::atomic::AtomicI64::new(0),
            openraft_voters: std::sync::atomic::AtomicU64::new(0),
            openraft_learners: std::sync::atomic::AtomicU64::new(0),
            recall_request_counts: Mutex::new(HashMap::new()),
            recall_request_top_k: Mutex::new(HashMap::new()),
        }
    }

    /// Records one execution of `handler` that took `duration_secs` seconds.
    pub fn record_handler_duration(&self, handler: &str, duration_secs: f64) {
        observe_keyed(&mut self.handler_durations.lock(), handler, duration_secs);
    }

    /// Records `duration_secs` seconds spent waiting to acquire the lock named `lock_name`.
    pub fn record_lock_wait(&self, lock_name: &str, duration_secs: f64) {
        observe_keyed(&mut self.lock_waits.lock(), lock_name, duration_secs);
    }

    /// Increments the request counter for `handler`.
    pub fn increment_request(&self, handler: &str) {
        let mut map = self.request_counts.lock();
        if let Some(count) = map.get_mut(handler) {
            *count += 1;
            return;
        }
        // First request for this handler: allocate the owned key only now.
        map.insert(handler.to_owned(), 1);
    }

    /// Renders every metric in the Prometheus text exposition format.
    ///
    /// Each family's `# HELP`/`# TYPE` header is written once, and labeled series are sorted by
    /// label value so the output is stable between scrapes. Empty labeled families and
    /// histograms with no observations are omitted. Output from the renderer installed with
    /// `set_version_gauge_renderer`, if any, is appended last.
    pub fn render_prometheus(&self) -> String {
        use std::sync::atomic::Ordering::Relaxed;
        let mut out = String::with_capacity(4096);
        {
            let map = self.handler_durations.lock();
            write_histogram_family(
                &mut out,
                "yantrikdb_handler_duration_seconds",
                "Duration of HTTP handler execution in seconds",
                "handler",
                map.iter().map(|(handler, hist)| (handler.as_str(), hist)),
            );
        }
        {
            let map = self.lock_waits.lock();
            write_histogram_family(
                &mut out,
                "yantrikdb_lock_wait_seconds",
                "Time spent waiting to acquire a lock in seconds",
                "lock",
                map.iter().map(|(lock_name, hist)| (lock_name.as_str(), hist)),
            );
        }
        {
            let map = self.request_counts.lock();
            write_counter_family(
                &mut out,
                "yantrikdb_requests_total",
                "Total HTTP requests per handler",
                "handler",
                map.iter().map(|(handler, count)| (handler.as_str(), *count)),
            );
        }
        {
            let map = self.recall_rejected_counts.lock();
            write_counter_family(
                &mut out,
                "yantrikdb_recall_rejected_total",
                "Recall requests rejected by admission control, by reason",
                "reason",
                map.iter().map(|(reason, count)| (*reason, *count)),
            );
        }
        write_scalar(
            &mut out,
            "yantrikdb_recall_in_flight",
            "gauge",
            "Current in-flight recalls (any kind)",
            self.recall_in_flight_gauge.load(Relaxed),
        );
        write_scalar(
            &mut out,
            "yantrikdb_expansion_concurrent",
            "gauge",
            "Current concurrent expanded recalls",
            self.expansion_concurrent_gauge.load(Relaxed),
        );
        write_scalar(
            &mut out,
            "yantrikdb_raft_term_changes_total",
            "counter",
            "Raft term increments (new election or stepdown)",
            self.raft_term_changes.load(Relaxed),
        );
        {
            let map = self.raft_elections.lock();
            write_counter_family(
                &mut out,
                "yantrikdb_raft_elections_total",
                "Raft elections by outcome",
                "result",
                map.iter().map(|(result, count)| (*result, *count)),
            );
        }
        {
            let hist = self.raft_heartbeat_lag.lock();
            if hist.count > 0 {
                out.push_str(&hist.render(
                    "yantrikdb_raft_heartbeat_lag_seconds",
                    "",
                    "Heartbeat round-trip lag in seconds",
                ));
            }
        }
        {
            let hist = self.raft_task_poll_latency.lock();
            if hist.count > 0 {
                out.push_str(&hist.render(
                    "yantrikdb_raft_task_poll_latency_seconds",
                    "",
                    "Control-runtime task scheduling latency in seconds (acceptance gate signal)",
                ));
            }
        }
        write_scalar(
            &mut out,
            "yantrikdb_openraft_current_term",
            "gauge",
            "Current Raft term observed by this node",
            self.openraft_current_term.load(Relaxed),
        );
        write_scalar(
            &mut out,
            "yantrikdb_openraft_is_leader",
            "gauge",
            "1 if this node is the cluster leader, 0 otherwise",
            self.openraft_is_leader.load(Relaxed),
        );
        write_scalar(
            &mut out,
            "yantrikdb_openraft_last_log_index",
            "gauge",
            "Last log index appended on this node (-1 = none)",
            self.openraft_last_log_index.load(Relaxed),
        );
        write_scalar(
            &mut out,
            "yantrikdb_openraft_last_applied_index",
            "gauge",
            "Last log index applied to state machine (-1 = none)",
            self.openraft_last_applied_index.load(Relaxed),
        );
        write_scalar(
            &mut out,
            "yantrikdb_openraft_snapshot_index",
            "gauge",
            "Last log index included in the most recent snapshot (-1 = none)",
            self.openraft_snapshot_index.load(Relaxed),
        );
        write_scalar(
            &mut out,
            "yantrikdb_openraft_purged_index",
            "gauge",
            "Largest log index purged from storage (-1 = none)",
            self.openraft_purged_index.load(Relaxed),
        );
        write_scalar(
            &mut out,
            "yantrikdb_openraft_quorum_ack_lag_ms",
            "gauge",
            "Ms since quorum last acknowledged the leader (-1 = not leader)",
            self.openraft_quorum_ack_lag_ms.load(Relaxed),
        );
        write_scalar(
            &mut out,
            "yantrikdb_openraft_running_state_healthy",
            "gauge",
            "1 if openraft running_state is Ok, 0 after fatal",
            self.openraft_running_state_healthy.load(Relaxed),
        );
        write_scalar(
            &mut out,
            "yantrikdb_openraft_voters",
            "gauge",
            "Number of voter members in cluster",
            self.openraft_voters.load(Relaxed),
        );
        write_scalar(
            &mut out,
            "yantrikdb_openraft_learners",
            "gauge",
            "Number of learner members in cluster",
            self.openraft_learners.load(Relaxed),
        );
        {
            let map = self.recall_request_counts.lock();
            if !map.is_empty() {
                write_family_header(
                    &mut out,
                    "yantrikdb_recall_requests_total",
                    "counter",
                    "Recall requests received",
                );
                let mut series: Vec<_> = map.iter().map(|(&key, &count)| (key, count)).collect();
                series.sort_unstable_by_key(|&(key, _)| key);
                for ((version, expand), count) in series {
                    out.push_str(&format!(
                        "yantrikdb_recall_requests_total{{api_version=\"{}\",expand=\"{}\"}} {}\n",
                        escape_label_value(version),
                        expand,
                        count
                    ));
                }
            }
        }
        {
            let map = self.recall_request_top_k.lock();
            write_histogram_family(
                &mut out,
                "yantrikdb_recall_request_top_k",
                "Distribution of requested top_k values",
                "api_version",
                map.iter()
                    .filter(|(_, hist)| hist.count > 0)
                    .map(|(version, hist)| (*version, hist)),
            );
        }
        render_version_gauges_if_set(&mut out);
        out
    }
}

impl Default for MetricsStore {
    /// Same as [`MetricsStore::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Counts one recall request rejected by admission control under `reason`.
pub fn increment_recall_rejected(reason: &'static str) {
    let metrics = global();
    let mut counts = metrics.recall_rejected_counts.lock();
    *counts.entry(reason).or_default() += 1;
}
/// Publishes the current number of in-flight recalls (of any kind).
pub fn set_recall_in_flight_gauge(value: i64) {
    use std::sync::atomic::Ordering;
    let gauge = &global().recall_in_flight_gauge;
    gauge.store(value, Ordering::Relaxed);
}
/// Publishes the current number of concurrently running expanded recalls.
pub fn set_expansion_concurrent_gauge(value: i64) {
    use std::sync::atomic::Ordering;
    let gauge = &global().expansion_concurrent_gauge;
    gauge.store(value, Ordering::Relaxed);
}
/// Adds one to the Raft term-change counter.
pub fn increment_raft_term_changes() {
    use std::sync::atomic::Ordering;
    let counter = &global().raft_term_changes;
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Counts one Raft election outcome.
///
/// `result` must be `"won"`, `"lost"` or `"stepped_down"`; this is only enforced by a
/// `debug_assert!`, so release builds count any value.
pub fn record_raft_election(result: &'static str) {
    const OUTCOMES: [&str; 3] = ["won", "lost", "stepped_down"];
    debug_assert!(
        OUTCOMES.contains(&result),
        "raft election result must be one of won|lost|stepped_down"
    );
    let metrics = global();
    let mut elections = metrics.raft_elections.lock();
    *elections.entry(result).or_default() += 1;
}
/// Records one heartbeat round-trip lag sample, stored in seconds.
pub fn record_raft_heartbeat_lag(duration: std::time::Duration) {
    let seconds = duration.as_secs_f64();
    let mut hist = global().raft_heartbeat_lag.lock();
    hist.observe(seconds);
}
/// Records one control-runtime task scheduling latency sample, stored in seconds.
pub fn record_raft_task_poll_latency(duration: std::time::Duration) {
    let seconds = duration.as_secs_f64();
    let mut hist = global().raft_task_poll_latency.lock();
    hist.observe(seconds);
}
/// Publishes a snapshot of openraft state into the global gauges.
///
/// `None` values are stored as `-1`. Booleans are stored as `1`/`0`. Each gauge is stored
/// independently with `Relaxed` ordering, so a concurrent scrape may observe a mix of old and
/// new values.
#[allow(clippy::too_many_arguments)]
pub fn record_openraft_gauges(
    current_term: u64,
    is_leader: bool,
    last_log_index: Option<u64>,
    last_applied_index: Option<u64>,
    snapshot_index: Option<u64>,
    purged_index: Option<u64>,
    millis_since_quorum_ack: Option<u64>,
    healthy: bool,
    voters: u64,
    learners: u64,
) {
    use std::sync::atomic::Ordering::Relaxed;
    // Optional values are exported as i64 with -1 meaning "absent". Saturate at i64::MAX rather
    // than using a wrapping `as` cast, which maps u64::MAX onto -1 and would make a present
    // value indistinguishable from "absent".
    let encode = |value: Option<u64>| -> i64 {
        value.map_or(-1, |n| i64::try_from(n).unwrap_or(i64::MAX))
    };
    let g = global();
    g.openraft_current_term.store(current_term, Relaxed);
    g.openraft_is_leader.store(i64::from(is_leader), Relaxed);
    g.openraft_last_log_index.store(encode(last_log_index), Relaxed);
    g.openraft_last_applied_index
        .store(encode(last_applied_index), Relaxed);
    g.openraft_snapshot_index.store(encode(snapshot_index), Relaxed);
    g.openraft_purged_index.store(encode(purged_index), Relaxed);
    g.openraft_quorum_ack_lag_ms
        .store(encode(millis_since_quorum_ack), Relaxed);
    g.openraft_running_state_healthy
        .store(i64::from(healthy), Relaxed);
    g.openraft_voters.store(voters, Relaxed);
    g.openraft_learners.store(learners, Relaxed);
}
/// Counts one received recall request for the given API version and expansion flag.
pub fn record_recall_request(api_version: &'static str, expand: bool) {
    let key = (api_version, expand);
    let mut counts = global().recall_request_counts.lock();
    *counts.entry(key).or_default() += 1;
}
/// Records the `top_k` requested by one recall, in a per-API-version histogram.
pub fn record_recall_top_k(api_version: &'static str, top_k: usize) {
    // NOTE(review): HistogramData::new() uses the same bucket bounds as the latency histograms
    // (0.0001 .. 60.0), so any top_k above 60 lands only in the +Inf bucket. Consider dedicated
    // integer buckets for this metric.
    let value = top_k as f64;
    let mut per_version = global().recall_request_top_k.lock();
    per_version
        .entry(api_version)
        .or_insert_with(HistogramData::new)
        .observe(value);
}
/// Counts a request rejected because of its API version.
///
/// This shares the `recall_rejected_counts` map with `increment_recall_rejected`, so both kinds
/// of rejection are exported under `yantrikdb_recall_rejected_total`, distinguished only by
/// `reason`.
pub fn increment_version_rejection(reason: &'static str) {
    let counts = &global().recall_rejected_counts;
    *counts.lock().entry(reason).or_default() += 1;
}
/// Callback that appends extra (version-related) metric lines to the exposition buffer.
pub type VersionGaugeRenderer = fn(&mut String);

/// Renderer installed by [`set_version_gauge_renderer`]; unset until the first call.
static VERSION_GAUGE_RENDERER: std::sync::OnceLock<VersionGaugeRenderer> =
    std::sync::OnceLock::new();

/// Installs the version-gauge renderer.
///
/// Only the first call has any effect; later calls are silently ignored.
pub fn set_version_gauge_renderer(f: VersionGaugeRenderer) {
    // `set` only fails when a renderer is already installed; the first one wins.
    VERSION_GAUGE_RENDERER.set(f).ok();
}

/// Runs the installed renderer against `out`, or does nothing when none is installed.
fn render_version_gauges_if_set(out: &mut String) {
    let Some(render) = VERSION_GAUGE_RENDERER.get() else {
        return;
    };
    render(out);
}
/// Test-only estimate of the 99th-percentile control-runtime task poll latency, in seconds.
///
/// Returns the upper bound of the first bucket whose cumulative count reaches the nearest-rank
/// p99 position, `ceil(0.99 * count)`. If no finite bucket reaches it (observations above the
/// largest bound), the function falls back to the larger of the largest bound and the mean.
/// Returns `0.0` when nothing has been recorded.
#[cfg(test)]
pub fn raft_task_poll_latency_p99() -> f64 {
    let hist = global().raft_task_poll_latency.lock();
    if hist.count == 0 {
        return 0.0;
    }
    // Round the rank up. Truncating would give rank 0 for a single sample, which the first
    // bucket always satisfies, so p99 would be reported as the smallest bound regardless of
    // the data (and would under-report for any count below 100).
    let target = ((hist.count as f64) * 0.99).ceil() as u64;
    // Bucket counts are cumulative, so the first bucket reaching `target` holds the p99 sample.
    hist.buckets
        .iter()
        .find(|&&(_, cumulative)| cumulative >= target)
        .map(|&(bound, _)| bound)
        .unwrap_or_else(|| {
            let largest_bound = hist.buckets.last().map_or(0.0, |&(bound, _)| bound);
            largest_bound.max(hist.sum / hist.count as f64)
        })
}
/// Process-wide metrics registry, created lazily on first access.
static METRICS: std::sync::OnceLock<MetricsStore> =
    std::sync::OnceLock::new();

/// Returns the shared [`MetricsStore`], building it on the first call.
pub fn global() -> &'static MetricsStore {
    METRICS.get_or_init(MetricsStore::new)
}
/// RAII guard for one HTTP handler invocation.
///
/// Creating it increments the request counter for `handler`. Dropping it records the time
/// elapsed since creation as that handler's duration, in seconds.
pub struct HandlerTimer {
    handler: &'static str,
    start: Instant,
}

impl HandlerTimer {
    /// Counts the request, then starts the clock, so the counter update is not timed.
    pub fn new(handler: &'static str) -> Self {
        let metrics = global();
        metrics.increment_request(handler);
        let start = Instant::now();
        Self { handler, start }
    }
}

impl Drop for HandlerTimer {
    fn drop(&mut self) {
        let metrics = global();
        metrics.record_handler_duration(self.handler, self.start.elapsed().as_secs_f64());
    }
}
/// Records time spent waiting for the `engine` lock.
pub fn record_engine_lock_wait(duration: std::time::Duration) {
    let waited_secs = duration.as_secs_f64();
    global().record_lock_wait("engine", waited_secs);
}
/// Records time spent waiting for the `control` lock.
#[allow(dead_code)]
pub fn record_control_lock_wait(duration: std::time::Duration) {
    let waited_secs = duration.as_secs_f64();
    global().record_lock_wait("control", waited_secs);
}
/// Lock acquisition ranks for `check_lock_order`.
///
/// A thread may acquire a lock only if it holds no lock of a strictly higher rank, so locks
/// must be taken in non-decreasing rank order (lowest first).
///
/// Defined in every build profile, not just debug ones. The lock-order helpers have no-op
/// release implementations, so call sites that pass these constants must compile in release
/// builds too.
#[allow(dead_code)]
pub mod lock_rank {
    pub const CONTROL: u8 = 0;
    pub const TENANT_POOL: u8 = 1;
    pub const ENGINE: u8 = 2;
    pub const CONN: u8 = 3;
    pub const VEC_INDEX: u8 = 4;
    pub const GRAPH_INDEX: u8 = 5;
    pub const SCORING_CACHE: u8 = 6;
    pub const ACTIVE_SESSIONS: u8 = 7;
    pub const HLC: u8 = 8;
}
#[cfg(debug_assertions)]
thread_local! {
    // Locks held by the current thread, as `(rank, name)` in acquisition order. This must be
    // ONE thread-local shared by `check_lock_order`, `push_lock` and `pop_lock`. A separate
    // `thread_local!` inside each function would give each its own stack, and the order check
    // would never see the locks recorded by `push_lock`.
    static HELD_RANKS: std::cell::RefCell<Vec<(u8, &'static str)>> =
        const { std::cell::RefCell::new(Vec::new()) };
}

/// Debug-build guard against lock-order inversions.
///
/// Call before acquiring the lock `lock_name` of rank `rank`. Only locks recorded with
/// [`push_lock`] (and not yet removed with [`pop_lock`]) on the current thread are considered.
///
/// # Panics
///
/// Panics if the current thread holds a recorded lock whose rank is strictly greater than
/// `rank`.
#[allow(dead_code)]
#[cfg(debug_assertions)]
pub fn check_lock_order(rank: u8, lock_name: &str) {
    HELD_RANKS.with(|held| {
        let held = held.borrow();
        for &(held_rank, held_name) in held.iter() {
            if held_rank > rank {
                panic!(
                    "LOCK ORDER VIOLATION: trying to acquire '{}' (rank {}) \
                     while holding '{}' (rank {}). See CONCURRENCY.md Rule 3.",
                    lock_name, rank, held_name, held_rank,
                );
            }
        }
    });
}

/// Records that the current thread now holds the lock `lock_name` of rank `rank`.
#[allow(dead_code)]
#[cfg(debug_assertions)]
pub fn push_lock(rank: u8, lock_name: &'static str) {
    HELD_RANKS.with(|held| {
        held.borrow_mut().push((rank, lock_name));
    });
}

/// Removes the most recently recorded lock with rank `rank` from the current thread's stack.
/// Does nothing if no such lock is recorded.
#[allow(dead_code)]
#[cfg(debug_assertions)]
pub fn pop_lock(rank: u8) {
    HELD_RANKS.with(|held| {
        let mut held = held.borrow_mut();
        if let Some(pos) = held.iter().rposition(|(r, _)| *r == rank) {
            held.remove(pos);
        }
    });
}
/// Release-build no-op: lock-order checking only runs with `debug_assertions` enabled.
#[cfg(not(debug_assertions))]
pub fn check_lock_order(_: u8, _: &str) {}

/// Release-build no-op counterpart of the debug lock-tracking `push_lock`.
#[cfg(not(debug_assertions))]
pub fn push_lock(_: u8, _: &'static str) {}

/// Release-build no-op counterpart of the debug lock-tracking `pop_lock`.
#[cfg(not(debug_assertions))]
pub fn pop_lock(_: u8) {}