use parking_lot::RwLock; use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
/// Tunables for the query analytics subsystem.
///
/// Start from [`AnalyticsConfig::default`] / [`AnalyticsConfig::new`] or the
/// `production` / `development` presets, then refine with the builder methods.
#[derive(Debug, Clone)]
pub struct AnalyticsConfig {
    /// Maximum number of distinct query entries retained before pruning.
    pub max_queries: usize,
    /// Maximum number of distinct field entries retained before pruning.
    pub max_fields: usize,
    /// Maximum number of distinct error patterns retained before pruning.
    pub max_errors: usize,
    /// When true, a truncated copy of each query's text is stored.
    pub track_query_text: bool,
    /// Threshold above which a query counts as "slow", in milliseconds.
    pub slow_query_threshold_ms: u64,
    /// Whether the HTML dashboard should be served.
    pub enable_dashboard: bool,
    /// How long recorded data is considered relevant.
    pub retention_period: Duration,
}

impl Default for AnalyticsConfig {
    /// Balanced defaults: 1000 queries, query text tracked, 500 ms slow
    /// threshold, 24 h retention.
    fn default() -> Self {
        Self {
            max_queries: 1000,
            max_fields: 500,
            max_errors: 200,
            track_query_text: true,
            slow_query_threshold_ms: 500,
            enable_dashboard: true,
            retention_period: Duration::from_secs(86400),
        }
    }
}

impl AnalyticsConfig {
    /// Creates a configuration with the default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the maximum number of distinct queries to retain.
    // `#[must_use]`: the builder consumes and returns `self`; dropping the
    // result silently discards the setting.
    #[must_use]
    pub fn max_queries(mut self, n: usize) -> Self {
        self.max_queries = n;
        self
    }

    /// Sets the slow-query threshold.
    ///
    /// The threshold is stored with millisecond granularity, so
    /// sub-millisecond durations truncate to 0.
    #[must_use]
    pub fn slow_query_threshold(mut self, threshold: Duration) -> Self {
        self.slow_query_threshold_ms = threshold.as_millis() as u64;
        self
    }

    /// Disables storage of raw query text (e.g. when queries may contain
    /// sensitive literals).
    #[must_use]
    pub fn disable_query_text(mut self) -> Self {
        self.track_query_text = false;
        self
    }

    /// Conservative preset for production: smaller tables, no query text,
    /// 1 h retention.
    pub fn production() -> Self {
        Self {
            max_queries: 500,
            max_fields: 200,
            max_errors: 100,
            track_query_text: false,
            slow_query_threshold_ms: 1000,
            enable_dashboard: true,
            retention_period: Duration::from_secs(3600),
        }
    }

    /// Generous preset for development: larger tables, aggressive slow-query
    /// threshold, 7-day retention.
    pub fn development() -> Self {
        Self {
            max_queries: 2000,
            max_fields: 1000,
            max_errors: 500,
            track_query_text: true,
            slow_query_threshold_ms: 200,
            enable_dashboard: true,
            retention_period: Duration::from_secs(86400 * 7),
        }
    }
}
/// Aggregated execution statistics for a single query, keyed by its hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryStats {
    /// Short hex hash identifying the query text.
    pub query_hash: String,
    /// Operation name supplied by the client, if any.
    pub operation_name: Option<String>,
    /// Operation kind, e.g. "query" or "mutation".
    pub operation_type: String,
    /// Truncated query text; `None` when text tracking is disabled.
    pub query_text: Option<String>,
    /// Number of recorded executions.
    pub execution_count: u64,
    /// Sum of all execution durations, in milliseconds (saturating).
    pub total_time_ms: u64,
    /// Mean execution time in milliseconds.
    pub avg_time_ms: f64,
    /// Fastest observed execution; `u64::MAX` until the first sample.
    pub min_time_ms: u64,
    /// Slowest observed execution.
    pub max_time_ms: u64,
    /// Number of executions that reported an error.
    pub error_count: u64,
    /// Unix seconds of the most recent execution.
    pub last_execution: u64,
    /// Unix seconds of the first recorded execution.
    pub first_execution: u64,
    /// Rough p95 latency estimate (one-sided EMA; see `record_execution`).
    pub p95_time_ms: f64,
}

