qstash-rs 0.6.0

A Rust SDK for Upstash QStash
Documentation
use std::collections::HashMap;

use reqwest::Method;
use serde::{Deserialize, Serialize};

use super::Client;
use crate::error::Result;

/// Message state reported by QStash logs.
///
/// Variants are serialized and deserialized under their `SCREAMING_SNAKE_CASE`
/// names, so `CancelRequested` corresponds to `"CANCEL_REQUESTED"` and
/// `InProgress` to `"IN_PROGRESS"`.
///
/// The trailing [`LogState::Unknown`] variant is marked `#[serde(other)]` and
/// acts as the catch-all for state strings this enum does not list.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum LogState {
    /// Message accepted by QStash.
    Created,
    /// Message currently being processed.
    Active,
    /// Message will be retried.
    Retry,
    /// Delivery returned an error and is awaiting retry or final failure.
    Error,
    /// Message delivered successfully.
    Delivered,
    /// Message exhausted retries.
    Failed,
    /// Message was cancelled.
    Cancelled,
    /// Message cancellation has been requested.
    CancelRequested,
    /// Aggregate filter value used by the API.
    InProgress,
    /// Any state not known by this crate yet.
    // Must stay the last variant: `#[serde(other)]` designates the fallback.
    #[serde(other)]
    Unknown,
}

/// A single QStash log entry.
///
/// Field names map to `camelCase` keys in the serialized form (for example
/// `message_id` corresponds to `messageId` and `body_base64` to `bodyBase64`).
/// Only `time`, `state` and `message_id` are mandatory; every other field is an
/// `Option`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LogEntry {
    /// Timestamp in milliseconds.
    pub time: u64,
    /// Message state.
    pub state: LogState,
    /// Message identifier.
    pub message_id: String,
    /// Next delivery timestamp in milliseconds when one exists.
    pub next_delivery_time: Option<u64>,
    /// Delivery error.
    pub error: Option<String>,
    /// Destination URL.
    pub url: Option<String>,
    /// URL Group name (carried under the `topicName` key).
    pub topic_name: Option<String>,
    /// Endpoint label inside the URL Group.
    pub endpoint_name: Option<String>,
    /// Forwarded headers, keyed by header name with one or more values each.
    pub header: Option<HashMap<String, Vec<String>>>,
    /// UTF-8 body when available.
    pub body: Option<String>,
    /// Base64-encoded body for non-UTF-8 payloads.
    pub body_base64: Option<String>,
    /// HTTP response status from the destination.
    pub response_status: Option<u16>,
    /// User-defined label.
    pub label: Option<String>,
    /// Queue name.
    pub queue_name: Option<String>,
    /// Schedule identifier.
    pub schedule_id: Option<String>,
    /// Caller IP.
    pub caller_ip: Option<String>,
    /// Flow control key.
    pub flow_control_key: Option<String>,
}

/// Paginated log response.
///
/// This is the value produced by [`LogsApi::list`]. The fields are
/// (de)serialized under their Rust names (`cursor`, `logs`); no renaming
/// attribute is applied to this struct.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LogsPage {
    /// Cursor for the next page.
    ///
    /// Can be fed back through [`LogFilter::cursor`] on a follow-up request.
    // NOTE(review): presumably `None` once the last page is reached — confirm against the API.
    pub cursor: Option<String>,
    /// Returned log entries.
    pub logs: Vec<LogEntry>,
}

/// Query options for listing logs.
///
/// Every field is optional and `Default` leaves them all `None`, which yields a
/// request without query parameters. Each populated field becomes exactly one
/// query parameter (see `log_filter_query`); the parameter names are the
/// `camelCase` form of the field name, except `url_group_name`, which is sent
/// as `topicName`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LogFilter {
    /// Cursor from a previous response.
    pub cursor: Option<String>,
    /// Filter by message identifier.
    pub message_id: Option<String>,
    /// Filter by message state.
    pub state: Option<LogState>,
    /// Filter by destination URL.
    pub url: Option<String>,
    /// Filter by URL Group name (sent as the `topicName` query parameter).
    pub url_group_name: Option<String>,
    /// Filter by endpoint label.
    pub endpoint_name: Option<String>,
    /// Filter by schedule identifier.
    pub schedule_id: Option<String>,
    /// Filter by queue name.
    pub queue_name: Option<String>,
    /// Filter by caller IP.
    pub caller_ip: Option<String>,
    /// Filter by flow control key.
    pub flow_control_key: Option<String>,
    /// Filter by response status.
    pub response_status: Option<u16>,
    /// Filter by label.
    pub label: Option<String>,
    /// Filter from this inclusive timestamp in milliseconds.
    pub from_date: Option<u64>,
    /// Filter to this inclusive timestamp in milliseconds.
    pub to_date: Option<u64>,
    /// Maximum number of log entries to return.
    pub count: Option<u32>,
}

