alpaca-data 0.10.2

High-performance Rust client for Alpaca Market Data API
Documentation
use std::sync::Arc;

use crate::{
    Error,
    client::Inner,
    common::response::ResponseStream,
    transport::endpoint::Endpoint,
    transport::pagination::{collect_all, stream_pages},
};

use super::{
    BarsRequest, BarsResponse, ChainRequest, ChainResponse, ConditionCodesRequest,
    ConditionCodesResponse, ExchangeCodesResponse, LatestQuotesRequest, LatestQuotesResponse,
    LatestTradesRequest, LatestTradesResponse, SnapshotsRequest, SnapshotsResponse, TradesRequest,
    TradesResponse,
};

#[derive(Clone, Debug)]
pub struct OptionsClient {
    inner: Arc<Inner>,
}

impl OptionsClient {
    pub(crate) fn new(inner: Arc<Inner>) -> Self {
        Self { inner }
    }

    pub async fn bars(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
        self.ensure_credentials()?;
        request.validate()?;
        self.inner
            .http
            .get_json(
                &self.inner.base_url,
                Endpoint::OptionsBars,
                &self.inner.auth,
                request.to_query(),
            )
            .await
    }

    pub async fn bars_all(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
        self.ensure_credentials()?;
        let client = self.clone();

        collect_all(request, move |request| {
            let client = client.clone();
            async move { client.bars(request).await }
        })
        .await
    }

    pub fn bars_stream(&self, request: BarsRequest) -> ResponseStream<Result<BarsResponse, Error>> {
        if let Err(error) = self.ensure_credentials() {
            return Self::error_stream(error);
        }

        let client = self.clone();
        stream_pages(request, move |request| {
            let client = client.clone();
            async move { client.bars(request).await }
        })
    }

    pub async fn trades(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
        self.ensure_credentials()?;
        request.validate()?;
        self.inner
            .http
            .get_json(
                &self.inner.base_url,
                Endpoint::OptionsTrades,
                &self.inner.auth,
                request.to_query(),
            )
            .await
    }

    pub async fn trades_all(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
        self.ensure_credentials()?;
        let client = self.clone();

        collect_all(request, move |request| {
            let client = client.clone();
            async move { client.trades(request).await }
        })
        .await
    }

    pub fn trades_stream(
        &self,
        request: TradesRequest,
    ) -> ResponseStream<Result<TradesResponse, Error>> {
        if let Err(error) = self.ensure_credentials() {
            return Self::error_stream(error);
        }

        let client = self.clone();
        stream_pages(request, move |request| {
            let client = client.clone();
            async move { client.trades(request).await }
        })
    }

    pub async fn latest_quotes(
        &self,
        request: LatestQuotesRequest,
    ) -> Result<LatestQuotesResponse, Error> {
        self.ensure_credentials()?;
        request.validate()?;
        self.inner
            .http
            .get_json(
                &self.inner.base_url,
                Endpoint::OptionsLatestQuotes,
                &self.inner.auth,
                request.to_query(),
            )
            .await
    }

    pub async fn latest_trades(
        &self,
        request: LatestTradesRequest,
    ) -> Result<LatestTradesResponse, Error> {
        self.ensure_credentials()?;
        request.validate()?;
        self.inner
            .http
            .get_json(
                &self.inner.base_url,
                Endpoint::OptionsLatestTrades,
                &self.inner.auth,
                request.to_query(),
            )
            .await
    }

    pub async fn snapshots(&self, request: SnapshotsRequest) -> Result<SnapshotsResponse, Error> {
        self.ensure_credentials()?;
        request.validate()?;
        self.inner
            .http
            .get_json(
                &self.inner.base_url,
                Endpoint::OptionsSnapshots,
                &self.inner.auth,
                request.to_query(),
            )
            .await
    }

    pub async fn snapshots_all(
        &self,
        request: SnapshotsRequest,
    ) -> Result<SnapshotsResponse, Error> {
        self.ensure_credentials()?;
        let client = self.clone();

        collect_all(request, move |request| {
            let client = client.clone();
            async move { client.snapshots(request).await }
        })
        .await
    }

    pub fn snapshots_stream(
        &self,
        request: SnapshotsRequest,
    ) -> ResponseStream<Result<SnapshotsResponse, Error>> {
        if let Err(error) = self.ensure_credentials() {
            return Self::error_stream(error);
        }

        let client = self.clone();
        stream_pages(request, move |request| {
            let client = client.clone();
            async move { client.snapshots(request).await }
        })
    }

    pub async fn chain(&self, request: ChainRequest) -> Result<ChainResponse, Error> {
        self.ensure_credentials()?;
        request.validate()?;
        let endpoint = Endpoint::OptionsChain {
            underlying_symbol: request.underlying_symbol.clone(),
        };
        self.inner
            .http
            .get_json(
                &self.inner.base_url,
                endpoint,
                &self.inner.auth,
                request.to_query(),
            )
            .await
    }

    pub async fn chain_all(&self, request: ChainRequest) -> Result<ChainResponse, Error> {
        self.ensure_credentials()?;
        let client = self.clone();

        collect_all(request, move |request| {
            let client = client.clone();
            async move { client.chain(request).await }
        })
        .await
    }

    pub fn chain_stream(
        &self,
        request: ChainRequest,
    ) -> ResponseStream<Result<ChainResponse, Error>> {
        if let Err(error) = self.ensure_credentials() {
            return Self::error_stream(error);
        }

        let client = self.clone();
        stream_pages(request, move |request| {
            let client = client.clone();
            async move { client.chain(request).await }
        })
    }

    pub async fn exchange_codes(&self) -> Result<ExchangeCodesResponse, Error> {
        self.ensure_credentials()?;
        self.inner
            .http
            .get_json(
                &self.inner.base_url,
                Endpoint::OptionsExchangeCodes,
                &self.inner.auth,
                Vec::new(),
            )
            .await
    }

    pub async fn condition_codes(
        &self,
        request: ConditionCodesRequest,
    ) -> Result<ConditionCodesResponse, Error> {
        self.ensure_credentials()?;
        let endpoint = Endpoint::OptionsConditionCodes {
            ticktype: request.ticktype(),
        };
        self.inner
            .http
            .get_json(&self.inner.base_url, endpoint, &self.inner.auth, Vec::new())
            .await
    }

    fn ensure_credentials(&self) -> Result<(), Error> {
        if self.inner.auth.has_credentials() {
            Ok(())
        } else {
            Err(Error::MissingCredentials)
        }
    }

    fn error_stream<Response>(error: Error) -> ResponseStream<Result<Response, Error>>
    where
        Response: Send + 'static,
    {
        Box::pin(futures_util::stream::once(async move { Err(error) }))
    }
}