impl QueryStats {
    /// Current wall-clock time as Unix seconds. Returns 0 instead of
    /// panicking if the system clock reads before the epoch (the previous
    /// `.unwrap()` would abort the process in that case).
    fn unix_now_secs() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0)
    }

    /// Creates an empty stats record stamped with the current time.
    fn new(
        query_hash: String,
        operation_name: Option<String>,
        operation_type: String,
        query_text: Option<String>,
    ) -> Self {
        let now = Self::unix_now_secs();
        Self {
            query_hash,
            operation_name,
            operation_type,
            query_text,
            execution_count: 0,
            total_time_ms: 0,
            avg_time_ms: 0.0,
            // MAX so the first `min` comparison adopts the first sample.
            min_time_ms: u64::MAX,
            max_time_ms: 0,
            error_count: 0,
            last_execution: now,
            first_execution: now,
            p95_time_ms: 0.0,
        }
    }

    /// Folds one execution into the aggregate.
    fn record_execution(&mut self, duration_ms: u64, had_error: bool) {
        self.execution_count += 1;
        // Saturating add so a pathological duration cannot overflow-panic
        // the accumulator in debug builds.
        self.total_time_ms = self.total_time_ms.saturating_add(duration_ms);
        self.avg_time_ms = self.total_time_ms as f64 / self.execution_count as f64;
        self.min_time_ms = self.min_time_ms.min(duration_ms);
        self.max_time_ms = self.max_time_ms.max(duration_ms);
        if self.execution_count == 1 {
            self.p95_time_ms = duration_ms as f64;
        } else {
            // One-sided exponential moving estimate: nudged upward by 5% of
            // the gap whenever a sample exceeds it, never moved down.
            // NOTE(review): this biases the estimate high after latency
            // spikes — confirm a one-sided estimator is intended.
            let alpha = 0.05;
            if duration_ms as f64 > self.p95_time_ms {
                self.p95_time_ms = self.p95_time_ms * (1.0 - alpha) + duration_ms as f64 * alpha;
            }
        }
        if had_error {
            self.error_count += 1;
        }
        self.last_execution = Self::unix_now_secs();
    }
}
/// Request statistics for a single schema field, keyed by
/// `"ParentType.field_name"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldStats {
// Field name without the parent prefix.
pub field_name: String,
// Type the field belongs to, e.g. "User".
pub parent_type: String,
// Total number of times the field was requested.
pub request_count: u64,
// Running mean of resolution time; only updated for timed requests.
pub avg_time_ms: f64,
// Unix seconds of the most recent request (used for LRU pruning).
pub last_requested: u64,
}
impl FieldStats {
// Creates a zeroed record stamped with the current time.
fn new(field_name: String, parent_type: String) -> Self {
Self {
field_name,
parent_type,
request_count: 0,
avg_time_ms: 0.0,
last_requested: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
}
}
}
/// Aggregated occurrences of a single error code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorStats {
// Machine-readable error code used as the map key.
pub error_code: String,
// Message captured from the first occurrence.
pub error_message: String,
// Total occurrences, including the one that created this record.
pub occurrence_count: u64,
// Hashes of queries that triggered this error (bounded by the caller).
pub affected_queries: Vec<String>,
// Unix seconds of the first and most recent occurrence.
pub first_occurrence: u64,
pub last_occurrence: u64,
}
impl ErrorStats {
// Creates a record for the FIRST occurrence of an error code.
// Note: `occurrence_count` starts at 1 — the creating occurrence is
// already counted, so callers must not increment again on insert.
fn new(error_code: String, error_message: String) -> Self {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
Self {
error_code,
error_message,
occurrence_count: 1,
affected_queries: Vec::new(),
first_occurrence: now,
last_occurrence: now,
}
}
}
/// Point-in-time, serializable view of all collected metrics, produced by
/// `QueryAnalytics::get_snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsSnapshot {
// Unix seconds at which the snapshot was taken.
pub timestamp: u64,
pub total_requests: u64,
pub total_errors: u64,
// Mean latency across all recorded requests, in milliseconds.
pub avg_latency_ms: f64,
// Requests observed in the rolling 60-second window.
pub requests_per_minute: f64,
// Percentage (0–100) of requests that reported an error.
pub error_rate: f64,
// Top 20 queries by execution count.
pub top_queries: Vec<QueryStats>,
// Top 20 queries by average latency.
pub slowest_queries: Vec<QueryStats>,
// Top 50 fields by request count.
pub top_fields: Vec<FieldStats>,
// All tracked error patterns, most frequent first.
pub error_patterns: Vec<ErrorStats>,
// Execution counts grouped by operation type ("query", "mutation", ...).
pub requests_by_type: HashMap<String, u64>,
// Percentage (0–100); `None` until at least one cache access was recorded.
pub cache_hit_rate: Option<f64>,
pub uptime_seconds: u64,
}
// Mutable interior of `QueryAnalytics`; always accessed under the RwLock.
struct AnalyticsState {
// Retained for completeness; the outer `QueryAnalytics` reads its own copy.
#[allow(dead_code)] config: AnalyticsConfig,
// Per-query aggregates keyed by query hash.
queries: HashMap<String, QueryStats>,
// Per-field aggregates keyed by "ParentType.field".
fields: HashMap<String, FieldStats>,
// Error patterns keyed by error code.
errors: HashMap<String, ErrorStats>,
total_requests: u64,
total_errors: u64,
// Sum of all request latencies, used for the global average.
total_latency_ms: u64,
// Basis for uptime reporting.
start_time: Instant,
// Unix-second timestamps of recent requests (rolling 60 s window),
// plus cache hit/miss counters.
requests_in_window: Vec<u64>, cache_hits: u64,
cache_misses: u64,
}
impl AnalyticsState {
// Creates an empty state; also used wholesale by `reset`.
fn new(config: AnalyticsConfig) -> Self {
Self {
config,
queries: HashMap::new(),
fields: HashMap::new(),
errors: HashMap::new(),
total_requests: 0,
total_errors: 0,
total_latency_ms: 0,
start_time: Instant::now(),
requests_in_window: Vec::new(),
cache_hits: 0,
cache_misses: 0,
}
}
}
/// Thread-safe collector of GraphQL query/field/error metrics.
///
/// All mutable data lives behind the `RwLock`; `config` is duplicated
/// outside the lock so it can be read without acquiring it.
pub struct QueryAnalytics {
state: RwLock<AnalyticsState>,
config: AnalyticsConfig,
}
impl QueryAnalytics {
/// Creates a collector with the given configuration.
pub fn new(config: AnalyticsConfig) -> Self {
    let state = RwLock::new(AnalyticsState::new(config.clone()));
    Self { state, config }
}
/// Records one completed query execution.
///
/// Updates the global counters, the rolling one-minute request window, the
/// per-query aggregate, and — when `error_details = Some((code, message))`
/// is given — the error-pattern table. Prunes the query/error maps when
/// they exceed their configured limits.
pub fn record_query(
    &self,
    query: &str,
    operation_name: Option<&str>,
    operation_type: &str,
    duration: Duration,
    had_error: bool,
    error_details: Option<(&str, &str)>,
) {
    let duration_ms = duration.as_millis() as u64;
    let query_hash = self.hash_query(query);
    let mut state = self.state.write();
    state.total_requests += 1;
    // Saturating: an accumulated-latency overflow must not panic in debug builds.
    state.total_latency_ms = state.total_latency_ms.saturating_add(duration_ms);
    if had_error {
        state.total_errors += 1;
    }
    // Unix seconds; 0 instead of a panic if the clock reads before the epoch.
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    // Maintain the rolling 60-second window backing requests-per-minute.
    state.requests_in_window.push(now);
    let cutoff = now.saturating_sub(60);
    state.requests_in_window.retain(|&t| t > cutoff);
    let query_text = if self.config.track_query_text {
        Some(Self::truncate_query(query, 500))
    } else {
        None
    };
    let query_stats = state.queries.entry(query_hash.clone()).or_insert_with(|| {
        QueryStats::new(
            query_hash.clone(),
            operation_name.map(String::from),
            operation_type.to_string(),
            query_text,
        )
    });
    query_stats.record_execution(duration_ms, had_error);
    if let Some((code, message)) = error_details {
        // `ErrorStats::new` already counts its creating occurrence, so only
        // increment for codes that were already tracked. (The previous code
        // incremented unconditionally, double-counting the first occurrence.)
        let already_tracked = state.errors.contains_key(code);
        let error_stats = state
            .errors
            .entry(code.to_string())
            .or_insert_with(|| ErrorStats::new(code.to_string(), message.to_string()));
        if already_tracked {
            error_stats.occurrence_count += 1;
        }
        error_stats.last_occurrence = now;
        // Remember up to 10 distinct queries affected by this error code.
        if !error_stats.affected_queries.contains(&query_hash)
            && error_stats.affected_queries.len() < 10
        {
            error_stats.affected_queries.push(query_hash.clone());
        }
    }
    if state.queries.len() > self.config.max_queries {
        self.prune_queries(&mut state);
    }
    if state.errors.len() > self.config.max_errors {
        self.prune_errors(&mut state);
    }
}
/// Records one resolution of `parent_type.field_name`, optionally timed.
pub fn record_field(&self, parent_type: &str, field_name: &str, duration: Option<Duration>) {
    let field_key = format!("{}.{}", parent_type, field_name);
    let elapsed_ms = match duration {
        Some(d) => d.as_millis() as f64,
        None => 0.0,
    };
    let mut state = self.state.write();
    let stats = state
        .fields
        .entry(field_key)
        .or_insert_with(|| FieldStats::new(field_name.to_string(), parent_type.to_string()));
    stats.request_count += 1;
    if elapsed_ms > 0.0 {
        // Incremental running mean.
        // NOTE(review): untimed requests still bump request_count, so the
        // mean is diluted when only some calls carry a duration — confirm
        // this is intended.
        let prior_total = stats.avg_time_ms * (stats.request_count - 1) as f64;
        stats.avg_time_ms = (prior_total + elapsed_ms) / stats.request_count as f64;
    }
    stats.last_requested = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    if state.fields.len() > self.config.max_fields {
        self.prune_fields(&mut state);
    }
}
/// Tallies one cache lookup as a hit or a miss.
pub fn record_cache_access(&self, hit: bool) {
    let mut state = self.state.write();
    let counter = if hit {
        &mut state.cache_hits
    } else {
        &mut state.cache_misses
    };
    *counter += 1;
}
/// Builds a point-in-time snapshot of all collected metrics.
///
/// Takes only a read lock, so it is safe to call concurrently with
/// recording.
pub fn get_snapshot(&self) -> AnalyticsSnapshot {
    let state = self.state.read();
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    // The request window is only pruned on writes, so filter stale entries
    // here; otherwise a quiet server would keep reporting its last burst
    // as current requests-per-minute.
    let cutoff = now.saturating_sub(60);
    let requests_per_minute = state
        .requests_in_window
        .iter()
        .filter(|&&t| t > cutoff)
        .count() as f64;
    let error_rate = if state.total_requests > 0 {
        (state.total_errors as f64 / state.total_requests as f64) * 100.0
    } else {
        0.0
    };
    let avg_latency_ms = if state.total_requests > 0 {
        state.total_latency_ms as f64 / state.total_requests as f64
    } else {
        0.0
    };
    // Hit rate is only meaningful once at least one cache access was seen.
    let cache_hit_rate = {
        let total_cache = state.cache_hits + state.cache_misses;
        if total_cache > 0 {
            Some((state.cache_hits as f64 / total_cache as f64) * 100.0)
        } else {
            None
        }
    };
    let mut top_queries: Vec<_> = state.queries.values().cloned().collect();
    top_queries.sort_unstable_by(|a, b| b.execution_count.cmp(&a.execution_count));
    top_queries.truncate(20);
    let mut slowest_queries: Vec<_> = state.queries.values().cloned().collect();
    // f64 averages never come from NaN inputs here, but fall back to Equal
    // rather than panic.
    slowest_queries.sort_unstable_by(|a, b| {
        b.avg_time_ms
            .partial_cmp(&a.avg_time_ms)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    slowest_queries.truncate(20);
    let mut top_fields: Vec<_> = state.fields.values().cloned().collect();
    top_fields.sort_unstable_by(|a, b| b.request_count.cmp(&a.request_count));
    top_fields.truncate(50);
    let mut error_patterns: Vec<_> = state.errors.values().cloned().collect();
    error_patterns.sort_unstable_by(|a, b| b.occurrence_count.cmp(&a.occurrence_count));
    // Group execution counts by operation type ("query", "mutation", ...).
    let mut requests_by_type: HashMap<String, u64> = HashMap::new();
    for query in state.queries.values() {
        *requests_by_type
            .entry(query.operation_type.clone())
            .or_insert(0) += query.execution_count;
    }
    AnalyticsSnapshot {
        timestamp: now,
        total_requests: state.total_requests,
        total_errors: state.total_errors,
        avg_latency_ms,
        requests_per_minute,
        error_rate,
        top_queries,
        slowest_queries,
        top_fields,
        error_patterns,
        requests_by_type,
        cache_hit_rate,
        uptime_seconds: state.start_time.elapsed().as_secs(),
    }
}
/// Discards all collected data and restarts the uptime clock; the
/// configuration is preserved.
pub fn reset(&self) {
    *self.state.write() = AnalyticsState::new(self.config.clone());
}
/// Returns the configuration this collector was created with.
pub fn config(&self) -> &AnalyticsConfig {
&self.config
}
/// Derives a short, stable identifier for `query`: the first 16 hex digits
/// of its SHA-256 digest.
fn hash_query(&self, query: &str) -> String {
    let digest = Sha256::digest(query.as_bytes());
    hex::encode(digest)[..16].to_string()
}
/// Returns `query` unchanged when it fits in `max_len` bytes; otherwise
/// truncates it and appends "..." so the result stays within `max_len`.
///
/// The cut point is backed off to a UTF-8 character boundary: the previous
/// byte-index slice `&query[..n]` panicked whenever the limit landed inside
/// a multi-byte character (e.g. non-ASCII string literals in a query).
fn truncate_query(query: &str, max_len: usize) -> String {
    if query.len() <= max_len {
        return query.to_string();
    }
    // Reserve 3 bytes for the ellipsis, then walk back to a char boundary.
    let mut end = max_len.saturating_sub(3).min(query.len());
    while end > 0 && !query.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &query[..end])
}
/// Evicts the least-recently-executed query entries.
///
/// Removes the overflow past `max_queries` plus ~10% headroom (capped at
/// 100) so pruning is not re-triggered on every subsequent insert. The
/// previous code always removed `overflow + 100`, which emptied the whole
/// map whenever the configured limit was small.
fn prune_queries(&self, state: &mut AnalyticsState) {
    let headroom = (self.config.max_queries / 10).min(100);
    let remove_count = state
        .queries
        .len()
        .saturating_sub(self.config.max_queries)
        + headroom;
    // Sort candidates oldest-first by last execution time.
    let mut entries: Vec<_> = state
        .queries
        .iter()
        .map(|(k, v)| (k.clone(), v.last_execution))
        .collect();
    entries.sort_unstable_by_key(|(_, t)| *t);
    for (key, _) in entries.into_iter().take(remove_count) {
        state.queries.remove(&key);
    }
}
/// Evicts the least-recently-requested field entries.
///
/// Removes the overflow past `max_fields` plus ~10% headroom (capped at
/// 50). The previous flat `overflow + 50` over-pruned small tables,
/// emptying the map when `max_fields` was below the headroom.
fn prune_fields(&self, state: &mut AnalyticsState) {
    let headroom = (self.config.max_fields / 10).min(50);
    let remove_count = state
        .fields
        .len()
        .saturating_sub(self.config.max_fields)
        + headroom;
    // Sort candidates oldest-first by last request time.
    let mut entries: Vec<_> = state
        .fields
        .iter()
        .map(|(k, v)| (k.clone(), v.last_requested))
        .collect();
    entries.sort_unstable_by_key(|(_, t)| *t);
    for (key, _) in entries.into_iter().take(remove_count) {
        state.fields.remove(&key);
    }
}
/// Evicts the least-recently-seen error patterns.
///
/// Removes the overflow past `max_errors` plus ~10% headroom (capped at
/// 20). The previous flat `overflow + 20` over-pruned small tables,
/// emptying the map when `max_errors` was below the headroom.
fn prune_errors(&self, state: &mut AnalyticsState) {
    let headroom = (self.config.max_errors / 10).min(20);
    let remove_count = state
        .errors
        .len()
        .saturating_sub(self.config.max_errors)
        + headroom;
    // Sort candidates oldest-first by last occurrence.
    let mut entries: Vec<_> = state
        .errors
        .iter()
        .map(|(k, v)| (k.clone(), v.last_occurrence))
        .collect();
    entries.sort_unstable_by_key(|(_, t)| *t);
    for (key, _) in entries.into_iter().take(remove_count) {
        state.errors.remove(&key);
    }
}
}
/// Thread-safe, shareable handle to a [`QueryAnalytics`] instance.
pub type SharedQueryAnalytics = Arc<QueryAnalytics>;
/// Convenience constructor returning an `Arc`-wrapped collector, ready to
/// be cloned across request handlers.
pub fn create_analytics(config: AnalyticsConfig) -> SharedQueryAnalytics {
Arc::new(QueryAnalytics::new(config))
}
/// RAII helper that times a query and records it on drop.
///
/// Create the guard when execution starts; when it goes out of scope the
/// elapsed time (and any error set via `set_error`) is forwarded to
/// `QueryAnalytics::record_query`.
pub struct AnalyticsGuard {
analytics: SharedQueryAnalytics,
query: String,
operation_name: Option<String>,
operation_type: String,
// Start of the timed span.
start: Instant,
had_error: bool,
// (code, message) captured by `set_error`, reported on drop.
error_details: Option<(String, String)>,
}
impl AnalyticsGuard {
    /// Starts timing a query; the measurement is recorded when the guard
    /// is dropped.
    pub fn new(
        analytics: SharedQueryAnalytics,
        query: String,
        operation_name: Option<String>,
        operation_type: String,
    ) -> Self {
        Self {
            start: Instant::now(),
            had_error: false,
            error_details: None,
            analytics,
            query,
            operation_name,
            operation_type,
        }
    }

