use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use dashmap::DashMap;
use super::config::TenantId;
/// Registry of per-tenant statistics together with registry-wide query totals.
///
/// Each tenant's counters live in a [`TenantStats`] shared through an `Arc`, so
/// callers can keep a handle to a tenant's stats without holding a map guard.
pub struct TenantMetrics {
    /// Statistics for every tenant recorded so far, keyed by tenant id.
    tenants: DashMap<TenantId, Arc<TenantStats>>,
    /// Moment the registry was created; basis for uptime and aggregate QPS.
    start_time: Instant,
    /// Count of queries passed through [`TenantMetrics::record_query`].
    total_queries: AtomicU64,
    /// Count of failed queries passed through [`TenantMetrics::record_query`].
    total_errors: AtomicU64,
}

impl Default for TenantMetrics {
    fn default() -> Self {
        Self::new()
    }
}

impl TenantMetrics {
    /// Creates an empty registry whose uptime clock starts now.
    pub fn new() -> Self {
        Self {
            tenants: DashMap::new(),
            start_time: Instant::now(),
            total_queries: AtomicU64::new(0),
            total_errors: AtomicU64::new(0),
        }
    }

    /// Returns the stats handle for `tenant`, creating it on first use.
    pub fn get_or_create(&self, tenant: &TenantId) -> Arc<TenantStats> {
        // Fast path: an already-known tenant only needs a lookup, which avoids
        // cloning the key and going through the entry API on every call. The
        // lookup guard is released before `entry` is reached below.
        if let Some(existing) = self.tenants.get(tenant) {
            return Arc::clone(existing.value());
        }
        // Slow path: `entry` re-checks under its own guard, so two racing
        // callers still end up sharing a single `TenantStats`.
        let inserted = self
            .tenants
            .entry(tenant.clone())
            .or_insert_with(|| Arc::new(TenantStats::new(tenant.clone())));
        Arc::clone(inserted.value())
    }

    /// Returns the stats handle for `tenant`, or `None` if it was never recorded.
    pub fn get(&self, tenant: &TenantId) -> Option<Arc<TenantStats>> {
        self.tenants
            .get(tenant)
            .map(|entry| Arc::clone(entry.value()))
    }

    /// Records one query for `tenant` and bumps the registry-wide totals.
    ///
    /// `success == false` additionally counts the query as an error.
    pub fn record_query(
        &self,
        tenant: &TenantId,
        duration: Duration,
        rows: u64,
        success: bool,
    ) {
        self.total_queries.fetch_add(1, Ordering::Relaxed);
        if !success {
            self.total_errors.fetch_add(1, Ordering::Relaxed);
        }
        self.get_or_create(tenant)
            .record_query(duration, rows, success);
    }

    /// Adds `bytes_read` / `bytes_written` to the tenant's byte counters.
    pub fn record_bytes(&self, tenant: &TenantId, bytes_read: u64, bytes_written: u64) {
        self.get_or_create(tenant)
            .record_bytes(bytes_read, bytes_written);
    }

    /// Records a connection being opened (`connected == true`) or closed.
    pub fn record_connection(&self, tenant: &TenantId, connected: bool) {
        let stats = self.get_or_create(tenant);
        if connected {
            stats.record_connect();
        } else {
            stats.record_disconnect();
        }
    }

    /// Returns the ids of all tenants currently present in the registry.
    pub fn tenant_ids(&self) -> Vec<TenantId> {
        self.tenants
            .iter()
            .map(|entry| entry.key().clone())
            .collect()
    }

    /// Takes a snapshot of every tenant, in map iteration order.
    pub fn snapshot_all(&self) -> Vec<TenantMetricsSnapshot> {
        self.tenants
            .iter()
            .map(|entry| entry.value().snapshot())
            .collect()
    }

    /// Takes a snapshot of a single tenant, or `None` if it is unknown.
    pub fn snapshot(&self, tenant: &TenantId) -> Option<TenantMetricsSnapshot> {
        self.tenants.get(tenant).map(|entry| entry.snapshot())
    }

