Skip to main content

braze_sync/braze/
mod.rs

//! Braze REST API client.
//!
//! Layered:
//! - [`rate_limit`]: token-bucket throttle (governor)
//! - [`error`]: typed [`error::BrazeApiError`] variants
//! - [`catalog`] (and sibling modules per resource):
//!   per-endpoint async methods written as `impl BrazeClient { ... }`
//!   blocks
//!
//! Every request goes through [`BrazeClient::send_json`] or
//! [`BrazeClient::send_ok`], which share one retry path, so rate limiting
//! and 429 retry behavior are defined exactly once. Authentication and the
//! `User-Agent` are attached by the request builders and the client
//! constructor in this module. See IMPLEMENTATION.md §8.
13
14pub mod catalog;
15pub mod content_block;
16pub mod custom_attribute;
17pub mod email_template;
18pub mod error;
19pub mod rate_limit;
20
21use crate::braze::error::BrazeApiError;
22use crate::braze::rate_limit::RateLimiter;
23use reqwest::{Client, RequestBuilder, StatusCode};
24use secrecy::{ExposeSecret, SecretString};
25use std::sync::Arc;
26use std::time::Duration;
27use url::Url;
28
/// Timeout applied to the shared `reqwest` client when it is built.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

/// Number of times a 429 response is retried before the request fails
/// with a rate-limit error.
const MAX_RETRIES: u32 = 3;

