Skip to main content

recoco_core/builder/
analyzer.rs

1// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://CocoIndex)
2// Original code from CocoIndex is copyrighted by CocoIndex
3// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
4// SPDX-FileContributor: CocoIndex Contributors
5//
6// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
7// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
8// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
9//
10// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
11// SPDX-License-Identifier: Apache-2.0
12
13use crate::builder::exec_ctx::AnalyzedSetupState;
14use crate::ops::{
15    get_attachment_factory, get_function_factory, get_source_factory, get_target_factory,
16};
17use crate::prelude::*;
18
19use super::plan::*;
20use crate::lib_context::get_auth_registry;
21use crate::{
22    base::{schema::*, spec::*},
23    ops::interface::*,
24};
25use futures::future::{BoxFuture, try_join3};
26use futures::{FutureExt, future::try_join_all};
27use std::time::Duration;
28use utils::fingerprint::Fingerprinter;
29
/// A 30-minute duration (1800 seconds).
/// NOTE(review): the code that consumes this threshold is not visible in this
/// part of the file — confirm what is being timed against it.
const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800);
31
/// Analysis-time counterpart of `ValueType`.
///
/// It mirrors the three `ValueType` variants; the struct and table variants
/// hold builder types instead of finalized schemas. Conversions in both
/// directions are provided by the `TryFrom`/`TryInto` impls below.
#[derive(Debug)]
pub(super) enum ValueTypeBuilder {
    /// A basic value type, stored as-is.
    Basic(BasicValueType),
    /// A struct type backed by a `StructSchemaBuilder`.
    Struct(StructSchemaBuilder),
    /// A table type backed by a `TableSchemaBuilder`.
    Table(TableSchemaBuilder),
}
38
39impl TryFrom<&ValueType> for ValueTypeBuilder {
40    type Error = Error;
41
42    fn try_from(value_type: &ValueType) -> std::result::Result<Self, Self::Error> {
43        match value_type {
44            ValueType::Basic(basic_type) => Ok(ValueTypeBuilder::Basic(basic_type.clone())),
45            ValueType::Struct(struct_type) => Ok(ValueTypeBuilder::Struct(struct_type.try_into()?)),
46            ValueType::Table(table_type) => Ok(ValueTypeBuilder::Table(table_type.try_into()?)),
47        }
48    }
49}
50
51impl TryInto<ValueType> for &ValueTypeBuilder {
52    type Error = Error;
53
54    fn try_into(self) -> std::result::Result<ValueType, Self::Error> {
55        match self {
56            ValueTypeBuilder::Basic(basic_type) => Ok(ValueType::Basic(basic_type.clone())),
57            ValueTypeBuilder::Struct(struct_type) => Ok(ValueType::Struct(struct_type.try_into()?)),
58            ValueTypeBuilder::Table(table_type) => Ok(ValueType::Table(table_type.try_into()?)),
59        }
60    }
61}
62
/// Growable struct schema used while analyzing a flow.
#[derive(Default, Debug)]
pub(super) struct StructSchemaBuilder {
    /// Fields in insertion order; a field's position here is its index.
    fields: Vec<FieldSchema<ValueTypeBuilder>>,
    /// Maps each field name to its position in `fields`.
    field_name_idx: HashMap<FieldName, u32>,
    /// Optional description, copied to and from `StructSchema::description`.
    description: Option<Arc<str>>,
}
69
70impl StructSchemaBuilder {
71    fn add_field(&mut self, field: FieldSchema<ValueTypeBuilder>) -> Result<u32> {
72        let field_idx = self.fields.len() as u32;
73        match self.field_name_idx.entry(field.name.clone()) {
74            std::collections::hash_map::Entry::Occupied(_) => {
75                client_bail!("Field name already exists: {}", field.name);
76            }
77            std::collections::hash_map::Entry::Vacant(entry) => {
78                entry.insert(field_idx);
79            }
80        }
81        self.fields.push(field);
82        Ok(field_idx)
83    }
84
85    pub fn find_field(&self, field_name: &'_ str) -> Option<(u32, &FieldSchema<ValueTypeBuilder>)> {
86        self.field_name_idx
87            .get(field_name)
88            .map(|&field_idx| (field_idx, &self.fields[field_idx as usize]))
89    }
90}
91
92impl TryFrom<&StructSchema> for StructSchemaBuilder {
93    type Error = Error;
94
95    fn try_from(schema: &StructSchema) -> std::result::Result<Self, Self::Error> {
96        let mut result = StructSchemaBuilder {
97            fields: Vec::with_capacity(schema.fields.len()),
98            field_name_idx: HashMap::with_capacity(schema.fields.len()),
99            description: schema.description.clone(),
100        };
101        for field in schema.fields.iter() {
102            result.add_field(FieldSchema::<ValueTypeBuilder>::from_alternative(field)?)?;
103        }
104        Ok(result)
105    }
106}
107
108impl TryInto<StructSchema> for &StructSchemaBuilder {
109    type Error = Error;
110
111    fn try_into(self) -> std::result::Result<StructSchema, Self::Error> {
112        Ok(StructSchema {
113            fields: Arc::new(
114                self.fields
115                    .iter()
116                    .map(FieldSchema::<ValueType>::from_alternative)
117                    .collect::<std::result::Result<Vec<_>, _>>()?,
118            ),
119            description: self.description.clone(),
120        })
121    }
122}
123
/// Analysis-time counterpart of `TableSchema`.
#[derive(Debug)]
pub(super) struct TableSchemaBuilder {
    /// Table kind, copied to and from `TableSchema::kind`.
    pub kind: TableKind,
    /// Data scope holding the row schema; shared behind `Arc<Mutex<..>>` so
    /// that an `OpScope` created for the table can reference the same scope.
    pub sub_scope: Arc<Mutex<DataScopeBuilder>>,
}
129
130impl TryFrom<&TableSchema> for TableSchemaBuilder {
131    type Error = Error;
132
133    fn try_from(schema: &TableSchema) -> std::result::Result<Self, Self::Error> {
134        Ok(Self {
135            kind: schema.kind,
136            sub_scope: Arc::new(Mutex::new(DataScopeBuilder {
137                data: (&schema.row).try_into()?,
138                added_fields_def_fp: Default::default(),
139            })),
140        })
141    }
142}
143
144impl TryInto<TableSchema> for &TableSchemaBuilder {
145    type Error = Error;
146
147    fn try_into(self) -> std::result::Result<TableSchema, Self::Error> {
148        let sub_scope = self.sub_scope.lock().unwrap();
149        let row = (&sub_scope.data).try_into()?;
150        Ok(TableSchema {
151            kind: self.kind,
152            row,
153        })
154    }
155}
156
157fn try_make_common_value_type(
158    value_type1: &EnrichedValueType,
159    value_type2: &EnrichedValueType,
160) -> Result<EnrichedValueType> {
161    let typ = match (&value_type1.typ, &value_type2.typ) {
162        (ValueType::Basic(basic_type1), ValueType::Basic(basic_type2)) => {
163            if basic_type1 != basic_type2 {
164                api_bail!("Value types are not compatible: {basic_type1} vs {basic_type2}");
165            }
166            ValueType::Basic(basic_type1.clone())
167        }
168        (ValueType::Struct(struct_type1), ValueType::Struct(struct_type2)) => {
169            let common_schema = try_merge_struct_schemas(struct_type1, struct_type2)?;
170            ValueType::Struct(common_schema)
171        }
172        (ValueType::Table(table_type1), ValueType::Table(table_type2)) => {
173            if table_type1.kind != table_type2.kind {
174                api_bail!(
175                    "Collection types are not compatible: {} vs {}",
176                    table_type1,
177                    table_type2
178                );
179            }
180            let row = try_merge_struct_schemas(&table_type1.row, &table_type2.row)?;
181            ValueType::Table(TableSchema {
182                kind: table_type1.kind,
183                row,
184            })
185        }
186        (t1 @ (ValueType::Basic(_) | ValueType::Struct(_) | ValueType::Table(_)), t2) => {
187            api_bail!("Unmatched types:\n  {t1}\n  {t2}\n",)
188        }
189    };
190    let common_attrs: Vec<_> = value_type1
191        .attrs
192        .iter()
193        .filter_map(|(k, v)| {
194            if value_type2.attrs.get(k) == Some(v) {
195                Some((k, v))
196            } else {
197                None
198            }
199        })
200        .collect();
201    let attrs = if common_attrs.len() == value_type1.attrs.len() {
202        value_type1.attrs.clone()
203    } else {
204        Arc::new(
205            common_attrs
206                .into_iter()
207                .map(|(k, v)| (k.clone(), v.clone()))
208                .collect(),
209        )
210    };
211
212    Ok(EnrichedValueType {
213        typ,
214        nullable: value_type1.nullable || value_type2.nullable,
215        attrs,
216    })
217}
218
219fn try_merge_fields_schemas(
220    schema1: &[FieldSchema],
221    schema2: &[FieldSchema],
222) -> Result<Vec<FieldSchema>> {
223    if schema1.len() != schema2.len() {
224        api_bail!(
225            "Fields are not compatible as they have different fields count:\n  ({})\n  ({})\n",
226            schema1
227                .iter()
228                .map(|f| f.to_string())
229                .collect::<Vec<_>>()
230                .join(", "),
231            schema2
232                .iter()
233                .map(|f| f.to_string())
234                .collect::<Vec<_>>()
235                .join(", ")
236        );
237    }
238    let mut result_fields = Vec::with_capacity(schema1.len());
239    for (field1, field2) in schema1.iter().zip(schema2.iter()) {
240        if field1.name != field2.name {
241            api_bail!(
242                "Structs are not compatible as they have incompatible field names `{}` vs `{}`",
243                field1.name,
244                field2.name
245            );
246        }
247        result_fields.push(FieldSchema {
248            name: field1.name.clone(),
249            value_type: try_make_common_value_type(&field1.value_type, &field2.value_type)?,
250            description: None,
251        });
252    }
253    Ok(result_fields)
254}
255
256fn try_merge_struct_schemas(
257    schema1: &StructSchema,
258    schema2: &StructSchema,
259) -> Result<StructSchema> {
260    let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
261    Ok(StructSchema {
262        fields: Arc::new(fields),
263        description: schema1
264            .description
265            .clone()
266            .or_else(|| schema2.description.clone()),
267    })
268}
269
270fn try_merge_collector_schemas(
271    schema1: &CollectorSchema,
272    schema2: &CollectorSchema,
273) -> Result<CollectorSchema> {
274    let schema1_fields = &schema1.fields;
275    let schema2_fields = &schema2.fields;
276
277    // Create a map from field name to index in schema1
278    let field_map: HashMap<FieldName, usize> = schema1_fields
279        .iter()
280        .enumerate()
281        .map(|(i, f)| (f.name.clone(), i))
282        .collect();
283
284    let mut output_fields = Vec::new();
285    let mut next_field_id_1 = 0;
286    let mut next_field_id_2 = 0;
287
288    for (idx, field) in schema2_fields.iter().enumerate() {
289        if let Some(&idx1) = field_map.get(&field.name) {
290            if idx1 < next_field_id_1 {
291                api_bail!(
292                    "Common fields are expected to have consistent order across different `collect()` calls, but got different orders between fields '{}' and '{}'",
293                    field.name,
294                    schema1_fields[next_field_id_1 - 1].name
295                );
296            }
297            // Add intervening fields from schema1
298            for i in next_field_id_1..idx1 {
299                output_fields.push(schema1_fields[i].clone());
300            }
301            // Add intervening fields from schema2
302            for i in next_field_id_2..idx {
303                output_fields.push(schema2_fields[i].clone());
304            }
305            // Merge the field
306            let merged_type =
307                try_make_common_value_type(&schema1_fields[idx1].value_type, &field.value_type)?;
308            output_fields.push(FieldSchema {
309                name: field.name.clone(),
310                value_type: merged_type,
311                description: None,
312            });
313            next_field_id_1 = idx1 + 1;
314            next_field_id_2 = idx + 1;
315            // Fields not in schema1 and not UUID are added at the end
316        }
317    }
318
319    // Add remaining fields from schema1
320    for i in next_field_id_1..schema1_fields.len() {
321        output_fields.push(schema1_fields[i].clone());
322    }
323
324    // Add remaining fields from schema2
325    for i in next_field_id_2..schema2_fields.len() {
326        output_fields.push(schema2_fields[i].clone());
327    }
328
329    // Handle auto_uuid_field_idx
330    let auto_uuid_field_idx = match (schema1.auto_uuid_field_idx, schema2.auto_uuid_field_idx) {
331        (Some(idx1), Some(idx2)) => {
332            let name1 = &schema1_fields[idx1].name;
333            let name2 = &schema2_fields[idx2].name;
334            if name1 == name2 {
335                // Find the position of the auto_uuid field in the merged output
336                output_fields.iter().position(|f| &f.name == name1)
337            } else {
338                api_bail!(
339                    "Generated UUID fields must have the same name across different `collect()` calls, got different names: '{}' vs '{}'",
340                    name1,
341                    name2
342                );
343            }
344        }
345        (Some(_), None) | (None, Some(_)) => {
346            api_bail!(
347                "The generated UUID field, once present for one `collect()`, must be consistently present for other `collect()` calls for the same collector"
348            );
349        }
350        (None, None) => None,
351    };
352
353    Ok(CollectorSchema {
354        fields: output_fields,
355        auto_uuid_field_idx,
356    })
357}
358
/// Accumulates several `FieldDefFingerprint`s into a single combined one.
struct FieldDefFingerprintBuilder {
    /// Union of the source op names of every fingerprint added so far.
    source_op_names: HashSet<String>,
    /// Running fingerprinter fed with each added key and fingerprint.
    fingerprinter: Fingerprinter,
}
363
364impl FieldDefFingerprintBuilder {
365    pub fn new() -> Self {
366        Self {
367            source_op_names: HashSet::new(),
368            fingerprinter: Fingerprinter::default(),
369        }
370    }
371
372    pub fn add(&mut self, key: Option<&str>, def_fp: FieldDefFingerprint) -> Result<()> {
373        self.source_op_names.extend(def_fp.source_op_names);
374        let mut fingerprinter = std::mem::take(&mut self.fingerprinter);
375        if let Some(key) = key {
376            fingerprinter = fingerprinter.with(key)?;
377        }
378        fingerprinter = fingerprinter.with(def_fp.fingerprint.as_slice())?;
379        self.fingerprinter = fingerprinter;
380        Ok(())
381    }
382
383    pub fn build(self) -> FieldDefFingerprint {
384        FieldDefFingerprint {
385            source_op_names: self.source_op_names,
386            fingerprint: self.fingerprinter.into_fingerprint(),
387        }
388    }
389}
390
/// Analysis-time state of a single collector.
#[derive(Debug)]
pub(super) struct CollectorBuilder {
    /// Schema merged from every `collect()` call seen so far.
    pub schema: Arc<CollectorSchema>,
    /// Set once the collection has been consumed; further `collect()` calls
    /// are rejected afterwards.
    pub is_used: bool,
    /// One definition fingerprint per `collect()` call.
    pub def_fps: Vec<FieldDefFingerprint>,
}
397
398impl CollectorBuilder {
399    pub fn new(schema: Arc<CollectorSchema>, def_fp: FieldDefFingerprint) -> Self {
400        Self {
401            schema,
402            is_used: false,
403            def_fps: vec![def_fp],
404        }
405    }
406
407    pub fn collect(&mut self, schema: &CollectorSchema, def_fp: FieldDefFingerprint) -> Result<()> {
408        if self.is_used {
409            api_bail!("Collector is already used");
410        }
411        let existing_schema = Arc::make_mut(&mut self.schema);
412        *existing_schema = try_merge_collector_schemas(existing_schema, schema)?;
413        self.def_fps.push(def_fp);
414        Ok(())
415    }
416
417    pub fn use_collection(&mut self) -> Result<(Arc<CollectorSchema>, FieldDefFingerprint)> {
418        self.is_used = true;
419
420        self.def_fps
421            .sort_by(|a, b| a.fingerprint.as_slice().cmp(b.fingerprint.as_slice()));
422        let mut def_fp_builder = FieldDefFingerprintBuilder::new();
423        for def_fp in self.def_fps.iter() {
424            def_fp_builder.add(None, def_fp.clone())?;
425        }
426        Ok((self.schema.clone(), def_fp_builder.build()))
427    }
428}
429
/// Analysis-time view of the data available in one scope.
#[derive(Debug)]
pub(super) struct DataScopeBuilder {
    /// Struct schema listing every field of the scope.
    pub data: StructSchemaBuilder,
    /// Definition fingerprints of the fields added through `add_field`,
    /// keyed by field name.
    pub added_fields_def_fp: IndexMap<FieldName, FieldDefFingerprint>,
}
435
436impl DataScopeBuilder {
437    pub fn new() -> Self {
438        Self {
439            data: Default::default(),
440            added_fields_def_fp: Default::default(),
441        }
442    }
443
444    pub fn last_field(&self) -> Option<&FieldSchema<ValueTypeBuilder>> {
445        self.data.fields.last()
446    }
447
448    pub fn add_field(
449        &mut self,
450        name: FieldName,
451        value_type: &EnrichedValueType,
452        def_fp: FieldDefFingerprint,
453    ) -> Result<AnalyzedOpOutput> {
454        let field_index = self.data.add_field(FieldSchema {
455            name: name.clone(),
456            value_type: EnrichedValueType::from_alternative(value_type)?,
457            description: None,
458        })?;
459        self.added_fields_def_fp.insert(name, def_fp);
460        Ok(AnalyzedOpOutput {
461            field_idx: field_index,
462        })
463    }
464
465    /// Must be called on an non-empty field path.
466    pub fn analyze_field_path<'a>(
467        &'a self,
468        field_path: &'_ FieldPath,
469        base_def_fp: FieldDefFingerprint,
470    ) -> Result<(
471        AnalyzedLocalFieldReference,
472        &'a EnrichedValueType<ValueTypeBuilder>,
473        FieldDefFingerprint,
474    )> {
475        let mut indices = Vec::with_capacity(field_path.len());
476        let mut struct_schema = &self.data;
477        let mut def_fp = base_def_fp;
478
479        if field_path.is_empty() {
480            client_bail!("Field path is empty");
481        }
482
483        let mut i = 0;
484        let value_type = loop {
485            let field_name = &field_path[i];
486            let (field_idx, field) = struct_schema.find_field(field_name).ok_or_else(|| {
487                api_error!("Field {} not found", field_path[0..(i + 1)].join("."))
488            })?;
489            if let Some(added_def_fp) = self.added_fields_def_fp.get(field_name) {
490                def_fp = added_def_fp.clone();
491            } else {
492                def_fp.fingerprint = Fingerprinter::default()
493                    .with(&("field", &def_fp.fingerprint, field_name))?
494                    .into_fingerprint();
495            };
496            indices.push(field_idx);
497            if i + 1 >= field_path.len() {
498                break &field.value_type;
499            }
500            i += 1;
501
502            struct_schema = match &field.value_type.typ {
503                ValueTypeBuilder::Struct(struct_type) => struct_type,
504                _ => {
505                    api_bail!("Field {} is not a struct", field_path[0..(i + 1)].join("."));
506                }
507            };
508        };
509        Ok((
510            AnalyzedLocalFieldReference {
511                fields_idx: indices,
512            },
513            value_type,
514            def_fp,
515        ))
516    }
517}
518
/// Context shared by the analysis routines implemented on this type.
pub(super) struct AnalyzerContext {
    /// Library-level context; `analyze_import_op` reads the global
    /// concurrency controller from it.
    pub lib_ctx: Arc<LibContext>,
    /// Context of the flow instance being analyzed; handed to op factories
    /// when building executors.
    pub flow_ctx: Arc<FlowInstanceContext>,
}
523
/// Mutable bookkeeping attached to an `OpScope`.
#[derive(Debug, Default)]
pub(super) struct OpScopeStates {
    /// Value type of every op output registered in the scope, by field name.
    pub op_output_types: HashMap<FieldName, EnrichedValueType>,
    /// Collectors declared in the scope, in insertion order; a collector's
    /// position is used as its index.
    pub collectors: IndexMap<FieldName, CollectorBuilder>,
    /// Schemas of sub-scopes, keyed by name; exported as
    /// `OpScopeSchema::op_scopes`.
    pub sub_scopes: HashMap<String, Arc<OpScopeSchema>>,
}
530
531impl OpScopeStates {
532    pub fn add_collector(
533        &mut self,
534        collector_name: FieldName,
535        schema: CollectorSchema,
536        def_fp: FieldDefFingerprint,
537    ) -> Result<AnalyzedLocalCollectorReference> {
538        let existing_len = self.collectors.len();
539        let idx = match self.collectors.entry(collector_name) {
540            indexmap::map::Entry::Occupied(mut entry) => {
541                entry.get_mut().collect(&schema, def_fp)?;
542                entry.index()
543            }
544            indexmap::map::Entry::Vacant(entry) => {
545                entry.insert(CollectorBuilder::new(Arc::new(schema), def_fp));
546                existing_len
547            }
548        };
549        Ok(AnalyzedLocalCollectorReference {
550            collector_idx: idx as u32,
551        })
552    }
553
554    pub fn consume_collector(
555        &mut self,
556        collector_name: &FieldName,
557    ) -> Result<(
558        AnalyzedLocalCollectorReference,
559        Arc<CollectorSchema>,
560        FieldDefFingerprint,
561    )> {
562        let (collector_idx, _, collector) = self
563            .collectors
564            .get_full_mut(collector_name)
565            .ok_or_else(|| api_error!("Collector not found: {}", collector_name))?;
566        let (schema, def_fp) = collector.use_collection()?;
567        Ok((
568            AnalyzedLocalCollectorReference {
569                collector_idx: collector_idx as u32,
570            },
571            schema,
572            def_fp,
573        ))
574    }
575
576    fn build_op_scope_schema(&self) -> OpScopeSchema {
577        OpScopeSchema {
578            op_output_types: self
579                .op_output_types
580                .iter()
581                .map(|(name, value_type)| (name.clone(), value_type.without_attrs()))
582                .collect(),
583            collectors: self
584                .collectors
585                .iter()
586                .map(|(name, schema)| NamedSpec {
587                    name: name.clone(),
588                    spec: schema.schema.clone(),
589                })
590                .collect(),
591            op_scopes: self.sub_scopes.clone(),
592        }
593    }
594}
595
/// A named scope in which ops are analyzed.
///
/// Scopes form a chain through `parent`; two scopes compare equal only when
/// they are the same object (see the `PartialEq` impl).
#[derive(Debug)]
pub struct OpScope {
    /// Name used to look the scope up from itself or from descendant scopes.
    pub name: String,
    /// Parent scope together with the field path this scope was created for;
    /// `None` for a scope without a parent.
    pub parent: Option<(Arc<OpScope>, spec::FieldPath)>,
    /// Data fields visible in this scope.
    pub(super) data: Arc<Mutex<DataScopeBuilder>>,
    /// Op output types, collectors and sub-scope schemas of this scope.
    pub(super) states: Mutex<OpScopeStates>,
    /// Fingerprint used as the starting point when resolving field paths in
    /// this scope.
    pub(super) base_value_def_fp: FieldDefFingerprint,
}
604
/// Iterator over a scope followed by its chain of parents; holds the next
/// scope to yield, or `None` once the chain is exhausted.
struct Iter<'a>(Option<&'a OpScope>);
606
607impl<'a> Iterator for Iter<'a> {
608    type Item = &'a OpScope;
609
610    fn next(&mut self) -> Option<Self::Item> {
611        match self.0 {
612            Some(scope) => {
613                self.0 = scope.parent.as_ref().map(|(parent, _)| parent.as_ref());
614                Some(scope)
615            }
616            None => None,
617        }
618    }
619}
620
621impl OpScope {
622    pub(super) fn new(
623        name: String,
624        parent: Option<(Arc<OpScope>, spec::FieldPath)>,
625        data: Arc<Mutex<DataScopeBuilder>>,
626        base_value_def_fp: FieldDefFingerprint,
627    ) -> Arc<Self> {
628        Arc::new(Self {
629            name,
630            parent,
631            data,
632            states: Mutex::default(),
633            base_value_def_fp,
634        })
635    }
636
637    fn add_op_output(
638        &self,
639        name: FieldName,
640        value_type: EnrichedValueType,
641        def_fp: FieldDefFingerprint,
642    ) -> Result<AnalyzedOpOutput> {
643        let op_output = self
644            .data
645            .lock()
646            .unwrap()
647            .add_field(name.clone(), &value_type, def_fp)?;
648        self.states
649            .lock()
650            .unwrap()
651            .op_output_types
652            .insert(name, value_type);
653        Ok(op_output)
654    }
655
656    pub fn ancestors(&self) -> impl Iterator<Item = &OpScope> {
657        Iter(Some(self))
658    }
659
660    pub fn is_op_scope_descendant(&self, other: &Self) -> bool {
661        if self == other {
662            return true;
663        }
664        match &self.parent {
665            Some((parent, _)) => parent.is_op_scope_descendant(other),
666            None => false,
667        }
668    }
669
670    pub(super) fn new_foreach_op_scope(
671        self: &Arc<Self>,
672        scope_name: String,
673        field_path: &FieldPath,
674    ) -> Result<(AnalyzedLocalFieldReference, Arc<Self>)> {
675        let (local_field_ref, sub_data_scope, def_fp) = {
676            let data_scope = self.data.lock().unwrap();
677            let (local_field_ref, value_type, def_fp) =
678                data_scope.analyze_field_path(field_path, self.base_value_def_fp.clone())?;
679            let sub_data_scope = match &value_type.typ {
680                ValueTypeBuilder::Table(table_type) => table_type.sub_scope.clone(),
681                _ => api_bail!("ForEach only works on collection, field {field_path} is not"),
682            };
683            (local_field_ref, sub_data_scope, def_fp)
684        };
685        let sub_op_scope = OpScope::new(
686            scope_name,
687            Some((self.clone(), field_path.clone())),
688            sub_data_scope,
689            def_fp,
690        );
691        Ok((local_field_ref, sub_op_scope))
692    }
693}
694
695impl std::fmt::Display for OpScope {
696    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
697        if let Some((scope, field_path)) = &self.parent {
698            write!(f, "{} [{} AS {}]", scope, field_path, self.name)?;
699        } else {
700            write!(f, "[{}]", self.name)?;
701        }
702        Ok(())
703    }
704}
705
706impl PartialEq for OpScope {
707    fn eq(&self, other: &Self) -> bool {
708        std::ptr::eq(self, other)
709    }
710}
711impl Eq for OpScope {}
712
713fn find_scope<'a>(scope_name: &ScopeName, op_scope: &'a OpScope) -> Result<(u32, &'a OpScope)> {
714    let (up_level, scope) = op_scope
715        .ancestors()
716        .enumerate()
717        .find(|(_, s)| &s.name == scope_name)
718        .ok_or_else(|| api_error!("Scope not found: {}", scope_name))?;
719    Ok((up_level as u32, scope))
720}
721
722fn analyze_struct_mapping(
723    mapping: &StructMapping,
724    op_scope: &OpScope,
725) -> Result<(AnalyzedStructMapping, Vec<FieldSchema>, FieldDefFingerprint)> {
726    let mut field_mappings = Vec::with_capacity(mapping.fields.len());
727    let mut field_schemas = Vec::with_capacity(mapping.fields.len());
728
729    let mut fields_def_fps = Vec::with_capacity(mapping.fields.len());
730    for field in mapping.fields.iter() {
731        let (field_mapping, value_type, field_def_fp) =
732            analyze_value_mapping(&field.spec, op_scope)?;
733        field_mappings.push(field_mapping);
734        field_schemas.push(FieldSchema {
735            name: field.name.clone(),
736            value_type,
737            description: None,
738        });
739        fields_def_fps.push((field.name.as_str(), field_def_fp));
740    }
741    fields_def_fps.sort_by_key(|(name, _)| *name);
742    let mut def_fp_builder = FieldDefFingerprintBuilder::new();
743    for (name, def_fp) in fields_def_fps {
744        def_fp_builder.add(Some(name), def_fp)?;
745    }
746    Ok((
747        AnalyzedStructMapping {
748            fields: field_mappings,
749        },
750        field_schemas,
751        def_fp_builder.build(),
752    ))
753}
754
755fn analyze_value_mapping(
756    value_mapping: &ValueMapping,
757    op_scope: &OpScope,
758) -> Result<(AnalyzedValueMapping, EnrichedValueType, FieldDefFingerprint)> {
759    let result = match value_mapping {
760        ValueMapping::Constant(v) => {
761            let value = value::Value::from_json(v.value.clone(), &v.schema.typ)?;
762            let value_mapping = AnalyzedValueMapping::Constant { value };
763            let def_fp = FieldDefFingerprint {
764                source_op_names: HashSet::new(),
765                fingerprint: Fingerprinter::default()
766                    .with(&("constant", &v.value, &v.schema.without_attrs()))?
767                    .into_fingerprint(),
768            };
769            (value_mapping, v.schema.clone(), def_fp)
770        }
771
772        ValueMapping::Field(v) => {
773            let (scope_up_level, op_scope) = match &v.scope {
774                Some(scope_name) => find_scope(scope_name, op_scope)?,
775                None => (0, op_scope),
776            };
777            let data_scope = op_scope.data.lock().unwrap();
778            let (local_field_ref, value_type, def_fp) =
779                data_scope.analyze_field_path(&v.field_path, op_scope.base_value_def_fp.clone())?;
780            let schema = EnrichedValueType::from_alternative(value_type)?;
781            let value_mapping = AnalyzedValueMapping::Field(AnalyzedFieldReference {
782                local: local_field_ref,
783                scope_up_level,
784            });
785            (value_mapping, schema, def_fp)
786        }
787    };
788    Ok(result)
789}
790
791fn analyze_input_fields(
792    arg_bindings: &[OpArgBinding],
793    op_scope: &OpScope,
794) -> Result<(Vec<OpArgSchema>, FieldDefFingerprint)> {
795    let mut op_arg_schemas = Vec::with_capacity(arg_bindings.len());
796    let mut def_fp_builder = FieldDefFingerprintBuilder::new();
797    for arg_binding in arg_bindings.iter() {
798        let (analyzed_value, value_type, def_fp) =
799            analyze_value_mapping(&arg_binding.value, op_scope)?;
800        let op_arg_schema = OpArgSchema {
801            name: arg_binding.arg_name.clone(),
802            value_type,
803            analyzed_value: analyzed_value.clone(),
804        };
805        def_fp_builder.add(arg_binding.arg_name.0.as_deref(), def_fp)?;
806        op_arg_schemas.push(op_arg_schema);
807    }
808    Ok((op_arg_schemas, def_fp_builder.build()))
809}
810
811fn add_collector(
812    scope_name: &ScopeName,
813    collector_name: FieldName,
814    schema: CollectorSchema,
815    op_scope: &OpScope,
816    def_fp: FieldDefFingerprint,
817) -> Result<AnalyzedCollectorReference> {
818    let (scope_up_level, scope) = find_scope(scope_name, op_scope)?;
819    let local_ref = scope
820        .states
821        .lock()
822        .unwrap()
823        .add_collector(collector_name, schema, def_fp)?;
824    Ok(AnalyzedCollectorReference {
825        local: local_ref,
826        scope_up_level,
827    })
828}
829
/// Bundle of per-export information.
///
/// NOTE(review): the code that fills and reads this struct is not visible in
/// this part of the file; presumably it is produced while analyzing an export
/// op — confirm, and document the individual fields at that point.
struct ExportDataFieldsInfo {
    local_collector_ref: AnalyzedLocalCollectorReference,
    primary_key_def: AnalyzedPrimaryKeyDef,
    primary_key_schema: Box<[FieldSchema]>,
    value_fields_idx: Vec<u32>,
    value_stable: bool,
    output_value_fingerprinter: Fingerprinter,
    def_fp: FieldDefFingerprint,
}
839
840impl AnalyzerContext {
    /// Analyzes an import (source) op and registers its output in `op_scope`.
    ///
    /// The source factory for the op's kind is asked to build the source; this
    /// yields the output type immediately plus a future for the executor. The
    /// output is added to the scope under the op's name, with a fingerprint
    /// derived from `("import", op_name)` and the op itself as its only
    /// source op.
    ///
    /// Returns a future that awaits the executor and assembles the final
    /// `AnalyzedImportOp`.
    ///
    /// # Errors
    /// Fails if the source factory cannot be found, if building fails, or if
    /// the output field cannot be added to the scope. Executor preparation
    /// errors surface when the returned future is awaited.
    pub(super) async fn analyze_import_op(
        &self,
        op_scope: &Arc<OpScope>,
        import_op: NamedSpec<ImportOpSpec>,
    ) -> Result<impl Future<Output = Result<AnalyzedImportOp>> + Send + use<>> {
        let source_factory = get_source_factory(&import_op.spec.source.kind)?;
        let (output_type, executor) = source_factory
            .build(
                &import_op.name,
                serde_json::Value::Object(import_op.spec.source.spec),
                self.flow_ctx.clone(),
            )
            .await?;

        // From here on `import_op` is consumed piece by piece (partial moves).
        let op_name = import_op.name;
        let primary_key_schema = Box::from(output_type.typ.key_schema());
        let def_fp = FieldDefFingerprint {
            source_op_names: HashSet::from([op_name.clone()]),
            fingerprint: Fingerprinter::default()
                .with(&("import", &op_name))?
                .into_fingerprint(),
        };
        let output = op_scope.add_op_output(op_name.clone(), output_type, def_fp)?;

        let concur_control_options = import_op
            .spec
            .execution_options
            .get_concur_control_options();
        let global_concurrency_controller = self.lib_ctx.global_concurrency_controller.clone();
        // The returned future owns everything it needs, so it does not borrow
        // from `self` or `op_scope` (hence `use<>` in the signature).
        let result_fut = async move {
            trace!("Start building executor for source op `{op_name}`");
            let executor = executor
                .await
                .with_context(|| format!("Preparing for source op: {op_name}"))?;
            trace!("Finished building executor for source op `{op_name}`");
            Ok(AnalyzedImportOp {
                executor,
                output,
                primary_key_schema,
                name: op_name,
                refresh_options: import_op.spec.refresh_options,
                concurrency_controller: concur_control::CombinedConcurrencyController::new(
                    &concur_control_options,
                    global_concurrency_controller,
                ),
            })
        };
        Ok(result_fut)
    }
890
891    pub(super) async fn analyze_reactive_op(
892        &self,
893        op_scope: &Arc<OpScope>,
894        reactive_op: &NamedSpec<ReactiveOpSpec>,
895    ) -> Result<BoxFuture<'static, Result<AnalyzedReactiveOp>>> {
896        let reactive_op_clone = reactive_op.clone();
897        let reactive_op_name = reactive_op.name.clone();
898        let result_fut = match reactive_op_clone.spec {
899            ReactiveOpSpec::Transform(op) => {
900                let (input_field_schemas, input_def_fp) =
901                    analyze_input_fields(&op.inputs, op_scope).with_context(|| {
902                        format!("Preparing inputs for transform op: {}", reactive_op_name)
903                    })?;
904                let spec = serde_json::Value::Object(op.op.spec.clone());
905
906                let fn_executor = get_function_factory(&op.op.kind)?;
907                let input_value_mappings = input_field_schemas
908                    .iter()
909                    .map(|field| field.analyzed_value.clone())
910                    .collect();
911                let build_output = fn_executor
912                    .build(spec, input_field_schemas, self.flow_ctx.clone())
913                    .await?;
914                let output_type = build_output.output_type.typ.clone();
915                let logic_fingerprinter = Fingerprinter::default()
916                    .with(&op.op)?
917                    .with(&build_output.output_type.without_attrs())?
918                    .with(&build_output.behavior_version)?;
919
920                let def_fp = FieldDefFingerprint {
921                    source_op_names: input_def_fp.source_op_names,
922                    fingerprint: Fingerprinter::default()
923                        .with(&(
924                            "transform",
925                            &op.op,
926                            &input_def_fp.fingerprint,
927                            &build_output.behavior_version,
928                        ))?
929                        .into_fingerprint(),
930                };
931                let output = op_scope.add_op_output(
932                    reactive_op_name.clone(),
933                    build_output.output_type,
934                    def_fp,
935                )?;
936                let op_name = reactive_op_name.clone();
937                let op_kind = op.op.kind.clone();
938
939                let execution_options_timeout = op.execution_options.timeout;
940
941                let behavior_version = build_output.behavior_version;
942                async move {
943                            trace!("Start building executor for transform op `{op_name}`");
944                            let executor = build_output.executor.await.with_context(|| {
945                                format!("Preparing for transform op: {op_name}")
946                            })?;
947                            let enable_cache = executor.enable_cache();
948                            let timeout = executor.timeout()
949                                .or(execution_options_timeout)
950                                .or(Some(TIMEOUT_THRESHOLD));
951                            trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}");
952                            let function_exec_info = AnalyzedFunctionExecInfo {
953                                enable_cache,
954                                timeout,
955                                behavior_version,
956                                fingerprinter: logic_fingerprinter,
957                                output_type
958                            };
959                            if function_exec_info.enable_cache
960                                && function_exec_info.behavior_version.is_none()
961                            {
962                                api_bail!(
963                                    "When caching is enabled, behavior version must be specified for transform op: {op_name}"
964                                );
965                            }
966                            Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp {
967                                name: op_name,
968                                op_kind,
969                                inputs: input_value_mappings,
970                                function_exec_info,
971                                executor,
972                                output,
973                            }))
974                }
975                .boxed()
976            }
977
978            ReactiveOpSpec::ForEach(foreach_op) => {
979                let (local_field_ref, sub_op_scope) = op_scope.new_foreach_op_scope(
980                    foreach_op.op_scope.name.clone(),
981                    &foreach_op.field_path,
982                )?;
983                let analyzed_op_scope_fut = {
984                    let analyzed_op_scope_fut = self
985                        .analyze_op_scope(&sub_op_scope, &foreach_op.op_scope.ops)
986                        .boxed_local()
987                        .await?;
988                    let sub_op_scope_schema =
989                        sub_op_scope.states.lock().unwrap().build_op_scope_schema();
990                    op_scope
991                        .states
992                        .lock()
993                        .unwrap()
994                        .sub_scopes
995                        .insert(reactive_op_name.clone(), Arc::new(sub_op_scope_schema));
996                    analyzed_op_scope_fut
997                };
998                let op_name = reactive_op_name.clone();
999
1000                let concur_control_options =
1001                    foreach_op.execution_options.get_concur_control_options();
1002                async move {
1003                    Ok(AnalyzedReactiveOp::ForEach(AnalyzedForEachOp {
1004                        local_field_ref,
1005                        op_scope: analyzed_op_scope_fut
1006                            .await
1007                            .with_context(|| format!("Preparing for foreach op: {op_name}"))?,
1008                        name: op_name,
1009                        concurrency_controller: concur_control::ConcurrencyController::new(
1010                            &concur_control_options,
1011                        ),
1012                    }))
1013                }
1014                .boxed()
1015            }
1016
1017            ReactiveOpSpec::Collect(op) => {
1018                let (struct_mapping, fields_schema, mut def_fp) =
1019                    analyze_struct_mapping(&op.input, op_scope)?;
1020                let has_auto_uuid_field = op.auto_uuid_field.is_some();
1021                def_fp.fingerprint = Fingerprinter::default()
1022                    .with(&(
1023                        "collect",
1024                        &def_fp.fingerprint,
1025                        &fields_schema,
1026                        &has_auto_uuid_field,
1027                    ))?
1028                    .into_fingerprint();
1029                let fingerprinter = Fingerprinter::default().with(&fields_schema)?;
1030
1031                let input_field_names: Vec<FieldName> =
1032                    fields_schema.iter().map(|f| f.name.clone()).collect();
1033                let collector_ref = add_collector(
1034                    &op.scope_name,
1035                    op.collector_name.clone(),
1036                    CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
1037                    op_scope,
1038                    def_fp,
1039                )?;
1040                let op_scope = op_scope.clone();
1041                async move {
1042                    // Get the merged collector schema after adding
1043                    let collector_schema: Arc<CollectorSchema> = {
1044                        let scope = find_scope(&op.scope_name, &op_scope)?.1;
1045                        let states = scope.states.lock().unwrap();
1046                        let collector = states.collectors.get(&op.collector_name).unwrap();
1047                        collector.schema.clone()
1048                    };
1049
1050                    // Pre-compute field index mappings for efficient evaluation
1051                    let field_name_to_index: HashMap<&FieldName, usize> = input_field_names
1052                        .iter()
1053                        .enumerate()
1054                        .map(|(i, n)| (n, i))
1055                        .collect();
1056                    let field_index_mapping = collector_schema
1057                        .fields
1058                        .iter()
1059                        .map(|field| field_name_to_index.get(&field.name).copied())
1060                        .collect::<Vec<Option<usize>>>();
1061
1062                    let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
1063                        name: reactive_op_name,
1064                        has_auto_uuid_field,
1065                        input: struct_mapping,
1066                        input_field_names,
1067                        collector_schema,
1068                        collector_ref,
1069                        field_index_mapping,
1070                        fingerprinter,
1071                    });
1072                    Ok(collect_op)
1073                }
1074                .boxed()
1075            }
1076        };
1077        Ok(result_fut)
1078    }
1079
1080    #[allow(clippy::too_many_arguments)]
1081    async fn analyze_export_op_group(
1082        &self,
1083        target_kind: &str,
1084        op_scope: &Arc<OpScope>,
1085        flow_inst: &FlowInstanceSpec,
1086        export_op_group: &AnalyzedExportTargetOpGroup,
1087        declarations: Vec<serde_json::Value>,
1088        targets_analyzed_ss: &mut [Option<exec_ctx::AnalyzedTargetSetupState>],
1089        declarations_analyzed_ss: &mut Vec<exec_ctx::AnalyzedTargetSetupState>,
1090    ) -> Result<Vec<impl Future<Output = Result<AnalyzedExportOp>> + Send + use<>>> {
1091        let mut collection_specs = Vec::<interface::ExportDataCollectionSpec>::new();
1092        let mut data_fields_infos = Vec::<ExportDataFieldsInfo>::new();
1093        for idx in export_op_group.op_idx.iter() {
1094            let export_op = &flow_inst.export_ops[*idx];
1095            let (local_collector_ref, collector_schema, def_fp) =
1096                op_scope
1097                    .states
1098                    .lock()
1099                    .unwrap()
1100                    .consume_collector(&export_op.spec.collector_name)?;
1101            let (value_fields_schema, data_collection_info) =
1102                match &export_op.spec.index_options.primary_key_fields {
1103                    Some(fields) => {
1104                        let pk_fields_idx = fields
1105                            .iter()
1106                            .map(|f| {
1107                                collector_schema
1108                                    .fields
1109                                    .iter()
1110                                    .position(|field| &field.name == f)
1111                                    .ok_or_else(|| client_error!("field not found: {}", f))
1112                            })
1113                            .collect::<Result<Vec<_>>>()?;
1114
1115                        let primary_key_schema = pk_fields_idx
1116                            .iter()
1117                            .map(|idx| collector_schema.fields[*idx].without_attrs())
1118                            .collect::<Box<[_]>>();
1119                        let mut value_fields_schema: Vec<FieldSchema> = vec![];
1120                        let mut value_fields_idx = vec![];
1121                        for (idx, field) in collector_schema.fields.iter().enumerate() {
1122                            if !pk_fields_idx.contains(&idx) {
1123                                value_fields_schema.push(field.without_attrs());
1124                                value_fields_idx.push(idx as u32);
1125                            }
1126                        }
1127                        let value_stable = collector_schema
1128                            .auto_uuid_field_idx
1129                            .as_ref()
1130                            .map(|uuid_idx| pk_fields_idx.contains(uuid_idx))
1131                            .unwrap_or(false);
1132                        let output_value_fingerprinter =
1133                            Fingerprinter::default().with(&value_fields_schema)?;
1134                        (
1135                            value_fields_schema,
1136                            ExportDataFieldsInfo {
1137                                local_collector_ref,
1138                                primary_key_def: AnalyzedPrimaryKeyDef::Fields(pk_fields_idx),
1139                                primary_key_schema,
1140                                value_fields_idx,
1141                                value_stable,
1142                                output_value_fingerprinter,
1143                                def_fp,
1144                            },
1145                        )
1146                    }
1147                    None => {
1148                        // TODO: Support auto-generate primary key
1149                        api_bail!("Primary key fields must be specified")
1150                    }
1151                };
1152            collection_specs.push(interface::ExportDataCollectionSpec {
1153                name: export_op.name.clone(),
1154                spec: serde_json::Value::Object(export_op.spec.target.spec.clone()),
1155                key_fields_schema: data_collection_info.primary_key_schema.clone(),
1156                value_fields_schema,
1157                index_options: export_op.spec.index_options.clone(),
1158            });
1159            data_fields_infos.push(data_collection_info);
1160        }
1161        let (data_collections_output, declarations_output) = export_op_group
1162            .target_factory
1163            .clone()
1164            .build(collection_specs, declarations, self.flow_ctx.clone())
1165            .await?;
1166        let analyzed_export_ops = export_op_group
1167            .op_idx
1168            .iter()
1169            .zip(data_collections_output.into_iter())
1170            .zip(data_fields_infos.into_iter())
1171            .map(|((idx, data_coll_output), data_fields_info)| {
1172                let export_op = &flow_inst.export_ops[*idx];
1173                let op_name = export_op.name.clone();
1174                let export_target_factory = export_op_group.target_factory.clone();
1175
1176                let attachments = export_op
1177                    .spec
1178                    .attachments
1179                    .iter()
1180                    .map(|attachment| {
1181                        let attachment_factory = get_attachment_factory(&attachment.kind)?;
1182                        let attachment_state = attachment_factory.get_state(
1183                            &op_name,
1184                            &export_op.spec.target.spec,
1185                            serde_json::Value::Object(attachment.spec.clone()),
1186                        )?;
1187                        Ok((
1188                            interface::AttachmentSetupKey(
1189                                attachment.kind.clone(),
1190                                attachment_state.setup_key,
1191                            ),
1192                            attachment_state.setup_state,
1193                        ))
1194                    })
1195                    .collect::<Result<IndexMap<_, _>>>()?;
1196
1197                let export_op_ss = exec_ctx::AnalyzedTargetSetupState {
1198                    target_kind: target_kind.to_string(),
1199                    setup_key: data_coll_output.setup_key,
1200                    desired_setup_state: data_coll_output.desired_setup_state,
1201                    setup_by_user: export_op.spec.setup_by_user,
1202                    key_type: Some(
1203                        data_fields_info
1204                            .primary_key_schema
1205                            .iter()
1206                            .map(|field| field.value_type.typ.clone())
1207                            .collect::<Box<[_]>>(),
1208                    ),
1209                    attachments,
1210                };
1211                targets_analyzed_ss[*idx] = Some(export_op_ss);
1212
1213                let def_fp = FieldDefFingerprint {
1214                    source_op_names: data_fields_info.def_fp.source_op_names,
1215                    fingerprint: Fingerprinter::default()
1216                        .with("export")?
1217                        .with(&data_fields_info.def_fp.fingerprint)?
1218                        .with(&export_op.spec.target)?
1219                        .into_fingerprint(),
1220                };
1221                Ok(async move {
1222                    trace!("Start building executor for export op `{op_name}`");
1223                    let export_context = data_coll_output
1224                        .export_context
1225                        .await
1226                        .with_context(|| format!("Preparing for export op: {op_name}"))?;
1227                    trace!("Finished building executor for export op `{op_name}`");
1228                    Ok(AnalyzedExportOp {
1229                        name: op_name,
1230                        input: data_fields_info.local_collector_ref,
1231                        export_target_factory,
1232                        export_context,
1233                        primary_key_def: data_fields_info.primary_key_def,
1234                        primary_key_schema: data_fields_info.primary_key_schema,
1235                        value_fields: data_fields_info.value_fields_idx,
1236                        value_stable: data_fields_info.value_stable,
1237                        output_value_fingerprinter: data_fields_info.output_value_fingerprinter,
1238                        def_fp,
1239                    })
1240                })
1241            })
1242            .collect::<Result<Vec<_>>>()?;
1243        for (setup_key, desired_setup_state) in declarations_output {
1244            let decl_ss = exec_ctx::AnalyzedTargetSetupState {
1245                target_kind: target_kind.to_string(),
1246                setup_key,
1247                desired_setup_state,
1248                setup_by_user: false,
1249                key_type: None,
1250                attachments: IndexMap::new(),
1251            };
1252            declarations_analyzed_ss.push(decl_ss);
1253        }
1254        Ok(analyzed_export_ops)
1255    }
1256
1257    async fn analyze_op_scope(
1258        &self,
1259        op_scope: &Arc<OpScope>,
1260        reactive_ops: &[NamedSpec<ReactiveOpSpec>],
1261    ) -> Result<impl Future<Output = Result<AnalyzedOpScope>> + Send + use<>> {
1262        let mut op_futs = Vec::with_capacity(reactive_ops.len());
1263        for reactive_op in reactive_ops.iter() {
1264            op_futs.push(self.analyze_reactive_op(op_scope, reactive_op).await?);
1265        }
1266        let collector_len = op_scope.states.lock().unwrap().collectors.len();
1267        let scope_qualifier = self.build_scope_qualifier(op_scope);
1268        let result_fut = async move {
1269            Ok(AnalyzedOpScope {
1270                reactive_ops: try_join_all(op_futs).await?,
1271                collector_len,
1272                scope_qualifier,
1273            })
1274        };
1275        Ok(result_fut)
1276    }
1277
1278    fn build_scope_qualifier(&self, op_scope: &Arc<OpScope>) -> String {
1279        let mut scope_names = Vec::new();
1280        let mut current_scope = op_scope.as_ref();
1281
1282        // Walk up the parent chain to collect scope names
1283        while let Some((parent, _)) = &current_scope.parent {
1284            scope_names.push(current_scope.name.as_str());
1285            current_scope = parent.as_ref();
1286        }
1287
1288        // Reverse to get the correct order (root to leaf)
1289        scope_names.reverse();
1290
1291        // Build the qualifier string
1292        let mut result = String::new();
1293        for name in scope_names {
1294            result.push_str(name);
1295            result.push('.');
1296        }
1297        result
1298    }
1299}
1300
1301pub fn build_flow_instance_context(flow_inst_name: &str) -> Arc<FlowInstanceContext> {
1302    Arc::new(FlowInstanceContext {
1303        flow_instance_name: flow_inst_name.to_string(),
1304        auth_registry: get_auth_registry().clone(),
1305    })
1306}
1307
1308fn build_flow_schema(root_op_scope: &OpScope) -> Result<FlowSchema> {
1309    let schema = (&root_op_scope.data.lock().unwrap().data).try_into()?;
1310    let root_op_scope_schema = root_op_scope.states.lock().unwrap().build_op_scope_schema();
1311    Ok(FlowSchema {
1312        schema,
1313        root_op_scope: root_op_scope_schema,
1314    })
1315}
1316
1317pub async fn analyze_flow(
1318    flow_inst: &FlowInstanceSpec,
1319    flow_ctx: Arc<FlowInstanceContext>,
1320) -> Result<(
1321    FlowSchema,
1322    AnalyzedSetupState,
1323    impl Future<Output = Result<ExecutionPlan>> + Send + use<>,
1324)> {
1325    let analyzer_ctx = AnalyzerContext {
1326        lib_ctx: get_lib_context().await?,
1327        flow_ctx,
1328    };
1329    let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new()));
1330    let root_op_scope = OpScope::new(
1331        ROOT_SCOPE_NAME.to_string(),
1332        None,
1333        root_data_scope,
1334        FieldDefFingerprint::default(),
1335    );
1336    let mut import_ops_futs = Vec::with_capacity(flow_inst.import_ops.len());
1337    for import_op in flow_inst.import_ops.iter() {
1338        import_ops_futs.push(
1339            analyzer_ctx
1340                .analyze_import_op(&root_op_scope, import_op.clone())
1341                .await
1342                .with_context(|| format!("Preparing for import op: {}", import_op.name))?,
1343        );
1344    }
1345    let op_scope_fut = analyzer_ctx
1346        .analyze_op_scope(&root_op_scope, &flow_inst.reactive_ops)
1347        .await?;
1348
1349    #[derive(Default)]
1350    struct TargetOpGroup {
1351        export_op_ids: Vec<usize>,
1352        declarations: Vec<serde_json::Value>,
1353    }
1354    let mut target_op_group = IndexMap::<String, TargetOpGroup>::new();
1355    for (idx, export_op) in flow_inst.export_ops.iter().enumerate() {
1356        target_op_group
1357            .entry(export_op.spec.target.kind.clone())
1358            .or_default()
1359            .export_op_ids
1360            .push(idx);
1361    }
1362    for declaration in flow_inst.declarations.iter() {
1363        target_op_group
1364            .entry(declaration.kind.clone())
1365            .or_default()
1366            .declarations
1367            .push(serde_json::Value::Object(declaration.spec.clone()));
1368    }
1369
1370    let mut export_ops_futs = vec![];
1371    let mut analyzed_target_op_groups = vec![];
1372
1373    let mut targets_analyzed_ss = Vec::with_capacity(flow_inst.export_ops.len());
1374    targets_analyzed_ss.resize_with(flow_inst.export_ops.len(), || None);
1375
1376    let mut declarations_analyzed_ss = Vec::with_capacity(flow_inst.declarations.len());
1377
1378    for (target_kind, op_ids) in target_op_group.into_iter() {
1379        let target_factory = get_target_factory(&target_kind)?;
1380        let analyzed_target_op_group = AnalyzedExportTargetOpGroup {
1381            target_factory,
1382            target_kind: target_kind.clone(),
1383            op_idx: op_ids.export_op_ids,
1384        };
1385        export_ops_futs.extend(
1386            analyzer_ctx
1387                .analyze_export_op_group(
1388                    target_kind.as_str(),
1389                    &root_op_scope,
1390                    flow_inst,
1391                    &analyzed_target_op_group,
1392                    op_ids.declarations,
1393                    &mut targets_analyzed_ss,
1394                    &mut declarations_analyzed_ss,
1395                )
1396                .await
1397                .with_context(|| format!("Analyzing export ops for target `{target_kind}`"))?,
1398        );
1399        analyzed_target_op_groups.push(analyzed_target_op_group);
1400    }
1401
1402    let flow_schema = build_flow_schema(&root_op_scope)?;
1403    let analyzed_ss = exec_ctx::AnalyzedSetupState {
1404        targets: targets_analyzed_ss
1405            .into_iter()
1406            .enumerate()
1407            .map(|(idx, v)| v.ok_or_else(|| internal_error!("target op `{}` not found", idx)))
1408            .collect::<Result<Vec<_>>>()?,
1409        declarations: declarations_analyzed_ss,
1410    };
1411
1412    let legacy_fingerprint_v1 = Fingerprinter::default()
1413        .with(&flow_inst)?
1414        .with(&flow_schema.schema)?
1415        .into_fingerprint();
1416
1417    fn append_reactive_op_scope(
1418        mut fingerprinter: Fingerprinter,
1419        reactive_ops: &[NamedSpec<ReactiveOpSpec>],
1420    ) -> Result<Fingerprinter> {
1421        fingerprinter = fingerprinter.with(&reactive_ops.len())?;
1422        for reactive_op in reactive_ops.iter() {
1423            fingerprinter = fingerprinter.with(&reactive_op.name)?;
1424            match &reactive_op.spec {
1425                ReactiveOpSpec::Transform(_) => {}
1426                ReactiveOpSpec::ForEach(foreach_op) => {
1427                    fingerprinter = fingerprinter.with(&foreach_op.field_path)?;
1428                    fingerprinter =
1429                        append_reactive_op_scope(fingerprinter, &foreach_op.op_scope.ops)?;
1430                }
1431                ReactiveOpSpec::Collect(collect_op) => {
1432                    fingerprinter = fingerprinter.with(collect_op)?;
1433                }
1434            }
1435        }
1436        Ok(fingerprinter)
1437    }
1438    let current_fingerprinter =
1439        append_reactive_op_scope(Fingerprinter::default(), &flow_inst.reactive_ops)?
1440            .with(&flow_inst.export_ops)?
1441            .with(&flow_inst.declarations)?
1442            .with(&flow_schema.schema)?;
1443    let plan_fut = async move {
1444        let (import_ops, op_scope, export_ops) = try_join3(
1445            try_join_all(import_ops_futs),
1446            op_scope_fut,
1447            try_join_all(export_ops_futs),
1448        )
1449        .await?;
1450
1451        fn append_function_behavior(
1452            mut fingerprinter: Fingerprinter,
1453            reactive_ops: &[AnalyzedReactiveOp],
1454        ) -> Result<Fingerprinter> {
1455            for reactive_op in reactive_ops.iter() {
1456                match reactive_op {
1457                    AnalyzedReactiveOp::Transform(transform_op) => {
1458                        fingerprinter = fingerprinter.with(&transform_op.name)?.with(
1459                            &transform_op
1460                                .function_exec_info
1461                                .fingerprinter
1462                                .clone()
1463                                .into_fingerprint(),
1464                        )?;
1465                    }
1466                    AnalyzedReactiveOp::ForEach(foreach_op) => {
1467                        fingerprinter = append_function_behavior(
1468                            fingerprinter,
1469                            &foreach_op.op_scope.reactive_ops,
1470                        )?;
1471                    }
1472                    _ => {}
1473                }
1474            }
1475            Ok(fingerprinter)
1476        }
1477        let legacy_fingerprint_v2 =
1478            append_function_behavior(current_fingerprinter, &op_scope.reactive_ops)?
1479                .into_fingerprint();
1480        Ok(ExecutionPlan {
1481            legacy_fingerprint: vec![legacy_fingerprint_v1, legacy_fingerprint_v2],
1482            import_ops,
1483            op_scope,
1484            export_ops,
1485            export_op_groups: analyzed_target_op_groups,
1486        })
1487    };
1488
1489    Ok((flow_schema, analyzed_ss, plan_fut))
1490}
1491
1492pub async fn analyze_transient_flow<'a>(
1493    flow_inst: &TransientFlowSpec,
1494    flow_ctx: Arc<FlowInstanceContext>,
1495) -> Result<(
1496    EnrichedValueType,
1497    FlowSchema,
1498    impl Future<Output = Result<TransientExecutionPlan>> + Send + 'a,
1499)> {
1500    let mut root_data_scope = DataScopeBuilder::new();
1501    let analyzer_ctx = AnalyzerContext {
1502        lib_ctx: get_lib_context().await?,
1503        flow_ctx,
1504    };
1505    let mut input_fields = vec![];
1506    for field in flow_inst.input_fields.iter() {
1507        let analyzed_field = root_data_scope.add_field(
1508            field.name.clone(),
1509            &field.value_type,
1510            FieldDefFingerprint::default(),
1511        )?;
1512        input_fields.push(analyzed_field);
1513    }
1514    let root_op_scope = OpScope::new(
1515        ROOT_SCOPE_NAME.to_string(),
1516        None,
1517        Arc::new(Mutex::new(root_data_scope)),
1518        FieldDefFingerprint::default(),
1519    );
1520    let op_scope_fut = analyzer_ctx
1521        .analyze_op_scope(&root_op_scope, &flow_inst.reactive_ops)
1522        .await?;
1523    let (output_value, output_type, _) =
1524        analyze_value_mapping(&flow_inst.output_value, &root_op_scope)?;
1525    let data_schema = build_flow_schema(&root_op_scope)?;
1526    let plan_fut = async move {
1527        let op_scope = op_scope_fut.await?;
1528        Ok(TransientExecutionPlan {
1529            input_fields,
1530            op_scope,
1531            output_value,
1532        })
1533    };
1534    Ok((output_type, data_schema, plan_fut))
1535}