1use crate::braze::BrazeClient;
13use crate::config::ResolvedConfig;
14use crate::diff::catalog::{CatalogItemsDiff, CatalogSchemaDiff};
15use crate::diff::content_block::{ContentBlockDiff, ContentBlockIdIndex};
16use crate::diff::custom_attribute::CustomAttributeOp;
17use crate::diff::email_template::{EmailTemplateDiff, EmailTemplateIdIndex};
18use crate::diff::orphan;
19use crate::diff::{DiffOp, DiffSummary, ResourceDiff};
20use crate::error::Error;
21use crate::format::OutputFormat;
22use crate::resource::{CatalogItems, ResourceKind};
23use anyhow::{anyhow, Context as _};
24use clap::Args;
25use futures::stream::{StreamExt, TryStreamExt};
26use std::collections::BTreeMap;
27use std::path::Path;
28
29use super::diff::{
30 compute_catalog_items_diffs, compute_catalog_schema_diffs, compute_content_block_plan,
31 compute_custom_attribute_diffs, compute_email_template_plan,
32};
33use super::selected_kinds;
34
// Command-line arguments for the `apply` command, consumed by `run`.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros
// turn `///` doc comments into `--help` text, so adding them would change
// what the program prints at runtime.
#[derive(Args, Debug)]
pub struct ApplyArgs {
    // Optional resource kind passed to `selected_kinds`, which decides the
    // set of kinds that `run` plans and applies.
    #[arg(long, value_enum)]
    pub resource: Option<ResourceKind>,

    // Optional name filter forwarded to every plan computation in `run`.
    // clap only accepts it together with `--resource`.
    #[arg(long, requires = "resource")]
    pub name: Option<String>,

    // Without this flag `run` prints the plan and returns without applying
    // anything (dry run).
    #[arg(long)]
    pub confirm: bool,

    // Must be set for `run` to proceed when the plan reports a non-zero
    // destructive count; otherwise `run` fails with
    // `Error::DestructiveBlocked`.
    #[arg(long)]
    pub allow_destructive: bool,

