use crate::braze::error::BrazeApiError;
use crate::braze::BrazeClient;
use crate::resource::{Catalog, CatalogField, CatalogFieldType, CatalogItemRow};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize)]
struct CatalogsResponse {
#[serde(default)]
catalogs: Vec<Catalog>,
#[serde(default)]
next_cursor: Option<String>,
}
const MAX_CATALOG_ITEM_PAGES: usize = 2_000;
impl BrazeClient {
pub async fn list_catalogs(&self) -> Result<Vec<Catalog>, BrazeApiError> {
let req = self.get(&["catalogs"]);
let resp: CatalogsResponse = self.send_json(req).await?;
if let Some(cursor) = resp.next_cursor.as_deref() {
if !cursor.is_empty() {
return Err(BrazeApiError::PaginationNotImplemented {
endpoint: "/catalogs",
detail: format!(
"got {} catalog(s) plus a non-empty next_cursor; \
aborting to prevent silent truncation",
resp.catalogs.len()
),
});
}
}
Ok(resp.catalogs)
}
pub async fn get_catalog(&self, name: &str) -> Result<Catalog, BrazeApiError> {
let req = self.get(&["catalogs", name]);
match self.send_json::<CatalogsResponse>(req).await {
Ok(resp) => resp
.catalogs
.into_iter()
.next()
.ok_or_else(|| BrazeApiError::NotFound {
resource: format!("catalog '{name}'"),
}),
Err(BrazeApiError::Http { status, .. }) if status == StatusCode::NOT_FOUND => {
Err(BrazeApiError::NotFound {
resource: format!("catalog '{name}'"),
})
}
Err(e) => Err(e),
}
}
pub async fn add_catalog_field(
&self,
catalog_name: &str,
field: &CatalogField,
) -> Result<(), BrazeApiError> {
let body = AddFieldsRequest {
fields: vec![WireField {
name: &field.name,
field_type: field.field_type,
}],
};
let req = self.post(&["catalogs", catalog_name, "fields"]).json(&body);
self.send_ok(req).await
}
pub async fn delete_catalog_field(
&self,
catalog_name: &str,
field_name: &str,
) -> Result<(), BrazeApiError> {
let req = self.delete(&["catalogs", catalog_name, "fields", field_name]);
self.send_ok(req).await
}
pub async fn list_catalog_items(
&self,
catalog_name: &str,
) -> Result<Vec<CatalogItemRow>, BrazeApiError> {
let mut all_items = Vec::new();
let mut cursor: Option<String> = None;
let mut page: usize = 0;
loop {
page += 1;
if page > MAX_CATALOG_ITEM_PAGES {
return Err(BrazeApiError::Http {
status: StatusCode::INTERNAL_SERVER_ERROR,
body: format!(
"catalog '{catalog_name}' pagination exceeded \
{MAX_CATALOG_ITEM_PAGES} pages; aborting to prevent infinite loop"
),
});
}
let mut req = self.get(&["catalogs", catalog_name, "items"]);
if let Some(c) = &cursor {
req = req.query(&[("cursor", c.as_str())]);
}
let resp: CatalogItemsResponse = match self.send_json(req).await {
Ok(r) => r,
Err(BrazeApiError::Http { status, .. }) if status == StatusCode::NOT_FOUND => {
return Err(BrazeApiError::NotFound {
resource: format!("catalog '{catalog_name}'"),
});
}
Err(e) => return Err(e),
};
all_items.extend(resp.items);
match resp.next_cursor {
Some(c) if !c.is_empty() => cursor = Some(c),
_ => break,
}
}
Ok(all_items)
}
pub async fn upsert_catalog_items(
&self,
catalog_name: &str,
items: &[CatalogItemRow],
) -> Result<(), BrazeApiError> {
let body = UpsertItemsRequest { items };
let req = self.post(&["catalogs", catalog_name, "items"]).json(&body);
self.send_ok(req).await
}
pub async fn delete_catalog_items(
&self,
catalog_name: &str,
item_ids: &[String],
) -> Result<(), BrazeApiError> {
let items: Vec<DeleteItemId<'_>> = item_ids
.iter()
.map(|id| DeleteItemId { id: id.as_str() })
.collect();
let body = DeleteItemsRequest { items };
let req = self
.delete(&["catalogs", catalog_name, "items"])
.json(&body);
self.send_ok(req).await
}
}
#[derive(Serialize)]
struct AddFieldsRequest<'a> {
fields: Vec<WireField<'a>>,
}
#[derive(Serialize)]
struct WireField<'a> {
name: &'a str,
#[serde(rename = "type")]
field_type: CatalogFieldType,
}
#[derive(Debug, Deserialize)]
struct CatalogItemsResponse {
#[serde(default)]
items: Vec<CatalogItemRow>,
#[serde(default)]
next_cursor: Option<String>,
}
#[derive(Serialize)]
struct UpsertItemsRequest<'a> {
items: &'a [CatalogItemRow],
}
#[derive(Serialize)]
struct DeleteItemsRequest<'a> {
items: Vec<DeleteItemId<'a>>,
}
#[derive(Serialize)]
struct DeleteItemId<'a> {
id: &'a str,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::braze::test_client as make_client;
use serde_json::json;
use wiremock::matchers::{body_json, header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
#[tokio::test]
async fn list_catalogs_happy_path() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs"))
.and(header("authorization", "Bearer test-key"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"catalogs": [
{
"name": "cardiology",
"description": "Cardiology catalog",
"fields": [
{"name": "id", "type": "string"},
{"name": "score", "type": "number"}
]
},
{
"name": "dermatology",
"fields": [
{"name": "id", "type": "string"}
]
}
],
"message": "success"
})))
.mount(&server)
.await;
let client = make_client(&server);
let cats = client.list_catalogs().await.unwrap();
assert_eq!(cats.len(), 2);
assert_eq!(cats[0].name, "cardiology");
assert_eq!(cats[0].description.as_deref(), Some("Cardiology catalog"));
assert_eq!(cats[0].fields.len(), 2);
assert_eq!(cats[0].fields[1].field_type, CatalogFieldType::Number);
assert_eq!(cats[1].name, "dermatology");
assert_eq!(cats[1].description, None);
}
#[tokio::test]
async fn list_catalogs_empty() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"catalogs": []})))
.mount(&server)
.await;
let client = make_client(&server);
let cats = client.list_catalogs().await.unwrap();
assert!(cats.is_empty());
}
#[tokio::test]
async fn list_catalogs_sets_user_agent() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs"))
.and(header(
"user-agent",
concat!("braze-sync/", env!("CARGO_PKG_VERSION")),
))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"catalogs": []})))
.mount(&server)
.await;
let client = make_client(&server);
client.list_catalogs().await.unwrap();
}
#[tokio::test]
async fn list_catalogs_ignores_unknown_fields_in_response() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"catalogs": [
{
"name": "future",
"description": "tomorrow",
"future_metadata": {"foo": "bar"},
"num_items": 1234,
"fields": [
{"name": "id", "type": "string", "extra": "ignored"}
]
}
],
"future_top_level": {"whatever": true},
"message": "success"
})))
.mount(&server)
.await;
let client = make_client(&server);
let cats = client.list_catalogs().await.unwrap();
assert_eq!(cats.len(), 1);
assert_eq!(cats[0].name, "future");
}
#[tokio::test]
async fn list_catalogs_errors_when_next_cursor_present() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"catalogs": [
{"name": "cardiology", "fields": [{"name": "id", "type": "string"}]}
],
"next_cursor": "abc123"
})))
.mount(&server)
.await;
let client = make_client(&server);
let err = client.list_catalogs().await.unwrap_err();
match err {
BrazeApiError::PaginationNotImplemented { endpoint, detail } => {
assert_eq!(endpoint, "/catalogs");
assert!(detail.contains("next_cursor"), "detail: {detail}");
assert!(detail.contains("1 catalog"), "detail: {detail}");
}
other => panic!("expected PaginationNotImplemented, got {other:?}"),
}
}
#[tokio::test]
async fn list_catalogs_empty_string_cursor_is_treated_as_no_more_pages() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"catalogs": [{"name": "only", "fields": []}],
"next_cursor": ""
})))
.mount(&server)
.await;
let client = make_client(&server);
let cats = client.list_catalogs().await.unwrap();
assert_eq!(cats.len(), 1);
assert_eq!(cats[0].name, "only");
}
#[tokio::test]
async fn unauthorized_returns_typed_error() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs"))
.respond_with(ResponseTemplate::new(401).set_body_string("invalid api key"))
.mount(&server)
.await;
let client = make_client(&server);
let err = client.list_catalogs().await.unwrap_err();
assert!(matches!(err, BrazeApiError::Unauthorized), "got {err:?}");
}
#[tokio::test]
async fn server_error_carries_status_and_body() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs"))
.respond_with(ResponseTemplate::new(500).set_body_string("internal explosion"))
.mount(&server)
.await;
let client = make_client(&server);
let err = client.list_catalogs().await.unwrap_err();
match err {
BrazeApiError::Http { status, body } => {
assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
assert!(body.contains("internal explosion"));
}
other => panic!("expected Http, got {other:?}"),
}
}
#[tokio::test]
async fn retries_on_429_and_succeeds() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"catalogs": [{"name": "after_retry", "fields": []}]
})))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/catalogs"))
.respond_with(ResponseTemplate::new(429).insert_header("retry-after", "0"))
.up_to_n_times(1)
.mount(&server)
.await;
let client = make_client(&server);
let cats = client.list_catalogs().await.unwrap();
assert_eq!(cats.len(), 1);
assert_eq!(cats[0].name, "after_retry");
}
#[tokio::test]
async fn retries_exhausted_returns_rate_limit_exhausted() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs"))
.respond_with(ResponseTemplate::new(429).insert_header("retry-after", "0"))
.mount(&server)
.await;
let client = make_client(&server);
let err = client.list_catalogs().await.unwrap_err();
assert!(
matches!(err, BrazeApiError::RateLimitExhausted),
"got {err:?}"
);
}
#[tokio::test]
async fn get_catalog_happy_path() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs/cardiology"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"catalogs": [
{"name": "cardiology", "fields": [{"name": "id", "type": "string"}]}
]
})))
.mount(&server)
.await;
let client = make_client(&server);
let cat = client.get_catalog("cardiology").await.unwrap();
assert_eq!(cat.name, "cardiology");
assert_eq!(cat.fields.len(), 1);
}
#[tokio::test]
async fn get_catalog_404_is_mapped_to_not_found() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs/missing"))
.respond_with(ResponseTemplate::new(404).set_body_string("not found"))
.mount(&server)
.await;
let client = make_client(&server);
let err = client.get_catalog("missing").await.unwrap_err();
match err {
BrazeApiError::NotFound { resource } => assert!(resource.contains("missing")),
other => panic!("expected NotFound, got {other:?}"),
}
}
#[tokio::test]
async fn get_catalog_empty_response_array_is_not_found() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs/ghost"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"catalogs": []})))
.mount(&server)
.await;
let client = make_client(&server);
let err = client.get_catalog("ghost").await.unwrap_err();
assert!(matches!(err, BrazeApiError::NotFound { .. }), "got {err:?}");
}
#[tokio::test]
async fn debug_does_not_leak_api_key() {
let server = MockServer::start().await;
let client = make_client(&server);
let dbg = format!("{client:?}");
assert!(!dbg.contains("test-key"), "leaked api key in: {dbg}");
assert!(dbg.contains("<redacted>"));
}
#[tokio::test]
async fn add_catalog_field_happy_path_sends_correct_body() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/catalogs/cardiology/fields"))
.and(header("authorization", "Bearer test-key"))
.and(body_json(json!({
"fields": [{"name": "severity_level", "type": "number"}]
})))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"message": "success"})))
.mount(&server)
.await;
let client = make_client(&server);
let field = CatalogField {
name: "severity_level".into(),
field_type: CatalogFieldType::Number,
};
client
.add_catalog_field("cardiology", &field)
.await
.unwrap();
}
#[tokio::test]
async fn add_catalog_field_unauthorized_propagates() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/catalogs/cardiology/fields"))
.respond_with(ResponseTemplate::new(401).set_body_string("invalid key"))
.mount(&server)
.await;
let client = make_client(&server);
let field = CatalogField {
name: "x".into(),
field_type: CatalogFieldType::String,
};
let err = client
.add_catalog_field("cardiology", &field)
.await
.unwrap_err();
assert!(matches!(err, BrazeApiError::Unauthorized), "got {err:?}");
}
#[tokio::test]
async fn add_catalog_field_retries_on_429_then_succeeds() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/catalogs/cardiology/fields"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"message": "ok"})))
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/catalogs/cardiology/fields"))
.respond_with(ResponseTemplate::new(429).insert_header("retry-after", "0"))
.up_to_n_times(1)
.mount(&server)
.await;
let client = make_client(&server);
let field = CatalogField {
name: "x".into(),
field_type: CatalogFieldType::String,
};
client
.add_catalog_field("cardiology", &field)
.await
.unwrap();
}
#[tokio::test]
async fn delete_catalog_field_happy_path_uses_segment_encoded_path() {
let server = MockServer::start().await;
Mock::given(method("DELETE"))
.and(path("/catalogs/cardiology/fields/legacy_code"))
.and(header("authorization", "Bearer test-key"))
.respond_with(ResponseTemplate::new(204))
.mount(&server)
.await;
let client = make_client(&server);
client
.delete_catalog_field("cardiology", "legacy_code")
.await
.unwrap();
}
#[tokio::test]
async fn delete_catalog_field_server_error_returns_http() {
let server = MockServer::start().await;
Mock::given(method("DELETE"))
.and(path("/catalogs/cardiology/fields/x"))
.respond_with(ResponseTemplate::new(500).set_body_string("oops"))
.mount(&server)
.await;
let client = make_client(&server);
let err = client
.delete_catalog_field("cardiology", "x")
.await
.unwrap_err();
match err {
BrazeApiError::Http { status, body } => {
assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
assert!(body.contains("oops"));
}
other => panic!("expected Http, got {other:?}"),
}
}
#[tokio::test]
async fn list_catalog_items_single_page() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs/cardiology/items"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"items": [
{"id": "af001", "name": "atrial", "order": 1},
{"id": "af002", "name": "ventricular", "order": 2}
],
"message": "success"
})))
.mount(&server)
.await;
let client = make_client(&server);
let items = client.list_catalog_items("cardiology").await.unwrap();
assert_eq!(items.len(), 2);
assert_eq!(items[0].id, "af001");
assert_eq!(items[1].id, "af002");
}
#[tokio::test]
async fn list_catalog_items_paginated() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs/cardiology/items"))
.and(wiremock::matchers::query_param("cursor", "page2"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"items": [{"id": "af003", "name": "third"}],
"message": "success"
})))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/catalogs/cardiology/items"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"items": [
{"id": "af001", "name": "first"},
{"id": "af002", "name": "second"}
],
"next_cursor": "page2",
"message": "success"
})))
.up_to_n_times(1)
.mount(&server)
.await;
let client = make_client(&server);
let items = client.list_catalog_items("cardiology").await.unwrap();
assert_eq!(items.len(), 3);
assert_eq!(items[0].id, "af001");
assert_eq!(items[2].id, "af003");
}
#[tokio::test]
async fn list_catalog_items_empty() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs/empty/items"))
.respond_with(
ResponseTemplate::new(200)
.set_body_json(json!({"items": [], "message": "success"})),
)
.mount(&server)
.await;
let client = make_client(&server);
let items = client.list_catalog_items("empty").await.unwrap();
assert!(items.is_empty());
}
#[tokio::test]
async fn list_catalog_items_404_is_not_found() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/catalogs/missing/items"))
.respond_with(ResponseTemplate::new(404).set_body_string("not found"))
.mount(&server)
.await;
let client = make_client(&server);
let err = client.list_catalog_items("missing").await.unwrap_err();
assert!(matches!(err, BrazeApiError::NotFound { .. }), "got {err:?}");
}
#[tokio::test]
async fn upsert_catalog_items_sends_correct_body() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/catalogs/cardiology/items"))
.and(header("authorization", "Bearer test-key"))
.and(body_json(json!({
"items": [
{"id": "af001", "name": "atrial", "order": 1}
]
})))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"message": "success"})))
.mount(&server)
.await;
let client = make_client(&server);
let mut fields = serde_json::Map::new();
fields.insert("name".into(), json!("atrial"));
fields.insert("order".into(), json!(1));
let items = vec![CatalogItemRow {
id: "af001".into(),
fields,
}];
client
.upsert_catalog_items("cardiology", &items)
.await
.unwrap();
}
#[tokio::test]
async fn upsert_catalog_items_retries_on_429() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/catalogs/cardiology/items"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"message": "ok"})))
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/catalogs/cardiology/items"))
.respond_with(ResponseTemplate::new(429).insert_header("retry-after", "0"))
.up_to_n_times(1)
.mount(&server)
.await;
let client = make_client(&server);
let items = vec![CatalogItemRow {
id: "x".into(),
fields: serde_json::Map::new(),
}];
client
.upsert_catalog_items("cardiology", &items)
.await
.unwrap();
}
#[tokio::test]
async fn delete_catalog_items_sends_id_only_body() {
let server = MockServer::start().await;
Mock::given(method("DELETE"))
.and(path("/catalogs/cardiology/items"))
.and(body_json(json!({
"items": [{"id": "old1"}, {"id": "old2"}]
})))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({"message": "success"})))
.mount(&server)
.await;
let client = make_client(&server);
client
.delete_catalog_items("cardiology", &["old1".into(), "old2".into()])
.await
.unwrap();
}
#[tokio::test]
async fn delete_catalog_items_unauthorized() {
let server = MockServer::start().await;
Mock::given(method("DELETE"))
.and(path("/catalogs/c/items"))
.respond_with(ResponseTemplate::new(401))
.mount(&server)
.await;
let client = make_client(&server);
let err = client
.delete_catalog_items("c", &["x".into()])
.await
.unwrap_err();
assert!(matches!(err, BrazeApiError::Unauthorized), "got {err:?}");
}
}