1use std::fmt;
2use std::sync::Arc;
3use std::time::Duration;
4
5use alpaca_core::{BaseUrl, Credentials, env};
6use alpaca_http::{
7 ConcurrencyLimit, HttpClient, HttpResponse, NoContent, RequestParts, RetryConfig,
8 StaticHeaderAuthenticator, TransportObserver,
9};
10use serde::de::DeserializeOwned;
11
12use crate::{
13 Error, corporate_actions::CorporateActionsClient, news::NewsClient, options::OptionsClient,
14 stocks::StocksClient,
15};
16
/// Name of the environment variable consulted first for the data API key.
pub const DATA_API_KEY_ENV: &str = "ALPACA_DATA_API_KEY";
/// Name of the environment variable consulted first for the data API secret key.
pub const DATA_SECRET_KEY_ENV: &str = "ALPACA_DATA_SECRET_KEY";

// Endpoint used for every client built in this module, and the header names
// under which the key / secret are handed to the authenticator.
const DEFAULT_DATA_BASE_URL: &str = "https://data.alpaca.markets";
const APCA_API_KEY_HEADER: &str = "APCA-API-KEY-ID";
const APCA_API_SECRET_HEADER: &str = "APCA-API-SECRET-KEY";

// Fallbacks applied by the builder when the caller leaves a knob unset.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_MAX_IN_FLIGHT: usize = 50;