    /// Marks the guarded execution as failed with the given code/message.
    pub fn set_error(&mut self, code: String, message: String) {
        self.had_error = true;
        self.error_details = Some((code, message));
    }
}
impl Drop for AnalyticsGuard {
    /// Forwards the timed execution (and any recorded error) to the
    /// shared analytics collector.
    fn drop(&mut self) {
        let elapsed = self.start.elapsed();
        let details = match &self.error_details {
            Some((code, message)) => Some((code.as_str(), message.as_str())),
            None => None,
        };
        self.analytics.record_query(
            &self.query,
            self.operation_name.as_deref(),
            &self.operation_type,
            elapsed,
            self.had_error,
            details,
        );
    }
}
/// Returns the embedded analytics dashboard page.
///
/// The HTML is compiled into the binary via `include_str!`, with the path
/// resolved relative to this source file at build time.
pub fn analytics_dashboard_html() -> &'static str {
include_str!("analytics_dashboard.html")
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
// Default config exposes the documented defaults.
#[test]
fn test_analytics_config_default() {
let config = AnalyticsConfig::default();
assert_eq!(config.max_queries, 1000);
assert!(config.track_query_text);
}
// A successful query bumps the request counter, not the error counter.
#[test]
fn test_record_query() {
let analytics = QueryAnalytics::new(AnalyticsConfig::default());
analytics.record_query(
"query { user { id } }",
Some("GetUser"),
"query",
Duration::from_millis(50),
false,
None,
);
let snapshot = analytics.get_snapshot();
assert_eq!(snapshot.total_requests, 1);
assert_eq!(snapshot.total_errors, 0);
assert!(!snapshot.top_queries.is_empty());
}
// An errored query is counted and produces an error pattern entry.
#[test]
fn test_record_error() {
let analytics = QueryAnalytics::new(AnalyticsConfig::default());
analytics.record_query(
"query { invalid }",
None,
"query",
Duration::from_millis(10),
true,
Some(("GRAPHQL_VALIDATION_ERROR", "Unknown field 'invalid'")),
);
let snapshot = analytics.get_snapshot();
assert_eq!(snapshot.total_errors, 1);
assert!(!snapshot.error_patterns.is_empty());
}
// Repeated field resolutions accumulate in a single FieldStats entry.
#[test]
fn test_record_field() {
let analytics = QueryAnalytics::new(AnalyticsConfig::default());
analytics.record_field("User", "email", Some(Duration::from_millis(5)));
analytics.record_field("User", "email", Some(Duration::from_millis(10)));
let snapshot = analytics.get_snapshot();
assert!(!snapshot.top_fields.is_empty());
assert_eq!(snapshot.top_fields[0].request_count, 2);
}
// 2 hits out of 3 accesses → hit rate ≈ 66.67%.
#[test]
fn test_cache_tracking() {
let analytics = QueryAnalytics::new(AnalyticsConfig::default());
analytics.record_cache_access(true);
analytics.record_cache_access(true);
analytics.record_cache_access(false);
let snapshot = analytics.get_snapshot();
let hit_rate = snapshot.cache_hit_rate.unwrap();
assert!((hit_rate - 66.67).abs() < 1.0);
}
// 10 threads × 100 queries: no lost updates under concurrent writes.
#[test]
fn test_concurrent_access() {
let analytics = Arc::new(QueryAnalytics::new(AnalyticsConfig::default()));
let mut handles = vec![];
for i in 0..10 {
let analytics = Arc::clone(&analytics);
handles.push(thread::spawn(move || {
for j in 0..100 {
analytics.record_query(
&format!("query {{ field_{} }}", j),
None,
"query",
Duration::from_millis(i as u64 + j as u64),
false,
None,
);
}
}));
}
for handle in handles {
handle.join().unwrap();
}
let snapshot = analytics.get_snapshot();
assert_eq!(snapshot.total_requests, 1000);
}
// Builder chain; note 100 µs truncates to 0 ms (millisecond granularity).
#[test]
fn test_analytics_config_builder() {
let config = AnalyticsConfig::new()
.max_queries(500)
.slow_query_threshold(Duration::from_micros(100))
.disable_query_text();
assert_eq!(config.max_queries, 500);
assert_eq!(config.slow_query_threshold_ms, 0); assert!(!config.track_query_text);
}
// Inserting past max_queries must keep the table within its limit.
#[test]
fn test_prune_queries() {
let config = AnalyticsConfig::default().max_queries(10);
let analytics = QueryAnalytics::new(config);
for i in 0..20 {
analytics.record_query(
&format!("query Q{} {{ id }}", i),
None,
"query",
Duration::from_millis(10),
false,
None,
);
}
let snapshot = analytics.get_snapshot();
assert!(snapshot.top_queries.len() <= 10);
}
// Same bound check for the field table.
#[test]
fn test_prune_fields() {
let config = AnalyticsConfig {
max_fields: 5,
..Default::default()
};
let analytics = QueryAnalytics::new(config);
for i in 0..10 {
analytics.record_field("User", &format!("field_{}", i), None);
}
let snapshot = analytics.get_snapshot();
assert!(snapshot.top_fields.len() <= 5);
}
// Same bound check for the error-pattern table (distinct codes).
#[test]
fn test_prune_errors() {
let config = AnalyticsConfig {
max_errors: 5,
..Default::default()
};
let analytics = QueryAnalytics::new(config);
for i in 0..10 {
analytics.record_query(
"query",
None,
"query",
Duration::from_millis(1),
true,
Some((&format!("ERR_{}", i), "msg")),
);
}
let snapshot = analytics.get_snapshot();
assert!(snapshot.error_patterns.len() <= 5);
}
// reset() clears all counters and tables.
#[test]
fn test_reset() {
let analytics = QueryAnalytics::new(AnalyticsConfig::default());
analytics.record_query(
"query { id }",
None,
"query",
Duration::from_millis(10),
false,
None,
);
assert_eq!(analytics.get_snapshot().total_requests, 1);
analytics.reset();
assert_eq!(analytics.get_snapshot().total_requests, 0);
assert!(analytics.get_snapshot().top_queries.is_empty());
}
// Dropping the guard records the query along with the error set on it.
#[test]
fn test_analytics_guard() {
let analytics = create_analytics(AnalyticsConfig::default());
{
let mut guard = AnalyticsGuard::new(
analytics.clone(),
"query { guard }".to_string(),
None,
"query".to_string(),
);
std::thread::sleep(Duration::from_millis(1));
guard.set_error("GUARD_ERR".to_string(), "ErrorMessage".to_string());
}
let snapshot = analytics.get_snapshot();
assert_eq!(snapshot.total_requests, 1);
assert_eq!(snapshot.total_errors, 1);
assert_eq!(snapshot.error_patterns[0].error_code, "GUARD_ERR");
}
}