Skip to main content

alpaca_data/
client.rs

1use std::fmt;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::{
6    auth::Auth,
7    corporate_actions::CorporateActionsClient,
8    crypto::CryptoClient,
9    env,
10    error::Error,
11    news::NewsClient,
12    options::OptionsClient,
13    stocks::StocksClient,
14    transport::{
15        http::HttpClient,
16        observer::{ObserverHandle, TransportObserver},
17        rate_limit::RateLimiter,
18        retry::RetryConfig,
19    },
20};
21
22const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
23
24/// Root async client for Alpaca Market Data HTTP APIs.
25///
26/// Build a client once, then obtain resource clients with [`Self::stocks`],
27/// [`Self::options`], [`Self::crypto`], [`Self::news`], and
28/// [`Self::corporate_actions`].
29///
30/// # Examples
31///
32/// ```no_run
33/// use alpaca_data::Client;
34///
35/// let client = Client::builder().build()?;
36/// # let _ = client;
37/// # Ok::<(), alpaca_data::Error>(())
38/// ```
39#[derive(Clone)]
40pub struct Client {
41    pub(crate) inner: Arc<Inner>,
42}
43
44#[allow(dead_code)]
45pub(crate) struct Inner {
46    pub(crate) auth: Auth,
47    pub(crate) base_url: String,
48    pub(crate) timeout: Option<Duration>,
49    pub(crate) retry_config: RetryConfig,
50    pub(crate) max_in_flight: Option<usize>,
51    pub(crate) http: HttpClient,
52}
53
54#[derive(Clone)]
55pub struct ClientBuilder {
56    api_key: Option<String>,
57    secret_key: Option<String>,
58    base_url: Option<String>,
59    timeout: Option<Duration>,
60    reqwest_client: Option<reqwest::Client>,
61    observer: Option<ObserverHandle>,
62    retry_config: RetryConfig,
63    max_in_flight: Option<usize>,
64}
65
66impl fmt::Debug for Client {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68        f.debug_struct("Client")
69            .field("inner", &self.inner)
70            .finish()
71    }
72}
73
74impl fmt::Debug for Inner {
75    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76        f.debug_struct("Inner")
77            .field("auth", &self.auth)
78            .field("base_url", &RedactedBaseUrl(&self.base_url))
79            .field("timeout", &self.timeout)
80            .field("retry_config", &self.retry_config)
81            .field("max_in_flight", &self.max_in_flight)
82            .field("http", &ConfiguredDebug("HttpClient"))
83            .finish()
84    }
85}
86
87impl fmt::Debug for ClientBuilder {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        f.debug_struct("ClientBuilder")
90            .field("api_key", &RedactedCredential(&self.api_key))
91            .field("secret_key", &RedactedCredential(&self.secret_key))
92            .field("base_url", &self.base_url.as_deref().map(RedactedBaseUrl))
93            .field("timeout", &self.timeout)
94            .field(
95                "reqwest_client",
96                &self
97                    .reqwest_client
98                    .as_ref()
99                    .map(|_| ConfiguredDebug("reqwest::Client")),
100            )
101            .field("observer", &self.observer)
102            .field("retry_config", &self.retry_config)
103            .field("max_in_flight", &self.max_in_flight)
104            .finish()
105    }
106}
107
108impl Client {
109    /// Builds a client with default runtime settings and no credentials.
110    ///
111    /// This is useful for the currently implemented public crypto endpoints.
112    pub fn new() -> Self {
113        Self::builder()
114            .build()
115            .expect("client builder is infallible during bootstrap")
116    }
117
118    /// Starts a [`ClientBuilder`] for explicit runtime configuration.
119    pub fn builder() -> ClientBuilder {
120        ClientBuilder::default()
121    }
122
123    /// Returns the stocks resource client.
124    pub fn stocks(&self) -> StocksClient {
125        StocksClient::new(self.inner.clone())
126    }
127
128    /// Returns the options resource client.
129    pub fn options(&self) -> OptionsClient {
130        OptionsClient::new(self.inner.clone())
131    }
132
133    /// Returns the crypto resource client.
134    pub fn crypto(&self) -> CryptoClient {
135        CryptoClient::new(self.inner.clone())
136    }
137
138    /// Returns the news resource client.
139    pub fn news(&self) -> NewsClient {
140        NewsClient::new(self.inner.clone())
141    }
142
143    /// Returns the corporate actions resource client.
144    pub fn corporate_actions(&self) -> CorporateActionsClient {
145        CorporateActionsClient::new(self.inner.clone())
146    }
147
148    pub(crate) fn from_parts(
149        auth: Auth,
150        base_url: String,
151        timeout: Option<Duration>,
152        reqwest_client: Option<reqwest::Client>,
153        observer: Option<ObserverHandle>,
154        retry_config: RetryConfig,
155        max_in_flight: Option<usize>,
156    ) -> Result<Self, Error> {
157        let http = match reqwest_client {
158            Some(client) => HttpClient::with_client(
159                client,
160                observer,
161                retry_config.clone(),
162                RateLimiter::new(max_in_flight),
163            ),
164            None => HttpClient::from_timeout(
165                timeout.unwrap_or(DEFAULT_TIMEOUT),
166                observer,
167                retry_config.clone(),
168                RateLimiter::new(max_in_flight),
169            )?,
170        };
171
172        Ok(Self {
173            inner: Arc::new(Inner {
174                auth,
175                base_url,
176                timeout,
177                retry_config,
178                max_in_flight,
179                http,
180            }),
181        })
182    }
183}
184
185impl Default for ClientBuilder {
186    fn default() -> Self {
187        Self {
188            api_key: None,
189            secret_key: None,
190            base_url: None,
191            timeout: None,
192            reqwest_client: None,
193            observer: None,
194            retry_config: RetryConfig::default(),
195            max_in_flight: None,
196        }
197    }
198}
199
200impl ClientBuilder {
201    /// Sets the Alpaca API key.
202    pub fn api_key(mut self, api_key: impl Into<String>) -> Self {
203        self.api_key = Some(api_key.into());
204        self
205    }
206
207    /// Sets the Alpaca API secret key.
208    pub fn secret_key(mut self, secret_key: impl Into<String>) -> Self {
209        self.secret_key = Some(secret_key.into());
210        self
211    }
212
213    /// Overrides the default data API base URL.
214    pub fn base_url(mut self, base_url: impl Into<String>) -> Self {
215        self.base_url = Some(base_url.into());
216        self
217    }
218
219    /// Sets the request timeout for the internally constructed `reqwest::Client`.
220    ///
221    /// Building fails if `reqwest_client(...)` is also used because the injected
222    /// client owns its own timeout configuration.
223    pub fn timeout(mut self, timeout: Duration) -> Self {
224        self.timeout = Some(timeout);
225        self
226    }
227
228    /// Injects a preconfigured `reqwest::Client` for advanced transport tuning.
229    ///
230    /// The injected client owns reqwest-level behavior such as connection
231    /// pooling, proxy behavior, default headers, and timeout settings. Build
232    /// validation rejects conflicting builder settings such as `timeout(...)`.
233    pub fn reqwest_client(mut self, reqwest_client: reqwest::Client) -> Self {
234        self.reqwest_client = Some(reqwest_client);
235        self
236    }
237
238    /// Registers an immutable observer for successful transport responses.
239    ///
240    /// Observers receive endpoint metadata only. They cannot change request
241    /// execution or response shaping.
242    pub fn observer(mut self, observer: Arc<dyn TransportObserver>) -> Self {
243        self.observer = Some(ObserverHandle::new(observer));
244        self
245    }
246
247    /// Sets the maximum number of retry attempts for one request.
248    ///
249    /// This applies to server-error retries by default. HTTP 429 retries only
250    /// participate when [`Self::retry_on_429`] is enabled.
251    pub fn max_retries(mut self, max_retries: u32) -> Self {
252        self.retry_config.max_retries = max_retries;
253        self
254    }
255
256    /// Enables or disables automatic retries on HTTP 429 responses.
257    ///
258    /// This setting is disabled by default and affects only 429 responses. It
259    /// does not automatically enable honoring `Retry-After`.
260    pub fn retry_on_429(mut self, retry_on_429: bool) -> Self {
261        self.retry_config.retry_on_429 = retry_on_429;
262        self
263    }
264
265    /// Enables or disables honoring the `Retry-After` response header.
266    ///
267    /// This setting only participates when 429 retries are enabled with
268    /// [`Self::retry_on_429`]. If a 429 response omits `Retry-After`, the
269    /// transport falls back to the configured backoff schedule.
270    pub fn respect_retry_after(mut self, respect_retry_after: bool) -> Self {
271        self.retry_config.respect_retry_after = respect_retry_after;
272        self
273    }
274
275    /// Sets the base retry backoff used by the shared HTTP transport.
276    pub fn base_backoff(mut self, base_backoff: Duration) -> Self {
277        self.retry_config.base_backoff = base_backoff;
278        self
279    }
280
281    /// Sets the maximum retry backoff used by the shared HTTP transport.
282    pub fn max_backoff(mut self, max_backoff: Duration) -> Self {
283        self.retry_config.max_backoff = max_backoff;
284        self
285    }
286
287    /// Adds a bounded random delay on top of each computed retry wait.
288    ///
289    /// Jitter helps concurrent callers avoid retrying in lockstep. The
290    /// transport keeps the added delay within the configured retry budget and
291    /// maximum backoff constraints.
292    pub fn retry_jitter(mut self, retry_jitter: Duration) -> Self {
293        self.retry_config.jitter = Some(retry_jitter);
294        self
295    }
296
297    /// Sets an optional elapsed-time budget for one request's retry loop.
298    ///
299    /// The transport subtracts the request's retry-loop elapsed time from this
300    /// budget before each retry decision. The remaining budget then caps each
301    /// scheduled retry wait, including waits derived from `Retry-After` and
302    /// waits with jitter enabled.
303    pub fn total_retry_budget(mut self, total_retry_budget: Duration) -> Self {
304        self.retry_config.total_retry_budget = Some(total_retry_budget);
305        self
306    }
307
308    /// Loads credentials from `APCA_API_KEY_ID` and `APCA_API_SECRET_KEY`.
309    ///
310    /// If both variables are unset, the builder is left unchanged. If only one
311    /// side is set, this returns [`Error::InvalidConfiguration`].
312    pub fn credentials_from_env(self) -> Result<Self, Error> {
313        self.credentials_from_env_names(env::DEFAULT_API_KEY_ENV, env::DEFAULT_SECRET_KEY_ENV)
314    }
315
316    /// Loads credentials from the provided environment variable names.
317    ///
318    /// If both variables are unset, the builder is left unchanged. If only one
319    /// side is set, this returns [`Error::InvalidConfiguration`].
320    pub fn credentials_from_env_names(
321        mut self,
322        api_key_var: &str,
323        secret_key_var: &str,
324    ) -> Result<Self, Error> {
325        if let Some((api_key, secret_key)) =
326            env::credentials_from_env_names(api_key_var, secret_key_var)?
327        {
328            self.api_key = Some(api_key);
329            self.secret_key = Some(secret_key);
330        }
331
332        Ok(self)
333    }
334
335    /// Sets the maximum number of concurrent in-flight requests.
336    pub fn max_in_flight(mut self, max_in_flight: usize) -> Self {
337        self.max_in_flight = Some(max_in_flight);
338        self
339    }
340
341    /// Validates configuration and builds a [`Client`].
342    ///
343    /// Credentials must be provided as a pair or omitted as a pair. Any
344    /// provided `api_key` and `secret_key` values must be nonblank and valid
345    /// HTTP header values before the client is constructed.
346    pub fn build(self) -> Result<Client, Error> {
347        if self.retry_config.max_backoff < self.retry_config.base_backoff {
348            return Err(Error::InvalidConfiguration(
349                "max_backoff must be greater than or equal to base_backoff".into(),
350            ));
351        }
352
353        if self.reqwest_client.is_some() && self.timeout.is_some() {
354            return Err(Error::InvalidConfiguration(
355                "reqwest_client owns timeout configuration; remove timeout(...) or configure timeout on the injected reqwest::Client".into(),
356            ));
357        }
358
359        let auth = Auth::new(self.api_key, self.secret_key)?;
360        let base_url = self
361            .base_url
362            .unwrap_or_else(|| "https://data.alpaca.markets".to_string());
363
364        Client::from_parts(
365            auth,
366            base_url,
367            self.timeout,
368            self.reqwest_client,
369            self.observer,
370            self.retry_config,
371            self.max_in_flight,
372        )
373    }
374}
375
376struct RedactedCredential<'a>(&'a Option<String>);
377
378impl fmt::Debug for RedactedCredential<'_> {
379    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
380        match self.0 {
381            Some(_) => f.write_str("\"[REDACTED]\""),
382            None => f.write_str("None"),
383        }
384    }
385}
386
387struct RedactedBaseUrl<'a>(&'a str);
388
389impl fmt::Debug for RedactedBaseUrl<'_> {
390    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
391        fmt::Debug::fmt(&redact_base_url_userinfo(self.0), f)
392    }
393}
394
395struct ConfiguredDebug(&'static str);
396
397impl fmt::Debug for ConfiguredDebug {
398    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
399        write!(f, "\"{} {{ .. }}\"", self.0)
400    }
401}
402
403fn redact_base_url_userinfo(base_url: &str) -> String {
404    if let Ok(mut url) = reqwest::Url::parse(base_url) {
405        if !url.username().is_empty() || url.password().is_some() {
406            let _ = url.set_username("");
407            let _ = url.set_password(None);
408            return url.to_string();
409        }
410    }
411
412    redact_base_url_userinfo_fallback(base_url)
413}
414
415fn redact_base_url_userinfo_fallback(base_url: &str) -> String {
416    let (prefix, rest) = if let Some((scheme, rest)) = base_url.split_once("://") {
417        (&base_url[..scheme.len() + 3], rest)
418    } else if let Some(rest) = base_url.strip_prefix("//") {
419        ("//", rest)
420    } else {
421        ("", base_url)
422    };
423
424    let (authority, suffix) = split_authority_and_suffix(rest);
425
426    match authority.rfind('@') {
427        Some(index) => format!("{prefix}{}{}", &authority[index + 1..], suffix),
428        None => base_url.to_string(),
429    }
430}
431
432fn split_authority_and_suffix(rest: &str) -> (&str, &str) {
433    match rest
434        .char_indices()
435        .find_map(|(index, ch)| matches!(ch, '/' | '?' | '#').then_some(index))
436    {
437        Some(index) => (&rest[..index], &rest[index..]),
438        None => (rest, ""),
439    }
440}