alpacars 0.1.2

Async Rust SDK for Alpaca Markets API
Documentation
use crate::error::AlpacaError;
use reqwest::{header, Client, Method, StatusCode};
use serde::{de::DeserializeOwned, Serialize};
use std::time::Duration;
use tokio::time::sleep;
use tracing::{debug, warn};

pub const DEFAULT_RETRY_ATTEMPTS: u32 = 3;
pub const DEFAULT_RETRY_WAIT_SECS: u64 = 3;
pub const DEFAULT_RETRY_STATUS_CODES: &[u16] = &[429, 504];

pub const DATA_V2_MAX_LIMIT: u32 = 10_000;
pub const ACCOUNT_ACTIVITIES_DEFAULT_PAGE_SIZE: u32 = 100;

pub mod base_url {
    pub const TRADING_PAPER: &str = "https://paper-api.alpaca.markets";
    pub const TRADING_LIVE: &str = "https://api.alpaca.markets";
    pub const BROKER_SANDBOX: &str = "https://broker-api.sandbox.alpaca.markets";
    pub const BROKER_PRODUCTION: &str = "https://broker-api.alpaca.markets";
    pub const DATA: &str = "https://data.alpaca.markets";
    pub const DATA_SANDBOX: &str = "https://data.sandbox.alpaca.markets";
    pub const TRADING_STREAM_PAPER: &str = "wss://paper-api.alpaca.markets/stream";
    pub const TRADING_STREAM_LIVE: &str = "wss://api.alpaca.markets/stream";
    pub const MARKET_DATA_STREAM: &str = "wss://stream.data.alpaca.markets";
}

#[derive(Clone)]
pub struct RestClient {
    http: Client,
    api_key: Option<String>,
    secret_key: Option<String>,
    oauth_token: Option<String>,
    pub(crate) base_url: String,
    pub(crate) api_version: String,
    use_basic_auth: bool,
    retry_attempts: u32,
    retry_wait_secs: u64,
    retry_status_codes: Vec<u16>,
}

impl RestClient {
    pub fn new(
        api_key: Option<String>,
        secret_key: Option<String>,
        oauth_token: Option<String>,
        base_url: String,
        api_version: String,
        use_basic_auth: bool,
    ) -> Result<Self, AlpacaError> {
        if oauth_token.is_none() {
            match (&api_key, &secret_key) {
                (None, _) => {
                    return Err(AlpacaError::InvalidCredentials(
                        "Either oauth_token or api_key+secret_key must be provided".into(),
                    ))
                }
                (Some(_), None) => {
                    return Err(AlpacaError::InvalidCredentials(
                        "secret_key is required when api_key is provided".into(),
                    ))
                }
                _ => {}
            }
        }
        Ok(Self {
            http: Client::new(),
            api_key,
            secret_key,
            oauth_token,
            base_url,
            api_version,
            use_basic_auth,
            retry_attempts: DEFAULT_RETRY_ATTEMPTS,
            retry_wait_secs: DEFAULT_RETRY_WAIT_SECS,
            retry_status_codes: DEFAULT_RETRY_STATUS_CODES.to_vec(),
        })
    }

    pub fn api_key(&self) -> Option<&str> {
        self.api_key.as_deref()
    }

    pub fn secret_key(&self) -> Option<&str> {
        self.secret_key.as_deref()
    }

    pub fn base_url(&self) -> &str {
        &self.base_url
    }

    pub fn build_url(&self, path: &str) -> String {
        format!("{}/{}{}", self.base_url, self.api_version, path)
    }

    fn auth_headers(&self) -> header::HeaderMap {
        let mut headers = header::HeaderMap::new();
        if let Some(token) = &self.oauth_token {
            if let Ok(v) = format!("Bearer {}", token).parse() {
                headers.insert(header::AUTHORIZATION, v);
            }
        } else if self.use_basic_auth {
            if let (Some(key), Some(secret)) = (&self.api_key, &self.secret_key) {
                use base64::Engine;
                let encoded = base64::engine::general_purpose::STANDARD
                    .encode(format!("{}:{}", key, secret));
                if let Ok(v) = format!("Basic {}", encoded).parse() {
                    headers.insert(header::AUTHORIZATION, v);
                }
            }
        } else {
            if let Some(key) = &self.api_key {
                if let Ok(v) = key.parse() {
                    headers.insert("APCA-API-KEY-ID", v);
                }
            }
            if let Some(secret) = &self.secret_key {
                if let Ok(v) = secret.parse() {
                    headers.insert("APCA-API-SECRET-KEY", v);
                }
            }
        }
        headers
    }

    fn query_pairs(val: &serde_json::Value) -> Vec<(String, String)> {
        let obj = match val.as_object() {
            Some(o) => o,
            None => return vec![],
        };
        obj.iter()
            .filter_map(|(k, v)| match v {
                serde_json::Value::Null => None,
                serde_json::Value::Bool(b) => Some((k.clone(), b.to_string())),
                serde_json::Value::Number(n) => Some((k.clone(), n.to_string())),
                serde_json::Value::String(s) if s.is_empty() => None,
                serde_json::Value::String(s) => Some((k.clone(), s.clone())),
                serde_json::Value::Array(arr) => {
                    let joined = arr
                        .iter()
                        .filter_map(|v| match v {
                            serde_json::Value::String(s) => Some(s.clone()),
                            other => Some(other.to_string()),
                        })
                        .collect::<Vec<_>>()
                        .join(",");
                    if joined.is_empty() {
                        None
                    } else {
                        Some((k.clone(), joined))
                    }
                }
                other => Some((k.clone(), other.to_string())),
            })
            .collect()
    }

