Skip to main content

braze_sync/braze/
mod.rs

1//! Braze REST API client.
2//!
3//! Layered:
4//! - [`error`]: typed [`error::BrazeApiError`] variants
5//! - [`catalog`] (and sibling modules per resource):
6//!   per-endpoint async methods written as `impl BrazeClient { ... }`
7//!   blocks
8//!
9//! Every request goes through a `send_*` helper such as [`BrazeClient::send_json`],
10//! so authentication, `User-Agent`, and 429 retry behavior are defined exactly once.
11//!
12//! ## Rate limiting philosophy
13//!
14//! braze-sync does **not** carry a client-side predictive rate limiter.
15//! Braze is authoritative on its own quotas (and shares pools across
16//! endpoints in ways the client can't know), so we react to 429 +
17//! `Retry-After` instead of pre-throttling. The retry loop below is the
18//! only pacing mechanism: it honors `Retry-After` exactly when present,
19//! does exponential backoff with full jitter when absent, and gives up
20//! when either a total-time budget or a hard attempt cap is exceeded.
21
22pub mod catalog;
23pub mod content_block;
24pub mod custom_attribute;
25pub mod email_template;
26pub mod error;
27
28use crate::braze::error::BrazeApiError;
29use reqwest::{Client, RequestBuilder, StatusCode};
30use secrecy::{ExposeSecret, SecretString};
31use std::sync::Arc;
32use std::time::Duration;
33use url::Url;
34
/// Timeout handed to the `reqwest` client builder; applies to every
/// request the client sends.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

/// Total time one logical request may spend sleeping between 429
/// retries. When the accumulated backoff reaches this value the request
/// fails fast, so the caller can report an error instead of hanging.
const RETRY_BUDGET: Duration = Duration::from_secs(60);

/// Hard cap on the number of 429 retries. Guards against a degenerate
/// server answering `Retry-After: 0` forever: zero-length sleeps never
/// consume the time budget, so only this counter can end the loop.
const RETRY_MAX_ATTEMPTS: u32 = 100;

/// Starting delay for exponential backoff. Only consulted when a 429
/// response carries no usable `Retry-After` header.
const BACKOFF_BASE: Duration = Duration::from_millis(500);

