1use crate::{
14 lib_context::{FlowContext, FlowExecutionContext, LibSetupContext},
15 ops::{
16 get_attachment_factory, get_optional_target_factory,
17 interface::{AttachmentSetupKey, FlowInstanceContext, TargetFactory},
18 },
19 prelude::*,
20 setup::{AttachmentsSetupChange, TargetSetupChange},
21};
22
23use sqlx::PgPool;
24use std::{
25 fmt::{Debug, Display},
26 str::FromStr,
27};
28
29use super::{AllSetupStates, GlobalSetupChange};
30use super::{
31 CombinedState, DesiredMode, ExistingMode, FlowSetupChange, FlowSetupState, ObjectSetupChange,
32 ObjectStatus, ResourceIdentifier, ResourceSetupChange, ResourceSetupInfo, SetupChangeType,
33 StateChange, TargetSetupState, db_metadata,
34};
35use crate::execution::db_tracking_setup;
36use std::fmt::Write;
37
/// Kind of a setup metadata record, as encoded in the record's `resource_type` string.
///
/// The string form is produced by the `Display` impl and parsed back by the `FromStr` impl.
enum MetadataRecordType {
    /// Rendered as `db_metadata::FLOW_VERSION_RESOURCE_TYPE`.
    FlowVersion,
    /// Rendered as the literal `FlowMetadata`.
    FlowMetadata,
    /// Rendered as the literal `TrackingTable`.
    TrackingTable,
    /// Rendered as `Target:` followed by the carried string.
    Target(String),
}
44
45impl Display for MetadataRecordType {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 match self {
48 MetadataRecordType::FlowVersion => f.write_str(db_metadata::FLOW_VERSION_RESOURCE_TYPE),
49 MetadataRecordType::FlowMetadata => write!(f, "FlowMetadata"),
50 MetadataRecordType::TrackingTable => write!(f, "TrackingTable"),
51 MetadataRecordType::Target(target_id) => write!(f, "Target:{target_id}"),
52 }
53 }
54}
55
56impl std::str::FromStr for MetadataRecordType {
57 type Err = Error;
58
59 fn from_str(s: &str) -> Result<Self> {
60 if s == db_metadata::FLOW_VERSION_RESOURCE_TYPE {
61 Ok(Self::FlowVersion)
62 } else if s == "FlowMetadata" {
63 Ok(Self::FlowMetadata)
64 } else if s == "TrackingTable" {
65 Ok(Self::TrackingTable)
66 } else if let Some(target_id) = s.strip_prefix("Target:") {
67 Ok(Self::Target(target_id.to_string()))
68 } else {
69 internal_bail!("Invalid MetadataRecordType string: {}", s)
70 }
71 }
72}
73
74fn from_metadata_record<S: DeserializeOwned + Debug + Clone>(
75 state: Option<serde_json::Value>,
76 staging_changes: sqlx::types::Json<Vec<StateChange<serde_json::Value>>>,
77 legacy_state_key: Option<serde_json::Value>,
78) -> Result<CombinedState<S>> {
79 let current: Option<S> = state.map(utils::deser::from_json_value).transpose()?;
80 let staging: Vec<StateChange<S>> = (staging_changes.0.into_iter())
81 .map(|sc| -> Result<_> {
82 Ok(match sc {
83 StateChange::Upsert(v) => StateChange::Upsert(utils::deser::from_json_value(v)?),
84 StateChange::Delete => StateChange::Delete,
85 })
86 })
87 .collect::<Result<_>>()?;
88 Ok(CombinedState {
89 current,
90 staging,
91 legacy_state_key,
92 })
93}
94
/// Looks up the target factory for `target_type`.
///
/// Thin wrapper over `get_optional_target_factory`; its `Option` result is returned unchanged,
/// so callers decide how to handle a target kind with no factory.
fn get_export_target_factory(target_type: &str) -> Option<Arc<dyn TargetFactory + Send + Sync>> {
    get_optional_target_factory(target_type)
}
98
99pub async fn get_existing_setup_state(pool: &PgPool) -> Result<AllSetupStates<ExistingMode>> {
100 let setup_metadata_records = db_metadata::read_setup_metadata(pool).await?;
101
102 let setup_metadata_records = if let Some(records) = setup_metadata_records {
103 records
104 } else {
105 return Ok(AllSetupStates::default());
106 };
107
108 let setup_metadata_records = setup_metadata_records.into_iter().fold(
110 BTreeMap::<String, Vec<_>>::new(),
111 |mut acc, record| {
112 acc.entry(record.flow_name.clone())
113 .or_default()
114 .push(record);
115 acc
116 },
117 );
118
119 let flows = setup_metadata_records
120 .into_iter()
121 .map(|(flow_name, metadata_records)| -> Result<_> {
122 let mut flow_ss = FlowSetupState::default();
123 for metadata_record in metadata_records {
124 let state = metadata_record.state;
125 let staging_changes = metadata_record.staging_changes;
126 match MetadataRecordType::from_str(&metadata_record.resource_type)? {
127 MetadataRecordType::FlowVersion => {
128 flow_ss.seen_flow_metadata_version =
129 db_metadata::parse_flow_version(&state);
130 }
131 MetadataRecordType::FlowMetadata => {
132 flow_ss.metadata = from_metadata_record(state, staging_changes, None)?;
133 }
134 MetadataRecordType::TrackingTable => {
135 flow_ss.tracking_table =
136 from_metadata_record(state, staging_changes, None)?;
137 }
138 MetadataRecordType::Target(target_type) => {
139 let normalized_key = {
140 if let Some(factory) = get_export_target_factory(&target_type) {
141 factory.normalize_setup_key(&metadata_record.key)?
142 } else {
143 metadata_record.key.clone()
144 }
145 };
146 let combined_state = from_metadata_record(
147 state,
148 staging_changes,
149 (normalized_key != metadata_record.key).then_some(metadata_record.key),
150 )?;
151 flow_ss.targets.insert(
152 super::ResourceIdentifier {
153 key: normalized_key,
154 target_kind: target_type,
155 },
156 combined_state,
157 );
158 }
159 }
160 }
161 Ok((flow_name, flow_ss))
162 })
163 .collect::<Result<_>>()?;
164
165 Ok(AllSetupStates {
166 has_metadata_table: true,
167 flows,
168 })
169}
170
171fn diff_state<E, D, T>(
172 existing_state: Option<&E>,
173 desired_state: Option<&D>,
174 diff: impl Fn(Option<&E>, &D) -> Option<StateChange<T>>,
175) -> Option<StateChange<T>>
176where
177 E: PartialEq<D>,
178{
179 match (existing_state, desired_state) {
180 (None, None) => None,
181 (Some(_), None) => Some(StateChange::Delete),
182 (existing_state, Some(desired_state)) => {
183 if existing_state.map(|e| e == desired_state).unwrap_or(false) {
184 None
185 } else {
186 diff(existing_state, desired_state)
187 }
188 }
189 }
190}
191
192fn to_object_status<A, B>(existing: Option<A>, desired: Option<B>) -> Option<ObjectStatus> {
193 Some(match (&existing, &desired) {
194 (Some(_), None) => ObjectStatus::Deleted,
195 (None, Some(_)) => ObjectStatus::New,
196 (Some(_), Some(_)) => ObjectStatus::Existing,
197 (None, None) => return None,
198 })
199}
200
/// Desired and existing states of one resource, paired under the same key by `group_states`.
#[derive(Debug)]
struct GroupedResourceStates<S: Debug + Clone> {
    /// The desired state, if the resource is wanted.
    desired: Option<S>,
    /// Everything known about the resource's existing state (current, staging, legacy key).
    existing: CombinedState<S>,
}

