// ticksupply 0.2.1
//
// Official Rust client for the Ticksupply market data API.
// Documentation
//! Internal HTTP plumbing — the `send` and `send_empty` helpers funnel every request, including retries.

use std::time::Duration;

use serde::de::DeserializeOwned;
use serde::Serialize;

use crate::client::Client;
use crate::error::{map_error, Error, ErrorEnvelope, Result};

/// Per-request options passed alongside the method, path, query and body.
///
/// The `Default` value carries no idempotency key.
#[derive(Clone, Copy, Default)]
pub(crate) struct RequestOpts<'a> {
    /// Optional idempotency key. When set it is sent as the `Idempotency-Key`
    /// header, and it makes a request eligible for retries even when the HTTP
    /// method is not GET or HEAD.
    pub idempotency_key: Option<&'a str>,
}

/// Sends a request and decodes the successful response body into `T`,
/// retrying retriable failures within the request's retry budget.
///
/// The budget comes from `retry_budget` (client `max_retries`, method and
/// idempotency key). After a retriable failure the helper sleeps via
/// `sleep_with_backoff`, passing along any server-provided retry delay, and
/// tries again. At most `1 + budget` attempts are made.
///
/// # Errors
///
/// Returns the error of the last attempt: either a non-retriable error, or a
/// retriable one once the budget is exhausted.
pub(crate) async fn send<T, B>(
    client: &Client,
    method: reqwest::Method,
    path: &str,
    query: Option<&[(&str, String)]>,
    body: Option<&B>,
    opts: RequestOpts<'_>,
) -> Result<T>
where
    T: DeserializeOwned,
    B: Serialize + ?Sized,
{
    let budget = retry_budget(client.inner.max_retries, &method, opts.idempotency_key);
    // 1-based number of the attempt currently in flight.
    let mut attempt: u32 = 1;
    loop {
        let failure = match send_once::<T, B>(client, method.clone(), path, query, body, opts).await
        {
            Ok(value) => return Ok(value),
            Err(e) => e,
        };
        // Give up once the budget is spent or the failure cannot be cured by retrying.
        if attempt > budget || !is_retriable(&failure) {
            return Err(failure);
        }
        sleep_with_backoff(attempt, retry_after_from(&failure)).await;
        attempt += 1;
    }
}

/// Sends a request whose successful response body is ignored, retrying
/// retriable failures within the request's retry budget.
///
/// Retry handling mirrors `send`: the budget comes from `retry_budget`, and
/// each retriable failure is followed by `sleep_with_backoff` before the next
/// attempt. At most `1 + budget` attempts are made.
///
/// # Errors
///
/// Returns the error of the last attempt: either a non-retriable error, or a
/// retriable one once the budget is exhausted.
pub(crate) async fn send_empty<B>(
    client: &Client,
    method: reqwest::Method,
    path: &str,
    query: Option<&[(&str, String)]>,
    body: Option<&B>,
    opts: RequestOpts<'_>,
) -> Result<()>
where
    B: Serialize + ?Sized,
{
    let budget = retry_budget(client.inner.max_retries, &method, opts.idempotency_key);
    // 1-based number of the attempt currently in flight.
    let mut attempt: u32 = 1;
    loop {
        let failure = match send_empty_once::<B>(client, method.clone(), path, query, body, opts)
            .await
        {
            Ok(()) => return Ok(()),
            Err(e) => e,
        };
        // Give up once the budget is spent or the failure cannot be cured by retrying.
        if attempt > budget || !is_retriable(&failure) {
            return Err(failure);
        }
        sleep_with_backoff(attempt, retry_after_from(&failure)).await;
        attempt += 1;
    }
}

/// Decides how many retries a request may use.
///
/// Returns `max` when the method is GET or HEAD, or when an idempotency key is
/// supplied; every other request gets `0`. A `max` of `0` therefore always
/// yields `0`.
fn retry_budget(max: u32, method: &reqwest::Method, idem_key: Option<&str>) -> u32 {
    let replay_allowed = *method == reqwest::Method::GET
        || *method == reqwest::Method::HEAD
        || idem_key.is_some();
    if replay_allowed {
        max
    } else {
        0
    }
}

fn is_retriable(err: &Error) -> bool {
    matches!(err, Error::Network(_) | Error::RateLimited { .. })
        || matches!(err, Error::Api { status, .. } if *status >= 500)
}

fn retry_after_from(err: &Error) -> Option<u64> {
    match err {
        Error::RateLimited { retry_after, .. } => *retry_after,
        _ => None,
    }
}

/// Sleeps before the next retry.
///
/// The base delay doubles with each attempt (1s, 2s, 4s, 8s, ...) and is
/// capped at 60s before `jitter` is applied. When `retry_after` is given (in
/// seconds) it acts as a lower bound: the sleep lasts for whichever of the two
/// durations is longer.
async fn sleep_with_backoff(attempt: u32, retry_after: Option<u64>) {
    // `attempt` is 1-based, so the first retry uses 2^0 = 1 second.
    let exponent = attempt.saturating_sub(1);
    let backoff_secs = std::cmp::min(2_u64.saturating_pow(exponent), 60);
    let server_floor = retry_after.map_or(Duration::ZERO, Duration::from_secs);
    let wait = jitter(backoff_secs).max(server_floor);
    tokio::time::sleep(wait).await;
}