    /// Sums the per-tenant counters into one registry-wide snapshot.
    ///
    /// The totals are derived from the per-tenant counters, not from the
    /// registry-wide `total_queries` / `total_errors` atomics. `qps` is reported
    /// as `0.0` until at least one full second of uptime has elapsed.
    pub fn aggregate_snapshot(&self) -> AggregateMetricsSnapshot {
        let mut total_queries = 0u64;
        let mut total_errors = 0u64;
        let mut total_time_us = 0u64;
        let mut total_rows = 0u64;
        let mut total_bytes_read = 0u64;
        let mut total_bytes_written = 0u64;
        // Summed in u64 and clamped at the end so that an out-of-range
        // per-tenant gauge cannot overflow the u32 total.
        let mut active_connections = 0u64;

        for entry in self.tenants.iter() {
            let stats = entry.value();
            // Saturating adds: a metrics read must never panic on overflow.
            total_queries = total_queries.saturating_add(stats.queries.load(Ordering::Relaxed));
            total_errors = total_errors.saturating_add(stats.errors.load(Ordering::Relaxed));
            total_time_us =
                total_time_us.saturating_add(stats.total_time_us.load(Ordering::Relaxed));
            total_rows = total_rows.saturating_add(stats.rows_processed.load(Ordering::Relaxed));
            total_bytes_read =
                total_bytes_read.saturating_add(stats.bytes_read.load(Ordering::Relaxed));
            total_bytes_written =
                total_bytes_written.saturating_add(stats.bytes_written.load(Ordering::Relaxed));
            active_connections = active_connections
                .saturating_add(stats.active_connections.load(Ordering::Relaxed));
        }

        let elapsed = self.start_time.elapsed();
        let qps = if elapsed.as_secs() > 0 {
            total_queries as f64 / elapsed.as_secs_f64()
        } else {
            0.0
        };
        let error_rate = if total_queries > 0 {
            total_errors as f64 / total_queries as f64
        } else {
            0.0
        };

        AggregateMetricsSnapshot {
            tenant_count: self.tenants.len(),
            total_queries,
            total_errors,
            error_rate,
            total_time: Duration::from_micros(total_time_us),
            total_rows,
            total_bytes_read,
            total_bytes_written,
            active_connections: active_connections.min(u64::from(u32::MAX)) as u32,
            qps,
            uptime: elapsed,
        }
    }

    /// Returns up to `limit` tenant snapshots with the most queries, descending.
    pub fn top_by_queries(&self, limit: usize) -> Vec<TenantMetricsSnapshot> {
        self.top_by(limit, |snapshot| snapshot.queries)
    }

    /// Returns up to `limit` tenant snapshots with the largest total query time.
    pub fn top_by_time(&self, limit: usize) -> Vec<TenantMetricsSnapshot> {
        self.top_by(limit, |snapshot| snapshot.total_time)
    }

    /// Returns up to `limit` tenant snapshots with the most errors, descending.
    pub fn top_by_errors(&self, limit: usize) -> Vec<TenantMetricsSnapshot> {
        self.top_by(limit, |snapshot| snapshot.errors)
    }

    /// Snapshots all tenants, sorts them descending by `key` and keeps `limit`.
    ///
    /// The sort is stable, so tenants with equal keys keep their snapshot order.
    fn top_by<K, F>(&self, limit: usize, key: F) -> Vec<TenantMetricsSnapshot>
    where
        K: Ord,
        F: Fn(&TenantMetricsSnapshot) -> K,
    {
        let mut snapshots = self.snapshot_all();
        snapshots.sort_by(|a, b| key(b).cmp(&key(a)));
        snapshots.truncate(limit);
        snapshots
    }

    /// Resets the counters of one tenant; unknown tenants are ignored.
    pub fn reset_tenant(&self, tenant: &TenantId) {
        if let Some(stats) = self.tenants.get(tenant) {
            stats.reset();
        }
    }

