Skip to main content

alpaca_data/stocks/
client.rs

1use std::sync::Arc;
2
3use crate::{
4    Error,
5    client::Inner,
6    common::response::ResponseStream,
7    transport::endpoint::Endpoint,
8    transport::pagination::{collect_all, stream_pages},
9};
10
11use super::{
12    AuctionsRequest, AuctionsResponse, AuctionsSingleRequest, AuctionsSingleResponse, BarsRequest,
13    BarsResponse, BarsSingleRequest, BarsSingleResponse, ConditionCodesRequest,
14    ConditionCodesResponse, ExchangeCodesResponse, LatestBarRequest, LatestBarResponse,
15    LatestBarsRequest, LatestBarsResponse, LatestQuoteRequest, LatestQuoteResponse,
16    LatestQuotesRequest, LatestQuotesResponse, LatestTradeRequest, LatestTradeResponse,
17    LatestTradesRequest, LatestTradesResponse, QuotesRequest, QuotesResponse, QuotesSingleRequest,
18    QuotesSingleResponse, SnapshotRequest, SnapshotResponse, SnapshotsRequest, SnapshotsResponse,
19    TradesRequest, TradesResponse, TradesSingleRequest, TradesSingleResponse,
20};
21
22#[derive(Clone, Debug)]
23pub struct StocksClient {
24    inner: Arc<Inner>,
25}
26
27impl StocksClient {
28    pub(crate) fn new(inner: Arc<Inner>) -> Self {
29        Self { inner }
30    }
31
32    pub async fn bars(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
33        self.ensure_credentials()?;
34        request.validate()?;
35        self.inner
36            .http
37            .get_json(
38                &self.inner.base_url,
39                Endpoint::StocksBars,
40                &self.inner.auth,
41                request.to_query(),
42            )
43            .await
44    }
45
46    pub async fn auctions(&self, request: AuctionsRequest) -> Result<AuctionsResponse, Error> {
47        self.ensure_credentials()?;
48        request.validate()?;
49        self.inner
50            .http
51            .get_json(
52                &self.inner.base_url,
53                Endpoint::StocksAuctions,
54                &self.inner.auth,
55                request.to_query(),
56            )
57            .await
58    }
59
60    pub async fn auctions_all(&self, request: AuctionsRequest) -> Result<AuctionsResponse, Error> {
61        self.ensure_credentials()?;
62        let client = self.clone();
63
64        collect_all(request, move |request| {
65            let client = client.clone();
66            async move { client.auctions(request).await }
67        })
68        .await
69    }
70
71    pub async fn auctions_single(
72        &self,
73        request: AuctionsSingleRequest,
74    ) -> Result<AuctionsSingleResponse, Error> {
75        self.ensure_credentials()?;
76        request.validate()?;
77        let endpoint = Endpoint::StocksAuctionsSingle {
78            symbol: request.symbol.clone(),
79        };
80        self.inner
81            .http
82            .get_json(
83                &self.inner.base_url,
84                endpoint,
85                &self.inner.auth,
86                request.to_query(),
87            )
88            .await
89    }
90
91    pub async fn auctions_single_all(
92        &self,
93        request: AuctionsSingleRequest,
94    ) -> Result<AuctionsSingleResponse, Error> {
95        self.ensure_credentials()?;
96        let client = self.clone();
97
98        collect_all(request, move |request| {
99            let client = client.clone();
100            async move { client.auctions_single(request).await }
101        })
102        .await
103    }
104
105    pub fn auctions_stream(
106        &self,
107        request: AuctionsRequest,
108    ) -> ResponseStream<Result<AuctionsResponse, Error>> {
109        if let Err(error) = self.ensure_credentials() {
110            return Self::error_stream(error);
111        }
112
113        let client = self.clone();
114        stream_pages(request, move |request| {
115            let client = client.clone();
116            async move { client.auctions(request).await }
117        })
118    }
119
120    pub fn auctions_single_stream(
121        &self,
122        request: AuctionsSingleRequest,
123    ) -> ResponseStream<Result<AuctionsSingleResponse, Error>> {
124        if let Err(error) = self.ensure_credentials() {
125            return Self::error_stream(error);
126        }
127
128        let client = self.clone();
129        stream_pages(request, move |request| {
130            let client = client.clone();
131            async move { client.auctions_single(request).await }
132        })
133    }
134
135    pub async fn bars_all(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
136        self.ensure_credentials()?;
137        let client = self.clone();
138
139        collect_all(request, move |request| {
140            let client = client.clone();
141            async move { client.bars(request).await }
142        })
143        .await
144    }
145
146    pub async fn bars_single(
147        &self,
148        request: BarsSingleRequest,
149    ) -> Result<BarsSingleResponse, Error> {
150        self.ensure_credentials()?;
151        request.validate()?;
152        let endpoint = Endpoint::StocksBarsSingle {
153            symbol: request.symbol.clone(),
154        };
155        self.inner
156            .http
157            .get_json(
158                &self.inner.base_url,
159                endpoint,
160                &self.inner.auth,
161                request.to_query(),
162            )
163            .await
164    }
165
166    pub async fn bars_single_all(
167        &self,
168        request: BarsSingleRequest,
169    ) -> Result<BarsSingleResponse, Error> {
170        self.ensure_credentials()?;
171        let client = self.clone();
172
173        collect_all(request, move |request| {
174            let client = client.clone();
175            async move { client.bars_single(request).await }
176        })
177        .await
178    }
179
180    pub fn bars_stream(&self, request: BarsRequest) -> ResponseStream<Result<BarsResponse, Error>> {
181        if let Err(error) = self.ensure_credentials() {
182            return Self::error_stream(error);
183        }
184
185        let client = self.clone();
186        stream_pages(request, move |request| {
187            let client = client.clone();
188            async move { client.bars(request).await }
189        })
190    }
191
192    pub fn bars_single_stream(
193        &self,
194        request: BarsSingleRequest,
195    ) -> ResponseStream<Result<BarsSingleResponse, Error>> {
196        if let Err(error) = self.ensure_credentials() {
197            return Self::error_stream(error);
198        }
199
200        let client = self.clone();
201        stream_pages(request, move |request| {
202            let client = client.clone();
203            async move { client.bars_single(request).await }
204        })
205    }
206
207    pub async fn quotes(&self, request: QuotesRequest) -> Result<QuotesResponse, Error> {
208        self.ensure_credentials()?;
209        request.validate()?;
210        self.inner
211            .http
212            .get_json(
213                &self.inner.base_url,
214                Endpoint::StocksQuotes,
215                &self.inner.auth,
216                request.to_query(),
217            )
218            .await
219    }
220
221    pub async fn quotes_all(&self, request: QuotesRequest) -> Result<QuotesResponse, Error> {
222        self.ensure_credentials()?;
223        let client = self.clone();
224
225        collect_all(request, move |request| {
226            let client = client.clone();
227            async move { client.quotes(request).await }
228        })
229        .await
230    }
231
232    pub async fn quotes_single(
233        &self,
234        request: QuotesSingleRequest,
235    ) -> Result<QuotesSingleResponse, Error> {
236        self.ensure_credentials()?;
237        request.validate()?;
238        let endpoint = Endpoint::StocksQuotesSingle {
239            symbol: request.symbol.clone(),
240        };
241        self.inner
242            .http
243            .get_json(
244                &self.inner.base_url,
245                endpoint,
246                &self.inner.auth,
247                request.to_query(),
248            )
249            .await
250    }
251
252    pub async fn quotes_single_all(
253        &self,
254        request: QuotesSingleRequest,
255    ) -> Result<QuotesSingleResponse, Error> {
256        self.ensure_credentials()?;
257        let client = self.clone();
258
259        collect_all(request, move |request| {
260            let client = client.clone();
261            async move { client.quotes_single(request).await }
262        })
263        .await
264    }
265
266    pub fn quotes_stream(
267        &self,
268        request: QuotesRequest,
269    ) -> ResponseStream<Result<QuotesResponse, Error>> {
270        if let Err(error) = self.ensure_credentials() {
271            return Self::error_stream(error);
272        }
273
274        let client = self.clone();
275        stream_pages(request, move |request| {
276            let client = client.clone();
277            async move { client.quotes(request).await }
278        })
279    }
280
281    pub fn quotes_single_stream(
282        &self,
283        request: QuotesSingleRequest,
284    ) -> ResponseStream<Result<QuotesSingleResponse, Error>> {
285        if let Err(error) = self.ensure_credentials() {
286            return Self::error_stream(error);
287        }
288
289        let client = self.clone();
290        stream_pages(request, move |request| {
291            let client = client.clone();
292            async move { client.quotes_single(request).await }
293        })
294    }
295
296    pub async fn trades(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
297        self.ensure_credentials()?;
298        request.validate()?;
299        self.inner
300            .http
301            .get_json(
302                &self.inner.base_url,
303                Endpoint::StocksTrades,
304                &self.inner.auth,
305                request.to_query(),
306            )
307            .await
308    }
309
310    pub async fn trades_all(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
311        self.ensure_credentials()?;
312        let client = self.clone();
313
314        collect_all(request, move |request| {
315            let client = client.clone();
316            async move { client.trades(request).await }
317        })
318        .await
319    }
320
321    pub async fn trades_single(
322        &self,
323        request: TradesSingleRequest,
324    ) -> Result<TradesSingleResponse, Error> {
325        self.ensure_credentials()?;
326        request.validate()?;
327        let endpoint = Endpoint::StocksTradesSingle {
328            symbol: request.symbol.clone(),
329        };
330        self.inner
331            .http
332            .get_json(
333                &self.inner.base_url,
334                endpoint,
335                &self.inner.auth,
336                request.to_query(),
337            )
338            .await
339    }
340
341    pub async fn trades_single_all(
342        &self,
343        request: TradesSingleRequest,
344    ) -> Result<TradesSingleResponse, Error> {
345        self.ensure_credentials()?;
346        let client = self.clone();
347
348        collect_all(request, move |request| {
349            let client = client.clone();
350            async move { client.trades_single(request).await }
351        })
352        .await
353    }
354
355    pub fn trades_stream(
356        &self,
357        request: TradesRequest,
358    ) -> ResponseStream<Result<TradesResponse, Error>> {
359        if let Err(error) = self.ensure_credentials() {
360            return Self::error_stream(error);
361        }
362
363        let client = self.clone();
364        stream_pages(request, move |request| {
365            let client = client.clone();
366            async move { client.trades(request).await }
367        })
368    }
369
370    pub fn trades_single_stream(
371        &self,
372        request: TradesSingleRequest,
373    ) -> ResponseStream<Result<TradesSingleResponse, Error>> {
374        if let Err(error) = self.ensure_credentials() {
375            return Self::error_stream(error);
376        }
377
378        let client = self.clone();
379        stream_pages(request, move |request| {
380            let client = client.clone();
381            async move { client.trades_single(request).await }
382        })
383    }
384
385    pub async fn latest_bars(
386        &self,
387        request: LatestBarsRequest,
388    ) -> Result<LatestBarsResponse, Error> {
389        self.ensure_credentials()?;
390        request.validate()?;
391        self.inner
392            .http
393            .get_json(
394                &self.inner.base_url,
395                Endpoint::StocksLatestBars,
396                &self.inner.auth,
397                request.to_query(),
398            )
399            .await
400    }
401
402    pub async fn latest_bar(&self, request: LatestBarRequest) -> Result<LatestBarResponse, Error> {
403        self.ensure_credentials()?;
404        let endpoint = Endpoint::StocksLatestBar {
405            symbol: request.symbol.clone(),
406        };
407        self.inner
408            .http
409            .get_json(
410                &self.inner.base_url,
411                endpoint,
412                &self.inner.auth,
413                request.to_query(),
414            )
415            .await
416    }
417
418    pub async fn latest_quotes(
419        &self,
420        request: LatestQuotesRequest,
421    ) -> Result<LatestQuotesResponse, Error> {
422        self.ensure_credentials()?;
423        request.validate()?;
424        self.inner
425            .http
426            .get_json(
427                &self.inner.base_url,
428                Endpoint::StocksLatestQuotes,
429                &self.inner.auth,
430                request.to_query(),
431            )
432            .await
433    }
434
435    pub async fn latest_quote(
436        &self,
437        request: LatestQuoteRequest,
438    ) -> Result<LatestQuoteResponse, Error> {
439        self.ensure_credentials()?;
440        let endpoint = Endpoint::StocksLatestQuote {
441            symbol: request.symbol.clone(),
442        };
443        self.inner
444            .http
445            .get_json(
446                &self.inner.base_url,
447                endpoint,
448                &self.inner.auth,
449                request.to_query(),
450            )
451            .await
452    }
453
454    pub async fn latest_trades(
455        &self,
456        request: LatestTradesRequest,
457    ) -> Result<LatestTradesResponse, Error> {
458        self.ensure_credentials()?;
459        request.validate()?;
460        self.inner
461            .http
462            .get_json(
463                &self.inner.base_url,
464                Endpoint::StocksLatestTrades,
465                &self.inner.auth,
466                request.to_query(),
467            )
468            .await
469    }
470
471    pub async fn latest_trade(
472        &self,
473        request: LatestTradeRequest,
474    ) -> Result<LatestTradeResponse, Error> {
475        self.ensure_credentials()?;
476        let endpoint = Endpoint::StocksLatestTrade {
477            symbol: request.symbol.clone(),
478        };
479        self.inner
480            .http
481            .get_json(
482                &self.inner.base_url,
483                endpoint,
484                &self.inner.auth,
485                request.to_query(),
486            )
487            .await
488    }
489
490    pub async fn snapshots(&self, request: SnapshotsRequest) -> Result<SnapshotsResponse, Error> {
491        self.ensure_credentials()?;
492        request.validate()?;
493        self.inner
494            .http
495            .get_json(
496                &self.inner.base_url,
497                Endpoint::StocksSnapshots,
498                &self.inner.auth,
499                request.to_query(),
500            )
501            .await
502    }
503
504    pub async fn snapshot(&self, request: SnapshotRequest) -> Result<SnapshotResponse, Error> {
505        self.ensure_credentials()?;
506        let endpoint = Endpoint::StocksSnapshot {
507            symbol: request.symbol.clone(),
508        };
509        self.inner
510            .http
511            .get_json(
512                &self.inner.base_url,
513                endpoint,
514                &self.inner.auth,
515                request.to_query(),
516            )
517            .await
518    }
519
520    pub async fn condition_codes(
521        &self,
522        request: ConditionCodesRequest,
523    ) -> Result<ConditionCodesResponse, Error> {
524        self.ensure_credentials()?;
525        let ticktype = request.ticktype.as_str();
526        let query = request.to_query();
527        let endpoint = Endpoint::StocksConditionCodes { ticktype };
528        self.inner
529            .http
530            .get_json(&self.inner.base_url, endpoint, &self.inner.auth, query)
531            .await
532    }
533
534    pub async fn exchange_codes(&self) -> Result<ExchangeCodesResponse, Error> {
535        self.ensure_credentials()?;
536        self.inner
537            .http
538            .get_json(
539                &self.inner.base_url,
540                Endpoint::StocksExchangeCodes,
541                &self.inner.auth,
542                Vec::new(),
543            )
544            .await
545    }
546
547    fn ensure_credentials(&self) -> Result<(), Error> {
548        if self.inner.auth.has_credentials() {
549            Ok(())
550        } else {
551            Err(Error::MissingCredentials)
552        }
553    }
554
555    fn error_stream<Response>(error: Error) -> ResponseStream<Result<Response, Error>>
556    where
557        Response: Send + 'static,
558    {
559        Box::pin(futures_util::stream::once(async move { Err(error) }))
560    }
561}