1pub mod catalog;
23pub mod content_block;
24pub mod custom_attribute;
25pub mod email_template;
26pub mod error;
27
28use crate::braze::error::BrazeApiError;
29use reqwest::{Client, RequestBuilder, StatusCode};
30use secrecy::{ExposeSecret, SecretString};
31use std::sync::Arc;
32use std::time::Duration;
33use url::Url;
34
/// Timeout applied to every request sent through the shared `reqwest` client
/// (installed via `Client::builder().timeout(..)` in `BrazeClient::new`).
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

/// Upper bound on the cumulative back-off sleep spent retrying a single
/// request after HTTP 429 responses. Only the time spent sleeping between
/// attempts is counted against it; time spent waiting on the requests
/// themselves is not.
const RETRY_BUDGET: Duration = Duration::from_secs(60);

/// Maximum number of 429-triggered retries for a single request. This is the
/// bound that ends the retry loop when the computed waits are zero (for
/// example `Retry-After: 0`), because zero-length waits never consume
/// `RETRY_BUDGET`.
const RETRY_MAX_ATTEMPTS: u32 = 100;

/// Base delay of the exponential back-off used when a 429 response carries no
/// usable `Retry-After` header; it is doubled per attempt (up to 2^6 times).
const BACKOFF_BASE: Duration = Duration::from_millis(500);
/// Ceiling applied to the exponential back-off delay before jitter is drawn.
const BACKOFF_CAP: Duration = Duration::from_secs(10);
51
/// Client handle for the Braze REST API.
///
/// Bundles the underlying `reqwest` client, the base URL that request paths
/// are built on, and the API key. The key lives behind `Arc<SecretString>`,
/// so cloning a `BrazeClient` shares the same secret allocation instead of
/// copying it.
#[derive(Clone)]
pub struct BrazeClient {
    /// HTTP client configured in `BrazeClient::new` (user agent + timeout).
    http: Client,
    /// Base URL; `url_for` replaces its path with the requested segments.
    base_url: Url,
    /// API key sent as a bearer token on every authenticated request.
    api_key: Arc<SecretString>,
}
61
62impl std::fmt::Debug for BrazeClient {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 f.debug_struct("BrazeClient")
67 .field("base_url", &self.base_url)
68 .field("api_key", &"<redacted>")
69 .finish()
70 }
71}
72
73impl BrazeClient {
74 pub fn from_resolved(resolved: &crate::config::ResolvedConfig) -> Self {
75 Self::new(resolved.api_endpoint.clone(), resolved.api_key.clone())
76 }
77
78 pub fn new(base_url: Url, api_key: SecretString) -> Self {
79 let http = Client::builder()
80 .user_agent(concat!("braze-sync/", env!("CARGO_PKG_VERSION")))
81 .timeout(REQUEST_TIMEOUT)
82 .build()
83 .expect("reqwest client builds with default features");
84 Self {
85 http,
86 base_url,
87 api_key: Arc::new(api_key),
88 }
89 }
90
91 pub(crate) fn url_for(&self, segments: &[&str]) -> Url {
100 let mut url = self.base_url.clone();
101 {
102 let mut seg = url
103 .path_segments_mut()
104 .expect("base url must be hierarchical (http/https)");
105 seg.clear();
106 for s in segments {
107 seg.push(s);
108 }
109 }
110 url
111 }
112
113 fn authed(&self, rb: RequestBuilder) -> RequestBuilder {
115 rb.bearer_auth(self.api_key.expose_secret())
116 .header(reqwest::header::ACCEPT, "application/json")
117 }
118
119 pub(crate) fn get(&self, segments: &[&str]) -> RequestBuilder {
121 self.authed(self.http.get(self.url_for(segments)))
122 }
123
124 pub(crate) fn post(&self, segments: &[&str]) -> RequestBuilder {
125 self.authed(self.http.post(self.url_for(segments)))
126 }
127
128 pub(crate) fn delete(&self, segments: &[&str]) -> RequestBuilder {
129 self.authed(self.http.delete(self.url_for(segments)))
130 }
131
132 pub(crate) fn get_absolute(&self, url: &str) -> Result<RequestBuilder, BrazeApiError> {
138 let parsed = Url::parse(url).map_err(|e| BrazeApiError::Http {
139 status: StatusCode::BAD_GATEWAY,
140 body: format!("malformed pagination URL {url:?}: {e}"),
141 })?;
142 let same_origin = parsed.scheme() == self.base_url.scheme()
143 && parsed.host_str() == self.base_url.host_str()
144 && parsed.port_or_known_default() == self.base_url.port_or_known_default();
145 if !same_origin {
146 return Err(BrazeApiError::Http {
147 status: StatusCode::BAD_GATEWAY,
148 body: format!(
149 "refusing cross-origin pagination URL {url:?} (base is {})",
150 self.base_url
151 ),
152 });
153 }
154 Ok(self.authed(self.http.get(parsed)))
155 }
156
157 async fn send_with_retry(
169 &self,
170 builder: RequestBuilder,
171 ) -> Result<reqwest::Response, BrazeApiError> {
172 let mut attempt: u32 = 0;
173 let mut elapsed = Duration::ZERO;
174 loop {
175 let req = builder
176 .try_clone()
177 .expect("non-streaming requests are cloneable");
178 let resp = req.send().await?;
179 let status = resp.status();
180
181 if status.is_success() {
182 return Ok(resp);
183 }
184 match status {
185 StatusCode::TOO_MANY_REQUESTS => {
186 if attempt >= RETRY_MAX_ATTEMPTS || elapsed >= RETRY_BUDGET {
187 return Err(BrazeApiError::RateLimitExhausted);
188 }
189 let remaining = RETRY_BUDGET.saturating_sub(elapsed);
190 let wait = compute_backoff(&resp, attempt, remaining);
191 tracing::warn!(?wait, attempt, ?elapsed, "429 received, backing off");
192 tokio::time::sleep(wait).await;
193 elapsed = elapsed.saturating_add(wait);
194 attempt += 1;
195 }
196 StatusCode::UNAUTHORIZED => return Err(BrazeApiError::Unauthorized),
197 _ => {
198 let body = resp.text().await.unwrap_or_default();
199 return Err(BrazeApiError::Http { status, body });
200 }
201 }
202 }
203 }
204
205 pub(crate) async fn send_json<T: serde::de::DeserializeOwned>(
207 &self,
208 builder: RequestBuilder,
209 ) -> Result<T, BrazeApiError> {
210 let resp = self.send_with_retry(builder).await?;
211 Ok(resp.json::<T>().await?)
212 }
213
214 pub(crate) async fn send_json_with_next_link<T: serde::de::DeserializeOwned>(
219 &self,
220 builder: RequestBuilder,
221 ) -> Result<(T, Option<String>), BrazeApiError> {
222 let resp = self.send_with_retry(builder).await?;
223 let next = parse_next_link(resp.headers());
224 let body = resp.json::<T>().await?;
225 Ok((body, next))
226 }
227
228 pub(crate) async fn send_ok(&self, builder: RequestBuilder) -> Result<(), BrazeApiError> {
233 let resp = self.send_with_retry(builder).await?;
234 let _ = resp.bytes().await;
235 Ok(())
236 }
237}
238
239fn parse_retry_after(resp: &reqwest::Response) -> Option<Duration> {
243 let raw = resp
244 .headers()
245 .get(reqwest::header::RETRY_AFTER)?
246 .to_str()
247 .ok()?;
248 if let Ok(secs) = raw.parse::<u64>() {
249 return Some(Duration::from_secs(secs));
250 }
251 let dt = chrono::DateTime::parse_from_rfc2822(raw).ok()?;
254 let delta = dt
255 .timestamp()
256 .saturating_sub(chrono::Utc::now().timestamp());
257 Some(Duration::from_secs(delta.max(0) as u64))
258}
259
260fn compute_backoff(resp: &reqwest::Response, attempt: u32, remaining_budget: Duration) -> Duration {
264 let wait = match parse_retry_after(resp) {
265 Some(ra) => ra,
266 None => {
267 let shifted = BACKOFF_BASE.saturating_mul(1u32 << attempt.min(6));
270 let capped = shifted.min(BACKOFF_CAP);
271 Duration::from_millis(fastrand::u64(0..=capped.as_millis() as u64))
272 }
273 };
274 wait.min(remaining_budget)
275}
276
277pub(crate) fn parse_next_link(headers: &reqwest::header::HeaderMap) -> Option<String> {
281 for hv in headers.get_all(reqwest::header::LINK) {
284 let Ok(raw) = hv.to_str() else { continue };
285 for part in raw.split(',') {
286 let part = part.trim();
287 let Some((url_part, params)) = part.split_once(';') else {
288 continue;
289 };
290 let has_next = params.split(';').map(str::trim).any(|p| {
291 let Some((k, v)) = p.split_once('=') else {
292 return false;
293 };
294 if !k.trim().eq_ignore_ascii_case("rel") {
295 return false;
296 }
297 v.trim()
299 .trim_matches('"')
300 .split_ascii_whitespace()
301 .any(|tok| tok.eq_ignore_ascii_case("next"))
302 });
303 if !has_next {
304 continue;
305 }
306 let url = url_part
307 .trim()
308 .trim_start_matches('<')
309 .trim_end_matches('>');
310 return Some(url.to_string());
311 }
312 }
313 None
314}
315
/// Item-count cap (100,000) shared across the crate.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the list endpoints in the submodules use it to stop runaway pagination —
/// confirm against callers.
pub(crate) const LIST_SAFETY_CAP_ITEMS: usize = 100_000;
320
321pub(crate) fn check_duplicate_names<'a>(
324 names: impl Iterator<Item = &'a str>,
325 count: usize,
326 endpoint: &'static str,
327) -> Result<(), BrazeApiError> {
328 let mut seen = std::collections::HashSet::with_capacity(count);
329 for name in names {
330 if !seen.insert(name) {
331 return Err(BrazeApiError::DuplicateNameInListResponse {
332 endpoint,
333 name: name.to_string(),
334 });
335 }
336 }
337 Ok(())
338}
339
/// Result of classifying an optional informational message string.
pub(crate) enum InfoMessageClass {
    /// No message at all, or a message equal to "success" (any ASCII case).
    Success,
    /// The message indicates that the requested resource is missing.
    NotFound,
    /// Any other message; carries the original, untrimmed text.
    Unexpected(String),
}

