1use std::sync::Arc;
2
3use crate::{
4 Error,
5 client::Inner,
6 common::response::ResponseStream,
7 transport::endpoint::Endpoint,
8 transport::pagination::{collect_all, stream_pages},
9};
10
11use super::{
12 AuctionsRequest, AuctionsResponse, AuctionsSingleRequest, AuctionsSingleResponse, BarsRequest,
13 BarsResponse, BarsSingleRequest, BarsSingleResponse, ConditionCodesRequest,
14 ConditionCodesResponse, ExchangeCodesResponse, LatestBarRequest, LatestBarResponse,
15 LatestBarsRequest, LatestBarsResponse, LatestQuoteRequest, LatestQuoteResponse,
16 LatestQuotesRequest, LatestQuotesResponse, LatestTradeRequest, LatestTradeResponse,
17 LatestTradesRequest, LatestTradesResponse, QuotesRequest, QuotesResponse, QuotesSingleRequest,
18 QuotesSingleResponse, SnapshotRequest, SnapshotResponse, SnapshotsRequest, SnapshotsResponse,
19 TradesRequest, TradesResponse, TradesSingleRequest, TradesSingleResponse,
20};
21
22#[derive(Clone, Debug)]
23pub struct StocksClient {
24 inner: Arc<Inner>,
25}
26
27impl StocksClient {
28 pub(crate) fn new(inner: Arc<Inner>) -> Self {
29 Self { inner }
30 }
31
32 pub async fn bars(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
33 self.ensure_credentials()?;
34 request.validate()?;
35 self.inner
36 .http
37 .get_json(
38 &self.inner.base_url,
39 Endpoint::StocksBars,
40 &self.inner.auth,
41 request.to_query(),
42 )
43 .await
44 }
45
46 pub async fn auctions(&self, request: AuctionsRequest) -> Result<AuctionsResponse, Error> {
47 self.ensure_credentials()?;
48 request.validate()?;
49 self.inner
50 .http
51 .get_json(
52 &self.inner.base_url,
53 Endpoint::StocksAuctions,
54 &self.inner.auth,
55 request.to_query(),
56 )
57 .await
58 }
59
60 pub async fn auctions_all(&self, request: AuctionsRequest) -> Result<AuctionsResponse, Error> {
61 self.ensure_credentials()?;
62 let client = self.clone();
63
64 collect_all(request, move |request| {
65 let client = client.clone();
66 async move { client.auctions(request).await }
67 })
68 .await
69 }
70
71 pub async fn auctions_single(
72 &self,
73 request: AuctionsSingleRequest,
74 ) -> Result<AuctionsSingleResponse, Error> {
75 self.ensure_credentials()?;
76 request.validate()?;
77 let endpoint = Endpoint::StocksAuctionsSingle {
78 symbol: request.symbol.clone(),
79 };
80 self.inner
81 .http
82 .get_json(
83 &self.inner.base_url,
84 endpoint,
85 &self.inner.auth,
86 request.to_query(),
87 )
88 .await
89 }
90
91 pub async fn auctions_single_all(
92 &self,
93 request: AuctionsSingleRequest,
94 ) -> Result<AuctionsSingleResponse, Error> {
95 self.ensure_credentials()?;
96 let client = self.clone();
97
98 collect_all(request, move |request| {
99 let client = client.clone();
100 async move { client.auctions_single(request).await }
101 })
102 .await
103 }
104
105 pub fn auctions_stream(
106 &self,
107 request: AuctionsRequest,
108 ) -> ResponseStream<Result<AuctionsResponse, Error>> {
109 if let Err(error) = self.ensure_credentials() {
110 return Self::error_stream(error);
111 }
112
113 let client = self.clone();
114 stream_pages(request, move |request| {
115 let client = client.clone();
116 async move { client.auctions(request).await }
117 })
118 }
119
120 pub fn auctions_single_stream(
121 &self,
122 request: AuctionsSingleRequest,
123 ) -> ResponseStream<Result<AuctionsSingleResponse, Error>> {
124 if let Err(error) = self.ensure_credentials() {
125 return Self::error_stream(error);
126 }
127
128 let client = self.clone();
129 stream_pages(request, move |request| {
130 let client = client.clone();
131 async move { client.auctions_single(request).await }
132 })
133 }
134
135 pub async fn bars_all(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
136 self.ensure_credentials()?;
137 let client = self.clone();
138
139 collect_all(request, move |request| {
140 let client = client.clone();
141 async move { client.bars(request).await }
142 })
143 .await
144 }
145
146 pub async fn bars_single(
147 &self,
148 request: BarsSingleRequest,
149 ) -> Result<BarsSingleResponse, Error> {
150 self.ensure_credentials()?;
151 request.validate()?;
152 let endpoint = Endpoint::StocksBarsSingle {
153 symbol: request.symbol.clone(),
154 };
155 self.inner
156 .http
157 .get_json(
158 &self.inner.base_url,
159 endpoint,
160 &self.inner.auth,
161 request.to_query(),
162 )
163 .await
164 }
165
166 pub async fn bars_single_all(
167 &self,
168 request: BarsSingleRequest,
169 ) -> Result<BarsSingleResponse, Error> {
170 self.ensure_credentials()?;
171 let client = self.clone();
172
173 collect_all(request, move |request| {
174 let client = client.clone();
175 async move { client.bars_single(request).await }
176 })
177 .await
178 }
179
180 pub fn bars_stream(&self, request: BarsRequest) -> ResponseStream<Result<BarsResponse, Error>> {
181 if let Err(error) = self.ensure_credentials() {
182 return Self::error_stream(error);
183 }
184
185 let client = self.clone();
186 stream_pages(request, move |request| {
187 let client = client.clone();
188 async move { client.bars(request).await }
189 })
190 }
191
192 pub fn bars_single_stream(
193 &self,
194 request: BarsSingleRequest,
195 ) -> ResponseStream<Result<BarsSingleResponse, Error>> {
196 if let Err(error) = self.ensure_credentials() {
197 return Self::error_stream(error);
198 }
199
200 let client = self.clone();
201 stream_pages(request, move |request| {
202 let client = client.clone();
203 async move { client.bars_single(request).await }
204 })
205 }
206
207 pub async fn quotes(&self, request: QuotesRequest) -> Result<QuotesResponse, Error> {
208 self.ensure_credentials()?;
209 request.validate()?;
210 self.inner
211 .http
212 .get_json(
213 &self.inner.base_url,
214 Endpoint::StocksQuotes,
215 &self.inner.auth,
216 request.to_query(),
217 )
218 .await
219 }
220
221 pub async fn quotes_all(&self, request: QuotesRequest) -> Result<QuotesResponse, Error> {
222 self.ensure_credentials()?;
223 let client = self.clone();
224
225 collect_all(request, move |request| {
226 let client = client.clone();
227 async move { client.quotes(request).await }
228 })
229 .await
230 }
231
232 pub async fn quotes_single(
233 &self,
234 request: QuotesSingleRequest,
235 ) -> Result<QuotesSingleResponse, Error> {
236 self.ensure_credentials()?;
237 request.validate()?;
238 let endpoint = Endpoint::StocksQuotesSingle {
239 symbol: request.symbol.clone(),
240 };
241 self.inner
242 .http
243 .get_json(
244 &self.inner.base_url,
245 endpoint,
246 &self.inner.auth,
247 request.to_query(),
248 )
249 .await
250 }
251
252 pub async fn quotes_single_all(
253 &self,
254 request: QuotesSingleRequest,
255 ) -> Result<QuotesSingleResponse, Error> {
256 self.ensure_credentials()?;
257 let client = self.clone();
258
259 collect_all(request, move |request| {
260 let client = client.clone();
261 async move { client.quotes_single(request).await }
262 })
263 .await
264 }
265
266 pub fn quotes_stream(
267 &self,
268 request: QuotesRequest,
269 ) -> ResponseStream<Result<QuotesResponse, Error>> {
270 if let Err(error) = self.ensure_credentials() {
271 return Self::error_stream(error);
272 }
273
274 let client = self.clone();
275 stream_pages(request, move |request| {
276 let client = client.clone();
277 async move { client.quotes(request).await }
278 })
279 }
280
281 pub fn quotes_single_stream(
282 &self,
283 request: QuotesSingleRequest,
284 ) -> ResponseStream<Result<QuotesSingleResponse, Error>> {
285 if let Err(error) = self.ensure_credentials() {
286 return Self::error_stream(error);
287 }
288
289 let client = self.clone();
290 stream_pages(request, move |request| {
291 let client = client.clone();
292 async move { client.quotes_single(request).await }
293 })
294 }
295
296 pub async fn trades(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
297 self.ensure_credentials()?;
298 request.validate()?;
299 self.inner
300 .http
301 .get_json(
302 &self.inner.base_url,
303 Endpoint::StocksTrades,
304 &self.inner.auth,
305 request.to_query(),
306 )
307 .await
308 }
309
310 pub async fn trades_all(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
311 self.ensure_credentials()?;
312 let client = self.clone();
313
314 collect_all(request, move |request| {
315 let client = client.clone();
316 async move { client.trades(request).await }
317 })
318 .await
319 }
320
321 pub async fn trades_single(
322 &self,
323 request: TradesSingleRequest,
324 ) -> Result<TradesSingleResponse, Error> {
325 self.ensure_credentials()?;
326 request.validate()?;
327 let endpoint = Endpoint::StocksTradesSingle {
328 symbol: request.symbol.clone(),
329 };
330 self.inner
331 .http
332 .get_json(
333 &self.inner.base_url,
334 endpoint,
335 &self.inner.auth,
336 request.to_query(),
337 )
338 .await
339 }
340
341 pub async fn trades_single_all(
342 &self,
343 request: TradesSingleRequest,
344 ) -> Result<TradesSingleResponse, Error> {
345 self.ensure_credentials()?;
346 let client = self.clone();
347
348 collect_all(request, move |request| {
349 let client = client.clone();
350 async move { client.trades_single(request).await }
351 })
352 .await
353 }
354
355 pub fn trades_stream(
356 &self,
357 request: TradesRequest,
358 ) -> ResponseStream<Result<TradesResponse, Error>> {
359 if let Err(error) = self.ensure_credentials() {
360 return Self::error_stream(error);
361 }
362
363 let client = self.clone();
364 stream_pages(request, move |request| {
365 let client = client.clone();
366 async move { client.trades(request).await }
367 })
368 }
369
370 pub fn trades_single_stream(
371 &self,
372 request: TradesSingleRequest,
373 ) -> ResponseStream<Result<TradesSingleResponse, Error>> {
374 if let Err(error) = self.ensure_credentials() {
375 return Self::error_stream(error);
376 }
377
378 let client = self.clone();
379 stream_pages(request, move |request| {
380 let client = client.clone();
381 async move { client.trades_single(request).await }
382 })
383 }
384
385 pub async fn latest_bars(
386 &self,
387 request: LatestBarsRequest,
388 ) -> Result<LatestBarsResponse, Error> {
389 self.ensure_credentials()?;
390 request.validate()?;
391 self.inner
392 .http
393 .get_json(
394 &self.inner.base_url,
395 Endpoint::StocksLatestBars,
396 &self.inner.auth,
397 request.to_query(),
398 )
399 .await
400 }
401
402 pub async fn latest_bar(&self, request: LatestBarRequest) -> Result<LatestBarResponse, Error> {
403 self.ensure_credentials()?;
404 let endpoint = Endpoint::StocksLatestBar {
405 symbol: request.symbol.clone(),
406 };
407 self.inner
408 .http
409 .get_json(
410 &self.inner.base_url,
411 endpoint,
412 &self.inner.auth,
413 request.to_query(),
414 )
415 .await
416 }
417
418 pub async fn latest_quotes(
419 &self,
420 request: LatestQuotesRequest,
421 ) -> Result<LatestQuotesResponse, Error> {
422 self.ensure_credentials()?;
423 request.validate()?;
424 self.inner
425 .http
426 .get_json(
427 &self.inner.base_url,
428 Endpoint::StocksLatestQuotes,
429 &self.inner.auth,
430 request.to_query(),
431 )
432 .await
433 }
434
435 pub async fn latest_quote(
436 &self,
437 request: LatestQuoteRequest,
438 ) -> Result<LatestQuoteResponse, Error> {
439 self.ensure_credentials()?;
440 let endpoint = Endpoint::StocksLatestQuote {
441 symbol: request.symbol.clone(),
442 };
443 self.inner
444 .http
445 .get_json(
446 &self.inner.base_url,
447 endpoint,
448 &self.inner.auth,
449 request.to_query(),
450 )
451 .await
452 }
453
454 pub async fn latest_trades(
455 &self,
456 request: LatestTradesRequest,
457 ) -> Result<LatestTradesResponse, Error> {
458 self.ensure_credentials()?;
459 request.validate()?;
460 self.inner
461 .http
462 .get_json(
463 &self.inner.base_url,
464 Endpoint::StocksLatestTrades,
465 &self.inner.auth,
466 request.to_query(),
467 )
468 .await
469 }
470
471 pub async fn latest_trade(
472 &self,
473 request: LatestTradeRequest,
474 ) -> Result<LatestTradeResponse, Error> {
475 self.ensure_credentials()?;
476 let endpoint = Endpoint::StocksLatestTrade {
477 symbol: request.symbol.clone(),
478 };
479 self.inner
480 .http
481 .get_json(
482 &self.inner.base_url,
483 endpoint,
484 &self.inner.auth,
485 request.to_query(),
486 )
487 .await
488 }
489
490 pub async fn snapshots(&self, request: SnapshotsRequest) -> Result<SnapshotsResponse, Error> {
491 self.ensure_credentials()?;
492 request.validate()?;
493 self.inner
494 .http
495 .get_json(
496 &self.inner.base_url,
497 Endpoint::StocksSnapshots,
498 &self.inner.auth,
499 request.to_query(),
500 )
501 .await
502 }
503
504 pub async fn snapshot(&self, request: SnapshotRequest) -> Result<SnapshotResponse, Error> {
505 self.ensure_credentials()?;
506 let endpoint = Endpoint::StocksSnapshot {
507 symbol: request.symbol.clone(),
508 };
509 self.inner
510 .http
511 .get_json(
512 &self.inner.base_url,
513 endpoint,
514 &self.inner.auth,
515 request.to_query(),
516 )
517 .await
518 }
519
520 pub async fn condition_codes(
521 &self,
522 request: ConditionCodesRequest,
523 ) -> Result<ConditionCodesResponse, Error> {
524 self.ensure_credentials()?;
525 let ticktype = request.ticktype.as_str();
526 let query = request.to_query();
527 let endpoint = Endpoint::StocksConditionCodes { ticktype };
528 self.inner
529 .http
530 .get_json(&self.inner.base_url, endpoint, &self.inner.auth, query)
531 .await
532 }
533
534 pub async fn exchange_codes(&self) -> Result<ExchangeCodesResponse, Error> {
535 self.ensure_credentials()?;
536 self.inner
537 .http
538 .get_json(
539 &self.inner.base_url,
540 Endpoint::StocksExchangeCodes,
541 &self.inner.auth,
542 Vec::new(),
543 )
544 .await
545 }
546
547 fn ensure_credentials(&self) -> Result<(), Error> {
548 if self.inner.auth.has_credentials() {
549 Ok(())
550 } else {
551 Err(Error::MissingCredentials)
552 }
553 }
554
555 fn error_stream<Response>(error: Error) -> ResponseStream<Result<Response, Error>>
556 where
557 Response: Send + 'static,
558 {
559 Box::pin(futures_util::stream::once(async move { Err(error) }))
560 }
561}