vibesql_server/subscription/manager/
metrics.rs1use std::sync::atomic::Ordering;
4
5use super::SubscriptionManager;
6use crate::subscription::{SubscriptionConfig, SubscriptionId, SubscriptionMetrics};
7
8impl SubscriptionManager {
9 pub fn subscription_count(&self) -> usize {
11 self.subscriptions.len()
12 }
13
14 pub fn watched_tables(&self) -> Vec<(String, usize)> {
16 self.table_index.iter().map(|entry| (entry.key().clone(), entry.value().len())).collect()
17 }
18
19 pub fn config(&self) -> &SubscriptionConfig {
21 &self.config
22 }
23
24 pub fn limit_exceeded_count(&self) -> usize {
26 self.limit_exceeded_count.load(Ordering::Relaxed)
27 }
28
29 pub fn result_set_exceeded_count(&self) -> usize {
31 self.result_set_exceeded_count.load(Ordering::Relaxed)
32 }
33
34 pub fn get_subscription_metrics(&self, id: SubscriptionId) -> Option<SubscriptionMetrics> {
39 self.subscriptions.get(&id).map(|sub| SubscriptionMetrics {
40 subscription_id: Some(sub.id),
41 updates_sent: sub.updates_sent,
42 updates_dropped: sub.updates_dropped,
43 channel_buffer_size: sub.channel_buffer_size,
44 channel_capacity: sub.notify_tx.capacity(),
45 slow_consumer_threshold_percent: sub.slow_consumer_threshold_percent,
46 })
47 }
48
49 pub fn get_all_metrics(&self) -> Vec<SubscriptionMetrics> {
54 self.subscriptions
55 .iter()
56 .map(|entry| {
57 let sub = entry.value();
58 SubscriptionMetrics {
59 subscription_id: Some(sub.id),
60 updates_sent: sub.updates_sent,
61 updates_dropped: sub.updates_dropped,
62 channel_buffer_size: sub.channel_buffer_size,
63 channel_capacity: sub.notify_tx.capacity(),
64 slow_consumer_threshold_percent: sub.slow_consumer_threshold_percent,
65 }
66 })
67 .collect()
68 }
69}