qstash-rs 0.6.0

A Rust SDK for Upstash QStash
Documentation
use bytes::Bytes;
use reqwest::{header::CONTENT_TYPE, Method};
use serde::{Deserialize, Serialize};

use super::{
    http::decode_json,
    request::{build_publish_headers, PublishRequest, PublishResponse, PublishedMessage},
    Client,
};
use crate::{error::Result, Error};

/// Queue metadata returned by QStash.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Queue {
    /// Queue name.
    pub name: String,
    /// Creation timestamp in milliseconds.
    pub created_at: u64,
    /// Last update timestamp in milliseconds.
    pub updated_at: u64,
    /// Configured parallel consumers.
    pub parallelism: u32,
    /// Number of queued messages waiting to be delivered.
    pub lag: u64,
    /// Whether the queue is paused.
    pub paused: bool,
}

/// Queue upsert payload.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueUpsertRequest {
    /// Queue name.
    pub queue_name: String,
    /// Queue parallelism.
    pub parallelism: u32,
}

/// Queue operations.
pub struct QueuesApi<'a> {
    pub(crate) client: &'a Client,
}

impl QueuesApi<'_> {
    /// Lists all queues.
    pub async fn list(&self) -> Result<Vec<Queue>> {
        self.client
            .http
            .send_json(Method::GET, "v2/queues", &[], None, None)
            .await
    }

    /// Retrieves a single queue.
    pub async fn get(&self, queue_name: &str) -> Result<Queue> {
        self.client
            .http
            .send_json(
                Method::GET,
                &format!("v2/queues/{queue_name}"),
                &[],
                None,
                None,
            )
            .await
    }

    /// Creates or updates a queue.
    pub async fn upsert(&self, request: QueueUpsertRequest) -> Result<()> {
        let body = Bytes::from(serde_json::to_vec(&request).map_err(Error::Serialize)?);
        let mut headers = reqwest::header::HeaderMap::new();
        headers.insert(
            CONTENT_TYPE,
            reqwest::header::HeaderValue::from_static("application/json"),
        );

        self.client
            .http
            .send_empty(Method::POST, "v2/queues", &[], Some(headers), Some(body))
            .await
    }

    /// Deletes a queue.
    pub async fn delete(&self, queue_name: &str) -> Result<()> {
        self.client
            .http
            .send_empty(
                Method::DELETE,
                &format!("v2/queues/{queue_name}"),
                &[],
                None,
                None,
            )
            .await
    }

    /// Pauses a queue.
    pub async fn pause(&self, queue_name: &str) -> Result<()> {
        self.client
            .http
            .send_empty(
                Method::POST,
                &format!("v2/queues/{queue_name}/pause"),
                &[],
                None,
                None,
            )
            .await
    }

    /// Resumes a queue.
    pub async fn resume(&self, queue_name: &str) -> Result<()> {
        self.client
            .http
            .send_empty(
                Method::POST,
                &format!("v2/queues/{queue_name}/resume"),
                &[],
                None,
                None,
            )
            .await
    }

    /// Enqueues a message into a named queue.
    pub async fn enqueue(
        &self,
        queue_name: &str,
        request: PublishRequest,
    ) -> Result<PublishResponse> {
        let headers = build_publish_headers(&request)?;
        let response = self
            .client
            .http
            .send_bytes(
                Method::POST,
                &format!(
                    "v2/enqueue/{queue_name}/{}",
                    request.destination.path_value()
                ),
                &[],
                Some(headers),
                request.body,
            )
            .await?;

        normalize_publish_response(response.status, response.body)
    }

    /// Convenience wrapper for enqueueing JSON.
    pub async fn enqueue_json<T>(
        &self,
        queue_name: &str,
        destination: super::request::Destination,
        body: &T,
    ) -> Result<PublishResponse>
    where
        T: Serialize,
    {
        let request = PublishRequest::builder(destination)
            .json_body(body)?
            .build();
        self.enqueue(queue_name, request).await
    }
}

fn normalize_publish_response(status: reqwest::StatusCode, body: Bytes) -> Result<PublishResponse> {
    if let Ok(single) = decode_json::<PublishedMessage>(status, body.clone()) {
        return Ok(vec![single]);
    }

    decode_json::<Vec<PublishedMessage>>(status, body)
}