mod dlq;
mod http;
mod logs;
mod messages;
mod queues;
mod request;
mod schedules;
mod signing_keys;
mod url_groups;
use bytes::Bytes;
use reqwest::{
header::{self, HeaderMap, HeaderValue},
Method,
};
use serde::Serialize;
use crate::{error::Result, Error};
pub use dlq::{DlqApi, DlqDeleteResult, DlqFilter, DlqMessage, DlqPage};
pub use logs::{LogEntry, LogFilter, LogState, LogsApi, LogsPage};
pub use messages::{CancelResult, Message, MessageFilter, MessagesApi};
pub use queues::{Queue, QueueUpsertRequest, QueuesApi};
pub use request::{
BatchRequest, Destination, PublishRequest, PublishRequestBuilder, PublishResponse,
PublishedMessage, Redaction, ScheduleRequest, ScheduleRequestBuilder,
};
pub use schedules::{Schedule, ScheduleCreateResponse, SchedulesApi};
pub use signing_keys::{SigningKeys, SigningKeysApi};
pub use url_groups::{Endpoint, UrlGroup, UrlGroupsApi};
pub(crate) use self::http::HttpClient;
use self::{http::decode_json, request::build_publish_headers};
/// Default QStash REST endpoint, used when the builder is not given a custom base URL.
const DEFAULT_BASE_URL: &str = "https://qstash.upstash.io";
/// Builder for [`Client`]: holds the bearer token, the API base URL, and
/// any headers to attach to every request.
///
/// `Debug` is implemented by hand (instead of derived) so the bearer token
/// is never written into logs or panic messages — consistent with `build`,
/// which marks the Authorization header value as sensitive.
#[derive(Clone)]
pub struct ClientBuilder {
    token: String,
    base_url: String,
    default_headers: HeaderMap,
}

impl std::fmt::Debug for ClientBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ClientBuilder")
            // Redact the credential; everything else is safe to show.
            .field("token", &"<redacted>")
            .field("base_url", &self.base_url)
            .field("default_headers", &self.default_headers)
            .finish()
    }
}
impl ClientBuilder {
pub fn base_url(mut self, base_url: impl Into<String>) -> Self {
self.base_url = base_url.into();
self
}
pub fn default_headers(mut self, headers: HeaderMap) -> Self {
self.default_headers = headers;
self
}
pub fn build(self) -> Result<Client> {
if self.token.trim().is_empty() {
return Err(Error::Config {
message: String::from("qstash token cannot be empty"),
});
}
reqwest::Url::parse(&self.base_url).map_err(|error| Error::Config {
message: format!("invalid base url: {error}"),
})?;
let mut headers = self.default_headers;
let mut authorization =
HeaderValue::from_str(&format!("Bearer {}", self.token)).map_err(|error| {
Error::Config {
message: format!("invalid authorization header: {error}"),
}
})?;
authorization.set_sensitive(true);
headers.insert(header::AUTHORIZATION, authorization);
let http = reqwest::Client::builder()
.default_headers(headers)
.build()
.map_err(Error::Transport)?;
Ok(Client {
http: HttpClient::new(http, self.base_url.trim_end_matches('/').to_owned()),
})
}
}
/// Asynchronous QStash API client.
///
/// Construct via [`Client::new`] or [`Client::builder`]; resource-specific
/// operations are reached through accessor methods such as
/// [`Client::messages`] and [`Client::queues`].
#[derive(Debug, Clone)]
pub struct Client {
    // Shared HTTP transport carrying the base URL and authorization header.
    pub(crate) http: HttpClient,
}
impl Client {
pub fn new(token: impl Into<String>) -> Result<Self> {
Self::builder(token).build()
}
pub fn builder(token: impl Into<String>) -> ClientBuilder {
ClientBuilder {
token: token.into(),
base_url: String::from(DEFAULT_BASE_URL),
default_headers: HeaderMap::new(),
}
}
pub fn messages(&self) -> MessagesApi<'_> {
MessagesApi { client: self }
}
pub fn logs(&self) -> LogsApi<'_> {
LogsApi { client: self }
}
pub fn dlq(&self) -> DlqApi<'_> {
DlqApi { client: self }
}
pub fn queues(&self) -> QueuesApi<'_> {
QueuesApi { client: self }
}
pub fn schedules(&self) -> SchedulesApi<'_> {
SchedulesApi { client: self }
}
pub fn url_groups(&self) -> UrlGroupsApi<'_> {
UrlGroupsApi { client: self }
}
pub fn signing_keys(&self) -> SigningKeysApi<'_> {
SigningKeysApi { client: self }
}
pub async fn publish(&self, request: PublishRequest) -> Result<PublishResponse> {
let headers = build_publish_headers(&request)?;
let response = self
.http
.send_bytes(
Method::POST,
&format!("v2/publish/{}", request.destination.path_value()),
&[],
Some(headers),
request.body,
)
.await?;
normalize_publish_response(response.status, response.body)
}
pub async fn publish_json<T>(
&self,
destination: Destination,
body: &T,
) -> Result<PublishResponse>
where
T: Serialize,
{
let request = PublishRequest::builder(destination)
.json_body(body)?
.build();
self.publish(request).await
}
pub async fn batch<I>(&self, requests: I) -> Result<Vec<PublishedMessage>>
where
I: IntoIterator<Item = BatchRequest>,
{
let payload = requests
.into_iter()
.map(batch_item)
.collect::<Result<Vec<_>>>()?;
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
let body = Bytes::from(serde_json::to_vec(&payload).map_err(Error::Serialize)?);
self.http
.send_json(Method::POST, "v2/batch", &[], Some(headers), Some(body))
.await
}
}
/// Wire format for one entry of a `v2/batch` request body.
#[derive(Debug, Serialize)]
struct BatchPayloadItem {
    // Value of `Destination::path_value` for the target of this message.
    destination: String,
    // UTF-8 body; the field is omitted entirely when the request has no body.
    #[serde(skip_serializing_if = "Option::is_none")]
    body: Option<String>,
    // Publish headers flattened to plain strings (see `batch_item`).
    headers: std::collections::HashMap<String, String>,
    // Serialized as `queue` to match the QStash batch API field name.
    #[serde(skip_serializing_if = "Option::is_none", rename = "queue")]
    queue_name: Option<String>,
}
fn batch_item(request: BatchRequest) -> Result<BatchPayloadItem> {
let headers = build_publish_headers(&request.publish)?;
let mut serialized_headers = std::collections::HashMap::new();
for (name, value) in &headers {
serialized_headers.insert(
name.as_str().to_owned(),
value
.to_str()
.map_err(|error| Error::InvalidRequest {
message: format!("non-utf8 header value for `{name}`: {error}"),
})?
.to_owned(),
);
}
let body = request
.publish
.body
.map(|body| {
String::from_utf8(body.to_vec()).map_err(|error| Error::InvalidRequest {
message: format!("batch bodies must be valid utf-8: {error}"),
})
})
.transpose()?;
Ok(BatchPayloadItem {
destination: request.publish.destination.path_value(),
body,
headers: serialized_headers,
queue_name: request.queue_name,
})
}
/// Normalizes a publish response body, which the server may send either as a
/// single message object or as an array of messages, into the list form.
fn normalize_publish_response(status: reqwest::StatusCode, body: Bytes) -> Result<PublishResponse> {
    // `Bytes::clone` is a cheap reference-count bump, so attempting both
    // decodings does not copy the payload.
    match decode_json::<PublishedMessage>(status, body.clone()) {
        Ok(message) => Ok(vec![message]),
        Err(_) => decode_json::<Vec<PublishedMessage>>(status, body),
    }
}