1use crate::braze::error::BrazeApiError;
8use crate::braze::BrazeClient;
9use crate::config::ResolvedConfig;
10use crate::diff::catalog::diff_schema;
11use crate::diff::content_block::{
12 diff as diff_content_block, ContentBlockDiff, ContentBlockIdIndex,
13};
14use crate::diff::{DiffSummary, ResourceDiff};
15use crate::error::Error;
16use crate::format::OutputFormat;
17use crate::fs::{catalog_io, content_block_io};
18use crate::resource::{Catalog, ContentBlock, ResourceKind};
19use anyhow::Context as _;
20use clap::Args;
21use futures::stream::{StreamExt, TryStreamExt};
22use std::collections::{BTreeMap, BTreeSet};
23use std::path::Path;
24
25use super::{selected_kinds, warn_unimplemented, FETCH_CONCURRENCY};
26
27#[derive(Args, Debug)]
28pub struct DiffArgs {
29 #[arg(long, value_enum)]
31 pub resource: Option<ResourceKind>,
32
33 #[arg(long, requires = "resource")]
36 pub name: Option<String>,
37
38 #[arg(long)]
40 pub fail_on_drift: bool,
41}
42
43pub async fn run(
44 args: &DiffArgs,
45 resolved: ResolvedConfig,
46 config_dir: &Path,
47 format: OutputFormat,
48) -> anyhow::Result<()> {
49 let catalogs_root = config_dir.join(&resolved.resources.catalog_schema.path);
50 let content_blocks_root = config_dir.join(&resolved.resources.content_block.path);
51 let client = BrazeClient::from_resolved(&resolved);
52 let kinds = selected_kinds(args.resource, &resolved.resources);
53
54 let mut summary = DiffSummary::default();
55 for kind in kinds {
56 match kind {
57 ResourceKind::CatalogSchema => {
58 let diffs =
59 compute_catalog_schema_diffs(&client, &catalogs_root, args.name.as_deref())
60 .await
61 .context("computing catalog_schema diff")?;
62 summary.diffs.extend(diffs);
63 }
64 ResourceKind::ContentBlock => {
65 let (diffs, _idx) =
66 compute_content_block_plan(&client, &content_blocks_root, args.name.as_deref())
67 .await
68 .context("computing content_block diff")?;
69 summary.diffs.extend(diffs);
70 }
71 other => {
72 warn_unimplemented(other);
73 }
74 }
75 }
76
77 let formatted = format.formatter().format(&summary);
78 print!("{formatted}");
79
80 if args.fail_on_drift && summary.changed_count() > 0 {
81 return Err(Error::DriftDetected {
82 count: summary.changed_count(),
83 }
84 .into());
85 }
86
87 Ok(())
88}
89
90pub(crate) async fn compute_catalog_schema_diffs(
93 client: &BrazeClient,
94 catalogs_root: &Path,
95 name_filter: Option<&str>,
96) -> anyhow::Result<Vec<ResourceDiff>> {
97 let mut local = catalog_io::load_all_schemas(catalogs_root)?;
98 if let Some(name) = name_filter {
99 local.retain(|c| c.name == name);
100 }
101
102 let remote: Vec<Catalog> = match name_filter {
103 Some(name) => match client.get_catalog(name).await {
104 Ok(c) => vec![c],
105 Err(BrazeApiError::NotFound { .. }) => Vec::new(),
108 Err(e) => return Err(e.into()),
109 },
110 None => client.list_catalogs().await?,
111 };
112
113 let local_by_name: BTreeMap<&str, &Catalog> =
114 local.iter().map(|c| (c.name.as_str(), c)).collect();
115 let remote_by_name: BTreeMap<&str, &Catalog> =
116 remote.iter().map(|c| (c.name.as_str(), c)).collect();
117
118 let mut all_names: BTreeSet<&str> = BTreeSet::new();
119 all_names.extend(local_by_name.keys().copied());
120 all_names.extend(remote_by_name.keys().copied());
121
122 let mut diffs = Vec::new();
123 for name in all_names {
124 let l = local_by_name.get(name).copied();
125 let r = remote_by_name.get(name).copied();
126 if let Some(d) = diff_schema(l, r) {
127 diffs.push(ResourceDiff::CatalogSchema(d));
128 }
129 }
130
131 Ok(diffs)
132}
133
134pub(crate) async fn compute_content_block_plan(
138 client: &BrazeClient,
139 content_blocks_root: &Path,
140 name_filter: Option<&str>,
141) -> anyhow::Result<(Vec<ResourceDiff>, ContentBlockIdIndex)> {
142 let mut local = content_block_io::load_all_content_blocks(content_blocks_root)?;
143 if let Some(name) = name_filter {
144 local.retain(|c| c.name == name);
145 }
146
147 let mut summaries = client.list_content_blocks().await?;
148 if let Some(name) = name_filter {
149 summaries.retain(|s| s.name == name);
150 }
151
152 let id_index: ContentBlockIdIndex = summaries
153 .into_iter()
154 .map(|s| (s.name, s.content_block_id))
155 .collect();
156
157 let local_by_name: BTreeMap<&str, &ContentBlock> =
158 local.iter().map(|c| (c.name.as_str(), c)).collect();
159
160 let shared_names: Vec<&str> = id_index
163 .keys()
164 .map(String::as_str)
165 .filter(|n| local_by_name.contains_key(n))
166 .collect();
167 let fetched: BTreeMap<String, ContentBlock> =
168 futures::stream::iter(shared_names.iter().map(|name| {
169 let id = id_index
170 .get(*name)
171 .expect("id_index built from the same summaries set");
172 async move {
173 client
174 .get_content_block(id)
175 .await
176 .map(|cb| (name.to_string(), cb))
177 .with_context(|| format!("fetching content block '{name}'"))
178 }
179 }))
180 .buffer_unordered(FETCH_CONCURRENCY)
181 .try_collect()
182 .await?;
183
184 let mut all_names: BTreeSet<&str> = BTreeSet::new();
185 all_names.extend(local_by_name.keys().copied());
186 all_names.extend(id_index.keys().map(String::as_str));
187
188 let mut diffs = Vec::new();
189 for name in all_names {
190 let local_cb = local_by_name.get(name).copied();
191 let remote_cb = fetched.get(name);
192 let remote_present = id_index.contains_key(name);
193 let diff_result = match (local_cb, remote_cb, remote_present) {
200 (Some(l), Some(r), true) => diff_content_block(Some(l), Some(r)),
201 (Some(l), None, false) => diff_content_block(Some(l), None),
202 (None, None, true) => Some(ContentBlockDiff::orphan(name)),
203 _ => unreachable!(
204 "content_block diff invariant violated for '{name}': \
205 local={} remote={} remote_present={remote_present}",
206 local_cb.is_some(),
207 remote_cb.is_some(),
208 ),
209 };
210 if let Some(d) = diff_result {
211 diffs.push(ResourceDiff::ContentBlock(d));
212 }
213 }
214
215 Ok((diffs, id_index))
216}