use std::sync::Arc;
use std::time::Duration;
use crate::handle::ModuleHandle;
use crate::output::Output;
use crate::state::GlobalState;
#[derive(Debug)]
pub struct Instrumentor {
state: Arc<GlobalState>,
outputs: Arc<Vec<Output>>,
interval: Duration,
}
impl Instrumentor {
pub fn new() -> Self {
Self {
state: Arc::new(GlobalState::default()),
outputs: Arc::new(Vec::new()),
interval: Duration::from_secs(1),
}
}
pub fn builder() -> InstrumentorBuilder {
InstrumentorBuilder::new()
}
pub fn register(&self, name: &str) -> ModuleHandle {
let module_state = self.state.register_module(name);
ModuleHandle {
state: module_state,
global: self.state.clone(),
name: name.to_string(),
}
}
pub fn collect(&self) -> buswatch_types::Snapshot {
self.state.collect()
}
#[cfg(feature = "tokio")]
pub fn start(&self) -> EmissionHandle {
use tokio::sync::watch;
let (stop_tx, stop_rx) = watch::channel(false);
let state = self.state.clone();
let outputs = self.outputs.clone();
let interval = self.interval;
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(interval);
let mut stop_rx = stop_rx;
loop {
tokio::select! {
_ = interval_timer.tick() => {
let snapshot = state.collect();
for output in outputs.iter() {
let _ = output.emit(&snapshot).await;
}
}
_ = stop_rx.changed() => {
if *stop_rx.borrow() {
break;
}
}
}
}
});
EmissionHandle { stop_tx }
}
#[cfg(feature = "tokio")]
pub async fn emit_now(&self) {
let snapshot = self.state.collect();
for output in self.outputs.iter() {
let _ = output.emit(&snapshot).await;
}
}
}
impl Default for Instrumentor {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Default)]
pub struct InstrumentorBuilder {
outputs: Vec<Output>,
interval: Option<Duration>,
}
impl InstrumentorBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn output(mut self, output: Output) -> Self {
self.outputs.push(output);
self
}
pub fn interval(mut self, interval: Duration) -> Self {
self.interval = Some(interval);
self
}
pub fn build(self) -> Instrumentor {
Instrumentor {
state: Arc::new(GlobalState::default()),
outputs: Arc::new(self.outputs),
interval: self.interval.unwrap_or(Duration::from_secs(1)),
}
}
}
#[cfg(feature = "tokio")]
pub struct EmissionHandle {
stop_tx: tokio::sync::watch::Sender<bool>,
}
#[cfg(feature = "tokio")]
impl EmissionHandle {
pub fn stop(self) {
let _ = self.stop_tx.send(true);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_instrumentor_new() {
let instrumentor = Instrumentor::new();
let handle = instrumentor.register("test-module");
assert_eq!(handle.name(), "test-module");
}
#[test]
fn test_instrumentor_collect() {
let instrumentor = Instrumentor::new();
let handle = instrumentor.register("producer");
handle.record_write("events", 100);
handle.record_read("commands", 50);
let snapshot = instrumentor.collect();
assert_eq!(snapshot.modules.len(), 1);
let metrics = snapshot.modules.get("producer").unwrap();
assert_eq!(metrics.writes.get("events").unwrap().count, 100);
assert_eq!(metrics.reads.get("commands").unwrap().count, 50);
}
#[test]
fn test_multiple_modules() {
let instrumentor = Instrumentor::new();
let producer = instrumentor.register("producer");
let consumer = instrumentor.register("consumer");
producer.record_write("events", 100);
consumer.record_read("events", 95);
let snapshot = instrumentor.collect();
assert_eq!(snapshot.modules.len(), 2);
let consumer_metrics = snapshot.modules.get("consumer").unwrap();
let events_read = consumer_metrics.reads.get("events").unwrap();
assert_eq!(events_read.count, 95);
assert_eq!(events_read.backlog, Some(5)); }
#[test]
fn test_builder() {
let instrumentor = Instrumentor::builder()
.output(Output::file("test.json"))
.interval(Duration::from_millis(500))
.build();
assert_eq!(instrumentor.interval, Duration::from_millis(500));
assert_eq!(instrumentor.outputs.len(), 1);
}
}