/// Classifies `message` as success, not-found, or unexpected.
///
/// * `None`, or text that equals `success` after trimming (ASCII
///   case-insensitive), is `Success`.
/// * Text whose lowercased form contains `not found`, `does not exist`, or
///   `resource_phrase` is `NotFound`.
/// * Everything else is `Unexpected` with the original message.
///
/// `resource_phrase` must already be lowercase because it is matched against
/// the lowercased message; this is checked with a `debug_assert!`.
pub(crate) fn classify_info_message(
    message: Option<&str>,
    resource_phrase: &str,
) -> InfoMessageClass {
    debug_assert!(
        resource_phrase == resource_phrase.to_ascii_lowercase(),
        "resource_phrase must be lowercase (compared against lowercased message)"
    );
    let raw = match message {
        Some(text) => text,
        None => return InfoMessageClass::Success,
    };

    let trimmed = raw.trim();
    if trimmed.eq_ignore_ascii_case("success") {
        return InfoMessageClass::Success;
    }

    let lower = trimmed.to_ascii_lowercase();
    let signals_missing = ["not found", resource_phrase, "does not exist"]
        .iter()
        .any(|needle| lower.contains(*needle));
    if signals_missing {
        return InfoMessageClass::NotFound;
    }
    InfoMessageClass::Unexpected(raw.to_string())
}
377
/// Test-only constructor: a `BrazeClient` whose base URL is the mock
/// server's address and whose API key is the fixed string `test-key`.
#[cfg(test)]
pub(crate) fn test_client(server: &wiremock::MockServer) -> BrazeClient {
    let base_url = Url::parse(&server.uri()).unwrap();
    let api_key = SecretString::from("test-key".to_string());
    BrazeClient::new(base_url, api_key)
}
385
#[cfg(test)]
mod retry_tests {
    use super::*;
    use chrono::{Duration as ChronoDuration, Utc};
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Date layout used to render `Retry-After` header values in these tests.
    const HTTP_DATE_FORMAT: &str = "%a, %d %b %Y %H:%M:%S GMT";

