use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// Coarse size class of a query result, derived from its row count.
#[derive(Clone, Copy)]
pub enum ResultSizeBucket {
    /// Exactly zero rows.
    Empty,
    /// Between 1 and 99 rows (inclusive).
    Small,
    /// Between 100 and 9,999 rows (inclusive).
    Medium,
    /// 10,000 rows or more.
    Large,
}

impl ResultSizeBucket {
    /// Classifies a row count `n` into its size bucket.
    pub fn from_count(n: usize) -> Self {
        // Guards are checked in ascending order, so each branch only needs
        // to test the exclusive upper bound of its bucket.
        if n == 0 {
            Self::Empty
        } else if n < 100 {
            Self::Small
        } else if n < 10_000 {
            Self::Medium
        } else {
            Self::Large
        }
    }

    /// Returns the lowercase name of the bucket as a static string.
    pub fn label(&self) -> &'static str {
        match *self {
            ResultSizeBucket::Empty => "empty",
            ResultSizeBucket::Small => "small",
            ResultSizeBucket::Medium => "medium",
            ResultSizeBucket::Large => "large",
        }
    }
}
/// Registry of counters and gauges, each stored in an [`AtomicU64`].
///
/// All reads and writes use `Ordering::Relaxed`, so individual values are
/// updated atomically but no ordering is established between different
/// fields. Durations are accumulated as whole microseconds and exposed as
/// fractional seconds by the `*_secs` getters.
///
/// `Default` is derived: every atomic (including each slot of
/// `er_stage_duration_us`) starts at zero.
#[derive(Default)]
pub struct Metrics {
    /// Incremented once per `record_query` call.
    sparql_queries: AtomicU64,
    /// Incremented once per `record_datalog_query` call.
    datalog_queries: AtomicU64,
    /// Incremented once per `record_error` call.
    errors: AtomicU64,
    /// Sum of durations passed to `record_query` and `record_datalog_query`.
    total_duration_us: AtomicU64,
    /// Unix timestamp (seconds) written by the latest `record_query` call.
    last_query_ts: AtomicU64,
    /// Running total of the `n` values passed to `record_arrow_batches_sent`.
    arrow_batches_sent: AtomicU64,
    /// Incremented once per `record_arrow_ticket_rejection` call.
    arrow_ticket_rejections: AtomicU64,

    // Per-query-type counters, written by `record_query_typed`.
    select_count: AtomicU64,
    ask_count: AtomicU64,
    construct_count: AtomicU64,
    describe_count: AtomicU64,
    update_count: AtomicU64,

    // Per-query-type accumulated durations, written by `record_query_typed`.
    select_duration_us: AtomicU64,
    ask_duration_us: AtomicU64,
    construct_duration_us: AtomicU64,
    describe_duration_us: AtomicU64,
    update_duration_us: AtomicU64,

    // One counter per `ResultSizeBucket`, written by `record_query_typed`.
    result_empty: AtomicU64,
    result_small: AtomicU64,
    result_medium: AtomicU64,
    result_large: AtomicU64,

    // Overwritten (not accumulated) by `update_dictionary_cache_stats`.
    dictionary_hot_cache_hits: AtomicU64,
    dictionary_hot_cache_misses: AtomicU64,

    /// Incremented once per `record_federation_request` call.
    federation_endpoint_requests: AtomicU64,
    /// Sum of durations passed to `record_federation_request`.
    federation_endpoint_duration_us: AtomicU64,
    /// Hit ratio in parts per million, recomputed by
    /// `update_dictionary_cache_stats`.
    dictionary_cache_hit_ratio_ppm: AtomicU64,
    /// Overwritten by `update_merge_worker_delta_rows_pending`.
    merge_worker_delta_rows_pending: AtomicU64,
    /// Incremented once per `record_cors_permissive_request` call.
    cors_permissive_requests_total: AtomicU64,

    // Overwritten together by `update_pagerank_queue_stats`.
    pagerank_queue_depth: AtomicU64,
    /// Raw IEEE-754 bit pattern of an `f64` (see `f64::to_bits`).
    pagerank_queue_max_delta_bits: AtomicU64,
    /// Whole seconds; fractional part is dropped on store.
    pagerank_queue_oldest_enqueue_seconds: AtomicU64,