    /// Resets every tenant's counters and the registry-wide totals.
    ///
    /// Tenants stay registered; only their counters are cleared.
    pub fn reset_all(&self) {
        for entry in self.tenants.iter() {
            entry.value().reset();
        }
        self.total_queries.store(0, Ordering::Relaxed);
        self.total_errors.store(0, Ordering::Relaxed);
    }
}
/// Lock-free counters describing one tenant's activity.
///
/// All counters are atomics updated with `Ordering::Relaxed`; a snapshot reads
/// each field independently, so fields of one snapshot may reflect slightly
/// different instants under concurrent updates.
pub struct TenantStats {
    /// Tenant these counters belong to.
    tenant_id: TenantId,
    /// Number of recorded queries.
    queries: AtomicU64,
    /// Number of recorded queries flagged as unsuccessful.
    errors: AtomicU64,
    /// Sum of query durations, in microseconds.
    total_time_us: AtomicU64,
    /// Shortest query duration in microseconds; `u64::MAX` means "no sample yet".
    min_time_us: AtomicU64,
    /// Longest query duration in microseconds.
    max_time_us: AtomicU64,
    /// Sum of the `rows` values passed to `record_query`.
    rows_processed: AtomicU64,
    /// Sum of bytes read passed to `record_bytes`.
    bytes_read: AtomicU64,
    /// Sum of bytes written passed to `record_bytes`.
    bytes_written: AtomicU64,
    /// Connections opened and not yet closed.
    active_connections: AtomicU64,
    /// Connections opened over the lifetime of these stats.
    total_connections: AtomicU64,
    /// Moment these stats were created.
    created_at: Instant,
    /// Microseconds between `created_at` and the most recent recorded query.
    last_activity_us: AtomicU64,
}

impl TenantStats {
    /// Creates zeroed counters for `tenant_id`, with the creation time set to now.
    pub fn new(tenant_id: TenantId) -> Self {
        Self {
            tenant_id,
            queries: AtomicU64::new(0),
            errors: AtomicU64::new(0),
            total_time_us: AtomicU64::new(0),
            min_time_us: AtomicU64::new(u64::MAX),
            max_time_us: AtomicU64::new(0),
            rows_processed: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
            active_connections: AtomicU64::new(0),
            total_connections: AtomicU64::new(0),
            created_at: Instant::now(),
            last_activity_us: AtomicU64::new(0),
        }
    }

    /// Records one query: its duration, row count and whether it succeeded.
    pub fn record_query(&self, duration: Duration, rows: u64, success: bool) {
        self.queries.fetch_add(1, Ordering::Relaxed);
        if !success {
            self.errors.fetch_add(1, Ordering::Relaxed);
        }

        let duration_us = Self::saturating_micros(duration);
        self.total_time_us.fetch_add(duration_us, Ordering::Relaxed);
        self.rows_processed.fetch_add(rows, Ordering::Relaxed);
        self.update_min(&self.min_time_us, duration_us);
        self.update_max(&self.max_time_us, duration_us);

        let now = Self::saturating_micros(self.created_at.elapsed());
        self.last_activity_us.store(now, Ordering::Relaxed);
    }

    /// Adds to the byte counters.
    pub fn record_bytes(&self, read: u64, written: u64) {
        self.bytes_read.fetch_add(read, Ordering::Relaxed);
        self.bytes_written.fetch_add(written, Ordering::Relaxed);
    }

    /// Records a newly opened connection.
    pub fn record_connect(&self) {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
        self.total_connections.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a closed connection.
    ///
    /// The gauge saturates at zero: a disconnect without a matching connect is
    /// ignored instead of wrapping the counter around to `u64::MAX`.
    pub fn record_disconnect(&self) {
        // `fetch_update` returns `Err` when the closure yields `None`, i.e. the
        // gauge is already zero; that case is deliberately a no-op.
        let _ = self.active_connections.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |open| open.checked_sub(1),
        );
    }

    /// Reads all counters into a plain-data snapshot.
    pub fn snapshot(&self) -> TenantMetricsSnapshot {
        let queries = self.queries.load(Ordering::Relaxed);
        let total_time_us = self.total_time_us.load(Ordering::Relaxed);

        // `u64::MAX` is the "no query recorded yet" sentinel for the minimum.
        let min_time = match self.min_time_us.load(Ordering::Relaxed) {
            u64::MAX => Duration::ZERO,
            min_us => Duration::from_micros(min_us),
        };
        let avg_time = if queries > 0 {
            Duration::from_micros(total_time_us / queries)
        } else {
            Duration::ZERO
        };
        // Clamp rather than truncate when narrowing the gauge to u32.
        let active_connections = self
            .active_connections
            .load(Ordering::Relaxed)
            .min(u64::from(u32::MAX)) as u32;

        TenantMetricsSnapshot {
            tenant_id: self.tenant_id.clone(),
            queries,
            errors: self.errors.load(Ordering::Relaxed),
            total_time: Duration::from_micros(total_time_us),
            avg_time,
            min_time,
            max_time: Duration::from_micros(self.max_time_us.load(Ordering::Relaxed)),
            rows_processed: self.rows_processed.load(Ordering::Relaxed),
            bytes_read: self.bytes_read.load(Ordering::Relaxed),
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            active_connections,
            total_connections: self.total_connections.load(Ordering::Relaxed),
            uptime: self.created_at.elapsed(),
            last_activity: Duration::from_micros(self.last_activity_us.load(Ordering::Relaxed)),
        }
    }