    /// Serves `template` for `GET /r` on a fresh mock server and returns the
    /// response to a single plain request against it.
    async fn fetch_mocked(template: ResponseTemplate) -> reqwest::Response {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/r"))
            .respond_with(template)
            .mount(&server)
            .await;
        reqwest::get(format!("{}/r", server.uri())).await.unwrap()
    }

    /// Produces a real 429 response whose `retry-after` header is `val`.
    async fn response_with_retry_after(val: &str) -> reqwest::Response {
        fetch_mocked(ResponseTemplate::new(429).insert_header("retry-after", val)).await
    }

    /// Builds a header map holding one `Link` field per entry of `values`.
    fn link_headers(values: &[&str]) -> reqwest::header::HeaderMap {
        let mut map = reqwest::header::HeaderMap::new();
        for value in values {
            map.append(reqwest::header::LINK, value.parse().unwrap());
        }
        map
    }

    #[tokio::test]
    async fn retry_after_parses_integer_seconds() {
        let resp = response_with_retry_after("5").await;
        let parsed = parse_retry_after(&resp);
        assert_eq!(parsed, Some(Duration::from_secs(5)));
    }

    #[tokio::test]
    async fn retry_after_parses_http_date() {
        let target = Utc::now() + ChronoDuration::seconds(10);
        let header = target.format(HTTP_DATE_FORMAT).to_string();
        let resp = response_with_retry_after(&header).await;
        let d = parse_retry_after(&resp).expect("should parse HTTP-date");
        // Allow a couple of seconds of slack around the 10 s target.
        let tolerated = Duration::from_secs(8)..=Duration::from_secs(12);
        assert!(tolerated.contains(&d), "expected ~10s, got {d:?}");
    }

