use std::sync::atomic::{AtomicU64, Ordering};
use parking_lot::Mutex;
use web_time::Duration;
use crate::analytics::{DirectFetchFailureReason, QueryInfo};
/// Live counters for a single query, paired with the query's [`QueryInfo`].
///
/// Every counter is an [`AtomicU64`], so it can be updated through a shared
/// reference. `build_query_snapshot` reads all of them into a plain-value
/// [`QuerySnapshot`].
///
/// NOTE(review): the code that increments these counters is not in this part
/// of the file. The per-field notes below are inferred from the field names
/// unless stated otherwise — confirm against the update sites.
#[derive(Debug)]
pub(crate) struct QueryMetrics {
/// Description of the query these counters belong to; cloned into each snapshot.
pub query_info: QueryInfo,
/// Presumably the number of fetch requests issued over gRPC.
pub fetch_grpc_requests: AtomicU64,
/// Presumably the number of bytes fetched over gRPC.
pub fetch_grpc_bytes: AtomicU64,
/// Presumably the number of direct (non-gRPC) fetch requests.
pub fetch_direct_requests: AtomicU64,
/// Presumably the number of bytes fetched directly.
pub fetch_direct_bytes: AtomicU64,
/// Presumably the total number of retry attempts on the direct path.
pub fetch_direct_retries: AtomicU64,
/// Presumably the number of direct requests that were retried at least once.
pub fetch_direct_requests_retried: AtomicU64,
/// Retry sleep time in microseconds: `build_query_snapshot` converts this
/// value with `Duration::from_micros`.
pub fetch_direct_retry_sleep_us: AtomicU64,
/// Presumably the highest attempt number seen for any direct request.
pub fetch_direct_max_attempt: AtomicU64,
/// Presumably the number of byte ranges requested before range merging.
pub fetch_direct_original_ranges: AtomicU64,
/// Presumably the number of byte ranges remaining after range merging.
pub fetch_direct_merged_ranges: AtomicU64,
}
impl QueryMetrics {
    /// Creates the metrics for one query with every counter starting at zero.
    ///
    /// `query_info` is stored as-is and later cloned into each snapshot built
    /// from these metrics.
    pub(crate) fn new(query_info: QueryInfo) -> Self {
        // Every counter starts from the same state, so build them one way.
        let zero = || AtomicU64::new(0);

        Self {
            query_info,
            fetch_grpc_requests: zero(),
            fetch_grpc_bytes: zero(),
            fetch_direct_requests: zero(),
            fetch_direct_bytes: zero(),
            fetch_direct_retries: zero(),
            fetch_direct_requests_retried: zero(),
            fetch_direct_retry_sleep_us: zero(),
            fetch_direct_max_attempt: zero(),
            fetch_direct_original_ranges: zero(),
            fetch_direct_merged_ranges: zero(),
        }
    }
}
/// Plain-value record of one query's metrics, as produced by
/// `build_query_snapshot`.
///
/// The counter fields hold the values loaded from the matching
/// `QueryMetrics` atomics; the remaining fields are supplied by the caller of
/// `build_query_snapshot` and stored unchanged.
#[derive(Clone, Debug, PartialEq)]
pub struct QuerySnapshot {
/// Clone of the `QueryInfo` held by the source `QueryMetrics`.
pub query_info: QueryInfo,
/// Caller-supplied total duration (presumably wall time of the whole query).
pub total_duration: Duration,
/// Caller-supplied time to the first chunk, if one was recorded.
pub time_to_first_chunk: Option<Duration>,
/// Caller-supplied static label for the error, if any.
pub error_kind: Option<&'static str>,
/// Caller-supplied failure reason for the direct fetch path, if any.
pub direct_terminal_reason: Option<DirectFetchFailureReason>,
/// Value of `QueryMetrics::fetch_grpc_requests` when the snapshot was built.
pub fetch_grpc_requests: u64,
/// Value of `QueryMetrics::fetch_grpc_bytes` when the snapshot was built.
pub fetch_grpc_bytes: u64,
/// Value of `QueryMetrics::fetch_direct_requests` when the snapshot was built.
pub fetch_direct_requests: u64,
/// Value of `QueryMetrics::fetch_direct_bytes` when the snapshot was built.
pub fetch_direct_bytes: u64,
/// Value of `QueryMetrics::fetch_direct_retries` when the snapshot was built.
pub fetch_direct_retries: u64,
/// Value of `QueryMetrics::fetch_direct_requests_retried` when the snapshot was built.
pub fetch_direct_requests_retried: u64,
/// `QueryMetrics::fetch_direct_retry_sleep_us` converted from microseconds.
pub fetch_direct_retry_sleep: Duration,
/// Value of `QueryMetrics::fetch_direct_max_attempt` when the snapshot was built.
pub fetch_direct_max_attempt: u64,
/// Value of `QueryMetrics::fetch_direct_original_ranges` when the snapshot was built.
pub fetch_direct_original_ranges: u64,
/// Value of `QueryMetrics::fetch_direct_merged_ranges` when the snapshot was built.
pub fetch_direct_merged_ranges: u64,
}
pub(crate) fn build_query_snapshot(
metrics: &QueryMetrics,
total_duration: Duration,
time_to_first_chunk: Option<Duration>,
error_kind: Option<&'static str>,
direct_terminal_reason: Option<DirectFetchFailureReason>,
) -> QuerySnapshot {
let load = |a: &AtomicU64| a.load(Ordering::Relaxed);
QuerySnapshot {
query_info: metrics.query_info.clone(),
total_duration,
time_to_first_chunk,
error_kind,
direct_terminal_reason,
fetch_grpc_requests: load(&metrics.fetch_grpc_requests),
fetch_grpc_bytes: load(&metrics.fetch_grpc_bytes),
fetch_direct_requests: load(&metrics.fetch_direct_requests),
fetch_direct_bytes: load(&metrics.fetch_direct_bytes),
fetch_direct_retries: load(&metrics.fetch_direct_retries),
fetch_direct_requests_retried: load(&metrics.fetch_direct_requests_retried),
fetch_direct_retry_sleep: Duration::from_micros(load(&metrics.fetch_direct_retry_sleep_us)),
fetch_direct_max_attempt: load(&metrics.fetch_direct_max_attempt),
fetch_direct_original_ranges: load(&metrics.fetch_direct_original_ranges),
fetch_direct_merged_ranges: load(&metrics.fetch_direct_merged_ranges),
}
}
/// Handle to a mutex-guarded list of [`QuerySnapshot`]s.
///
/// The list lives behind an `Arc`, so cloning the collector yields another
/// handle to the same list rather than a copy of its contents.
#[derive(Clone, Debug)]
pub struct MetricsCollector {
// Shared storage; every access goes through the mutex.
inner: std::sync::Arc<Mutex<Vec<QuerySnapshot>>>,
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
impl MetricsCollector {
    /// Creates a collector with no recorded snapshots.
    pub fn new() -> Self {
        let storage = Mutex::new(Vec::new());
        Self {
            inner: std::sync::Arc::new(storage),
        }
    }

