// qstash-rs 0.6.0
//
// A Rust SDK for Upstash QStash
// Documentation
//! Async QStash client.

mod dlq;
mod http;
mod logs;
mod messages;
mod queues;
mod request;
mod schedules;
mod signing_keys;
mod url_groups;

use bytes::Bytes;
use reqwest::{
    header::{self, HeaderMap, HeaderValue},
    Method,
};
use serde::Serialize;

use crate::{error::Result, Error};

pub use dlq::{DlqApi, DlqDeleteResult, DlqFilter, DlqMessage, DlqPage};
pub use logs::{LogEntry, LogFilter, LogState, LogsApi, LogsPage};
pub use messages::{CancelResult, Message, MessageFilter, MessagesApi};
pub use queues::{Queue, QueueUpsertRequest, QueuesApi};
pub use request::{
    BatchRequest, Destination, PublishRequest, PublishRequestBuilder, PublishResponse,
    PublishedMessage, Redaction, ScheduleRequest, ScheduleRequestBuilder,
};
pub use schedules::{Schedule, ScheduleCreateResponse, SchedulesApi};
pub use signing_keys::{SigningKeys, SigningKeysApi};
pub use url_groups::{Endpoint, UrlGroup, UrlGroupsApi};

pub(crate) use self::http::HttpClient;
use self::{http::decode_json, request::build_publish_headers};

/// Base URL a [`ClientBuilder`] starts with until `base_url` overrides it.
const DEFAULT_BASE_URL: &str = "https://qstash.upstash.io";

/// Builder for [`Client`].
///
/// Obtained from [`Client::builder`]; each setter consumes and returns the
/// builder, and [`ClientBuilder::build`] validates the settings and produces
/// the client.
///
/// NOTE(review): the derived `Debug` implementation prints the raw `token`
/// field, so avoid logging a builder.
#[derive(Debug, Clone)]
pub struct ClientBuilder {
    /// QStash token sent as a `Bearer` credential; must not be blank.
    token: String,
    /// Base URL of the QStash API; checked with `reqwest::Url::parse` at build time.
    base_url: String,
    /// Extra headers attached to every request made by the built client.
    default_headers: HeaderMap,
}

impl ClientBuilder {
    /// Overrides the QStash base URL.
    pub fn base_url(mut self, base_url: impl Into<String>) -> Self {
        self.base_url = base_url.into();
        self
    }

    /// Replaces the default headers sent with every request.
    pub fn default_headers(mut self, headers: HeaderMap) -> Self {
        self.default_headers = headers;
        self
    }

    /// Builds the client.
    pub fn build(self) -> Result<Client> {
        if self.token.trim().is_empty() {
            return Err(Error::Config {
                message: String::from("qstash token cannot be empty"),
            });
        }

        reqwest::Url::parse(&self.base_url).map_err(|error| Error::Config {
            message: format!("invalid base url: {error}"),
        })?;

        let mut headers = self.default_headers;
        let mut authorization =
            HeaderValue::from_str(&format!("Bearer {}", self.token)).map_err(|error| {
                Error::Config {
                    message: format!("invalid authorization header: {error}"),
                }
            })?;
        authorization.set_sensitive(true);
        headers.insert(header::AUTHORIZATION, authorization);

        let http = reqwest::Client::builder()
            .default_headers(headers)
            .build()
            .map_err(Error::Transport)?;

        Ok(Client {
            http: HttpClient::new(http, self.base_url.trim_end_matches('/').to_owned()),
        })
    }
}

/// Async QStash client.
///
/// Construct one with [`Client::new`] or [`Client::builder`]. The accessor
/// methods (`messages`, `logs`, `dlq`, ...) return lightweight API handles
/// that borrow this client.
#[derive(Debug, Clone)]
pub struct Client {
    /// Crate-internal HTTP wrapper that every request of this client goes through.
    pub(crate) http: HttpClient,
}

impl Client {
    /// Creates a client with the default Upstash base URL.
    pub fn new(token: impl Into<String>) -> Result<Self> {
        Self::builder(token).build()
    }

    /// Starts a configurable client builder.
    pub fn builder(token: impl Into<String>) -> ClientBuilder {
        ClientBuilder {
            token: token.into(),
            base_url: String::from(DEFAULT_BASE_URL),
            default_headers: HeaderMap::new(),
        }
    }