    #[tokio::test]
    async fn retry_after_past_http_date_clamps_to_zero() {
        let target = Utc::now() - ChronoDuration::seconds(30);
        let header = target.format(HTTP_DATE_FORMAT).to_string();
        let resp = response_with_retry_after(&header).await;
        let parsed = parse_retry_after(&resp);
        assert_eq!(parsed, Some(Duration::ZERO));
    }

    #[tokio::test]
    async fn retry_after_unparseable_returns_none() {
        let resp = response_with_retry_after("not a date").await;
        let parsed = parse_retry_after(&resp);
        assert_eq!(parsed, None);
    }

    #[tokio::test]
    async fn backoff_without_header_falls_back_to_exponential_jitter() {
        // No `retry-after` header, so the jittered exponential path is used.
        let resp = fetch_mocked(ResponseTemplate::new(429)).await;
        let budget = Duration::from_secs(60);

        // Jitter is random, so sample repeatedly and check the upper bounds.
        for _ in 0..20 {
            let w = compute_backoff(&resp, 0, budget);
            assert!(w <= Duration::from_millis(500), "attempt=0 bound: {w:?}");
        }
        for _ in 0..20 {
            let w = compute_backoff(&resp, 10, budget);
            assert!(w <= BACKOFF_CAP, "attempt=10 cap: {w:?}");
        }
    }

