qstash-rs 0.6.0

A Rust SDK for Upstash QStash
Documentation
use super::Client;
use crate::error::Result;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// A queued or in-flight QStash message.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Message {
    /// Unique identifier of the message.
    pub message_id: String,
    /// Destination URL.
    pub url: String,
    /// URL Group name when the message targets one.
    pub topic_name: Option<String>,
    /// Endpoint label inside the URL Group.
    pub endpoint_name: Option<String>,
    /// API name when the message targets a provider integration.
    pub api: Option<String>,
    /// Custom message key if one was set by the service.
    pub key: Option<String>,
    /// Delivery method.
    pub method: Option<String>,
    /// Forwarded headers.
    pub header: Option<HashMap<String, Vec<String>>>,
    /// UTF-8 body when available.
    pub body: Option<String>,
    /// Base64-encoded body for non-UTF-8 payloads.
    pub body_base64: Option<String>,
    /// Maximum retry count.
    pub max_retries: Option<u32>,
    /// Retry delay expression when configured.
    pub retry_delay_expression: Option<String>,
    /// Absolute not-before timestamp in milliseconds.
    pub not_before: Option<u64>,
    /// Creation timestamp in milliseconds.
    pub created_at: u64,
    /// Callback URL.
    pub callback: Option<String>,
    /// Failure callback URL.
    pub failure_callback: Option<String>,
    /// Queue name when the message was enqueued.
    pub queue_name: Option<String>,
    /// Schedule identifier when created by a schedule.
    pub schedule_id: Option<String>,
    /// Caller IP recorded by QStash.
    pub caller_ip: Option<String>,
    /// User-defined label.
    pub label: Option<String>,
    /// Flow control key used for rate limiting.
    pub flow_control_key: Option<String>,
    /// Flow control rate.
    pub rate: Option<u32>,
    /// Flow control period in seconds.
    pub period: Option<u32>,
    /// Flow control parallelism.
    pub parallelism: Option<u32>,
}

/// Result returned by bulk cancellation endpoints.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CancelResult {
    /// Number of messages cancelled by the operation.
    pub cancelled: u64,
}

/// Filters supported by bulk cancellation.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MessageFilter {
    /// Filter by URL Group name.
    pub url_group_name: Option<String>,
    /// Filter by queue name.
    pub queue_name: Option<String>,
    /// Filter by destination URL.
    pub url: Option<String>,
    /// Filter by schedule identifier.
    pub schedule_id: Option<String>,
    /// Filter by caller IP.
    pub caller_ip: Option<String>,
    /// Filter by label.
    pub label: Option<String>,
    /// Filter by flow control key.
    pub flow_control_key: Option<String>,
    /// Filter from this inclusive timestamp in milliseconds.
    pub from_date: Option<u64>,
    /// Filter to this inclusive timestamp in milliseconds.
    pub to_date: Option<u64>,
    /// Maximum number of messages to cancel.
    pub count: Option<u32>,
}

/// Message operations.
pub struct MessagesApi<'a> {
    pub(crate) client: &'a Client,
}

impl MessagesApi<'_> {
    /// Retrieves a message by identifier.
    pub async fn get(&self, message_id: &str) -> Result<Message> {
        self.client
            .http
            .send_json(
                Method::GET,
                &format!("v2/messages/{message_id}"),
                &[],
                None,
                None,
            )
            .await
    }

    /// Cancels a single pending message.
    pub async fn cancel(&self, message_id: &str) -> Result<()> {
        self.client
            .http
            .send_empty(
                Method::DELETE,
                &format!("v2/messages/{message_id}"),
                &[],
                None,
                None,
            )
            .await
    }

    /// Cancels a set of message identifiers.
    pub async fn cancel_many<I, S>(&self, message_ids: I) -> Result<CancelResult>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        let mut query = Vec::new();
        for message_id in message_ids {
            query.push((String::from("messageIds"), message_id.as_ref().to_owned()));
        }

        self.client
            .http
            .send_json(Method::DELETE, "v2/messages", &query, None, None)
            .await
    }

    /// Cancels messages matching the provided filter.
    pub async fn cancel_matching(&self, filter: &MessageFilter) -> Result<CancelResult> {
        self.client
            .http
            .send_json(
                Method::DELETE,
                "v2/messages",
                &message_filter_query(filter),
                None,
                None,
            )
            .await
    }

    /// Cancels all pending messages, optionally in bounded batches.
    pub async fn cancel_all(&self, count: Option<u32>) -> Result<CancelResult> {
        let query = count
            .map(|count| vec![(String::from("count"), count.to_string())])
            .unwrap_or_default();

        self.client
            .http
            .send_json(Method::DELETE, "v2/messages", &query, None, None)
            .await
    }
}

fn message_filter_query(filter: &MessageFilter) -> Vec<(String, String)> {
    let mut query = Vec::new();

    if let Some(url_group_name) = &filter.url_group_name {
        query.push((String::from("topicName"), url_group_name.clone()));
    }
    if let Some(queue_name) = &filter.queue_name {
        query.push((String::from("queueName"), queue_name.clone()));
    }
    if let Some(url) = &filter.url {
        query.push((String::from("url"), url.clone()));
    }
    if let Some(schedule_id) = &filter.schedule_id {
        query.push((String::from("scheduleId"), schedule_id.clone()));
    }
    if let Some(caller_ip) = &filter.caller_ip {
        query.push((String::from("callerIp"), caller_ip.clone()));
    }
    if let Some(label) = &filter.label {
        query.push((String::from("label"), label.clone()));
    }
    if let Some(flow_control_key) = &filter.flow_control_key {
        query.push((String::from("flowControlKey"), flow_control_key.clone()));
    }
    if let Some(from_date) = filter.from_date {
        query.push((String::from("fromDate"), from_date.to_string()));
    }
    if let Some(to_date) = filter.to_date {
        query.push((String::from("toDate"), to_date.to_string()));
    }
    if let Some(count) = filter.count {
        query.push((String::from("count"), count.to_string()));
    }

    query
}