use std::collections::BTreeMap;
use std::time::Duration;
use reqwest::Client;
use serde::Deserialize;
use buswatch_types::{ModuleMetrics, ReadMetrics, SchemaVersion, Snapshot, WriteMetrics};
use crate::AdapterError;
#[derive(Debug, Clone)]
pub struct RabbitMqAdapter {
client: Client,
endpoint: String,
username: String,
password: String,
vhost: String,
}
impl RabbitMqAdapter {
pub fn builder() -> RabbitMqAdapterBuilder {
RabbitMqAdapterBuilder::default()
}
pub async fn collect(&self) -> Result<Snapshot, AdapterError> {
let queues = self.fetch_queues().await?;
let mut modules = BTreeMap::new();
for queue in queues {
let module_metrics = self.queue_to_metrics(&queue);
modules.insert(queue.name, module_metrics);
}
Ok(Snapshot {
version: SchemaVersion::current(),
timestamp_ms: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64,
modules,
})
}
pub async fn collect_queue(&self, queue_name: &str) -> Result<ModuleMetrics, AdapterError> {
let queue = self.fetch_queue(queue_name).await?;
Ok(self.queue_to_metrics(&queue))
}
async fn fetch_queues(&self) -> Result<Vec<QueueInfo>, AdapterError> {
let url = format!("{}/api/queues/{}", self.endpoint, urlencoded(&self.vhost));
let response = self
.client
.get(&url)
.basic_auth(&self.username, Some(&self.password))
.send()
.await?;
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
return Err(AdapterError::Auth("Invalid credentials".to_string()));
}
if !response.status().is_success() {
return Err(AdapterError::Http(format!(
"API returned status {}",
response.status()
)));
}
let queues: Vec<QueueInfo> = response
.json()
.await
.map_err(|e| AdapterError::Parse(e.to_string()))?;
Ok(queues)
}
async fn fetch_queue(&self, queue_name: &str) -> Result<QueueInfo, AdapterError> {
let url = format!(
"{}/api/queues/{}/{}",
self.endpoint,
urlencoded(&self.vhost),
urlencoded(queue_name)
);
let response = self
.client
.get(&url)
.basic_auth(&self.username, Some(&self.password))
.send()
.await?;
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
return Err(AdapterError::Auth("Invalid credentials".to_string()));
}
if response.status() == reqwest::StatusCode::NOT_FOUND {
return Err(AdapterError::Http(format!(
"Queue '{}' not found",
queue_name
)));
}
if !response.status().is_success() {
return Err(AdapterError::Http(format!(
"API returned status {}",
response.status()
)));
}
let queue: QueueInfo = response
.json()
.await
.map_err(|e| AdapterError::Parse(e.to_string()))?;
Ok(queue)
}
fn queue_to_metrics(&self, queue: &QueueInfo) -> ModuleMetrics {
let mut reads = BTreeMap::new();
let mut writes = BTreeMap::new();
if queue.consumers > 0 {
let mut read_metrics = ReadMetrics::new(queue.messages_delivered.unwrap_or(0));
read_metrics.backlog = Some(queue.messages_ready);
if let Some(rate) = queue
.message_stats
.as_ref()
.and_then(|s| s.deliver_get_rate())
{
read_metrics.rate = Some(rate);
}
reads.insert("messages".to_string(), read_metrics);
} else {
let mut read_metrics = ReadMetrics::new(0);
read_metrics.backlog = Some(queue.messages_ready);
reads.insert("messages".to_string(), read_metrics);
}
let mut write_metrics = WriteMetrics::new(queue.messages_published.unwrap_or(0));
if let Some(rate) = queue.message_stats.as_ref().and_then(|s| s.publish_rate()) {
write_metrics.rate = Some(rate);
}
writes.insert("messages".to_string(), write_metrics);
ModuleMetrics { reads, writes }
}
}
#[derive(Debug, Default)]
pub struct RabbitMqAdapterBuilder {
endpoint: Option<String>,
username: Option<String>,
password: Option<String>,
vhost: Option<String>,
timeout: Option<Duration>,
}
impl RabbitMqAdapterBuilder {
pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.endpoint = Some(endpoint.into());
self
}
pub fn credentials(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
self.username = Some(username.into());
self.password = Some(password.into());
self
}
pub fn vhost(mut self, vhost: impl Into<String>) -> Self {
self.vhost = Some(vhost.into());
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn build(self) -> RabbitMqAdapter {
let timeout = self.timeout.unwrap_or(Duration::from_secs(10));
let client = Client::builder()
.timeout(timeout)
.build()
.expect("Failed to build HTTP client");
RabbitMqAdapter {
client,
endpoint: self
.endpoint
.unwrap_or_else(|| "http://localhost:15672".to_string()),
username: self.username.unwrap_or_else(|| "guest".to_string()),
password: self.password.unwrap_or_else(|| "guest".to_string()),
vhost: self.vhost.unwrap_or_else(|| "/".to_string()),
}
}
}
fn urlencoded(s: &str) -> String {
s.replace('/', "%2F")
}
#[derive(Debug, Deserialize)]
struct QueueInfo {
name: String,
#[serde(default)]
messages_ready: u64,
#[serde(default)]
#[allow(dead_code)]
messages_unacknowledged: u64,
#[serde(default)]
consumers: u32,
#[serde(default)]
messages_delivered: Option<u64>,
#[serde(default)]
messages_published: Option<u64>,
message_stats: Option<MessageStats>,
}
#[derive(Debug, Deserialize)]
struct MessageStats {
#[serde(default, rename = "publish_details")]
publish_details: Option<RateDetails>,
#[serde(default, rename = "deliver_get_details")]
deliver_get_details: Option<RateDetails>,
}
impl MessageStats {
fn publish_rate(&self) -> Option<f64> {
self.publish_details.as_ref().map(|d| d.rate)
}
fn deliver_get_rate(&self) -> Option<f64> {
self.deliver_get_details.as_ref().map(|d| d.rate)
}
}
#[derive(Debug, Deserialize)]
struct RateDetails {
rate: f64,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_builder_defaults() {
let adapter = RabbitMqAdapter::builder().build();
assert_eq!(adapter.endpoint, "http://localhost:15672");
assert_eq!(adapter.username, "guest");
assert_eq!(adapter.password, "guest");
assert_eq!(adapter.vhost, "/");
}
#[test]
fn test_builder_custom() {
let adapter = RabbitMqAdapter::builder()
.endpoint("http://rabbit.local:15672")
.credentials("admin", "secret")
.vhost("myapp")
.build();
assert_eq!(adapter.endpoint, "http://rabbit.local:15672");
assert_eq!(adapter.username, "admin");
assert_eq!(adapter.password, "secret");
assert_eq!(adapter.vhost, "myapp");
}
#[test]
fn test_urlencoded() {
assert_eq!(urlencoded("/"), "%2F");
assert_eq!(urlencoded("my/vhost"), "my%2Fvhost");
assert_eq!(urlencoded("simple"), "simple");
}
#[test]
fn test_queue_to_metrics() {
let adapter = RabbitMqAdapter::builder().build();
let queue = QueueInfo {
name: "test-queue".to_string(),
messages_ready: 100,
messages_unacknowledged: 5,
consumers: 2,
messages_delivered: Some(500),
messages_published: Some(600),
message_stats: Some(MessageStats {
publish_details: Some(RateDetails { rate: 10.5 }),
deliver_get_details: Some(RateDetails { rate: 9.2 }),
}),
};
let metrics = adapter.queue_to_metrics(&queue);
let read = metrics.reads.get("messages").unwrap();
assert_eq!(read.count, 500);
assert_eq!(read.backlog, Some(100));
assert_eq!(read.rate, Some(9.2));
let write = metrics.writes.get("messages").unwrap();
assert_eq!(write.count, 600);
assert_eq!(write.rate, Some(10.5));
}
}