Skip to main content

recoco_core/setup/
driver.rs

1// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://CocoIndex)
2// Original code from CocoIndex is copyrighted by CocoIndex
3// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
4// SPDX-FileContributor: CocoIndex Contributors
5//
6// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
7// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
8// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
9//
10// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
11// SPDX-License-Identifier: Apache-2.0
12
13use crate::{
14    lib_context::{FlowContext, FlowExecutionContext, LibSetupContext},
15    ops::{
16        get_attachment_factory, get_optional_target_factory,
17        interface::{AttachmentSetupKey, FlowInstanceContext, TargetFactory},
18    },
19    prelude::*,
20    setup::{AttachmentsSetupChange, TargetSetupChange},
21};
22
23use sqlx::PgPool;
24use std::{
25    fmt::{Debug, Display},
26    str::FromStr,
27};
28
29use super::{AllSetupStates, GlobalSetupChange};
30use super::{
31    CombinedState, DesiredMode, ExistingMode, FlowSetupChange, FlowSetupState, ObjectSetupChange,
32    ObjectStatus, ResourceIdentifier, ResourceSetupChange, ResourceSetupInfo, SetupChangeType,
33    StateChange, TargetSetupState, db_metadata,
34};
35use crate::execution::db_tracking_setup;
36use std::fmt::Write;
37
/// Kind of a setup metadata record, as encoded in the record's resource-type string.
///
/// The string form is produced by the `Display` impl and parsed back by the `FromStr` impl.
enum MetadataRecordType {
    /// Record whose resource type equals `db_metadata::FLOW_VERSION_RESOURCE_TYPE`.
    FlowVersion,
    /// Record holding the flow metadata state (string form `FlowMetadata`).
    FlowMetadata,
    /// Record holding the tracking table state (string form `TrackingTable`).
    TrackingTable,
    /// Record for a target; the payload is the text following the `Target:` prefix.
    Target(String),
}
44
45impl Display for MetadataRecordType {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        match self {
48            MetadataRecordType::FlowVersion => f.write_str(db_metadata::FLOW_VERSION_RESOURCE_TYPE),
49            MetadataRecordType::FlowMetadata => write!(f, "FlowMetadata"),
50            MetadataRecordType::TrackingTable => write!(f, "TrackingTable"),
51            MetadataRecordType::Target(target_id) => write!(f, "Target:{target_id}"),
52        }
53    }
54}
55
56impl std::str::FromStr for MetadataRecordType {
57    type Err = Error;
58
59    fn from_str(s: &str) -> Result<Self> {
60        if s == db_metadata::FLOW_VERSION_RESOURCE_TYPE {
61            Ok(Self::FlowVersion)
62        } else if s == "FlowMetadata" {
63            Ok(Self::FlowMetadata)
64        } else if s == "TrackingTable" {
65            Ok(Self::TrackingTable)
66        } else if let Some(target_id) = s.strip_prefix("Target:") {
67            Ok(Self::Target(target_id.to_string()))
68        } else {
69            internal_bail!("Invalid MetadataRecordType string: {}", s)
70        }
71    }
72}
73
74fn from_metadata_record<S: DeserializeOwned + Debug + Clone>(
75    state: Option<serde_json::Value>,
76    staging_changes: sqlx::types::Json<Vec<StateChange<serde_json::Value>>>,
77    legacy_state_key: Option<serde_json::Value>,
78) -> Result<CombinedState<S>> {
79    let current: Option<S> = state.map(utils::deser::from_json_value).transpose()?;
80    let staging: Vec<StateChange<S>> = (staging_changes.0.into_iter())
81        .map(|sc| -> Result<_> {
82            Ok(match sc {
83                StateChange::Upsert(v) => StateChange::Upsert(utils::deser::from_json_value(v)?),
84                StateChange::Delete => StateChange::Delete,
85            })
86        })
87        .collect::<Result<_>>()?;
88    Ok(CombinedState {
89        current,
90        staging,
91        legacy_state_key,
92    })
93}
94
/// Looks up the target factory for `target_type`.
///
/// Thin wrapper around `get_optional_target_factory`; the result is `None` whenever that
/// lookup returns `None`.
fn get_export_target_factory(target_type: &str) -> Option<Arc<dyn TargetFactory + Send + Sync>> {
    get_optional_target_factory(target_type)
}
98
99pub async fn get_existing_setup_state(pool: &PgPool) -> Result<AllSetupStates<ExistingMode>> {
100    let setup_metadata_records = db_metadata::read_setup_metadata(pool).await?;
101
102    let setup_metadata_records = if let Some(records) = setup_metadata_records {
103        records
104    } else {
105        return Ok(AllSetupStates::default());
106    };
107
108    // Group setup metadata records by flow name
109    let setup_metadata_records = setup_metadata_records.into_iter().fold(
110        BTreeMap::<String, Vec<_>>::new(),
111        |mut acc, record| {
112            acc.entry(record.flow_name.clone())
113                .or_default()
114                .push(record);
115            acc
116        },
117    );
118
119    let flows = setup_metadata_records
120        .into_iter()
121        .map(|(flow_name, metadata_records)| -> Result<_> {
122            let mut flow_ss = FlowSetupState::default();
123            for metadata_record in metadata_records {
124                let state = metadata_record.state;
125                let staging_changes = metadata_record.staging_changes;
126                match MetadataRecordType::from_str(&metadata_record.resource_type)? {
127                    MetadataRecordType::FlowVersion => {
128                        flow_ss.seen_flow_metadata_version =
129                            db_metadata::parse_flow_version(&state);
130                    }
131                    MetadataRecordType::FlowMetadata => {
132                        flow_ss.metadata = from_metadata_record(state, staging_changes, None)?;
133                    }
134                    MetadataRecordType::TrackingTable => {
135                        flow_ss.tracking_table =
136                            from_metadata_record(state, staging_changes, None)?;
137                    }
138                    MetadataRecordType::Target(target_type) => {
139                        let normalized_key = {
140                            if let Some(factory) = get_export_target_factory(&target_type) {
141                                factory.normalize_setup_key(&metadata_record.key)?
142                            } else {
143                                metadata_record.key.clone()
144                            }
145                        };
146                        let combined_state = from_metadata_record(
147                            state,
148                            staging_changes,
149                            (normalized_key != metadata_record.key).then_some(metadata_record.key),
150                        )?;
151                        flow_ss.targets.insert(
152                            super::ResourceIdentifier {
153                                key: normalized_key,
154                                target_kind: target_type,
155                            },
156                            combined_state,
157                        );
158                    }
159                }
160            }
161            Ok((flow_name, flow_ss))
162        })
163        .collect::<Result<_>>()?;
164
165    Ok(AllSetupStates {
166        has_metadata_table: true,
167        flows,
168    })
169}
170
171fn diff_state<E, D, T>(
172    existing_state: Option<&E>,
173    desired_state: Option<&D>,
174    diff: impl Fn(Option<&E>, &D) -> Option<StateChange<T>>,
175) -> Option<StateChange<T>>
176where
177    E: PartialEq<D>,
178{
179    match (existing_state, desired_state) {
180        (None, None) => None,
181        (Some(_), None) => Some(StateChange::Delete),
182        (existing_state, Some(desired_state)) => {
183            if existing_state.map(|e| e == desired_state).unwrap_or(false) {
184                None
185            } else {
186                diff(existing_state, desired_state)
187            }
188        }
189    }
190}
191
192fn to_object_status<A, B>(existing: Option<A>, desired: Option<B>) -> Option<ObjectStatus> {
193    Some(match (&existing, &desired) {
194        (Some(_), None) => ObjectStatus::Deleted,
195        (None, Some(_)) => ObjectStatus::New,
196        (Some(_), Some(_)) => ObjectStatus::Existing,
197        (None, None) => return None,
198    })
199}
200
/// Desired and existing states of a single resource, collected under one key.
#[derive(Debug)]
struct GroupedResourceStates<S: Debug + Clone> {
    /// State requested for the resource, if any.
    desired: Option<S>,
    /// State previously recorded for the resource.
    existing: CombinedState<S>,
}
206
207impl<S: Debug + Clone> Default for GroupedResourceStates<S> {
208    fn default() -> Self {
209        Self {
210            desired: None,
211            existing: CombinedState::default(),
212        }
213    }
214}
215
216fn group_states<K: Hash + Eq + std::fmt::Display + std::fmt::Debug + Clone, S: Debug + Clone>(
217    desired: impl Iterator<Item = (K, S)>,
218    existing: impl Iterator<Item = (K, CombinedState<S>)>,
219) -> Result<IndexMap<K, GroupedResourceStates<S>>> {
220    let mut grouped: IndexMap<K, GroupedResourceStates<S>> = desired
221        .into_iter()
222        .map(|(key, state)| {
223            (
224                key,
225                GroupedResourceStates {
226                    desired: Some(state.clone()),
227                    existing: CombinedState::default(),
228                },
229            )
230        })
231        .collect();
232    for (key, state) in existing {
233        let entry = grouped.entry(key.clone());
234        if state.current.is_some()
235            && let indexmap::map::Entry::Occupied(entry) = &entry
236            && entry.get().existing.current.is_some()
237        {
238            internal_bail!("Duplicate existing state for key: {}", entry.key());
239        }
240        let entry = entry.or_default();
241        if let Some(current) = &state.current {
242            entry.existing.current = Some(current.clone());
243        }
244        if let Some(legacy_state_key) = &state.legacy_state_key {
245            if entry
246                .existing
247                .legacy_state_key
248                .as_ref()
249                .is_some_and(|v| v != legacy_state_key)
250            {
251                warn!(
252                    "inconsistent legacy key: {key}, {:?}",
253                    entry.existing.legacy_state_key
254                );
255            }
256            entry.existing.legacy_state_key = Some(legacy_state_key.clone());
257        }
258        for s in state.staging.iter() {
259            match s {
260                StateChange::Upsert(v) => {
261                    entry.existing.staging.push(StateChange::Upsert(v.clone()))
262                }
263                StateChange::Delete => entry.existing.staging.push(StateChange::Delete),
264            }
265        }
266    }
267    Ok(grouped)
268}
269
/// Computes the attachment-level setup changes for a single target.
///
/// Attachment states are gathered from the desired target state and from the existing
/// target state (its current value and its staged upserts), grouped per attachment key,
/// and every group for which `has_state_diff` reports a difference is passed to the
/// attachment factory's `diff_setup_states`.
///
/// An action returned by the factory goes into `upserts` when the attachment has a desired
/// state and into `deletes` otherwise. `has_tracked_state_change` is set as soon as any
/// group differs, even if the factory returns no action for it.
///
/// # Errors
/// Propagates failures from `group_states`, from `get_attachment_factory`, and from the
/// factory's `diff_setup_states`.
async fn collect_attachments_setup_change(
    target_key: &serde_json::Value,
    desired: Option<&TargetSetupState>,
    existing: &CombinedState<TargetSetupState>,
    context: &interface::FlowInstanceContext,
) -> Result<AttachmentsSetupChange> {
    // Attachments of the existing current target state, each wrapped as a current state.
    let existing_current_attachments = existing
        .current
        .iter()
        .flat_map(|s| s.attachments.iter())
        .map(|(key, state)| (key.clone(), CombinedState::current(state.clone())));
    // Attachments of every staged target upsert, each wrapped as a staged upsert.
    // Staged target deletes contribute nothing here; they are handled further down.
    let existing_staging_attachments = existing.staging.iter().flat_map(|s| {
        match s {
            StateChange::Upsert(s) => Some(s.attachments.iter().map(|(key, state)| {
                (
                    key.clone(),
                    CombinedState::staging(StateChange::Upsert(state.clone())),
                )
            })),
            StateChange::Delete => None,
        }
        .into_iter()
        .flatten()
    });
    // The existing entries are handed to `group_states` in reverse order of the chain,
    // i.e. staged attachments first and current ones last.
    let mut grouped_attachment_states = group_states(
        desired.iter().flat_map(|s| {
            s.attachments
                .iter()
                .map(|(key, state)| (key.clone(), state.clone()))
        }),
        (existing_current_attachments.into_iter())
            .chain(existing_staging_attachments)
            .rev(),
    )?;
    // When the target itself has a staged delete, add a staged delete to every attachment
    // group whose staging list holds nothing but deletes (an empty list included).
    if existing
        .staging
        .iter()
        .any(|s| matches!(s, StateChange::Delete))
    {
        for state in grouped_attachment_states.values_mut() {
            if state
                .existing
                .staging
                .iter()
                .all(|s| matches!(s, StateChange::Delete))
            {
                state.existing.staging.push(StateChange::Delete);
            }
        }
    }

    let mut attachments_change = AttachmentsSetupChange::default();
    for (AttachmentSetupKey(kind, key), setup_state) in grouped_attachment_states.into_iter() {
        let has_diff = setup_state
            .existing
            .has_state_diff(setup_state.desired.as_ref(), |s| s);
        if !has_diff {
            continue;
        }
        attachments_change.has_tracked_state_change = true;
        let factory = get_attachment_factory(&kind)?;
        // Captured before `desired` is moved into the factory call.
        let is_upsertion = setup_state.desired.is_some();
        if let Some(action) = factory
            .diff_setup_states(
                target_key,
                &key,
                setup_state.desired,
                setup_state.existing,
                context,
            )
            .await?
        {
            if is_upsertion {
                attachments_change.upserts.push(action);
            } else {
                attachments_change.deletes.push(action);
            }
        }
    }
    Ok(attachments_change)
}
351
/// Computes the setup change that moves a flow from `existing_state` to `desired_state`.
///
/// Either side may be absent. The result bundles:
///
/// * the object status derived from which sides are present,
/// * the flow metadata change,
/// * the tracking table change, including the source names whose states need cleanup,
/// * one `ResourceSetupInfo` per target resource whose kind has a target factory,
/// * the identifiers of target resources for which no factory was found.
///
/// # Errors
/// Propagates failures from `group_states`, from `collect_attachments_setup_change`, and
/// from the target factory's `diff_setup_states` and `describe_resource`.
pub async fn diff_flow_setup_states(
    desired_state: Option<&FlowSetupState<DesiredMode>>,
    existing_state: Option<&FlowSetupState<ExistingMode>>,
    flow_instance_ctx: &Arc<FlowInstanceContext>,
) -> Result<FlowSetupChange> {
    // Metadata: upsert the desired metadata whenever it is not equal to the existing one;
    // `diff_state` yields a delete when only the existing side is present.
    let metadata_change = diff_state(
        existing_state.map(|e| &e.metadata),
        desired_state.map(|d| &d.metadata),
        |_, desired_state| Some(StateChange::Upsert(desired_state.clone())),
    );

    // If the source kind has changed, we need to clean the source states.
    let source_names_needs_states_cleanup: BTreeMap<i32, BTreeSet<String>> =
        if let Some(desired_state) = desired_state
            && let Some(existing_state) = existing_state
        {
            // Desired side: source id -> source kind.
            let new_source_id_to_kind = desired_state
                .metadata
                .sources
                .values()
                .map(|v| (v.source_id, &v.source_kind))
                .collect::<HashMap<i32, &String>>();

            // Existing side: source id -> every (name, kind) pair seen across the
            // possible versions of the existing metadata.
            let mut existing_source_id_to_name_kind =
                BTreeMap::<i32, Vec<(&String, &String)>>::new();
            for (name, setup_state) in existing_state
                .metadata
                .possible_versions()
                .flat_map(|v| v.sources.iter())
            {
                // For backward compatibility, we only process source states for non-empty source kinds.
                if !setup_state.source_kind.is_empty() {
                    existing_source_id_to_name_kind
                        .entry(setup_state.source_id)
                        .or_default()
                        .push((name, &setup_state.source_kind));
                }
            }

            // Keep, per source id, the names whose existing kind differs from the desired
            // kind (or whose id has no desired source at all); drop ids with no such name.
            (existing_source_id_to_name_kind.into_iter())
                .map(|(id, name_kinds)| {
                    let new_kind = new_source_id_to_kind.get(&id).copied();
                    let source_names_for_legacy_states = name_kinds
                        .into_iter()
                        .filter_map(|(name, kind)| {
                            if Some(kind) != new_kind {
                                Some(name.clone())
                            } else {
                                None
                            }
                        })
                        .collect::<BTreeSet<_>>();
                    (id, source_names_for_legacy_states)
                })
                .filter(|(_, v)| !v.is_empty())
                .collect::<BTreeMap<_, _>>()
        } else {
            BTreeMap::new()
        };

    // A missing existing state is treated as the default tracking table state.
    let tracking_table_change = db_tracking_setup::TrackingTableSetupChange::new(
        desired_state.map(|d| &d.tracking_table),
        &existing_state
            .map(|e| Cow::Borrowed(&e.tracking_table))
            .unwrap_or_default(),
        source_names_needs_states_cleanup,
    );

    let mut target_resources = Vec::new();
    let mut unknown_resources = Vec::new();

    let grouped_target_resources = group_states(
        desired_state
            .iter()
            .flat_map(|d| d.targets.iter().map(|(k, v)| (k.clone(), v.clone()))),
        existing_state
            .iter()
            .flat_map(|e| e.targets.iter().map(|(k, v)| (k.clone(), v.clone()))),
    )?;
    for (resource_id, target_states_group) in grouped_target_resources.into_iter() {
        // Target kinds without a factory are only reported, not diffed.
        let factory = match get_export_target_factory(&resource_id.target_kind) {
            Some(factory) => factory,
            None => {
                unknown_resources.push(resource_id.clone());
                continue;
            }
        };

        let attachments_change = collect_attachments_setup_change(
            &resource_id.key,
            target_states_group.desired.as_ref(),
            &target_states_group.existing,
            flow_instance_ctx,
        )
        .await?;

        // Cloned up front: `target_states_group.desired` is consumed further down.
        let desired_state = target_states_group.desired.clone();
        let has_tracked_state_change = target_states_group
            .existing
            .has_state_diff(desired_state.as_ref().map(|s| &s.state), |s| &s.state)
            || attachments_change.has_tracked_state_change;
        // Existing state reduced via `state_unless_setup_by_user`; entries for which it
        // returns `None` are dropped (presumably those set up by the user - confirm
        // against `TargetSetupState`). Staged deletes are kept as they are.
        let existing_without_setup_by_user = CombinedState {
            current: target_states_group
                .existing
                .current
                .and_then(|s| s.state_unless_setup_by_user()),
            staging: target_states_group
                .existing
                .staging
                .into_iter()
                .filter_map(|s| match s {
                    StateChange::Upsert(s) => {
                        s.state_unless_setup_by_user().map(StateChange::Upsert)
                    }
                    StateChange::Delete => Some(StateChange::Delete),
                })
                .collect(),
            legacy_state_key: target_states_group.existing.legacy_state_key.clone(),
        };
        // The desired state is only set up here when it is not flagged `setup_by_user`.
        let target_state_to_setup = target_states_group
            .desired
            .and_then(|state| (!state.common.setup_by_user).then_some(state.state));
        // Nothing to set up and nothing left on the existing side: no setup change at all.
        let never_setup_by_sys = target_state_to_setup.is_none()
            && existing_without_setup_by_user.current.is_none()
            && existing_without_setup_by_user.staging.is_empty();
        let setup_change = if never_setup_by_sys {
            None
        } else {
            Some(TargetSetupChange {
                target_change: factory
                    .diff_setup_states(
                        &resource_id.key,
                        target_state_to_setup,
                        existing_without_setup_by_user,
                        flow_instance_ctx.clone(),
                    )
                    .await?,
                attachments_change,
            })
        };

        target_resources.push(ResourceSetupInfo {
            key: resource_id.clone(),
            state: desired_state,
            has_tracked_state_change,
            description: factory.describe_resource(&resource_id.key)?,
            setup_change,
            // The legacy key, when present, is reported under the same target kind.
            legacy_key: target_states_group
                .existing
                .legacy_state_key
                .map(|legacy_state_key| ResourceIdentifier {
                    target_kind: resource_id.target_kind.clone(),
                    key: legacy_state_key,
                }),
        });
    }
    Ok(FlowSetupChange {
        status: to_object_status(existing_state, desired_state),
        seen_flow_metadata_version: existing_state.and_then(|s| s.seen_flow_metadata_version),
        metadata_change,
        tracking_table: tracking_table_change.map(|c| c.into_setup_info()),
        target_resources,
        unknown_resources,
    })
}
517
/// A borrowed resource key paired with the borrowed setup change to apply to it.
struct ResourceSetupChangeItem<'a, K: 'a, C: ResourceSetupChange> {
    /// Key of the resource the change belongs to.
    key: &'a K,
    /// The change to apply.
    setup_change: &'a C,
}
522
523async fn maybe_update_resource_setup<
524    'a,
525    K: 'a,
526    S: 'a,
527    C: ResourceSetupChange,
528    ChangeApplierResultFut: Future<Output = Result<()>>,
529>(
530    resource_kind: &str,
531    write: &mut (dyn std::io::Write + Send),
532    resources: impl Iterator<Item = &'a ResourceSetupInfo<K, S, C>>,
533    apply_change: impl FnOnce(Vec<ResourceSetupChangeItem<'a, K, C>>) -> ChangeApplierResultFut,
534) -> Result<()> {
535    let mut changes = Vec::new();
536    for resource in resources {
537        if let Some(setup_change) = &resource.setup_change
538            && setup_change.change_type() != SetupChangeType::NoChange
539        {
540            changes.push(ResourceSetupChangeItem {
541                key: &resource.key,
542                setup_change,
543            });
544            writeln!(write, "{}:", resource.description)?;
545            for change in setup_change.describe_changes() {
546                match change {
547                    setup::ChangeDescription::Action(action) => {
548                        writeln!(write, "  - {action}")?;
549                    }
550                    setup::ChangeDescription::Note(_) => {}
551                }
552            }
553        }
554    }
555    if !changes.is_empty() {
556        write!(write, "Pushing change for {resource_kind}...")?;
557        apply_change(changes).await?;
558        writeln!(write, "DONE")?;
559    }
560    Ok(())
561}
562
563#[instrument(name = "setup.apply_changes_for_flow", skip_all, fields(flow_name = %flow_ctx.flow_name()))]
564async fn apply_changes_for_flow(
565    write: &mut (dyn std::io::Write + Send),
566    flow_ctx: &FlowContext,
567    flow_setup_change: &FlowSetupChange,
568    existing_setup_state: &mut Option<setup::FlowSetupState<setup::ExistingMode>>,
569    pool: &PgPool,
570    ignore_target_drop_failures: bool,
571) -> Result<()> {
572    let Some(status) = flow_setup_change.status else {
573        return Ok(());
574    };
575    let verb = match status {
576        ObjectStatus::New => "Creating",
577        ObjectStatus::Deleted => "Deleting",
578        ObjectStatus::Existing => "Updating resources for ",
579        _ => internal_bail!("invalid flow status"),
580    };
581    write!(write, "\n{verb} flow {}:\n", flow_ctx.flow_name())?;
582    // Precompute whether this operation is a deletion so closures can reference it.
583    let is_deletion = status == ObjectStatus::Deleted;
584    let mut update_info =
585        HashMap::<db_metadata::ResourceTypeKey, db_metadata::StateUpdateInfo>::new();
586
587    if let Some(metadata_change) = &flow_setup_change.metadata_change {
588        update_info.insert(
589            db_metadata::ResourceTypeKey::new(
590                MetadataRecordType::FlowMetadata.to_string(),
591                serde_json::Value::Null,
592            ),
593            db_metadata::StateUpdateInfo::new(metadata_change.desired_state(), None)?,
594        );
595    }
596    if let Some(tracking_table) = &flow_setup_change.tracking_table
597        && tracking_table
598            .setup_change
599            .as_ref()
600            .map(|c| c.change_type() != SetupChangeType::NoChange)
601            .unwrap_or_default()
602    {
603        update_info.insert(
604            db_metadata::ResourceTypeKey::new(
605                MetadataRecordType::TrackingTable.to_string(),
606                serde_json::Value::Null,
607            ),
608            db_metadata::StateUpdateInfo::new(tracking_table.state.as_ref(), None)?,
609        );
610    }
611
612    for target_resource in &flow_setup_change.target_resources {
613        update_info.insert(
614            db_metadata::ResourceTypeKey::new(
615                MetadataRecordType::Target(target_resource.key.target_kind.clone()).to_string(),
616                target_resource.key.key.clone(),
617            ),
618            db_metadata::StateUpdateInfo::new(
619                target_resource.state.as_ref(),
620                target_resource.legacy_key.as_ref().map(|k| {
621                    db_metadata::ResourceTypeKey::new(
622                        MetadataRecordType::Target(k.target_kind.clone()).to_string(),
623                        k.key.clone(),
624                    )
625                }),
626            )?,
627        );
628    }
629
630    let new_version_id = db_metadata::stage_changes_for_flow(
631        flow_ctx.flow_name(),
632        flow_setup_change.seen_flow_metadata_version,
633        &update_info,
634        pool,
635    )
636    .await?;
637
638    if let Some(tracking_table) = &flow_setup_change.tracking_table {
639        maybe_update_resource_setup(
640            "tracking table",
641            write,
642            std::iter::once(tracking_table),
643            |setup_change| setup_change[0].setup_change.apply_change(),
644        )
645        .await?;
646    }
647
648    let mut setup_change_by_target_kind = IndexMap::<&str, Vec<_>>::new();
649    for target_resource in &flow_setup_change.target_resources {
650        setup_change_by_target_kind
651            .entry(target_resource.key.target_kind.as_str())
652            .or_default()
653            .push(target_resource);
654    }
655    for (target_kind, resources) in setup_change_by_target_kind.into_iter() {
656        maybe_update_resource_setup(
657            target_kind,
658            write,
659            resources.into_iter(),
660            |targets_change| async move {
661                let factory = get_export_target_factory(target_kind).ok_or_else(|| {
662                    internal_error!("No factory found for target kind: {}", target_kind)
663                })?;
664                for target_change in targets_change.iter() {
665                    for delete in target_change.setup_change.attachments_change.deletes.iter() {
666                        delete.apply_change().await?;
667                        }
668                    }
669
670                    // Attempt to apply setup changes and handle failures according to the
671                    // `ignore_target_drop_failures` flag when we're deleting a flow.
672                    let apply_result: Result<()> = (async {
673                       factory
674                    .apply_setup_changes(
675                        targets_change
676                            .iter()
677                            .map(|s| interface::ResourceSetupChangeItem {
678                                key: &s.key.key,
679                                setup_change: s.setup_change.target_change.as_ref(),
680                            })
681                            .collect(),
682                        flow_ctx.flow.flow_instance_ctx.clone(),
683                    )
684                    .await?;
685                for target_change in targets_change.iter() {
686                    for delete in target_change.setup_change.attachments_change.upserts.iter() {
687                        delete.apply_change().await?;
688                            }
689                        }
690                        Ok(())
691                    })
692                    .await;
693
694                    if let Err(e) = apply_result {
695                        if is_deletion && ignore_target_drop_failures {
696                            tracing::error!("Ignoring target drop failure for kind '{}' in flow '{}': {:#}",
697                                target_kind, flow_ctx.flow_name(), e);
698                            return Ok::<(), Error>(());
699                        }
700                        if is_deletion {
701                            tracing::error!(
702                                "{}\n\nHint: set COCOINDEX_IGNORE_TARGET_DROP_FAILURES=true to ignore target drop failures.",
703                                e
704                            );
705                        }
706                        return Err(e);
707                    }
708
709                   Ok::<(), Error>(())
710                },
711        )
712        .await?;
713    }
714
715    let is_deletion = status == ObjectStatus::Deleted;
716    db_metadata::commit_changes_for_flow(
717        flow_ctx.flow_name(),
718        new_version_id,
719        &update_info,
720        is_deletion,
721        pool,
722    )
723    .await?;
724    if is_deletion {
725        *existing_setup_state = None;
726    } else {
727        let (existing_metadata, existing_tracking_table, existing_targets) =
728            match std::mem::take(existing_setup_state) {
729                Some(s) => (Some(s.metadata), Some(s.tracking_table), s.targets),
730                None => Default::default(),
731            };
732        let metadata = CombinedState::from_change(
733            existing_metadata,
734            flow_setup_change
735                .metadata_change
736                .as_ref()
737                .map(|v| v.desired_state()),
738        );
739        let tracking_table = CombinedState::from_change(
740            existing_tracking_table,
741            flow_setup_change.tracking_table.as_ref().map(|c| {
742                c.setup_change
743                    .as_ref()
744                    .and_then(|c| c.desired_state.as_ref())
745            }),
746        );
747        let mut targets = existing_targets;
748        for target_resource in &flow_setup_change.target_resources {
749            match &target_resource.state {
750                Some(state) => {
751                    targets.insert(
752                        target_resource.key.clone(),
753                        CombinedState::current(state.clone()),
754                    );
755                }
756                None => {
757                    targets.shift_remove(&target_resource.key);
758                }
759            }
760        }
761        *existing_setup_state = Some(setup::FlowSetupState {
762            metadata,
763            tracking_table,
764            seen_flow_metadata_version: Some(new_version_id),
765            targets,
766        });
767    }
768
769    writeln!(write, "Done for flow {}", flow_ctx.flow_name())?;
770    Ok(())
771}
772
773#[instrument(name = "setup.apply_global_changes", skip_all)]
774async fn apply_global_changes(
775    write: &mut (dyn std::io::Write + Send),
776    setup_change: &GlobalSetupChange,
777    all_setup_states: &mut AllSetupStates<ExistingMode>,
778) -> Result<()> {
779    maybe_update_resource_setup(
780        "metadata table",
781        write,
782        std::iter::once(&setup_change.metadata_table),
783        |setup_change| setup_change[0].setup_change.apply_change(),
784    )
785    .await?;
786
787    if setup_change
788        .metadata_table
789        .setup_change
790        .as_ref()
791        .is_some_and(|c| c.change_type() == SetupChangeType::Create)
792    {
793        all_setup_states.has_metadata_table = true;
794    }
795
796    Ok(())
797}
798
/// Operation a `SetupChangeBundle` performs on its flows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowSetupChangeAction {
    /// Set the flows up. For this action `SetupChangeBundle::describe` also includes the
    /// global setup change.
    Setup,
    /// Drop the flows - presumably tears down their resources; confirm against the code
    /// that consumes this action.
    Drop,
}
/// A setup action together with the names of the flows it applies to.
pub struct SetupChangeBundle {
    /// Action to describe or apply.
    pub action: FlowSetupChangeAction,
    /// Names of the flows; `describe` looks each one up in the library context's `flows`.
    pub flow_names: Vec<String>,
}
808
809impl SetupChangeBundle {
810    pub async fn describe(&self, lib_context: &LibContext) -> Result<(String, bool)> {
811        let mut text = String::new();
812        let mut is_up_to_date = true;
813
814        let setup_ctx = lib_context
815            .require_persistence_ctx()?
816            .setup_ctx
817            .read()
818            .await;
819        let setup_ctx = &*setup_ctx;
820
821        if self.action == FlowSetupChangeAction::Setup {
822            is_up_to_date = is_up_to_date && setup_ctx.global_setup_change.is_up_to_date();
823            write!(&mut text, "{}", setup_ctx.global_setup_change)?;
824        }
825
826        for flow_name in &self.flow_names {
827            let flow_ctx = {
828                let flows = lib_context.flows.lock().unwrap();
829                flows
830                    .get(flow_name)
831                    .ok_or_else(|| client_error!("Flow instance not found: {flow_name}"))?
832                    .clone()
833            };
834            let flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().read().await;
835
836            let mut setup_change_buffer = None;
837            let setup_change = get_flow_setup_change(
838                setup_ctx,
839                &flow_ctx,
840                &flow_exec_ctx,
841                &self.action,
842                &mut setup_change_buffer,
843            )
844            .await?;
845
846            is_up_to_date = is_up_to_date && setup_change.is_up_to_date();
847            write!(
848                &mut text,
849                "{}",
850                setup::FormattedFlowSetupChange(flow_name, setup_change)
851            )?;
852        }
853        Ok((text, is_up_to_date))
854    }
855
856    pub async fn apply(
857        &self,
858        lib_context: &LibContext,
859        write: &mut (dyn std::io::Write + Send),
860    ) -> Result<()> {
861        let persistence_ctx = lib_context.require_persistence_ctx()?;
862        let mut setup_ctx = persistence_ctx.setup_ctx.write().await;
863        let setup_ctx = &mut *setup_ctx;
864
865        if self.action == FlowSetupChangeAction::Setup
866            && !setup_ctx.global_setup_change.is_up_to_date()
867        {
868            apply_global_changes(
869                write,
870                &setup_ctx.global_setup_change,
871                &mut setup_ctx.all_setup_states,
872            )
873            .await?;
874            setup_ctx.global_setup_change =
875                GlobalSetupChange::from_setup_states(&setup_ctx.all_setup_states);
876        }
877
878        for flow_name in &self.flow_names {
879            let flow_ctx = {
880                let flows = lib_context.flows.lock().unwrap();
881                flows
882                    .get(flow_name)
883                    .ok_or_else(|| client_error!("Flow instance not found: {flow_name}"))?
884                    .clone()
885            };
886            let mut flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().write().await;
887            apply_changes_for_flow_ctx(
888                self.action,
889                &flow_ctx,
890                &mut flow_exec_ctx,
891                setup_ctx,
892                &persistence_ctx.builtin_db_pool,
893                write,
894            )
895            .await?;
896        }
897        Ok(())
898    }
899}
900
901async fn get_flow_setup_change<'a>(
902    setup_ctx: &LibSetupContext,
903    flow_ctx: &'a FlowContext,
904    flow_exec_ctx: &'a FlowExecutionContext,
905    action: &FlowSetupChangeAction,
906    buffer: &'a mut Option<FlowSetupChange>,
907) -> Result<&'a FlowSetupChange> {
908    let result = match action {
909        FlowSetupChangeAction::Setup => &flow_exec_ctx.setup_change,
910        FlowSetupChangeAction::Drop => {
911            let existing_state = setup_ctx.all_setup_states.flows.get(flow_ctx.flow_name());
912            buffer.insert(
913                diff_flow_setup_states(None, existing_state, &flow_ctx.flow.flow_instance_ctx)
914                    .await?,
915            )
916        }
917    };
918    Ok(result)
919}
920
921#[instrument(name = "setup.apply_changes_for_flow_ctx", skip_all, fields(flow_name = %flow_ctx.flow_name()))]
922pub(crate) async fn apply_changes_for_flow_ctx(
923    action: FlowSetupChangeAction,
924    flow_ctx: &FlowContext,
925    flow_exec_ctx: &mut FlowExecutionContext,
926    setup_ctx: &mut LibSetupContext,
927    db_pool: &PgPool,
928    write: &mut (dyn std::io::Write + Send),
929) -> Result<()> {
930    let mut setup_change_buffer = None;
931    let setup_change = get_flow_setup_change(
932        setup_ctx,
933        flow_ctx,
934        flow_exec_ctx,
935        &action,
936        &mut setup_change_buffer,
937    )
938    .await?;
939    if setup_change.is_up_to_date() {
940        return Ok(());
941    }
942
943    let mut flow_states = setup_ctx
944        .all_setup_states
945        .flows
946        .remove(flow_ctx.flow_name());
947    // Read runtime-wide setting to decide whether to ignore failures during target drops.
948    let lib_ctx = crate::lib_context::get_lib_context().await?;
949    let ignore_target_drop_failures = lib_ctx.ignore_target_drop_failures;
950
951    apply_changes_for_flow(
952        write,
953        flow_ctx,
954        setup_change,
955        &mut flow_states,
956        db_pool,
957        ignore_target_drop_failures,
958    )
959    .await?;
960
961    flow_exec_ctx
962        .update_setup_state(&flow_ctx.flow, flow_states.as_ref())
963        .await?;
964    if let Some(flow_states) = flow_states {
965        setup_ctx
966            .all_setup_states
967            .flows
968            .insert(flow_ctx.flow_name().to_string(), flow_states);
969    }
970    Ok(())
971}