Skip to main content

alpaca_data/options/
client.rs

1use std::sync::Arc;
2
3use crate::{
4    Error,
5    client::Inner,
6    common::response::ResponseStream,
7    transport::endpoint::Endpoint,
8    transport::pagination::{collect_all, stream_pages},
9};
10
11use super::{
12    BarsRequest, BarsResponse, ChainRequest, ChainResponse, ConditionCodesRequest,
13    ConditionCodesResponse, ExchangeCodesResponse, LatestQuotesRequest, LatestQuotesResponse,
14    LatestTradesRequest, LatestTradesResponse, SnapshotsRequest, SnapshotsResponse, TradesRequest,
15    TradesResponse,
16};
17
18#[derive(Clone, Debug)]
19pub struct OptionsClient {
20    inner: Arc<Inner>,
21}
22
23impl OptionsClient {
24    pub(crate) fn new(inner: Arc<Inner>) -> Self {
25        Self { inner }
26    }
27
28    pub async fn bars(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
29        self.ensure_credentials()?;
30        self.inner
31            .http
32            .get_json(
33                &self.inner.base_url,
34                Endpoint::OptionsBars,
35                &self.inner.auth,
36                request.to_query(),
37            )
38            .await
39    }
40
41    pub async fn bars_all(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
42        self.ensure_credentials()?;
43        let client = self.clone();
44
45        collect_all(request, move |request| {
46            let client = client.clone();
47            async move { client.bars(request).await }
48        })
49        .await
50    }
51
52    pub fn bars_stream(&self, request: BarsRequest) -> ResponseStream<Result<BarsResponse, Error>> {
53        if let Err(error) = self.ensure_credentials() {
54            return Self::error_stream(error);
55        }
56
57        let client = self.clone();
58        stream_pages(request, move |request| {
59            let client = client.clone();
60            async move { client.bars(request).await }
61        })
62    }
63
64    pub async fn trades(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
65        self.ensure_credentials()?;
66        self.inner
67            .http
68            .get_json(
69                &self.inner.base_url,
70                Endpoint::OptionsTrades,
71                &self.inner.auth,
72                request.to_query(),
73            )
74            .await
75    }
76
77    pub async fn trades_all(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
78        self.ensure_credentials()?;
79        let client = self.clone();
80
81        collect_all(request, move |request| {
82            let client = client.clone();
83            async move { client.trades(request).await }
84        })
85        .await
86    }
87
88    pub fn trades_stream(
89        &self,
90        request: TradesRequest,
91    ) -> ResponseStream<Result<TradesResponse, Error>> {
92        if let Err(error) = self.ensure_credentials() {
93            return Self::error_stream(error);
94        }
95
96        let client = self.clone();
97        stream_pages(request, move |request| {
98            let client = client.clone();
99            async move { client.trades(request).await }
100        })
101    }
102
103    pub async fn latest_quotes(
104        &self,
105        request: LatestQuotesRequest,
106    ) -> Result<LatestQuotesResponse, Error> {
107        self.ensure_credentials()?;
108        self.inner
109            .http
110            .get_json(
111                &self.inner.base_url,
112                Endpoint::OptionsLatestQuotes,
113                &self.inner.auth,
114                request.to_query(),
115            )
116            .await
117    }
118
119    pub async fn latest_trades(
120        &self,
121        request: LatestTradesRequest,
122    ) -> Result<LatestTradesResponse, Error> {
123        self.ensure_credentials()?;
124        self.inner
125            .http
126            .get_json(
127                &self.inner.base_url,
128                Endpoint::OptionsLatestTrades,
129                &self.inner.auth,
130                request.to_query(),
131            )
132            .await
133    }
134
135    pub async fn snapshots(&self, request: SnapshotsRequest) -> Result<SnapshotsResponse, Error> {
136        self.ensure_credentials()?;
137        self.inner
138            .http
139            .get_json(
140                &self.inner.base_url,
141                Endpoint::OptionsSnapshots,
142                &self.inner.auth,
143                request.to_query(),
144            )
145            .await
146    }
147
148    pub async fn snapshots_all(
149        &self,
150        request: SnapshotsRequest,
151    ) -> Result<SnapshotsResponse, Error> {
152        self.ensure_credentials()?;
153        let client = self.clone();
154
155        collect_all(request, move |request| {
156            let client = client.clone();
157            async move { client.snapshots(request).await }
158        })
159        .await
160    }
161
162    pub fn snapshots_stream(
163        &self,
164        request: SnapshotsRequest,
165    ) -> ResponseStream<Result<SnapshotsResponse, Error>> {
166        if let Err(error) = self.ensure_credentials() {
167            return Self::error_stream(error);
168        }
169
170        let client = self.clone();
171        stream_pages(request, move |request| {
172            let client = client.clone();
173            async move { client.snapshots(request).await }
174        })
175    }
176
177    pub async fn chain(&self, request: ChainRequest) -> Result<ChainResponse, Error> {
178        self.ensure_credentials()?;
179        let endpoint = Endpoint::OptionsChain {
180            underlying_symbol: request.underlying_symbol.clone(),
181        };
182        self.inner
183            .http
184            .get_json(
185                &self.inner.base_url,
186                endpoint,
187                &self.inner.auth,
188                request.to_query(),
189            )
190            .await
191    }
192
193    pub async fn chain_all(&self, request: ChainRequest) -> Result<ChainResponse, Error> {
194        self.ensure_credentials()?;
195        let client = self.clone();
196
197        collect_all(request, move |request| {
198            let client = client.clone();
199            async move { client.chain(request).await }
200        })
201        .await
202    }
203
204    pub fn chain_stream(
205        &self,
206        request: ChainRequest,
207    ) -> ResponseStream<Result<ChainResponse, Error>> {
208        if let Err(error) = self.ensure_credentials() {
209            return Self::error_stream(error);
210        }
211
212        let client = self.clone();
213        stream_pages(request, move |request| {
214            let client = client.clone();
215            async move { client.chain(request).await }
216        })
217    }
218
219    pub async fn exchange_codes(&self) -> Result<ExchangeCodesResponse, Error> {
220        self.ensure_credentials()?;
221        self.inner
222            .http
223            .get_json(
224                &self.inner.base_url,
225                Endpoint::OptionsExchangeCodes,
226                &self.inner.auth,
227                Vec::new(),
228            )
229            .await
230    }
231
232    pub async fn condition_codes(
233        &self,
234        request: ConditionCodesRequest,
235    ) -> Result<ConditionCodesResponse, Error> {
236        self.ensure_credentials()?;
237        let endpoint = Endpoint::OptionsConditionCodes {
238            ticktype: request.ticktype(),
239        };
240        self.inner
241            .http
242            .get_json(&self.inner.base_url, endpoint, &self.inner.auth, Vec::new())
243            .await
244    }
245
246    fn ensure_credentials(&self) -> Result<(), Error> {
247        if self.inner.auth.has_credentials() {
248            Ok(())
249        } else {
250            Err(Error::MissingCredentials)
251        }
252    }
253
254    fn error_stream<Response>(error: Error) -> ResponseStream<Result<Response, Error>>
255    where
256        Response: Send + 'static,
257    {
258        Box::pin(futures_util::stream::once(async move { Err(error) }))
259    }
260}