pub mod catalog;
pub mod content_block;
pub mod custom_attribute;
pub mod email_template;
pub mod error;
use crate::braze::error::BrazeApiError;
use reqwest::{Client, RequestBuilder, StatusCode};
use secrecy::{ExposeSecret, SecretString};
use std::sync::Arc;
use std::time::Duration;
use url::Url;
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
const RETRY_BUDGET: Duration = Duration::from_secs(60);
const RETRY_MAX_ATTEMPTS: u32 = 100;
const BACKOFF_BASE: Duration = Duration::from_millis(500);
const BACKOFF_CAP: Duration = Duration::from_secs(10);
#[derive(Clone)]
pub struct BrazeClient {
http: Client,
base_url: Url,
api_key: Arc<SecretString>,
}
impl std::fmt::Debug for BrazeClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BrazeClient")
.field("base_url", &self.base_url)
.field("api_key", &"<redacted>")
.finish()
}
}
impl BrazeClient {
pub fn from_resolved(resolved: &crate::config::ResolvedConfig) -> Self {
Self::new(resolved.api_endpoint.clone(), resolved.api_key.clone())
}
pub fn new(base_url: Url, api_key: SecretString) -> Self {
let http = Client::builder()
.user_agent(concat!("braze-sync/", env!("CARGO_PKG_VERSION")))
.timeout(REQUEST_TIMEOUT)
.build()
.expect("reqwest client builds with default features");
Self {
http,
base_url,
api_key: Arc::new(api_key),
}
}
pub(crate) fn url_for(&self, segments: &[&str]) -> Url {
let mut url = self.base_url.clone();
{
let mut seg = url
.path_segments_mut()
.expect("base url must be hierarchical (http/https)");
seg.clear();
for s in segments {
seg.push(s);
}
}
url
}
fn authed(&self, rb: RequestBuilder) -> RequestBuilder {
rb.bearer_auth(self.api_key.expose_secret())
.header(reqwest::header::ACCEPT, "application/json")
}
pub(crate) fn get(&self, segments: &[&str]) -> RequestBuilder {
self.authed(self.http.get(self.url_for(segments)))
}
pub(crate) fn post(&self, segments: &[&str]) -> RequestBuilder {
self.authed(self.http.post(self.url_for(segments)))
}
pub(crate) fn delete(&self, segments: &[&str]) -> RequestBuilder {
self.authed(self.http.delete(self.url_for(segments)))
}
pub(crate) fn get_absolute(&self, url: &str) -> Result<RequestBuilder, BrazeApiError> {
let parsed = Url::parse(url).map_err(|e| BrazeApiError::Http {
status: StatusCode::BAD_GATEWAY,
body: format!("malformed pagination URL {url:?}: {e}"),
})?;
let same_origin = parsed.scheme() == self.base_url.scheme()
&& parsed.host_str() == self.base_url.host_str()
&& parsed.port_or_known_default() == self.base_url.port_or_known_default();
if !same_origin {
return Err(BrazeApiError::Http {
status: StatusCode::BAD_GATEWAY,
body: format!(
"refusing cross-origin pagination URL {url:?} (base is {})",
self.base_url
),
});
}
Ok(self.authed(self.http.get(parsed)))
}
async fn send_with_retry(
&self,
builder: RequestBuilder,
) -> Result<reqwest::Response, BrazeApiError> {
let mut attempt: u32 = 0;
let mut elapsed = Duration::ZERO;
loop {
let req = builder
.try_clone()
.expect("non-streaming requests are cloneable");
let resp = req.send().await?;
let status = resp.status();
if status.is_success() {
return Ok(resp);
}
match status {
StatusCode::TOO_MANY_REQUESTS => {
if attempt >= RETRY_MAX_ATTEMPTS || elapsed >= RETRY_BUDGET {
return Err(BrazeApiError::RateLimitExhausted);
}
let remaining = RETRY_BUDGET.saturating_sub(elapsed);
let wait = compute_backoff(&resp, attempt, remaining);
tracing::warn!(?wait, attempt, ?elapsed, "429 received, backing off");
tokio::time::sleep(wait).await;
elapsed = elapsed.saturating_add(wait);
attempt += 1;
}
StatusCode::UNAUTHORIZED => return Err(BrazeApiError::Unauthorized),
_ => {
let body = resp.text().await.unwrap_or_default();
return Err(BrazeApiError::Http { status, body });
}
}
}
}
pub(crate) async fn send_json<T: serde::de::DeserializeOwned>(
&self,
builder: RequestBuilder,
) -> Result<T, BrazeApiError> {
let resp = self.send_with_retry(builder).await?;
Ok(resp.json::<T>().await?)
}
pub(crate) async fn send_json_with_next_link<T: serde::de::DeserializeOwned>(
&self,
builder: RequestBuilder,
) -> Result<(T, Option<String>), BrazeApiError> {
let resp = self.send_with_retry(builder).await?;
let next = parse_next_link(resp.headers());
let body = resp.json::<T>().await?;
Ok((body, next))
}
pub(crate) async fn send_ok(&self, builder: RequestBuilder) -> Result<(), BrazeApiError> {
let resp = self.send_with_retry(builder).await?;
let _ = resp.bytes().await;
Ok(())
}
}
fn parse_retry_after(resp: &reqwest::Response) -> Option<Duration> {
let raw = resp
.headers()
.get(reqwest::header::RETRY_AFTER)?
.to_str()
.ok()?;
if let Ok(secs) = raw.parse::<u64>() {
return Some(Duration::from_secs(secs));
}
let dt = chrono::DateTime::parse_from_rfc2822(raw).ok()?;
let delta = dt
.timestamp()
.saturating_sub(chrono::Utc::now().timestamp());
Some(Duration::from_secs(delta.max(0) as u64))
}
fn compute_backoff(resp: &reqwest::Response, attempt: u32, remaining_budget: Duration) -> Duration {
let wait = match parse_retry_after(resp) {
Some(ra) => ra,
None => {
let shifted = BACKOFF_BASE.saturating_mul(1u32 << attempt.min(6));
let capped = shifted.min(BACKOFF_CAP);
Duration::from_millis(fastrand::u64(0..=capped.as_millis() as u64))
}
};
wait.min(remaining_budget)
}
pub(crate) fn parse_next_link(headers: &reqwest::header::HeaderMap) -> Option<String> {
for hv in headers.get_all(reqwest::header::LINK) {
let Ok(raw) = hv.to_str() else { continue };
for part in raw.split(',') {
let part = part.trim();
let Some((url_part, params)) = part.split_once(';') else {
continue;
};
let has_next = params.split(';').map(str::trim).any(|p| {
let Some((k, v)) = p.split_once('=') else {
return false;
};
if !k.trim().eq_ignore_ascii_case("rel") {
return false;
}
v.trim()
.trim_matches('"')
.split_ascii_whitespace()
.any(|tok| tok.eq_ignore_ascii_case("next"))
});
if !has_next {
continue;
}
let url = url_part
.trim()
.trim_start_matches('<')
.trim_end_matches('>');
return Some(url.to_string());
}
}
None
}
pub(crate) const LIST_SAFETY_CAP_ITEMS: usize = 100_000;
pub(crate) fn check_duplicate_names<'a>(
names: impl Iterator<Item = &'a str>,
count: usize,
endpoint: &'static str,
) -> Result<(), BrazeApiError> {
let mut seen = std::collections::HashSet::with_capacity(count);
for name in names {
if !seen.insert(name) {
return Err(BrazeApiError::DuplicateNameInListResponse {
endpoint,
name: name.to_string(),
});
}
}
Ok(())
}
pub(crate) enum InfoMessageClass {
Success,
NotFound,
Unexpected(String),
}
pub(crate) fn classify_info_message(
message: Option<&str>,
resource_phrase: &str,
) -> InfoMessageClass {
debug_assert!(
resource_phrase == resource_phrase.to_ascii_lowercase(),
"resource_phrase must be lowercase (compared against lowercased message)"
);
let Some(raw) = message else {
return InfoMessageClass::Success;
};
let trimmed = raw.trim();
if trimmed.eq_ignore_ascii_case("success") {
return InfoMessageClass::Success;
}
let lower = trimmed.to_ascii_lowercase();
if lower.contains("not found")
|| lower.contains(resource_phrase)
|| lower.contains("does not exist")
{
InfoMessageClass::NotFound
} else {
InfoMessageClass::Unexpected(raw.to_string())
}
}
#[cfg(test)]
pub(crate) fn test_client(server: &wiremock::MockServer) -> BrazeClient {
BrazeClient::new(
Url::parse(&server.uri()).unwrap(),
SecretString::from("test-key".to_string()),
)
}
#[cfg(test)]
mod retry_tests {
use super::*;
use chrono::{Duration as ChronoDuration, Utc};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
async fn response_with_retry_after(val: &str) -> reqwest::Response {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/r"))
.respond_with(ResponseTemplate::new(429).insert_header("retry-after", val))
.mount(&server)
.await;
reqwest::get(format!("{}/r", server.uri())).await.unwrap()
}
#[tokio::test]
async fn retry_after_parses_integer_seconds() {
let resp = response_with_retry_after("5").await;
assert_eq!(parse_retry_after(&resp), Some(Duration::from_secs(5)));
}
#[tokio::test]
async fn retry_after_parses_http_date() {
let future = Utc::now() + ChronoDuration::seconds(10);
let formatted = future.format("%a, %d %b %Y %H:%M:%S GMT").to_string();
let resp = response_with_retry_after(&formatted).await;
let d = parse_retry_after(&resp).expect("should parse HTTP-date");
assert!(
d >= Duration::from_secs(8) && d <= Duration::from_secs(12),
"expected ~10s, got {d:?}"
);
}
#[tokio::test]
async fn retry_after_past_http_date_clamps_to_zero() {
let past = Utc::now() - ChronoDuration::seconds(30);
let formatted = past.format("%a, %d %b %Y %H:%M:%S GMT").to_string();
let resp = response_with_retry_after(&formatted).await;
assert_eq!(parse_retry_after(&resp), Some(Duration::ZERO));
}
#[tokio::test]
async fn retry_after_unparseable_returns_none() {
let resp = response_with_retry_after("not a date").await;
assert_eq!(parse_retry_after(&resp), None);
}
#[tokio::test]
async fn backoff_without_header_falls_back_to_exponential_jitter() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/r"))
.respond_with(ResponseTemplate::new(429))
.mount(&server)
.await;
let resp = reqwest::get(format!("{}/r", server.uri())).await.unwrap();
for _ in 0..20 {
let w = compute_backoff(&resp, 0, Duration::from_secs(60));
assert!(w <= Duration::from_millis(500), "attempt=0 bound: {w:?}");
}
for _ in 0..20 {
let w = compute_backoff(&resp, 10, Duration::from_secs(60));
assert!(w <= BACKOFF_CAP, "attempt=10 cap: {w:?}");
}
}
#[tokio::test]
async fn backoff_clamped_to_remaining_budget() {
let resp = response_with_retry_after("30").await;
let w = compute_backoff(&resp, 0, Duration::from_secs(5));
assert_eq!(w, Duration::from_secs(5));
}
#[test]
fn parse_next_link_single_rel() {
let mut h = reqwest::header::HeaderMap::new();
h.insert(
reqwest::header::LINK,
r#"<https://rest.example/custom_attributes/?cursor=abc>; rel="next""#
.parse()
.unwrap(),
);
assert_eq!(
parse_next_link(&h),
Some("https://rest.example/custom_attributes/?cursor=abc".to_string())
);
}
#[test]
fn parse_next_link_multiple_rels_picks_next() {
let mut h = reqwest::header::HeaderMap::new();
h.insert(
reqwest::header::LINK,
r#"<https://rest.example/?cursor=prev>; rel="prev", <https://rest.example/?cursor=next>; rel="next""#
.parse()
.unwrap(),
);
assert_eq!(
parse_next_link(&h),
Some("https://rest.example/?cursor=next".to_string())
);
}
#[test]
fn parse_next_link_absent_returns_none() {
let h = reqwest::header::HeaderMap::new();
assert_eq!(parse_next_link(&h), None);
}
#[test]
fn parse_next_link_without_next_rel_returns_none() {
let mut h = reqwest::header::HeaderMap::new();
h.insert(
reqwest::header::LINK,
r#"<https://rest.example/?cursor=prev>; rel="prev""#
.parse()
.unwrap(),
);
assert_eq!(parse_next_link(&h), None);
}
#[test]
fn parse_next_link_scans_multiple_link_header_fields() {
let mut h = reqwest::header::HeaderMap::new();
h.append(
reqwest::header::LINK,
r#"<https://rest.example/?cursor=prev>; rel="prev""#
.parse()
.unwrap(),
);
h.append(
reqwest::header::LINK,
r#"<https://rest.example/?cursor=next>; rel="next""#
.parse()
.unwrap(),
);
assert_eq!(
parse_next_link(&h),
Some("https://rest.example/?cursor=next".to_string())
);
}
#[test]
fn parse_next_link_matches_space_delimited_rel_list() {
let mut h = reqwest::header::HeaderMap::new();
h.insert(
reqwest::header::LINK,
r#"<https://rest.example/?cursor=n>; rel="prev next""#
.parse()
.unwrap(),
);
assert_eq!(
parse_next_link(&h),
Some("https://rest.example/?cursor=n".to_string())
);
}
#[tokio::test]
async fn get_absolute_rejects_cross_origin() {
let server = MockServer::start().await;
let client = super::test_client(&server);
let err = client
.get_absolute("https://attacker.example/custom_attributes/?cursor=x")
.unwrap_err();
let msg = format!("{err}");
assert!(msg.contains("cross-origin"), "got {msg:?}");
}
#[tokio::test]
async fn get_absolute_accepts_same_origin() {
let server = MockServer::start().await;
let client = super::test_client(&server);
let url = format!("{}/custom_attributes/?cursor=abc", server.uri());
let _builder = client
.get_absolute(&url)
.expect("same-origin URL should be accepted");
}
#[tokio::test]
async fn get_absolute_rejects_malformed_url() {
let server = MockServer::start().await;
let client = super::test_client(&server);
let err = client.get_absolute("not a url").unwrap_err();
let msg = format!("{err}");
assert!(msg.contains("malformed"), "got {msg:?}");
}
#[tokio::test]
async fn retries_attempt_cap_fires_on_degenerate_zero_retry_after() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/x"))
.respond_with(ResponseTemplate::new(429).insert_header("retry-after", "0"))
.mount(&server)
.await;
let client = super::test_client(&server);
let req = client.get(&["x"]);
let err = client
.send_json::<serde_json::Value>(req)
.await
.unwrap_err();
assert!(
matches!(err, BrazeApiError::RateLimitExhausted),
"expected RateLimitExhausted, got {err:?}"
);
}
}