/// Ceiling for a single exponential-backoff delay (before jitter). Like
/// [`BACKOFF_BASE`], only relevant when `Retry-After` is absent.
const BACKOFF_CAP: Duration = Duration::from_secs(10);
51
52/// Cheap-to-clone Braze API client. Internally Arc-shares the API key
53/// and `reqwest::Client`'s connection pool, so cloning for a parallel
54/// batch is essentially free.
55#[derive(Clone)]
56pub struct BrazeClient {
57    http: Client,
58    base_url: Url,
59    api_key: Arc<SecretString>,
60}
61
62// Hand-written Debug to be 100% certain the api key never lands in
63// tracing output, even if SecretString's own Debug impl ever changes.
64impl std::fmt::Debug for BrazeClient {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.debug_struct("BrazeClient")
67            .field("base_url", &self.base_url)
68            .field("api_key", &"<redacted>")
69            .finish()
70    }
71}
72
73impl BrazeClient {
74    pub fn from_resolved(resolved: &crate::config::ResolvedConfig) -> Self {
75        Self::new(resolved.api_endpoint.clone(), resolved.api_key.clone())
76    }
77
78    pub fn new(base_url: Url, api_key: SecretString) -> Self {
79        let http = Client::builder()
80            .user_agent(concat!("braze-sync/", env!("CARGO_PKG_VERSION")))
81            .timeout(REQUEST_TIMEOUT)
82            .build()
83            .expect("reqwest client builds with default features");
84        Self {
85            http,
86            base_url,
87            api_key: Arc::new(api_key),
88        }
89    }
90
91    /// Build a URL by appending each `segment` to the base URL as a
92    /// separately percent-encoded path segment.
93    ///
94    /// User-controlled segments cannot inject path traversal or query
95    /// strings because the URL crate encodes `/`, `?`, `#`, and so on
96    /// inside each segment. Any path that the base URL itself carried is
97    /// dropped, so the layout is predictable regardless of how the user
98    /// wrote `api_endpoint` in their config.
99    pub(crate) fn url_for(&self, segments: &[&str]) -> Url {
100        let mut url = self.base_url.clone();
101        {
102            let mut seg = url
103                .path_segments_mut()
104                .expect("base url must be hierarchical (http/https)");
105            seg.clear();
106            for s in segments {
107                seg.push(s);
108            }
109        }
110        url
111    }
112
113    /// Attach bearer auth + JSON Accept to a raw request builder.
114    fn authed(&self, rb: RequestBuilder) -> RequestBuilder {
115        rb.bearer_auth(self.api_key.expose_secret())
116            .header(reqwest::header::ACCEPT, "application/json")
117    }
118
119    /// Pre-authenticated GET builder for the given path segments.
120    pub(crate) fn get(&self, segments: &[&str]) -> RequestBuilder {
121        self.authed(self.http.get(self.url_for(segments)))
122    }
123
124    pub(crate) fn post(&self, segments: &[&str]) -> RequestBuilder {
125        self.authed(self.http.post(self.url_for(segments)))
126    }
127
128    pub(crate) fn delete(&self, segments: &[&str]) -> RequestBuilder {
129        self.authed(self.http.delete(self.url_for(segments)))
130    }
131
132    /// Pre-authenticated GET for an absolute URL that must share origin
133    /// with `base_url`. Used by pagination paths that receive the URL of
134    /// the next page in a `Link: rel="next"` header. Refuses cross-origin
135    /// URLs so a compromised or misconfigured upstream can't redirect us
136    /// to attacker-controlled hosts carrying our bearer token.
137    pub(crate) fn get_absolute(&self, url: &str) -> Result<RequestBuilder, BrazeApiError> {
138        let parsed = Url::parse(url).map_err(|e| BrazeApiError::Http {
139            status: StatusCode::BAD_GATEWAY,
140            body: format!("malformed pagination URL {url:?}: {e}"),
141        })?;
142        let same_origin = parsed.scheme() == self.base_url.scheme()
143            && parsed.host_str() == self.base_url.host_str()
144            && parsed.port_or_known_default() == self.base_url.port_or_known_default();
145        if !same_origin {
146            return Err(BrazeApiError::Http {
147                status: StatusCode::BAD_GATEWAY,
148                body: format!(
149                    "refusing cross-origin pagination URL {url:?} (base is {})",
150                    self.base_url
151                ),
152            });
153        }
154        Ok(self.authed(self.http.get(parsed)))
155    }
156
157    /// Execute `builder` with 429 retry, returning the raw response on
158    /// success or a typed error on failure. Shared transport layer used
159    /// by both [`Self::send_json`] and [`Self::send_ok`] so the retry /
160    /// auth-mapping policy lives in exactly one place.
161    ///
162    /// Retry policy: honor `Retry-After` (integer seconds or HTTP-date)
163    /// when present; otherwise exponential backoff with full jitter
164    /// (`BACKOFF_BASE * 2^attempt`, capped at `BACKOFF_CAP`). Give up
165    /// when cumulative sleep exceeds [`RETRY_BUDGET`] or attempts exceed
166    /// [`RETRY_MAX_ATTEMPTS`] (the latter only protects against servers
167    /// that return `Retry-After: 0` forever).
168    async fn send_with_retry(
169        &self,
170        builder: RequestBuilder,
171    ) -> Result<reqwest::Response, BrazeApiError> {
172        let mut attempt: u32 = 0;
173        let mut elapsed = Duration::ZERO;
174        loop {
175            let req = builder
176                .try_clone()
177                .expect("non-streaming requests are cloneable");
178            let resp = req.send().await?;
179            let status = resp.status();
180
181            if status.is_success() {
182                return Ok(resp);
183            }
184            match status {
185                StatusCode::TOO_MANY_REQUESTS => {
186                    if attempt >= RETRY_MAX_ATTEMPTS || elapsed >= RETRY_BUDGET {
187                        return Err(BrazeApiError::RateLimitExhausted);
188                    }
189                    let remaining = RETRY_BUDGET.saturating_sub(elapsed);
190                    let wait = compute_backoff(&resp, attempt, remaining);
191                    tracing::warn!(?wait, attempt, ?elapsed, "429 received, backing off");
192                    tokio::time::sleep(wait).await;
193                    elapsed = elapsed.saturating_add(wait);
194                    attempt += 1;
195                }
196                StatusCode::UNAUTHORIZED => return Err(BrazeApiError::Unauthorized),
197                _ => {
198                    let body = resp.text().await.unwrap_or_default();
199                    return Err(BrazeApiError::Http { status, body });
200                }
201            }
202        }
203    }
204
205    /// Send `builder` and decode the JSON body as `T` on success.
206    pub(crate) async fn send_json<T: serde::de::DeserializeOwned>(
207        &self,
208        builder: RequestBuilder,
209    ) -> Result<T, BrazeApiError> {
210        let resp = self.send_with_retry(builder).await?;
211        Ok(resp.json::<T>().await?)
212    }
213
214    /// Like [`Self::send_json`] but also returns the `Link: rel="next"`
215    /// URL if present — the only header paginated endpoints care about.
216    /// Parsing it before body deserialization avoids cloning the full
217    /// `HeaderMap` on every paginated request.
218    pub(crate) async fn send_json_with_next_link<T: serde::de::DeserializeOwned>(
219        &self,
220        builder: RequestBuilder,
221    ) -> Result<(T, Option<String>), BrazeApiError> {
222        let resp = self.send_with_retry(builder).await?;
223        let next = parse_next_link(resp.headers());
224        let body = resp.json::<T>().await?;
225        Ok((body, next))
226    }
227
228    /// Send `builder` and discard the response body. Used for endpoints
229    /// whose only meaningful output is the HTTP status (POST add field,
230    /// DELETE field). Drains the body so the connection can return to
231    /// the reqwest pool cleanly even when the response is 204 No Content.
232    pub(crate) async fn send_ok(&self, builder: RequestBuilder) -> Result<(), BrazeApiError> {
233        let resp = self.send_with_retry(builder).await?;
234        let _ = resp.bytes().await;
235        Ok(())
236    }
237}
238
239/// Parse a `Retry-After` header. Supports both integer seconds (the
240/// common case for Braze) and RFC 7231 §7.1.3 HTTP-date (IMF-fixdate /
241/// RFC 2822). Returns `None` if the header is missing or unparseable.
242fn parse_retry_after(resp: &reqwest::Response) -> Option<Duration> {
243    let raw = resp
244        .headers()
245        .get(reqwest::header::RETRY_AFTER)?
246        .to_str()
247        .ok()?;
248    if let Ok(secs) = raw.parse::<u64>() {
249        return Some(Duration::from_secs(secs));
250    }
251    // HTTP-date. `parse_from_rfc2822` accepts IMF-fixdate (which is a
252    // strict subset). Negative deltas (date already past) collapse to 0.
253    let dt = chrono::DateTime::parse_from_rfc2822(raw).ok()?;
254    let delta = dt
255        .timestamp()
256        .saturating_sub(chrono::Utc::now().timestamp());
257    Some(Duration::from_secs(delta.max(0) as u64))
258}
259
260/// Compute the sleep duration before the next retry. Clamped to
261/// `remaining_budget` so a degenerate server can't push a single sleep
262/// past the retry budget.
263fn compute_backoff(resp: &reqwest::Response, attempt: u32, remaining_budget: Duration) -> Duration {
264    let wait = match parse_retry_after(resp) {
265        Some(ra) => ra,
266        None => {
267            // `attempt.min(6)` keeps `1u32 << attempt` from overflowing;
268            // `BACKOFF_BASE * 64 = 32s` already exceeds `BACKOFF_CAP`.
269            let shifted = BACKOFF_BASE.saturating_mul(1u32 << attempt.min(6));
270            let capped = shifted.min(BACKOFF_CAP);
271            Duration::from_millis(fastrand::u64(0..=capped.as_millis() as u64))
272        }
273    };
274    wait.min(remaining_budget)
275}
276
277/// Parse RFC 5988 Link header and return the URL associated with
278/// `rel="next"`, if present. Braze uses this for cursor-style pagination
279/// on `/custom_attributes`; the response body does not carry the cursor.
280pub(crate) fn parse_next_link(headers: &reqwest::header::HeaderMap) -> Option<String> {
281    // RFC 5988 allows repeated `Link` fields; `get_all` so a later
282    // `rel="next"` isn't missed. Braze cursors are comma-free in practice.
283    for hv in headers.get_all(reqwest::header::LINK) {
284        let Ok(raw) = hv.to_str() else { continue };
285        for part in raw.split(',') {
286            let part = part.trim();
287            let Some((url_part, params)) = part.split_once(';') else {
288                continue;
289            };
290            let has_next = params.split(';').map(str::trim).any(|p| {
291                let Some((k, v)) = p.split_once('=') else {
292                    return false;
293                };
294                if !k.trim().eq_ignore_ascii_case("rel") {
295                    return false;
296                }
297                // `rel` may be a space-delimited list (e.g. `"prev next"`).
298                v.trim()
299                    .trim_matches('"')
300                    .split_ascii_whitespace()
301                    .any(|tok| tok.eq_ignore_ascii_case("next"))
302            });
303            if !has_next {
304                continue;
305            }
306            let url = url_part
307                .trim()
308                .trim_start_matches('<')
309                .trim_end_matches('>');
310            return Some(url.to_string());
311        }
312    }
313    None
314}
315
/// Upper bound on the number of items collected through offset
/// pagination. With pages of 1000 items that is 100 pages, which is more
/// than any realistic workspace holds and stops the loop if a server
/// keeps answering with full pages forever.
pub(crate) const LIST_SAFETY_CAP_ITEMS: usize = 100_000;
320
321/// Check that no two items in a list response share the same name.
322/// Shared by every list endpoint that indexes resources by name.
323pub(crate) fn check_duplicate_names<'a>(
324    names: impl Iterator<Item = &'a str>,
325    count: usize,
326    endpoint: &'static str,
327) -> Result<(), BrazeApiError> {
328    let mut seen = std::collections::HashSet::with_capacity(count);
329    for name in names {
330        if !seen.insert(name) {
331            return Err(BrazeApiError::DuplicateNameInListResponse {
332                endpoint,
333                name: name.to_string(),
334            });
335        }
336    }
337    Ok(())
338}
339
/// Outcome of classifying the `message` field on a Braze `/info`
/// response. Shared by content_block and email_template, which differ
/// only in the resource-specific "not found" phrase.
///
/// Derives `Debug`, `Clone`, `PartialEq`, and `Eq` so values can be
/// logged and compared directly (for example in `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum InfoMessageClass {
    /// No message was present, or it was `success` (ASCII case ignored).
    Success,
    /// The message says the requested resource does not exist.
    NotFound,
    /// Any other message; carries the original text unchanged.
    Unexpected(String),
}
348
349/// Classify the `message` field returned by Braze `/info` endpoints.
350/// `resource_phrase` is a resource-specific not-found indicator
351/// (e.g. `"no content block"`, `"no email template"`).
352pub(crate) fn classify_info_message(
353    message: Option<&str>,
354    resource_phrase: &str,
355) -> InfoMessageClass {
356    debug_assert!(
357        resource_phrase == resource_phrase.to_ascii_lowercase(),
358        "resource_phrase must be lowercase (compared against lowercased message)"
359    );
360    let Some(raw) = message else {
361        return InfoMessageClass::Success;
362    };
363    let trimmed = raw.trim();
364    if trimmed.eq_ignore_ascii_case("success") {
365        return InfoMessageClass::Success;
366    }
367    let lower = trimmed.to_ascii_lowercase();
368    if lower.contains("not found")
369        || lower.contains(resource_phrase)
370        || lower.contains("does not exist")
371    {
372        InfoMessageClass::NotFound
373    } else {
374        InfoMessageClass::Unexpected(raw.to_string())
375    }
376}
377
/// Test-only helper: a [`BrazeClient`] pointed at `server` and using the
/// fixed API key `test-key`.
#[cfg(test)]
pub(crate) fn test_client(server: &wiremock::MockServer) -> BrazeClient {
    let base = Url::parse(&server.uri()).unwrap();
    let key = SecretString::from("test-key".to_string());
    BrazeClient::new(base, key)
}
385
#[cfg(test)]
mod retry_tests {
    use super::*;
    use chrono::{Duration as ChronoDuration, Utc};
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// strftime pattern for IMF-fixdate, which `parse_from_rfc2822` accepts.
    const IMF_FIXDATE: &str = "%a, %d %b %Y %H:%M:%S GMT";

