Skip to main content

recoco_core/builder/
exec_ctx.rs

// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://CocoIndex)
// Original code from CocoIndex is copyrighted by CocoIndex
// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
// SPDX-FileContributor: CocoIndex Contributors
//
// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
//
// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
// SPDX-License-Identifier: Apache-2.0
12
13use crate::prelude::*;
14
15#[cfg(feature = "persistence")]
16use crate::execution::db_tracking_setup;
17#[cfg(feature = "persistence")]
18use crate::ops::get_target_factory;
19#[cfg(feature = "persistence")]
20use crate::ops::interface::SetupStateCompatibility;
21
/// Execution context resolved for one import (source) operation of a flow.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImportOpExecutionContext {
    /// Id of the source in the flow's setup metadata. It is reused from a previously
    /// persisted source state whose key schema still matches; otherwise it is newly
    /// allocated.
    pub source_id: i32,
}

/// Execution context resolved for one export (target) operation of a flow.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExportOpExecutionContext {
    /// Id of the target in the flow's setup metadata.
    pub target_id: i32,
    /// Schema version of the target's setup state. It is reused from fully compatible
    /// existing state when that choice is unambiguous; otherwise it is one past the
    /// highest schema version recorded for the target.
    pub schema_version_id: usize,
}

/// Everything needed to set up and run a flow: the desired persisted setup state,
/// plus per-operation contexts for its imports and exports.
#[cfg(feature = "persistence")]
pub struct FlowSetupExecutionContext {
    /// Desired setup state (metadata, tracking table and targets) for the flow.
    pub setup_state: setup::FlowSetupState<setup::DesiredMode>,
    /// One context per import op, in the order the flow instance lists them.
    pub import_ops: Vec<ImportOpExecutionContext>,
    /// One context per analyzed target. Declarations do not get an entry here.
    pub export_ops: Vec<ExportOpExecutionContext>,
}
37
38pub struct AnalyzedTargetSetupState {
39    pub target_kind: String,
40    pub setup_key: serde_json::Value,
41    pub desired_setup_state: serde_json::Value,
42    pub setup_by_user: bool,
43    /// None for declarations.
44    pub key_type: Option<Box<[schema::ValueType]>>,
45
46    pub attachments: IndexMap<interface::AttachmentSetupKey, serde_json::Value>,
47}
48
49pub struct AnalyzedSetupState {
50    pub targets: Vec<AnalyzedTargetSetupState>,
51    pub declarations: Vec<AnalyzedTargetSetupState>,
52}
53
/// Builds the execution context for one import op and records the op's source
/// state in `metadata.sources`.
///
/// The previously persisted states for this source (across all possible metadata
/// versions) are compared by key schema, with attributes stripped. If they yield
/// exactly one distinct source id, that id is reused. Otherwise a fresh id is
/// allocated by bumping `metadata.last_source_id`; a warning is logged if there
/// were several candidates.
///
/// # Errors
///
/// Never fails in the current implementation.
#[cfg(feature = "persistence")]
fn build_import_op_exec_ctx(
    import_op: &spec::NamedSpec<spec::ImportOpSpec>,
    import_op_output_type: &schema::EnrichedValueType,
    existing_source_states: Option<&Vec<&setup::SourceSetupState>>,
    metadata: &mut setup::FlowSetupMetadata,
) -> Result<ImportOpExecutionContext> {
    // Key schema recorded in an existing source state. When built with
    // `legacy-states-v0`, falls back to the single legacy key field. If nothing
    // was recorded, the schema is empty.
    fn recorded_keys_schema(state: &setup::SourceSetupState) -> &[schema::ValueType] {
        if let Some(keys_schema) = &state.keys_schema {
            return keys_schema;
        }
        #[cfg(feature = "legacy-states-v0")]
        if let Some(key_schema) = &state.key_schema {
            return std::slice::from_ref(key_schema);
        }
        &[]
    }

    let key_fields = import_op_output_type.typ.key_schema();
    // Key value types with attributes removed; this is what is persisted and compared.
    let keys_schema_no_attrs = key_fields
        .iter()
        .map(|field| field.value_type.typ.without_attrs())
        .collect::<Box<[_]>>();

    // Distinct ids of existing states for this source whose key schema is unchanged.
    let matching_ids = existing_source_states
        .into_iter()
        .flatten()
        .filter(|state| recorded_keys_schema(state) == &*keys_schema_no_attrs)
        .map(|state| state.source_id)
        .collect::<HashSet<_>>();

    let mut candidates = matching_ids.into_iter();
    let source_id = match (candidates.next(), candidates.next()) {
        // Exactly one candidate: keep using it.
        (Some(only), None) => only,
        // No candidate, or an ambiguous set of candidates: allocate a new id.
        (first, _) => {
            if first.is_some() {
                warn!("Multiple source states with the same key schema found");
            }
            metadata.last_source_id += 1;
            metadata.last_source_id
        }
    };

    // Keep this field for backward compatibility,
    // so users can still swap back to older version if needed.
    #[cfg(feature = "legacy-states-v0")]
    let legacy_key_schema = match &*keys_schema_no_attrs {
        [single_key] => single_key.clone(),
        _ => schema::ValueType::Struct(schema::StructSchema {
            fields: Arc::new(
                key_fields
                    .iter()
                    .map(|field| {
                        schema::FieldSchema::new(field.name.clone(), field.value_type.clone())
                    })
                    .collect(),
            ),
            description: None,
        }),
    };

    metadata.sources.insert(
        import_op.name.clone(),
        setup::SourceSetupState {
            source_id,
            #[cfg(feature = "legacy-states-v0")]
            key_schema: Some(legacy_key_schema),
            keys_schema: Some(keys_schema_no_attrs),
            source_kind: import_op.spec.source.kind.clone(),
        },
    );
    Ok(ImportOpExecutionContext { source_id })
}
135
/// Builds the execution context for one target (or declaration) and records its
/// desired state in `target_states`.
///
/// All possible versions of the existing state for the same resource (same target
/// kind and setup key) are checked against the desired state:
/// - An existing state is incompatible if both sides have a key type and the key
///   types differ, or if the `setup_by_user` flags differ.
/// - Otherwise the target factory decides compatibility.
///
/// The existing target id is reused only when every existing state is compatible
/// and they all share one id. Otherwise a fresh id is allocated by bumping
/// `metadata.last_target_id`.
///
/// The existing schema version id is reused only when at least one existing state
/// is not incompatible, all such states are fully `Compatible`, and they agree on a
/// single version. Otherwise the schema version is one past the highest recorded
/// `max_schema_version_id` (or 1 if there is no existing state).
///
/// # Errors
///
/// Fails in any of these cases:
/// - no target factory is registered for the target kind;
/// - the factory's compatibility check fails;
/// - the same resource was already added to `target_states`.
#[cfg(feature = "persistence")]
fn build_export_op_exec_ctx(
    analyzed_target_ss: &AnalyzedTargetSetupState,
    existing_target_states: &HashMap<&setup::ResourceIdentifier, Vec<&setup::TargetSetupState>>,
    metadata: &mut setup::FlowSetupMetadata,
    target_states: &mut IndexMap<setup::ResourceIdentifier, setup::TargetSetupState>,
) -> Result<ExportOpExecutionContext> {
    let target_factory = get_target_factory(&analyzed_target_ss.target_kind)?;

    let resource_id = setup::ResourceIdentifier {
        key: analyzed_target_ss.setup_key.clone(),
        target_kind: analyzed_target_ss.target_kind.clone(),
    };
    // All possible versions of this resource's persisted state (empty if none).
    let previous_states: &[&setup::TargetSetupState] = existing_target_states
        .get(&resource_id)
        .map(|states| states.as_slice())
        .unwrap_or(&[]);

    // `None` marks an incompatible existing state; `Some(id)` marks a reusable target id.
    let mut candidate_target_ids = HashSet::<Option<i32>>::new();
    // `None` marks a state that is compatible but not fully `Compatible`, so its
    // schema version cannot be kept.
    let mut candidate_schema_version_ids = HashSet::<Option<usize>>::new();
    for previous in previous_states {
        let key_type_changed = matches!(
            (&analyzed_target_ss.key_type, &previous.common.key_type),
            (Some(desired), Some(existing)) if desired != existing
        );
        let compatibility = if key_type_changed
            || analyzed_target_ss.setup_by_user != previous.common.setup_by_user
        {
            SetupStateCompatibility::NotCompatible
        } else {
            target_factory.check_state_compatibility(
                &analyzed_target_ss.desired_setup_state,
                &previous.state,
            )?
        };
        if compatibility == SetupStateCompatibility::NotCompatible {
            candidate_target_ids.insert(None);
        } else {
            let fully_compatible = compatibility == SetupStateCompatibility::Compatible;
            candidate_schema_version_ids
                .insert(fully_compatible.then_some(previous.common.schema_version_id));
            candidate_target_ids.insert(Some(previous.common.target_id));
        }
    }

    let mut target_id_candidates = candidate_target_ids.into_iter();
    let reused_target_id = match (target_id_candidates.next(), target_id_candidates.next()) {
        (Some(only), None) => only,
        (None, _) => None,
        // Several distinct outcomes: either different target ids, or a mix of
        // compatible and incompatible versions. Neither id can be safely reused.
        (Some(_), Some(_)) => {
            warn!(
                "Existing states of target resource (kind = {}, key = {}) disagree on which \
                 target id to reuse; allocating a new target id",
                resource_id.target_kind, resource_id.key
            );
            None
        }
    };
    let target_id = reused_target_id.unwrap_or_else(|| {
        metadata.last_target_id += 1;
        metadata.last_target_id
    });

    let max_schema_version_id = previous_states
        .iter()
        .map(|state| state.common.max_schema_version_id)
        .max()
        .unwrap_or(0);
    let next_schema_version_id = max_schema_version_id + 1;
    let mut version_candidates = candidate_schema_version_ids.into_iter();
    let schema_version_id = match (version_candidates.next(), version_candidates.next()) {
        (Some(Some(reused)), None) => reused,
        _ => next_schema_version_id,
    };

    match target_states.entry(resource_id) {
        indexmap::map::Entry::Occupied(entry) => {
            api_bail!(
                "Target resource already exists: kind = {}, key = {}",
                entry.key().target_kind,
                entry.key().key
            );
        }
        indexmap::map::Entry::Vacant(entry) => {
            entry.insert(setup::TargetSetupState {
                common: setup::TargetSetupStateCommon {
                    target_id,
                    schema_version_id,
                    max_schema_version_id: max_schema_version_id.max(schema_version_id),
                    setup_by_user: analyzed_target_ss.setup_by_user,
                    key_type: analyzed_target_ss.key_type.clone(),
                },
                state: analyzed_target_ss.desired_setup_state.clone(),
                attachments: analyzed_target_ss.attachments.clone(),
            });
        }
    }
    Ok(ExportOpExecutionContext {
        target_id,
        schema_version_id,
    })
}
233
/// Builds the desired setup state and the per-operation execution contexts for a flow.
///
/// The persisted state in `existing_flow_ss` (across all of its possible versions)
/// lets source ids, target ids, schema versions, the tracking-table names and the
/// enabled feature set carry over. Values that were never persisted fall back to
/// defaults; in particular, the default feature set is used when there is no
/// existing state.
///
/// Targets are processed before declarations. Both are recorded in the resulting
/// setup state, but only targets get an entry in `export_ops`.
///
/// # Errors
///
/// Fails if an import op has no output type in `data_schema` (an invariance
/// violation), or if building any import or export context fails.
#[cfg(feature = "persistence")]
pub fn build_flow_setup_execution_context(
    flow_inst: &spec::FlowInstanceSpec,
    data_schema: &schema::FlowSchema,
    analyzed_ss: &AnalyzedSetupState,
    existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
) -> Result<FlowSetupExecutionContext> {
    // Every metadata version that may currently be persisted for this flow.
    let existing_metadata_versions = || {
        existing_flow_ss
            .into_iter()
            .flat_map(|flow_ss| flow_ss.metadata.possible_versions())
    };

    let mut source_states_by_name = HashMap::<&str, Vec<&setup::SourceSetupState>>::new();
    for (source_name, state) in
        existing_metadata_versions().flat_map(|version| version.sources.iter())
    {
        source_states_by_name
            .entry(source_name.as_str())
            .or_default()
            .push(state);
    }

    let mut target_states_by_name_type =
        HashMap::<&setup::ResourceIdentifier, Vec<&setup::TargetSetupState>>::new();
    if let Some(flow_ss) = existing_flow_ss {
        for (resource_id, target) in flow_ss.targets.iter() {
            target_states_by_name_type
                .entry(resource_id)
                .or_default()
                .extend(target.possible_versions());
        }
    }

    let last_source_id = existing_metadata_versions()
        .map(|version| version.last_source_id)
        .max()
        .unwrap_or(0);
    let last_target_id = existing_metadata_versions()
        .map(|version| version.last_target_id)
        .max()
        .unwrap_or(0);
    let features = match existing_flow_ss {
        Some(flow_ss) => flow_ss
            .metadata
            .possible_versions()
            .flat_map(|version| version.features.iter())
            .cloned()
            .collect::<BTreeSet<_>>(),
        None => setup::flow_features::default_features(),
    };
    let mut metadata = setup::FlowSetupMetadata {
        last_source_id,
        last_target_id,
        sources: BTreeMap::new(),
        features,
    };
    let mut target_states = IndexMap::new();

    let root_output_types = &data_schema.root_op_scope.op_output_types;
    let mut import_op_exec_ctx = Vec::new();
    for import_op in flow_inst.import_ops.iter() {
        let output_type = root_output_types
            .get(&import_op.name)
            .ok_or_else(invariance_violation)?;
        let existing_states = source_states_by_name.get(&import_op.name.as_str());
        import_op_exec_ctx.push(build_import_op_exec_ctx(
            import_op,
            output_type,
            existing_states,
            &mut metadata,
        )?);
    }

    let mut export_op_exec_ctx = Vec::new();
    for analyzed_target_ss in analyzed_ss.targets.iter() {
        export_op_exec_ctx.push(build_export_op_exec_ctx(
            analyzed_target_ss,
            &target_states_by_name_type,
            &mut metadata,
            &mut target_states,
        )?);
    }
    // Declarations are recorded in `target_states` too, but need no export context.
    for analyzed_target_ss in analyzed_ss.declarations.iter() {
        build_export_op_exec_ctx(
            analyzed_target_ss,
            &target_states_by_name_type,
            &mut metadata,
            &mut target_states,
        )?;
    }

    let current_tracking_table =
        existing_flow_ss.and_then(|flow_ss| flow_ss.tracking_table.current.as_ref());
    let table_name = current_tracking_table
        .map(|table| table.table_name.clone())
        .unwrap_or_else(|| db_tracking_setup::default_tracking_table_name(&flow_inst.name));
    let source_state_table_name = metadata
        .features
        .contains(setup::flow_features::SOURCE_STATE_TABLE)
        .then(|| {
            current_tracking_table
                .and_then(|table| table.source_state_table_name.clone())
                .unwrap_or_else(|| {
                    db_tracking_setup::default_source_state_table_name(&flow_inst.name)
                })
        });
    let has_fast_fingerprint_column = metadata
        .features
        .contains(setup::flow_features::FAST_FINGERPRINT);

    let setup_state = setup::FlowSetupState::<setup::DesiredMode> {
        seen_flow_metadata_version: existing_flow_ss
            .and_then(|flow_ss| flow_ss.seen_flow_metadata_version),
        tracking_table: db_tracking_setup::TrackingTableSetupState {
            table_name,
            version_id: db_tracking_setup::CURRENT_TRACKING_TABLE_VERSION,
            source_state_table_name,
            has_fast_fingerprint_column,
        },
        targets: target_states,
        metadata,
    };
    Ok(FlowSetupExecutionContext {
        setup_state,
        import_ops: import_op_exec_ctx,
        export_ops: export_op_exec_ctx,
    })
}