    /// Clears the query, timing, row and byte counters.
    ///
    /// Connection counters and the last-activity offset are left untouched.
    pub fn reset(&self) {
        self.queries.store(0, Ordering::Relaxed);
        self.errors.store(0, Ordering::Relaxed);
        self.total_time_us.store(0, Ordering::Relaxed);
        self.min_time_us.store(u64::MAX, Ordering::Relaxed);
        self.max_time_us.store(0, Ordering::Relaxed);
        self.rows_processed.store(0, Ordering::Relaxed);
        self.bytes_read.store(0, Ordering::Relaxed);
        self.bytes_written.store(0, Ordering::Relaxed);
    }

    /// Converts a duration to whole microseconds, clamping at `u64::MAX`.
    fn saturating_micros(duration: Duration) -> u64 {
        duration.as_micros().min(u128::from(u64::MAX)) as u64
    }

    /// Lowers `atomic` to `value` if `value` is smaller than the stored value.
    fn update_min(&self, atomic: &AtomicU64, value: u64) {
        atomic.fetch_min(value, Ordering::Relaxed);
    }

    /// Raises `atomic` to `value` if `value` is larger than the stored value.
    fn update_max(&self, atomic: &AtomicU64, value: u64) {
        atomic.fetch_max(value, Ordering::Relaxed);
    }
}
/// Point-in-time copy of one tenant's counters.
#[derive(Debug, Clone)]
pub struct TenantMetricsSnapshot {
    /// Tenant the snapshot belongs to.
    pub tenant_id: TenantId,
    /// Number of recorded queries.
    pub queries: u64,
    /// Number of recorded queries that failed.
    pub errors: u64,
    /// Sum of all query durations.
    pub total_time: Duration,
    /// Mean query duration (zero when there are no queries).
    pub avg_time: Duration,
    /// Shortest query duration (zero when there are no queries).
    pub min_time: Duration,
    /// Longest query duration.
    pub max_time: Duration,
    /// Total rows reported by queries.
    pub rows_processed: u64,
    /// Total bytes read.
    pub bytes_read: u64,
    /// Total bytes written.
    pub bytes_written: u64,
    /// Connections open at snapshot time.
    pub active_connections: u32,
    /// Connections opened in total.
    pub total_connections: u64,
    /// Time since the tenant's stats were created.
    pub uptime: Duration,
    /// Offset from stats creation to the most recent recorded query.
    pub last_activity: Duration,
}

impl TenantMetricsSnapshot {
    /// Queries per second over `uptime`; `0.0` during the first full second.
    pub fn qps(&self) -> f64 {
        if self.uptime.as_secs() > 0 {
            self.queries as f64 / self.uptime.as_secs_f64()
        } else {
            0.0
        }
    }

    /// Fraction of queries that failed; `0.0` when there are no queries.
    pub fn error_rate(&self) -> f64 {
        if self.queries > 0 {
            self.errors as f64 / self.queries as f64
        } else {
            0.0
        }
    }

    /// Mean rows per query; `0.0` when there are no queries.
    pub fn avg_rows(&self) -> f64 {
        if self.queries > 0 {
            self.rows_processed as f64 / self.queries as f64
        } else {
            0.0
        }
    }

