qstash-rs 0.6.0

A Rust SDK for Upstash QStash
Documentation
use std::collections::HashMap;

use reqwest::Method;
use serde::{Deserialize, Serialize};

use super::{request::PublishedMessage, Client};
use crate::error::Result;

/// A message currently stored in the Dead Letter Queue.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DlqMessage {
    /// Original message identifier.
    pub message_id: String,
    /// Destination URL.
    pub url: String,
    /// URL Group name when the message targets one.
    pub topic_name: Option<String>,
    /// Endpoint label inside the URL Group.
    pub endpoint_name: Option<String>,
    /// Delivery method.
    pub method: Option<String>,
    /// Forwarded headers.
    pub header: Option<HashMap<String, Vec<String>>>,
    /// UTF-8 body when available.
    pub body: Option<String>,
    /// Base64-encoded body for non-UTF-8 payloads.
    pub body_base64: Option<String>,
    /// Maximum retry count.
    pub max_retries: Option<u32>,
    /// Absolute not-before timestamp in milliseconds.
    pub not_before: Option<u64>,
    /// Creation timestamp in milliseconds.
    pub created_at: u64,
    /// Callback URL.
    pub callback: Option<String>,
    /// Failure callback URL.
    pub failure_callback: Option<String>,
    /// Queue name when the message was enqueued.
    pub queue_name: Option<String>,
    /// Schedule identifier when created by a schedule.
    pub schedule_id: Option<String>,
    /// Caller IP recorded by QStash.
    pub caller_ip: Option<String>,
    /// User-defined label.
    pub label: Option<String>,
    /// Flow control key used for rate limiting.
    pub flow_control_key: Option<String>,
    /// Flow control rate.
    pub rate: Option<u32>,
    /// Flow control period in seconds.
    pub period: Option<u32>,
    /// Flow control parallelism.
    pub parallelism: Option<u32>,
    /// Destination response status.
    pub response_status: Option<u16>,
    /// DLQ identifier used for DLQ operations.
    pub dlq_id: String,
}

/// Paginated DLQ response.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DlqPage {
    /// Cursor for the next page.
    pub cursor: Option<String>,
    /// Returned DLQ messages.
    pub messages: Vec<DlqMessage>,
}

/// Result returned by bulk DLQ deletion.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DlqDeleteResult {
    /// Cursor for the next deletion page when batching.
    pub cursor: Option<String>,
    /// Number of deleted items.
    pub deleted: u64,
}

/// Query options for listing or bulk-deleting DLQ messages.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DlqFilter {
    /// Cursor from a previous response.
    pub cursor: Option<String>,
    /// Filter by original message identifier.
    pub message_id: Option<String>,
    /// Filter by destination URL.
    pub url: Option<String>,
    /// Filter by URL Group name.
    pub url_group_name: Option<String>,
    /// Filter by schedule identifier.
    pub schedule_id: Option<String>,
    /// Filter by queue name.
    pub queue_name: Option<String>,
    /// Filter by destination response status.
    pub response_status: Option<u16>,
    /// Filter by user-defined label.
    pub label: Option<String>,
    /// Filter by flow control key.
    pub flow_control_key: Option<String>,
    /// Filter from this inclusive timestamp in milliseconds.
    pub from_date: Option<u64>,
    /// Filter to this inclusive timestamp in milliseconds.
    pub to_date: Option<u64>,
    /// Maximum number of messages to return or delete.
    pub count: Option<u32>,
}

/// DLQ operations.
pub struct DlqApi<'a> {
    pub(crate) client: &'a Client,
}

impl DlqApi<'_> {
    /// Lists DLQ messages.
    pub async fn list(&self, filter: &DlqFilter) -> Result<DlqPage> {
        self.client
            .http
            .send_json(Method::GET, "v2/dlq", &dlq_filter_query(filter), None, None)
            .await
    }

    /// Retrieves a single DLQ message.
    pub async fn get(&self, dlq_id: &str) -> Result<DlqMessage> {
        self.client
            .http
            .send_json(Method::GET, &format!("v2/dlq/{dlq_id}"), &[], None, None)
            .await
    }

    /// Deletes a single DLQ message.
    pub async fn delete(&self, dlq_id: &str) -> Result<()> {
        self.client
            .http
            .send_empty(Method::DELETE, &format!("v2/dlq/{dlq_id}"), &[], None, None)
            .await
    }

    /// Deletes a set of DLQ identifiers.
    pub async fn delete_many<I, S>(&self, dlq_ids: I) -> Result<DlqDeleteResult>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        let mut query = Vec::new();
        for dlq_id in dlq_ids {
            query.push((String::from("dlqIds"), dlq_id.as_ref().to_owned()));
        }

        self.client
            .http
            .send_json(Method::DELETE, "v2/dlq", &query, None, None)
            .await
    }

    /// Deletes DLQ messages matching the provided filter.
    pub async fn delete_matching(&self, filter: &DlqFilter) -> Result<DlqDeleteResult> {
        self.client
            .http
            .send_json(
                Method::DELETE,
                "v2/dlq",
                &dlq_filter_query(filter),
                None,
                None,
            )
            .await
    }

    /// Retries a single DLQ message.
    pub async fn retry(&self, dlq_id: &str) -> Result<PublishedMessage> {
        self.client
            .http
            .send_json(
                Method::POST,
                &format!("v2/dlq/retry/{dlq_id}"),
                &[],
                None,
                None,
            )
            .await
    }
}

fn dlq_filter_query(filter: &DlqFilter) -> Vec<(String, String)> {
    let mut query = Vec::new();

    if let Some(cursor) = &filter.cursor {
        query.push((String::from("cursor"), cursor.clone()));
    }
    if let Some(message_id) = &filter.message_id {
        query.push((String::from("messageId"), message_id.clone()));
    }
    if let Some(url) = &filter.url {
        query.push((String::from("url"), url.clone()));
    }
    if let Some(url_group_name) = &filter.url_group_name {
        query.push((String::from("topicName"), url_group_name.clone()));
    }
    if let Some(schedule_id) = &filter.schedule_id {
        query.push((String::from("scheduleId"), schedule_id.clone()));
    }
    if let Some(queue_name) = &filter.queue_name {
        query.push((String::from("queueName"), queue_name.clone()));
    }
    if let Some(response_status) = filter.response_status {
        query.push((String::from("responseStatus"), response_status.to_string()));
    }
    if let Some(label) = &filter.label {
        query.push((String::from("label"), label.clone()));
    }
    if let Some(flow_control_key) = &filter.flow_control_key {
        query.push((String::from("flowControlKey"), flow_control_key.clone()));
    }
    if let Some(from_date) = filter.from_date {
        query.push((String::from("fromDate"), from_date.to_string()));
    }
    if let Some(to_date) = filter.to_date {
        query.push((String::from("toDate"), to_date.to_string()));
    }
    if let Some(count) = filter.count {
        query.push((String::from("count"), count.to_string()));
    }

    query
}