    /// Serve `template` on `GET /r` from a throwaway local mock server and
    /// fetch it once with a plain `reqwest` call. The tests below only
    /// read the headers of the returned response.
    async fn fetch_mocked(template: ResponseTemplate) -> reqwest::Response {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/r"))
            .respond_with(template)
            .mount(&server)
            .await;
        reqwest::get(format!("{}/r", server.uri())).await.unwrap()
    }

    /// A 429 response whose only extra header is `Retry-After: val`.
    async fn response_with_retry_after(val: &str) -> reqwest::Response {
        fetch_mocked(ResponseTemplate::new(429).insert_header("retry-after", val)).await
    }

    /// HTTP-date string for "now plus `offset_secs`" (negative = past).
    fn http_date(offset_secs: i64) -> String {
        let instant = Utc::now() + ChronoDuration::seconds(offset_secs);
        instant.format(IMF_FIXDATE).to_string()
    }

    /// Header map with one `Link` field per entry of `values`.
    fn link_headers(values: &[&str]) -> reqwest::header::HeaderMap {
        let mut headers = reqwest::header::HeaderMap::new();
        for value in values {
            headers.append(reqwest::header::LINK, value.parse().unwrap());
        }
        headers
    }

    #[tokio::test]
    async fn retry_after_parses_integer_seconds() {
        let resp = response_with_retry_after("5").await;
        assert_eq!(parse_retry_after(&resp), Some(Duration::from_secs(5)));
    }