    /// Renders a compact JSON object with the headline metrics.
    ///
    /// The tenant id is escaped, so ids containing quotes, backslashes or
    /// control characters still produce a valid JSON string.
    pub fn to_json(&self) -> String {
        let tenant_id = Self::escape_json(&self.tenant_id.0.to_string());
        format!(
            r#"{{"tenant_id":"{}","queries":{},"errors":{},"error_rate":{:.4},"avg_time_ms":{:.2},"qps":{:.2},"active_connections":{}}}"#,
            tenant_id,
            self.queries,
            self.errors,
            self.error_rate(),
            self.avg_time.as_secs_f64() * 1000.0,
            self.qps(),
            self.active_connections
        )
    }

    /// Escapes `raw` for use inside a double-quoted JSON string.
    fn escape_json(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            match ch {
                '"' => escaped.push_str("\\\""),
                '\\' => escaped.push_str("\\\\"),
                '\n' => escaped.push_str("\\n"),
                '\r' => escaped.push_str("\\r"),
                '\t' => escaped.push_str("\\t"),
                // Remaining control characters must be written as \u00XX in JSON.
                c if (c as u32) < 0x20 => escaped.push_str(&format!("\\u{:04x}", c as u32)),
                c => escaped.push(c),
            }
        }
        escaped
    }
}
/// Point-in-time totals across all tenants.
#[derive(Debug, Clone)]
pub struct AggregateMetricsSnapshot {
    /// Number of tenants contributing to the totals.
    pub tenant_count: usize,
    /// Queries summed over all tenants.
    pub total_queries: u64,
    /// Failed queries summed over all tenants.
    pub total_errors: u64,
    /// Ratio of failed queries to all queries.
    pub error_rate: f64,
    /// Query time summed over all tenants.
    pub total_time: Duration,
    /// Rows summed over all tenants.
    pub total_rows: u64,
    /// Bytes read summed over all tenants.
    pub total_bytes_read: u64,
    /// Bytes written summed over all tenants.
    pub total_bytes_written: u64,
    /// Open connections summed over all tenants.
    pub active_connections: u32,
    /// Queries per second.
    pub qps: f64,
    /// Elapsed time the totals cover.
    pub uptime: Duration,
}

impl AggregateMetricsSnapshot {
    /// Renders a compact JSON object with the headline totals.
    ///
    /// Time, row and byte totals are not part of the output; uptime is emitted
    /// in whole seconds.
    pub fn to_json(&self) -> String {
        let Self {
            tenant_count,
            total_queries,
            total_errors,
            error_rate,
            qps,
            active_connections,
            uptime,
            ..
        } = self;
        let uptime_secs = uptime.as_secs();
        format!(
            r#"{{"tenant_count":{},"total_queries":{},"total_errors":{},"error_rate":{:.4},"qps":{:.2},"active_connections":{},"uptime_secs":{}}}"#,
            tenant_count,
            total_queries,
            total_errors,
            error_rate,
            qps,
            active_connections,
            uptime_secs
        )
    }
}
pub struct TenantCostTracker {
cost_per_query: f64,
cost_per_1000_rows: f64,
cost_per_mb_read: f64,
cost_per_mb_written: f64,
cost_per_conn_second: f64,
costs: DashMap<TenantId, TenantCost>,
}
impl TenantCostTracker {
pub fn new() -> Self {
Self {
cost_per_query: 0.000001, cost_per_1000_rows: 0.00001, cost_per_mb_read: 0.00001, cost_per_mb_written: 0.0001, cost_per_conn_second: 0.0, costs: DashMap::new(),
}
}
pub fn with_pricing(
mut self,
per_query: f64,
per_1000_rows: f64,
per_mb_read: f64,
per_mb_written: f64,
) -> Self {
self.cost_per_query = per_query;
self.cost_per_1000_rows = per_1000_rows;
self.cost_per_mb_read = per_mb_read;
self.cost_per_mb_written = per_mb_written;
self
}
pub fn record_query_cost(
&self,
tenant: &TenantId,
rows: u64,
bytes_read: u64,
bytes_written: u64,
) {
let cost = self.cost_per_query
+ (rows as f64 / 1000.0) * self.cost_per_1000_rows
+ (bytes_read as f64 / 1_048_576.0) * self.cost_per_mb_read
+ (bytes_written as f64 / 1_048_576.0) * self.cost_per_mb_written;
self.costs
.entry(tenant.clone())
.or_insert_with(TenantCost::new)
.add_cost(cost);
}
pub fn get_cost(&self, tenant: &TenantId) -> Option<f64> {
self.costs.get(tenant).map(|c| c.total_cost())
}
pub fn all_costs(&self) -> HashMap<TenantId, f64> {
self.costs
.iter()
.map(|e| (e.key().clone(), e.value().total_cost()))
.collect()
}
pub fn reset_tenant(&self, tenant: &TenantId) {
if let Some(mut cost) = self.costs.get_mut(tenant) {
cost.reset();
}
}
pub fn cost_report(&self) -> TenantCostReport {
let mut entries: Vec<_> = self
.costs
.iter()
.map(|e| TenantCostEntry {
tenant_id: e.key().clone(),
total_cost: e.value().total_cost(),
query_count: e.value().query_count(),
})
.collect();
entries.sort_by(|a, b| b.total_cost.partial_cmp(&a.total_cost).unwrap());
let total = entries.iter().map(|e| e.total_cost).sum();
TenantCostReport {
entries,
total_cost: total,
generated_at: SystemTime::now(),
}
}
}
impl Default for TenantCostTracker {
fn default() -> Self {
Self::new()
}
}
/// Running cost total for one tenant.
///
/// The total is kept as an integer number of millionths of a cost unit so it
/// can be accumulated with a plain atomic add.
struct TenantCost {
    /// Accumulated cost, in millionths of a cost unit.
    total: AtomicU64,
    /// Number of `add_cost` calls since creation or the last reset.
    queries: AtomicU64,
}

