use std::fmt;
use bytes::Bytes;
use reqwest::{
header::{HeaderMap, HeaderName, HeaderValue, CONTENT_TYPE},
Method, Url,
};
use serde::Serialize;
use crate::error::{Error, Result};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Destination {
Url(Url),
UrlGroup(String),
}
impl Destination {
pub fn url(url: impl AsRef<str>) -> Result<Self> {
Url::parse(url.as_ref())
.map(Self::Url)
.map_err(|error| Error::Config {
message: format!("invalid destination url: {error}"),
})
}
pub fn url_group(name: impl Into<String>) -> Self {
Self::UrlGroup(name.into())
}
pub(crate) fn path_value(&self) -> String {
match self {
Self::Url(url) => url.as_str().to_owned(),
Self::UrlGroup(name) => name.clone(),
}
}
}
impl fmt::Display for Destination {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Url(url) => formatter.write_str(url.as_str()),
Self::UrlGroup(name) => formatter.write_str(name),
}
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Redaction {
body: bool,
all_headers: bool,
header_names: Vec<String>,
}
impl Redaction {
pub fn new() -> Self {
Self::default()
}
pub fn body(mut self) -> Self {
self.body = true;
self
}
pub fn all_headers(mut self) -> Self {
self.all_headers = true;
self
}
pub fn header(mut self, name: impl Into<String>) -> Self {
self.header_names.push(name.into());
self
}
pub(crate) fn header_value(&self) -> Option<String> {
let mut parts = Vec::new();
if self.body {
parts.push(String::from("body"));
}
if self.all_headers {
parts.push(String::from("header"));
}
parts.extend(
self.header_names
.iter()
.map(|name| format!("header[{name}]")),
);
if parts.is_empty() {
None
} else {
Some(parts.join(","))
}
}
}
#[derive(Debug, Clone, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct PublishedMessage {
pub message_id: Option<String>,
pub url: Option<String>,
pub error: Option<String>,
pub deduplicated: Option<bool>,
}
pub type PublishResponse = Vec<PublishedMessage>;
#[derive(Debug, Clone)]
pub struct PublishRequest {
pub(crate) destination: Destination,
pub(crate) body: Option<Bytes>,
pub(crate) headers: HeaderMap,
pub(crate) method: Method,
pub(crate) delay: Option<u32>,
pub(crate) not_before: Option<u64>,
pub(crate) deduplication_id: Option<String>,
pub(crate) content_based_deduplication: bool,
pub(crate) retries: Option<u32>,
pub(crate) retry_delay: Option<String>,
pub(crate) callback: Option<String>,
pub(crate) failure_callback: Option<String>,
pub(crate) timeout: Option<u32>,
pub(crate) label: Option<String>,
pub(crate) redaction: Option<Redaction>,
}
impl PublishRequest {
pub fn builder(destination: Destination) -> PublishRequestBuilder {
PublishRequestBuilder::new(destination)
}
pub fn destination(&self) -> &Destination {
&self.destination
}
}
#[derive(Debug, Clone)]
pub struct PublishRequestBuilder {
request: PublishRequest,
}
impl PublishRequestBuilder {
fn new(destination: Destination) -> Self {
Self {
request: PublishRequest {
destination,
body: None,
headers: HeaderMap::new(),
method: Method::POST,
delay: None,
not_before: None,
deduplication_id: None,
content_based_deduplication: false,
retries: None,
retry_delay: None,
callback: None,
failure_callback: None,
timeout: None,
label: None,
redaction: None,
},
}
}
pub fn body(mut self, body: impl Into<Bytes>) -> Self {
self.request.body = Some(body.into());
self
}
pub fn json_body<T>(mut self, body: &T) -> Result<Self>
where
T: Serialize,
{
self.request.body = Some(Bytes::from(
serde_json::to_vec(body).map_err(Error::Serialize)?,
));
self.request
.headers
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
Ok(self)
}
pub fn header(mut self, name: &str, value: &str) -> Result<Self> {
let header_name =
HeaderName::from_bytes(name.as_bytes()).map_err(|error| Error::InvalidRequest {
message: format!("invalid header name `{name}`: {error}"),
})?;
let header_value = HeaderValue::from_str(value).map_err(|error| Error::InvalidRequest {
message: format!("invalid header value for `{name}`: {error}"),
})?;
self.request.headers.insert(header_name, header_value);
Ok(self)
}
pub fn headers(mut self, headers: HeaderMap) -> Self {
self.request.headers = headers;
self
}
pub fn method(mut self, method: Method) -> Self {
self.request.method = method;
self
}
pub fn delay_seconds(mut self, seconds: u32) -> Self {
self.request.delay = Some(seconds);
self
}
pub fn not_before(mut self, timestamp: u64) -> Self {
self.request.not_before = Some(timestamp);
self
}
pub fn deduplication_id(mut self, id: impl Into<String>) -> Self {
self.request.deduplication_id = Some(id.into());
self
}
pub fn content_based_deduplication(mut self, enabled: bool) -> Self {
self.request.content_based_deduplication = enabled;
self
}
pub fn retries(mut self, retries: u32) -> Self {
self.request.retries = Some(retries);
self
}
pub fn retry_delay(mut self, expression: impl Into<String>) -> Self {
self.request.retry_delay = Some(expression.into());
self
}
pub fn callback(mut self, url: impl Into<String>) -> Self {
self.request.callback = Some(url.into());
self
}
pub fn failure_callback(mut self, url: impl Into<String>) -> Self {
self.request.failure_callback = Some(url.into());
self
}
pub fn timeout_seconds(mut self, seconds: u32) -> Self {
self.request.timeout = Some(seconds);
self
}
pub fn label(mut self, label: impl Into<String>) -> Self {
self.request.label = Some(label.into());
self
}
pub fn redaction(mut self, redaction: Redaction) -> Self {
self.request.redaction = Some(redaction);
self
}
pub fn build(self) -> PublishRequest {
self.request
}
}
#[derive(Debug, Clone)]
pub struct BatchRequest {
pub(crate) publish: PublishRequest,
pub(crate) queue_name: Option<String>,
}
impl BatchRequest {
pub fn new(publish: PublishRequest) -> Self {
Self {
publish,
queue_name: None,
}
}
pub fn queue_name(mut self, queue_name: impl Into<String>) -> Self {
self.queue_name = Some(queue_name.into());
self
}
}
#[derive(Debug, Clone)]
pub struct ScheduleRequest {
pub(crate) destination: Destination,
pub(crate) cron: String,
pub(crate) schedule_id: Option<String>,
pub(crate) body: Option<Bytes>,
pub(crate) headers: HeaderMap,
pub(crate) method: Method,
pub(crate) delay: Option<u32>,
pub(crate) retries: Option<u32>,
pub(crate) retry_delay: Option<String>,
pub(crate) callback: Option<String>,
pub(crate) failure_callback: Option<String>,
pub(crate) timeout: Option<u32>,
pub(crate) queue_name: Option<String>,
pub(crate) label: Option<String>,
pub(crate) redaction: Option<Redaction>,
}
impl ScheduleRequest {
pub fn builder(destination: Destination, cron: impl Into<String>) -> ScheduleRequestBuilder {
ScheduleRequestBuilder::new(destination, cron.into())
}
}
#[derive(Debug, Clone)]
pub struct ScheduleRequestBuilder {
request: ScheduleRequest,
}
impl ScheduleRequestBuilder {
fn new(destination: Destination, cron: String) -> Self {
Self {
request: ScheduleRequest {
destination,
cron,
schedule_id: None,
body: None,
headers: HeaderMap::new(),
method: Method::POST,
delay: None,
retries: None,
retry_delay: None,
callback: None,
failure_callback: None,
timeout: None,
queue_name: None,
label: None,
redaction: None,
},
}
}
pub fn schedule_id(mut self, schedule_id: impl Into<String>) -> Self {
self.request.schedule_id = Some(schedule_id.into());
self
}
pub fn body(mut self, body: impl Into<Bytes>) -> Self {
self.request.body = Some(body.into());
self
}
pub fn json_body<T>(mut self, body: &T) -> Result<Self>
where
T: Serialize,
{
self.request.body = Some(Bytes::from(
serde_json::to_vec(body).map_err(Error::Serialize)?,
));
self.request
.headers
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
Ok(self)
}
pub fn header(mut self, name: &str, value: &str) -> Result<Self> {
let header_name =
HeaderName::from_bytes(name.as_bytes()).map_err(|error| Error::InvalidRequest {
message: format!("invalid header name `{name}`: {error}"),
})?;
let header_value = HeaderValue::from_str(value).map_err(|error| Error::InvalidRequest {
message: format!("invalid header value for `{name}`: {error}"),
})?;
self.request.headers.insert(header_name, header_value);
Ok(self)
}
pub fn headers(mut self, headers: HeaderMap) -> Self {
self.request.headers = headers;
self
}
pub fn method(mut self, method: Method) -> Self {
self.request.method = method;
self
}
pub fn delay_seconds(mut self, seconds: u32) -> Self {
self.request.delay = Some(seconds);
self
}
pub fn retries(mut self, retries: u32) -> Self {
self.request.retries = Some(retries);
self
}
pub fn retry_delay(mut self, expression: impl Into<String>) -> Self {
self.request.retry_delay = Some(expression.into());
self
}
pub fn callback(mut self, url: impl Into<String>) -> Self {
self.request.callback = Some(url.into());
self
}
pub fn failure_callback(mut self, url: impl Into<String>) -> Self {
self.request.failure_callback = Some(url.into());
self
}
pub fn timeout_seconds(mut self, seconds: u32) -> Self {
self.request.timeout = Some(seconds);
self
}
pub fn queue_name(mut self, queue_name: impl Into<String>) -> Self {
self.request.queue_name = Some(queue_name.into());
self
}
pub fn label(mut self, label: impl Into<String>) -> Self {
self.request.label = Some(label.into());
self
}
pub fn redaction(mut self, redaction: Redaction) -> Self {
self.request.redaction = Some(redaction);
self
}
pub fn build(self) -> ScheduleRequest {
self.request
}
}
pub(crate) fn build_publish_headers(request: &PublishRequest) -> Result<HeaderMap> {
let mut headers = prefix_forward_headers(&request.headers)?;
headers.insert("Upstash-Method", header_value(request.method.as_str())?);
if let Some(delay) = request.delay {
headers.insert("Upstash-Delay", header_value(&format!("{delay}s"))?);
}
if let Some(not_before) = request.not_before {
headers.insert("Upstash-Not-Before", header_value(¬_before.to_string())?);
}
if let Some(deduplication_id) = &request.deduplication_id {
headers.insert("Upstash-Deduplication-Id", header_value(deduplication_id)?);
}
if request.content_based_deduplication {
headers.insert(
"Upstash-Content-Based-Deduplication",
HeaderValue::from_static("true"),
);
}
if let Some(retries) = request.retries {
headers.insert("Upstash-Retries", header_value(&retries.to_string())?);
}
if let Some(retry_delay) = &request.retry_delay {
headers.insert("Upstash-Retry-Delay", header_value(retry_delay)?);
}
if let Some(callback) = &request.callback {
headers.insert("Upstash-Callback", header_value(callback)?);
}
if let Some(failure_callback) = &request.failure_callback {
headers.insert("Upstash-Failure-Callback", header_value(failure_callback)?);
}
if let Some(timeout) = request.timeout {
headers.insert("Upstash-Timeout", header_value(&format!("{timeout}s"))?);
}
if let Some(label) = &request.label {
headers.insert("Upstash-Label", header_value(label)?);
}
if let Some(redaction) = &request.redaction {
if let Some(value) = redaction.header_value() {
headers.insert("Upstash-Redact-Fields", header_value(&value)?);
}
}
Ok(headers)
}
pub(crate) fn build_schedule_headers(request: &ScheduleRequest) -> Result<HeaderMap> {
let mut headers = prefix_forward_headers(&request.headers)?;
headers.insert("Upstash-Cron", header_value(&request.cron)?);
headers.insert("Upstash-Method", header_value(request.method.as_str())?);
if let Some(schedule_id) = &request.schedule_id {
headers.insert("Upstash-Schedule-Id", header_value(schedule_id)?);
}
if let Some(delay) = request.delay {
headers.insert("Upstash-Delay", header_value(&format!("{delay}s"))?);
}
if let Some(retries) = request.retries {
headers.insert("Upstash-Retries", header_value(&retries.to_string())?);
}
if let Some(retry_delay) = &request.retry_delay {
headers.insert("Upstash-Retry-Delay", header_value(retry_delay)?);
}
if let Some(callback) = &request.callback {
headers.insert("Upstash-Callback", header_value(callback)?);
}
if let Some(failure_callback) = &request.failure_callback {
headers.insert("Upstash-Failure-Callback", header_value(failure_callback)?);
}
if let Some(timeout) = request.timeout {
headers.insert("Upstash-Timeout", header_value(&format!("{timeout}s"))?);
}
if let Some(queue_name) = &request.queue_name {
headers.insert("Upstash-Queue-Name", header_value(queue_name)?);
}
if let Some(label) = &request.label {
headers.insert("Upstash-Label", header_value(label)?);
}
if let Some(redaction) = &request.redaction {
if let Some(value) = redaction.header_value() {
headers.insert("Upstash-Redact-Fields", header_value(&value)?);
}
}
Ok(headers)
}
fn prefix_forward_headers(headers: &HeaderMap) -> Result<HeaderMap> {
let mut prefixed = HeaderMap::new();
for (name, value) in headers {
if should_forward_header(name.as_str()) {
let forwarded_name =
HeaderName::from_bytes(format!("Upstash-Forward-{}", name.as_str()).as_bytes())
.map_err(|error| Error::InvalidRequest {
message: format!("invalid forwarded header name `{name}`: {error}"),
})?;
prefixed.insert(forwarded_name, value.clone());
} else {
prefixed.insert(name.clone(), value.clone());
}
}
Ok(prefixed)
}
fn should_forward_header(name: &str) -> bool {
let lower = name.to_ascii_lowercase();
!lower.starts_with("content-type") && !lower.starts_with("upstash-")
}
fn header_value(value: &str) -> Result<HeaderValue> {
HeaderValue::from_str(value).map_err(|error| Error::InvalidRequest {
message: format!("invalid header value `{value}`: {error}"),
})
}