    async fn execute_raw(
        &self,
        method: Method,
        url: &str,
        query: Option<serde_json::Value>,
        body: Option<serde_json::Value>,
    ) -> Result<serde_json::Value, AlpacaError> {
        let mut attempts = 0u32;
        loop {
            debug!(method = %method, url, attempt = attempts, "sending request");
            let mut req = self.http.request(method.clone(), url);
            req = req.headers(self.auth_headers());

            if let Some(ref q) = query {
                req = req.query(&Self::query_pairs(q));
            }
            if let Some(ref b) = body {
                req = req.json(b);
            }

            let resp = req.send().await?;
            let status = resp.status();
            let status_code = status.as_u16();

            if self.retry_status_codes.contains(&status_code)
                && attempts < self.retry_attempts
            {
                attempts += 1;
                let exp = attempts.saturating_sub(1).min(6) as u32;
                let backoff = self
                    .retry_wait_secs
                    .saturating_mul(1u64 << exp)
                    .min(60);
                warn!(status = status_code, attempt = attempts, wait_secs = backoff, "retryable error, backing off");
                sleep(Duration::from_secs(backoff)).await;
                continue;
            }

            if status == StatusCode::NO_CONTENT {
                return Ok(serde_json::Value::Null);
            }

            let text = resp.text().await?;

            if !status.is_success() {
                let parsed: serde_json::Value =
                    serde_json::from_str(&text).unwrap_or(serde_json::Value::Null);
                let message = parsed["message"]
                    .as_str()
                    .unwrap_or(&text)
                    .to_string();
                let code = parsed["code"].as_u64().unwrap_or(0) as u32;
                warn!(status = status_code, code, message, body = %text, "API error");
                return Err(AlpacaError::Api { status_code, code, message });
            }

            if text.is_empty() {
                return Ok(serde_json::Value::Null);
            }

            return Ok(serde_json::from_str(&text)?);
        }
    }

    pub async fn get<Q, R>(&self, path: &str, query: Option<&Q>) -> Result<R, AlpacaError>
    where
        Q: Serialize,
        R: DeserializeOwned,
    {
        let url = self.build_url(path);
        let q = query.map(serde_json::to_value).transpose()?;
        let val = self.execute_raw(Method::GET, &url, q, None).await?;
        Ok(serde_json::from_value(val)?)
    }

    pub async fn get_raw<Q>(&self, path: &str, query: Option<&Q>) -> Result<serde_json::Value, AlpacaError>
    where
        Q: Serialize,
    {
        let url = self.build_url(path);
        let q = query.map(serde_json::to_value).transpose()?;
        self.execute_raw(Method::GET, &url, q, None).await
    }

    pub async fn post<B, R>(&self, path: &str, body: Option<&B>) -> Result<R, AlpacaError>
    where
        B: Serialize,
        R: DeserializeOwned,
    {
        let url = self.build_url(path);
        let b = body.map(serde_json::to_value).transpose()?;
        let val = self.execute_raw(Method::POST, &url, None, b).await?;
        Ok(serde_json::from_value(val)?)
    }

    pub async fn post_void<B>(&self, path: &str, body: Option<&B>) -> Result<(), AlpacaError>
    where
        B: Serialize + ?Sized,
    {
        let url = self.build_url(path);
        let b = body.map(serde_json::to_value).transpose()?;
        self.execute_raw(Method::POST, &url, None, b).await?;
        Ok(())
    }

    pub async fn patch<B, R>(&self, path: &str, body: Option<&B>) -> Result<R, AlpacaError>
    where
        B: Serialize,
        R: DeserializeOwned,
    {
        let url = self.build_url(path);
        let b = body.map(serde_json::to_value).transpose()?;
        let val = self.execute_raw(Method::PATCH, &url, None, b).await?;
        Ok(serde_json::from_value(val)?)
    }

    pub async fn put<B, R>(&self, path: &str, body: Option<&B>) -> Result<R, AlpacaError>
    where
        B: Serialize,
        R: DeserializeOwned,
    {
        let url = self.build_url(path);
        let b = body.map(serde_json::to_value).transpose()?;
        let val = self.execute_raw(Method::PUT, &url, None, b).await?;
        Ok(serde_json::from_value(val)?)
    }

    pub async fn delete_void<Q>(&self, path: &str, query: Option<&Q>) -> Result<(), AlpacaError>
    where
        Q: Serialize,
    {
        let url = self.build_url(path);
        let q = query.map(serde_json::to_value).transpose()?;
        self.execute_raw(Method::DELETE, &url, q, None).await?;
        Ok(())
    }

    pub async fn delete<Q, R>(&self, path: &str, query: Option<&Q>) -> Result<R, AlpacaError>
    where
        Q: Serialize,
        R: DeserializeOwned,
    {
        let url = self.build_url(path);
        let q = query.map(serde_json::to_value).transpose()?;
        let val = self.execute_raw(Method::DELETE, &url, q, None).await?;
        Ok(serde_json::from_value(val)?)
    }

    pub async fn delete_with_body<B, R>(&self, path: &str, body: Option<&B>) -> Result<R, AlpacaError>
    where
        B: Serialize,
        R: DeserializeOwned,
    {
        let url = self.build_url(path);
        let b = body.map(serde_json::to_value).transpose()?;
        let val = self.execute_raw(Method::DELETE, &url, None, b).await?;
        Ok(serde_json::from_value(val)?)
    }
}