    // When set, orphaned content blocks / email templates are renamed via
    // `orphan::archive_name`; when unset, orphan diffs are skipped.
    #[arg(long)]
    pub archive_orphans: bool,
}
63
64pub async fn run(
65 args: &ApplyArgs,
66 resolved: ResolvedConfig,
67 config_dir: &Path,
68 format: OutputFormat,
69) -> anyhow::Result<()> {
70 let catalogs_root = config_dir.join(&resolved.resources.catalog_schema.path);
71 let content_blocks_root = config_dir.join(&resolved.resources.content_block.path);
72 let email_templates_root = config_dir.join(&resolved.resources.email_template.path);
73 let custom_attributes_path = config_dir.join(&resolved.resources.custom_attribute.path);
74 let client = BrazeClient::from_resolved(&resolved);
75 let kinds = selected_kinds(args.resource, &resolved.resources);
76
77 let mut summary = DiffSummary::default();
78 let mut content_block_id_index: Option<ContentBlockIdIndex> = None;
79 let mut email_template_id_index: Option<EmailTemplateIdIndex> = None;
80 let mut catalog_items_local: Option<BTreeMap<String, CatalogItems>> = None;
81 for kind in kinds {
82 match kind {
83 ResourceKind::CatalogSchema => {
84 let diffs =
85 compute_catalog_schema_diffs(&client, &catalogs_root, args.name.as_deref())
86 .await
87 .context("computing catalog_schema plan")?;
88 summary.diffs.extend(diffs);
89 }
90 ResourceKind::CatalogItems => {
91 let (diffs, local_map) = compute_catalog_items_diffs(
92 &client,
93 &catalogs_root,
94 args.name.as_deref(),
95 true,
96 )
97 .await
98 .context("computing catalog_items plan")?;
99 summary.diffs.extend(diffs);
100 catalog_items_local = Some(local_map);
101 }
102 ResourceKind::ContentBlock => {
103 let (diffs, idx) =
104 compute_content_block_plan(&client, &content_blocks_root, args.name.as_deref())
105 .await
106 .context("computing content_block plan")?;
107 summary.diffs.extend(diffs);
108 content_block_id_index = Some(idx);
109 }
110 ResourceKind::EmailTemplate => {
111 let (diffs, idx) = compute_email_template_plan(
112 &client,
113 &email_templates_root,
114 args.name.as_deref(),
115 )
116 .await
117 .context("computing email_template plan")?;
118 summary.diffs.extend(diffs);
119 email_template_id_index = Some(idx);
120 }
121 ResourceKind::CustomAttribute => {
122 let diffs = compute_custom_attribute_diffs(
123 &client,
124 &custom_attributes_path,
125 args.name.as_deref(),
126 )
127 .await
128 .context("computing custom_attribute plan")?;
129 summary.diffs.extend(diffs);
130 }
131 }
132 }
133
134 let mode_label = if args.confirm {
135 "Plan:"
136 } else {
137 "Plan (dry-run, pass --confirm to apply):"
138 };
139 eprintln!("{mode_label}");
140 print!("{}", format.formatter().format(&summary));
141
142 if summary.actionable_count() == 0 {
143 if summary.changed_count() > 0 {
144 eprintln!(
145 "No actionable changes to apply \
146 (informational drift above can be reconciled with `export`)."
147 );
148 } else {
149 eprintln!("No changes to apply.");
150 }
151 return Ok(());
152 }
153
154 if !args.confirm {
155 eprintln!("DRY RUN — pass --confirm to apply these changes.");
156 return Ok(());
157 }
158
159 if summary.destructive_count() > 0 && !args.allow_destructive {
160 return Err(Error::DestructiveBlocked.into());
161 }
162
163 check_for_unsupported_ops(&summary)?;
164
165 let today = chrono::Utc::now().date_naive();
171
172 let parallel_batches = resolved.resources.catalog_items.parallel_batches;
173 let mut applied = 0;
174 let mut ca_deprecate: Vec<&str> = Vec::new();
175 let mut ca_reactivate: Vec<&str> = Vec::new();
176 for diff in &summary.diffs {
177 match diff {
178 ResourceDiff::CatalogSchema(d) => {
179 applied += apply_catalog_schema(&client, d).await?;
180 }
181 ResourceDiff::CatalogItems(d) => {
182 let local_map = catalog_items_local.as_ref().ok_or_else(|| {
183 anyhow!("internal: catalog_items_local not populated before apply")
184 })?;
185 let local = local_map.get(&d.catalog_name).ok_or_else(|| {
186 anyhow!(
187 "internal: local items for catalog '{}' missing from items map",
188 d.catalog_name
189 )
190 })?;
191 applied += apply_catalog_items(&client, d, local, parallel_batches).await?;
192 }
193 ResourceDiff::ContentBlock(d) => {
194 applied += apply_content_block(
195 &client,
196 d,
197 content_block_id_index.as_ref(),
198 args.archive_orphans,
199 today,
200 )
201 .await?;
202 }
203 ResourceDiff::EmailTemplate(d) => {
204 applied += apply_email_template(
205 &client,
206 d,
207 email_template_id_index.as_ref(),
208 args.archive_orphans,
209 today,
210 )
211 .await?;
212 }
213 ResourceDiff::CustomAttribute(d) => {
214 if let CustomAttributeOp::DeprecationToggled { to, .. } = &d.op {
215 if *to {
216 ca_deprecate.push(&d.name);
217 } else {
218 ca_reactivate.push(&d.name);
219 }
220 }
221 }
222 }
223 }
224
225 if !ca_deprecate.is_empty() || !ca_reactivate.is_empty() {
226 applied += apply_custom_attribute_batch(&client, &ca_deprecate, &ca_reactivate).await?;
227 }
228
229 eprintln!("✓ Applied {applied} change(s).");
230 Ok(())
231}
232
233fn check_for_unsupported_ops(summary: &DiffSummary) -> anyhow::Result<()> {
243 for diff in &summary.diffs {
244 if let ResourceDiff::CustomAttribute(d) = diff {
245 if matches!(d.op, CustomAttributeOp::PresentInGitOnly) {
246 return Err(Error::CustomAttributeCreateNotSupported {
247 name: d.name.clone(),
248 }
249 .into());
250 }
251 }
252 if let ResourceDiff::CatalogSchema(d) = diff {
253 match &d.op {
254 DiffOp::Added(_) => {
255 return Err(anyhow!(
256 "creating a new catalog '{}' is not supported by braze-sync; \
257 create the catalog in the Braze dashboard first, then run \
258 `braze-sync export` to populate the local schema",
259 d.name
260 ));
261 }
262 DiffOp::Removed(_) => {
263 return Err(anyhow!(
264 "deleting catalog '{}' (top-level) is not supported by braze-sync; \
265 only field-level changes can be applied",
266 d.name
267 ));
268 }
269 _ => {}
270 }
271 for fd in &d.field_diffs {
274 if let DiffOp::Modified { from, to } = fd {
275 return Err(anyhow!(
276 "modifying field '{}' on catalog '{}' (type {} → {}) \
277 is not supported by braze-sync; the change would be \
278 data-losing on the field. Drop the field manually \
279 in the Braze dashboard and re-run `braze-sync apply`",
280 to.name,
281 d.name,
282 from.field_type.as_str(),
283 to.field_type.as_str(),
284 ));
285 }
286 }
287 }
288 }
289 Ok(())
290}
291
292async fn apply_content_block(
293 client: &BrazeClient,
294 d: &ContentBlockDiff,
295 id_index: Option<&ContentBlockIdIndex>,
296 archive_orphans: bool,
297 today: chrono::NaiveDate,
298) -> anyhow::Result<usize> {
299 if d.orphan {
302 if !archive_orphans {
303 return Ok(0);
304 }
305 let id_index = id_index.ok_or_else(|| {
306 anyhow!("internal: content_block id index missing for orphan apply path")
307 })?;
308 let id = id_index.get(&d.name).ok_or_else(|| {
309 anyhow!(
310 "internal: orphan '{}' missing from id index — list/diff drift",
311 d.name
312 )
313 })?;
314 let archived = orphan::archive_name(today, &d.name);
315 if archived == d.name {
316 return Ok(0);
317 }
318 let mut cb = client
325 .get_content_block(id)
326 .await
327 .with_context(|| format!("fetching content block '{}' for archive rename", d.name))?;
328 cb.name = archived;
329 tracing::info!(
330 content_block = %d.name,
331 new_name = %cb.name,
332 "archiving orphan content block"
333 );
334 client.update_content_block(id, &cb).await?;
335 return Ok(1);
336 }
337
338 match &d.op {
339 DiffOp::Added(cb) => {
340 tracing::info!(content_block = %cb.name, "creating content block");
341 let _ = client.create_content_block(cb).await?;
342 Ok(1)
343 }
344 DiffOp::Modified { to, .. } => {
345 let id_index = id_index.ok_or_else(|| {
346 anyhow!("internal: content_block id index missing for modified apply path")
347 })?;
348 let id = id_index.get(&to.name).ok_or_else(|| {
349 anyhow!(
350 "internal: modified content block '{}' missing from id index",
351 to.name
352 )
353 })?;
354 tracing::info!(content_block = %to.name, "updating content block");
355 client.update_content_block(id, to).await?;
356 Ok(1)
357 }
358 DiffOp::Removed(_) => {
361 unreachable!("diff layer routes content block removals through orphan")
362 }
363 DiffOp::Unchanged => Ok(0),
364 }
365}
366
367async fn apply_catalog_schema(
368 client: &BrazeClient,
369 d: &CatalogSchemaDiff,
370) -> anyhow::Result<usize> {
371 let mut count = 0;
372 for fd in &d.field_diffs {
373 match fd {
374 DiffOp::Added(f) => {
375 tracing::info!(
376 catalog = %d.name,
377 field = %f.name,
378 field_type = f.field_type.as_str(),
379 "adding catalog field"
380 );
381 client.add_catalog_field(&d.name, f).await?;
382 count += 1;
383 }
384 DiffOp::Removed(f) => {
385 tracing::info!(
386 catalog = %d.name,
387 field = %f.name,
388 "deleting catalog field"
389 );
390 client.delete_catalog_field(&d.name, &f.name).await?;
391 count += 1;
392 }
393 DiffOp::Modified { .. } => {
394 return Err(anyhow!(
395 "internal: Modified field op should have been rejected \
396 by check_for_unsupported_ops"
397 ));
398 }
399 DiffOp::Unchanged => {}
400 }
401 }
402 Ok(count)
403}
404
/// Upper bound on the number of items `run_batched` places in one batch.
// NOTE(review): presumably chosen to match a per-request item limit of the
// Braze catalog items API — confirm against the API documentation.
const ITEMS_BATCH_SIZE: usize = 50;
407
408fn items_progress_bar(total: u64, label: &str, color: &str) -> indicatif::ProgressBar {
409 let pb = indicatif::ProgressBar::new(total);
410 pb.set_style(
411 indicatif::ProgressStyle::default_bar()
412 .template(&format!(
413 "{{spinner:.{color}}} [{{elapsed_precise}}] {{bar:40}} {{pos}}/{{len}} {label}"
414 ))
415 .unwrap(),
416 );
417 pb
418}
419
420async fn run_batched<T, F, Fut>(
421 items: Vec<T>,
422 concurrency: usize,
423 pb: &indicatif::ProgressBar,
424 batch_fn: F,
425) -> anyhow::Result<usize>
426where
427 T: Send + Sync + 'static,
428 F: Fn(Vec<T>) -> Fut,
429 Fut: std::future::Future<Output = anyhow::Result<()>>,
430{
431 let mut batches: Vec<Vec<T>> = Vec::new();
432 let mut iter = items.into_iter().peekable();
433 while iter.peek().is_some() {
434 batches.push(iter.by_ref().take(ITEMS_BATCH_SIZE).collect());
435 }
436
437 let count = futures::stream::iter(batches.into_iter().map(|batch| {
438 let batch_len = batch.len();
439 let fut = batch_fn(batch);
440 let pb = pb.clone();
441 async move {
442 fut.await?;
443 pb.inc(batch_len as u64);
444 Ok::<usize, anyhow::Error>(batch_len)
445 }
446 }))
447 .buffer_unordered(concurrency)
448 .try_fold(0usize, |acc, n| async move { Ok(acc + n) })
449 .await?;
450
451 pb.finish_and_clear();
452 Ok(count)
453}
454
455async fn apply_catalog_items(
456 client: &BrazeClient,
457 d: &CatalogItemsDiff,
458 local: &CatalogItems,
459 parallel_batches: u32,
460) -> anyhow::Result<usize> {
461 if !d.has_changes() {
462 return Ok(0);
463 }
464
465 let catalog_name = &d.catalog_name;
466 let concurrency = (parallel_batches as usize).max(1);
467
468 let upsert_ids: Vec<&str> = d
469 .added_ids
470 .iter()
471 .chain(d.modified_ids.iter())
472 .map(String::as_str)
473 .collect();
474
475 let mut upsert_count = 0;
476 if !upsert_ids.is_empty() {
477 let rows = local.rows.as_ref().ok_or_else(|| {
478 anyhow!(
479 "internal: local items for catalog '{}' have no materialized rows",
480 catalog_name
481 )
482 })?;
483 let row_by_id: std::collections::HashMap<&str, &crate::resource::CatalogItemRow> =
484 rows.iter().map(|r| (r.id.as_str(), r)).collect();
485
486 let upsert_rows: Vec<crate::resource::CatalogItemRow> = upsert_ids
487 .iter()
488 .map(|&id| {
489 (*row_by_id
490 .get(id)
491 .expect("item in diff but missing from local rows"))
492 .clone()
493 })
494 .collect();
495
496 let pb = items_progress_bar(upsert_rows.len() as u64, "items", "green");
497
498 upsert_count = run_batched(upsert_rows, concurrency, &pb, |batch| {
499 let client = client.clone();
500 let catalog_name = catalog_name.clone();
501 async move {
502 tracing::info!(
503 catalog = %catalog_name,
504 batch_size = batch.len(),
505 "upserting catalog items batch"
506 );
507 client
508 .upsert_catalog_items(&catalog_name, &batch)
509 .await
510 .with_context(|| format!("upserting items batch for catalog '{catalog_name}'"))
511 }
512 })
513 .await?;
514 }
515
516 let mut delete_count = 0;
517 if !d.removed_ids.is_empty() {
518 let pb = items_progress_bar(d.removed_ids.len() as u64, "deletes", "red");
519
520 delete_count = run_batched(d.removed_ids.clone(), concurrency, &pb, |batch| {
521 let client = client.clone();
522 let catalog_name = catalog_name.clone();
523 async move {
524 tracing::info!(
525 catalog = %catalog_name,
526 batch_size = batch.len(),
527 "deleting catalog items batch"
528 );
529 client
530 .delete_catalog_items(&catalog_name, &batch)
531 .await
532 .with_context(|| format!("deleting items batch for catalog '{catalog_name}'"))
533 }
534 })
535 .await?;
536 }
537
538 Ok(upsert_count + delete_count)
539}
540
541async fn apply_email_template(
542 client: &BrazeClient,
543 d: &EmailTemplateDiff,
544 id_index: Option<&EmailTemplateIdIndex>,
545 archive_orphans: bool,
546 today: chrono::NaiveDate,
547) -> anyhow::Result<usize> {
548 if d.orphan {
549 if !archive_orphans {
550 return Ok(0);
551 }
552 let id_index = id_index.ok_or_else(|| {
553 anyhow!("internal: email_template id index missing for orphan apply path")
554 })?;
555 let id = id_index.get(&d.name).ok_or_else(|| {
556 anyhow!(
557 "internal: orphan '{}' missing from id index — list/diff drift",
558 d.name
559 )
560 })?;
561 let archived = orphan::archive_name(today, &d.name);
562 if archived == d.name {
563 return Ok(0);
564 }
565 let mut et = client
566 .get_email_template(id)
567 .await
568 .with_context(|| format!("fetching email template '{}' for archive rename", d.name))?;
569 et.name = archived;
570 tracing::info!(
571 email_template = %d.name,
572 new_name = %et.name,
573 "archiving orphan email template"
574 );
575 client.update_email_template(id, &et).await?;
576 return Ok(1);
577 }
578
579 match &d.op {
580 DiffOp::Added(et) => {
581 tracing::info!(email_template = %et.name, "creating email template");
582 let _ = client.create_email_template(et).await?;
583 Ok(1)
584 }
585 DiffOp::Modified { to, .. } => {
586 let id_index = id_index.ok_or_else(|| {
587 anyhow!("internal: email_template id index missing for modified apply path")
588 })?;
589 let id = id_index.get(&to.name).ok_or_else(|| {
590 anyhow!(
591 "internal: modified email template '{}' missing from id index",
592 to.name
593 )
594 })?;
595 tracing::info!(email_template = %to.name, "updating email template");
596 client.update_email_template(id, to).await?;
597 Ok(1)
598 }
599 DiffOp::Removed(_) => {
600 unreachable!("diff layer routes email template removals through orphan")
601 }
602 DiffOp::Unchanged => Ok(0),
603 }
604}
605
606async fn apply_custom_attribute_batch(
610 client: &BrazeClient,
611 to_deprecate: &[&str],
612 to_reactivate: &[&str],
613) -> anyhow::Result<usize> {
614 let mut applied = 0;
615 for (names, blocklisted, verb) in [
616 (to_deprecate, true, "deprecating"),
617 (to_reactivate, false, "reactivating"),
618 ] {
619 if names.is_empty() {
620 continue;
621 }
622 tracing::info!(attributes = ?names, "{verb} custom attributes");
623 client
624 .set_custom_attribute_blocklist(names, blocklisted)
625 .await
626 .with_context(|| format!("{verb} custom attributes"))?;
627 let n = names.len();
628 let past = if blocklisted {
629 "deprecated"
630 } else {
631 "reactivated"
632 };
633 eprintln!(" ✓ {past} {n} custom attribute(s)");
634 applied += n;
635 }
636
637 Ok(applied)
638}