buswatch_sdk/
instrumentor.rs

1//! The main Instrumentor type for collecting and emitting metrics.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use crate::handle::ModuleHandle;
7use crate::output::Output;
8use crate::state::GlobalState;
9
/// The main entry point for instrumenting a message bus.
///
/// An Instrumentor collects metrics from registered modules and periodically
/// emits snapshots to configured outputs.
///
/// # Example
///
/// ```rust,no_run
/// use buswatch_sdk::{Instrumentor, Output};
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
///     let instrumentor = Instrumentor::builder()
///         .output(Output::file("metrics.json"))
///         .interval(Duration::from_secs(1))
///         .build();
///
///     let handle = instrumentor.register("my-service");
///
///     // Start background emission, keeping the returned handle so the
///     // emission task can be stopped explicitly later on.
///     let emission = instrumentor.start();
///
///     // Record some metrics
///     handle.record_read("events", 10);
///     handle.record_write("results", 10);
///
///     // Keep the application running
///     tokio::time::sleep(Duration::from_secs(5)).await;
///
///     // Ask the background task to exit.
///     emission.stop();
/// }
/// ```
#[derive(Debug)]
pub struct Instrumentor {
    /// Shared metric state; a clone of this `Arc` is handed to every
    /// `ModuleHandle` created by `register` and to the background task
    /// spawned by `start`.
    state: Arc<GlobalState>,
    /// Output destinations every snapshot is emitted to, in insertion order.
    outputs: Arc<Vec<Output>>,
    /// Period between two background emissions.
    interval: Duration,
}
47
48impl Instrumentor {
49    /// Create a new instrumentor with default settings.
50    ///
51    /// By default, no outputs are configured and the interval is 1 second.
52    pub fn new() -> Self {
53        Self {
54            state: Arc::new(GlobalState::default()),
55            outputs: Arc::new(Vec::new()),
56            interval: Duration::from_secs(1),
57        }
58    }
59
60    /// Create a builder for configuring the instrumentor.
61    pub fn builder() -> InstrumentorBuilder {
62        InstrumentorBuilder::new()
63    }
64
65    /// Register a module and get a handle for recording metrics.
66    ///
67    /// If a module with this name already exists, returns a handle to
68    /// the existing module.
69    ///
70    /// # Arguments
71    ///
72    /// * `name` - The module name (e.g., "order-processor", "notification-sender")
73    pub fn register(&self, name: &str) -> ModuleHandle {
74        let module_state = self.state.register_module(name);
75        ModuleHandle {
76            state: module_state,
77            global: self.state.clone(),
78            name: name.to_string(),
79        }
80    }
81
82    /// Collect a snapshot of all current metrics.
83    ///
84    /// This is useful if you want to emit snapshots manually rather than
85    /// using the background emission.
86    pub fn collect(&self) -> buswatch_types::Snapshot {
87        self.state.collect()
88    }
89
90    /// Start background emission of snapshots.
91    ///
92    /// This spawns a tokio task that periodically collects and emits
93    /// snapshots to all configured outputs.
94    ///
95    /// Returns a handle that can be used to stop the emission.
96    #[cfg(feature = "tokio")]
97    pub fn start(&self) -> EmissionHandle {
98        use tokio::sync::watch;
99
100        let (stop_tx, stop_rx) = watch::channel(false);
101        let state = self.state.clone();
102        let outputs = self.outputs.clone();
103        let interval = self.interval;
104
105        tokio::spawn(async move {
106            let mut interval_timer = tokio::time::interval(interval);
107            let mut stop_rx = stop_rx;
108
109            loop {
110                tokio::select! {
111                    _ = interval_timer.tick() => {
112                        let snapshot = state.collect();
113                        for output in outputs.iter() {
114                            let _ = output.emit(&snapshot).await;
115                        }
116                    }
117                    _ = stop_rx.changed() => {
118                        if *stop_rx.borrow() {
119                            break;
120                        }
121                    }
122                }
123            }
124        });
125
126        EmissionHandle { stop_tx }
127    }
128
129    /// Emit a snapshot to all outputs immediately.
130    #[cfg(feature = "tokio")]
131    pub async fn emit_now(&self) {
132        let snapshot = self.state.collect();
133        for output in self.outputs.iter() {
134            let _ = output.emit(&snapshot).await;
135        }
136    }
137}
138
139impl Default for Instrumentor {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145/// Builder for configuring an Instrumentor.
146#[derive(Debug, Default)]
147pub struct InstrumentorBuilder {
148    outputs: Vec<Output>,
149    interval: Option<Duration>,
150}
151
152impl InstrumentorBuilder {
153    /// Create a new builder.
154    pub fn new() -> Self {
155        Self::default()
156    }
157
158    /// Add an output destination.
159    ///
160    /// Multiple outputs can be added; snapshots will be emitted to all of them.
161    pub fn output(mut self, output: Output) -> Self {
162        self.outputs.push(output);
163        self
164    }
165
166    /// Set the emission interval.
167    ///
168    /// Defaults to 1 second if not specified.
169    pub fn interval(mut self, interval: Duration) -> Self {
170        self.interval = Some(interval);
171        self
172    }
173
174    /// Build the instrumentor.
175    pub fn build(self) -> Instrumentor {
176        Instrumentor {
177            state: Arc::new(GlobalState::default()),
178            outputs: Arc::new(self.outputs),
179            interval: self.interval.unwrap_or(Duration::from_secs(1)),
180        }
181    }
182}
183
/// Handle for controlling background emission.
///
/// Call [`EmissionHandle::stop`] to end emission. Dropping the handle without
/// calling `stop()` does not stop the background task: the task only exits
/// after it observes `true` on the stop channel, and nothing sends that value
/// when the handle is dropped.
#[cfg(feature = "tokio")]
#[derive(Debug)]
pub struct EmissionHandle {
    /// Sender side of the stop flag watched by the background task.
    stop_tx: tokio::sync::watch::Sender<bool>,
}

#[cfg(feature = "tokio")]
impl EmissionHandle {
    /// Stop background emission.
    ///
    /// Sends `true` on the stop channel and consumes the handle. A failed
    /// send is ignored: it means the receiving task no longer exists, so
    /// there is nothing left to stop.
    pub fn stop(self) {
        let _ = self.stop_tx.send(true);
    }
}
199
#[cfg(test)]
mod tests {
    use super::*;

    /// A handle returned by `register` reports the name it was registered under.
    #[test]
    fn test_instrumentor_new() {
        let handle = Instrumentor::new().register("test-module");

        assert_eq!(handle.name(), "test-module");
    }

    /// Reads and writes recorded through a handle show up in the snapshot.
    #[test]
    fn test_instrumentor_collect() {
        let instrumentor = Instrumentor::new();
        let producer = instrumentor.register("producer");
        producer.record_write("events", 100);
        producer.record_read("commands", 50);

        let snapshot = instrumentor.collect();
        assert_eq!(snapshot.modules.len(), 1);

        let producer_metrics = snapshot.modules.get("producer").unwrap();
        let events_written = producer_metrics.writes.get("events").unwrap();
        let commands_read = producer_metrics.reads.get("commands").unwrap();
        assert_eq!(events_written.count, 100);
        assert_eq!(commands_read.count, 50);
    }

    /// Two modules sharing a topic: the reader's backlog is derived from the
    /// writer's count.
    #[test]
    fn test_multiple_modules() {
        let instrumentor = Instrumentor::new();
        let writer = instrumentor.register("producer");
        let reader = instrumentor.register("consumer");

        writer.record_write("events", 100);
        reader.record_read("events", 95);

        let snapshot = instrumentor.collect();
        assert_eq!(snapshot.modules.len(), 2);

        // 100 written - 95 read leaves a backlog of 5 on the consumer side.
        let events_read = snapshot
            .modules
            .get("consumer")
            .unwrap()
            .reads
            .get("events")
            .unwrap();
        assert_eq!(events_read.count, 95);
        assert_eq!(events_read.backlog, Some(5));
    }

    /// Values passed to the builder end up on the built instrumentor.
    #[test]
    fn test_builder() {
        let half_second = Duration::from_millis(500);
        let builder = Instrumentor::builder()
            .output(Output::file("test.json"))
            .interval(half_second);

        let instrumentor = builder.build();

        assert_eq!(instrumentor.interval, half_second);
        assert_eq!(instrumentor.outputs.len(), 1);
    }
}