impl TenantCost {
    /// Fixed-point scale: stored integer units per one cost unit.
    const SCALE: f64 = 1_000_000.0;

    /// Creates a zeroed total.
    fn new() -> Self {
        Self {
            total: AtomicU64::new(0),
            queries: AtomicU64::new(0),
        }
    }

    /// Adds `cost` to the total and counts one query.
    ///
    /// The scaled value is rounded to the nearest integer; truncating instead
    /// would systematically under-count (for example 2.9 millionths would be
    /// stored as 2). The float-to-integer cast saturates, so negative or NaN
    /// costs contribute zero.
    fn add_cost(&self, cost: f64) {
        let scaled = (cost * Self::SCALE).round() as u64;
        self.total.fetch_add(scaled, Ordering::Relaxed);
        self.queries.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the accumulated total in cost units.
    fn total_cost(&self) -> f64 {
        self.total.load(Ordering::Relaxed) as f64 / Self::SCALE
    }

    /// Returns how many costs have been added.
    fn query_count(&self) -> u64 {
        self.queries.load(Ordering::Relaxed)
    }

    /// Zeroes both the total and the query count.
    fn reset(&mut self) {
        self.total.store(0, Ordering::Relaxed);
        self.queries.store(0, Ordering::Relaxed);
    }
}
/// One tenant's line in a cost report.
#[derive(Debug, Clone)]
pub struct TenantCostEntry {
/// Tenant the entry describes.
pub tenant_id: TenantId,
/// Accumulated cost for this tenant.
pub total_cost: f64,
/// Number of query costs recorded for this tenant.
pub query_count: u64,
}
/// Cost report covering all tenants known to a `TenantCostTracker`.
#[derive(Debug, Clone)]
pub struct TenantCostReport {
/// Per-tenant entries; `cost_report` sorts them by descending total cost.
pub entries: Vec<TenantCostEntry>,
/// Sum of `total_cost` over all entries.
pub total_cost: f64,
/// Wall-clock time at which the report was built.
pub generated_at: SystemTime,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Counters and min/max latency accumulate across recorded queries.
    #[test]
    fn test_tenant_stats() {
        let stats = TenantStats::new(TenantId::new("test"));
        for (millis, rows, ok) in [(10, 100, true), (20, 200, true), (5, 50, false)] {
            stats.record_query(Duration::from_millis(millis), rows, ok);
        }

        let snap = stats.snapshot();
        assert_eq!(snap.queries, 3);
        assert_eq!(snap.errors, 1);
        assert_eq!(snap.rows_processed, 350);
        assert_eq!(snap.min_time, Duration::from_millis(5));
        assert_eq!(snap.max_time, Duration::from_millis(20));
    }

