Skip to main content

alpaca_data/stocks/
client.rs

1use std::sync::Arc;
2
3use crate::{
4    Error,
5    client::Inner,
6    common::response::ResponseStream,
7    transport::endpoint::Endpoint,
8    transport::pagination::{collect_all, stream_pages},
9};
10
11use super::{
12    AuctionsRequest, AuctionsResponse, AuctionsSingleRequest, AuctionsSingleResponse, BarsRequest,
13    BarsResponse, BarsSingleRequest, BarsSingleResponse, ConditionCodesRequest,
14    ConditionCodesResponse, ExchangeCodesResponse, LatestBarRequest, LatestBarResponse,
15    LatestBarsRequest, LatestBarsResponse, LatestQuoteRequest, LatestQuoteResponse,
16    LatestQuotesRequest, LatestQuotesResponse, LatestTradeRequest, LatestTradeResponse,
17    LatestTradesRequest, LatestTradesResponse, QuotesRequest, QuotesResponse, QuotesSingleRequest,
18    QuotesSingleResponse, SnapshotRequest, SnapshotResponse, SnapshotsRequest, SnapshotsResponse,
19    TradesRequest, TradesResponse, TradesSingleRequest, TradesSingleResponse,
20};
21
22#[derive(Clone, Debug)]
23pub struct StocksClient {
24    inner: Arc<Inner>,
25}
26
27impl StocksClient {
28    pub(crate) fn new(inner: Arc<Inner>) -> Self {
29        Self { inner }
30    }
31
32    pub async fn bars(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
33        self.ensure_credentials()?;
34        self.inner
35            .http
36            .get_json(
37                &self.inner.base_url,
38                Endpoint::StocksBars,
39                &self.inner.auth,
40                request.to_query(),
41            )
42            .await
43    }
44
45    pub async fn auctions(&self, request: AuctionsRequest) -> Result<AuctionsResponse, Error> {
46        self.ensure_credentials()?;
47        self.inner
48            .http
49            .get_json(
50                &self.inner.base_url,
51                Endpoint::StocksAuctions,
52                &self.inner.auth,
53                request.to_query(),
54            )
55            .await
56    }
57
58    pub async fn auctions_all(&self, request: AuctionsRequest) -> Result<AuctionsResponse, Error> {
59        self.ensure_credentials()?;
60        let client = self.clone();
61
62        collect_all(request, move |request| {
63            let client = client.clone();
64            async move { client.auctions(request).await }
65        })
66        .await
67    }
68
69    pub async fn auctions_single(
70        &self,
71        request: AuctionsSingleRequest,
72    ) -> Result<AuctionsSingleResponse, Error> {
73        self.ensure_credentials()?;
74        let endpoint = Endpoint::StocksAuctionsSingle {
75            symbol: request.symbol.clone(),
76        };
77        self.inner
78            .http
79            .get_json(
80                &self.inner.base_url,
81                endpoint,
82                &self.inner.auth,
83                request.to_query(),
84            )
85            .await
86    }
87
88    pub async fn auctions_single_all(
89        &self,
90        request: AuctionsSingleRequest,
91    ) -> Result<AuctionsSingleResponse, Error> {
92        self.ensure_credentials()?;
93        let client = self.clone();
94
95        collect_all(request, move |request| {
96            let client = client.clone();
97            async move { client.auctions_single(request).await }
98        })
99        .await
100    }
101
102    pub fn auctions_stream(
103        &self,
104        request: AuctionsRequest,
105    ) -> ResponseStream<Result<AuctionsResponse, Error>> {
106        if let Err(error) = self.ensure_credentials() {
107            return Self::error_stream(error);
108        }
109
110        let client = self.clone();
111        stream_pages(request, move |request| {
112            let client = client.clone();
113            async move { client.auctions(request).await }
114        })
115    }
116
117    pub fn auctions_single_stream(
118        &self,
119        request: AuctionsSingleRequest,
120    ) -> ResponseStream<Result<AuctionsSingleResponse, Error>> {
121        if let Err(error) = self.ensure_credentials() {
122            return Self::error_stream(error);
123        }
124
125        let client = self.clone();
126        stream_pages(request, move |request| {
127            let client = client.clone();
128            async move { client.auctions_single(request).await }
129        })
130    }
131
132    pub async fn bars_all(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
133        self.ensure_credentials()?;
134        let client = self.clone();
135
136        collect_all(request, move |request| {
137            let client = client.clone();
138            async move { client.bars(request).await }
139        })
140        .await
141    }
142
143    pub async fn bars_single(
144        &self,
145        request: BarsSingleRequest,
146    ) -> Result<BarsSingleResponse, Error> {
147        self.ensure_credentials()?;
148        let endpoint = Endpoint::StocksBarsSingle {
149            symbol: request.symbol.clone(),
150        };
151        self.inner
152            .http
153            .get_json(
154                &self.inner.base_url,
155                endpoint,
156                &self.inner.auth,
157                request.to_query(),
158            )
159            .await
160    }
161
162    pub async fn bars_single_all(
163        &self,
164        request: BarsSingleRequest,
165    ) -> Result<BarsSingleResponse, Error> {
166        self.ensure_credentials()?;
167        let client = self.clone();
168
169        collect_all(request, move |request| {
170            let client = client.clone();
171            async move { client.bars_single(request).await }
172        })
173        .await
174    }
175
176    pub fn bars_stream(&self, request: BarsRequest) -> ResponseStream<Result<BarsResponse, Error>> {
177        if let Err(error) = self.ensure_credentials() {
178            return Self::error_stream(error);
179        }
180
181        let client = self.clone();
182        stream_pages(request, move |request| {
183            let client = client.clone();
184            async move { client.bars(request).await }
185        })
186    }
187
188    pub fn bars_single_stream(
189        &self,
190        request: BarsSingleRequest,
191    ) -> ResponseStream<Result<BarsSingleResponse, Error>> {
192        if let Err(error) = self.ensure_credentials() {
193            return Self::error_stream(error);
194        }
195
196        let client = self.clone();
197        stream_pages(request, move |request| {
198            let client = client.clone();
199            async move { client.bars_single(request).await }
200        })
201    }
202
203    pub async fn quotes(&self, request: QuotesRequest) -> Result<QuotesResponse, Error> {
204        self.ensure_credentials()?;
205        self.inner
206            .http
207            .get_json(
208                &self.inner.base_url,
209                Endpoint::StocksQuotes,
210                &self.inner.auth,
211                request.to_query(),
212            )
213            .await
214    }
215
216    pub async fn quotes_all(&self, request: QuotesRequest) -> Result<QuotesResponse, Error> {
217        self.ensure_credentials()?;
218        let client = self.clone();
219
220        collect_all(request, move |request| {
221            let client = client.clone();
222            async move { client.quotes(request).await }
223        })
224        .await
225    }
226
227    pub async fn quotes_single(
228        &self,
229        request: QuotesSingleRequest,
230    ) -> Result<QuotesSingleResponse, Error> {
231        self.ensure_credentials()?;
232        let endpoint = Endpoint::StocksQuotesSingle {
233            symbol: request.symbol.clone(),
234        };
235        self.inner
236            .http
237            .get_json(
238                &self.inner.base_url,
239                endpoint,
240                &self.inner.auth,
241                request.to_query(),
242            )
243            .await
244    }
245
246    pub async fn quotes_single_all(
247        &self,
248        request: QuotesSingleRequest,
249    ) -> Result<QuotesSingleResponse, Error> {
250        self.ensure_credentials()?;
251        let client = self.clone();
252
253        collect_all(request, move |request| {
254            let client = client.clone();
255            async move { client.quotes_single(request).await }
256        })
257        .await
258    }
259
260    pub fn quotes_stream(
261        &self,
262        request: QuotesRequest,
263    ) -> ResponseStream<Result<QuotesResponse, Error>> {
264        if let Err(error) = self.ensure_credentials() {
265            return Self::error_stream(error);
266        }
267
268        let client = self.clone();
269        stream_pages(request, move |request| {
270            let client = client.clone();
271            async move { client.quotes(request).await }
272        })
273    }
274
275    pub fn quotes_single_stream(
276        &self,
277        request: QuotesSingleRequest,
278    ) -> ResponseStream<Result<QuotesSingleResponse, Error>> {
279        if let Err(error) = self.ensure_credentials() {
280            return Self::error_stream(error);
281        }
282
283        let client = self.clone();
284        stream_pages(request, move |request| {
285            let client = client.clone();
286            async move { client.quotes_single(request).await }
287        })
288    }
289
290    pub async fn trades(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
291        self.ensure_credentials()?;
292        self.inner
293            .http
294            .get_json(
295                &self.inner.base_url,
296                Endpoint::StocksTrades,
297                &self.inner.auth,
298                request.to_query(),
299            )
300            .await
301    }
302
303    pub async fn trades_all(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
304        self.ensure_credentials()?;
305        let client = self.clone();
306
307        collect_all(request, move |request| {
308            let client = client.clone();
309            async move { client.trades(request).await }
310        })
311        .await
312    }
313
314    pub async fn trades_single(
315        &self,
316        request: TradesSingleRequest,
317    ) -> Result<TradesSingleResponse, Error> {
318        self.ensure_credentials()?;
319        let endpoint = Endpoint::StocksTradesSingle {
320            symbol: request.symbol.clone(),
321        };
322        self.inner
323            .http
324            .get_json(
325                &self.inner.base_url,
326                endpoint,
327                &self.inner.auth,
328                request.to_query(),
329            )
330            .await
331    }
332
333    pub async fn trades_single_all(
334        &self,
335        request: TradesSingleRequest,
336    ) -> Result<TradesSingleResponse, Error> {
337        self.ensure_credentials()?;
338        let client = self.clone();
339
340        collect_all(request, move |request| {
341            let client = client.clone();
342            async move { client.trades_single(request).await }
343        })
344        .await
345    }
346
347    pub fn trades_stream(
348        &self,
349        request: TradesRequest,
350    ) -> ResponseStream<Result<TradesResponse, Error>> {
351        if let Err(error) = self.ensure_credentials() {
352            return Self::error_stream(error);
353        }
354
355        let client = self.clone();
356        stream_pages(request, move |request| {
357            let client = client.clone();
358            async move { client.trades(request).await }
359        })
360    }
361
362    pub fn trades_single_stream(
363        &self,
364        request: TradesSingleRequest,
365    ) -> ResponseStream<Result<TradesSingleResponse, Error>> {
366        if let Err(error) = self.ensure_credentials() {
367            return Self::error_stream(error);
368        }
369
370        let client = self.clone();
371        stream_pages(request, move |request| {
372            let client = client.clone();
373            async move { client.trades_single(request).await }
374        })
375    }
376
377    pub async fn latest_bars(
378        &self,
379        request: LatestBarsRequest,
380    ) -> Result<LatestBarsResponse, Error> {
381        self.ensure_credentials()?;
382        self.inner
383            .http
384            .get_json(
385                &self.inner.base_url,
386                Endpoint::StocksLatestBars,
387                &self.inner.auth,
388                request.to_query(),
389            )
390            .await
391    }
392
393    pub async fn latest_bar(&self, request: LatestBarRequest) -> Result<LatestBarResponse, Error> {
394        self.ensure_credentials()?;
395        let endpoint = Endpoint::StocksLatestBar {
396            symbol: request.symbol.clone(),
397        };
398        self.inner
399            .http
400            .get_json(
401                &self.inner.base_url,
402                endpoint,
403                &self.inner.auth,
404                request.to_query(),
405            )
406            .await
407    }
408
409    pub async fn latest_quotes(
410        &self,
411        request: LatestQuotesRequest,
412    ) -> Result<LatestQuotesResponse, Error> {
413        self.ensure_credentials()?;
414        self.inner
415            .http
416            .get_json(
417                &self.inner.base_url,
418                Endpoint::StocksLatestQuotes,
419                &self.inner.auth,
420                request.to_query(),
421            )
422            .await
423    }
424
425    pub async fn latest_quote(
426        &self,
427        request: LatestQuoteRequest,
428    ) -> Result<LatestQuoteResponse, Error> {
429        self.ensure_credentials()?;
430        let endpoint = Endpoint::StocksLatestQuote {
431            symbol: request.symbol.clone(),
432        };
433        self.inner
434            .http
435            .get_json(
436                &self.inner.base_url,
437                endpoint,
438                &self.inner.auth,
439                request.to_query(),
440            )
441            .await
442    }
443
444    pub async fn latest_trades(
445        &self,
446        request: LatestTradesRequest,
447    ) -> Result<LatestTradesResponse, Error> {
448        self.ensure_credentials()?;
449        self.inner
450            .http
451            .get_json(
452                &self.inner.base_url,
453                Endpoint::StocksLatestTrades,
454                &self.inner.auth,
455                request.to_query(),
456            )
457            .await
458    }
459
460    pub async fn latest_trade(
461        &self,
462        request: LatestTradeRequest,
463    ) -> Result<LatestTradeResponse, Error> {
464        self.ensure_credentials()?;
465        let endpoint = Endpoint::StocksLatestTrade {
466            symbol: request.symbol.clone(),
467        };
468        self.inner
469            .http
470            .get_json(
471                &self.inner.base_url,
472                endpoint,
473                &self.inner.auth,
474                request.to_query(),
475            )
476            .await
477    }
478
479    pub async fn snapshots(&self, request: SnapshotsRequest) -> Result<SnapshotsResponse, Error> {
480        self.ensure_credentials()?;
481        self.inner
482            .http
483            .get_json(
484                &self.inner.base_url,
485                Endpoint::StocksSnapshots,
486                &self.inner.auth,
487                request.to_query(),
488            )
489            .await
490    }
491
492    pub async fn snapshot(&self, request: SnapshotRequest) -> Result<SnapshotResponse, Error> {
493        self.ensure_credentials()?;
494        let endpoint = Endpoint::StocksSnapshot {
495            symbol: request.symbol.clone(),
496        };
497        self.inner
498            .http
499            .get_json(
500                &self.inner.base_url,
501                endpoint,
502                &self.inner.auth,
503                request.to_query(),
504            )
505            .await
506    }
507
508    pub async fn condition_codes(
509        &self,
510        request: ConditionCodesRequest,
511    ) -> Result<ConditionCodesResponse, Error> {
512        self.ensure_credentials()?;
513        let ticktype = request.ticktype.as_str();
514        let query = request.to_query();
515        let endpoint = Endpoint::StocksConditionCodes { ticktype };
516        self.inner
517            .http
518            .get_json(&self.inner.base_url, endpoint, &self.inner.auth, query)
519            .await
520    }
521
522    pub async fn exchange_codes(&self) -> Result<ExchangeCodesResponse, Error> {
523        self.ensure_credentials()?;
524        self.inner
525            .http
526            .get_json(
527                &self.inner.base_url,
528                Endpoint::StocksExchangeCodes,
529                &self.inner.auth,
530                Vec::new(),
531            )
532            .await
533    }
534
535    fn ensure_credentials(&self) -> Result<(), Error> {
536        if self.inner.auth.has_credentials() {
537            Ok(())
538        } else {
539            Err(Error::MissingCredentials)
540        }
541    }
542
543    fn error_stream<Response>(error: Error) -> ResponseStream<Result<Response, Error>>
544    where
545        Response: Send + 'static,
546    {
547        Box::pin(futures_util::stream::once(async move { Err(error) }))
548    }
549}