1use std::sync::Arc;
2
3use crate::{
4 Error,
5 client::Inner,
6 common::response::ResponseStream,
7 transport::endpoint::Endpoint,
8 transport::pagination::{collect_all, stream_pages},
9};
10
11use super::{
12 BarsRequest, BarsResponse, ChainRequest, ChainResponse, ConditionCodesRequest,
13 ConditionCodesResponse, ExchangeCodesResponse, LatestQuotesRequest, LatestQuotesResponse,
14 LatestTradesRequest, LatestTradesResponse, SnapshotsRequest, SnapshotsResponse, TradesRequest,
15 TradesResponse,
16};
17
18#[derive(Clone, Debug)]
19pub struct OptionsClient {
20 inner: Arc<Inner>,
21}
22
23impl OptionsClient {
24 pub(crate) fn new(inner: Arc<Inner>) -> Self {
25 Self { inner }
26 }
27
28 pub async fn bars(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
29 self.ensure_credentials()?;
30 self.inner
31 .http
32 .get_json(
33 &self.inner.base_url,
34 Endpoint::OptionsBars,
35 &self.inner.auth,
36 request.to_query(),
37 )
38 .await
39 }
40
41 pub async fn bars_all(&self, request: BarsRequest) -> Result<BarsResponse, Error> {
42 self.ensure_credentials()?;
43 let client = self.clone();
44
45 collect_all(request, move |request| {
46 let client = client.clone();
47 async move { client.bars(request).await }
48 })
49 .await
50 }
51
52 pub fn bars_stream(&self, request: BarsRequest) -> ResponseStream<Result<BarsResponse, Error>> {
53 if let Err(error) = self.ensure_credentials() {
54 return Self::error_stream(error);
55 }
56
57 let client = self.clone();
58 stream_pages(request, move |request| {
59 let client = client.clone();
60 async move { client.bars(request).await }
61 })
62 }
63
64 pub async fn trades(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
65 self.ensure_credentials()?;
66 self.inner
67 .http
68 .get_json(
69 &self.inner.base_url,
70 Endpoint::OptionsTrades,
71 &self.inner.auth,
72 request.to_query(),
73 )
74 .await
75 }
76
77 pub async fn trades_all(&self, request: TradesRequest) -> Result<TradesResponse, Error> {
78 self.ensure_credentials()?;
79 let client = self.clone();
80
81 collect_all(request, move |request| {
82 let client = client.clone();
83 async move { client.trades(request).await }
84 })
85 .await
86 }
87
88 pub fn trades_stream(
89 &self,
90 request: TradesRequest,
91 ) -> ResponseStream<Result<TradesResponse, Error>> {
92 if let Err(error) = self.ensure_credentials() {
93 return Self::error_stream(error);
94 }
95
96 let client = self.clone();
97 stream_pages(request, move |request| {
98 let client = client.clone();
99 async move { client.trades(request).await }
100 })
101 }
102
103 pub async fn latest_quotes(
104 &self,
105 request: LatestQuotesRequest,
106 ) -> Result<LatestQuotesResponse, Error> {
107 self.ensure_credentials()?;
108 self.inner
109 .http
110 .get_json(
111 &self.inner.base_url,
112 Endpoint::OptionsLatestQuotes,
113 &self.inner.auth,
114 request.to_query(),
115 )
116 .await
117 }
118
119 pub async fn latest_trades(
120 &self,
121 request: LatestTradesRequest,
122 ) -> Result<LatestTradesResponse, Error> {
123 self.ensure_credentials()?;
124 self.inner
125 .http
126 .get_json(
127 &self.inner.base_url,
128 Endpoint::OptionsLatestTrades,
129 &self.inner.auth,
130 request.to_query(),
131 )
132 .await
133 }
134
135 pub async fn snapshots(&self, request: SnapshotsRequest) -> Result<SnapshotsResponse, Error> {
136 self.ensure_credentials()?;
137 self.inner
138 .http
139 .get_json(
140 &self.inner.base_url,
141 Endpoint::OptionsSnapshots,
142 &self.inner.auth,
143 request.to_query(),
144 )
145 .await
146 }
147
148 pub async fn snapshots_all(
149 &self,
150 request: SnapshotsRequest,
151 ) -> Result<SnapshotsResponse, Error> {
152 self.ensure_credentials()?;
153 let client = self.clone();
154
155 collect_all(request, move |request| {
156 let client = client.clone();
157 async move { client.snapshots(request).await }
158 })
159 .await
160 }
161
162 pub fn snapshots_stream(
163 &self,
164 request: SnapshotsRequest,
165 ) -> ResponseStream<Result<SnapshotsResponse, Error>> {
166 if let Err(error) = self.ensure_credentials() {
167 return Self::error_stream(error);
168 }
169
170 let client = self.clone();
171 stream_pages(request, move |request| {
172 let client = client.clone();
173 async move { client.snapshots(request).await }
174 })
175 }
176
177 pub async fn chain(&self, request: ChainRequest) -> Result<ChainResponse, Error> {
178 self.ensure_credentials()?;
179 let endpoint = Endpoint::OptionsChain {
180 underlying_symbol: request.underlying_symbol.clone(),
181 };
182 self.inner
183 .http
184 .get_json(
185 &self.inner.base_url,
186 endpoint,
187 &self.inner.auth,
188 request.to_query(),
189 )
190 .await
191 }
192
193 pub async fn chain_all(&self, request: ChainRequest) -> Result<ChainResponse, Error> {
194 self.ensure_credentials()?;
195 let client = self.clone();
196
197 collect_all(request, move |request| {
198 let client = client.clone();
199 async move { client.chain(request).await }
200 })
201 .await
202 }
203
204 pub fn chain_stream(
205 &self,
206 request: ChainRequest,
207 ) -> ResponseStream<Result<ChainResponse, Error>> {
208 if let Err(error) = self.ensure_credentials() {
209 return Self::error_stream(error);
210 }
211
212 let client = self.clone();
213 stream_pages(request, move |request| {
214 let client = client.clone();
215 async move { client.chain(request).await }
216 })
217 }
218
219 pub async fn exchange_codes(&self) -> Result<ExchangeCodesResponse, Error> {
220 self.ensure_credentials()?;
221 self.inner
222 .http
223 .get_json(
224 &self.inner.base_url,
225 Endpoint::OptionsExchangeCodes,
226 &self.inner.auth,
227 Vec::new(),
228 )
229 .await
230 }
231
232 pub async fn condition_codes(
233 &self,
234 request: ConditionCodesRequest,
235 ) -> Result<ConditionCodesResponse, Error> {
236 self.ensure_credentials()?;
237 let endpoint = Endpoint::OptionsConditionCodes {
238 ticktype: request.ticktype(),
239 };
240 self.inner
241 .http
242 .get_json(&self.inner.base_url, endpoint, &self.inner.auth, Vec::new())
243 .await
244 }
245
246 fn ensure_credentials(&self) -> Result<(), Error> {
247 if self.inner.auth.has_credentials() {
248 Ok(())
249 } else {
250 Err(Error::MissingCredentials)
251 }
252 }
253
254 fn error_stream<Response>(error: Error) -> ResponseStream<Result<Response, Error>>
255 where
256 Response: Send + 'static,
257 {
258 Box::pin(futures_util::stream::once(async move { Err(error) }))
259 }
260}