    /// Accesses message operations.
    pub fn messages(&self) -> MessagesApi<'_> {
        MessagesApi { client: self }
    }

    /// Accesses log operations.
    pub fn logs(&self) -> LogsApi<'_> {
        LogsApi { client: self }
    }

    /// Accesses DLQ operations.
    pub fn dlq(&self) -> DlqApi<'_> {
        DlqApi { client: self }
    }

    /// Accesses queue operations.
    pub fn queues(&self) -> QueuesApi<'_> {
        QueuesApi { client: self }
    }

    /// Accesses schedule operations.
    pub fn schedules(&self) -> SchedulesApi<'_> {
        SchedulesApi { client: self }
    }

    /// Accesses URL Group operations.
    pub fn url_groups(&self) -> UrlGroupsApi<'_> {
        UrlGroupsApi { client: self }
    }

    /// Accesses signing-key operations.
    pub fn signing_keys(&self) -> SigningKeysApi<'_> {
        SigningKeysApi { client: self }
    }

    /// Publishes a raw request.
    pub async fn publish(&self, request: PublishRequest) -> Result<PublishResponse> {
        let headers = build_publish_headers(&request)?;
        let response = self
            .http
            .send_bytes(
                Method::POST,
                &format!("v2/publish/{}", request.destination.path_value()),
                &[],
                Some(headers),
                request.body,
            )
            .await?;

        normalize_publish_response(response.status, response.body)
    }

    /// Convenience wrapper for publishing a JSON request body.
    pub async fn publish_json<T>(
        &self,
        destination: Destination,
        body: &T,
    ) -> Result<PublishResponse>
    where
        T: Serialize,
    {
        let request = PublishRequest::builder(destination)
            .json_body(body)?
            .build();
        self.publish(request).await
    }

    /// Sends multiple publish requests in a single batch request.
    pub async fn batch<I>(&self, requests: I) -> Result<Vec<PublishedMessage>>
    where
        I: IntoIterator<Item = BatchRequest>,
    {
        let payload = requests
            .into_iter()
            .map(batch_item)
            .collect::<Result<Vec<_>>>()?;

        let mut headers = HeaderMap::new();
        headers.insert(
            header::CONTENT_TYPE,
            HeaderValue::from_static("application/json"),
        );
        let body = Bytes::from(serde_json::to_vec(&payload).map_err(Error::Serialize)?);

        self.http
            .send_json(Method::POST, "v2/batch", &[], Some(headers), Some(body))
            .await
    }
}

/// JSON wire form of one entry in the `v2/batch` request body.
#[derive(Debug, Serialize)]
struct BatchPayloadItem {
    /// Destination path value of the publish request.
    destination: String,
    /// UTF-8 message body; the key is omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    body: Option<String>,
    /// Publish headers flattened to string pairs (one value per header name).
    headers: std::collections::HashMap<String, String>,
    /// Target queue, serialized under the key `queue` and omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none", rename = "queue")]
    queue_name: Option<String>,
}

fn batch_item(request: BatchRequest) -> Result<BatchPayloadItem> {
    let headers = build_publish_headers(&request.publish)?;
    let mut serialized_headers = std::collections::HashMap::new();
    for (name, value) in &headers {
        serialized_headers.insert(
            name.as_str().to_owned(),
            value
                .to_str()
                .map_err(|error| Error::InvalidRequest {
                    message: format!("non-utf8 header value for `{name}`: {error}"),
                })?
                .to_owned(),
        );
    }

    let body = request
        .publish
        .body
        .map(|body| {
            String::from_utf8(body.to_vec()).map_err(|error| Error::InvalidRequest {
                message: format!("batch bodies must be valid utf-8: {error}"),
            })
        })
        .transpose()?;

    Ok(BatchPayloadItem {
        destination: request.publish.destination.path_value(),
        body,
        headers: serialized_headers,
        queue_name: request.queue_name,
    })
}

/// Decodes a publish response into the list form of [`PublishResponse`].
///
/// The body may be either a single `PublishedMessage` object or a JSON array
/// of them; a single object is wrapped in a one-element vector.
///
/// The decoder is chosen from the first non-whitespace byte of the body
/// rather than by trying one shape and silently falling back to the other.
/// That way a malformed object reports the object decoding error instead of
/// an unrelated "expected a sequence" error from the fallback.
///
/// # Errors
///
/// Returns whatever `decode_json` reports for the selected shape.
fn normalize_publish_response(status: reqwest::StatusCode, body: Bytes) -> Result<PublishResponse> {
    let first_significant = body.iter().find(|byte| !byte.is_ascii_whitespace());

    if first_significant == Some(&b'[') {
        return decode_json::<Vec<PublishedMessage>>(status, body);
    }

    decode_json::<PublishedMessage>(status, body).map(|single| vec![single])
}