Skip to main content

braze_sync/cli/
apply.rs

//! `braze-sync apply` — the only command that mutates remote state.
//!
//! Recomputes the diff via the same code path as `diff`, prints it,
//! then short-circuits unless `--confirm` is set. Pre-validates
//! unsupported/destructive ops before any API call, then walks the
//! plan and aborts on the first write failure. Braze has no
//! cross-resource transaction, so a mid-loop failure can still leave
//! earlier writes applied — the pre-validation prevents *known-bad*
//! plans from firing any writes at all, but it does not promise
//! cross-write atomicity.
11
12use crate::braze::BrazeClient;
13use crate::config::ResolvedConfig;
14use crate::diff::catalog::{CatalogItemsDiff, CatalogSchemaDiff};
15use crate::diff::content_block::{ContentBlockDiff, ContentBlockIdIndex};
16use crate::diff::custom_attribute::CustomAttributeOp;
17use crate::diff::email_template::{EmailTemplateDiff, EmailTemplateIdIndex};
18use crate::diff::orphan;
19use crate::diff::{DiffOp, DiffSummary, ResourceDiff};
20use crate::error::Error;
21use crate::format::OutputFormat;
22use crate::resource::{CatalogItems, ResourceKind};
23use anyhow::{anyhow, Context as _};
24use clap::Args;
25use futures::stream::{StreamExt, TryStreamExt};
26use std::collections::BTreeMap;
27use std::path::Path;
28
29use super::diff::{
30    compute_catalog_items_diffs, compute_catalog_schema_diffs, compute_content_block_plan,
31    compute_custom_attribute_diffs, compute_email_template_plan,
32};
33use super::selected_kinds;
34
// Command-line arguments accepted by `braze-sync apply`.
//
// NOTE(review): the `///` comments on the fields below are presumably
// picked up by clap's derive as `--help` text, which makes them
// user-facing output rather than plain documentation — confirm before
// rewording them. This header is a plain `//` comment so it cannot end
// up in the generated help.
#[derive(Args, Debug)]
pub struct ApplyArgs {
    /// Limit apply to a specific resource kind.
    #[arg(long, value_enum)]
    pub resource: Option<ResourceKind>,

    /// When `--resource` is given, optionally restrict to a single named
    /// resource. Requires `--resource`.
    #[arg(long, requires = "resource")]
    pub name: Option<String>,

    /// Actually apply changes. Without this, runs in dry-run mode and
    /// makes zero write calls to Braze. This is the default for safety.
    #[arg(long)]
    pub confirm: bool,

    /// Permit destructive operations (field deletes, etc.). Required in
    /// addition to `--confirm` for any change that would lose data on
    /// the Braze side.
    #[arg(long)]
    pub allow_destructive: bool,