    /// Overwritten (not accumulated) by `update_bidi_relay_dropped_total`.
    bidi_relay_dropped_total: AtomicU64,
    /// Sum of durations passed to `record_merge_cycle_duration`.
    merge_cycle_duration_us: AtomicU64,
    /// Sum of durations passed to `record_datalog_stratum_duration`.
    datalog_stratum_duration_us: AtomicU64,
    /// Overwritten by `update_shacl_validation_queue_depth`.
    shacl_validation_queue_depth: AtomicU64,
    /// Overwritten by `update_cdc_replication_slot_lag_bytes`.
    cdc_replication_slot_lag_bytes: AtomicU64,
    /// Accumulated duration per stage; slot chosen by `er_stage_index`.
    er_stage_duration_us: [AtomicU64; 5],
    /// Incremented once per `record_sameas_assertion` call.
    sameas_assertions_total: AtomicU64,
    /// Sum of durations passed to `record_bayesian_propagation_duration`.
    bayesian_propagation_duration_us: AtomicU64,
    /// Overwritten (not accumulated) by `update_temporal_facts_total`.
    temporal_facts_total: AtomicU64,
    /// Incremented once per `record_temporal_query` call.
    temporal_queries_total: AtomicU64,
    /// Incremented once per `record_pprl_bloom_encode` call.
    pprl_bloom_encodes_total: AtomicU64,
    /// Incremented once per `record_llm_cache_hit` call.
    llm_cache_hits_total: AtomicU64,
    /// Incremented once per `record_llm_cache_miss` call.
    llm_cache_misses_total: AtomicU64,
    /// Sum of durations passed to `record_proof_tree_duration`.
    proof_tree_duration_us: AtomicU64,
    /// Incremented once per `record_conflict_detection` call.
    conflict_detections_total: AtomicU64,
}

impl Metrics {
    /// Creates a registry with every counter and gauge set to zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Converts `duration` to whole microseconds, saturating at `u64::MAX`.
    ///
    /// `Duration::as_micros` returns `u128`; a plain `as u64` cast would
    /// silently keep only the low 64 bits for very large durations.
    fn duration_to_micros(duration: Duration) -> u64 {
        duration.as_micros().min(u128::from(u64::MAX)) as u64
    }

    /// Loads a microsecond accumulator and converts it to seconds.
    fn micros_to_secs(total_us: &AtomicU64) -> f64 {
        total_us.load(Ordering::Relaxed) as f64 / 1_000_000.0
    }

    /// Records one SPARQL query: bumps the query counter, adds `duration` to
    /// the shared duration total, and stamps the current wall-clock time.
    pub fn record_query(&self, duration: Duration) {
        self.sparql_queries.fetch_add(1, Ordering::Relaxed);
        self.total_duration_us
            .fetch_add(Self::duration_to_micros(duration), Ordering::Relaxed);
        // A system clock set before the Unix epoch yields an `Err`; that case
        // is recorded as timestamp 0 rather than propagated.
        let now_secs = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        self.last_query_ts.store(now_secs, Ordering::Relaxed);
    }

