use crate::error::QrustyClientError;
use backoff::future::retry;
use backoff::ExponentialBackoff;
use log::info;
use reqwest::{Client, StatusCode};
use serde::Deserialize;
#[derive(Debug, Clone)]
pub struct QrustyClient {
base_url: String,
client: Client,
}
impl QrustyClient {
pub fn new(base_url: impl Into<String>) -> Self {
QrustyClient {
base_url: base_url.into(),
client: Client::new(),
}
}
pub async fn health(&self) -> Result<(), QrustyClientError> {
let url = format!("{}/health", self.base_url);
info!("Checking health at {}", url);
let op = || async {
let resp = self
.client
.get(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
Ok(())
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(1));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Health check failed: {}", e)))
}
pub async fn create_queue(&self, name: &str, ordering: &str) -> Result<(), QrustyClientError> {
let url = format!("{}/create-queue", self.base_url);
let body = serde_json::json!({
"name": name,
"config": { "ordering": ordering }
});
info!("Creating queue '{}' with ordering '{}'", name, ordering);
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
Ok(())
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Create queue failed: {}", e)))
}
pub async fn update_queue(
&self,
name: &str,
new_name: Option<&str>,
allow_duplicates: Option<bool>,
) -> Result<(), QrustyClientError> {
if new_name.is_none() && allow_duplicates.is_none() {
return Err(QrustyClientError::InvalidResponse(
"At least one of new_name or allow_duplicates must be specified".to_string(),
));
}
let url = format!("{}/update-queue", self.base_url);
let mut config = serde_json::Map::new();
if let Some(new_name) = new_name {
config.insert("name".to_string(), serde_json::json!(new_name));
}
if let Some(allow_duplicates) = allow_duplicates {
config.insert(
"allow_duplicates".to_string(),
serde_json::json!(allow_duplicates),
);
}
let body = serde_json::json!({
"name": name,
"config": config
});
info!(
"Updating queue '{}': new_name={:?}, allow_duplicates={:?}",
name, new_name, allow_duplicates
);
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
Ok(())
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Update queue failed: {}", e)))
}
pub async fn publish(
&self,
queue: &str,
priority: u64,
payload: &str,
max_retries: Option<u32>,
) -> Result<String, QrustyClientError> {
let url = format!("{}/publish", self.base_url);
let mut body = serde_json::json!({
"queue": queue,
"priority": priority,
"payload": payload
});
if let Some(retries) = max_retries {
body["max_retries"] = serde_json::json!(retries);
}
info!(
"Publishing message to queue '{}' with priority {}",
queue, priority
);
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v["id"].as_str().unwrap_or("").to_string())
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Publish failed: {}", e)))
}
pub async fn consume(
&self,
queue: &str,
consumer_id: &str,
timeout_seconds: Option<u64>,
) -> Result<Option<ConsumeResponse>, QrustyClientError> {
let url = format!("{}/consume/{}", self.base_url, queue);
let body = serde_json::json!({
"consumer_id": consumer_id,
"timeout_seconds": timeout_seconds.unwrap_or(30)
});
info!(
"Consuming message from queue '{}' as consumer '{}'",
queue, consumer_id
);
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: Option<ConsumeResponse> = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Consume failed: {}", e)))
}
pub async fn ack(
&self,
queue: &str,
id: &str,
consumer_id: &str,
) -> Result<(), QrustyClientError> {
let url = format!("{}/ack/{}/{}", self.base_url, queue, id);
let body = serde_json::json!({ "consumer_id": consumer_id });
info!(
"Acknowledging message '{}' in queue '{}' by consumer '{}'",
id, queue, consumer_id
);
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
match resp.status() {
StatusCode::OK => Ok(()),
StatusCode::NOT_FOUND => Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(
"Message not found or not locked by this consumer".to_string(),
),
)),
s if s.is_server_error() => Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(format!("Status: {}", s)),
)),
_ => Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
)),
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Ack failed: {}", e)))
}
pub async fn nack(
&self,
queue: &str,
id: &str,
consumer_id: &str,
) -> Result<(), QrustyClientError> {
let url = format!("{}/nack/{}/{}", self.base_url, queue, id);
let body = serde_json::json!({ "consumer_id": consumer_id });
info!(
"Nacking message '{}' in queue '{}' by consumer '{}'",
id, queue, consumer_id
);
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
match resp.status() {
StatusCode::OK => Ok(()),
StatusCode::NOT_FOUND => Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(
"Message not found or not locked by this consumer".to_string(),
),
)),
s if s.is_server_error() => Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(format!("Status: {}", s)),
)),
_ => Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
)),
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Nack failed: {}", e)))
}
pub async fn stats(&self) -> Result<serde_json::Value, QrustyClientError> {
let url = format!("{}/stats", self.base_url);
info!("Getting queue statistics from {}", url);
let op = || async {
let resp = self
.client
.get(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Stats failed: {}", e)))
}
pub async fn purge_queue(&self, queue: &str) -> Result<usize, QrustyClientError> {
let url = format!("{}/purge-queue/{}", self.base_url, queue);
info!("Purging queue '{}'", queue);
let op = || async {
let resp = self
.client
.post(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v["purged_messages"].as_u64().unwrap_or(0) as usize)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Purge queue failed: {}", e)))
}
pub async fn delete_queue(&self, queue: &str) -> Result<usize, QrustyClientError> {
let url = format!("{}/delete-queue/{}", self.base_url, queue);
info!("Deleting queue '{}'", queue);
let op = || async {
let resp = self
.client
.delete(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v["deleted_messages"].as_u64().unwrap_or(0) as usize)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(format!("Status: {}", resp.status())),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Delete queue failed: {}", e)))
}
}
#[derive(Debug, Deserialize)]
pub struct ConsumeResponse {
pub id: String,
pub payload: String,
pub retry_count: u32,
}
#[cfg(test)]
mod tests {
use super::*;
use httpmock::Method::GET;
use httpmock::MockServer;
use tokio;
#[tokio::test]
async fn test_health_ok() {
let server = MockServer::start();
let health_mock = server.mock(|when, then| {
when.method(GET).path("/health");
then.status(200);
});
let client = QrustyClient::new(server.url(""));
let result = client.health().await;
health_mock.assert();
assert!(result.is_ok());
}
#[tokio::test]
async fn test_health_fail() {
let server = MockServer::start();
let health_mock = server.mock(|when, then| {
when.method(GET).path("/health");
then.status(500);
});
let client = QrustyClient::new(server.url(""));
let result = client.health().await;
health_mock.assert_hits(health_mock.hits()); assert!(result.is_err());
}
}