    #[tokio::test]
    async fn retry_after_parses_http_date() {
        let resp = response_with_retry_after(&http_date(10)).await;
        let d = parse_retry_after(&resp).expect("should parse HTTP-date");
        // ±2s of slack for scheduler/clock drift in CI.
        let tolerated = Duration::from_secs(8)..=Duration::from_secs(12);
        assert!(tolerated.contains(&d), "expected ~10s, got {d:?}");
    }

    #[tokio::test]
    async fn retry_after_past_http_date_clamps_to_zero() {
        let resp = response_with_retry_after(&http_date(-30)).await;
        assert_eq!(parse_retry_after(&resp), Some(Duration::ZERO));
    }

    #[tokio::test]
    async fn retry_after_unparseable_returns_none() {
        let resp = response_with_retry_after("not a date").await;
        assert_eq!(parse_retry_after(&resp), None);
    }

    #[tokio::test]
    async fn backoff_without_header_falls_back_to_exponential_jitter() {
        // No Retry-After → full-jitter draw in [0, cap] (inclusive).
        let resp = fetch_mocked(ResponseTemplate::new(429)).await;
        let budget = Duration::from_secs(60);

        // attempt=0 → min(cap, base*1)=500ms → jitter in [0, 500ms].
        for _ in 0..20 {
            let w = compute_backoff(&resp, 0, budget);
            assert!(w <= Duration::from_millis(500), "attempt=0 bound: {w:?}");
        }
        // attempt=10 → saturates to cap (10s) → jitter in [0, 10s].
        for _ in 0..20 {
            let w = compute_backoff(&resp, 10, budget);
            assert!(w <= BACKOFF_CAP, "attempt=10 cap: {w:?}");
        }
    }

