buswatch_sdk/
instrumentor.rs1use std::sync::Arc;
4use std::time::Duration;
5
6use crate::handle::ModuleHandle;
7use crate::output::Output;
8use crate::state::GlobalState;
9
10#[derive(Debug)]
42pub struct Instrumentor {
43 state: Arc<GlobalState>,
44 outputs: Arc<Vec<Output>>,
45 interval: Duration,
46}
47
48impl Instrumentor {
49 pub fn new() -> Self {
53 Self {
54 state: Arc::new(GlobalState::default()),
55 outputs: Arc::new(Vec::new()),
56 interval: Duration::from_secs(1),
57 }
58 }
59
60 pub fn builder() -> InstrumentorBuilder {
62 InstrumentorBuilder::new()
63 }
64
65 pub fn register(&self, name: &str) -> ModuleHandle {
74 let module_state = self.state.register_module(name);
75 ModuleHandle {
76 state: module_state,
77 global: self.state.clone(),
78 name: name.to_string(),
79 }
80 }
81
82 pub fn collect(&self) -> buswatch_types::Snapshot {
87 self.state.collect()
88 }
89
90 #[cfg(feature = "tokio")]
97 pub fn start(&self) -> EmissionHandle {
98 use tokio::sync::watch;
99
100 let (stop_tx, stop_rx) = watch::channel(false);
101 let state = self.state.clone();
102 let outputs = self.outputs.clone();
103 let interval = self.interval;
104
105 tokio::spawn(async move {
106 let mut interval_timer = tokio::time::interval(interval);
107 let mut stop_rx = stop_rx;
108
109 loop {
110 tokio::select! {
111 _ = interval_timer.tick() => {
112 let snapshot = state.collect();
113 for output in outputs.iter() {
114 let _ = output.emit(&snapshot).await;
115 }
116 }
117 _ = stop_rx.changed() => {
118 if *stop_rx.borrow() {
119 break;
120 }
121 }
122 }
123 }
124 });
125
126 EmissionHandle { stop_tx }
127 }
128
129 #[cfg(feature = "tokio")]
131 pub async fn emit_now(&self) {
132 let snapshot = self.state.collect();
133 for output in self.outputs.iter() {
134 let _ = output.emit(&snapshot).await;
135 }
136 }
137}
138
139impl Default for Instrumentor {
140 fn default() -> Self {
141 Self::new()
142 }
143}
144
145#[derive(Debug, Default)]
147pub struct InstrumentorBuilder {
148 outputs: Vec<Output>,
149 interval: Option<Duration>,
150}
151
152impl InstrumentorBuilder {
153 pub fn new() -> Self {
155 Self::default()
156 }
157
158 pub fn output(mut self, output: Output) -> Self {
162 self.outputs.push(output);
163 self
164 }
165
166 pub fn interval(mut self, interval: Duration) -> Self {
170 self.interval = Some(interval);
171 self
172 }
173
174 pub fn build(self) -> Instrumentor {
176 Instrumentor {
177 state: Arc::new(GlobalState::default()),
178 outputs: Arc::new(self.outputs),
179 interval: self.interval.unwrap_or(Duration::from_secs(1)),
180 }
181 }
182}
183
184#[cfg(feature = "tokio")]
188pub struct EmissionHandle {
189 stop_tx: tokio::sync::watch::Sender<bool>,
190}
191
192#[cfg(feature = "tokio")]
193impl EmissionHandle {
194 pub fn stop(self) {
196 let _ = self.stop_tx.send(true);
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use super::*;
203
204 #[test]
205 fn test_instrumentor_new() {
206 let instrumentor = Instrumentor::new();
207 let handle = instrumentor.register("test-module");
208 assert_eq!(handle.name(), "test-module");
209 }
210
211 #[test]
212 fn test_instrumentor_collect() {
213 let instrumentor = Instrumentor::new();
214 let handle = instrumentor.register("producer");
215
216 handle.record_write("events", 100);
217 handle.record_read("commands", 50);
218
219 let snapshot = instrumentor.collect();
220 assert_eq!(snapshot.modules.len(), 1);
221
222 let metrics = snapshot.modules.get("producer").unwrap();
223 assert_eq!(metrics.writes.get("events").unwrap().count, 100);
224 assert_eq!(metrics.reads.get("commands").unwrap().count, 50);
225 }
226
227 #[test]
228 fn test_multiple_modules() {
229 let instrumentor = Instrumentor::new();
230
231 let producer = instrumentor.register("producer");
232 let consumer = instrumentor.register("consumer");
233
234 producer.record_write("events", 100);
235 consumer.record_read("events", 95);
236
237 let snapshot = instrumentor.collect();
238 assert_eq!(snapshot.modules.len(), 2);
239
240 let consumer_metrics = snapshot.modules.get("consumer").unwrap();
242 let events_read = consumer_metrics.reads.get("events").unwrap();
243 assert_eq!(events_read.count, 95);
244 assert_eq!(events_read.backlog, Some(5)); }
246
247 #[test]
248 fn test_builder() {
249 let instrumentor = Instrumentor::builder()
250 .output(Output::file("test.json"))
251 .interval(Duration::from_millis(500))
252 .build();
253
254 assert_eq!(instrumentor.interval, Duration::from_millis(500));
255 assert_eq!(instrumentor.outputs.len(), 1);
256 }
257}