    #[tokio::test]
    async fn backoff_clamped_to_remaining_budget() {
        let resp = response_with_retry_after("30").await;
        let remaining = Duration::from_secs(5);
        // The header asks for 30 s, but only 5 s of budget is left.
        assert_eq!(compute_backoff(&resp, 0, remaining), remaining);
    }

    #[test]
    fn parse_next_link_single_rel() {
        let h = link_headers(&[
            r#"<https://rest.example/custom_attributes/?cursor=abc>; rel="next""#,
        ]);
        let expected = "https://rest.example/custom_attributes/?cursor=abc".to_string();
        assert_eq!(parse_next_link(&h), Some(expected));
    }

    #[test]
    fn parse_next_link_multiple_rels_picks_next() {
        let h = link_headers(&[
            r#"<https://rest.example/?cursor=prev>; rel="prev", <https://rest.example/?cursor=next>; rel="next""#,
        ]);
        let expected = "https://rest.example/?cursor=next".to_string();
        assert_eq!(parse_next_link(&h), Some(expected));
    }

    #[test]
    fn parse_next_link_absent_returns_none() {
        let h = link_headers(&[]);
        assert_eq!(parse_next_link(&h), None);
    }

    #[test]
    fn parse_next_link_without_next_rel_returns_none() {
        let h = link_headers(&[r#"<https://rest.example/?cursor=prev>; rel="prev""#]);
        assert_eq!(parse_next_link(&h), None);
    }

    #[test]
    fn parse_next_link_scans_multiple_link_header_fields() {
        // Two separate `Link` fields; the `next` relation is in the second.
        let h = link_headers(&[
            r#"<https://rest.example/?cursor=prev>; rel="prev""#,
            r#"<https://rest.example/?cursor=next>; rel="next""#,
        ]);
        let expected = "https://rest.example/?cursor=next".to_string();
        assert_eq!(parse_next_link(&h), Some(expected));
    }

    #[test]
    fn parse_next_link_matches_space_delimited_rel_list() {
        let h = link_headers(&[r#"<https://rest.example/?cursor=n>; rel="prev next""#]);
        let expected = "https://rest.example/?cursor=n".to_string();
        assert_eq!(parse_next_link(&h), Some(expected));
    }

    #[tokio::test]
    async fn get_absolute_rejects_cross_origin() {
        let server = MockServer::start().await;
        let client = test_client(&server);
        let outcome = client.get_absolute("https://attacker.example/custom_attributes/?cursor=x");
        let msg = format!("{}", outcome.unwrap_err());
        assert!(msg.contains("cross-origin"), "got {msg:?}");
    }

    #[tokio::test]
    async fn get_absolute_accepts_same_origin() {
        let server = MockServer::start().await;
        let client = test_client(&server);
        let url = format!("{}/custom_attributes/?cursor=abc", server.uri());
        let outcome = client.get_absolute(&url);
        let _builder = outcome.expect("same-origin URL should be accepted");
    }

    #[tokio::test]
    async fn get_absolute_rejects_malformed_url() {
        let server = MockServer::start().await;
        let client = test_client(&server);
        let outcome = client.get_absolute("not a url");
        let msg = format!("{}", outcome.unwrap_err());
        assert!(msg.contains("malformed"), "got {msg:?}");
    }

    #[tokio::test]
    async fn retries_attempt_cap_fires_on_degenerate_zero_retry_after() {
        let server = MockServer::start().await;
        // Always 429 with a zero wait: the sleep budget is never consumed,
        // so only the attempt cap can end the retry loop.
        let always_limited = ResponseTemplate::new(429).insert_header("retry-after", "0");
        Mock::given(method("GET"))
            .and(path("/x"))
            .respond_with(always_limited)
            .mount(&server)
            .await;
        let client = test_client(&server);
        let outcome = client
            .send_json::<serde_json::Value>(client.get(&["x"]))
            .await;
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, BrazeApiError::RateLimitExhausted),
            "expected RateLimitExhausted, got {err:?}"
        );
    }
}