use crate::error::QrustyClientError;
use crate::priority::Priority;
use backoff::future::retry;
use backoff::ExponentialBackoff;
use log::info;
use reqwest::{Client, Response, StatusCode};
use serde::Deserialize;
async fn error_with_body(resp: Response) -> String {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
format!("Status: {} Body: {}", status, body)
}
#[derive(Debug, Clone)]
pub struct QrustyClient {
base_url: String,
client: Client,
}
impl QrustyClient {
pub fn new(base_url: impl Into<String>) -> Self {
QrustyClient {
base_url: base_url.into(),
client: Client::new(),
}
}
pub async fn health(&self) -> Result<(), QrustyClientError> {
let url = format!("{}/health", self.base_url);
info!("Checking health at {}", url);
let op = || async {
let resp = self
.client
.get(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
Ok(())
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(1));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Health check failed: {}", e)))
}
pub async fn create_queue(
&self,
name: &str,
ordering: &str,
allow_duplicates: Option<bool>,
priority_kind: Option<&str>,
) -> Result<(), QrustyClientError> {
let url = format!("{}/create-queue", self.base_url);
let mut config = serde_json::json!({ "ordering": ordering });
if let Some(ad) = allow_duplicates {
config["allow_duplicates"] = serde_json::json!(ad);
}
if let Some(pk) = priority_kind {
config["priority_kind"] = serde_json::json!(pk);
}
let body = serde_json::json!({
"name": name,
"config": config
});
info!("Creating queue '{}' with ordering '{}'", name, ordering);
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
Ok(())
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Create queue failed: {}", e)))
}
pub async fn update_queue(
&self,
name: &str,
new_name: Option<&str>,
allow_duplicates: Option<bool>,
) -> Result<(), QrustyClientError> {
if new_name.is_none() && allow_duplicates.is_none() {
return Err(QrustyClientError::InvalidResponse(
"At least one of new_name or allow_duplicates must be specified".to_string(),
));
}
let url = format!("{}/update-queue", self.base_url);
let mut config = serde_json::Map::new();
if let Some(new_name) = new_name {
config.insert("name".to_string(), serde_json::json!(new_name));
}
if let Some(allow_duplicates) = allow_duplicates {
config.insert(
"allow_duplicates".to_string(),
serde_json::json!(allow_duplicates),
);
}
let body = serde_json::json!({
"name": name,
"config": config
});
info!(
"Updating queue '{}': new_name={:?}, allow_duplicates={:?}",
name, new_name, allow_duplicates
);
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
Ok(())
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Update queue failed: {}", e)))
}
pub async fn publish(
&self,
queue: &str,
priority: impl Into<Priority>,
payload: &str,
max_retries: Option<u32>,
) -> Result<String, QrustyClientError> {
let priority = priority.into();
let url = format!("{}/publish", self.base_url);
let mut body = serde_json::json!({
"queue": queue,
"priority": priority,
"payload": payload
});
if let Some(retries) = max_retries {
body["max_retries"] = serde_json::json!(retries);
}
info!(
"Publishing message to queue '{}' with priority {}",
queue, priority
);
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v["id"].as_str().unwrap_or("").to_string())
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Publish failed: {}", e)))
}
pub async fn consume(
&self,
queue: &str,
consumer_id: &str,
timeout_seconds: Option<u64>,
) -> Result<Option<ConsumeResponse>, QrustyClientError> {
let url = format!("{}/consume/{}", self.base_url, queue);
let body = serde_json::json!({
"consumer_id": consumer_id,
"timeout_seconds": timeout_seconds.unwrap_or(30)
});
info!(
"Consuming message from queue '{}' as consumer '{}'",
queue, consumer_id
);
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: Option<ConsumeResponse> = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Consume failed: {}", e)))
}
pub async fn ack(
&self,
queue: &str,
id: &str,
consumer_id: &str,
) -> Result<(), QrustyClientError> {
let url = format!("{}/ack/{}/{}", self.base_url, queue, id);
let body = serde_json::json!({ "consumer_id": consumer_id });
info!(
"Acknowledging message '{}' in queue '{}' by consumer '{}'",
id, queue, consumer_id
);
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
let status = resp.status();
match status {
StatusCode::OK => Ok(()),
StatusCode::NOT_FOUND => Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
)),
_ if status.is_server_error() => Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
)),
_ => Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
)),
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Ack failed: {}", e)))
}
pub async fn nack(
&self,
queue: &str,
id: &str,
consumer_id: &str,
) -> Result<(), QrustyClientError> {
let url = format!("{}/nack/{}/{}", self.base_url, queue, id);
let body = serde_json::json!({ "consumer_id": consumer_id });
info!(
"Nacking message '{}' in queue '{}' by consumer '{}'",
id, queue, consumer_id
);
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
let status = resp.status();
match status {
StatusCode::OK => Ok(()),
StatusCode::NOT_FOUND => Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
)),
_ if status.is_server_error() => Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
)),
_ => Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
)),
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Nack failed: {}", e)))
}
pub async fn stats(&self) -> Result<serde_json::Value, QrustyClientError> {
let url = format!("{}/stats", self.base_url);
info!("Getting queue statistics from {}", url);
let op = || async {
let resp = self
.client
.get(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Stats failed: {}", e)))
}
pub async fn purge_queue(&self, queue: &str) -> Result<usize, QrustyClientError> {
let url = format!("{}/purge-queue/{}", self.base_url, queue);
info!("Purging queue '{}'", queue);
let op = || async {
let resp = self
.client
.post(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v["purged_messages"].as_u64().unwrap_or(0) as usize)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Purge queue failed: {}", e)))
}
pub async fn delete_queue(&self, queue: &str) -> Result<usize, QrustyClientError> {
let url = format!("{}/delete-queue/{}", self.base_url, queue);
info!("Deleting queue '{}'", queue);
let op = || async {
let resp = self
.client
.delete(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v["deleted_messages"].as_u64().unwrap_or(0) as usize)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Delete queue failed: {}", e)))
}
pub async fn queue_stats(&self, queue: &str) -> Result<serde_json::Value, QrustyClientError> {
let url = format!("{}/queue-stats/{}", self.base_url, queue);
let op = || async {
let resp = self
.client
.get(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Queue stats failed: {}", e)))
}
pub async fn queue_metrics(&self, queue: &str) -> Result<serde_json::Value, QrustyClientError> {
let url = format!("{}/queues/{}/metrics", self.base_url, queue);
let op = || async {
let resp = self
.client
.get(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Queue metrics failed: {}", e)))
}
pub async fn list_queues(&self) -> Result<Vec<String>, QrustyClientError> {
let url = format!("{}/queues", self.base_url);
let op = || async {
let resp = self
.client
.get(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: Vec<String> = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("List queues failed: {}", e)))
}
pub async fn purge_all(&self) -> Result<serde_json::Value, QrustyClientError> {
let url = format!("{}/purge-all", self.base_url);
let op = || async {
let resp = self
.client
.post(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Purge all failed: {}", e)))
}
pub async fn delete_all(&self) -> Result<serde_json::Value, QrustyClientError> {
let url = format!("{}/delete-all", self.base_url);
let op = || async {
let resp = self
.client
.post(&url)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Delete all failed: {}", e)))
}
pub async fn ack_batch(
&self,
queue: &str,
consumer_id: &str,
message_ids: &[&str],
) -> Result<serde_json::Value, QrustyClientError> {
let url = format!("{}/ack-batch/{}", self.base_url, queue);
let body = serde_json::json!({
"consumer_id": consumer_id,
"message_ids": message_ids
});
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Ack batch failed: {}", e)))
}
pub async fn nack_batch(
&self,
queue: &str,
consumer_id: &str,
message_ids: &[&str],
) -> Result<serde_json::Value, QrustyClientError> {
let url = format!("{}/nack-batch/{}", self.base_url, queue);
let body = serde_json::json!({
"consumer_id": consumer_id,
"message_ids": message_ids
});
let op = || async {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
if resp.status() == StatusCode::OK {
let v: serde_json::Value = resp
.json()
.await
.map_err(|e| backoff::Error::transient(QrustyClientError::Http(e)))?;
Ok(v)
} else if resp.status().is_server_error() {
Err(backoff::Error::transient(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
} else {
Err(backoff::Error::permanent(
QrustyClientError::InvalidResponse(error_with_body(resp).await),
))
}
};
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = Some(std::time::Duration::from_secs(2));
retry(backoff, op)
.await
.map_err(|e| QrustyClientError::RetryFailed(format!("Nack batch failed: {}", e)))
}
}
#[derive(Debug, Deserialize)]
pub struct ConsumeResponse {
pub id: String,
pub payload: String,
pub retry_count: u32,
}
#[cfg(test)]
mod tests {
use super::*;
use httpmock::Method::GET;
use httpmock::MockServer;
use tokio;
#[tokio::test]
async fn test_health_ok() {
let server = MockServer::start();
let health_mock = server.mock(|when, then| {
when.method(GET).path("/health");
then.status(200);
});
let client = QrustyClient::new(server.url(""));
let result = client.health().await;
health_mock.assert();
assert!(result.is_ok());
}
#[tokio::test]
async fn test_health_fail() {
let server = MockServer::start();
let health_mock = server.mock(|when, then| {
when.method(GET).path("/health");
then.status(500);
});
let client = QrustyClient::new(server.url(""));
let result = client.health().await;
health_mock.assert_hits(health_mock.hits()); assert!(result.is_err());
}
#[tokio::test]
async fn test_create_queue_with_all_config() {
use httpmock::Method::POST;
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(POST).path("/create-queue");
then.status(200);
});
let client = QrustyClient::new(server.url(""));
let result = client
.create_queue("orders", "MinFirst", Some(false), Some("Text"))
.await;
mock.assert();
assert!(result.is_ok());
}
#[tokio::test]
async fn test_queue_stats() {
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(GET).path("/queue-stats/orders");
then.status(200)
.header("content-type", "application/json")
.body(r#"{"available":5,"locked":2,"total":7}"#);
});
let client = QrustyClient::new(server.url(""));
let result = client.queue_stats("orders").await;
mock.assert();
assert!(result.is_ok());
assert_eq!(result.unwrap()["total"], 7);
}
#[tokio::test]
async fn test_queue_metrics() {
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(GET).path("/queues/orders/metrics");
then.status(200)
.header("content-type", "application/json")
.body(r#"{"metrics":[]}"#);
});
let client = QrustyClient::new(server.url(""));
let result = client.queue_metrics("orders").await;
mock.assert();
assert!(result.is_ok());
}
#[tokio::test]
async fn test_list_queues() {
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(GET).path("/queues");
then.status(200)
.header("content-type", "application/json")
.body(r#"["orders","events"]"#);
});
let client = QrustyClient::new(server.url(""));
let result = client.list_queues().await;
mock.assert();
assert!(result.is_ok());
assert_eq!(result.unwrap(), vec!["orders", "events"]);
}
#[tokio::test]
async fn test_purge_all() {
use httpmock::Method::POST;
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(POST).path("/purge-all");
then.status(200)
.header("content-type", "application/json")
.body(r#"{"purged":10}"#);
});
let client = QrustyClient::new(server.url(""));
let result = client.purge_all().await;
mock.assert();
assert!(result.is_ok());
}
#[tokio::test]
async fn test_delete_all() {
use httpmock::Method::POST;
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(POST).path("/delete-all");
then.status(200)
.header("content-type", "application/json")
.body(r#"{"deleted":3}"#);
});
let client = QrustyClient::new(server.url(""));
let result = client.delete_all().await;
mock.assert();
assert!(result.is_ok());
}
#[tokio::test]
async fn test_ack_batch() {
use httpmock::Method::POST;
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(POST).path("/ack-batch/orders");
then.status(200)
.header("content-type", "application/json")
.body(r#"{"acked":["id1"],"not_found":["id2"]}"#);
});
let client = QrustyClient::new(server.url(""));
let result = client
.ack_batch("orders", "worker-1", &["id1", "id2"])
.await;
mock.assert();
assert!(result.is_ok());
let v = result.unwrap();
assert_eq!(v["acked"][0], "id1");
}
#[tokio::test]
async fn test_nack_batch() {
use httpmock::Method::POST;
let server = MockServer::start();
let mock = server.mock(|when, then| {
when.method(POST).path("/nack-batch/orders");
then.status(200)
.header("content-type", "application/json")
.body(r#"{"unlocked":["id1"],"dead_lettered":[],"dropped":[],"not_found":[]}"#);
});
let client = QrustyClient::new(server.url(""));
let result = client.nack_batch("orders", "worker-1", &["id1"]).await;
mock.assert();
assert!(result.is_ok());
let v = result.unwrap();
assert_eq!(v["unlocked"][0], "id1");
}
}