use bytes::Bytes;
use reqwest::{header::CONTENT_TYPE, Method};
use serde::{Deserialize, Serialize};
use super::{
http::decode_json,
request::{build_publish_headers, PublishRequest, PublishResponse, PublishedMessage},
Client,
};
use crate::{error::Result, Error};
/// A queue as returned by [`QueuesApi::list`] and [`QueuesApi::get`].
///
/// Serialized and deserialized with camelCase keys (for example `createdAt`
/// and `updatedAt`) because of `rename_all = "camelCase"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Queue {
/// Name of the queue.
pub name: String,
/// Creation time. NOTE(review): presumably a Unix timestamp; the unit is not
/// visible from this file — confirm against the API documentation.
pub created_at: u64,
/// Last update time. NOTE(review): same unit caveat as `created_at`.
pub updated_at: u64,
/// Parallelism setting of the queue (the same value that
/// `QueueUpsertRequest::parallelism` carries).
pub parallelism: u32,
/// Queue lag. NOTE(review): presumably the number of messages still waiting
/// in the queue — confirm against the API documentation.
pub lag: u64,
/// Paused flag reported for the queue; see [`QueuesApi::pause`] and
/// [`QueuesApi::resume`].
pub paused: bool,
}
/// Request body for [`QueuesApi::upsert`].
///
/// Serialized as JSON with camelCase keys, i.e. `queueName` and `parallelism`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueUpsertRequest {
/// Name of the queue the request refers to (sent as `queueName`).
pub queue_name: String,
/// Parallelism value to set on the queue.
pub parallelism: u32,
}
/// Accessor for the queue-related endpoints.
///
/// Holds a borrowed [`Client`] for the lifetime `'a`; every method sends its
/// request through that client's `http` field.
pub struct QueuesApi<'a> {
// Borrowed client whose `http` transport performs the actual requests.
pub(crate) client: &'a Client,
}
impl QueuesApi<'_> {
pub async fn list(&self) -> Result<Vec<Queue>> {
self.client
.http
.send_json(Method::GET, "v2/queues", &[], None, None)
.await
}
pub async fn get(&self, queue_name: &str) -> Result<Queue> {
self.client
.http
.send_json(
Method::GET,
&format!("v2/queues/{queue_name}"),
&[],
None,
None,
)
.await
}
pub async fn upsert(&self, request: QueueUpsertRequest) -> Result<()> {
let body = Bytes::from(serde_json::to_vec(&request).map_err(Error::Serialize)?);
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
CONTENT_TYPE,
reqwest::header::HeaderValue::from_static("application/json"),
);
self.client
.http
.send_empty(Method::POST, "v2/queues", &[], Some(headers), Some(body))
.await
}
pub async fn delete(&self, queue_name: &str) -> Result<()> {
self.client
.http
.send_empty(
Method::DELETE,
&format!("v2/queues/{queue_name}"),
&[],
None,
None,
)
.await
}
pub async fn pause(&self, queue_name: &str) -> Result<()> {
self.client
.http
.send_empty(
Method::POST,
&format!("v2/queues/{queue_name}/pause"),
&[],
None,
None,
)
.await
}
pub async fn resume(&self, queue_name: &str) -> Result<()> {
self.client
.http
.send_empty(
Method::POST,
&format!("v2/queues/{queue_name}/resume"),
&[],
None,
None,
)
.await
}
pub async fn enqueue(
&self,
queue_name: &str,
request: PublishRequest,
) -> Result<PublishResponse> {
let headers = build_publish_headers(&request)?;
let response = self
.client
.http
.send_bytes(
Method::POST,
&format!(
"v2/enqueue/{queue_name}/{}",
request.destination.path_value()
),
&[],
Some(headers),
request.body,
)
.await?;
normalize_publish_response(response.status, response.body)
}
pub async fn enqueue_json<T>(
&self,
queue_name: &str,
destination: super::request::Destination,
body: &T,
) -> Result<PublishResponse>
where
T: Serialize,
{
let request = PublishRequest::builder(destination)
.json_body(body)?
.build();
self.enqueue(queue_name, request).await
}
}
/// Decodes a publish response body into a [`PublishResponse`].
///
/// The body is first decoded as a single [`PublishedMessage`]; on success it is
/// wrapped in a one-element vector. If that attempt fails for any reason, its
/// error is discarded and the body is decoded as a list of messages instead,
/// whose result (success or error) is returned as-is.
fn normalize_publish_response(status: reqwest::StatusCode, body: Bytes) -> Result<PublishResponse> {
    // The clone keeps `body` available for the list fallback below.
    match decode_json::<PublishedMessage>(status, body.clone()) {
        Ok(message) => Ok(vec![message]),
        Err(_) => decode_json::<Vec<PublishedMessage>>(status, body),
    }
}