    /// Removes and returns every snapshot recorded so far, leaving the
    /// collector empty.
    pub fn drain(&self) -> Vec<QuerySnapshot> {
        // The lock guard is a temporary that lives only for this expression.
        std::mem::take(&mut *self.inner.lock())
    }

    /// Returns a copy of the recorded snapshots without removing them.
    pub fn snapshot(&self) -> Vec<QuerySnapshot> {
        self.inner.lock().clone()
    }

    /// Appends one snapshot to the shared list.
    fn push(&self, snapshot: QuerySnapshot) {
        self.inner.lock().push(snapshot);
    }
}
/// Records `snapshot` in every collector of `collectors`.
///
/// Each collector receives its own clone, so draining one collector does not
/// affect what the others hold.
pub fn push_snapshot(collectors: &[MetricsCollector], snapshot: &QuerySnapshot) {
    collectors
        .iter()
        .for_each(|collector| collector.push(snapshot.clone()));
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analytics::QueryType;

    /// Builds a fully populated `QueryInfo` fixture shared by the tests below.
    fn dummy_query_info() -> QueryInfo {
        QueryInfo {
            dataset_id: "ds-test".to_owned(),
            query_chunks: 10,
            query_segments: 2,
            query_layers: 1,
            query_columns: 4,
            query_entities: 1,
            query_bytes: 1024,
            query_chunks_per_segment_min: 5,
            query_chunks_per_segment_max: 5,
            query_chunks_per_segment_mean: 5.0,
            query_type: QueryType::LatestAt,
            primary_index_name: Some("time".to_owned()),
            time_to_first_chunk_info: None,
            trace_id: None,
            filters_pushed_down: 1,
            filters_applied_client_side: 0,
            entity_path_narrowing_applied: true,
        }
    }

    /// Values written through the atomics must show up in the snapshot, and
    /// untouched counters must read as zero.
    #[test]
    fn build_query_snapshot_reads_atomic_counters() {
        let counters = QueryMetrics::new(dummy_query_info());

        // Two separate additions to the same counter must accumulate.
        for bytes in [1_000, 2_500] {
            counters.fetch_grpc_bytes.fetch_add(bytes, Ordering::Relaxed);
        }
        counters.fetch_direct_requests.fetch_add(3, Ordering::Relaxed);

        let total = Duration::from_millis(42);
        let snapshot = build_query_snapshot(&counters, total, None, None, None);

        assert_eq!(snapshot.fetch_grpc_bytes, 3_500);
        assert_eq!(snapshot.fetch_direct_requests, 3);
        assert_eq!(snapshot.fetch_grpc_requests, 0);
        assert_eq!(snapshot.query_info.dataset_id, "ds-test");
        assert_eq!(snapshot.total_duration, Duration::from_millis(42));
        assert!(snapshot.query_info.entity_path_narrowing_applied);
    }

    /// The optional timing and error arguments must be stored unchanged.
    #[test]
    fn build_query_snapshot_forwards_optional_exec_fields() {
        let counters = QueryMetrics::new(dummy_query_info());
        let first_chunk = Some(Duration::from_micros(123));
        let error = Some("direct_fetch");
        let reason = Some(DirectFetchFailureReason::Http5xx);

        let snapshot = build_query_snapshot(
            &counters,
            Duration::from_micros(999),
            first_chunk,
            error,
            reason,
        );

        assert_eq!(
            snapshot.time_to_first_chunk,
            Some(Duration::from_micros(123))
        );
        assert_eq!(snapshot.error_kind, Some("direct_fetch"));
        assert_eq!(
            snapshot.direct_terminal_reason,
            Some(DirectFetchFailureReason::Http5xx)
        );
    }

    /// Every collector gets its own copy, and draining one leaves the other
    /// untouched.
    #[test]
    fn push_snapshot_fans_out_to_each_collector() {
        let first = MetricsCollector::new();
        let second = MetricsCollector::new();
        let counters = QueryMetrics::new(dummy_query_info());
        let snapshot = build_query_snapshot(&counters, Duration::from_micros(100), None, None, None);

        // Cloned handles share storage with `first` and `second`.
        let handles = [first.clone(), second.clone()];
        push_snapshot(&handles, &snapshot);

        assert_eq!(first.snapshot().len(), 1);
        assert_eq!(second.snapshot().len(), 1);
        assert_eq!(first.drain().len(), 1);
        assert_eq!(first.snapshot().len(), 0);
        assert_eq!(second.snapshot().len(), 1);
    }
}