// qstash-rs 0.6.0
//
// A Rust SDK for Upstash QStash.
// Documentation
use bytes::Bytes;
use reqwest::{header::HeaderMap, Method, StatusCode};
use serde::de::DeserializeOwned;

use crate::error::{Error, Result};

/// JSON shape that `api_error` tries to decode from the body of a failed
/// response in order to extract a human-readable message.
#[derive(Debug, serde::Deserialize)]
struct ApiErrorPayload {
    /// Value of the payload's `error` field, when the payload carries one.
    error: Option<String>,
}

/// Crate-internal HTTP helper that pairs a `reqwest::Client` with the base
/// URL every request path is appended to.
#[derive(Debug, Clone)]
pub(crate) struct HttpClient {
    /// Underlying client used to build and send requests.
    inner: reqwest::Client,
    /// Base URL; trailing slashes are trimmed when a request URL is built.
    base_url: String,
}

impl HttpClient {
    pub(crate) fn new(inner: reqwest::Client, base_url: String) -> Self {
        Self { inner, base_url }
    }

    pub(crate) async fn send_json<T>(
        &self,
        method: Method,
        path: &str,
        query: &[(String, String)],
        headers: Option<HeaderMap>,
        body: Option<Bytes>,
    ) -> Result<T>
    where
        T: DeserializeOwned,
    {
        let response = self.send(method, path, query, headers, body).await?;
        decode_json(response.status, response.body)
    }

    pub(crate) async fn send_empty(
        &self,
        method: Method,
        path: &str,
        query: &[(String, String)],
        headers: Option<HeaderMap>,
        body: Option<Bytes>,
    ) -> Result<()> {
        self.send(method, path, query, headers, body).await?;
        Ok(())
    }

    pub(crate) async fn send_bytes(
        &self,
        method: Method,
        path: &str,
        query: &[(String, String)],
        headers: Option<HeaderMap>,
        body: Option<Bytes>,
    ) -> Result<ResponseBody> {
        self.send(method, path, query, headers, body).await
    }

    async fn send(
        &self,
        method: Method,
        path: &str,
        query: &[(String, String)],
        headers: Option<HeaderMap>,
        body: Option<Bytes>,
    ) -> Result<ResponseBody> {
        let url = format!(
            "{}/{}",
            self.base_url.trim_end_matches('/'),
            path.trim_start_matches('/')
        );

        let mut request = self.inner.request(method, &url);

        if !query.is_empty() {
            request = request.query(query);
        }

        if let Some(headers) = headers {
            request = request.headers(headers);
        }

        if let Some(body) = body {
            request = request.body(body);
        }

        let response = request.send().await.map_err(Error::Transport)?;
        let status = response.status();
        let body = response.bytes().await.map_err(Error::Transport)?;

        if status.is_success() {
            return Ok(ResponseBody { status, body });
        }

        Err(api_error(status, body))
    }
}

/// Status code and fully read body of a response, as produced by
/// `HttpClient::send` for responses with a success status.
#[derive(Debug, Clone)]
pub(crate) struct ResponseBody {
    /// HTTP status code of the response.
    pub(crate) status: StatusCode,
    /// Raw response body bytes.
    pub(crate) body: Bytes,
}

pub(crate) fn decode_json<T>(status: StatusCode, body: Bytes) -> Result<T>
where
    T: DeserializeOwned,
{
    serde_json::from_slice(&body).map_err(|source| Error::Decode {
        status,
        body: String::from_utf8_lossy(&body).into_owned(),
        source,
    })
}

/// Builds an `Error::Api` from a non-success status and its response body.
///
/// The message is chosen in this order: the `error` field of a JSON payload,
/// else the raw body rendered as lossy UTF-8, else the status rendered as text.
/// The raw body is attached to the error only when it is non-empty.
fn api_error(status: StatusCode, body: Bytes) -> Error {
    let raw_body = (!body.is_empty()).then(|| String::from_utf8_lossy(&body).into_owned());

    let reported = serde_json::from_slice::<ApiErrorPayload>(&body)
        .ok()
        .and_then(|payload| payload.error);

    let message = match (reported, &raw_body) {
        (Some(text), _) => text,
        (None, Some(raw)) => raw.clone(),
        (None, None) => status.to_string(),
    };

    Error::Api {
        status,
        message,
        body: raw_body,
    }
}