/// Backoff used for a 429 whose `Retry-After` header is missing or cannot
/// be parsed as integer seconds.
const DEFAULT_RETRY_AFTER: Duration = Duration::from_secs(2);
32
33/// Cheap-to-clone Braze API client. Internally Arc-shares the API key,
34/// the rate limiter, and `reqwest::Client`'s connection pool, so cloning
35/// for a parallel batch is essentially free.
36#[derive(Clone)]
37pub struct BrazeClient {
38    http: Client,
39    base_url: Url,
40    api_key: Arc<SecretString>,
41    limiter: Arc<RateLimiter>,
42}
43
44// Hand-written Debug to be 100% certain the api key never lands in
45// tracing output, even if SecretString's own Debug impl ever changes.
46impl std::fmt::Debug for BrazeClient {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("BrazeClient")
49            .field("base_url", &self.base_url)
50            .field("api_key", &"<redacted>")
51            .finish()
52    }
53}
54
55impl BrazeClient {
56    pub fn from_resolved(resolved: &crate::config::ResolvedConfig) -> Self {
57        Self::new(
58            resolved.api_endpoint.clone(),
59            resolved.api_key.clone(),
60            resolved.rate_limit_per_minute,
61        )
62    }
63
64    pub fn new(base_url: Url, api_key: SecretString, rpm: u32) -> Self {
65        let http = Client::builder()
66            .user_agent(concat!("braze-sync/", env!("CARGO_PKG_VERSION")))
67            .timeout(REQUEST_TIMEOUT)
68            .build()
69            .expect("reqwest client builds with default features");
70        Self {
71            http,
72            base_url,
73            api_key: Arc::new(api_key),
74            limiter: Arc::new(RateLimiter::new(rpm)),
75        }
76    }
77
78    /// Build a URL by appending each `segment` to the base URL as a
79    /// separately percent-encoded path segment.
80    ///
81    /// User-controlled segments cannot inject path traversal or query
82    /// strings because the URL crate encodes `/`, `?`, `#`, and so on
83    /// inside each segment. Any path that the base URL itself carried is
84    /// dropped, so the layout is predictable regardless of how the user
85    /// wrote `api_endpoint` in their config.
86    pub(crate) fn url_for(&self, segments: &[&str]) -> Url {
87        let mut url = self.base_url.clone();
88        {
89            let mut seg = url
90                .path_segments_mut()
91                .expect("base url must be hierarchical (http/https)");
92            seg.clear();
93            for s in segments {
94                seg.push(s);
95            }
96        }
97        url
98    }
99
100    /// Pre-authenticated GET builder for the given path segments.
101    pub(crate) fn get(&self, segments: &[&str]) -> RequestBuilder {
102        let url = self.url_for(segments);
103        self.http
104            .get(url)
105            .bearer_auth(self.api_key.expose_secret())
106            .header(reqwest::header::ACCEPT, "application/json")
107    }
108
109    pub(crate) fn post(&self, segments: &[&str]) -> RequestBuilder {
110        let url = self.url_for(segments);
111        self.http
112            .post(url)
113            .bearer_auth(self.api_key.expose_secret())
114            .header(reqwest::header::ACCEPT, "application/json")
115    }
116
117    pub(crate) fn delete(&self, segments: &[&str]) -> RequestBuilder {
118        let url = self.url_for(segments);
119        self.http
120            .delete(url)
121            .bearer_auth(self.api_key.expose_secret())
122            .header(reqwest::header::ACCEPT, "application/json")
123    }
124
125    /// Execute `builder` with rate-limit acquire + 429 retry, returning
126    /// the raw response on success or a typed error on failure. Shared
127    /// transport layer used by both [`Self::send_json`] and
128    /// [`Self::send_ok`] so the retry / auth-mapping policy lives in
129    /// exactly one place.
130    async fn send_with_retry(
131        &self,
132        builder: RequestBuilder,
133    ) -> Result<reqwest::Response, BrazeApiError> {
134        let mut attempt: u32 = 0;
135        loop {
136            self.limiter.acquire().await;
137            let req = builder
138                .try_clone()
139                .expect("non-streaming requests are cloneable");
140            let resp = req.send().await?;
141            let status = resp.status();
142
143            if status.is_success() {
144                return Ok(resp);
145            }
146            match status {
147                StatusCode::TOO_MANY_REQUESTS if attempt < MAX_RETRIES => {
148                    let wait = parse_retry_after(&resp).unwrap_or(DEFAULT_RETRY_AFTER);
149                    tracing::warn!(?wait, attempt, "429 received, backing off");
150                    tokio::time::sleep(wait).await;
151                    attempt += 1;
152                }
153                StatusCode::TOO_MANY_REQUESTS => {
154                    return Err(BrazeApiError::RateLimitExhausted);
155                }
156                StatusCode::UNAUTHORIZED => return Err(BrazeApiError::Unauthorized),
157                _ => {
158                    let body = resp.text().await.unwrap_or_default();
159                    return Err(BrazeApiError::Http { status, body });
160                }
161            }
162        }
163    }
164
165    /// Send `builder` and decode the JSON body as `T` on success.
166    pub(crate) async fn send_json<T: serde::de::DeserializeOwned>(
167        &self,
168        builder: RequestBuilder,
169    ) -> Result<T, BrazeApiError> {
170        let resp = self.send_with_retry(builder).await?;
171        Ok(resp.json::<T>().await?)
172    }
173
174    /// Send `builder` and discard the response body. Used for endpoints
175    /// whose only meaningful output is the HTTP status (POST add field,
176    /// DELETE field). Drains the body so the connection can return to
177    /// the reqwest pool cleanly even when the response is 204 No Content.
178    pub(crate) async fn send_ok(&self, builder: RequestBuilder) -> Result<(), BrazeApiError> {
179        let resp = self.send_with_retry(builder).await?;
180        let _ = resp.bytes().await;
181        Ok(())
182    }
183}
184
185/// Parse a `Retry-After` header as integer seconds. HTTP-date format
186/// (RFC 7231 §7.1.3) is not handled and falls through to `None`, which
187/// the caller maps to `DEFAULT_RETRY_AFTER`. Braze only sends seconds
188/// in practice; if that changes, extend this function rather than adding
189/// a full HTTP-date parser.
190fn parse_retry_after(resp: &reqwest::Response) -> Option<Duration> {
191    resp.headers()
192        .get(reqwest::header::RETRY_AFTER)?
193        .to_str()
194        .ok()?
195        .parse::<u64>()
196        .ok()
197        .map(Duration::from_secs)
198}
199
200/// Check whether a list response was truncated and return a
201/// `PaginationNotImplemented` error if so.  Shared by every list
202/// endpoint that uses the fail-closed pagination guard.
203pub(crate) fn check_pagination(
204    count: Option<usize>,
205    returned: usize,
206    limit: usize,
207    endpoint: &'static str,
208) -> Result<(), BrazeApiError> {
209    let truncation_detail: Option<String> = match count {
210        Some(total) if total > returned => Some(format!("got {returned} of {total} results")),
211        None if returned >= limit => Some(format!(
212            "got a full page of {returned} result(s) with no total reported; \
213             cannot verify whether more exist"
214        )),
215        _ => None,
216    };
217    if let Some(detail) = truncation_detail {
218        return Err(BrazeApiError::PaginationNotImplemented { endpoint, detail });
219    }
220    Ok(())
221}
222
223/// Check that no two items in a list response share the same name.
224/// Shared by every list endpoint that indexes resources by name.
225pub(crate) fn check_duplicate_names<'a>(
226    names: impl Iterator<Item = &'a str>,
227    count: usize,
228    endpoint: &'static str,
229) -> Result<(), BrazeApiError> {
230    let mut seen = std::collections::HashSet::with_capacity(count);
231    for name in names {
232        if !seen.insert(name) {
233            return Err(BrazeApiError::DuplicateNameInListResponse {
234                endpoint,
235                name: name.to_string(),
236            });
237        }
238    }
239    Ok(())
240}
241
/// Outcome of classifying the `message` field on a Braze `/info`
/// response. Shared by content_block and email_template — the only
/// difference is the resource-specific "not found" phrase.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so a classification can
/// be logged and compared directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum InfoMessageClass {
    /// The message was absent or said "success".
    Success,
    /// The message indicated that the resource does not exist.
    NotFound,
    /// Any other message; carries the original, untrimmed text.
    Unexpected(String),
}
250
251/// Classify the `message` field returned by Braze `/info` endpoints.
252/// `resource_phrase` is a resource-specific not-found indicator
253/// (e.g. `"no content block"`, `"no email template"`).
254pub(crate) fn classify_info_message(
255    message: Option<&str>,
256    resource_phrase: &str,
257) -> InfoMessageClass {
258    debug_assert!(
259        resource_phrase == resource_phrase.to_ascii_lowercase(),
260        "resource_phrase must be lowercase (compared against lowercased message)"
261    );
262    let Some(raw) = message else {
263        return InfoMessageClass::Success;
264    };
265    let trimmed = raw.trim();
266    if trimmed.eq_ignore_ascii_case("success") {
267        return InfoMessageClass::Success;
268    }
269    let lower = trimmed.to_ascii_lowercase();
270    if lower.contains("not found")
271        || lower.contains(resource_phrase)
272        || lower.contains("does not exist")
273    {
274        InfoMessageClass::NotFound
275    } else {
276        InfoMessageClass::Unexpected(raw.to_string())
277    }
278}
279
/// Test helper: a [`BrazeClient`] pointed at the given wiremock server,
/// using a fixed dummy API key.
#[cfg(test)]
pub(crate) fn test_client(server: &wiremock::MockServer) -> BrazeClient {
    // Very high rpm so the limiter is effectively a no-op in tests.
    const TEST_RPM: u32 = 10_000;

    let base_url = Url::parse(&server.uri()).unwrap();
    let api_key = SecretString::from("test-key".to_string());
    BrazeClient::new(base_url, api_key, TEST_RPM)
}