buswatch-adapters

Pre-built collectors for popular message bus systems.
Instead of instrumenting your code, use adapters to pull metrics directly from your message bus infrastructure.
Supported Systems
| Adapter | Feature | Metrics Collected |
|---------|---------|-------------------|
| RabbitMQ | rabbitmq | Queue depths, consumer counts, message rates |
| Kafka | kafka | Consumer group lag, partition offsets |
| NATS | nats | JetStream consumer and stream metrics |
Quick Start
RabbitMQ
[dependencies]
buswatch-adapters = { version = "0.1", features = ["rabbitmq"] }
use buswatch_adapters::rabbitmq::RabbitMqAdapter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let adapter = RabbitMqAdapter::builder()
        .endpoint("http://localhost:15672")
        .credentials("guest", "guest")
        .build();

    let snapshot = adapter.collect().await?;
    for (queue, metrics) in &snapshot.modules {
        println!("Queue: {} - {} messages", queue,
            metrics.reads.get("messages")
                .map(|r| r.backlog.unwrap_or(0))
                .unwrap_or(0));
    }
    Ok(())
}
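The nested lookup in the loop above can be factored into a small helper. The sketch below is std-only and uses hypothetical stand-in types (`Snapshot`, `ModuleMetrics`, `ReadMetrics`) that model only the fields the example touches; the crate's real types may differ. `backlog_of` and `queues_by_depth` are illustrative names, not part of buswatch-adapters.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the crate's snapshot types. Only the fields
// used in the quick-start example are modeled here.
#[derive(Default)]
pub struct ReadMetrics {
    pub backlog: Option<u64>,
}

#[derive(Default)]
pub struct ModuleMetrics {
    pub reads: HashMap<String, ReadMetrics>,
}

#[derive(Default)]
pub struct Snapshot {
    pub modules: HashMap<String, ModuleMetrics>,
}

/// Returns the backlog for `read` under `module`, or 0 if either key is
/// missing or no backlog was reported.
pub fn backlog_of(snapshot: &Snapshot, module: &str, read: &str) -> u64 {
    snapshot
        .modules
        .get(module)
        .and_then(|m| m.reads.get(read))
        .and_then(|r| r.backlog)
        .unwrap_or(0)
}

/// Lists (queue name, backlog) pairs, deepest queue first; ties are
/// broken by name so the order is stable.
pub fn queues_by_depth(snapshot: &Snapshot) -> Vec<(String, u64)> {
    let mut depths: Vec<(String, u64)> = snapshot
        .modules
        .keys()
        .map(|name| (name.clone(), backlog_of(snapshot, name, "messages")))
        .collect();
    depths.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    depths
}
```

Sorting by depth puts the queues most likely to need attention at the top of any report.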
Kafka
[dependencies]
buswatch-adapters = { version = "0.1", features = ["kafka"] }
use buswatch_adapters::kafka::KafkaAdapter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let adapter = KafkaAdapter::builder()
        .brokers("localhost:9092")
        .consumer_group("my-consumer-group")
        .build();

    let snapshot = adapter.collect().await?;
    for (topic, metrics) in &snapshot.modules {
        println!("Topic: {} - lag: {:?}", topic,
            metrics.reads.get("partition-0")
                .and_then(|r| r.backlog));
    }
    Ok(())
}
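The example above prints lag for a single partition. If each partition appears as its own entry (an assumption based on the "partition-0" key), total lag for a topic could be computed as in this std-only sketch. The input map from partition key to optional lag is a simplification of the snapshot structure, and `total_lag` is a hypothetical helper name.

```rust
use std::collections::HashMap;

/// Sums lag over all partitions of one topic.
///
/// Returns `(total, unknown)`, where `unknown` counts partitions whose lag
/// was not reported. Keeping that count lets a caller tell "zero lag"
/// apart from "lag not available".
pub fn total_lag(partitions: &HashMap<String, Option<u64>>) -> (u64, usize) {
    let mut total = 0u64;
    let mut unknown = 0usize;
    for lag in partitions.values() {
        match lag {
            // Saturate rather than overflow on pathological inputs.
            Some(n) => total = total.saturating_add(*n),
            None => unknown += 1,
        }
    }
    (total, unknown)
}
```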
NATS JetStream
[dependencies]
buswatch-adapters = { version = "0.1", features = ["nats"] }
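use buswatch_adapters::nats::NatsAdapter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Unlike the other adapters, the NATS builder connects during
    // build(), so it is awaited and can fail.
    let adapter = NatsAdapter::builder()
        .url("nats://localhost:4222")
        .build()
        .await?;

    let snapshot = adapter.collect().await?;
    // Metrics are not used here, so the binding is prefixed with `_`.
    for (consumer, _metrics) in &snapshot.modules {
        println!("Consumer: {}", consumer);
    }
    Ok(())
}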
Continuous Collection
Adapters can be run in a loop to continuously collect metrics:
use buswatch_adapters::rabbitmq::RabbitMqAdapter;
use std::time::Duration;
use tokio::time;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let adapter = RabbitMqAdapter::builder()
        .endpoint("http://localhost:15672")
        .credentials("guest", "guest")
        .build();

    // Collect every 5 seconds; the first tick completes immediately.
    let mut interval = time::interval(Duration::from_secs(5));
    loop {
        interval.tick().await;
        match adapter.collect().await {
            Ok(snapshot) => {
                // Serialization or write failures end the program via `?`.
                let json = serde_json::to_string_pretty(&snapshot)?;
                tokio::fs::write("metrics.json", json).await?;
            }
            // A failed collection is logged and retried on the next tick.
            Err(e) => eprintln!("Collection failed: {}", e),
        }
    }
}
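A reader that opens metrics.json while the loop above is mid-write may see a truncated file. One common mitigation is to write to a temporary file and rename it into place. The sketch below uses std::fs to stay self-contained (tokio::fs offers async `write` and `rename` with the same shape); `write_atomically` and the `.tmp` suffix are illustrative choices, not part of buswatch-adapters.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Writes `contents` to a sibling file named `<target>.tmp`, then renames
/// it over `target`. On most platforms a rename within one directory
/// replaces the destination in a single step, so a reader sees either the
/// old file or the new one, not a partial write.
pub fn write_atomically(target: &Path, contents: &str) -> io::Result<()> {
    // Build the temporary path by appending ".tmp" to the full file name.
    let mut tmp = target.as_os_str().to_owned();
    tmp.push(".tmp");

    fs::write(&tmp, contents)?;
    fs::rename(&tmp, target)
}
```

Keeping the temporary file in the same directory as the target matters, because a rename across filesystems is not a single-step operation.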
Feeding the TUI
Adapters produce Snapshot objects that can be:
- Written to a file for buswatch -f metrics.json
- Streamed over TCP for buswatch --connect host:port
- Sent to a channel for in-process TUI embedding
Example: TCP Server
use buswatch_adapters::rabbitmq::RabbitMqAdapter;
use tokio::net::TcpListener;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let adapter = RabbitMqAdapter::builder()
        .endpoint("http://localhost:15672")
        .credentials("guest", "guest")
        .build();

    let listener = TcpListener::bind("0.0.0.0:9090").await?;
    println!("Listening on :9090");

    loop {
        let (mut socket, _) = listener.accept().await?;
        // Each client gets its own task and its own adapter handle.
        let adapter = adapter.clone();
        tokio::spawn(async move {
            loop {
                match adapter.collect().await {
                    Ok(snapshot) => {
                        // One compact JSON document per line.
                        let json = serde_json::to_string(&snapshot).unwrap();
                        if socket.write_all(json.as_bytes()).await.is_err() {
                            break; // client disconnected
                        }
                        if socket.write_all(b"\n").await.is_err() {
                            break;
                        }
                    }
                    // Stop serving this client if collection fails.
                    Err(_) => break,
                }
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        });
    }
}
Then connect with: buswatch --connect localhost:9090
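The server above frames its stream as one JSON document per line. This works because compact `serde_json::to_string` output contains no raw newline characters. For a custom consumer instead of the TUI, the receiving side could split the stream as in this std-only sketch; `read_frames` is a hypothetical helper, and parsing each frame into a snapshot is left out.

```rust
use std::io::{self, BufRead};

/// Reads newline-delimited frames from `reader` until end of input,
/// skipping blank lines. Each returned string is one serialized
/// snapshot, still unparsed.
pub fn read_frames<R: BufRead>(reader: R) -> io::Result<Vec<String>> {
    let mut frames = Vec::new();
    for line in reader.lines() {
        let line = line?;
        let trimmed = line.trim();
        if !trimmed.is_empty() {
            frames.push(trimmed.to_string());
        }
    }
    Ok(frames)
}
```

With a live connection, wrapping a `std::net::TcpStream` in `std::io::BufReader` gives a suitable `reader`; a long-running consumer would process each line as it arrives rather than collecting them all.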
Features
| Feature | Dependencies | Description |
|---------|--------------|-------------|
| rabbitmq | reqwest | RabbitMQ Management API collector |
| kafka | rdkafka | Kafka consumer lag collector |
| nats | async-nats | NATS JetStream collector |
Enable multiple adapters:
[dependencies]
buswatch-adapters = { version = "0.1", features = ["rabbitmq", "kafka"] }
Error Handling
All adapters return Result<Snapshot, AdapterError>:
use buswatch_adapters::{rabbitmq::RabbitMqAdapter, AdapterError};

// `adapter` is built as in the Quick Start examples.
match adapter.collect().await {
    Ok(snapshot) => {
        // Use the snapshot here.
    }
    Err(AdapterError::Connection(msg)) => {
        eprintln!("Connection failed: {}", msg);
    }
    Err(AdapterError::Parse(msg)) => {
        eprintln!("Failed to parse response: {}", msg);
    }
    // Any other error variant.
    Err(e) => {
        eprintln!("Error: {}", e);
    }
}
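Matching on the variant also allows a retry policy: a connection failure is often transient, while a parse failure tends to repeat until something changes. The sketch below shows one possible policy with capped exponential backoff. The `AdapterError` enum here is a hypothetical stand-in with only the two variants matched above (the real type may have more), and `is_retryable` and `backoff_delay` are illustrative names.

```rust
use std::time::Duration;

// Hypothetical stand-in mirroring the two variants matched above; the
// crate's real AdapterError may carry additional variants.
#[derive(Debug)]
pub enum AdapterError {
    Connection(String),
    Parse(String),
}

/// Treats connection failures as worth retrying and everything else as
/// permanent. This is a policy choice, not behavior defined by the crate.
pub fn is_retryable(err: &AdapterError) -> bool {
    matches!(err, AdapterError::Connection(_))
}

/// Exponential backoff: `base * 2^attempt`, capped at `max`.
/// Saturating arithmetic keeps large attempt counts from overflowing.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    base.saturating_mul(factor).min(max)
}
```

In a collection loop, the caller would sleep for `backoff_delay(attempt, ...)` after a retryable error and reset `attempt` to zero after a success.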