/// Spreads a backoff of `base_secs` seconds over the range 75%..125% of the
/// base, capped at 65 seconds.
///
/// The offset inside that window is taken from the sub-second nanoseconds of
/// the system clock, which avoids pulling in a random-number dependency. If
/// the clock reads earlier than the Unix epoch the offset is zero and the
/// result is the lower edge of the window.
///
/// A `base_secs` of `0` returns `Duration::ZERO` instead of dividing by zero,
/// and very large values saturate rather than overflow, so the function never
/// panics.
fn jitter(base_secs: u64) -> Duration {
    use std::time::{SystemTime, UNIX_EPOCH};

    // Upper bound on the delay regardless of the base.
    const MAX_DELAY: Duration = Duration::from_secs(65);

    // Width of the jitter window in milliseconds: 50% of the base.
    let window_ms = base_secs.saturating_mul(500);
    if window_ms == 0 {
        // No window to sample from; `nanos % 0` would panic.
        return Duration::ZERO;
    }

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| u64::from(elapsed.subsec_nanos()))
        .unwrap_or(0);

    // Lower edge of the window (base minus 25%) plus an offset in [0, window).
    let floor_ms = base_secs.saturating_mul(750);
    let offset_ms = nanos % window_ms;
    Duration::from_millis(floor_ms.saturating_add(offset_ms)).min(MAX_DELAY)
}

async fn send_once<T, B>(
    client: &Client,
    method: reqwest::Method,
    path: &str,
    query: Option<&[(&str, String)]>,
    body: Option<&B>,
    opts: RequestOpts<'_>,
) -> Result<T>
where
    T: DeserializeOwned,
    B: Serialize + ?Sized,
{
    let (status, request_id, retry_after, bytes, ok) =
        do_request(client, method, path, query, body, opts).await?;
    if ok {
        return serde_json::from_slice(&bytes).map_err(Error::Decode);
    }
    Err(parse_error(status, request_id, retry_after, &bytes))
}

/// Performs a single attempt and discards the body of a successful response.
///
/// # Errors
///
/// - Whatever `do_request` returns when the exchange itself fails.
/// - The error built by `parse_error` when the response status is not a
///   success.
async fn send_empty_once<B>(
    client: &Client,
    method: reqwest::Method,
    path: &str,
    query: Option<&[(&str, String)]>,
    body: Option<&B>,
    opts: RequestOpts<'_>,
) -> Result<()>
where
    B: Serialize + ?Sized,
{
    let (status, request_id, retry_after, payload, succeeded) =
        do_request(client, method, path, query, body, opts).await?;
    if succeeded {
        Ok(())
    } else {
        Err(parse_error(status, request_id, retry_after, &payload))
    }
}

async fn do_request<B>(
    client: &Client,
    method: reqwest::Method,
    path: &str,
    query: Option<&[(&str, String)]>,
    body: Option<&B>,
    opts: RequestOpts<'_>,
) -> Result<(u16, Option<String>, Option<u64>, bytes::Bytes, bool)>
where
    B: Serialize + ?Sized,
{
    let url = format!("{}{}", client.inner.base_url, path);
    let mut req = client
        .inner
        .http
        .request(method, &url)
        .header("X-Api-Key", client.inner.api_key_header.clone());
    if let Some(q) = query {
        req = req.query(q);
    }
    if let Some(b) = body {
        req = req.json(b);
    }
    if let Some(k) = opts.idempotency_key {
        req = req.header("Idempotency-Key", k);
    }

    let response = req.send().await.map_err(Error::Network)?;
    let status = response.status().as_u16();
    let ok = response.status().is_success();
    let request_id = response
        .headers()
        .get("X-Request-Id")
        .and_then(|v| v.to_str().ok())
        .map(String::from);
    let retry_after = response
        .headers()
        .get(reqwest::header::RETRY_AFTER)
        .and_then(|v| v.to_str().ok())
        .and_then(|s| s.parse::<u64>().ok());
    let bytes = response.bytes().await.map_err(Error::Network)?;
    Ok((status, request_id, retry_after, bytes, ok))
}

fn parse_error(
    status: u16,
    request_id: Option<String>,
    retry_after: Option<u64>,
    bytes: &[u8],
) -> Error {
    match serde_json::from_slice::<ErrorEnvelope>(bytes) {
        Ok(env) => map_error(status, env.error, request_id, retry_after),
        Err(_) => Error::Api {
            code: "unknown".into(),
            message: String::from_utf8_lossy(bytes).to_string(),
            status,
            request_id,
        },
    }
}