Skip to main content

alpaca_data/
client.rs

1use std::fmt;
2use std::sync::Arc;
3use std::time::Duration;
4
5use alpaca_core::{BaseUrl, Credentials, env};
6use alpaca_http::{
7    ConcurrencyLimit, HttpClient, HttpResponse, NoContent, RequestParts, RetryConfig,
8    StaticHeaderAuthenticator, TransportObserver,
9};
10use serde::de::DeserializeOwned;
11
12use crate::{
13    Error, corporate_actions::CorporateActionsClient, news::NewsClient, options::OptionsClient,
14    stocks::StocksClient,
15};
16
/// Environment variable that `ClientBuilder::credentials_from_env` reads the API key from.
pub const DATA_API_KEY_ENV: &str = "ALPACA_DATA_API_KEY";
/// Environment variable that `ClientBuilder::credentials_from_env` reads the secret key from.
pub const DATA_SECRET_KEY_ENV: &str = "ALPACA_DATA_SECRET_KEY";
/// Environment variable checked first by `ClientBuilder::base_url_from_env`.
pub const DATA_BASE_URL_ENV: &str = "ALPACA_DATA_BASE_URL";
/// Legacy base URL variable, consulted only when `DATA_BASE_URL_ENV` yields nothing.
pub const LEGACY_DATA_BASE_URL_ENV: &str = "APCA_API_DATA_URL";
/// Base URL that `ClientBuilder::build` falls back to when none was configured.
pub const DEFAULT_DATA_BASE_URL: &str = "https://data.alpaca.markets";
// Header names paired with the API key and secret key when the authenticator is built.
const APCA_API_KEY_HEADER: &str = "APCA-API-KEY-ID";
const APCA_API_SECRET_HEADER: &str = "APCA-API-SECRET-KEY";
24
/// Entry point of this crate's data client; hands out the per-resource clients.
///
/// Cloning copies only the `Arc`, so every clone shares the same `ClientInner`.
#[derive(Clone)]
pub struct Client {
    // Shared state; each resource client receives its own clone of this `Arc`.
    pub(crate) inner: Arc<ClientInner>,
}
29
/// State shared behind [`Client`]: the HTTP transport, the authenticator and the base URL.
// NOTE(review): the reason for `allow(dead_code)` is not visible in this file — presumably
// some members are only used under certain features; confirm and record the reason here.
#[allow(dead_code)]
pub(crate) struct ClientInner {
    // Transport used by the `send_*` helpers.
    http: HttpClient,
    // Authenticator handed to the transport with every `send_*` call.
    auth: StaticHeaderAuthenticator,
    // Base URL handed to the transport with every `send_*` call.
    base_url: BaseUrl,
}
36
/// Accumulates configuration for a [`Client`]; finish with [`ClientBuilder::build`].
///
/// `Default` leaves every optional setting unset and uses `RetryConfig::default()`.
#[derive(Clone, Default)]
pub struct ClientBuilder {
    // API key; `build` requires it to be set together with `secret_key`.
    api_key: Option<String>,
    // Secret key; `build` requires it to be set together with `api_key`.
    secret_key: Option<String>,
    // Explicit base URL; `build` falls back to `DEFAULT_DATA_BASE_URL` when unset.
    base_url: Option<BaseUrl>,
    // Request timeout; `build` rejects it when `reqwest_client` is also set.
    timeout: Option<Duration>,
    // Caller-supplied `reqwest::Client`, forwarded to the HTTP client builder.
    reqwest_client: Option<reqwest::Client>,
    // Optional transport observer, forwarded to the HTTP client builder.
    observer: Option<Arc<dyn TransportObserver>>,
    // Retry settings; always forwarded to the HTTP client builder.
    retry_config: RetryConfig,
    // When set, wrapped in a `ConcurrencyLimit` and forwarded to the HTTP client builder.
    max_in_flight: Option<usize>,
}
48
49impl Client {
50    #[must_use]
51    pub fn builder() -> ClientBuilder {
52        ClientBuilder::default()
53    }
54
55    pub fn new(credentials: Credentials) -> Result<Self, Error> {
56        Self::builder().credentials(credentials).build()
57    }
58
59    pub fn from_env() -> Result<Self, Error> {
60        Self::builder()
61            .credentials_from_env()?
62            .base_url_from_env()?
63            .build()
64    }
65
66    #[must_use]
67    pub fn base_url(&self) -> &BaseUrl {
68        self.inner.base_url()
69    }
70
71    #[must_use]
72    pub fn stocks(&self) -> StocksClient {
73        StocksClient::new(self.inner.clone())
74    }
75
76    #[must_use]
77    pub fn options(&self) -> OptionsClient {
78        OptionsClient::new(self.inner.clone())
79    }
80
81    #[must_use]
82    pub fn news(&self) -> NewsClient {
83        NewsClient::new(self.inner.clone())
84    }
85
86    #[must_use]
87    pub fn corporate_actions(&self) -> CorporateActionsClient {
88        CorporateActionsClient::new(self.inner.clone())
89    }
90}
91
92impl fmt::Debug for Client {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        f.debug_struct("Client")
95            .field("base_url", self.inner.base_url())
96            .field("http", &"HttpClient")
97            .field("auth", &"[REDACTED]")
98            .finish()
99    }
100}
101
102impl fmt::Debug for ClientInner {
103    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104        f.debug_struct("ClientInner")
105            .field("base_url", &self.base_url)
106            .field("http", &"HttpClient")
107            .field("auth", &"[REDACTED]")
108            .finish()
109    }
110}
111
112impl fmt::Debug for ClientBuilder {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        f.debug_struct("ClientBuilder")
115            .field("api_key", &redacted_option(&self.api_key))
116            .field("secret_key", &redacted_option(&self.secret_key))
117            .field("base_url", &self.base_url)
118            .field("timeout", &self.timeout)
119            .field(
120                "reqwest_client",
121                &self.reqwest_client.as_ref().map(|_| "reqwest::Client"),
122            )
123            .field(
124                "observer",
125                &self.observer.as_ref().map(|_| "TransportObserver"),
126            )
127            .field("retry_config", &self.retry_config)
128            .field("max_in_flight", &self.max_in_flight)
129            .finish()
130    }
131}
132
133impl ClientBuilder {
134    #[must_use]
135    pub fn credentials(mut self, credentials: Credentials) -> Self {
136        self.api_key = Some(credentials.api_key().to_owned());
137        self.secret_key = Some(credentials.secret_key().to_owned());
138        self
139    }
140
141    #[must_use]
142    pub fn api_key(mut self, api_key: impl Into<String>) -> Self {
143        self.api_key = Some(api_key.into());
144        self
145    }
146
147    #[must_use]
148    pub fn secret_key(mut self, secret_key: impl Into<String>) -> Self {
149        self.secret_key = Some(secret_key.into());
150        self
151    }
152
153    #[must_use]
154    pub fn base_url(mut self, base_url: BaseUrl) -> Self {
155        self.base_url = Some(base_url);
156        self
157    }
158
159    pub fn base_url_str(mut self, base_url: impl AsRef<str>) -> Result<Self, Error> {
160        self.base_url = Some(BaseUrl::new(base_url.as_ref())?);
161        Ok(self)
162    }
163
164    pub fn credentials_from_env(self) -> Result<Self, Error> {
165        self.credentials_from_env_names(DATA_API_KEY_ENV, DATA_SECRET_KEY_ENV)
166    }
167
168    pub fn credentials_from_env_names(
169        mut self,
170        api_key_var: &str,
171        secret_key_var: &str,
172    ) -> Result<Self, Error> {
173        if let Some(credentials) = env::credentials_from_env_names(api_key_var, secret_key_var)? {
174            return Ok(self.credentials(credentials));
175        }
176
177        if let Some(credentials) = env::credentials_from_env()? {
178            self = self.credentials(credentials);
179        }
180
181        Ok(self)
182    }
183
184    pub fn base_url_from_env(mut self) -> Result<Self, Error> {
185        if let Some(base_url) = env::base_url_from_env_name(DATA_BASE_URL_ENV)? {
186            self.base_url = Some(base_url);
187            return Ok(self);
188        }
189
190        if let Some(base_url) = env::base_url_from_env_name(LEGACY_DATA_BASE_URL_ENV)? {
191            self.base_url = Some(base_url);
192        }
193
194        Ok(self)
195    }
196
197    #[must_use]
198    pub fn timeout(mut self, timeout: Duration) -> Self {
199        self.timeout = Some(timeout);
200        self
201    }
202
203    #[must_use]
204    pub fn reqwest_client(mut self, reqwest_client: reqwest::Client) -> Self {
205        self.reqwest_client = Some(reqwest_client);
206        self
207    }
208
209    #[must_use]
210    pub fn observer(mut self, observer: Arc<dyn TransportObserver>) -> Self {
211        self.observer = Some(observer);
212        self
213    }
214
215    #[must_use]
216    pub fn retry_config(mut self, retry_config: RetryConfig) -> Self {
217        self.retry_config = retry_config;
218        self
219    }
220
221    #[must_use]
222    pub fn max_in_flight(mut self, max_in_flight: usize) -> Self {
223        self.max_in_flight = Some(max_in_flight);
224        self
225    }
226
227    pub fn build(self) -> Result<Client, Error> {
228        if self.reqwest_client.is_some() && self.timeout.is_some() {
229            return Err(Error::InvalidConfiguration(
230                "reqwest_client owns timeout configuration; remove timeout(...) or configure timeout on the injected reqwest::Client".to_owned(),
231            ));
232        }
233
234        let credentials = match (self.api_key, self.secret_key) {
235            (Some(api_key), Some(secret_key)) => Credentials::new(api_key, secret_key)?,
236            (None, None) => return Err(Error::MissingCredentials),
237            _ => {
238                return Err(Error::InvalidConfiguration(
239                    "api_key and secret_key must be paired".to_owned(),
240                ));
241            }
242        };
243
244        let base_url = match self.base_url {
245            Some(base_url) => base_url,
246            None => BaseUrl::new(DEFAULT_DATA_BASE_URL)?,
247        };
248        let auth = StaticHeaderAuthenticator::from_pairs([
249            (APCA_API_KEY_HEADER, credentials.api_key()),
250            (APCA_API_SECRET_HEADER, credentials.secret_key()),
251        ])?;
252
253        let mut http_builder = HttpClient::builder().retry_config(self.retry_config);
254        if let Some(timeout) = self.timeout {
255            http_builder = http_builder.timeout(timeout);
256        }
257        if let Some(reqwest_client) = self.reqwest_client {
258            http_builder = http_builder.reqwest_client(reqwest_client);
259        }
260        if let Some(observer) = self.observer {
261            http_builder = http_builder.observer(observer);
262        }
263        if let Some(max_in_flight) = self.max_in_flight {
264            http_builder =
265                http_builder.concurrency_limit(ConcurrencyLimit::new(Some(max_in_flight)));
266        }
267
268        let http = http_builder.build()?;
269
270        Ok(Client {
271            inner: Arc::new(ClientInner {
272                http,
273                auth,
274                base_url,
275            }),
276        })
277    }
278}
279
280impl ClientInner {
281    #[allow(dead_code)]
282    pub(crate) async fn send_json<T>(&self, request: RequestParts) -> Result<HttpResponse<T>, Error>
283    where
284        T: DeserializeOwned,
285    {
286        self.http
287            .send_json(&self.base_url, request, Some(&self.auth))
288            .await
289            .map_err(Error::from)
290    }
291
292    #[allow(dead_code)]
293    pub(crate) async fn send_text(
294        &self,
295        request: RequestParts,
296    ) -> Result<HttpResponse<String>, Error> {
297        self.http
298            .send_text(&self.base_url, request, Some(&self.auth))
299            .await
300            .map_err(Error::from)
301    }
302
303    #[allow(dead_code)]
304    pub(crate) async fn send_no_content(
305        &self,
306        request: RequestParts,
307    ) -> Result<HttpResponse<NoContent>, Error> {
308        self.http
309            .send_no_content(&self.base_url, request, Some(&self.auth))
310            .await
311            .map_err(Error::from)
312    }
313
314    #[must_use]
315    pub(crate) fn base_url(&self) -> &BaseUrl {
316        &self.base_url
317    }
318}
319
/// Maps an optional secret to a fixed placeholder so the value itself is never printed.
///
/// Returns `"[REDACTED]"` for any `Some` (including an empty string) and `"None"` otherwise.
fn redacted_option(value: &Option<String>) -> &'static str {
    if value.is_some() {
        "[REDACTED]"
    } else {
        "None"
    }
}