    /// Queries are attributed to the right tenant and summed in the aggregate.
    #[test]
    fn test_tenant_metrics() {
        let registry = TenantMetrics::new();
        let first = TenantId::new("tenant_a");
        let second = TenantId::new("tenant_b");

        registry.record_query(&first, Duration::from_millis(10), 100, true);
        registry.record_query(&first, Duration::from_millis(15), 150, true);
        registry.record_query(&second, Duration::from_millis(20), 200, false);

        let first_snap = registry.snapshot(&first).unwrap();
        assert_eq!(first_snap.queries, 2);
        assert_eq!(first_snap.errors, 0);

        let second_snap = registry.snapshot(&second).unwrap();
        assert_eq!(second_snap.queries, 1);
        assert_eq!(second_snap.errors, 1);

        let totals = registry.aggregate_snapshot();
        assert_eq!(totals.tenant_count, 2);
        assert_eq!(totals.total_queries, 3);
        assert_eq!(totals.total_errors, 1);
    }

    /// `top_by_queries` returns the busiest tenants first and honours the limit.
    #[test]
    fn test_top_tenants() {
        let registry = TenantMetrics::new();
        // Tenant `i` issues `i + 1` queries, so query counts are 1..=5.
        for i in 0..5 {
            let tenant = TenantId::new(format!("tenant_{}", i));
            for _ in 0..=i {
                registry.record_query(&tenant, Duration::from_millis(10), 10, true);
            }
        }

        let busiest = registry.top_by_queries(3);
        let counts: Vec<u64> = busiest.iter().map(|snap| snap.queries).collect();
        assert_eq!(counts, vec![5, 4, 3]);
    }

    /// Connects raise both gauges; a disconnect lowers only the active one.
    #[test]
    fn test_connection_tracking() {
        let registry = TenantMetrics::new();
        let tenant = TenantId::new("test");

        registry.record_connection(&tenant, true);
        registry.record_connection(&tenant, true);
        let after_connects = registry.snapshot(&tenant).unwrap();
        assert_eq!(after_connects.active_connections, 2);
        assert_eq!(after_connects.total_connections, 2);

        registry.record_connection(&tenant, false);
        let after_disconnect = registry.snapshot(&tenant).unwrap();
        assert_eq!(after_disconnect.active_connections, 1);
        assert_eq!(after_disconnect.total_connections, 2);
    }

    /// Byte counters add up over several calls.
    #[test]
    fn test_bytes_tracking() {
        let registry = TenantMetrics::new();
        let tenant = TenantId::new("test");

        for (read, written) in [(1024, 512), (2048, 1024)] {
            registry.record_bytes(&tenant, read, written);
        }

        let snap = registry.snapshot(&tenant).unwrap();
        assert_eq!(snap.bytes_read, 3072);
        assert_eq!(snap.bytes_written, 1536);
    }

    /// Recorded queries produce a positive cost and show up in the report.
    #[test]
    fn test_cost_tracker() {
        let tracker = TenantCostTracker::new();
        let tenant = TenantId::new("test");

        tracker.record_query_cost(&tenant, 1000, 1_048_576, 524_288);
        tracker.record_query_cost(&tenant, 500, 0, 0);

        let accumulated = tracker.get_cost(&tenant).unwrap();
        assert!(accumulated > 0.0);

        let report = tracker.cost_report();
        assert_eq!(report.entries.len(), 1);
        assert_eq!(report.entries[0].query_count, 2);
    }

    /// Resetting a tenant clears its query counter but keeps the tenant known.
    #[test]
    fn test_metrics_reset() {
        let registry = TenantMetrics::new();
        let tenant = TenantId::new("test");

        registry.record_query(&tenant, Duration::from_millis(10), 100, true);
        let before = registry.snapshot(&tenant).unwrap();
        assert_eq!(before.queries, 1);

        registry.reset_tenant(&tenant);
        let after = registry.snapshot(&tenant).unwrap();
        assert_eq!(after.queries, 0);
    }

    /// The JSON rendering contains the tenant id and the query count.
    #[test]
    fn test_snapshot_json() {
        let stats = TenantStats::new(TenantId::new("test"));
        stats.record_query(Duration::from_millis(10), 100, true);

        let rendered = stats.snapshot().to_json();
        assert!(rendered.contains("\"tenant_id\":\"test\""));
        assert!(rendered.contains("\"queries\":1"));
    }
}