buswatch_sdk/
handle.rs

1//! Module handle for recording metrics.
2
3use std::sync::atomic::Ordering;
4use std::sync::Arc;
5use std::time::Instant;
6
7use crate::state::{GlobalState, ModuleState};
8
/// A handle for recording metrics for a specific module.
///
/// This is the primary interface for instrumenting your message bus.
/// Obtain a handle by calling `Instrumentor::register()`.
///
/// Both state fields are held behind `Arc`, so cloning a handle yields
/// another handle that records into the same underlying state.
///
/// # Example
///
/// ```rust
/// use buswatch_sdk::Instrumentor;
///
/// let instrumentor = Instrumentor::new();
/// let handle = instrumentor.register("my-service");
///
/// // Record a read from a topic
/// handle.record_read("orders.new", 1);
///
/// // Record a write to a topic
/// handle.record_write("orders.processed", 1);
///
/// // Track pending operations
/// let guard = handle.start_read("orders.new");
/// // ... do the read ...
/// drop(guard); // Clears pending state
/// ```
#[derive(Clone)]
pub struct ModuleHandle {
    /// Metric state of this module; per-topic read and write entries are
    /// looked up through it.
    pub(crate) state: Arc<ModuleState>,
    /// Global state; `record_write` additionally bumps its per-topic write
    /// counter.
    pub(crate) global: Arc<GlobalState>,
    /// Module name, as returned by `name()`.
    pub(crate) name: String,
}
39
40impl ModuleHandle {
41    /// Record that messages were read from a topic.
42    ///
43    /// # Arguments
44    ///
45    /// * `topic` - The topic name
46    /// * `count` - Number of messages read
47    pub fn record_read(&self, topic: &str, count: u64) {
48        let read_state = self.state.get_or_create_read(topic);
49        read_state.count.fetch_add(count, Ordering::Relaxed);
50    }
51
52    /// Record that messages were written to a topic.
53    ///
54    /// # Arguments
55    ///
56    /// * `topic` - The topic name
57    /// * `count` - Number of messages written
58    pub fn record_write(&self, topic: &str, count: u64) {
59        let write_state = self.state.get_or_create_write(topic);
60        write_state.count.fetch_add(count, Ordering::Relaxed);
61
62        // Also update global write counter for backlog computation
63        let global_counter = self.global.get_topic_write_counter(topic);
64        global_counter.fetch_add(count, Ordering::Relaxed);
65    }
66
67    /// Start tracking a pending read operation.
68    ///
69    /// Returns a guard that clears the pending state when dropped.
70    /// This is useful for tracking how long reads are blocked.
71    ///
72    /// # Example
73    ///
74    /// ```rust
75    /// # use buswatch_sdk::Instrumentor;
76    /// # let instrumentor = Instrumentor::new();
77    /// # let handle = instrumentor.register("test");
78    /// let guard = handle.start_read("events");
79    /// // ... blocking read operation ...
80    /// drop(guard); // Clears pending state
81    /// handle.record_read("events", 1);
82    /// ```
83    pub fn start_read(&self, topic: &str) -> PendingGuard {
84        let read_state = self.state.get_or_create_read(topic);
85        *read_state.pending_since.write() = Some(Instant::now());
86
87        PendingGuard {
88            state: PendingState::Read(read_state),
89        }
90    }
91
92    /// Start tracking a pending write operation.
93    ///
94    /// Returns a guard that clears the pending state when dropped.
95    /// This is useful for tracking backpressure (slow consumers).
96    pub fn start_write(&self, topic: &str) -> PendingGuard {
97        let write_state = self.state.get_or_create_write(topic);
98        *write_state.pending_since.write() = Some(Instant::now());
99
100        PendingGuard {
101            state: PendingState::Write(write_state),
102        }
103    }
104
105    /// Set the pending duration for a read directly.
106    ///
107    /// Use this if you're computing pending time yourself rather than
108    /// using the guard-based API.
109    pub fn set_read_pending(&self, topic: &str, since: Option<Instant>) {
110        let read_state = self.state.get_or_create_read(topic);
111        *read_state.pending_since.write() = since;
112    }
113
114    /// Set the pending duration for a write directly.
115    pub fn set_write_pending(&self, topic: &str, since: Option<Instant>) {
116        let write_state = self.state.get_or_create_write(topic);
117        *write_state.pending_since.write() = since;
118    }
119
120    /// Get the module name.
121    pub fn name(&self) -> &str {
122        &self.name
123    }
124}
125
126impl std::fmt::Debug for ModuleHandle {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        f.debug_struct("ModuleHandle")
129            .field("name", &self.name)
130            .finish()
131    }
132}
133
/// Internal state for pending guard.
///
/// Records which per-topic entry the guard has to reset when it is dropped.
enum PendingState {
    /// A pending read; holds the topic's read entry.
    Read(Arc<crate::state::ReadState>),
    /// A pending write; holds the topic's write entry.
    Write(Arc<crate::state::WriteState>),
}
139
140/// Guard that clears pending state when dropped.
141///
142/// This implements RAII-style tracking of pending operations.
143pub struct PendingGuard {
144    state: PendingState,
145}
146
147impl Drop for PendingGuard {
148    fn drop(&mut self) {
149        match &self.state {
150            PendingState::Read(s) => *s.pending_since.write() = None,
151            PendingState::Write(s) => *s.pending_since.write() = None,
152        }
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::GlobalState;

    /// Builds a handle for a module named "test" on top of a fresh
    /// default `GlobalState`.
    fn create_handle() -> ModuleHandle {
        let global = Arc::new(GlobalState::default());
        let state = global.register_module("test");
        let name = "test".to_string();
        ModuleHandle {
            state,
            global,
            name,
        }
    }

    /// Successive reads on one topic accumulate into a single count.
    #[test]
    fn test_record_read() {
        let handle = create_handle();
        for batch in [5, 3] {
            handle.record_read("topic", batch);
        }

        let snapshot = handle.state.collect();
        let topic_reads = snapshot.reads.get("topic").unwrap();
        assert_eq!(topic_reads.count, 8);
    }

    /// A write shows up in the module's collected write metrics.
    #[test]
    fn test_record_write() {
        let handle = create_handle();
        handle.record_write("topic", 10);

        let snapshot = handle.state.collect();
        let topic_writes = snapshot.writes.get("topic").unwrap();
        assert_eq!(topic_writes.count, 10);
    }

    /// The pending mark is set while the guard lives and cleared on drop.
    #[test]
    fn test_pending_guard() {
        let handle = create_handle();

        let guard = handle.start_read("topic");
        // Guard still alive: the read must be reported as pending.
        let while_held = handle.state.get_or_create_read("topic");
        assert!(while_held.pending_since.read().is_some());
        drop(guard);

        // Guard gone: the pending mark must have been cleared.
        let after_drop = handle.state.get_or_create_read("topic");
        assert!(after_drop.pending_since.read().is_none());
    }
}