mod error;
mod request;
use error::*;
pub use request::*;
use reqwest::{
header::{self, HeaderMap},
Method, Url,
};
use serde::{Deserialize, Serialize};
pub enum Version {
V1,
V2,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct QstashResponse {
pub message_id: Option<String>,
pub url: Option<String>,
pub error: Option<String>,
pub deduplicated: Option<bool>,
}
pub struct Client {
pub http: reqwest::Client,
base_url: Url,
version: String,
}
impl Client {
pub fn new(
token: &str,
base_url: Option<&str>,
version: Option<Version>,
) -> Result<Client, QStashError> {
let mut value = match header::HeaderValue::from_str(&format!("Bearer {token}")) {
Ok(v) => v,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::TokenError);
}
};
value.set_sensitive(true);
let mut headers = header::HeaderMap::new();
headers.insert(header::AUTHORIZATION, value);
let http = match reqwest::Client::builder().default_headers(headers).build() {
Ok(c) => c,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::ReqwestError);
}
};
let version = match version.unwrap_or(Version::V2) {
Version::V1 => String::from("v1"),
Version::V2 => String::from("v2"),
};
let url = match Url::parse(base_url.unwrap_or("https://qstash.upstash.io")) {
Ok(u) => u,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::InvalidUrl);
}
};
Ok(Self {
http,
base_url: url,
version,
})
}
fn generate_headers(request: PublishOptions) -> Result<HeaderMap, QStashError> {
let mut headers = request.headers.unwrap_or(header::HeaderMap::new());
let method = match header::HeaderValue::from_str(
request.method.unwrap_or(reqwest::Method::POST).as_str(),
) {
Ok(v) => v,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
};
headers.insert("Upstash-Method", method);
if let Some(delay) = request.delay {
let delay = match header::HeaderValue::from_str(&format!("{}s", delay)) {
Ok(v) => v,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
};
headers.insert("Upstash-Delay", delay);
}
if let Some(not_before) = request.not_before {
let not_before = match header::HeaderValue::from_str(&format!("{}", not_before)) {
Ok(v) => v,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
};
headers.insert("Upstash-Not-Before", not_before);
}
if let Some(deduplication_id) = request.deduplication_id {
let deduplication_id = match header::HeaderValue::from_str(&deduplication_id) {
Ok(v) => v,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
};
headers.insert("Upstash-Deduplication-Id", deduplication_id);
}
if let Some(content_based_deduplication) = request.content_based_deduplication {
let content_based_deduplication =
match header::HeaderValue::from_str(match content_based_deduplication {
true => "true",
false => "false",
}) {
Ok(v) => v,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
};
headers.insert(
"Upstash-Content-Based-Deduplication",
content_based_deduplication,
);
}
if let Some(retries) = request.retries {
let retries = match header::HeaderValue::from_str(&format!("{}", retries)) {
Ok(v) => v,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
};
headers.insert("Upstash-Retries", retries);
}
if let Some(callback) = request.callback {
let callback = match header::HeaderValue::from_str(&callback) {
Ok(v) => v,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
};
headers.insert("Upstash-Callback", callback);
}
Ok(headers)
}
pub async fn publish<T: Into<reqwest::Body>>(
&self,
request: PublishRequest<T>,
) -> Result<Vec<QstashResponse>, QStashError> {
let request_url = match &request.url {
PublishRequestUrl::Url(v) => v.to_string(),
PublishRequestUrl::Topic(v) => v.clone(),
};
let path = match self
.base_url
.join(&format!("/{}/publish/{}", self.version, request_url))
{
Ok(p) => p,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
};
let headers = match Client::generate_headers(PublishOptions {
headers: request.headers,
delay: request.delay,
not_before: request.not_before,
deduplication_id: request.deduplication_id,
content_based_deduplication: request.content_based_deduplication,
retries: request.retries,
callback: request.callback,
method: request.method,
}) {
Ok(h) => h,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
};
let request_builder = self.http.request(Method::POST, path).headers(headers);
let response = match request.body {
Some(b) => match request_builder.body(b).send().await {
Ok(r) => {
tracing::debug!("{:?}", r);
r
}
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
},
None => match request_builder.send().await {
Ok(r) => {
tracing::debug!("{:?}", r);
r
}
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
},
};
let response: Vec<QstashResponse> = match request.url {
PublishRequestUrl::Url(_) => match response.json().await {
Ok(r) => vec![r],
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
},
PublishRequestUrl::Topic(_) => match response.json().await {
Ok(r) => r,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
},
};
Ok(response)
}
pub async fn publish_json<T: Serialize>(
&self,
url: PublishRequestUrl,
body: T,
options: Option<PublishOptions>,
) -> Result<Vec<QstashResponse>, QStashError> {
let request_url = match &url {
PublishRequestUrl::Url(v) => v.to_string(),
PublishRequestUrl::Topic(v) => v.clone(),
};
let path = match self
.base_url
.join(&format!("/{}/publish/{}", self.version, request_url))
{
Ok(p) => p,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
};
let headers = match options {
Some(options) => match Client::generate_headers(options) {
Ok(h) => h,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
},
None => header::HeaderMap::new(),
};
let response = match self
.http
.request(Method::POST, path)
.headers(headers)
.json(&body)
.send()
.await
{
Ok(r) => {
tracing::debug!("{:?}", r);
r
}
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
};
let response: Vec<QstashResponse> = match url {
PublishRequestUrl::Url(_) => match response.json().await {
Ok(r) => vec![r],
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
},
PublishRequestUrl::Topic(_) => match response.json().await {
Ok(r) => r,
Err(e) => {
let formated_string = e.to_string();
tracing::error!(formated_string);
return Err(QStashError::PublishError);
}
},
};
Ok(response)
}
}