1use std::sync::Arc;
2
3use crate::{
4 Error,
5 client::Inner,
6 common::response::ResponseStream,
7 transport::endpoint::Endpoint,
8 transport::pagination::{collect_all, stream_pages},
9};
10
11use super::{
12 AuctionsRequest, AuctionsResponse, AuctionsSingleRequest, AuctionsSingleResponse, BarsRequest,
13 BarsResponse, BarsSingleRequest, BarsSingleResponse, ConditionCodesRequest,
14 ConditionCodesResponse, ExchangeCodesResponse, LatestBarRequest, LatestBarResponse,
15 LatestBarsRequest, LatestBarsResponse, LatestQuoteRequest, LatestQuoteResponse,
16 LatestQuotesRequest, LatestQuotesResponse, LatestTradeRequest, LatestTradeResponse,
17 LatestTradesRequest, LatestTradesResponse, QuotesRequest, QuotesResponse, QuotesSingleRequest,
18 QuotesSingleResponse, SnapshotRequest, SnapshotResponse, SnapshotsRequest, SnapshotsResponse,
19 TradesRequest, TradesResponse, TradesSingleRequest, TradesSingleResponse,
20};
21
22#[derive(Clone, Debug)]
23pub struct StocksClient {
24 inner: Arc<Inner>,
25}
26
27impl StocksClient {
28 pub(crate) fn new(inner: Arc<Inner>) -> Self {
29 Self { inner }
30 }
31
32 pub async fn bars(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
33 self.ensure_credentials()?;
34 self.inner
35 .http
36 .get_json(
37 &self.inner.base_url,
38 Endpoint::StocksBars,
39 &self.inner.auth,
40 request.to_query(),
41 )
42 .await
43 }
44
45 pub async fn auctions(&self, request: AuctionsRequest) -> Result<AuctionsResponse, Error> {
46 self.ensure_credentials()?;
47 self.inner
48 .http
49 .get_json(
50 &self.inner.base_url,
51 Endpoint::StocksAuctions,
52 &self.inner.auth,
53 request.to_query(),
54 )
55 .await
56 }
57
58 pub async fn auctions_all(&self, request: AuctionsRequest) -> Result<AuctionsResponse, Error> {
59 self.ensure_credentials()?;
60 let client = self.clone();
61
62 collect_all(request, move |request| {
63 let client = client.clone();
64 async move { client.auctions(request).await }
65 })
66 .await
67 }
68
69 pub async fn auctions_single(
70 &self,
71 request: AuctionsSingleRequest,
72 ) -> Result<AuctionsSingleResponse, Error> {
73 self.ensure_credentials()?;
74 let endpoint = Endpoint::StocksAuctionsSingle {
75 symbol: request.symbol.clone(),
76 };
77 self.inner
78 .http
79 .get_json(
80 &self.inner.base_url,
81 endpoint,
82 &self.inner.auth,
83 request.to_query(),
84 )
85 .await
86 }
87
88 pub async fn auctions_single_all(
89 &self,
90 request: AuctionsSingleRequest,
91 ) -> Result<AuctionsSingleResponse, Error> {
92 self.ensure_credentials()?;
93 let client = self.clone();
94
95 collect_all(request, move |request| {
96 let client = client.clone();
97 async move { client.auctions_single(request).await }
98 })
99 .await
100 }
101
102 pub fn auctions_stream(
103 &self,
104 request: AuctionsRequest,
105 ) -> ResponseStream<Result<AuctionsResponse, Error>> {
106 if let Err(error) = self.ensure_credentials() {
107 return Self::error_stream(error);
108 }
109
110 let client = self.clone();
111 stream_pages(request, move |request| {
112 let client = client.clone();
113 async move { client.auctions(request).await }
114 })
115 }
116
117 pub fn auctions_single_stream(
118 &self,
119 request: AuctionsSingleRequest,
120 ) -> ResponseStream<Result<AuctionsSingleResponse, Error>> {
121 if let Err(error) = self.ensure_credentials() {
122 return Self::error_stream(error);
123 }
124
125 let client = self.clone();
126 stream_pages(request, move |request| {
127 let client = client.clone();
128 async move { client.auctions_single(request).await }
129 })
130 }
131
132 pub async fn bars_all(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
133 self.ensure_credentials()?;
134 let client = self.clone();
135
136 collect_all(request, move |request| {
137 let client = client.clone();
138 async move { client.bars(request).await }
139 })
140 .await
141 }
142
143 pub async fn bars_single(
144 &self,
145 request: BarsSingleRequest,
146 ) -> Result<BarsSingleResponse, Error> {
147 self.ensure_credentials()?;
148 let endpoint = Endpoint::StocksBarsSingle {
149 symbol: request.symbol.clone(),
150 };
151 self.inner
152 .http
153 .get_json(
154 &self.inner.base_url,
155 endpoint,
156 &self.inner.auth,
157 request.to_query(),
158 )
159 .await
160 }
161
162 pub async fn bars_single_all(
163 &self,
164 request: BarsSingleRequest,
165 ) -> Result<BarsSingleResponse, Error> {
166 self.ensure_credentials()?;
167 let client = self.clone();
168
169 collect_all(request, move |request| {
170 let client = client.clone();
171 async move { client.bars_single(request).await }
172 })
173 .await
174 }
175
176 pub fn bars_stream(&self, request: BarsRequest) -> ResponseStream<Result<BarsResponse, Error>> {
177 if let Err(error) = self.ensure_credentials() {
178 return Self::error_stream(error);
179 }
180
181 let client = self.clone();
182 stream_pages(request, move |request| {
183 let client = client.clone();
184 async move { client.bars(request).await }
185 })
186 }
187
188 pub fn bars_single_stream(
189 &self,
190 request: BarsSingleRequest,
191 ) -> ResponseStream<Result<BarsSingleResponse, Error>> {
192 if let Err(error) = self.ensure_credentials() {
193 return Self::error_stream(error);
194 }
195
196 let client = self.clone();
197 stream_pages(request, move |request| {
198 let client = client.clone();
199 async move { client.bars_single(request).await }
200 })
201 }
202
203 pub async fn quotes(&self, request: QuotesRequest) -> Result<QuotesResponse, Error> {
204 self.ensure_credentials()?;
205 self.inner
206 .http
207 .get_json(
208 &self.inner.base_url,
209 Endpoint::StocksQuotes,
210 &self.inner.auth,
211 request.to_query(),
212 )
213 .await
214 }
215
216 pub async fn quotes_all(&self, request: QuotesRequest) -> Result<QuotesResponse, Error> {
217 self.ensure_credentials()?;
218 let client = self.clone();
219
220 collect_all(request, move |request| {
221 let client = client.clone();
222 async move { client.quotes(request).await }
223 })
224 .await
225 }
226
227 pub async fn quotes_single(
228 &self,
229 request: QuotesSingleRequest,
230 ) -> Result<QuotesSingleResponse, Error> {
231 self.ensure_credentials()?;
232 let endpoint = Endpoint::StocksQuotesSingle {
233 symbol: request.symbol.clone(),
234 };
235 self.inner
236 .http
237 .get_json(
238 &self.inner.base_url,
239 endpoint,
240 &self.inner.auth,
241 request.to_query(),
242 )
243 .await
244 }
245
246 pub async fn quotes_single_all(
247 &self,
248 request: QuotesSingleRequest,
249 ) -> Result<QuotesSingleResponse, Error> {
250 self.ensure_credentials()?;
251 let client = self.clone();
252
253 collect_all(request, move |request| {
254 let client = client.clone();
255 async move { client.quotes_single(request).await }
256 })
257 .await
258 }
259
260 pub fn quotes_stream(
261 &self,
262 request: QuotesRequest,
263 ) -> ResponseStream<Result<QuotesResponse, Error>> {
264 if let Err(error) = self.ensure_credentials() {
265 return Self::error_stream(error);
266 }
267
268 let client = self.clone();
269 stream_pages(request, move |request| {
270 let client = client.clone();
271 async move { client.quotes(request).await }
272 })
273 }
274
275 pub fn quotes_single_stream(
276 &self,
277 request: QuotesSingleRequest,
278 ) -> ResponseStream<Result<QuotesSingleResponse, Error>> {
279 if let Err(error) = self.ensure_credentials() {
280 return Self::error_stream(error);
281 }
282
283 let client = self.clone();
284 stream_pages(request, move |request| {
285 let client = client.clone();
286 async move { client.quotes_single(request).await }
287 })
288 }
289
290 pub async fn trades(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
291 self.ensure_credentials()?;
292 self.inner
293 .http
294 .get_json(
295 &self.inner.base_url,
296 Endpoint::StocksTrades,
297 &self.inner.auth,
298 request.to_query(),
299 )
300 .await
301 }
302
303 pub async fn trades_all(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
304 self.ensure_credentials()?;
305 let client = self.clone();
306
307 collect_all(request, move |request| {
308 let client = client.clone();
309 async move { client.trades(request).await }
310 })
311 .await
312 }
313
314 pub async fn trades_single(
315 &self,
316 request: TradesSingleRequest,
317 ) -> Result<TradesSingleResponse, Error> {
318 self.ensure_credentials()?;
319 let endpoint = Endpoint::StocksTradesSingle {
320 symbol: request.symbol.clone(),
321 };
322 self.inner
323 .http
324 .get_json(
325 &self.inner.base_url,
326 endpoint,
327 &self.inner.auth,
328 request.to_query(),
329 )
330 .await
331 }
332
333 pub async fn trades_single_all(
334 &self,
335 request: TradesSingleRequest,
336 ) -> Result<TradesSingleResponse, Error> {
337 self.ensure_credentials()?;
338 let client = self.clone();
339
340 collect_all(request, move |request| {
341 let client = client.clone();
342 async move { client.trades_single(request).await }
343 })
344 .await
345 }
346
347 pub fn trades_stream(
348 &self,
349 request: TradesRequest,
350 ) -> ResponseStream<Result<TradesResponse, Error>> {
351 if let Err(error) = self.ensure_credentials() {
352 return Self::error_stream(error);
353 }
354
355 let client = self.clone();
356 stream_pages(request, move |request| {
357 let client = client.clone();
358 async move { client.trades(request).await }
359 })
360 }
361
362 pub fn trades_single_stream(
363 &self,
364 request: TradesSingleRequest,
365 ) -> ResponseStream<Result<TradesSingleResponse, Error>> {
366 if let Err(error) = self.ensure_credentials() {
367 return Self::error_stream(error);
368 }
369
370 let client = self.clone();
371 stream_pages(request, move |request| {
372 let client = client.clone();
373 async move { client.trades_single(request).await }
374 })
375 }
376
377 pub async fn latest_bars(
378 &self,
379 request: LatestBarsRequest,
380 ) -> Result<LatestBarsResponse, Error> {
381 self.ensure_credentials()?;
382 self.inner
383 .http
384 .get_json(
385 &self.inner.base_url,
386 Endpoint::StocksLatestBars,
387 &self.inner.auth,
388 request.to_query(),
389 )
390 .await
391 }
392
393 pub async fn latest_bar(&self, request: LatestBarRequest) -> Result<LatestBarResponse, Error> {
394 self.ensure_credentials()?;
395 let endpoint = Endpoint::StocksLatestBar {
396 symbol: request.symbol.clone(),
397 };
398 self.inner
399 .http
400 .get_json(
401 &self.inner.base_url,
402 endpoint,
403 &self.inner.auth,
404 request.to_query(),
405 )
406 .await
407 }
408
409 pub async fn latest_quotes(
410 &self,
411 request: LatestQuotesRequest,
412 ) -> Result<LatestQuotesResponse, Error> {
413 self.ensure_credentials()?;
414 self.inner
415 .http
416 .get_json(
417 &self.inner.base_url,
418 Endpoint::StocksLatestQuotes,
419 &self.inner.auth,
420 request.to_query(),
421 )
422 .await
423 }
424
425 pub async fn latest_quote(
426 &self,
427 request: LatestQuoteRequest,
428 ) -> Result<LatestQuoteResponse, Error> {
429 self.ensure_credentials()?;
430 let endpoint = Endpoint::StocksLatestQuote {
431 symbol: request.symbol.clone(),
432 };
433 self.inner
434 .http
435 .get_json(
436 &self.inner.base_url,
437 endpoint,
438 &self.inner.auth,
439 request.to_query(),
440 )
441 .await
442 }
443
444 pub async fn latest_trades(
445 &self,
446 request: LatestTradesRequest,
447 ) -> Result<LatestTradesResponse, Error> {
448 self.ensure_credentials()?;
449 self.inner
450 .http
451 .get_json(
452 &self.inner.base_url,
453 Endpoint::StocksLatestTrades,
454 &self.inner.auth,
455 request.to_query(),
456 )
457 .await
458 }
459
460 pub async fn latest_trade(
461 &self,
462 request: LatestTradeRequest,
463 ) -> Result<LatestTradeResponse, Error> {
464 self.ensure_credentials()?;
465 let endpoint = Endpoint::StocksLatestTrade {
466 symbol: request.symbol.clone(),
467 };
468 self.inner
469 .http
470 .get_json(
471 &self.inner.base_url,
472 endpoint,
473 &self.inner.auth,
474 request.to_query(),
475 )
476 .await
477 }
478
479 pub async fn snapshots(&self, request: SnapshotsRequest) -> Result<SnapshotsResponse, Error> {
480 self.ensure_credentials()?;
481 self.inner
482 .http
483 .get_json(
484 &self.inner.base_url,
485 Endpoint::StocksSnapshots,
486 &self.inner.auth,
487 request.to_query(),
488 )
489 .await
490 }
491
492 pub async fn snapshot(&self, request: SnapshotRequest) -> Result<SnapshotResponse, Error> {
493 self.ensure_credentials()?;
494 let endpoint = Endpoint::StocksSnapshot {
495 symbol: request.symbol.clone(),
496 };
497 self.inner
498 .http
499 .get_json(
500 &self.inner.base_url,
501 endpoint,
502 &self.inner.auth,
503 request.to_query(),
504 )
505 .await
506 }
507
508 pub async fn condition_codes(
509 &self,
510 request: ConditionCodesRequest,
511 ) -> Result<ConditionCodesResponse, Error> {
512 self.ensure_credentials()?;
513 let ticktype = request.ticktype.as_str();
514 let query = request.to_query();
515 let endpoint = Endpoint::StocksConditionCodes { ticktype };
516 self.inner
517 .http
518 .get_json(&self.inner.base_url, endpoint, &self.inner.auth, query)
519 .await
520 }
521
522 pub async fn exchange_codes(&self) -> Result<ExchangeCodesResponse, Error> {
523 self.ensure_credentials()?;
524 self.inner
525 .http
526 .get_json(
527 &self.inner.base_url,
528 Endpoint::StocksExchangeCodes,
529 &self.inner.auth,
530 Vec::new(),
531 )
532 .await
533 }
534
535 fn ensure_credentials(&self) -> Result<(), Error> {
536 if self.inner.auth.has_credentials() {
537 Ok(())
538 } else {
539 Err(Error::MissingCredentials)
540 }
541 }
542
543 fn error_stream<Response>(error: Error) -> ResponseStream<Result<Response, Error>>
544 where
545 Response: Send + 'static,
546 {
547 Box::pin(futures_util::stream::once(async move { Err(error) }))
548 }
549}