// Implemented by hand rather than derived: the empty value needs no `S: Default` bound.
impl<S: Debug + Clone> Default for GroupedResourceStates<S> {
    /// An entry with no desired state and a default (empty) existing state.
    fn default() -> Self {
        Self {
            desired: None,
            existing: CombinedState::default(),
        }
    }
}
215
216fn group_states<K: Hash + Eq + std::fmt::Display + std::fmt::Debug + Clone, S: Debug + Clone>(
217 desired: impl Iterator<Item = (K, S)>,
218 existing: impl Iterator<Item = (K, CombinedState<S>)>,
219) -> Result<IndexMap<K, GroupedResourceStates<S>>> {
220 let mut grouped: IndexMap<K, GroupedResourceStates<S>> = desired
221 .into_iter()
222 .map(|(key, state)| {
223 (
224 key,
225 GroupedResourceStates {
226 desired: Some(state.clone()),
227 existing: CombinedState::default(),
228 },
229 )
230 })
231 .collect();
232 for (key, state) in existing {
233 let entry = grouped.entry(key.clone());
234 if state.current.is_some()
235 && let indexmap::map::Entry::Occupied(entry) = &entry
236 && entry.get().existing.current.is_some()
237 {
238 internal_bail!("Duplicate existing state for key: {}", entry.key());
239 }
240 let entry = entry.or_default();
241 if let Some(current) = &state.current {
242 entry.existing.current = Some(current.clone());
243 }
244 if let Some(legacy_state_key) = &state.legacy_state_key {
245 if entry
246 .existing
247 .legacy_state_key
248 .as_ref()
249 .is_some_and(|v| v != legacy_state_key)
250 {
251 warn!(
252 "inconsistent legacy key: {key}, {:?}",
253 entry.existing.legacy_state_key
254 );
255 }
256 entry.existing.legacy_state_key = Some(legacy_state_key.clone());
257 }
258 for s in state.staging.iter() {
259 match s {
260 StateChange::Upsert(v) => {
261 entry.existing.staging.push(StateChange::Upsert(v.clone()))
262 }
263 StateChange::Delete => entry.existing.staging.push(StateChange::Delete),
264 }
265 }
266 }
267 Ok(grouped)
268}
269
270async fn collect_attachments_setup_change(
271 target_key: &serde_json::Value,
272 desired: Option<&TargetSetupState>,
273 existing: &CombinedState<TargetSetupState>,
274 context: &interface::FlowInstanceContext,
275) -> Result<AttachmentsSetupChange> {
276 let existing_current_attachments = existing
277 .current
278 .iter()
279 .flat_map(|s| s.attachments.iter())
280 .map(|(key, state)| (key.clone(), CombinedState::current(state.clone())));
281 let existing_staging_attachments = existing.staging.iter().flat_map(|s| {
282 match s {
283 StateChange::Upsert(s) => Some(s.attachments.iter().map(|(key, state)| {
284 (
285 key.clone(),
286 CombinedState::staging(StateChange::Upsert(state.clone())),
287 )
288 })),
289 StateChange::Delete => None,
290 }
291 .into_iter()
292 .flatten()
293 });
294 let mut grouped_attachment_states = group_states(
295 desired.iter().flat_map(|s| {
296 s.attachments
297 .iter()
298 .map(|(key, state)| (key.clone(), state.clone()))
299 }),
300 (existing_current_attachments.into_iter())
301 .chain(existing_staging_attachments)
302 .rev(),
303 )?;
304 if existing
305 .staging
306 .iter()
307 .any(|s| matches!(s, StateChange::Delete))
308 {
309 for state in grouped_attachment_states.values_mut() {
310 if state
311 .existing
312 .staging
313 .iter()
314 .all(|s| matches!(s, StateChange::Delete))
315 {
316 state.existing.staging.push(StateChange::Delete);
317 }
318 }
319 }
320
321 let mut attachments_change = AttachmentsSetupChange::default();
322 for (AttachmentSetupKey(kind, key), setup_state) in grouped_attachment_states.into_iter() {
323 let has_diff = setup_state
324 .existing
325 .has_state_diff(setup_state.desired.as_ref(), |s| s);
326 if !has_diff {
327 continue;
328 }
329 attachments_change.has_tracked_state_change = true;
330 let factory = get_attachment_factory(&kind)?;
331 let is_upsertion = setup_state.desired.is_some();
332 if let Some(action) = factory
333 .diff_setup_states(
334 target_key,
335 &key,
336 setup_state.desired,
337 setup_state.existing,
338 context,
339 )
340 .await?
341 {
342 if is_upsertion {
343 attachments_change.upserts.push(action);
344 } else {
345 attachments_change.deletes.push(action);
346 }
347 }
348 }
349 Ok(attachments_change)
350}
351
/// Computes the full setup change for one flow by comparing its desired and existing states.
///
/// The result covers the flow metadata, the tracking table and every target resource. Targets
/// whose kind has no registered factory are reported in `unknown_resources` instead of being
/// diffed. Passing `None` as `desired_state` describes dropping the flow; passing `None` as
/// `existing_state` describes creating it.
///
/// # Errors
/// Propagates errors from grouping the target states, from the attachment and target
/// factories, and from describing a resource.
pub async fn diff_flow_setup_states(
    desired_state: Option<&FlowSetupState<DesiredMode>>,
    existing_state: Option<&FlowSetupState<ExistingMode>>,
    flow_instance_ctx: &Arc<FlowInstanceContext>,
) -> Result<FlowSetupChange> {
    // Metadata is upserted wholesale whenever it differs from the existing one.
    let metadata_change = diff_state(
        existing_state.map(|e| &e.metadata),
        desired_state.map(|d| &d.metadata),
        |_, desired_state| Some(StateChange::Upsert(desired_state.clone())),
    );

    // For each existing source id, the names of sources whose kind differs from the kind the
    // desired state assigns to that id (or whose id no longer appears in the desired state).
    // Only computed when both a desired and an existing state are present.
    let source_names_needs_states_cleanup: BTreeMap<i32, BTreeSet<String>> =
        if let Some(desired_state) = desired_state
            && let Some(existing_state) = existing_state
        {
            let new_source_id_to_kind = desired_state
                .metadata
                .sources
                .values()
                .map(|v| (v.source_id, &v.source_kind))
                .collect::<HashMap<i32, &String>>();

            let mut existing_source_id_to_name_kind =
                BTreeMap::<i32, Vec<(&String, &String)>>::new();
            for (name, setup_state) in existing_state
                .metadata
                .possible_versions()
                .flat_map(|v| v.sources.iter())
            {
                // Sources with an empty kind are left out of the comparison.
                if !setup_state.source_kind.is_empty() {
                    existing_source_id_to_name_kind
                        .entry(setup_state.source_id)
                        .or_default()
                        .push((name, &setup_state.source_kind));
                }
            }

            (existing_source_id_to_name_kind.into_iter())
                .map(|(id, name_kinds)| {
                    let new_kind = new_source_id_to_kind.get(&id).copied();
                    let source_names_for_legacy_states = name_kinds
                        .into_iter()
                        .filter_map(|(name, kind)| {
                            if Some(kind) != new_kind {
                                Some(name.clone())
                            } else {
                                None
                            }
                        })
                        .collect::<BTreeSet<_>>();
                    (id, source_names_for_legacy_states)
                })
                .filter(|(_, v)| !v.is_empty())
                .collect::<BTreeMap<_, _>>()
        } else {
            BTreeMap::new()
        };

    let tracking_table_change = db_tracking_setup::TrackingTableSetupChange::new(
        desired_state.map(|d| &d.tracking_table),
        &existing_state
            .map(|e| Cow::Borrowed(&e.tracking_table))
            .unwrap_or_default(),
        source_names_needs_states_cleanup,
    );

    let mut target_resources = Vec::new();
    let mut unknown_resources = Vec::new();

    let grouped_target_resources = group_states(
        desired_state
            .iter()
            .flat_map(|d| d.targets.iter().map(|(k, v)| (k.clone(), v.clone()))),
        existing_state
            .iter()
            .flat_map(|e| e.targets.iter().map(|(k, v)| (k.clone(), v.clone()))),
    )?;
    for (resource_id, target_states_group) in grouped_target_resources.into_iter() {
        // Targets without a registered factory cannot be diffed; report them separately.
        let factory = match get_export_target_factory(&resource_id.target_kind) {
            Some(factory) => factory,
            None => {
                unknown_resources.push(resource_id.clone());
                continue;
            }
        };

        let attachments_change = collect_attachments_setup_change(
            &resource_id.key,
            target_states_group.desired.as_ref(),
            &target_states_group.existing,
            flow_instance_ctx,
        )
        .await?;

        let desired_state = target_states_group.desired.clone();
        // A tracked change exists if either the target state itself or any attachment differs.
        let has_tracked_state_change = target_states_group
            .existing
            .has_state_diff(desired_state.as_ref().map(|s| &s.state), |s| &s.state)
            || attachments_change.has_tracked_state_change;
        // Existing states reduced to what `state_unless_setup_by_user` yields; staged deletes
        // are kept as they are.
        let existing_without_setup_by_user = CombinedState {
            current: target_states_group
                .existing
                .current
                .and_then(|s| s.state_unless_setup_by_user()),
            staging: target_states_group
                .existing
                .staging
                .into_iter()
                .filter_map(|s| match s {
                    StateChange::Upsert(s) => {
                        s.state_unless_setup_by_user().map(StateChange::Upsert)
                    }
                    StateChange::Delete => Some(StateChange::Delete),
                })
                .collect(),
            legacy_state_key: target_states_group.existing.legacy_state_key.clone(),
        };
        // The desired state is only handed to the factory when it is not marked as
        // `setup_by_user`.
        let target_state_to_setup = target_states_group
            .desired
            .and_then(|state| (!state.common.setup_by_user).then_some(state.state));
        // Nothing to set up and nothing left from earlier setups: skip the factory diff.
        let never_setup_by_sys = target_state_to_setup.is_none()
            && existing_without_setup_by_user.current.is_none()
            && existing_without_setup_by_user.staging.is_empty();
        let setup_change = if never_setup_by_sys {
            None
        } else {
            Some(TargetSetupChange {
                target_change: factory
                    .diff_setup_states(
                        &resource_id.key,
                        target_state_to_setup,
                        existing_without_setup_by_user,
                        flow_instance_ctx.clone(),
                    )
                    .await?,
                attachments_change,
            })
        };

        target_resources.push(ResourceSetupInfo {
            key: resource_id.clone(),
            state: desired_state,
            has_tracked_state_change,
            description: factory.describe_resource(&resource_id.key)?,
            setup_change,
            legacy_key: target_states_group
                .existing
                .legacy_state_key
                .map(|legacy_state_key| ResourceIdentifier {
                    target_kind: resource_id.target_kind.clone(),
                    key: legacy_state_key,
                }),
        });
    }
    Ok(FlowSetupChange {
        status: to_object_status(existing_state, desired_state),
        seen_flow_metadata_version: existing_state.and_then(|s| s.seen_flow_metadata_version),
        metadata_change,
        tracking_table: tracking_table_change.map(|c| c.into_setup_info()),
        target_resources,
        unknown_resources,
    })
}
517
/// A borrowed (key, setup change) pair handed to the change applier in
/// `maybe_update_resource_setup`.
struct ResourceSetupChangeItem<'a, K: 'a, C: ResourceSetupChange> {
    /// Key of the resource the change applies to.
    key: &'a K,
    /// The change to apply to that resource.
    setup_change: &'a C,
}
522
523async fn maybe_update_resource_setup<
524 'a,
525 K: 'a,
526 S: 'a,
527 C: ResourceSetupChange,
528 ChangeApplierResultFut: Future<Output = Result<()>>,
529>(
530 resource_kind: &str,
531 write: &mut (dyn std::io::Write + Send),
532 resources: impl Iterator<Item = &'a ResourceSetupInfo<K, S, C>>,
533 apply_change: impl FnOnce(Vec<ResourceSetupChangeItem<'a, K, C>>) -> ChangeApplierResultFut,
534) -> Result<()> {
535 let mut changes = Vec::new();
536 for resource in resources {
537 if let Some(setup_change) = &resource.setup_change
538 && setup_change.change_type() != SetupChangeType::NoChange
539 {
540 changes.push(ResourceSetupChangeItem {
541 key: &resource.key,
542 setup_change,
543 });
544 writeln!(write, "{}:", resource.description)?;
545 for change in setup_change.describe_changes() {
546 match change {
547 setup::ChangeDescription::Action(action) => {
548 writeln!(write, " - {action}")?;
549 }
550 setup::ChangeDescription::Note(_) => {}
551 }
552 }
553 }
554 }
555 if !changes.is_empty() {
556 write!(write, "Pushing change for {resource_kind}...")?;
557 apply_change(changes).await?;
558 writeln!(write, "DONE")?;
559 }
560 Ok(())
561}
562
563#[instrument(name = "setup.apply_changes_for_flow", skip_all, fields(flow_name = %flow_ctx.flow_name()))]
564async fn apply_changes_for_flow(
565 write: &mut (dyn std::io::Write + Send),
566 flow_ctx: &FlowContext,
567 flow_setup_change: &FlowSetupChange,
568 existing_setup_state: &mut Option<setup::FlowSetupState<setup::ExistingMode>>,
569 pool: &PgPool,
570 ignore_target_drop_failures: bool,
571) -> Result<()> {
572 let Some(status) = flow_setup_change.status else {
573 return Ok(());
574 };
575 let verb = match status {
576 ObjectStatus::New => "Creating",
577 ObjectStatus::Deleted => "Deleting",
578 ObjectStatus::Existing => "Updating resources for ",
579 _ => internal_bail!("invalid flow status"),
580 };
581 write!(write, "\n{verb} flow {}:\n", flow_ctx.flow_name())?;
582 let is_deletion = status == ObjectStatus::Deleted;
584 let mut update_info =
585 HashMap::<db_metadata::ResourceTypeKey, db_metadata::StateUpdateInfo>::new();
586
587 if let Some(metadata_change) = &flow_setup_change.metadata_change {
588 update_info.insert(
589 db_metadata::ResourceTypeKey::new(
590 MetadataRecordType::FlowMetadata.to_string(),
591 serde_json::Value::Null,
592 ),
593 db_metadata::StateUpdateInfo::new(metadata_change.desired_state(), None)?,
594 );
595 }
596 if let Some(tracking_table) = &flow_setup_change.tracking_table
597 && tracking_table
598 .setup_change
599 .as_ref()
600 .map(|c| c.change_type() != SetupChangeType::NoChange)
601 .unwrap_or_default()
602 {
603 update_info.insert(
604 db_metadata::ResourceTypeKey::new(
605 MetadataRecordType::TrackingTable.to_string(),
606 serde_json::Value::Null,
607 ),
608 db_metadata::StateUpdateInfo::new(tracking_table.state.as_ref(), None)?,
609 );
610 }
611
612 for target_resource in &flow_setup_change.target_resources {
613 update_info.insert(
614 db_metadata::ResourceTypeKey::new(
615 MetadataRecordType::Target(target_resource.key.target_kind.clone()).to_string(),
616 target_resource.key.key.clone(),
617 ),
618 db_metadata::StateUpdateInfo::new(
619 target_resource.state.as_ref(),
620 target_resource.legacy_key.as_ref().map(|k| {
621 db_metadata::ResourceTypeKey::new(
622 MetadataRecordType::Target(k.target_kind.clone()).to_string(),
623 k.key.clone(),
624 )
625 }),
626 )?,
627 );
628 }
629
630 let new_version_id = db_metadata::stage_changes_for_flow(
631 flow_ctx.flow_name(),
632 flow_setup_change.seen_flow_metadata_version,
633 &update_info,
634 pool,
635 )
636 .await?;
637
638 if let Some(tracking_table) = &flow_setup_change.tracking_table {
639 maybe_update_resource_setup(
640 "tracking table",
641 write,
642 std::iter::once(tracking_table),
643 |setup_change| setup_change[0].setup_change.apply_change(),
644 )
645 .await?;
646 }
647
648 let mut setup_change_by_target_kind = IndexMap::<&str, Vec<_>>::new();
649 for target_resource in &flow_setup_change.target_resources {
650 setup_change_by_target_kind
651 .entry(target_resource.key.target_kind.as_str())
652 .or_default()
653 .push(target_resource);
654 }
655 for (target_kind, resources) in setup_change_by_target_kind.into_iter() {
656 maybe_update_resource_setup(
657 target_kind,
658 write,
659 resources.into_iter(),
660 |targets_change| async move {
661 let factory = get_export_target_factory(target_kind).ok_or_else(|| {
662 internal_error!("No factory found for target kind: {}", target_kind)
663 })?;
664 for target_change in targets_change.iter() {
665 for delete in target_change.setup_change.attachments_change.deletes.iter() {
666 delete.apply_change().await?;
667 }
668 }
669
670 let apply_result: Result<()> = (async {
673 factory
674 .apply_setup_changes(
675 targets_change
676 .iter()
677 .map(|s| interface::ResourceSetupChangeItem {
678 key: &s.key.key,
679 setup_change: s.setup_change.target_change.as_ref(),
680 })
681 .collect(),
682 flow_ctx.flow.flow_instance_ctx.clone(),
683 )
684 .await?;
685 for target_change in targets_change.iter() {
686 for delete in target_change.setup_change.attachments_change.upserts.iter() {
687 delete.apply_change().await?;
688 }
689 }
690 Ok(())
691 })
692 .await;
693
694 if let Err(e) = apply_result {
695 if is_deletion && ignore_target_drop_failures {
696 tracing::error!("Ignoring target drop failure for kind '{}' in flow '{}': {:#}",
697 target_kind, flow_ctx.flow_name(), e);
698 return Ok::<(), Error>(());
699 }
700 if is_deletion {
701 tracing::error!(
702 "{}\n\nHint: set COCOINDEX_IGNORE_TARGET_DROP_FAILURES=true to ignore target drop failures.",
703 e
704 );
705 }
706 return Err(e);
707 }
708
709 Ok::<(), Error>(())
710 },
711 )
712 .await?;
713 }
714
715 let is_deletion = status == ObjectStatus::Deleted;
716 db_metadata::commit_changes_for_flow(
717 flow_ctx.flow_name(),
718 new_version_id,
719 &update_info,
720 is_deletion,
721 pool,
722 )
723 .await?;
724 if is_deletion {
725 *existing_setup_state = None;
726 } else {
727 let (existing_metadata, existing_tracking_table, existing_targets) =
728 match std::mem::take(existing_setup_state) {
729 Some(s) => (Some(s.metadata), Some(s.tracking_table), s.targets),
730 None => Default::default(),
731 };
732 let metadata = CombinedState::from_change(
733 existing_metadata,
734 flow_setup_change
735 .metadata_change
736 .as_ref()
737 .map(|v| v.desired_state()),
738 );
739 let tracking_table = CombinedState::from_change(
740 existing_tracking_table,
741 flow_setup_change.tracking_table.as_ref().map(|c| {
742 c.setup_change
743 .as_ref()
744 .and_then(|c| c.desired_state.as_ref())
745 }),
746 );
747 let mut targets = existing_targets;
748 for target_resource in &flow_setup_change.target_resources {
749 match &target_resource.state {
750 Some(state) => {
751 targets.insert(
752 target_resource.key.clone(),
753 CombinedState::current(state.clone()),
754 );
755 }
756 None => {
757 targets.shift_remove(&target_resource.key);
758 }
759 }
760 }
761 *existing_setup_state = Some(setup::FlowSetupState {
762 metadata,
763 tracking_table,
764 seen_flow_metadata_version: Some(new_version_id),
765 targets,
766 });
767 }
768
769 writeln!(write, "Done for flow {}", flow_ctx.flow_name())?;
770 Ok(())
771}
772
773#[instrument(name = "setup.apply_global_changes", skip_all)]
774async fn apply_global_changes(
775 write: &mut (dyn std::io::Write + Send),
776 setup_change: &GlobalSetupChange,
777 all_setup_states: &mut AllSetupStates<ExistingMode>,
778) -> Result<()> {
779 maybe_update_resource_setup(
780 "metadata table",
781 write,
782 std::iter::once(&setup_change.metadata_table),
783 |setup_change| setup_change[0].setup_change.apply_change(),
784 )
785 .await?;
786
787 if setup_change
788 .metadata_table
789 .setup_change
790 .as_ref()
791 .is_some_and(|c| c.change_type() == SetupChangeType::Create)
792 {
793 all_setup_states.has_metadata_table = true;
794 }
795
796 Ok(())
797}
798
/// What a `SetupChangeBundle` does to its flows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowSetupChangeAction {
    /// Use the setup change already stored on the flow's execution context.
    Setup,
    /// Diff the flow's existing state against no desired state, i.e. drop it.
    Drop,
}
/// An action together with the names of the flows it is to be described for or applied to.
pub struct SetupChangeBundle {
    /// The action performed for every listed flow.
    pub action: FlowSetupChangeAction,
    /// Names of the flows, looked up in `LibContext::flows` when the bundle is used.
    pub flow_names: Vec<String>,
}
808
809impl SetupChangeBundle {
810 pub async fn describe(&self, lib_context: &LibContext) -> Result<(String, bool)> {
811 let mut text = String::new();
812 let mut is_up_to_date = true;
813
814 let setup_ctx = lib_context
815 .require_persistence_ctx()?
816 .setup_ctx
817 .read()
818 .await;
819 let setup_ctx = &*setup_ctx;
820
821 if self.action == FlowSetupChangeAction::Setup {
822 is_up_to_date = is_up_to_date && setup_ctx.global_setup_change.is_up_to_date();
823 write!(&mut text, "{}", setup_ctx.global_setup_change)?;
824 }
825
826 for flow_name in &self.flow_names {
827 let flow_ctx = {
828 let flows = lib_context.flows.lock().unwrap();
829 flows
830 .get(flow_name)
831 .ok_or_else(|| client_error!("Flow instance not found: {flow_name}"))?
832 .clone()
833 };
834 let flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().read().await;
835
836 let mut setup_change_buffer = None;
837 let setup_change = get_flow_setup_change(
838 setup_ctx,
839 &flow_ctx,
840 &flow_exec_ctx,
841 &self.action,
842 &mut setup_change_buffer,
843 )
844 .await?;
845
846 is_up_to_date = is_up_to_date && setup_change.is_up_to_date();
847 write!(
848 &mut text,
849 "{}",
850 setup::FormattedFlowSetupChange(flow_name, setup_change)
851 )?;
852 }
853 Ok((text, is_up_to_date))
854 }
855
856 pub async fn apply(
857 &self,
858 lib_context: &LibContext,
859 write: &mut (dyn std::io::Write + Send),
860 ) -> Result<()> {
861 let persistence_ctx = lib_context.require_persistence_ctx()?;
862 let mut setup_ctx = persistence_ctx.setup_ctx.write().await;
863 let setup_ctx = &mut *setup_ctx;
864
865 if self.action == FlowSetupChangeAction::Setup
866 && !setup_ctx.global_setup_change.is_up_to_date()
867 {
868 apply_global_changes(
869 write,
870 &setup_ctx.global_setup_change,
871 &mut setup_ctx.all_setup_states,
872 )
873 .await?;
874 setup_ctx.global_setup_change =
875 GlobalSetupChange::from_setup_states(&setup_ctx.all_setup_states);
876 }
877
878 for flow_name in &self.flow_names {
879 let flow_ctx = {
880 let flows = lib_context.flows.lock().unwrap();
881 flows
882 .get(flow_name)
883 .ok_or_else(|| client_error!("Flow instance not found: {flow_name}"))?
884 .clone()
885 };
886 let mut flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().write().await;
887 apply_changes_for_flow_ctx(
888 self.action,
889 &flow_ctx,
890 &mut flow_exec_ctx,
891 setup_ctx,
892 &persistence_ctx.builtin_db_pool,
893 write,
894 )
895 .await?;
896 }
897 Ok(())
898 }
899}
900
901async fn get_flow_setup_change<'a>(
902 setup_ctx: &LibSetupContext,
903 flow_ctx: &'a FlowContext,
904 flow_exec_ctx: &'a FlowExecutionContext,
905 action: &FlowSetupChangeAction,
906 buffer: &'a mut Option<FlowSetupChange>,
907) -> Result<&'a FlowSetupChange> {
908 let result = match action {
909 FlowSetupChangeAction::Setup => &flow_exec_ctx.setup_change,
910 FlowSetupChangeAction::Drop => {
911 let existing_state = setup_ctx.all_setup_states.flows.get(flow_ctx.flow_name());
912 buffer.insert(
913 diff_flow_setup_states(None, existing_state, &flow_ctx.flow.flow_instance_ctx)
914 .await?,
915 )
916 }
917 };
918 Ok(result)
919}
920
921#[instrument(name = "setup.apply_changes_for_flow_ctx", skip_all, fields(flow_name = %flow_ctx.flow_name()))]
922pub(crate) async fn apply_changes_for_flow_ctx(
923 action: FlowSetupChangeAction,
924 flow_ctx: &FlowContext,
925 flow_exec_ctx: &mut FlowExecutionContext,
926 setup_ctx: &mut LibSetupContext,
927 db_pool: &PgPool,
928 write: &mut (dyn std::io::Write + Send),
929) -> Result<()> {
930 let mut setup_change_buffer = None;
931 let setup_change = get_flow_setup_change(
932 setup_ctx,
933 flow_ctx,
934 flow_exec_ctx,
935 &action,
936 &mut setup_change_buffer,
937 )
938 .await?;
939 if setup_change.is_up_to_date() {
940 return Ok(());
941 }
942
943 let mut flow_states = setup_ctx
944 .all_setup_states
945 .flows
946 .remove(flow_ctx.flow_name());
947 let lib_ctx = crate::lib_context::get_lib_context().await?;
949 let ignore_target_drop_failures = lib_ctx.ignore_target_drop_failures;
950
951 apply_changes_for_flow(
952 write,
953 flow_ctx,
954 setup_change,
955 &mut flow_states,
956 db_pool,
957 ignore_target_drop_failures,
958 )
959 .await?;
960
961 flow_exec_ctx
962 .update_setup_state(&flow_ctx.flow, flow_states.as_ref())
963 .await?;
964 if let Some(flow_states) = flow_states {
965 setup_ctx
966 .all_setup_states
967 .flows
968 .insert(flow_ctx.flow_name().to_string(), flow_states);
969 }
970 Ok(())
971}