    /// Archive orphan Content Blocks / Email Templates by prefixing the
    /// remote name with `[ARCHIVED-YYYY-MM-DD]`. Inert for resource
    /// kinds that have no orphan concept (e.g. catalog schema).
    #[arg(long)]
    pub archive_orphans: bool,
}
63
64pub async fn run(
65    args: &ApplyArgs,
66    resolved: ResolvedConfig,
67    config_dir: &Path,
68    format: OutputFormat,
69) -> anyhow::Result<()> {
70    let catalogs_root = config_dir.join(&resolved.resources.catalog_schema.path);
71    let content_blocks_root = config_dir.join(&resolved.resources.content_block.path);
72    let email_templates_root = config_dir.join(&resolved.resources.email_template.path);
73    let custom_attributes_path = config_dir.join(&resolved.resources.custom_attribute.path);
74    let client = BrazeClient::from_resolved(&resolved);
75    let kinds = selected_kinds(args.resource, &resolved.resources);
76
77    let mut summary = DiffSummary::default();
78    let mut content_block_id_index: Option<ContentBlockIdIndex> = None;
79    let mut email_template_id_index: Option<EmailTemplateIdIndex> = None;
80    let mut catalog_items_local: Option<BTreeMap<String, CatalogItems>> = None;
81    for kind in kinds {
82        match kind {
83            ResourceKind::CatalogSchema => {
84                let diffs =
85                    compute_catalog_schema_diffs(&client, &catalogs_root, args.name.as_deref())
86                        .await
87                        .context("computing catalog_schema plan")?;
88                summary.diffs.extend(diffs);
89            }
90            ResourceKind::CatalogItems => {
91                let (diffs, local_map) = compute_catalog_items_diffs(
92                    &client,
93                    &catalogs_root,
94                    args.name.as_deref(),
95                    true,
96                )
97                .await
98                .context("computing catalog_items plan")?;
99                summary.diffs.extend(diffs);
100                catalog_items_local = Some(local_map);
101            }
102            ResourceKind::ContentBlock => {
103                let (diffs, idx) =
104                    compute_content_block_plan(&client, &content_blocks_root, args.name.as_deref())
105                        .await
106                        .context("computing content_block plan")?;
107                summary.diffs.extend(diffs);
108                content_block_id_index = Some(idx);
109            }
110            ResourceKind::EmailTemplate => {
111                let (diffs, idx) = compute_email_template_plan(
112                    &client,
113                    &email_templates_root,
114                    args.name.as_deref(),
115                )
116                .await
117                .context("computing email_template plan")?;
118                summary.diffs.extend(diffs);
119                email_template_id_index = Some(idx);
120            }
121            ResourceKind::CustomAttribute => {
122                let diffs = compute_custom_attribute_diffs(
123                    &client,
124                    &custom_attributes_path,
125                    args.name.as_deref(),
126                )
127                .await
128                .context("computing custom_attribute plan")?;
129                summary.diffs.extend(diffs);
130            }
131        }
132    }
133
134    let mode_label = if args.confirm {
135        "Plan:"
136    } else {
137        "Plan (dry-run, pass --confirm to apply):"
138    };
139    eprintln!("{mode_label}");
140    print!("{}", format.formatter().format(&summary));
141
142    if summary.actionable_count() == 0 {
143        if summary.changed_count() > 0 {
144            eprintln!(
145                "No actionable changes to apply \
146                 (informational drift above can be reconciled with `export`)."
147            );
148        } else {
149            eprintln!("No changes to apply.");
150        }
151        return Ok(());
152    }
153
154    if !args.confirm {
155        eprintln!("DRY RUN — pass --confirm to apply these changes.");
156        return Ok(());
157    }
158
159    if summary.destructive_count() > 0 && !args.allow_destructive {
160        return Err(Error::DestructiveBlocked.into());
161    }
162
163    check_for_unsupported_ops(&summary)?;
164
165    // One canonical archive timestamp per run, even across multiple
166    // orphans. UTC (not Local) so two operators running the same archive
167    // on the same wall-clock day from different timezones produce the
168    // same `[ARCHIVED-YYYY-MM-DD]` prefix — determinism across a team
169    // matters more than matching the operator's local calendar.
170    let today = chrono::Utc::now().date_naive();
171
172    let parallel_batches = resolved.resources.catalog_items.parallel_batches;
173    let mut applied = 0;
174    let mut ca_deprecate: Vec<&str> = Vec::new();
175    let mut ca_reactivate: Vec<&str> = Vec::new();
176    for diff in &summary.diffs {
177        match diff {
178            ResourceDiff::CatalogSchema(d) => {
179                applied += apply_catalog_schema(&client, d).await?;
180            }
181            ResourceDiff::CatalogItems(d) => {
182                let local_map = catalog_items_local.as_ref().ok_or_else(|| {
183                    anyhow!("internal: catalog_items_local not populated before apply")
184                })?;
185                let local = local_map.get(&d.catalog_name).ok_or_else(|| {
186                    anyhow!(
187                        "internal: local items for catalog '{}' missing from items map",
188                        d.catalog_name
189                    )
190                })?;
191                applied += apply_catalog_items(&client, d, local, parallel_batches).await?;
192            }
193            ResourceDiff::ContentBlock(d) => {
194                applied += apply_content_block(
195                    &client,
196                    d,
197                    content_block_id_index.as_ref(),
198                    args.archive_orphans,
199                    today,
200                )
201                .await?;
202            }
203            ResourceDiff::EmailTemplate(d) => {
204                applied += apply_email_template(
205                    &client,
206                    d,
207                    email_template_id_index.as_ref(),
208                    args.archive_orphans,
209                    today,
210                )
211                .await?;
212            }
213            ResourceDiff::CustomAttribute(d) => {
214                if let CustomAttributeOp::DeprecationToggled { to, .. } = &d.op {
215                    if *to {
216                        ca_deprecate.push(&d.name);
217                    } else {
218                        ca_reactivate.push(&d.name);
219                    }
220                }
221            }
222        }
223    }
224
225    if !ca_deprecate.is_empty() || !ca_reactivate.is_empty() {
226        applied += apply_custom_attribute_batch(&client, &ca_deprecate, &ca_reactivate).await?;
227    }
228
229    eprintln!("✓ Applied {applied} change(s).");
230    Ok(())
231}
232
233/// Reject ops the API can't actually perform. Runs before any write
234/// call so a statically-known-bad plan cannot fire a partial apply;
235/// runtime write failures can still leave earlier writes in place.
236///
237/// ContentBlock diffs are deliberately not inspected here: every shape
238/// the diff layer can produce (`Added` → create, `Modified` → update,
239/// `orphan` → archive-or-noop) maps to a supported API call, so there
240/// is nothing to statically reject. If a future diff shape is added
241/// (e.g. content-type change with no in-place update), re-evaluate.
242fn check_for_unsupported_ops(summary: &DiffSummary) -> anyhow::Result<()> {
243    for diff in &summary.diffs {
244        if let ResourceDiff::CustomAttribute(d) = diff {
245            if matches!(d.op, CustomAttributeOp::PresentInGitOnly) {
246                return Err(Error::CustomAttributeCreateNotSupported {
247                    name: d.name.clone(),
248                }
249                .into());
250            }
251        }
252        if let ResourceDiff::CatalogSchema(d) = diff {
253            match &d.op {
254                DiffOp::Added(_) => {
255                    return Err(anyhow!(
256                        "creating a new catalog '{}' is not supported by braze-sync; \
257                         create the catalog in the Braze dashboard first, then run \
258                         `braze-sync export` to populate the local schema",
259                        d.name
260                    ));
261                }
262                DiffOp::Removed(_) => {
263                    return Err(anyhow!(
264                        "deleting catalog '{}' (top-level) is not supported by braze-sync; \
265                         only field-level changes can be applied",
266                        d.name
267                    ));
268                }
269                _ => {}
270            }
271            // Field type change would require delete-then-add, which is
272            // data-losing on the field — refuse rather than silently drop.
273            for fd in &d.field_diffs {
274                if let DiffOp::Modified { from, to } = fd {
275                    return Err(anyhow!(
276                        "modifying field '{}' on catalog '{}' (type {} → {}) \
277                         is not supported by braze-sync; the change would be \
278                         data-losing on the field. Drop the field manually \
279                         in the Braze dashboard and re-run `braze-sync apply`",
280                        to.name,
281                        d.name,
282                        from.field_type.as_str(),
283                        to.field_type.as_str(),
284                    ));
285                }
286            }
287        }
288    }
289    Ok(())
290}
291
292async fn apply_content_block(
293    client: &BrazeClient,
294    d: &ContentBlockDiff,
295    id_index: Option<&ContentBlockIdIndex>,
296    archive_orphans: bool,
297    today: chrono::NaiveDate,
298) -> anyhow::Result<usize> {
299    // Orphans only mutate remote state when --archive-orphans is set;
300    // otherwise the plan-print is the entire effect.
301    if d.orphan {
302        if !archive_orphans {
303            return Ok(0);
304        }
305        let id_index = id_index.ok_or_else(|| {
306            anyhow!("internal: content_block id index missing for orphan apply path")
307        })?;
308        let id = id_index.get(&d.name).ok_or_else(|| {
309            anyhow!(
310                "internal: orphan '{}' missing from id index — list/diff drift",
311                d.name
312            )
313        })?;
314        let archived = orphan::archive_name(today, &d.name);
315        if archived == d.name {
316            return Ok(0);
317        }
318        // Update endpoint requires the full body, not a partial. Safe
319        // re: state — `get_content_block` defaults state to Active
320        // (Braze /info has no state field) and `update_content_block`
321        // omits state from the wire body, so this rename can never
322        // toggle the remote state as a side effect. If either of those
323        // invariants ever changes, the orphan path needs revisiting.
324        let mut cb = client
325            .get_content_block(id)
326            .await
327            .with_context(|| format!("fetching content block '{}' for archive rename", d.name))?;
328        cb.name = archived;
329        tracing::info!(
330            content_block = %d.name,
331            new_name = %cb.name,
332            "archiving orphan content block"
333        );
334        client.update_content_block(id, &cb).await?;
335        return Ok(1);
336    }
337
338    match &d.op {
339        DiffOp::Added(cb) => {
340            tracing::info!(content_block = %cb.name, "creating content block");
341            let _ = client.create_content_block(cb).await?;
342            Ok(1)
343        }
344        DiffOp::Modified { to, .. } => {
345            let id_index = id_index.ok_or_else(|| {
346                anyhow!("internal: content_block id index missing for modified apply path")
347            })?;
348            let id = id_index.get(&to.name).ok_or_else(|| {
349                anyhow!(
350                    "internal: modified content block '{}' missing from id index",
351                    to.name
352                )
353            })?;
354            tracing::info!(content_block = %to.name, "updating content block");
355            client.update_content_block(id, to).await?;
356            Ok(1)
357        }
358        // The diff layer routes remote-only blocks through the orphan
359        // flag, never as a Removed op.
360        DiffOp::Removed(_) => {
361            unreachable!("diff layer routes content block removals through orphan")
362        }
363        DiffOp::Unchanged => Ok(0),
364    }
365}
366
367async fn apply_catalog_schema(
368    client: &BrazeClient,
369    d: &CatalogSchemaDiff,
370) -> anyhow::Result<usize> {
371    let mut count = 0;
372    for fd in &d.field_diffs {
373        match fd {
374            DiffOp::Added(f) => {
375                tracing::info!(
376                    catalog = %d.name,
377                    field = %f.name,
378                    field_type = f.field_type.as_str(),
379                    "adding catalog field"
380                );
381                client.add_catalog_field(&d.name, f).await?;
382                count += 1;
383            }
384            DiffOp::Removed(f) => {
385                tracing::info!(
386                    catalog = %d.name,
387                    field = %f.name,
388                    "deleting catalog field"
389                );
390                client.delete_catalog_field(&d.name, &f.name).await?;
391                count += 1;
392            }
393            DiffOp::Modified { .. } => {
394                return Err(anyhow!(
395                    "internal: Modified field op should have been rejected \
396                     by check_for_unsupported_ops"
397                ));
398            }
399            DiffOp::Unchanged => {}
400        }
401    }
402    Ok(count)
403}
404
/// Batch size for catalog items upsert/delete (Braze limit).
///
/// `run_batched` splits its input into chunks of at most this many
/// elements; only the final chunk may be shorter.
const ITEMS_BATCH_SIZE: usize = 50;
407
408fn items_progress_bar(total: u64, label: &str, color: &str) -> indicatif::ProgressBar {
409    let pb = indicatif::ProgressBar::new(total);
410    pb.set_style(
411        indicatif::ProgressStyle::default_bar()
412            .template(&format!(
413                "{{spinner:.{color}}} [{{elapsed_precise}}] {{bar:40}} {{pos}}/{{len}} {label}"
414            ))
415            .unwrap(),
416    );
417    pb
418}
419
420async fn run_batched<T, F, Fut>(
421    items: Vec<T>,
422    concurrency: usize,
423    pb: &indicatif::ProgressBar,
424    batch_fn: F,
425) -> anyhow::Result<usize>
426where
427    T: Send + Sync + 'static,
428    F: Fn(Vec<T>) -> Fut,
429    Fut: std::future::Future<Output = anyhow::Result<()>>,
430{
431    let mut batches: Vec<Vec<T>> = Vec::new();
432    let mut iter = items.into_iter().peekable();
433    while iter.peek().is_some() {
434        batches.push(iter.by_ref().take(ITEMS_BATCH_SIZE).collect());
435    }
436
437    let count = futures::stream::iter(batches.into_iter().map(|batch| {
438        let batch_len = batch.len();
439        let fut = batch_fn(batch);
440        let pb = pb.clone();
441        async move {
442            fut.await?;
443            pb.inc(batch_len as u64);
444            Ok::<usize, anyhow::Error>(batch_len)
445        }
446    }))
447    .buffer_unordered(concurrency)
448    .try_fold(0usize, |acc, n| async move { Ok(acc + n) })
449    .await?;
450
451    pb.finish_and_clear();
452    Ok(count)
453}
454
455async fn apply_catalog_items(
456    client: &BrazeClient,
457    d: &CatalogItemsDiff,
458    local: &CatalogItems,
459    parallel_batches: u32,
460) -> anyhow::Result<usize> {
461    if !d.has_changes() {
462        return Ok(0);
463    }
464
465    let catalog_name = &d.catalog_name;
466    let concurrency = (parallel_batches as usize).max(1);
467
468    let upsert_ids: Vec<&str> = d
469        .added_ids
470        .iter()
471        .chain(d.modified_ids.iter())
472        .map(String::as_str)
473        .collect();
474
475    let mut upsert_count = 0;
476    if !upsert_ids.is_empty() {
477        let rows = local.rows.as_ref().ok_or_else(|| {
478            anyhow!(
479                "internal: local items for catalog '{}' have no materialized rows",
480                catalog_name
481            )
482        })?;
483        let row_by_id: std::collections::HashMap<&str, &crate::resource::CatalogItemRow> =
484            rows.iter().map(|r| (r.id.as_str(), r)).collect();
485
486        let upsert_rows: Vec<crate::resource::CatalogItemRow> = upsert_ids
487            .iter()
488            .map(|&id| {
489                (*row_by_id
490                    .get(id)
491                    .expect("item in diff but missing from local rows"))
492                .clone()
493            })
494            .collect();
495
496        let pb = items_progress_bar(upsert_rows.len() as u64, "items", "green");
497
498        upsert_count = run_batched(upsert_rows, concurrency, &pb, |batch| {
499            let client = client.clone();
500            let catalog_name = catalog_name.clone();
501            async move {
502                tracing::info!(
503                    catalog = %catalog_name,
504                    batch_size = batch.len(),
505                    "upserting catalog items batch"
506                );
507                client
508                    .upsert_catalog_items(&catalog_name, &batch)
509                    .await
510                    .with_context(|| format!("upserting items batch for catalog '{catalog_name}'"))
511            }
512        })
513        .await?;
514    }
515
516    let mut delete_count = 0;
517    if !d.removed_ids.is_empty() {
518        let pb = items_progress_bar(d.removed_ids.len() as u64, "deletes", "red");
519
520        delete_count = run_batched(d.removed_ids.clone(), concurrency, &pb, |batch| {
521            let client = client.clone();
522            let catalog_name = catalog_name.clone();
523            async move {
524                tracing::info!(
525                    catalog = %catalog_name,
526                    batch_size = batch.len(),
527                    "deleting catalog items batch"
528                );
529                client
530                    .delete_catalog_items(&catalog_name, &batch)
531                    .await
532                    .with_context(|| format!("deleting items batch for catalog '{catalog_name}'"))
533            }
534        })
535        .await?;
536    }
537
538    Ok(upsert_count + delete_count)
539}
540
541async fn apply_email_template(
542    client: &BrazeClient,
543    d: &EmailTemplateDiff,
544    id_index: Option<&EmailTemplateIdIndex>,
545    archive_orphans: bool,
546    today: chrono::NaiveDate,
547) -> anyhow::Result<usize> {
548    if d.orphan {
549        if !archive_orphans {
550            return Ok(0);
551        }
552        let id_index = id_index.ok_or_else(|| {
553            anyhow!("internal: email_template id index missing for orphan apply path")
554        })?;
555        let id = id_index.get(&d.name).ok_or_else(|| {
556            anyhow!(
557                "internal: orphan '{}' missing from id index — list/diff drift",
558                d.name
559            )
560        })?;
561        let archived = orphan::archive_name(today, &d.name);
562        if archived == d.name {
563            return Ok(0);
564        }
565        let mut et = client
566            .get_email_template(id)
567            .await
568            .with_context(|| format!("fetching email template '{}' for archive rename", d.name))?;
569        et.name = archived;
570        tracing::info!(
571            email_template = %d.name,
572            new_name = %et.name,
573            "archiving orphan email template"
574        );
575        client.update_email_template(id, &et).await?;
576        return Ok(1);
577    }
578
579    match &d.op {
580        DiffOp::Added(et) => {
581            tracing::info!(email_template = %et.name, "creating email template");
582            let _ = client.create_email_template(et).await?;
583            Ok(1)
584        }
585        DiffOp::Modified { to, .. } => {
586            let id_index = id_index.ok_or_else(|| {
587                anyhow!("internal: email_template id index missing for modified apply path")
588            })?;
589            let id = id_index.get(&to.name).ok_or_else(|| {
590                anyhow!(
591                    "internal: modified email template '{}' missing from id index",
592                    to.name
593                )
594            })?;
595            tracing::info!(email_template = %to.name, "updating email template");
596            client.update_email_template(id, to).await?;
597            Ok(1)
598        }
599        DiffOp::Removed(_) => {
600            unreachable!("diff layer routes email template removals through orphan")
601        }
602        DiffOp::Unchanged => Ok(0),
603    }
604}
605
606/// Batch custom attribute deprecation toggles by direction so we issue
607/// at most two API calls. Each batch is reported to stderr on success so
608/// the user can tell what was committed if a later batch fails.
609async fn apply_custom_attribute_batch(
610    client: &BrazeClient,
611    to_deprecate: &[&str],
612    to_reactivate: &[&str],
613) -> anyhow::Result<usize> {
614    let mut applied = 0;
615    for (names, blocklisted, verb) in [
616        (to_deprecate, true, "deprecating"),
617        (to_reactivate, false, "reactivating"),
618    ] {
619        if names.is_empty() {
620            continue;
621        }
622        tracing::info!(attributes = ?names, "{verb} custom attributes");
623        client
624            .set_custom_attribute_blocklist(names, blocklisted)
625            .await
626            .with_context(|| format!("{verb} custom attributes"))?;
627        let n = names.len();
628        let past = if blocklisted {
629            "deprecated"
630        } else {
631            "reactivated"
632        };
633        eprintln!("  ✓ {past} {n} custom attribute(s)");
634        applied += n;
635    }
636
637    Ok(applied)
638}