1use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant, SystemTime};
10
11use dashmap::DashMap;
12
13use super::config::TenantId;
14
15pub struct TenantMetrics {
17 tenants: DashMap<TenantId, Arc<TenantStats>>,
19
20 start_time: Instant,
22
23 total_queries: AtomicU64,
25
26 total_errors: AtomicU64,
28}
29
30impl Default for TenantMetrics {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl TenantMetrics {
37 pub fn new() -> Self {
39 Self {
40 tenants: DashMap::new(),
41 start_time: Instant::now(),
42 total_queries: AtomicU64::new(0),
43 total_errors: AtomicU64::new(0),
44 }
45 }
46
47 pub fn get_or_create(&self, tenant: &TenantId) -> Arc<TenantStats> {
49 self.tenants
50 .entry(tenant.clone())
51 .or_insert_with(|| Arc::new(TenantStats::new(tenant.clone())))
52 .clone()
53 }
54
55 pub fn get(&self, tenant: &TenantId) -> Option<Arc<TenantStats>> {
57 self.tenants.get(tenant).map(|s| s.clone())
58 }
59
60 pub fn record_query(
62 &self,
63 tenant: &TenantId,
64 duration: Duration,
65 rows: u64,
66 success: bool,
67 ) {
68 self.total_queries.fetch_add(1, Ordering::Relaxed);
69 if !success {
70 self.total_errors.fetch_add(1, Ordering::Relaxed);
71 }
72
73 let stats = self.get_or_create(tenant);
74 stats.record_query(duration, rows, success);
75 }
76
77 pub fn record_bytes(&self, tenant: &TenantId, bytes_read: u64, bytes_written: u64) {
79 let stats = self.get_or_create(tenant);
80 stats.record_bytes(bytes_read, bytes_written);
81 }
82
83 pub fn record_connection(&self, tenant: &TenantId, connected: bool) {
85 let stats = self.get_or_create(tenant);
86 if connected {
87 stats.record_connect();
88 } else {
89 stats.record_disconnect();
90 }
91 }
92
93 pub fn tenant_ids(&self) -> Vec<TenantId> {
95 self.tenants.iter().map(|e| e.key().clone()).collect()
96 }
97
98 pub fn snapshot_all(&self) -> Vec<TenantMetricsSnapshot> {
100 self.tenants
101 .iter()
102 .map(|entry| entry.value().snapshot())
103 .collect()
104 }
105
106 pub fn snapshot(&self, tenant: &TenantId) -> Option<TenantMetricsSnapshot> {
108 self.tenants.get(tenant).map(|s| s.snapshot())
109 }
110
111 pub fn aggregate_snapshot(&self) -> AggregateMetricsSnapshot {
113 let mut total_queries = 0u64;
114 let mut total_errors = 0u64;
115 let mut total_time_us = 0u64;
116 let mut total_rows = 0u64;
117 let mut total_bytes_read = 0u64;
118 let mut total_bytes_written = 0u64;
119 let mut active_connections = 0u32;
120
121 for entry in self.tenants.iter() {
122 let stats = entry.value();
123 total_queries += stats.queries.load(Ordering::Relaxed);
124 total_errors += stats.errors.load(Ordering::Relaxed);
125 total_time_us += stats.total_time_us.load(Ordering::Relaxed);
126 total_rows += stats.rows_processed.load(Ordering::Relaxed);
127 total_bytes_read += stats.bytes_read.load(Ordering::Relaxed);
128 total_bytes_written += stats.bytes_written.load(Ordering::Relaxed);
129 active_connections += stats.active_connections.load(Ordering::Relaxed) as u32;
130 }
131
132 let elapsed = self.start_time.elapsed();
133 let qps = if elapsed.as_secs() > 0 {
134 total_queries as f64 / elapsed.as_secs_f64()
135 } else {
136 0.0
137 };
138
139 AggregateMetricsSnapshot {
140 tenant_count: self.tenants.len(),
141 total_queries,
142 total_errors,
143 error_rate: if total_queries > 0 {
144 total_errors as f64 / total_queries as f64
145 } else {
146 0.0
147 },
148 total_time: Duration::from_micros(total_time_us),
149 total_rows,
150 total_bytes_read,
151 total_bytes_written,
152 active_connections,
153 qps,
154 uptime: elapsed,
155 }
156 }
157
158 pub fn top_by_queries(&self, limit: usize) -> Vec<TenantMetricsSnapshot> {
160 let mut snapshots: Vec<_> = self.snapshot_all();
161 snapshots.sort_by(|a, b| b.queries.cmp(&a.queries));
162 snapshots.truncate(limit);
163 snapshots
164 }
165
166 pub fn top_by_time(&self, limit: usize) -> Vec<TenantMetricsSnapshot> {
168 let mut snapshots: Vec<_> = self.snapshot_all();
169 snapshots.sort_by(|a, b| b.total_time.cmp(&a.total_time));
170 snapshots.truncate(limit);
171 snapshots
172 }
173
174 pub fn top_by_errors(&self, limit: usize) -> Vec<TenantMetricsSnapshot> {
176 let mut snapshots: Vec<_> = self.snapshot_all();
177 snapshots.sort_by(|a, b| b.errors.cmp(&a.errors));
178 snapshots.truncate(limit);
179 snapshots
180 }
181
182 pub fn reset_tenant(&self, tenant: &TenantId) {
184 if let Some(stats) = self.tenants.get(tenant) {
185 stats.reset();
186 }
187 }
188
189 pub fn reset_all(&self) {
191 for entry in self.tenants.iter() {
192 entry.value().reset();
193 }
194 self.total_queries.store(0, Ordering::Relaxed);
195 self.total_errors.store(0, Ordering::Relaxed);
196 }
197}
198
199pub struct TenantStats {
201 tenant_id: TenantId,
203
204 queries: AtomicU64,
206
207 errors: AtomicU64,
209
210 total_time_us: AtomicU64,
212
213 min_time_us: AtomicU64,
215
216 max_time_us: AtomicU64,
218
219 rows_processed: AtomicU64,
221
222 bytes_read: AtomicU64,
224
225 bytes_written: AtomicU64,
227
228 active_connections: AtomicU64,
230
231 total_connections: AtomicU64,
233
234 created_at: Instant,
236
237 last_activity_us: AtomicU64,
239}
240
241impl TenantStats {
242 pub fn new(tenant_id: TenantId) -> Self {
244 Self {
245 tenant_id,
246 queries: AtomicU64::new(0),
247 errors: AtomicU64::new(0),
248 total_time_us: AtomicU64::new(0),
249 min_time_us: AtomicU64::new(u64::MAX),
250 max_time_us: AtomicU64::new(0),
251 rows_processed: AtomicU64::new(0),
252 bytes_read: AtomicU64::new(0),
253 bytes_written: AtomicU64::new(0),
254 active_connections: AtomicU64::new(0),
255 total_connections: AtomicU64::new(0),
256 created_at: Instant::now(),
257 last_activity_us: AtomicU64::new(0),
258 }
259 }
260
261 pub fn record_query(&self, duration: Duration, rows: u64, success: bool) {
263 self.queries.fetch_add(1, Ordering::Relaxed);
264
265 if !success {
266 self.errors.fetch_add(1, Ordering::Relaxed);
267 }
268
269 let duration_us = duration.as_micros() as u64;
270 self.total_time_us.fetch_add(duration_us, Ordering::Relaxed);
271 self.rows_processed.fetch_add(rows, Ordering::Relaxed);
272
273 self.update_min(&self.min_time_us, duration_us);
275 self.update_max(&self.max_time_us, duration_us);
276
277 let now = self.created_at.elapsed().as_micros() as u64;
279 self.last_activity_us.store(now, Ordering::Relaxed);
280 }
281
282 pub fn record_bytes(&self, read: u64, written: u64) {
284 self.bytes_read.fetch_add(read, Ordering::Relaxed);
285 self.bytes_written.fetch_add(written, Ordering::Relaxed);
286 }
287
288 pub fn record_connect(&self) {
290 self.active_connections.fetch_add(1, Ordering::Relaxed);
291 self.total_connections.fetch_add(1, Ordering::Relaxed);
292 }
293
294 pub fn record_disconnect(&self) {
296 self.active_connections.fetch_sub(1, Ordering::Relaxed);
297 }
298
299 pub fn snapshot(&self) -> TenantMetricsSnapshot {
301 let queries = self.queries.load(Ordering::Relaxed);
302 let total_time_us = self.total_time_us.load(Ordering::Relaxed);
303
304 let min_time = {
305 let min = self.min_time_us.load(Ordering::Relaxed);
306 if min == u64::MAX {
307 Duration::ZERO
308 } else {
309 Duration::from_micros(min)
310 }
311 };
312
313 TenantMetricsSnapshot {
314 tenant_id: self.tenant_id.clone(),
315 queries,
316 errors: self.errors.load(Ordering::Relaxed),
317 total_time: Duration::from_micros(total_time_us),
318 avg_time: if queries > 0 {
319 Duration::from_micros(total_time_us / queries)
320 } else {
321 Duration::ZERO
322 },
323 min_time,
324 max_time: Duration::from_micros(self.max_time_us.load(Ordering::Relaxed)),
325 rows_processed: self.rows_processed.load(Ordering::Relaxed),
326 bytes_read: self.bytes_read.load(Ordering::Relaxed),
327 bytes_written: self.bytes_written.load(Ordering::Relaxed),
328 active_connections: self.active_connections.load(Ordering::Relaxed) as u32,
329 total_connections: self.total_connections.load(Ordering::Relaxed),
330 uptime: self.created_at.elapsed(),
331 last_activity: Duration::from_micros(self.last_activity_us.load(Ordering::Relaxed)),
332 }
333 }
334
335 pub fn reset(&self) {
337 self.queries.store(0, Ordering::Relaxed);
338 self.errors.store(0, Ordering::Relaxed);
339 self.total_time_us.store(0, Ordering::Relaxed);
340 self.min_time_us.store(u64::MAX, Ordering::Relaxed);
341 self.max_time_us.store(0, Ordering::Relaxed);
342 self.rows_processed.store(0, Ordering::Relaxed);
343 self.bytes_read.store(0, Ordering::Relaxed);
344 self.bytes_written.store(0, Ordering::Relaxed);
345 }
346
347 fn update_min(&self, atomic: &AtomicU64, value: u64) {
349 let mut current = atomic.load(Ordering::Relaxed);
350 while value < current {
351 match atomic.compare_exchange_weak(
352 current,
353 value,
354 Ordering::Relaxed,
355 Ordering::Relaxed,
356 ) {
357 Ok(_) => break,
358 Err(c) => current = c,
359 }
360 }
361 }
362
363 fn update_max(&self, atomic: &AtomicU64, value: u64) {
365 let mut current = atomic.load(Ordering::Relaxed);
366 while value > current {
367 match atomic.compare_exchange_weak(
368 current,
369 value,
370 Ordering::Relaxed,
371 Ordering::Relaxed,
372 ) {
373 Ok(_) => break,
374 Err(c) => current = c,
375 }
376 }
377 }
378}
379
380#[derive(Debug, Clone)]
382pub struct TenantMetricsSnapshot {
383 pub tenant_id: TenantId,
385
386 pub queries: u64,
388
389 pub errors: u64,
391
392 pub total_time: Duration,
394
395 pub avg_time: Duration,
397
398 pub min_time: Duration,
400
401 pub max_time: Duration,
403
404 pub rows_processed: u64,
406
407 pub bytes_read: u64,
409
410 pub bytes_written: u64,
412
413 pub active_connections: u32,
415
416 pub total_connections: u64,
418
419 pub uptime: Duration,
421
422 pub last_activity: Duration,
424}
425
426impl TenantMetricsSnapshot {
427 pub fn qps(&self) -> f64 {
429 if self.uptime.as_secs() > 0 {
430 self.queries as f64 / self.uptime.as_secs_f64()
431 } else {
432 0.0
433 }
434 }
435
436 pub fn error_rate(&self) -> f64 {
438 if self.queries > 0 {
439 self.errors as f64 / self.queries as f64
440 } else {
441 0.0
442 }
443 }
444
445 pub fn avg_rows(&self) -> f64 {
447 if self.queries > 0 {
448 self.rows_processed as f64 / self.queries as f64
449 } else {
450 0.0
451 }
452 }
453
454 pub fn to_json(&self) -> String {
456 format!(
457 r#"{{"tenant_id":"{}","queries":{},"errors":{},"error_rate":{:.4},"avg_time_ms":{:.2},"qps":{:.2},"active_connections":{}}}"#,
458 self.tenant_id.0,
459 self.queries,
460 self.errors,
461 self.error_rate(),
462 self.avg_time.as_secs_f64() * 1000.0,
463 self.qps(),
464 self.active_connections
465 )
466 }
467}
468
/// Point-in-time totals across all tenants of a registry.
#[derive(Debug, Clone)]
pub struct AggregateMetricsSnapshot {
    /// Number of tenants included in the totals.
    pub tenant_count: usize,

    /// Queries summed over all tenants.
    pub total_queries: u64,

    /// Failed queries summed over all tenants.
    pub total_errors: u64,

    /// Fraction of queries that failed.
    pub error_rate: f64,

    /// Query time summed over all tenants.
    pub total_time: Duration,

    /// Rows summed over all tenants.
    pub total_rows: u64,

    /// Bytes read summed over all tenants.
    pub total_bytes_read: u64,

    /// Bytes written summed over all tenants.
    pub total_bytes_written: u64,

    /// Open connections summed over all tenants.
    pub active_connections: u32,

    /// Average queries per second over `uptime`.
    pub qps: f64,

    /// Time covered by the snapshot.
    pub uptime: Duration,
}

impl AggregateMetricsSnapshot {
    /// Renders a compact JSON object with the headline totals; `uptime` is
    /// emitted as whole seconds.
    pub fn to_json(&self) -> String {
        // Pull out exactly the fields that are serialized; the rest are
        // intentionally not part of the JSON output.
        let Self {
            tenant_count,
            total_queries,
            total_errors,
            error_rate,
            qps,
            active_connections,
            uptime,
            ..
        } = self;
        let uptime_secs = uptime.as_secs();

        format!(
            r#"{{"tenant_count":{},"total_queries":{},"total_errors":{},"error_rate":{:.4},"qps":{:.2},"active_connections":{},"uptime_secs":{}}}"#,
            tenant_count,
            total_queries,
            total_errors,
            error_rate,
            qps,
            active_connections,
            uptime_secs
        )
    }
}
521
522pub struct TenantCostTracker {
524 cost_per_query: f64,
526
527 cost_per_1000_rows: f64,
529
530 cost_per_mb_read: f64,
532
533 cost_per_mb_written: f64,
535
536 cost_per_conn_second: f64,
538
539 costs: DashMap<TenantId, TenantCost>,
541}
542
543impl TenantCostTracker {
544 pub fn new() -> Self {
546 Self {
547 cost_per_query: 0.000001, cost_per_1000_rows: 0.00001, cost_per_mb_read: 0.00001, cost_per_mb_written: 0.0001, cost_per_conn_second: 0.0, costs: DashMap::new(),
553 }
554 }
555
556 pub fn with_pricing(
558 mut self,
559 per_query: f64,
560 per_1000_rows: f64,
561 per_mb_read: f64,
562 per_mb_written: f64,
563 ) -> Self {
564 self.cost_per_query = per_query;
565 self.cost_per_1000_rows = per_1000_rows;
566 self.cost_per_mb_read = per_mb_read;
567 self.cost_per_mb_written = per_mb_written;
568 self
569 }
570
571 pub fn record_query_cost(
573 &self,
574 tenant: &TenantId,
575 rows: u64,
576 bytes_read: u64,
577 bytes_written: u64,
578 ) {
579 let cost = self.cost_per_query
580 + (rows as f64 / 1000.0) * self.cost_per_1000_rows
581 + (bytes_read as f64 / 1_048_576.0) * self.cost_per_mb_read
582 + (bytes_written as f64 / 1_048_576.0) * self.cost_per_mb_written;
583
584 self.costs
585 .entry(tenant.clone())
586 .or_insert_with(TenantCost::new)
587 .add_cost(cost);
588 }
589
590 pub fn get_cost(&self, tenant: &TenantId) -> Option<f64> {
592 self.costs.get(tenant).map(|c| c.total_cost())
593 }
594
595 pub fn all_costs(&self) -> HashMap<TenantId, f64> {
597 self.costs
598 .iter()
599 .map(|e| (e.key().clone(), e.value().total_cost()))
600 .collect()
601 }
602
603 pub fn reset_tenant(&self, tenant: &TenantId) {
605 if let Some(mut cost) = self.costs.get_mut(tenant) {
606 cost.reset();
607 }
608 }
609
610 pub fn cost_report(&self) -> TenantCostReport {
612 let mut entries: Vec<_> = self
613 .costs
614 .iter()
615 .map(|e| TenantCostEntry {
616 tenant_id: e.key().clone(),
617 total_cost: e.value().total_cost(),
618 query_count: e.value().query_count(),
619 })
620 .collect();
621
622 entries.sort_by(|a, b| b.total_cost.partial_cmp(&a.total_cost).unwrap());
623
624 let total = entries.iter().map(|e| e.total_cost).sum();
625
626 TenantCostReport {
627 entries,
628 total_cost: total,
629 generated_at: SystemTime::now(),
630 }
631 }
632}
633
634impl Default for TenantCostTracker {
635 fn default() -> Self {
636 Self::new()
637 }
638}
639
/// Lock-free accumulator for one tenant's cost.
///
/// The cost is stored as an integer number of millionths of a cost unit so it
/// can live in an `AtomicU64`.
struct TenantCost {
    /// Accumulated cost in millionths of a cost unit.
    total: AtomicU64,

    /// Number of `add_cost` calls since creation or the last reset.
    queries: AtomicU64,
}

impl TenantCost {
    /// Fixed-point scale: stored integer = cost * `MICROS_PER_UNIT`.
    const MICROS_PER_UNIT: f64 = 1_000_000.0;

    /// Creates an accumulator with zero cost and zero queries.
    fn new() -> Self {
        Self {
            total: AtomicU64::new(0),
            queries: AtomicU64::new(0),
        }
    }

    /// Adds `cost` to the total and counts one query.
    ///
    /// The amount is rounded to the nearest millionth. Truncating instead
    /// would drop up to one millionth on every call and systematically
    /// under-bill. The float-to-integer cast saturates, so negative or NaN
    /// inputs contribute zero.
    fn add_cost(&self, cost: f64) {
        let scaled = (cost * Self::MICROS_PER_UNIT).round() as u64;
        self.total.fetch_add(scaled, Ordering::Relaxed);
        self.queries.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the accumulated cost in cost units.
    fn total_cost(&self) -> f64 {
        self.total.load(Ordering::Relaxed) as f64 / Self::MICROS_PER_UNIT
    }

    /// Returns how many costs have been added.
    fn query_count(&self) -> u64 {
        self.queries.load(Ordering::Relaxed)
    }

    /// Zeroes the accumulated cost and the query count.
    fn reset(&mut self) {
        self.total.store(0, Ordering::Relaxed);
        self.queries.store(0, Ordering::Relaxed);
    }
}
673
/// One tenant's line in a [`TenantCostReport`].
#[derive(Debug, Clone)]
pub struct TenantCostEntry {
    /// Tenant the cost belongs to.
    pub tenant_id: TenantId,

    /// Accumulated cost of the tenant, in the same unit as the tracker's rates.
    pub total_cost: f64,

    /// Number of query costs recorded for the tenant.
    pub query_count: u64,
}
686
/// Cost report across tenants, as produced by `TenantCostTracker::cost_report`.
#[derive(Debug, Clone)]
pub struct TenantCostReport {
    /// Per-tenant entries; `cost_report` orders them by descending cost.
    pub entries: Vec<TenantCostEntry>,

    /// Sum of `total_cost` over all entries.
    pub total_cost: f64,

    /// Wall-clock time at which the report was built.
    pub generated_at: SystemTime,
}
699
#[cfg(test)]
mod tests {
    use super::*;

    /// Counters, error count and min/max latency on a bare `TenantStats`.
    #[test]
    fn test_tenant_stats() {
        let stats = TenantStats::new(TenantId::new("test"));

        // (latency in ms, rows, success)
        let samples = [(10u64, 100u64, true), (20, 200, true), (5, 50, false)];
        for &(millis, rows, ok) in samples.iter() {
            stats.record_query(Duration::from_millis(millis), rows, ok);
        }

        let snap = stats.snapshot();
        assert_eq!(snap.queries, 3);
        assert_eq!(snap.errors, 1);
        assert_eq!(snap.rows_processed, 350);
        assert_eq!(snap.min_time, Duration::from_millis(5));
        assert_eq!(snap.max_time, Duration::from_millis(20));
    }

    /// Per-tenant isolation and the aggregate totals.
    #[test]
    fn test_tenant_metrics() {
        let registry = TenantMetrics::new();
        let first = TenantId::new("tenant_a");
        let second = TenantId::new("tenant_b");

        registry.record_query(&first, Duration::from_millis(10), 100, true);
        registry.record_query(&first, Duration::from_millis(15), 150, true);
        registry.record_query(&second, Duration::from_millis(20), 200, false);

        let first_snap = registry.snapshot(&first).unwrap();
        assert_eq!(first_snap.queries, 2);
        assert_eq!(first_snap.errors, 0);

        let second_snap = registry.snapshot(&second).unwrap();
        assert_eq!(second_snap.queries, 1);
        assert_eq!(second_snap.errors, 1);

        let totals = registry.aggregate_snapshot();
        assert_eq!(totals.tenant_count, 2);
        assert_eq!(totals.total_queries, 3);
        assert_eq!(totals.total_errors, 1);
    }

    /// `top_by_queries` returns the busiest tenants first and honours the limit.
    #[test]
    fn test_top_tenants() {
        let registry = TenantMetrics::new();

        // tenant_0 gets 1 query, tenant_1 gets 2, ... tenant_4 gets 5.
        for query_count in 1..=5 {
            let tenant = TenantId::new(format!("tenant_{}", query_count - 1));
            for _ in 0..query_count {
                registry.record_query(&tenant, Duration::from_millis(10), 10, true);
            }
        }

        let busiest: Vec<u64> = registry
            .top_by_queries(3)
            .iter()
            .map(|snap| snap.queries)
            .collect();
        assert_eq!(busiest, vec![5, 4, 3]);
    }

    /// Connects raise both counters; a disconnect lowers only the active gauge.
    #[test]
    fn test_connection_tracking() {
        let registry = TenantMetrics::new();
        let tenant = TenantId::new("test");

        for _ in 0..2 {
            registry.record_connection(&tenant, true);
        }
        let after_connects = registry.snapshot(&tenant).unwrap();
        assert_eq!(after_connects.active_connections, 2);
        assert_eq!(after_connects.total_connections, 2);

        registry.record_connection(&tenant, false);
        let after_disconnect = registry.snapshot(&tenant).unwrap();
        assert_eq!(after_disconnect.active_connections, 1);
        assert_eq!(after_disconnect.total_connections, 2);
    }

    /// Byte counters accumulate across calls.
    #[test]
    fn test_bytes_tracking() {
        let registry = TenantMetrics::new();
        let tenant = TenantId::new("test");

        for &(read, written) in [(1024u64, 512u64), (2048, 1024)].iter() {
            registry.record_bytes(&tenant, read, written);
        }

        let snap = registry.snapshot(&tenant).unwrap();
        assert_eq!(snap.bytes_read, 3072);
        assert_eq!(snap.bytes_written, 1536);
    }

    /// Recorded query costs show up in `get_cost` and in the report.
    #[test]
    fn test_cost_tracker() {
        let tracker = TenantCostTracker::new();
        let tenant = TenantId::new("test");

        // (rows, bytes read, bytes written)
        for &(rows, read, written) in [(1000u64, 1_048_576u64, 524_288u64), (500, 0, 0)].iter() {
            tracker.record_query_cost(&tenant, rows, read, written);
        }

        assert!(tracker.get_cost(&tenant).unwrap() > 0.0);

        let report = tracker.cost_report();
        assert_eq!(report.entries.len(), 1);
        assert_eq!(report.entries[0].query_count, 2);
    }

    /// `reset_tenant` clears the query counter of a known tenant.
    #[test]
    fn test_metrics_reset() {
        let registry = TenantMetrics::new();
        let tenant = TenantId::new("test");

        registry.record_query(&tenant, Duration::from_millis(10), 100, true);
        assert_eq!(registry.snapshot(&tenant).unwrap().queries, 1);

        registry.reset_tenant(&tenant);
        assert_eq!(registry.snapshot(&tenant).unwrap().queries, 0);
    }

    /// The JSON rendering contains the tenant id and the query count.
    #[test]
    fn test_snapshot_json() {
        let stats = TenantStats::new(TenantId::new("test"));
        stats.record_query(Duration::from_millis(10), 100, true);

        let json = stats.snapshot().to_json();

        let expected_fragments = ["\"tenant_id\":\"test\"", "\"queries\":1"];
        for fragment in expected_fragments.iter() {
            assert!(json.contains(*fragment));
        }
    }
}