Skip to main content

alpaca_data/
client.rs

1use std::fmt;
2use std::sync::Arc;
3use std::time::Duration;
4
5use alpaca_core::{BaseUrl, Credentials, env};
6use alpaca_http::{
7    ConcurrencyLimit, HttpClient, HttpResponse, NoContent, RequestParts, RetryConfig,
8    StaticHeaderAuthenticator, TransportObserver,
9};
10use serde::de::DeserializeOwned;
11
12use crate::{
13    Error, corporate_actions::CorporateActionsClient, news::NewsClient, options::OptionsClient,
14    stocks::StocksClient,
15};
16
/// Name of the environment variable that [`ClientBuilder::credentials_from_env`]
/// checks for the API key.
pub const DATA_API_KEY_ENV: &str = "ALPACA_DATA_API_KEY";
/// Name of the environment variable that [`ClientBuilder::credentials_from_env`]
/// checks for the secret key.
pub const DATA_SECRET_KEY_ENV: &str = "ALPACA_DATA_SECRET_KEY";
/// Base URL used by every client built in this module; the builder exposes no override.
const DEFAULT_DATA_BASE_URL: &str = "https://data.alpaca.markets";
/// Header name paired with the API key when building the authenticator.
const APCA_API_KEY_HEADER: &str = "APCA-API-KEY-ID";
/// Header name paired with the secret key when building the authenticator.
const APCA_API_SECRET_HEADER: &str = "APCA-API-SECRET-KEY";
/// Timeout handed to the reqwest client when the builder has none configured.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Concurrency limit applied when `max_in_flight` is not configured on the builder.
const DEFAULT_MAX_IN_FLIGHT: usize = 50;
/// Value passed to reqwest's `pool_max_idle_per_host`.
const DEFAULT_POOL_MAX_IDLE_PER_HOST: usize = 50;
/// Value passed to reqwest's `pool_idle_timeout`.
const DEFAULT_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(90);
/// Value passed to reqwest's `tcp_keepalive`.
const DEFAULT_TCP_KEEPALIVE: Duration = Duration::from_secs(60);
27
/// Entry point for the data API: hands out the stocks, options, news and
/// corporate-actions sub-clients.
///
/// Cloning is cheap: all state lives behind a shared [`Arc`], and every
/// sub-client receives a clone of that same `Arc`.
#[derive(Clone)]
pub struct Client {
    /// Shared transport, authenticator and base URL.
    pub(crate) inner: Arc<ClientInner>,
}
32
/// State shared between [`Client`] and the sub-clients it creates.
#[allow(dead_code)]
pub(crate) struct ClientInner {
    /// Transport used by the `send_*` helpers.
    http: HttpClient,
    /// Authenticator built from the API key and secret key headers.
    auth: StaticHeaderAuthenticator,
    /// Base URL passed to the transport on every request.
    base_url: BaseUrl,
}
39
/// Collects configuration for a [`Client`]; finish with [`ClientBuilder::build`].
///
/// Every `Option` field falls back to a module default (or, for credentials,
/// to an error) when `build` runs.
#[derive(Clone, Default)]
pub struct ClientBuilder {
    /// API key; must be set together with `secret_key`.
    api_key: Option<String>,
    /// Secret key; must be set together with `api_key`.
    secret_key: Option<String>,
    /// Request timeout; `DEFAULT_TIMEOUT` is used when unset.
    timeout: Option<Duration>,
    /// Optional observer forwarded to the HTTP client builder.
    observer: Option<Arc<dyn TransportObserver>>,
    /// Retry configuration forwarded to the HTTP client builder.
    retry_config: RetryConfig,
    /// Concurrency limit; `DEFAULT_MAX_IN_FLIGHT` is used when unset.
    max_in_flight: Option<usize>,
}
49
50impl Client {
51    #[must_use]
52    pub fn builder() -> ClientBuilder {
53        ClientBuilder::default()
54    }
55
56    pub fn new(credentials: Credentials) -> Result<Self, Error> {
57        Self::builder().credentials(credentials).build()
58    }
59
60    pub fn from_env() -> Result<Self, Error> {
61        Self::builder().credentials_from_env()?.build()
62    }
63
64    #[must_use]
65    pub fn stocks(&self) -> StocksClient {
66        StocksClient::new(self.inner.clone())
67    }
68
69    #[must_use]
70    pub fn options(&self) -> OptionsClient {
71        OptionsClient::new(self.inner.clone())
72    }
73
74    #[must_use]
75    pub fn news(&self) -> NewsClient {
76        NewsClient::new(self.inner.clone())
77    }
78
79    #[must_use]
80    pub fn corporate_actions(&self) -> CorporateActionsClient {
81        CorporateActionsClient::new(self.inner.clone())
82    }
83}
84
85impl fmt::Debug for Client {
86    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87        f.debug_struct("Client")
88            .field("http", &"HttpClient")
89            .field("auth", &"[REDACTED]")
90            .finish()
91    }
92}
93
94impl fmt::Debug for ClientInner {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        f.debug_struct("ClientInner")
97            .field("http", &"HttpClient")
98            .field("auth", &"[REDACTED]")
99            .finish()
100    }
101}
102
103impl fmt::Debug for ClientBuilder {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        f.debug_struct("ClientBuilder")
106            .field("api_key", &redacted_option(&self.api_key))
107            .field("secret_key", &redacted_option(&self.secret_key))
108            .field("timeout", &self.timeout)
109            .field(
110                "observer",
111                &self.observer.as_ref().map(|_| "TransportObserver"),
112            )
113            .field("retry_config", &self.retry_config)
114            .field("max_in_flight", &self.max_in_flight)
115            .finish()
116    }
117}
118
119impl ClientBuilder {
120    #[must_use]
121    pub fn credentials(mut self, credentials: Credentials) -> Self {
122        self.api_key = Some(credentials.api_key().to_owned());
123        self.secret_key = Some(credentials.secret_key().to_owned());
124        self
125    }
126
127    #[must_use]
128    pub fn api_key(mut self, api_key: impl Into<String>) -> Self {
129        self.api_key = Some(api_key.into());
130        self
131    }
132
133    #[must_use]
134    pub fn secret_key(mut self, secret_key: impl Into<String>) -> Self {
135        self.secret_key = Some(secret_key.into());
136        self
137    }
138
139    pub fn credentials_from_env(self) -> Result<Self, Error> {
140        self.credentials_from_env_names(DATA_API_KEY_ENV, DATA_SECRET_KEY_ENV)
141    }
142
143    pub fn credentials_from_env_names(
144        mut self,
145        api_key_var: &str,
146        secret_key_var: &str,
147    ) -> Result<Self, Error> {
148        if let Some(credentials) = env::credentials_from_env_names(api_key_var, secret_key_var)? {
149            return Ok(self.credentials(credentials));
150        }
151
152        if let Some(credentials) = env::credentials_from_env()? {
153            self = self.credentials(credentials);
154        }
155
156        Ok(self)
157    }
158
159    #[must_use]
160    pub fn timeout(mut self, timeout: Duration) -> Self {
161        self.timeout = Some(timeout);
162        self
163    }
164
165    #[must_use]
166    pub fn observer(mut self, observer: Arc<dyn TransportObserver>) -> Self {
167        self.observer = Some(observer);
168        self
169    }
170
171    #[must_use]
172    pub fn retry_config(mut self, retry_config: RetryConfig) -> Self {
173        self.retry_config = retry_config;
174        self
175    }
176
177    #[must_use]
178    pub fn max_in_flight(mut self, max_in_flight: usize) -> Self {
179        self.max_in_flight = Some(max_in_flight);
180        self
181    }
182
183    pub fn build(self) -> Result<Client, Error> {
184        let credentials = match (self.api_key, self.secret_key) {
185            (Some(api_key), Some(secret_key)) => Credentials::new(api_key, secret_key)?,
186            (None, None) => return Err(Error::MissingCredentials),
187            _ => {
188                return Err(Error::InvalidConfiguration(
189                    "api_key and secret_key must be paired".to_owned(),
190                ));
191            }
192        };
193
194        let base_url = BaseUrl::new(DEFAULT_DATA_BASE_URL)?;
195        let auth = StaticHeaderAuthenticator::from_pairs([
196            (APCA_API_KEY_HEADER, credentials.api_key()),
197            (APCA_API_SECRET_HEADER, credentials.secret_key()),
198        ])?;
199
200        let timeout = self.timeout.unwrap_or(DEFAULT_TIMEOUT);
201        let reqwest_client = Self::build_reqwest_client(timeout)?;
202
203        let mut http_builder = HttpClient::builder()
204            .retry_config(self.retry_config)
205            .reqwest_client(reqwest_client);
206        if let Some(observer) = self.observer {
207            http_builder = http_builder.observer(observer);
208        }
209        http_builder = http_builder.concurrency_limit(ConcurrencyLimit::new(Some(
210            self.max_in_flight.unwrap_or(DEFAULT_MAX_IN_FLIGHT),
211        )));
212
213        let http = http_builder.build()?;
214
215        Ok(Client {
216            inner: Arc::new(ClientInner {
217                http,
218                auth,
219                base_url,
220            }),
221        })
222    }
223
224    fn build_reqwest_client(timeout: Duration) -> Result<reqwest::Client, Error> {
225        reqwest::Client::builder()
226            .no_proxy()
227            .pool_max_idle_per_host(DEFAULT_POOL_MAX_IDLE_PER_HOST)
228            .pool_idle_timeout(DEFAULT_POOL_IDLE_TIMEOUT)
229            .tcp_keepalive(DEFAULT_TCP_KEEPALIVE)
230            .timeout(timeout)
231            .http1_only()
232            .build()
233            .map_err(|error| alpaca_http::Error::from_reqwest(error, None).into())
234    }
235}
236
237impl ClientInner {
238    #[allow(dead_code)]
239    pub(crate) async fn send_json<T>(&self, request: RequestParts) -> Result<HttpResponse<T>, Error>
240    where
241        T: DeserializeOwned,
242    {
243        self.http
244            .send_json(&self.base_url, request, Some(&self.auth))
245            .await
246            .map_err(Error::from)
247    }
248
249    #[allow(dead_code)]
250    pub(crate) async fn send_text(
251        &self,
252        request: RequestParts,
253    ) -> Result<HttpResponse<String>, Error> {
254        self.http
255            .send_text(&self.base_url, request, Some(&self.auth))
256            .await
257            .map_err(Error::from)
258    }
259
260    #[allow(dead_code)]
261    pub(crate) async fn send_no_content(
262        &self,
263        request: RequestParts,
264    ) -> Result<HttpResponse<NoContent>, Error> {
265        self.http
266            .send_no_content(&self.base_url, request, Some(&self.auth))
267            .await
268            .map_err(Error::from)
269    }
270}
271
/// Describes whether `value` is set without revealing its contents.
///
/// Returns `"[REDACTED]"` for any `Some`, including an empty string, and
/// `"None"` otherwise.
fn redacted_option(value: &Option<String>) -> &'static str {
    if value.is_some() {
        "[REDACTED]"
    } else {
        "None"
    }
}