use tracing::{debug, info};
use url::form_urlencoded;
use crate::config::{QueueSelection, QueueType, SourceConfig};
use crate::definitions::types::{ClusterOverview, QueueInfo, RabbitMqDefinitions};
use crate::error::{Error, Result};
/// HTTP client for a RabbitMQ management API endpoint.
///
/// Every request issued by this type is sent with HTTP basic authentication
/// built from the stored `username` and `password`.
pub struct ManagementClient {
/// Underlying `reqwest` client used for all requests.
client: reqwest::Client,
/// Root URL of the management endpoint; the constructors strip trailing `/`.
base_url: String,
/// User name sent with basic authentication.
username: String,
/// Password sent with basic authentication.
password: String,
}
impl ManagementClient {
pub fn from_config(source: &SourceConfig) -> Result<Self> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.map_err(|e| Error::ManagementApi(format!("Failed to create HTTP client: {}", e)))?;
Ok(Self {
client,
base_url: source.management_url.trim_end_matches('/').to_string(),
username: source.management_username.clone(),
password: source.management_password.clone(),
})
}
pub fn new(base_url: &str, username: &str, password: &str) -> Result<Self> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.map_err(|e| Error::ManagementApi(format!("Failed to create HTTP client: {}", e)))?;
Ok(Self {
client,
base_url: base_url.trim_end_matches('/').to_string(),
username: username.to_string(),
password: password.to_string(),
})
}
pub async fn get_overview(&self) -> Result<ClusterOverview> {
let url = format!("{}/api/overview", self.base_url);
debug!("Fetching cluster overview from {}", url);
let resp = self
.client
.get(&url)
.basic_auth(&self.username, Some(&self.password))
.send()
.await
.map_err(|e| Error::ManagementApi(format!("Failed to fetch overview: {}", e)))?;
if !resp.status().is_success() {
return Err(Error::ManagementApi(format!(
"Overview request failed with status {}",
resp.status()
)));
}
resp.json()
.await
.map_err(|e| Error::ManagementApi(format!("Failed to parse overview: {}", e)))
}
pub async fn export_definitions(&self, vhost: Option<&str>) -> Result<RabbitMqDefinitions> {
let url = match vhost {
Some(vh) => format!("{}/api/definitions/{}", self.base_url, url_encode(vh)),
None => format!("{}/api/definitions", self.base_url),
};
info!("Exporting definitions from {}", url);
let resp = self
.client
.get(&url)
.basic_auth(&self.username, Some(&self.password))
.send()
.await
.map_err(|e| Error::ManagementApi(format!("Failed to export definitions: {}", e)))?;
if !resp.status().is_success() {
return Err(Error::ManagementApi(format!(
"Definitions export failed with status {}",
resp.status()
)));
}
resp.json()
.await
.map_err(|e| Error::ManagementApi(format!("Failed to parse definitions: {}", e)))
}
/// Uploads `definitions` with `POST {base_url}/api/definitions`.
///
/// # Errors
///
/// Returns [`Error::ManagementApi`] if the request cannot be sent or the
/// response status is not a success. The error message carries both the
/// HTTP status and the response body, so a failure with an empty body is
/// still diagnosable.
pub async fn import_definitions(&self, definitions: &RabbitMqDefinitions) -> Result<()> {
    let url = format!("{}/api/definitions", self.base_url);
    info!("Importing definitions to {}", url);
    let resp = self
        .client
        .post(&url)
        .basic_auth(&self.username, Some(&self.password))
        .json(definitions)
        .send()
        .await
        .map_err(|e| Error::ManagementApi(format!("Failed to import definitions: {}", e)))?;

    // Capture the status before `text()` consumes the response.
    let status = resp.status();
    if !status.is_success() {
        // The body is best-effort context only; an unreadable body becomes "".
        let body = resp.text().await.unwrap_or_default();
        return Err(Error::ManagementApi(format!(
            "Definitions import failed with status {}: {}",
            status, body
        )));
    }
    info!("Definitions imported successfully");
    Ok(())
}
pub async fn list_queues(&self, vhost: Option<&str>) -> Result<Vec<QueueInfo>> {
let url = match vhost {
Some(vh) => format!("{}/api/queues/{}", self.base_url, url_encode(vh)),
None => format!("{}/api/queues", self.base_url),
};
debug!("Listing queues from {}", url);
let resp = self
.client
.get(&url)
.basic_auth(&self.username, Some(&self.password))
.send()
.await
.map_err(|e| Error::ManagementApi(format!("Failed to list queues: {}", e)))?;
if !resp.status().is_success() {
return Err(Error::ManagementApi(format!(
"Queue list failed with status {}",
resp.status()
)));
}
resp.json()
.await
.map_err(|e| Error::ManagementApi(format!("Failed to parse queue list: {}", e)))
}
/// Lists queues and keeps only those accepted by `selection`.
///
/// Queues are fetched per vhost in `selection.vhosts`, or from all vhosts
/// when that list is empty. A queue is kept when all of the following hold:
///
/// * `include` is empty, or the queue name matches at least one include glob;
/// * the queue name matches no exclude glob;
/// * `types` is empty, or the queue's type is one of the listed types;
/// * `min_messages` is not positive, or the queue holds at least that many
///   messages.
///
/// # Errors
///
/// Propagates the first error returned by [`ManagementClient::list_queues`].
pub async fn discover_queues(&self, selection: &QueueSelection) -> Result<Vec<QueueInfo>> {
    let mut candidates = if selection.vhosts.is_empty() {
        self.list_queues(None).await?
    } else {
        let mut collected = Vec::new();
        for vhost in &selection.vhosts {
            let listed = self.list_queues(Some(vhost)).await?;
            collected.extend(listed);
        }
        collected
    };
    let total = candidates.len();

    // Resolve the requested queue types to their wire names once, up front.
    let wanted_types: Vec<&str> = selection
        .types
        .iter()
        .map(|t| match t {
            QueueType::Classic => "classic",
            QueueType::Quorum => "quorum",
            QueueType::Stream => "stream",
        })
        .collect();
    let enforce_min = selection.min_messages > 0;

    candidates.retain(|queue| {
        let name_included = selection.include.is_empty()
            || selection
                .include
                .iter()
                .any(|pattern| glob_matches(pattern, &queue.name));
        name_included
            && !selection
                .exclude
                .iter()
                .any(|pattern| glob_matches(pattern, &queue.name))
            && (wanted_types.is_empty()
                || wanted_types.iter().any(|kind| queue.queue_type == *kind))
            && (!enforce_min || queue.messages >= selection.min_messages)
    });

    info!(
        "Discovered {} queues (filtered from {} total)",
        candidates.len(),
        total
    );
    Ok(candidates)
}
pub async fn get_queue(&self, vhost: &str, name: &str) -> Result<QueueInfo> {
let url = format!(
"{}/api/queues/{}/{}",
self.base_url,
url_encode(vhost),
url_encode(name)
);
let resp = self
.client
.get(&url)
.basic_auth(&self.username, Some(&self.password))
.send()
.await
.map_err(|e| Error::ManagementApi(format!("Failed to get queue {}: {}", name, e)))?;
if !resp.status().is_success() {
return Err(Error::ManagementApi(format!(
"Queue {} not found (status {})",
name,
resp.status()
)));
}
resp.json()
.await
.map_err(|e| Error::ManagementApi(format!("Failed to parse queue info: {}", e)))
}
pub async fn queue_exists(&self, vhost: &str, name: &str) -> Result<bool> {
let url = format!(
"{}/api/queues/{}/{}",
self.base_url,
url_encode(vhost),
url_encode(name)
);
let resp = self
.client
.get(&url)
.basic_auth(&self.username, Some(&self.password))
.send()
.await
.map_err(|e| Error::ManagementApi(format!("Failed to get queue {}: {}", name, e)))?;
if resp.status().is_success() {
return Ok(true);
}
if resp.status() == reqwest::StatusCode::NOT_FOUND {
return Ok(false);
}
Err(Error::ManagementApi(format!(
"Queue {} existence check failed with status {}",
name,
resp.status()
)))
}
pub async fn declare_queue(
&self,
vhost: &str,
name: &str,
queue_type: QueueType,
) -> Result<()> {
let url = format!(
"{}/api/queues/{}/{}",
self.base_url,
url_encode(vhost),
url_encode(name)
);
let arguments = match queue_type {
QueueType::Classic => serde_json::json!({}),
QueueType::Quorum => serde_json::json!({"x-queue-type": "quorum"}),
QueueType::Stream => serde_json::json!({"x-queue-type": "stream"}),
};
let resp = self
.client
.put(&url)
.basic_auth(&self.username, Some(&self.password))
.json(&serde_json::json!({
"durable": true,
"auto_delete": false,
"arguments": arguments,
}))
.send()
.await
.map_err(|e| {
Error::ManagementApi(format!("Failed to declare queue {}: {}", name, e))
})?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
return Err(Error::ManagementApi(format!(
"Queue {} declaration failed with status {}: {}",
name, status, body
)));
}
info!("Declared missing queue {}/{}", vhost, name);
Ok(())
}
}
/// Percent-encodes `s` so it can be used as a single URL path segment.
///
/// `form_urlencoded::byte_serialize` implements
/// `application/x-www-form-urlencoded` encoding, which writes a space as `+`.
/// Inside a URL path `+` is a literal plus sign, so names containing spaces
/// would address a different resource. The serializer emits a literal `+` in
/// the input as `%2B`, so every `+` left in its output stands for a space and
/// can safely be rewritten to `%20`.
fn url_encode(s: &str) -> String {
    form_urlencoded::byte_serialize(s.as_bytes())
        .collect::<String>()
        .replace('+', "%20")
}
/// Returns `true` when `text` matches the glob `pattern` in its entirety.
///
/// Supported wildcards:
///
/// * `*` matches any run of characters, including the empty run;
/// * `?` matches exactly one character.
///
/// Every other pattern character must equal the corresponding text character.
/// Matching works on `char`s, so `?` consumes one Unicode scalar value.
///
/// The matcher is iterative and only ever backtracks to the most recent `*`,
/// which bounds the work by roughly `pattern.len() * text.len()` steps. A
/// naive recursive matcher that retries every split for every `*` can take
/// exponential time on patterns such as `*a*a*a*b`.
fn glob_matches(pattern: &str, text: &str) -> bool {
    let pattern: Vec<char> = pattern.chars().collect();
    let text: Vec<char> = text.chars().collect();

    let (mut pi, mut ti) = (0usize, 0usize);
    // Resume point for the most recent `*`: the pattern index just after it
    // and the text index up to which that `*` currently absorbs characters.
    let mut resume: Option<(usize, usize)> = None;

    while ti < text.len() {
        match pattern.get(pi) {
            Some('*') => {
                // Start by letting the star match nothing; widen on mismatch.
                resume = Some((pi + 1, ti));
                pi += 1;
            }
            Some(&c) if c == '?' || c == text[ti] => {
                pi += 1;
                ti += 1;
            }
            _ => match resume {
                // Mismatch (or pattern exhausted): let the last star absorb
                // one more character and retry from just after it.
                Some((after_star, absorbed_to)) => {
                    pi = after_star;
                    ti = absorbed_to + 1;
                    resume = Some((after_star, absorbed_to + 1));
                }
                None => return false,
            },
        }
    }

    // Text is exhausted: only trailing stars may remain in the pattern.
    pattern[pi..].iter().all(|&c| c == '*')
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `glob_matches` on every `(pattern, text, expected)` case and
    /// reports the offending pair on failure.
    fn check(cases: &[(&str, &str, bool)]) {
        for &(pattern, text, expected) in cases {
            assert_eq!(
                glob_matches(pattern, text),
                expected,
                "pattern {:?} against text {:?}",
                pattern,
                text
            );
        }
    }

    #[test]
    fn test_glob_matches_exact() {
        check(&[("hello", "hello", true), ("hello", "world", false)]);
    }

    #[test]
    fn test_glob_matches_star() {
        check(&[
            ("orders-*", "orders-queue", true),
            ("orders-*", "orders-", true),
            ("orders-*", "payments-queue", false),
            ("*-dead-letter", "orders-dead-letter", true),
            ("*", "anything", true),
        ]);
    }

    #[test]
    fn test_glob_matches_question() {
        check(&[("queue-?", "queue-1", true), ("queue-?", "queue-12", false)]);
    }

    #[test]
    fn test_glob_matches_combined() {
        check(&[
            ("*-retry-*", "orders-retry-3", true),
            ("*-retry-*", "payments-retry-queue", true),
            ("*-retry-*", "orders-queue", false),
        ]);
    }
}