    /// Records a query via [`Metrics::record_query`] and additionally updates
    /// the per-type and per-result-size breakdowns.
    ///
    /// `query_type` is matched case-sensitively against `"SELECT"`, `"ASK"`,
    /// `"CONSTRUCT"`, `"DESCRIBE"` and `"UPDATE"`; any other value updates no
    /// per-type metric. `row_count` is always bucketed through
    /// `ResultSizeBucket::from_count`.
    pub fn record_query_typed(&self, duration: Duration, query_type: &str, row_count: usize) {
        self.record_query(duration);

        let per_type = match query_type {
            "SELECT" => Some((&self.select_count, &self.select_duration_us)),
            "ASK" => Some((&self.ask_count, &self.ask_duration_us)),
            "CONSTRUCT" => Some((&self.construct_count, &self.construct_duration_us)),
            "DESCRIBE" => Some((&self.describe_count, &self.describe_duration_us)),
            "UPDATE" => Some((&self.update_count, &self.update_duration_us)),
            _ => None,
        };
        if let Some((count, total_us)) = per_type {
            count.fetch_add(1, Ordering::Relaxed);
            total_us.fetch_add(Self::duration_to_micros(duration), Ordering::Relaxed);
        }

        let bucket = match ResultSizeBucket::from_count(row_count) {
            ResultSizeBucket::Empty => &self.result_empty,
            ResultSizeBucket::Small => &self.result_small,
            ResultSizeBucket::Medium => &self.result_medium,
            ResultSizeBucket::Large => &self.result_large,
        };
        bucket.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one Datalog query and adds `duration` to the shared duration
    /// total. Unlike `record_query`, this does not touch `last_query_ts`.
    pub fn record_datalog_query(&self, duration: Duration) {
        self.datalog_queries.fetch_add(1, Ordering::Relaxed);
        self.total_duration_us
            .fetch_add(Self::duration_to_micros(duration), Ordering::Relaxed);
    }

    /// Increments the error counter by one.
    pub fn record_error(&self) {
        self.errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `n` to the Arrow batches-sent counter.
    pub fn record_arrow_batches_sent(&self, n: u64) {
        self.arrow_batches_sent.fetch_add(n, Ordering::Relaxed);
    }

    /// Increments the Arrow ticket-rejection counter by one.
    pub fn record_arrow_ticket_rejection(&self) {
        self.arrow_ticket_rejections.fetch_add(1, Ordering::Relaxed);
    }

    /// Number of SPARQL queries recorded.
    pub fn sparql_query_count(&self) -> u64 {
        self.sparql_queries.load(Ordering::Relaxed)
    }

    /// Number of Datalog queries recorded.
    pub fn datalog_query_count(&self) -> u64 {
        self.datalog_queries.load(Ordering::Relaxed)
    }

    /// Returns the SPARQL query counter (same value as
    /// [`Metrics::sparql_query_count`]); Datalog queries are not included.
    pub fn query_count(&self) -> u64 {
        self.sparql_queries.load(Ordering::Relaxed)
    }

    /// Number of errors recorded.
    pub fn error_count(&self) -> u64 {
        self.errors.load(Ordering::Relaxed)
    }

    /// Total recorded SPARQL + Datalog query time, in seconds.
    pub fn total_duration_secs(&self) -> f64 {
        Self::micros_to_secs(&self.total_duration_us)
    }

    /// Unix timestamp (seconds) of the latest `record_query`, or 0 if none.
    pub fn last_query_ts(&self) -> u64 {
        self.last_query_ts.load(Ordering::Relaxed)
    }

    /// Total Arrow batches sent.
    pub fn arrow_batches_sent(&self) -> u64 {
        self.arrow_batches_sent.load(Ordering::Relaxed)
    }

    /// Total Arrow ticket rejections.
    pub fn arrow_ticket_rejections(&self) -> u64 {
        self.arrow_ticket_rejections.load(Ordering::Relaxed)
    }

    /// Number of queries recorded with type `"SELECT"`.
    pub fn select_count(&self) -> u64 {
        self.select_count.load(Ordering::Relaxed)
    }

    /// Number of queries recorded with type `"ASK"`.
    pub fn ask_count(&self) -> u64 {
        self.ask_count.load(Ordering::Relaxed)
    }

    /// Number of queries recorded with type `"CONSTRUCT"`.
    pub fn construct_count(&self) -> u64 {
        self.construct_count.load(Ordering::Relaxed)
    }

    /// Number of queries recorded with type `"DESCRIBE"`.
    pub fn describe_count(&self) -> u64 {
        self.describe_count.load(Ordering::Relaxed)
    }

    /// Number of queries recorded with type `"UPDATE"`.
    pub fn update_count(&self) -> u64 {
        self.update_count.load(Ordering::Relaxed)
    }

    /// Accumulated `"SELECT"` query time, in seconds.
    pub fn select_duration_secs(&self) -> f64 {
        Self::micros_to_secs(&self.select_duration_us)
    }

    /// Accumulated `"ASK"` query time, in seconds.
    pub fn ask_duration_secs(&self) -> f64 {
        Self::micros_to_secs(&self.ask_duration_us)
    }

    /// Accumulated `"CONSTRUCT"` query time, in seconds.
    pub fn construct_duration_secs(&self) -> f64 {
        Self::micros_to_secs(&self.construct_duration_us)
    }

    /// Accumulated `"DESCRIBE"` query time, in seconds.
    pub fn describe_duration_secs(&self) -> f64 {
        Self::micros_to_secs(&self.describe_duration_us)
    }

    /// Accumulated `"UPDATE"` query time, in seconds.
    pub fn update_duration_secs(&self) -> f64 {
        Self::micros_to_secs(&self.update_duration_us)
    }

    /// Number of typed queries whose result fell in the `Empty` bucket.
    pub fn result_empty_count(&self) -> u64 {
        self.result_empty.load(Ordering::Relaxed)
    }

    /// Number of typed queries whose result fell in the `Small` bucket.
    pub fn result_small_count(&self) -> u64 {
        self.result_small.load(Ordering::Relaxed)
    }

    /// Number of typed queries whose result fell in the `Medium` bucket.
    pub fn result_medium_count(&self) -> u64 {
        self.result_medium.load(Ordering::Relaxed)
    }

    /// Number of typed queries whose result fell in the `Large` bucket.
    pub fn result_large_count(&self) -> u64 {
        self.result_large.load(Ordering::Relaxed)
    }

    /// Hit count from the latest `update_dictionary_cache_stats` call.
    pub fn dictionary_hot_cache_hits(&self) -> u64 {
        self.dictionary_hot_cache_hits.load(Ordering::Relaxed)
    }

    /// Miss count from the latest `update_dictionary_cache_stats` call.
    pub fn dictionary_hot_cache_misses(&self) -> u64 {
        self.dictionary_hot_cache_misses.load(Ordering::Relaxed)
    }

    /// Replaces the dictionary cache hit/miss gauges and recomputes the hit
    /// ratio as `hits / (hits + misses)` in parts per million.
    ///
    /// When both inputs are zero the ratio is stored as 0 instead of dividing
    /// by zero. The arithmetic is done in `u128` so neither `hits + misses`
    /// nor `hits * 1_000_000` can overflow.
    pub fn update_dictionary_cache_stats(&self, hits: u64, misses: u64) {
        self.dictionary_hot_cache_hits
            .store(hits, Ordering::Relaxed);
        self.dictionary_hot_cache_misses
            .store(misses, Ordering::Relaxed);

        let lookups = u128::from(hits) + u128::from(misses);
        let ppm = (u128::from(hits) * 1_000_000)
            .checked_div(lookups)
            .unwrap_or(0);
        // `hits <= lookups`, so `ppm <= 1_000_000` and the cast is lossless.
        self.dictionary_cache_hit_ratio_ppm
            .store(ppm as u64, Ordering::Relaxed);
    }

    /// Records one federation endpoint request and its duration.
    pub fn record_federation_request(&self, duration: Duration) {
        self.federation_endpoint_requests
            .fetch_add(1, Ordering::Relaxed);
        self.federation_endpoint_duration_us
            .fetch_add(Self::duration_to_micros(duration), Ordering::Relaxed);
    }

    /// Number of federation endpoint requests recorded.
    pub fn federation_endpoint_requests(&self) -> u64 {
        self.federation_endpoint_requests.load(Ordering::Relaxed)
    }

    /// Accumulated federation endpoint request time, in seconds.
    pub fn federation_endpoint_duration_secs(&self) -> f64 {
        Self::micros_to_secs(&self.federation_endpoint_duration_us)
    }

    /// Dictionary cache hit ratio as a fraction (stored ppm / 1,000,000).
    pub fn dictionary_cache_hit_ratio(&self) -> f64 {
        self.dictionary_cache_hit_ratio_ppm.load(Ordering::Relaxed) as f64 / 1_000_000.0
    }

    /// Overwrites the merge-worker pending delta rows gauge.
    pub fn update_merge_worker_delta_rows_pending(&self, rows: u64) {
        self.merge_worker_delta_rows_pending
            .store(rows, Ordering::Relaxed);
    }

    /// Latest merge-worker pending delta rows value.
    pub fn merge_worker_delta_rows_pending(&self) -> u64 {
        self.merge_worker_delta_rows_pending.load(Ordering::Relaxed)
    }

    /// Increments the permissive-CORS request counter by one.
    pub fn record_cors_permissive_request(&self) {
        self.cors_permissive_requests_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Total permissive-CORS requests recorded.
    pub fn cors_permissive_requests_total(&self) -> u64 {
        self.cors_permissive_requests_total.load(Ordering::Relaxed)
    }

    /// Overwrites the three PageRank queue gauges.
    ///
    /// `max_delta` is stored bit-for-bit. `oldest_seconds` is truncated to
    /// whole seconds; non-finite or negative values are stored as 0.
    pub fn update_pagerank_queue_stats(&self, depth: u64, max_delta: f64, oldest_seconds: f64) {
        self.pagerank_queue_depth.store(depth, Ordering::Relaxed);
        self.pagerank_queue_max_delta_bits
            .store(max_delta.to_bits(), Ordering::Relaxed);
        let oldest_secs = if oldest_seconds.is_finite() && oldest_seconds >= 0.0 {
            oldest_seconds as u64
        } else {
            0
        };
        self.pagerank_queue_oldest_enqueue_seconds
            .store(oldest_secs, Ordering::Relaxed);
    }

    /// Latest PageRank queue depth.
    pub fn pagerank_queue_depth(&self) -> u64 {
        self.pagerank_queue_depth.load(Ordering::Relaxed)
    }

    /// Latest PageRank queue max delta, decoded from its stored bit pattern.
    pub fn pagerank_queue_max_delta(&self) -> f64 {
        f64::from_bits(self.pagerank_queue_max_delta_bits.load(Ordering::Relaxed))
    }

    /// Latest PageRank queue oldest-enqueue age, in whole seconds.
    pub fn pagerank_queue_oldest_enqueue_seconds(&self) -> u64 {
        self.pagerank_queue_oldest_enqueue_seconds
            .load(Ordering::Relaxed)
    }

    /// Overwrites the bidi relay dropped total with `dropped`.
    pub fn update_bidi_relay_dropped_total(&self, dropped: u64) {
        self.bidi_relay_dropped_total
            .store(dropped, Ordering::Relaxed);
    }

    /// Latest bidi relay dropped total.
    pub fn bidi_relay_dropped_total(&self) -> u64 {
        self.bidi_relay_dropped_total.load(Ordering::Relaxed)
    }

    /// Adds `duration` to the accumulated merge cycle time.
    pub fn record_merge_cycle_duration(&self, duration: Duration) {
        self.merge_cycle_duration_us
            .fetch_add(Self::duration_to_micros(duration), Ordering::Relaxed);
    }

    /// Accumulated merge cycle time, in seconds.
    pub fn merge_cycle_duration_secs(&self) -> f64 {
        Self::micros_to_secs(&self.merge_cycle_duration_us)
    }

    /// Adds `duration` to the accumulated Datalog stratum time.
    pub fn record_datalog_stratum_duration(&self, duration: Duration) {
        self.datalog_stratum_duration_us
            .fetch_add(Self::duration_to_micros(duration), Ordering::Relaxed);
    }

    /// Accumulated Datalog stratum time, in seconds.
    pub fn datalog_stratum_duration_secs(&self) -> f64 {
        Self::micros_to_secs(&self.datalog_stratum_duration_us)
    }

    /// Overwrites the SHACL validation queue depth gauge.
    pub fn update_shacl_validation_queue_depth(&self, depth: u64) {
        self.shacl_validation_queue_depth
            .store(depth, Ordering::Relaxed);
    }

    /// Latest SHACL validation queue depth.
    pub fn shacl_validation_queue_depth(&self) -> u64 {
        self.shacl_validation_queue_depth.load(Ordering::Relaxed)
    }

    /// Overwrites the CDC replication slot lag gauge, in bytes.
    pub fn update_cdc_replication_slot_lag_bytes(&self, lag_bytes: u64) {
        self.cdc_replication_slot_lag_bytes
            .store(lag_bytes, Ordering::Relaxed);
    }

    /// Latest CDC replication slot lag, in bytes.
    pub fn cdc_replication_slot_lag_bytes(&self) -> u64 {
        self.cdc_replication_slot_lag_bytes.load(Ordering::Relaxed)
    }

    /// Maps a stage name to its slot in `er_stage_duration_us`.
    ///
    /// The returned index is always in `0..5`, so indexing the array with it
    /// cannot panic.
    fn er_stage_index(stage: &str) -> usize {
        match stage {
            "blocking" => 0,
            "embedding" => 1,
            "shacl" => 2,
            "canonicalization" => 3,
            // "provenance", and any unrecognised stage name, share slot 4.
            _ => 4,
        }
    }

    /// Adds `duration` to the accumulated time of the given stage.
    ///
    /// Unrecognised stage names are accounted to the same slot as
    /// `"provenance"`.
    pub fn record_er_stage_duration(&self, stage: &str, duration: Duration) {
        let slot = &self.er_stage_duration_us[Self::er_stage_index(stage)];
        slot.fetch_add(Self::duration_to_micros(duration), Ordering::Relaxed);
    }

    /// Accumulated time of the given stage, in seconds.
    ///
    /// Unrecognised stage names read the same slot as `"provenance"`.
    pub fn er_stage_duration_secs(&self, stage: &str) -> f64 {
        Self::micros_to_secs(&self.er_stage_duration_us[Self::er_stage_index(stage)])
    }

    /// Increments the sameAs assertion counter by one.
    pub fn record_sameas_assertion(&self) {
        self.sameas_assertions_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Total sameAs assertions recorded.
    pub fn sameas_assertions_total(&self) -> u64 {
        self.sameas_assertions_total.load(Ordering::Relaxed)
    }

    /// Adds `duration` to the accumulated Bayesian propagation time.
    pub fn record_bayesian_propagation_duration(&self, duration: Duration) {
        self.bayesian_propagation_duration_us
            .fetch_add(Self::duration_to_micros(duration), Ordering::Relaxed);
    }

    /// Accumulated Bayesian propagation time, in seconds.
    pub fn bayesian_propagation_duration_secs(&self) -> f64 {
        Self::micros_to_secs(&self.bayesian_propagation_duration_us)
    }

    /// Overwrites the temporal facts total with `count`.
    pub fn update_temporal_facts_total(&self, count: u64) {
        self.temporal_facts_total.store(count, Ordering::Relaxed);
    }

    /// Latest temporal facts total.
    pub fn temporal_facts_total(&self) -> u64 {
        self.temporal_facts_total.load(Ordering::Relaxed)
    }

    /// Increments the temporal query counter by one.
    pub fn record_temporal_query(&self) {
        self.temporal_queries_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Total temporal queries recorded.
    pub fn temporal_queries_total(&self) -> u64 {
        self.temporal_queries_total.load(Ordering::Relaxed)
    }

    /// Increments the PPRL Bloom encode counter by one.
    pub fn record_pprl_bloom_encode(&self) {
        self.pprl_bloom_encodes_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Total PPRL Bloom encodes recorded.
    pub fn pprl_bloom_encodes_total(&self) -> u64 {
        self.pprl_bloom_encodes_total.load(Ordering::Relaxed)
    }

    /// Increments the LLM cache hit counter by one.
    pub fn record_llm_cache_hit(&self) {
        self.llm_cache_hits_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the LLM cache miss counter by one.
    pub fn record_llm_cache_miss(&self) {
        self.llm_cache_misses_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Total LLM cache hits recorded.
    pub fn llm_cache_hits_total(&self) -> u64 {
        self.llm_cache_hits_total.load(Ordering::Relaxed)
    }

    /// Total LLM cache misses recorded.
    pub fn llm_cache_misses_total(&self) -> u64 {
        self.llm_cache_misses_total.load(Ordering::Relaxed)
    }

    /// Adds `duration` to the accumulated proof tree time.
    pub fn record_proof_tree_duration(&self, duration: Duration) {
        self.proof_tree_duration_us
            .fetch_add(Self::duration_to_micros(duration), Ordering::Relaxed);
    }

    /// Accumulated proof tree time, in seconds.
    pub fn proof_tree_duration_secs(&self) -> f64 {
        Self::micros_to_secs(&self.proof_tree_duration_us)
    }

    /// Increments the conflict detection counter by one.
    pub fn record_conflict_detection(&self) {
        self.conflict_detections_total
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Total conflict detections recorded.
    pub fn conflict_detections_total(&self) -> u64 {
        self.conflict_detections_total.load(Ordering::Relaxed)
    }
}