/// Log operations.
///
/// A lightweight handle that borrows a [`Client`] for the lifetime `'a` and
/// issues log-related requests through it.
pub struct LogsApi<'a> {
    /// Borrowed client whose HTTP layer performs the requests.
    pub(crate) client: &'a Client,
}

impl LogsApi<'_> {
    /// Fetches one page of published-message logs matching `filter`.
    ///
    /// Sends a `GET` request to `v2/logs`, passing the populated fields of
    /// `filter` as query parameters, and returns the decoded [`LogsPage`].
    ///
    /// # Errors
    ///
    /// Propagates whatever error the client's `send_json` call reports.
    pub async fn list(&self, filter: &LogFilter) -> Result<LogsPage> {
        // Build the query pairs up front so the request call stays on one line of intent.
        let query = log_filter_query(filter);

        // The two trailing `None`s leave `send_json`'s remaining optional arguments unset.
        self.client
            .http
            .send_json(Method::GET, "v2/logs", &query, None, None)
            .await
    }
}

fn log_filter_query(filter: &LogFilter) -> Vec<(String, String)> {
    let mut query = Vec::new();

    if let Some(cursor) = &filter.cursor {
        query.push((String::from("cursor"), cursor.clone()));
    }
    if let Some(message_id) = &filter.message_id {
        query.push((String::from("messageId"), message_id.clone()));
    }
    if let Some(state) = &filter.state {
        query.push((String::from("state"), serde_state(state)));
    }
    if let Some(url) = &filter.url {
        query.push((String::from("url"), url.clone()));
    }
    if let Some(url_group_name) = &filter.url_group_name {
        query.push((String::from("topicName"), url_group_name.clone()));
    }
    if let Some(endpoint_name) = &filter.endpoint_name {
        query.push((String::from("endpointName"), endpoint_name.clone()));
    }
    if let Some(schedule_id) = &filter.schedule_id {
        query.push((String::from("scheduleId"), schedule_id.clone()));
    }
    if let Some(queue_name) = &filter.queue_name {
        query.push((String::from("queueName"), queue_name.clone()));
    }
    if let Some(caller_ip) = &filter.caller_ip {
        query.push((String::from("callerIp"), caller_ip.clone()));
    }
    if let Some(flow_control_key) = &filter.flow_control_key {
        query.push((String::from("flowControlKey"), flow_control_key.clone()));
    }
    if let Some(response_status) = filter.response_status {
        query.push((String::from("responseStatus"), response_status.to_string()));
    }
    if let Some(label) = &filter.label {
        query.push((String::from("label"), label.clone()));
    }
    if let Some(from_date) = filter.from_date {
        query.push((String::from("fromDate"), from_date.to_string()));
    }
    if let Some(to_date) = filter.to_date {
        query.push((String::from("toDate"), to_date.to_string()));
    }
    if let Some(count) = filter.count {
        query.push((String::from("count"), count.to_string()));
    }

    query
}

fn serde_state(state: &LogState) -> String {
    match state {
        LogState::Created => String::from("CREATED"),
        LogState::Active => String::from("ACTIVE"),
        LogState::Retry => String::from("RETRY"),
        LogState::Error => String::from("ERROR"),
        LogState::Delivered => String::from("DELIVERED"),
        LogState::Failed => String::from("FAILED"),
        LogState::Cancelled => String::from("CANCELLED"),
        LogState::CancelRequested => String::from("CANCEL_REQUESTED"),
        LogState::InProgress => String::from("IN_PROGRESS"),
        LogState::Unknown => String::from("UNKNOWN"),
    }
}