// Fixed connection-pool / socket settings passed to the reqwest client builder.
const DEFAULT_POOL_MAX_IDLE_PER_HOST: usize = 50;
const DEFAULT_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(90);
const DEFAULT_TCP_KEEPALIVE: Duration = Duration::from_secs(60);
27
28#[derive(Clone)]
29pub struct Client {
30 pub(crate) inner: Arc<ClientInner>,
31}
32
33#[allow(dead_code)]
34pub(crate) struct ClientInner {
35 http: HttpClient,
36 auth: StaticHeaderAuthenticator,
37 base_url: BaseUrl,
38}
39
40#[derive(Clone, Default)]
41pub struct ClientBuilder {
42 api_key: Option<String>,
43 secret_key: Option<String>,
44 timeout: Option<Duration>,
45 observer: Option<Arc<dyn TransportObserver>>,
46 retry_config: RetryConfig,
47 max_in_flight: Option<usize>,
48}
49
50impl Client {
51 #[must_use]
52 pub fn builder() -> ClientBuilder {
53 ClientBuilder::default()
54 }
55
56 pub fn new(credentials: Credentials) -> Result<Self, Error> {
57 Self::builder().credentials(credentials).build()
58 }
59
60 pub fn from_env() -> Result<Self, Error> {
61 Self::builder().credentials_from_env()?.build()
62 }
63
64 #[must_use]
65 pub fn stocks(&self) -> StocksClient {
66 StocksClient::new(self.inner.clone())
67 }
68
69 #[must_use]
70 pub fn options(&self) -> OptionsClient {
71 OptionsClient::new(self.inner.clone())
72 }
73
74 #[must_use]
75 pub fn news(&self) -> NewsClient {
76 NewsClient::new(self.inner.clone())
77 }
78
79 #[must_use]
80 pub fn corporate_actions(&self) -> CorporateActionsClient {
81 CorporateActionsClient::new(self.inner.clone())
82 }
83}
84
85impl fmt::Debug for Client {
86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 f.debug_struct("Client")
88 .field("http", &"HttpClient")
89 .field("auth", &"[REDACTED]")
90 .finish()
91 }
92}
93
94impl fmt::Debug for ClientInner {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 f.debug_struct("ClientInner")
97 .field("http", &"HttpClient")
98 .field("auth", &"[REDACTED]")
99 .finish()
100 }
101}
102
103impl fmt::Debug for ClientBuilder {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 f.debug_struct("ClientBuilder")
106 .field("api_key", &redacted_option(&self.api_key))
107 .field("secret_key", &redacted_option(&self.secret_key))
108 .field("timeout", &self.timeout)
109 .field(
110 "observer",
111 &self.observer.as_ref().map(|_| "TransportObserver"),
112 )
113 .field("retry_config", &self.retry_config)
114 .field("max_in_flight", &self.max_in_flight)
115 .finish()
116 }
117}
118
119impl ClientBuilder {
120 #[must_use]
121 pub fn credentials(mut self, credentials: Credentials) -> Self {
122 self.api_key = Some(credentials.api_key().to_owned());
123 self.secret_key = Some(credentials.secret_key().to_owned());
124 self
125 }
126
127 #[must_use]
128 pub fn api_key(mut self, api_key: impl Into<String>) -> Self {
129 self.api_key = Some(api_key.into());
130 self
131 }
132
133 #[must_use]
134 pub fn secret_key(mut self, secret_key: impl Into<String>) -> Self {
135 self.secret_key = Some(secret_key.into());
136 self
137 }
138
139 pub fn credentials_from_env(self) -> Result<Self, Error> {
140 self.credentials_from_env_names(DATA_API_KEY_ENV, DATA_SECRET_KEY_ENV)
141 }
142
143 pub fn credentials_from_env_names(
144 mut self,
145 api_key_var: &str,
146 secret_key_var: &str,
147 ) -> Result<Self, Error> {
148 if let Some(credentials) = env::credentials_from_env_names(api_key_var, secret_key_var)? {
149 return Ok(self.credentials(credentials));
150 }
151
152 if let Some(credentials) = env::credentials_from_env()? {
153 self = self.credentials(credentials);
154 }
155
156 Ok(self)
157 }
158
159 #[must_use]
160 pub fn timeout(mut self, timeout: Duration) -> Self {
161 self.timeout = Some(timeout);
162 self
163 }
164
165 #[must_use]
166 pub fn observer(mut self, observer: Arc<dyn TransportObserver>) -> Self {
167 self.observer = Some(observer);
168 self
169 }
170
171 #[must_use]
172 pub fn retry_config(mut self, retry_config: RetryConfig) -> Self {
173 self.retry_config = retry_config;
174 self
175 }
176
177 #[must_use]
178 pub fn max_in_flight(mut self, max_in_flight: usize) -> Self {
179 self.max_in_flight = Some(max_in_flight);
180 self
181 }
182
183 pub fn build(self) -> Result<Client, Error> {
184 let credentials = match (self.api_key, self.secret_key) {
185 (Some(api_key), Some(secret_key)) => Credentials::new(api_key, secret_key)?,
186 (None, None) => return Err(Error::MissingCredentials),
187 _ => {
188 return Err(Error::InvalidConfiguration(
189 "api_key and secret_key must be paired".to_owned(),
190 ));
191 }
192 };
193
194 let base_url = BaseUrl::new(DEFAULT_DATA_BASE_URL)?;
195 let auth = StaticHeaderAuthenticator::from_pairs([
196 (APCA_API_KEY_HEADER, credentials.api_key()),
197 (APCA_API_SECRET_HEADER, credentials.secret_key()),
198 ])?;
199
200 let timeout = self.timeout.unwrap_or(DEFAULT_TIMEOUT);
201 let reqwest_client = Self::build_reqwest_client(timeout)?;
202
203 let mut http_builder = HttpClient::builder()
204 .retry_config(self.retry_config)
205 .reqwest_client(reqwest_client);
206 if let Some(observer) = self.observer {
207 http_builder = http_builder.observer(observer);
208 }
209 http_builder = http_builder.concurrency_limit(ConcurrencyLimit::new(Some(
210 self.max_in_flight.unwrap_or(DEFAULT_MAX_IN_FLIGHT),
211 )));
212
213 let http = http_builder.build()?;
214
215 Ok(Client {
216 inner: Arc::new(ClientInner {
217 http,
218 auth,
219 base_url,
220 }),
221 })
222 }
223
224 fn build_reqwest_client(timeout: Duration) -> Result<reqwest::Client, Error> {
225 reqwest::Client::builder()
226 .no_proxy()
227 .pool_max_idle_per_host(DEFAULT_POOL_MAX_IDLE_PER_HOST)
228 .pool_idle_timeout(DEFAULT_POOL_IDLE_TIMEOUT)
229 .tcp_keepalive(DEFAULT_TCP_KEEPALIVE)
230 .timeout(timeout)
231 .http1_only()
232 .build()
233 .map_err(|error| alpaca_http::Error::from_reqwest(error, None).into())
234 }
235}
236
237impl ClientInner {
238 #[allow(dead_code)]
239 pub(crate) async fn send_json<T>(&self, request: RequestParts) -> Result<HttpResponse<T>, Error>
240 where
241 T: DeserializeOwned,
242 {
243 self.http
244 .send_json(&self.base_url, request, Some(&self.auth))
245 .await
246 .map_err(Error::from)
247 }
248
249 #[allow(dead_code)]
250 pub(crate) async fn send_text(
251 &self,
252 request: RequestParts,
253 ) -> Result<HttpResponse<String>, Error> {
254 self.http
255 .send_text(&self.base_url, request, Some(&self.auth))
256 .await
257 .map_err(Error::from)
258 }
259
260 #[allow(dead_code)]
261 pub(crate) async fn send_no_content(
262 &self,
263 request: RequestParts,
264 ) -> Result<HttpResponse<NoContent>, Error> {
265 self.http
266 .send_no_content(&self.base_url, request, Some(&self.auth))
267 .await
268 .map_err(Error::from)
269 }
270}
271
/// Maps an optional secret to a printable placeholder: `"[REDACTED]"` when a
/// value is present (whatever it contains) and `"None"` when it is absent.
fn redacted_option(value: &Option<String>) -> &'static str {
    if value.is_some() {
        "[REDACTED]"
    } else {
        "None"
    }
}