    #[tokio::test]
    async fn backoff_clamped_to_remaining_budget() {
        // Server asks for 30s while only 5s of budget remain.
        let resp = response_with_retry_after("30").await;
        let left = Duration::from_secs(5);
        assert_eq!(compute_backoff(&resp, 0, left), left);
    }

    #[test]
    fn parse_next_link_single_rel() {
        let h = link_headers(&[
            r#"<https://rest.example/custom_attributes/?cursor=abc>; rel="next""#,
        ]);
        assert_eq!(
            parse_next_link(&h),
            Some("https://rest.example/custom_attributes/?cursor=abc".to_string())
        );
    }

    #[test]
    fn parse_next_link_multiple_rels_picks_next() {
        let h = link_headers(&[
            r#"<https://rest.example/?cursor=prev>; rel="prev", <https://rest.example/?cursor=next>; rel="next""#,
        ]);
        assert_eq!(
            parse_next_link(&h),
            Some("https://rest.example/?cursor=next".to_string())
        );
    }

    #[test]
    fn parse_next_link_absent_returns_none() {
        assert_eq!(parse_next_link(&link_headers(&[])), None);
    }

    #[test]
    fn parse_next_link_without_next_rel_returns_none() {
        let h = link_headers(&[r#"<https://rest.example/?cursor=prev>; rel="prev""#]);
        assert_eq!(parse_next_link(&h), None);
    }

    #[test]
    fn parse_next_link_scans_multiple_link_header_fields() {
        // Servers may send several `Link:` fields instead of one
        // comma-joined value. `HeaderMap::get` would expose only the first,
        // so the parser has to walk `get_all` to see a later `rel="next"`.
        let h = link_headers(&[
            r#"<https://rest.example/?cursor=prev>; rel="prev""#,
            r#"<https://rest.example/?cursor=next>; rel="next""#,
        ]);
        assert_eq!(
            parse_next_link(&h),
            Some("https://rest.example/?cursor=next".to_string())
        );
    }

    #[test]
    fn parse_next_link_matches_space_delimited_rel_list() {
        // `rel` may hold a space-delimited list of relation types.
        let h = link_headers(&[r#"<https://rest.example/?cursor=n>; rel="prev next""#]);
        assert_eq!(
            parse_next_link(&h),
            Some("https://rest.example/?cursor=n".to_string())
        );
    }

    #[tokio::test]
    async fn get_absolute_rejects_cross_origin() {
        let server = MockServer::start().await;
        let err = test_client(&server)
            .get_absolute("https://attacker.example/custom_attributes/?cursor=x")
            .unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("cross-origin"), "got {msg:?}");
    }

    #[tokio::test]
    async fn get_absolute_accepts_same_origin() {
        let server = MockServer::start().await;
        let url = format!("{}/custom_attributes/?cursor=abc", server.uri());
        let _builder = test_client(&server)
            .get_absolute(&url)
            .expect("same-origin URL should be accepted");
    }

    #[tokio::test]
    async fn get_absolute_rejects_malformed_url() {
        let server = MockServer::start().await;
        let err = test_client(&server)
            .get_absolute("not a url")
            .unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("malformed"), "got {msg:?}");
    }

    #[tokio::test]
    async fn retries_attempt_cap_fires_on_degenerate_zero_retry_after() {
        // `retry-after: 0` on every response means no sleep ever consumes
        // the time budget, so the attempt cap is the only way out.
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/x"))
            .respond_with(ResponseTemplate::new(429).insert_header("retry-after", "0"))
            .mount(&server)
            .await;
        let client = test_client(&server);
        let outcome = client
            .send_json::<serde_json::Value>(client.get(&["x"]))
            .await;
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, BrazeApiError::RateLimitExhausted),
            "expected RateLimitExhausted, got {err:?}"
        );
    }
}
606            matches!(err, BrazeApiError::RateLimitExhausted),
607            "expected RateLimitExhausted, got {err:?}"
608        );
609    }
610}