use crate::braze::BrazeClient;
use crate::config::ResolvedConfig;
use crate::diff::catalog::{CatalogItemsDiff, CatalogSchemaDiff};
use crate::diff::content_block::{ContentBlockDiff, ContentBlockIdIndex};
use crate::diff::custom_attribute::CustomAttributeOp;
use crate::diff::email_template::{EmailTemplateDiff, EmailTemplateIdIndex};
use crate::diff::orphan;
use crate::diff::{DiffOp, DiffSummary, ResourceDiff};
use crate::error::Error;
use crate::format::OutputFormat;
use crate::resource::{CatalogItems, ResourceKind};
use anyhow::{anyhow, Context as _};
use clap::Args;
use futures::stream::{StreamExt, TryStreamExt};
use std::collections::BTreeMap;
use std::path::Path;
use super::diff::{
compute_catalog_items_diffs, compute_catalog_schema_diffs, compute_content_block_plan,
compute_custom_attribute_diffs, compute_email_template_plan,
};
use super::selected_kinds;
// Command-line arguments consumed by `run` below.
//
// Plain `//` comments are used here on purpose: clap's derive turns `///` doc
// comments into `--help` text, so adding them would change the CLI output.
#[derive(Args, Debug)]
pub struct ApplyArgs {
// Restrict the run to a single resource kind. The value is handed to
// `selected_kinds`, which decides the kinds that are actually processed.
#[arg(long, value_enum)]
pub resource: Option<ResourceKind>,
// Optional name filter forwarded to every plan computation. Only accepted
// together with `--resource` (see `requires = "resource"`).
#[arg(long, requires = "resource")]
pub name: Option<String>,
// Actually apply the plan. Without it `run` prints the plan and stops
// (dry run).
#[arg(long)]
pub confirm: bool,
// Permit plans whose `destructive_count()` is non-zero; otherwise `run`
// fails with `Error::DestructiveBlocked`.
#[arg(long)]
pub allow_destructive: bool,
// Rename orphaned content blocks / email templates to their archive name.
// When unset, orphan diffs are skipped during apply.
#[arg(long)]
pub archive_orphans: bool,
}
/// Computes the plan for the selected resources and, when `--confirm` is set,
/// applies it to Braze.
///
/// Phases, in order:
/// 1. Compute the diffs of every selected resource kind into one summary.
/// 2. Print the plan: the mode label goes to stderr, the formatted summary to
///    stdout.
/// 3. Return early when nothing is actionable or `--confirm` was not passed.
/// 4. Refuse destructive plans unless `--allow-destructive` is set, then
///    reject operations flagged by `check_for_unsupported_ops`.
/// 5. Apply each diff in summary order. Custom attribute deprecation toggles
///    are only collected in the loop and sent as one batch afterwards.
///
/// # Errors
/// Fails when a plan computation or an apply call fails, when the plan is
/// destructive and not allowed (`Error::DestructiveBlocked`), when the plan
/// contains an unsupported operation, or when an internal invariant between
/// the plan and apply phases is broken.
pub async fn run(
    args: &ApplyArgs,
    resolved: ResolvedConfig,
    config_dir: &Path,
    format: OutputFormat,
) -> anyhow::Result<()> {
    let resources = &resolved.resources;
    let name_filter = args.name.as_deref();

    // Every configured resource location is resolved against `config_dir`.
    let catalogs_root = config_dir.join(&resources.catalog_schema.path);
    let content_blocks_root = config_dir.join(&resources.content_block.path);
    let email_templates_root = config_dir.join(&resources.email_template.path);
    let custom_attributes_path = config_dir.join(&resources.custom_attribute.path);
    let client = BrazeClient::from_resolved(&resolved);

    // ---- Plan phase ------------------------------------------------------
    // Besides the diffs, some planners return lookup data that the apply
    // phase needs later (local item rows, name -> id indexes).
    let mut summary = DiffSummary::default();
    let mut content_block_ids: Option<ContentBlockIdIndex> = None;
    let mut email_template_ids: Option<EmailTemplateIdIndex> = None;
    let mut local_items_by_catalog: Option<BTreeMap<String, CatalogItems>> = None;

    for kind in selected_kinds(args.resource, resources) {
        match kind {
            ResourceKind::CatalogSchema => {
                let planned = compute_catalog_schema_diffs(&client, &catalogs_root, name_filter)
                    .await
                    .context("computing catalog_schema plan")?;
                summary.diffs.extend(planned);
            }
            ResourceKind::CatalogItems => {
                let (planned, local_items) =
                    compute_catalog_items_diffs(&client, &catalogs_root, name_filter, true)
                        .await
                        .context("computing catalog_items plan")?;
                summary.diffs.extend(planned);
                local_items_by_catalog = Some(local_items);
            }
            ResourceKind::ContentBlock => {
                let (planned, ids) =
                    compute_content_block_plan(&client, &content_blocks_root, name_filter)
                        .await
                        .context("computing content_block plan")?;
                summary.diffs.extend(planned);
                content_block_ids = Some(ids);
            }
            ResourceKind::EmailTemplate => {
                let (planned, ids) =
                    compute_email_template_plan(&client, &email_templates_root, name_filter)
                        .await
                        .context("computing email_template plan")?;
                summary.diffs.extend(planned);
                email_template_ids = Some(ids);
            }
            ResourceKind::CustomAttribute => {
                let planned =
                    compute_custom_attribute_diffs(&client, &custom_attributes_path, name_filter)
                        .await
                        .context("computing custom_attribute plan")?;
                summary.diffs.extend(planned);
            }
        }
    }

    // ---- Report phase ----------------------------------------------------
    let mode_label = if !args.confirm {
        "Plan (dry-run, pass --confirm to apply):"
    } else {
        "Plan:"
    };
    eprintln!("{mode_label}");
    print!("{}", format.formatter().format(&summary));

    // ---- Gates -----------------------------------------------------------
    if summary.actionable_count() == 0 {
        let notice = if summary.changed_count() > 0 {
            "No actionable changes to apply \
             (informational drift above can be reconciled with `export`)."
        } else {
            "No changes to apply."
        };
        eprintln!("{notice}");
        return Ok(());
    }
    if !args.confirm {
        eprintln!("DRY RUN — pass --confirm to apply these changes.");
        return Ok(());
    }
    let has_destructive = summary.destructive_count() > 0;
    if has_destructive && !args.allow_destructive {
        return Err(Error::DestructiveBlocked.into());
    }
    check_for_unsupported_ops(&summary)?;

    // ---- Apply phase -----------------------------------------------------
    let today = chrono::Utc::now().date_naive();
    let parallel_batches = resources.catalog_items.parallel_batches;
    let mut applied = 0;
    let mut deprecate_names: Vec<&str> = Vec::new();
    let mut reactivate_names: Vec<&str> = Vec::new();

    for diff in summary.diffs.iter() {
        applied += match diff {
            ResourceDiff::CatalogSchema(d) => apply_catalog_schema(&client, d).await?,
            ResourceDiff::CatalogItems(d) => {
                let local_map = match local_items_by_catalog.as_ref() {
                    Some(map) => map,
                    None => {
                        return Err(anyhow!(
                            "internal: catalog_items_local not populated before apply"
                        ))
                    }
                };
                let local = match local_map.get(&d.catalog_name) {
                    Some(items) => items,
                    None => {
                        return Err(anyhow!(
                            "internal: local items for catalog '{}' missing from items map",
                            d.catalog_name
                        ))
                    }
                };
                apply_catalog_items(&client, d, local, parallel_batches).await?
            }
            ResourceDiff::ContentBlock(d) => {
                apply_content_block(
                    &client,
                    d,
                    content_block_ids.as_ref(),
                    args.archive_orphans,
                    today,
                )
                .await?
            }
            ResourceDiff::EmailTemplate(d) => {
                apply_email_template(
                    &client,
                    d,
                    email_template_ids.as_ref(),
                    args.archive_orphans,
                    today,
                )
                .await?
            }
            ResourceDiff::CustomAttribute(d) => {
                // Only deprecation toggles are applied; they are queued here
                // and sent in a single batch once the loop is done.
                if let CustomAttributeOp::DeprecationToggled { to, .. } = &d.op {
                    let queue = if *to {
                        &mut deprecate_names
                    } else {
                        &mut reactivate_names
                    };
                    queue.push(&d.name);
                }
                0
            }
        };
    }

    if !(deprecate_names.is_empty() && reactivate_names.is_empty()) {
        applied +=
            apply_custom_attribute_batch(&client, &deprecate_names, &reactivate_names).await?;
    }

    eprintln!("✓ Applied {applied} change(s).");
    Ok(())
}
/// Rejects plans containing an operation this tool cannot apply.
///
/// Diffs are inspected in summary order and the first offending one produces
/// the error:
/// - a custom attribute that is `PresentInGitOnly`
///   (`Error::CustomAttributeCreateNotSupported`),
/// - a catalog schema whose top-level op is `Added` or `Removed`,
/// - a catalog schema with a `Modified` field diff.
///
/// Returns `Ok(())` when none of these occur.
fn check_for_unsupported_ops(summary: &DiffSummary) -> anyhow::Result<()> {
    for diff in summary.diffs.iter() {
        match diff {
            ResourceDiff::CustomAttribute(attr)
                if matches!(attr.op, CustomAttributeOp::PresentInGitOnly) =>
            {
                return Err(Error::CustomAttributeCreateNotSupported {
                    name: attr.name.clone(),
                }
                .into());
            }
            ResourceDiff::CatalogSchema(schema) => {
                // Whole-catalog creation/deletion is checked before any
                // field-level inspection.
                match &schema.op {
                    DiffOp::Added(_) => {
                        return Err(anyhow!(
                            "creating a new catalog '{}' is not supported by braze-sync; \
                             create the catalog in the Braze dashboard first, then run \
                             `braze-sync export` to populate the local schema",
                            schema.name
                        ));
                    }
                    DiffOp::Removed(_) => {
                        return Err(anyhow!(
                            "deleting catalog '{}' (top-level) is not supported by braze-sync; \
                             only field-level changes can be applied",
                            schema.name
                        ));
                    }
                    DiffOp::Modified { .. } | DiffOp::Unchanged => {}
                }

                // The first modified field (if any) is the one reported.
                let first_modified = schema.field_diffs.iter().find_map(|fd| match fd {
                    DiffOp::Modified { from, to } => Some((from, to)),
                    _ => None,
                });
                if let Some((from, to)) = first_modified {
                    return Err(anyhow!(
                        "modifying field '{}' on catalog '{}' (type {} → {}) \
                         is not supported by braze-sync; the change would be \
                         data-losing on the field. Drop the field manually \
                         in the Braze dashboard and re-run `braze-sync apply`",
                        to.name,
                        schema.name,
                        from.field_type.as_str(),
                        to.field_type.as_str(),
                    ));
                }
            }
            _ => {}
        }
    }
    Ok(())
}
/// Applies one content block diff and returns how many changes were made
/// (0 or 1).
///
/// Non-orphan diffs: `Added` creates the block, `Modified` updates the block
/// whose id is looked up by name in `id_index`, `Unchanged` does nothing.
/// `Removed` is not expected here and panics.
///
/// Orphan diffs are skipped unless `archive_orphans` is set. Otherwise the
/// block is fetched by id, renamed to `orphan::archive_name(today, name)` and
/// written back; nothing is sent when that name equals the current one.
///
/// # Errors
/// Propagates client failures and reports a missing id index / index entry
/// as an `internal:` error.
async fn apply_content_block(
    client: &BrazeClient,
    d: &ContentBlockDiff,
    id_index: Option<&ContentBlockIdIndex>,
    archive_orphans: bool,
    today: chrono::NaiveDate,
) -> anyhow::Result<usize> {
    if !d.orphan {
        return match &d.op {
            DiffOp::Unchanged => Ok(0),
            DiffOp::Added(block) => {
                tracing::info!(content_block = %block.name, "creating content block");
                let _ = client.create_content_block(block).await?;
                Ok(1)
            }
            DiffOp::Modified { to: desired, .. } => {
                let remote_id = id_index
                    .ok_or_else(|| {
                        anyhow!("internal: content_block id index missing for modified apply path")
                    })?
                    .get(&desired.name)
                    .ok_or_else(|| {
                        anyhow!(
                            "internal: modified content block '{}' missing from id index",
                            desired.name
                        )
                    })?;
                tracing::info!(content_block = %desired.name, "updating content block");
                client.update_content_block(remote_id, desired).await?;
                Ok(1)
            }
            DiffOp::Removed(_) => {
                unreachable!("diff layer routes content block removals through orphan")
            }
        };
    }

    // Orphan path: only acted upon when archiving was requested.
    if !archive_orphans {
        return Ok(0);
    }
    let index = id_index.ok_or_else(|| {
        anyhow!("internal: content_block id index missing for orphan apply path")
    })?;
    let remote_id = index.get(&d.name).ok_or_else(|| {
        anyhow!(
            "internal: orphan '{}' missing from id index — list/diff drift",
            d.name
        )
    })?;

    let archived_name = orphan::archive_name(today, &d.name);
    if archived_name == d.name {
        // The archive name is identical to the current one: no rename needed.
        return Ok(0);
    }

    // Fetch the current remote block so only its name changes on update.
    let mut remote_block = client
        .get_content_block(remote_id)
        .await
        .with_context(|| format!("fetching content block '{}' for archive rename", d.name))?;
    remote_block.name = archived_name;
    tracing::info!(
        content_block = %d.name,
        new_name = %remote_block.name,
        "archiving orphan content block"
    );
    client.update_content_block(remote_id, &remote_block).await?;
    Ok(1)
}
/// Applies the field-level diffs of one catalog schema and returns the number
/// of field operations performed.
///
/// `Added` fields are created and `Removed` fields are deleted, in the order
/// they appear in `d.field_diffs`. `Unchanged` entries are ignored.
///
/// # Errors
/// Propagates client failures. A `Modified` field yields an `internal:` error
/// because `check_for_unsupported_ops` is expected to reject it earlier.
async fn apply_catalog_schema(
    client: &BrazeClient,
    d: &CatalogSchemaDiff,
) -> anyhow::Result<usize> {
    let mut applied = 0;
    for field_op in d.field_diffs.iter() {
        applied += match field_op {
            DiffOp::Unchanged => 0,
            DiffOp::Added(field) => {
                tracing::info!(
                    catalog = %d.name,
                    field = %field.name,
                    field_type = field.field_type.as_str(),
                    "adding catalog field"
                );
                client.add_catalog_field(&d.name, field).await?;
                1
            }
            DiffOp::Removed(field) => {
                tracing::info!(
                    catalog = %d.name,
                    field = %field.name,
                    "deleting catalog field"
                );
                client.delete_catalog_field(&d.name, &field.name).await?;
                1
            }
            DiffOp::Modified { .. } => {
                return Err(anyhow!(
                    "internal: Modified field op should have been rejected \
                     by check_for_unsupported_ops"
                ));
            }
        };
    }
    Ok(applied)
}
/// Maximum number of items `run_batched` puts into a single batch before
/// handing it to the batch callback.
///
/// NOTE(review): presumably chosen to match the Braze catalog-items
/// per-request limit — confirm against the API documentation.
const ITEMS_BATCH_SIZE: usize = 50;
/// Builds a progress bar of length `total` for item batches.
///
/// The template shows a spinner styled with `color`, the elapsed time, a
/// 40-column bar, `pos/len`, and the trailing `label`.
///
/// # Panics
/// Panics if the generated template string is rejected by `indicatif`.
fn items_progress_bar(total: u64, label: &str, color: &str) -> indicatif::ProgressBar {
    let bar = indicatif::ProgressBar::new(total);
    // Doubled braces are literal braces for indicatif's own placeholders;
    // only `color` and `label` are interpolated here.
    let template = format!(
        "{{spinner:.{color}}} [{{elapsed_precise}}] {{bar:40}} {{pos}}/{{len}} {label}"
    );
    let style = indicatif::ProgressStyle::default_bar()
        .template(&template)
        .unwrap();
    bar.set_style(style);
    bar
}
/// Splits `items` into batches of at most [`ITEMS_BATCH_SIZE`] elements and
/// runs `batch_fn` on them with up to `concurrency` batches in flight.
///
/// Batches may complete in any order. After each successful batch `pb` is
/// advanced by that batch's length; once every batch has succeeded the bar is
/// finished and cleared.
///
/// A `concurrency` of 0 is treated as 1 (see the comment in the body).
///
/// # Returns
/// The total number of items contained in the successfully processed batches.
///
/// # Errors
/// Returns the first error yielded by a batch. In that case the remaining
/// batches are not awaited and the progress bar is not cleared here.
async fn run_batched<T, F, Fut>(
    items: Vec<T>,
    concurrency: usize,
    pb: &indicatif::ProgressBar,
    batch_fn: F,
) -> anyhow::Result<usize>
where
    T: Send + Sync + 'static,
    F: Fn(Vec<T>) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    // With a limit of 0, `buffer_unordered` (futures 0.3) can never start a
    // batch, so the stream would never make progress. Always allow at least
    // one batch in flight.
    let concurrency = concurrency.max(1);

    // Drain the input in chunks of `ITEMS_BATCH_SIZE`; the last batch may be
    // shorter.
    let mut batches: Vec<Vec<T>> = Vec::new();
    let mut iter = items.into_iter().peekable();
    while iter.peek().is_some() {
        batches.push(iter.by_ref().take(ITEMS_BATCH_SIZE).collect());
    }

    let count = futures::stream::iter(batches.into_iter().map(|batch| {
        // Capture the length up front: `batch` is moved into `batch_fn`.
        let batch_len = batch.len();
        let fut = batch_fn(batch);
        let pb = pb.clone();
        async move {
            fut.await?;
            pb.inc(batch_len as u64);
            Ok::<usize, anyhow::Error>(batch_len)
        }
    }))
    .buffer_unordered(concurrency)
    .try_fold(0usize, |acc, n| async move { Ok(acc + n) })
    .await?;

    pb.finish_and_clear();
    Ok(count)
}
/// Applies the item-level diff of one catalog and returns the number of items
/// upserted plus the number of items deleted.
///
/// Added and modified ids are upserted first, using the matching rows from
/// `local`; removed ids are deleted afterwards. Both phases go through
/// `run_batched` with at most `parallel_batches` batches in flight (a value
/// of 0 is treated as 1).
///
/// # Errors
/// Propagates client failures (with the catalog name as context). Returns an
/// `internal:` error when `local` has no materialized rows although upserts
/// are required, or when an id from the diff has no matching local row.
async fn apply_catalog_items(
    client: &BrazeClient,
    d: &CatalogItemsDiff,
    local: &CatalogItems,
    parallel_batches: u32,
) -> anyhow::Result<usize> {
    if !d.has_changes() {
        return Ok(0);
    }
    let catalog_name = &d.catalog_name;
    let concurrency = (parallel_batches as usize).max(1);

    let upsert_ids: Vec<&str> = d
        .added_ids
        .iter()
        .chain(d.modified_ids.iter())
        .map(String::as_str)
        .collect();

    let upsert_count = if upsert_ids.is_empty() {
        0
    } else {
        let rows = local.rows.as_ref().ok_or_else(|| {
            anyhow!(
                "internal: local items for catalog '{}' have no materialized rows",
                catalog_name
            )
        })?;
        let row_by_id: std::collections::HashMap<&str, &crate::resource::CatalogItemRow> =
            rows.iter().map(|r| (r.id.as_str(), r)).collect();

        // A diff id without a local row is a broken plan/apply invariant.
        // Report it as an internal error (like the missing-rows case above)
        // instead of panicking.
        let upsert_rows = upsert_ids
            .iter()
            .map(|&id| {
                row_by_id
                    .get(id)
                    .map(|row| (*row).clone())
                    .ok_or_else(|| {
                        anyhow!(
                            "internal: item '{}' in diff for catalog '{}' missing from local rows",
                            id,
                            catalog_name
                        )
                    })
            })
            .collect::<anyhow::Result<Vec<crate::resource::CatalogItemRow>>>()?;

        let pb = items_progress_bar(upsert_rows.len() as u64, "items", "green");
        run_batched(upsert_rows, concurrency, &pb, |batch| {
            // Each batch future owns its own client handle and catalog name.
            let client = client.clone();
            let catalog_name = catalog_name.clone();
            async move {
                tracing::info!(
                    catalog = %catalog_name,
                    batch_size = batch.len(),
                    "upserting catalog items batch"
                );
                client
                    .upsert_catalog_items(&catalog_name, &batch)
                    .await
                    .with_context(|| format!("upserting items batch for catalog '{catalog_name}'"))
            }
        })
        .await?
    };

    let delete_count = if d.removed_ids.is_empty() {
        0
    } else {
        let pb = items_progress_bar(d.removed_ids.len() as u64, "deletes", "red");
        // `run_batched` takes ownership of its items, hence the clone.
        run_batched(d.removed_ids.clone(), concurrency, &pb, |batch| {
            let client = client.clone();
            let catalog_name = catalog_name.clone();
            async move {
                tracing::info!(
                    catalog = %catalog_name,
                    batch_size = batch.len(),
                    "deleting catalog items batch"
                );
                client
                    .delete_catalog_items(&catalog_name, &batch)
                    .await
                    .with_context(|| format!("deleting items batch for catalog '{catalog_name}'"))
            }
        })
        .await?
    };

    Ok(upsert_count + delete_count)
}
/// Applies one email template diff and returns how many changes were made
/// (0 or 1).
///
/// Non-orphan diffs: `Added` creates the template, `Modified` updates the
/// template whose id is looked up by name in `id_index`, `Unchanged` does
/// nothing. `Removed` is not expected here and panics.
///
/// Orphan diffs are skipped unless `archive_orphans` is set. Otherwise the
/// template is fetched by id, renamed to `orphan::archive_name(today, name)`
/// and written back; nothing is sent when that name equals the current one.
///
/// # Errors
/// Propagates client failures and reports a missing id index / index entry
/// as an `internal:` error.
async fn apply_email_template(
    client: &BrazeClient,
    d: &EmailTemplateDiff,
    id_index: Option<&EmailTemplateIdIndex>,
    archive_orphans: bool,
    today: chrono::NaiveDate,
) -> anyhow::Result<usize> {
    if !d.orphan {
        return match &d.op {
            DiffOp::Unchanged => Ok(0),
            DiffOp::Added(template) => {
                tracing::info!(email_template = %template.name, "creating email template");
                let _ = client.create_email_template(template).await?;
                Ok(1)
            }
            DiffOp::Modified { to: desired, .. } => {
                let index = id_index.ok_or_else(|| {
                    anyhow!("internal: email_template id index missing for modified apply path")
                })?;
                let remote_id = index.get(&desired.name).ok_or_else(|| {
                    anyhow!(
                        "internal: modified email template '{}' missing from id index",
                        desired.name
                    )
                })?;
                tracing::info!(email_template = %desired.name, "updating email template");
                client.update_email_template(remote_id, desired).await?;
                Ok(1)
            }
            DiffOp::Removed(_) => {
                unreachable!("diff layer routes email template removals through orphan")
            }
        };
    }

    // Orphan path: only acted upon when archiving was requested.
    if !archive_orphans {
        return Ok(0);
    }
    let index = id_index.ok_or_else(|| {
        anyhow!("internal: email_template id index missing for orphan apply path")
    })?;
    let remote_id = index.get(&d.name).ok_or_else(|| {
        anyhow!(
            "internal: orphan '{}' missing from id index — list/diff drift",
            d.name
        )
    })?;

    let archived_name = orphan::archive_name(today, &d.name);
    if archived_name == d.name {
        // The archive name is identical to the current one: no rename needed.
        return Ok(0);
    }

    // Fetch the current remote template so only its name changes on update.
    let mut remote_template = client
        .get_email_template(remote_id)
        .await
        .with_context(|| format!("fetching email template '{}' for archive rename", d.name))?;
    remote_template.name = archived_name;
    tracing::info!(
        email_template = %d.name,
        new_name = %remote_template.name,
        "archiving orphan email template"
    );
    client
        .update_email_template(remote_id, &remote_template)
        .await?;
    Ok(1)
}
/// Sends the queued custom attribute deprecation toggles and returns the
/// number of attribute names processed.
///
/// Two passes run in a fixed order: `to_deprecate` is sent with the blocklist
/// flag `true`, then `to_reactivate` with `false`. An empty list is skipped
/// without a request. Each successful pass prints a confirmation line to
/// stderr.
///
/// # Errors
/// Propagates the first client failure, with the pass's verb as context;
/// a later pass is not attempted after a failure.
async fn apply_custom_attribute_batch(
    client: &BrazeClient,
    to_deprecate: &[&str],
    to_reactivate: &[&str],
) -> anyhow::Result<usize> {
    // (names, blocklist flag, log/context verb, confirmation wording)
    let passes = [
        (to_deprecate, true, "deprecating", "deprecated"),
        (to_reactivate, false, "reactivating", "reactivated"),
    ];

    let mut applied = 0;
    for (names, blocklisted, verb, past) in passes {
        let n = names.len();
        if n == 0 {
            continue;
        }
        tracing::info!(attributes = ?names, "{verb} custom attributes");
        client
            .set_custom_attribute_blocklist(names, blocklisted)
            .await
            .with_context(|| format!("{verb} custom attributes"))?;
        eprintln!(" ✓ {past} {n} custom attribute(s)");
        applied += n;
    }
    Ok(applied)
}