Skip to main content

vibesql_server/subscription/manager/
metrics.rs

1//! Metrics and configuration accessors for subscription management.
2
3use std::sync::atomic::Ordering;
4
5use super::SubscriptionManager;
6use crate::subscription::{SubscriptionConfig, SubscriptionId, SubscriptionMetrics};
7
8impl SubscriptionManager {
9    /// Get the number of active subscriptions
10    pub fn subscription_count(&self) -> usize {
11        self.subscriptions.len()
12    }
13
14    /// Get the tables being watched and their subscription counts
15    pub fn watched_tables(&self) -> Vec<(String, usize)> {
16        self.table_index.iter().map(|entry| (entry.key().clone(), entry.value().len())).collect()
17    }
18
19    /// Get the current configuration
20    pub fn config(&self) -> &SubscriptionConfig {
21        &self.config
22    }
23
24    /// Get the number of times the global limit was exceeded (for metrics)
25    pub fn limit_exceeded_count(&self) -> usize {
26        self.limit_exceeded_count.load(Ordering::Relaxed)
27    }
28
29    /// Get the number of times a result set was too large (for metrics)
30    pub fn result_set_exceeded_count(&self) -> usize {
31        self.result_set_exceeded_count.load(Ordering::Relaxed)
32    }
33
34    /// Get metrics for a specific subscription
35    ///
36    /// Returns metrics including updates sent, dropped, and channel health.
37    /// Returns None if the subscription doesn't exist.
38    pub fn get_subscription_metrics(&self, id: SubscriptionId) -> Option<SubscriptionMetrics> {
39        self.subscriptions.get(&id).map(|sub| SubscriptionMetrics {
40            subscription_id: Some(sub.id),
41            updates_sent: sub.updates_sent,
42            updates_dropped: sub.updates_dropped,
43            channel_buffer_size: sub.channel_buffer_size,
44            channel_capacity: sub.notify_tx.capacity(),
45            slow_consumer_threshold_percent: sub.slow_consumer_threshold_percent,
46        })
47    }
48
49    /// Get metrics for all active subscriptions
50    ///
51    /// Returns a vector of metrics for all subscriptions, useful for
52    /// monitoring and alerting on subscription health.
53    pub fn get_all_metrics(&self) -> Vec<SubscriptionMetrics> {
54        self.subscriptions
55            .iter()
56            .map(|entry| {
57                let sub = entry.value();
58                SubscriptionMetrics {
59                    subscription_id: Some(sub.id),
60                    updates_sent: sub.updates_sent,
61                    updates_dropped: sub.updates_dropped,
62                    channel_buffer_size: sub.channel_buffer_size,
63                    channel_capacity: sub.notify_tx.capacity(),
64                    slow_consumer_threshold_percent: sub.slow_consumer_threshold_percent,
65                }
66            })
67            .collect()
68    }
69}