1use crate::builder::exec_ctx::AnalyzedSetupState;
14use crate::ops::{
15 get_attachment_factory, get_function_factory, get_source_factory, get_target_factory,
16};
17use crate::prelude::*;
18
19use super::plan::*;
20use crate::lib_context::get_auth_registry;
21use crate::{
22 base::{schema::*, spec::*},
23 ops::interface::*,
24};
25use futures::future::{BoxFuture, try_join3};
26use futures::{FutureExt, future::try_join_all};
27use std::time::Duration;
28use utils::fingerprint::Fingerprinter;
29
/// Default per-invocation timeout (30 minutes) applied to transform ops when
/// neither the executor nor the op's execution options specify one.
const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800);
31
/// Mutable counterpart of `ValueType`, used while the flow's schema is still
/// being built up during analysis.
#[derive(Debug)]
pub(super) enum ValueTypeBuilder {
    /// Scalar types are stored as-is; they have no nested structure to build.
    Basic(BasicValueType),
    /// Struct types get a builder so fields can be added incrementally.
    Struct(StructSchemaBuilder),
    /// Table types carry a shared sub-scope builder for their row type.
    Table(TableSchemaBuilder),
}
38
39impl TryFrom<&ValueType> for ValueTypeBuilder {
40 type Error = Error;
41
42 fn try_from(value_type: &ValueType) -> std::result::Result<Self, Self::Error> {
43 match value_type {
44 ValueType::Basic(basic_type) => Ok(ValueTypeBuilder::Basic(basic_type.clone())),
45 ValueType::Struct(struct_type) => Ok(ValueTypeBuilder::Struct(struct_type.try_into()?)),
46 ValueType::Table(table_type) => Ok(ValueTypeBuilder::Table(table_type.try_into()?)),
47 }
48 }
49}
50
51impl TryInto<ValueType> for &ValueTypeBuilder {
52 type Error = Error;
53
54 fn try_into(self) -> std::result::Result<ValueType, Self::Error> {
55 match self {
56 ValueTypeBuilder::Basic(basic_type) => Ok(ValueType::Basic(basic_type.clone())),
57 ValueTypeBuilder::Struct(struct_type) => Ok(ValueType::Struct(struct_type.try_into()?)),
58 ValueTypeBuilder::Table(table_type) => Ok(ValueType::Table(table_type.try_into()?)),
59 }
60 }
61}
62
/// Incrementally-built struct schema: ordered fields plus a name-to-index map
/// for O(1) lookup, and an optional human-readable description.
#[derive(Default, Debug)]
pub(super) struct StructSchemaBuilder {
    fields: Vec<FieldSchema<ValueTypeBuilder>>,
    // Invariant: maps each field's name to its index within `fields`.
    field_name_idx: HashMap<FieldName, u32>,
    description: Option<Arc<str>>,
}
69
70impl StructSchemaBuilder {
71 fn add_field(&mut self, field: FieldSchema<ValueTypeBuilder>) -> Result<u32> {
72 let field_idx = self.fields.len() as u32;
73 match self.field_name_idx.entry(field.name.clone()) {
74 std::collections::hash_map::Entry::Occupied(_) => {
75 client_bail!("Field name already exists: {}", field.name);
76 }
77 std::collections::hash_map::Entry::Vacant(entry) => {
78 entry.insert(field_idx);
79 }
80 }
81 self.fields.push(field);
82 Ok(field_idx)
83 }
84
85 pub fn find_field(&self, field_name: &'_ str) -> Option<(u32, &FieldSchema<ValueTypeBuilder>)> {
86 self.field_name_idx
87 .get(field_name)
88 .map(|&field_idx| (field_idx, &self.fields[field_idx as usize]))
89 }
90}
91
92impl TryFrom<&StructSchema> for StructSchemaBuilder {
93 type Error = Error;
94
95 fn try_from(schema: &StructSchema) -> std::result::Result<Self, Self::Error> {
96 let mut result = StructSchemaBuilder {
97 fields: Vec::with_capacity(schema.fields.len()),
98 field_name_idx: HashMap::with_capacity(schema.fields.len()),
99 description: schema.description.clone(),
100 };
101 for field in schema.fields.iter() {
102 result.add_field(FieldSchema::<ValueTypeBuilder>::from_alternative(field)?)?;
103 }
104 Ok(result)
105 }
106}
107
108impl TryInto<StructSchema> for &StructSchemaBuilder {
109 type Error = Error;
110
111 fn try_into(self) -> std::result::Result<StructSchema, Self::Error> {
112 Ok(StructSchema {
113 fields: Arc::new(
114 self.fields
115 .iter()
116 .map(FieldSchema::<ValueType>::from_alternative)
117 .collect::<std::result::Result<Vec<_>, _>>()?,
118 ),
119 description: self.description.clone(),
120 })
121 }
122}
123
/// Mutable counterpart of `TableSchema`: the table kind plus a shared,
/// lock-protected scope builder for the row's struct type.
#[derive(Debug)]
pub(super) struct TableSchemaBuilder {
    pub kind: TableKind,
    // Shared with `OpScope`s created by ForEach over this table.
    pub sub_scope: Arc<Mutex<DataScopeBuilder>>,
}
129
130impl TryFrom<&TableSchema> for TableSchemaBuilder {
131 type Error = Error;
132
133 fn try_from(schema: &TableSchema) -> std::result::Result<Self, Self::Error> {
134 Ok(Self {
135 kind: schema.kind,
136 sub_scope: Arc::new(Mutex::new(DataScopeBuilder {
137 data: (&schema.row).try_into()?,
138 added_fields_def_fp: Default::default(),
139 })),
140 })
141 }
142}
143
144impl TryInto<TableSchema> for &TableSchemaBuilder {
145 type Error = Error;
146
147 fn try_into(self) -> std::result::Result<TableSchema, Self::Error> {
148 let sub_scope = self.sub_scope.lock().unwrap();
149 let row = (&sub_scope.data).try_into()?;
150 Ok(TableSchema {
151 kind: self.kind,
152 row,
153 })
154 }
155}
156
/// Computes a value type compatible with both inputs, or fails when they
/// cannot be unified.
///
/// - Basic types must be equal; structs and tables are merged field-by-field.
/// - The result is nullable if either input is nullable.
/// - Only attributes present with equal values on both sides are kept.
fn try_make_common_value_type(
    value_type1: &EnrichedValueType,
    value_type2: &EnrichedValueType,
) -> Result<EnrichedValueType> {
    let typ = match (&value_type1.typ, &value_type2.typ) {
        (ValueType::Basic(basic_type1), ValueType::Basic(basic_type2)) => {
            if basic_type1 != basic_type2 {
                api_bail!("Value types are not compatible: {basic_type1} vs {basic_type2}");
            }
            ValueType::Basic(basic_type1.clone())
        }
        (ValueType::Struct(struct_type1), ValueType::Struct(struct_type2)) => {
            let common_schema = try_merge_struct_schemas(struct_type1, struct_type2)?;
            ValueType::Struct(common_schema)
        }
        (ValueType::Table(table_type1), ValueType::Table(table_type2)) => {
            if table_type1.kind != table_type2.kind {
                api_bail!(
                    "Collection types are not compatible: {} vs {}",
                    table_type1,
                    table_type2
                );
            }
            let row = try_merge_struct_schemas(&table_type1.row, &table_type2.row)?;
            ValueType::Table(TableSchema {
                kind: table_type1.kind,
                row,
            })
        }
        // Variants listed explicitly (instead of `_`) so this arm is revisited
        // if a new `ValueType` variant is ever added.
        (t1 @ (ValueType::Basic(_) | ValueType::Struct(_) | ValueType::Table(_)), t2) => {
            api_bail!("Unmatched types:\n {t1}\n {t2}\n",)
        }
    };
    // Keep only attributes that agree on both sides; reuse the existing Arc
    // when nothing would be dropped.
    let common_attrs: Vec<_> = value_type1
        .attrs
        .iter()
        .filter_map(|(k, v)| {
            if value_type2.attrs.get(k) == Some(v) {
                Some((k, v))
            } else {
                None
            }
        })
        .collect();
    let attrs = if common_attrs.len() == value_type1.attrs.len() {
        value_type1.attrs.clone()
    } else {
        Arc::new(
            common_attrs
                .into_iter()
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect(),
        )
    };

    Ok(EnrichedValueType {
        typ,
        // A value that may be null on either side must stay nullable.
        nullable: value_type1.nullable || value_type2.nullable,
        attrs,
    })
}
218
219fn try_merge_fields_schemas(
220 schema1: &[FieldSchema],
221 schema2: &[FieldSchema],
222) -> Result<Vec<FieldSchema>> {
223 if schema1.len() != schema2.len() {
224 api_bail!(
225 "Fields are not compatible as they have different fields count:\n ({})\n ({})\n",
226 schema1
227 .iter()
228 .map(|f| f.to_string())
229 .collect::<Vec<_>>()
230 .join(", "),
231 schema2
232 .iter()
233 .map(|f| f.to_string())
234 .collect::<Vec<_>>()
235 .join(", ")
236 );
237 }
238 let mut result_fields = Vec::with_capacity(schema1.len());
239 for (field1, field2) in schema1.iter().zip(schema2.iter()) {
240 if field1.name != field2.name {
241 api_bail!(
242 "Structs are not compatible as they have incompatible field names `{}` vs `{}`",
243 field1.name,
244 field2.name
245 );
246 }
247 result_fields.push(FieldSchema {
248 name: field1.name.clone(),
249 value_type: try_make_common_value_type(&field1.value_type, &field2.value_type)?,
250 description: None,
251 });
252 }
253 Ok(result_fields)
254}
255
256fn try_merge_struct_schemas(
257 schema1: &StructSchema,
258 schema2: &StructSchema,
259) -> Result<StructSchema> {
260 let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
261 Ok(StructSchema {
262 fields: Arc::new(fields),
263 description: schema1
264 .description
265 .clone()
266 .or_else(|| schema2.description.clone()),
267 })
268}
269
/// Merges the schemas of two `collect()` calls into one collector schema.
///
/// Fields shared by both calls must appear in the same relative order; their
/// value types are unified. Fields unique to either call are interleaved in
/// between, preserving each side's own order. The auto-generated UUID field,
/// if used, must be present and identically named on both sides.
fn try_merge_collector_schemas(
    schema1: &CollectorSchema,
    schema2: &CollectorSchema,
) -> Result<CollectorSchema> {
    let schema1_fields = &schema1.fields;
    let schema2_fields = &schema2.fields;

    // Position of each schema1 field by name, to detect shared fields.
    let field_map: HashMap<FieldName, usize> = schema1_fields
        .iter()
        .enumerate()
        .map(|(i, f)| (f.name.clone(), i))
        .collect();

    let mut output_fields = Vec::new();
    // Next not-yet-emitted field index on each side.
    let mut next_field_id_1 = 0;
    let mut next_field_id_2 = 0;

    for (idx, field) in schema2_fields.iter().enumerate() {
        if let Some(&idx1) = field_map.get(&field.name) {
            // A shared field must not appear before one already consumed from
            // schema1, i.e. shared fields keep a consistent relative order.
            if idx1 < next_field_id_1 {
                api_bail!(
                    "Common fields are expected to have consistent order across different `collect()` calls, but got different orders between fields '{}' and '{}'",
                    field.name,
                    schema1_fields[next_field_id_1 - 1].name
                );
            }
            // Emit fields unique to schema1 that precede the shared field...
            for i in next_field_id_1..idx1 {
                output_fields.push(schema1_fields[i].clone());
            }
            // ...then fields unique to schema2 that precede it.
            for i in next_field_id_2..idx {
                output_fields.push(schema2_fields[i].clone());
            }
            // Emit the shared field with the unified value type.
            let merged_type =
                try_make_common_value_type(&schema1_fields[idx1].value_type, &field.value_type)?;
            output_fields.push(FieldSchema {
                name: field.name.clone(),
                value_type: merged_type,
                description: None,
            });
            next_field_id_1 = idx1 + 1;
            next_field_id_2 = idx + 1;
        }
    }

    // Flush trailing fields unique to each side.
    for i in next_field_id_1..schema1_fields.len() {
        output_fields.push(schema1_fields[i].clone());
    }

    for i in next_field_id_2..schema2_fields.len() {
        output_fields.push(schema2_fields[i].clone());
    }

    let auto_uuid_field_idx = match (schema1.auto_uuid_field_idx, schema2.auto_uuid_field_idx) {
        (Some(idx1), Some(idx2)) => {
            let name1 = &schema1_fields[idx1].name;
            let name2 = &schema2_fields[idx2].name;
            if name1 == name2 {
                // Re-locate the UUID field within the merged field list.
                output_fields.iter().position(|f| &f.name == name1)
            } else {
                api_bail!(
                    "Generated UUID fields must have the same name across different `collect()` calls, got different names: '{}' vs '{}'",
                    name1,
                    name2
                );
            }
        }
        (Some(_), None) | (None, Some(_)) => {
            api_bail!(
                "The generated UUID field, once present for one `collect()`, must be consistently present for other `collect()` calls for the same collector"
            );
        }
        (None, None) => None,
    };

    Ok(CollectorSchema {
        fields: output_fields,
        auto_uuid_field_idx,
    })
}
358
/// Accumulates multiple `FieldDefFingerprint` dependencies into one:
/// source-op names are unioned, fingerprints folded in the order added.
struct FieldDefFingerprintBuilder {
    source_op_names: HashSet<String>,
    fingerprinter: Fingerprinter,
}
363
364impl FieldDefFingerprintBuilder {
365 pub fn new() -> Self {
366 Self {
367 source_op_names: HashSet::new(),
368 fingerprinter: Fingerprinter::default(),
369 }
370 }
371
372 pub fn add(&mut self, key: Option<&str>, def_fp: FieldDefFingerprint) -> Result<()> {
373 self.source_op_names.extend(def_fp.source_op_names);
374 let mut fingerprinter = std::mem::take(&mut self.fingerprinter);
375 if let Some(key) = key {
376 fingerprinter = fingerprinter.with(key)?;
377 }
378 fingerprinter = fingerprinter.with(def_fp.fingerprint.as_slice())?;
379 self.fingerprinter = fingerprinter;
380 Ok(())
381 }
382
383 pub fn build(self) -> FieldDefFingerprint {
384 FieldDefFingerprint {
385 source_op_names: self.source_op_names,
386 fingerprint: self.fingerprinter.into_fingerprint(),
387 }
388 }
389}
390
/// Analysis-time state of one named collector: the schema merged across all
/// `collect()` calls seen so far, whether a downstream op has consumed it,
/// and the definition fingerprint of each `collect()` site.
#[derive(Debug)]
pub(super) struct CollectorBuilder {
    pub schema: Arc<CollectorSchema>,
    pub is_used: bool,
    pub def_fps: Vec<FieldDefFingerprint>,
}
397
398impl CollectorBuilder {
399 pub fn new(schema: Arc<CollectorSchema>, def_fp: FieldDefFingerprint) -> Self {
400 Self {
401 schema,
402 is_used: false,
403 def_fps: vec![def_fp],
404 }
405 }
406
407 pub fn collect(&mut self, schema: &CollectorSchema, def_fp: FieldDefFingerprint) -> Result<()> {
408 if self.is_used {
409 api_bail!("Collector is already used");
410 }
411 let existing_schema = Arc::make_mut(&mut self.schema);
412 *existing_schema = try_merge_collector_schemas(existing_schema, schema)?;
413 self.def_fps.push(def_fp);
414 Ok(())
415 }
416
417 pub fn use_collection(&mut self) -> Result<(Arc<CollectorSchema>, FieldDefFingerprint)> {
418 self.is_used = true;
419
420 self.def_fps
421 .sort_by(|a, b| a.fingerprint.as_slice().cmp(b.fingerprint.as_slice()));
422 let mut def_fp_builder = FieldDefFingerprintBuilder::new();
423 for def_fp in self.def_fps.iter() {
424 def_fp_builder.add(None, def_fp.clone())?;
425 }
426 Ok((self.schema.clone(), def_fp_builder.build()))
427 }
428}
429
/// Schema being built for one data scope (top level or a table row), plus the
/// definition fingerprint of every field added during analysis.
#[derive(Debug)]
pub(super) struct DataScopeBuilder {
    pub data: StructSchemaBuilder,
    // Fingerprints for fields added by ops in this scope, keyed by field name.
    pub added_fields_def_fp: IndexMap<FieldName, FieldDefFingerprint>,
}
435
impl DataScopeBuilder {
    /// Creates an empty scope with no fields.
    pub fn new() -> Self {
        Self {
            data: Default::default(),
            added_fields_def_fp: Default::default(),
        }
    }

    /// Returns the most recently added field's schema, if any.
    pub fn last_field(&self) -> Option<&FieldSchema<ValueTypeBuilder>> {
        self.data.fields.last()
    }

    /// Adds a field produced by an op, recording its definition fingerprint,
    /// and returns the field's position for later reference.
    ///
    /// Fails if a field with the same name already exists in this scope.
    pub fn add_field(
        &mut self,
        name: FieldName,
        value_type: &EnrichedValueType,
        def_fp: FieldDefFingerprint,
    ) -> Result<AnalyzedOpOutput> {
        let field_index = self.data.add_field(FieldSchema {
            name: name.clone(),
            value_type: EnrichedValueType::from_alternative(value_type)?,
            description: None,
        })?;
        self.added_fields_def_fp.insert(name, def_fp);
        Ok(AnalyzedOpOutput {
            field_idx: field_index,
        })
    }

    /// Resolves a (non-empty) field path within this scope, descending through
    /// nested structs. Returns the per-level field indices, the final value
    /// type, and a definition fingerprint for the referenced value.
    ///
    /// Fingerprint rule: a path component that matches a field recorded in
    /// `added_fields_def_fp` replaces the accumulated fingerprint with the
    /// recorded one; otherwise the component name is folded into the
    /// fingerprint accumulated so far.
    pub fn analyze_field_path<'a>(
        &'a self,
        field_path: &'_ FieldPath,
        base_def_fp: FieldDefFingerprint,
    ) -> Result<(
        AnalyzedLocalFieldReference,
        &'a EnrichedValueType<ValueTypeBuilder>,
        FieldDefFingerprint,
    )> {
        let mut indices = Vec::with_capacity(field_path.len());
        let mut struct_schema = &self.data;
        let mut def_fp = base_def_fp;

        if field_path.is_empty() {
            client_bail!("Field path is empty");
        }

        let mut i = 0;
        let value_type = loop {
            let field_name = &field_path[i];
            let (field_idx, field) = struct_schema.find_field(field_name).ok_or_else(|| {
                api_error!("Field {} not found", field_path[0..(i + 1)].join("."))
            })?;
            if let Some(added_def_fp) = self.added_fields_def_fp.get(field_name) {
                def_fp = added_def_fp.clone();
            } else {
                def_fp.fingerprint = Fingerprinter::default()
                    .with(&("field", &def_fp.fingerprint, field_name))?
                    .into_fingerprint();
            };
            indices.push(field_idx);
            if i + 1 >= field_path.len() {
                // Last component: its type is the result.
                break &field.value_type;
            }
            i += 1;

            // Not the last component: must be a struct to descend into.
            struct_schema = match &field.value_type.typ {
                ValueTypeBuilder::Struct(struct_type) => struct_type,
                _ => {
                    api_bail!("Field {} is not a struct", field_path[0..(i + 1)].join("."));
                }
            };
        };
        Ok((
            AnalyzedLocalFieldReference {
                fields_idx: indices,
            },
            value_type,
            def_fp,
        ))
    }
}
518
/// Shared context for flow analysis: global library state plus the context of
/// the flow instance being analyzed.
pub(super) struct AnalyzerContext {
    pub lib_ctx: Arc<LibContext>,
    pub flow_ctx: Arc<FlowInstanceContext>,
}
523
/// Per-scope mutable analysis state: output types of ops executed in the
/// scope, the scope's collectors, and schemas of child (e.g. ForEach) scopes.
#[derive(Debug, Default)]
pub(super) struct OpScopeStates {
    pub op_output_types: HashMap<FieldName, EnrichedValueType>,
    pub collectors: IndexMap<FieldName, CollectorBuilder>,
    pub sub_scopes: HashMap<String, Arc<OpScopeSchema>>,
}
530
531impl OpScopeStates {
532 pub fn add_collector(
533 &mut self,
534 collector_name: FieldName,
535 schema: CollectorSchema,
536 def_fp: FieldDefFingerprint,
537 ) -> Result<AnalyzedLocalCollectorReference> {
538 let existing_len = self.collectors.len();
539 let idx = match self.collectors.entry(collector_name) {
540 indexmap::map::Entry::Occupied(mut entry) => {
541 entry.get_mut().collect(&schema, def_fp)?;
542 entry.index()
543 }
544 indexmap::map::Entry::Vacant(entry) => {
545 entry.insert(CollectorBuilder::new(Arc::new(schema), def_fp));
546 existing_len
547 }
548 };
549 Ok(AnalyzedLocalCollectorReference {
550 collector_idx: idx as u32,
551 })
552 }
553
554 pub fn consume_collector(
555 &mut self,
556 collector_name: &FieldName,
557 ) -> Result<(
558 AnalyzedLocalCollectorReference,
559 Arc<CollectorSchema>,
560 FieldDefFingerprint,
561 )> {
562 let (collector_idx, _, collector) = self
563 .collectors
564 .get_full_mut(collector_name)
565 .ok_or_else(|| api_error!("Collector not found: {}", collector_name))?;
566 let (schema, def_fp) = collector.use_collection()?;
567 Ok((
568 AnalyzedLocalCollectorReference {
569 collector_idx: collector_idx as u32,
570 },
571 schema,
572 def_fp,
573 ))
574 }
575
576 fn build_op_scope_schema(&self) -> OpScopeSchema {
577 OpScopeSchema {
578 op_output_types: self
579 .op_output_types
580 .iter()
581 .map(|(name, value_type)| (name.clone(), value_type.without_attrs()))
582 .collect(),
583 collectors: self
584 .collectors
585 .iter()
586 .map(|(name, schema)| NamedSpec {
587 name: name.clone(),
588 spec: schema.schema.clone(),
589 })
590 .collect(),
591 op_scopes: self.sub_scopes.clone(),
592 }
593 }
594}
595
/// A node in the scope tree built during analysis. The root scope has no
/// parent; each ForEach introduces a child scope.
#[derive(Debug)]
pub struct OpScope {
    pub name: String,
    /// Parent scope plus the field path iterated over to enter this scope.
    pub parent: Option<(Arc<OpScope>, spec::FieldPath)>,
    pub(super) data: Arc<Mutex<DataScopeBuilder>>,
    pub(super) states: Mutex<OpScopeStates>,
    /// Fingerprint of the scope's base value (for ForEach scopes, the value
    /// referenced by the iterated field path).
    pub(super) base_value_def_fp: FieldDefFingerprint,
}
604
605struct Iter<'a>(Option<&'a OpScope>);
606
607impl<'a> Iterator for Iter<'a> {
608 type Item = &'a OpScope;
609
610 fn next(&mut self) -> Option<Self::Item> {
611 match self.0 {
612 Some(scope) => {
613 self.0 = scope.parent.as_ref().map(|(parent, _)| parent.as_ref());
614 Some(scope)
615 }
616 None => None,
617 }
618 }
619}
620
impl OpScope {
    /// Creates a scope node. `data` is shared (for ForEach scopes, with the
    /// table's `TableSchemaBuilder` sub-scope).
    pub(super) fn new(
        name: String,
        parent: Option<(Arc<OpScope>, spec::FieldPath)>,
        data: Arc<Mutex<DataScopeBuilder>>,
        base_value_def_fp: FieldDefFingerprint,
    ) -> Arc<Self> {
        Arc::new(Self {
            name,
            parent,
            data,
            states: Mutex::default(),
            base_value_def_fp,
        })
    }

    /// Registers an op's output: adds the field to the scope's data schema and
    /// records its output type in the scope states.
    fn add_op_output(
        &self,
        name: FieldName,
        value_type: EnrichedValueType,
        def_fp: FieldDefFingerprint,
    ) -> Result<AnalyzedOpOutput> {
        let op_output = self
            .data
            .lock()
            .unwrap()
            .add_field(name.clone(), &value_type, def_fp)?;
        self.states
            .lock()
            .unwrap()
            .op_output_types
            .insert(name, value_type);
        Ok(op_output)
    }

    /// Iterates from this scope up through all ancestors, self first.
    pub fn ancestors(&self) -> impl Iterator<Item = &OpScope> {
        Iter(Some(self))
    }

    /// Whether `other` is this scope itself or one of its ancestors
    /// (comparison is by identity; see `PartialEq` below).
    pub fn is_op_scope_descendant(&self, other: &Self) -> bool {
        if self == other {
            return true;
        }
        match &self.parent {
            Some((parent, _)) => parent.is_op_scope_descendant(other),
            None => false,
        }
    }

    /// Creates the child scope for a ForEach over `field_path`, which must
    /// reference a collection (table) field. The child shares the table's row
    /// scope builder and takes the referenced value's fingerprint as its base.
    pub(super) fn new_foreach_op_scope(
        self: &Arc<Self>,
        scope_name: String,
        field_path: &FieldPath,
    ) -> Result<(AnalyzedLocalFieldReference, Arc<Self>)> {
        let (local_field_ref, sub_data_scope, def_fp) = {
            let data_scope = self.data.lock().unwrap();
            let (local_field_ref, value_type, def_fp) =
                data_scope.analyze_field_path(field_path, self.base_value_def_fp.clone())?;
            let sub_data_scope = match &value_type.typ {
                ValueTypeBuilder::Table(table_type) => table_type.sub_scope.clone(),
                _ => api_bail!("ForEach only works on collection, field {field_path} is not"),
            };
            (local_field_ref, sub_data_scope, def_fp)
        };
        let sub_op_scope = OpScope::new(
            scope_name,
            Some((self.clone(), field_path.clone())),
            sub_data_scope,
            def_fp,
        );
        Ok((local_field_ref, sub_op_scope))
    }
}
694
695impl std::fmt::Display for OpScope {
696 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
697 if let Some((scope, field_path)) = &self.parent {
698 write!(f, "{} [{} AS {}]", scope, field_path, self.name)?;
699 } else {
700 write!(f, "[{}]", self.name)?;
701 }
702 Ok(())
703 }
704}
705
// Scope equality is identity: two `OpScope`s compare equal only when they are
// the same object in memory. `is_op_scope_descendant` relies on this.
impl PartialEq for OpScope {
    fn eq(&self, other: &Self) -> bool {
        std::ptr::eq(self, other)
    }
}
impl Eq for OpScope {}
712
713fn find_scope<'a>(scope_name: &ScopeName, op_scope: &'a OpScope) -> Result<(u32, &'a OpScope)> {
714 let (up_level, scope) = op_scope
715 .ancestors()
716 .enumerate()
717 .find(|(_, s)| &s.name == scope_name)
718 .ok_or_else(|| api_error!("Scope not found: {}", scope_name))?;
719 Ok((up_level as u32, scope))
720}
721
722fn analyze_struct_mapping(
723 mapping: &StructMapping,
724 op_scope: &OpScope,
725) -> Result<(AnalyzedStructMapping, Vec<FieldSchema>, FieldDefFingerprint)> {
726 let mut field_mappings = Vec::with_capacity(mapping.fields.len());
727 let mut field_schemas = Vec::with_capacity(mapping.fields.len());
728
729 let mut fields_def_fps = Vec::with_capacity(mapping.fields.len());
730 for field in mapping.fields.iter() {
731 let (field_mapping, value_type, field_def_fp) =
732 analyze_value_mapping(&field.spec, op_scope)?;
733 field_mappings.push(field_mapping);
734 field_schemas.push(FieldSchema {
735 name: field.name.clone(),
736 value_type,
737 description: None,
738 });
739 fields_def_fps.push((field.name.as_str(), field_def_fp));
740 }
741 fields_def_fps.sort_by_key(|(name, _)| *name);
742 let mut def_fp_builder = FieldDefFingerprintBuilder::new();
743 for (name, def_fp) in fields_def_fps {
744 def_fp_builder.add(Some(name), def_fp)?;
745 }
746 Ok((
747 AnalyzedStructMapping {
748 fields: field_mappings,
749 },
750 field_schemas,
751 def_fp_builder.build(),
752 ))
753}
754
755fn analyze_value_mapping(
756 value_mapping: &ValueMapping,
757 op_scope: &OpScope,
758) -> Result<(AnalyzedValueMapping, EnrichedValueType, FieldDefFingerprint)> {
759 let result = match value_mapping {
760 ValueMapping::Constant(v) => {
761 let value = value::Value::from_json(v.value.clone(), &v.schema.typ)?;
762 let value_mapping = AnalyzedValueMapping::Constant { value };
763 let def_fp = FieldDefFingerprint {
764 source_op_names: HashSet::new(),
765 fingerprint: Fingerprinter::default()
766 .with(&("constant", &v.value, &v.schema.without_attrs()))?
767 .into_fingerprint(),
768 };
769 (value_mapping, v.schema.clone(), def_fp)
770 }
771
772 ValueMapping::Field(v) => {
773 let (scope_up_level, op_scope) = match &v.scope {
774 Some(scope_name) => find_scope(scope_name, op_scope)?,
775 None => (0, op_scope),
776 };
777 let data_scope = op_scope.data.lock().unwrap();
778 let (local_field_ref, value_type, def_fp) =
779 data_scope.analyze_field_path(&v.field_path, op_scope.base_value_def_fp.clone())?;
780 let schema = EnrichedValueType::from_alternative(value_type)?;
781 let value_mapping = AnalyzedValueMapping::Field(AnalyzedFieldReference {
782 local: local_field_ref,
783 scope_up_level,
784 });
785 (value_mapping, schema, def_fp)
786 }
787 };
788 Ok(result)
789}
790
791fn analyze_input_fields(
792 arg_bindings: &[OpArgBinding],
793 op_scope: &OpScope,
794) -> Result<(Vec<OpArgSchema>, FieldDefFingerprint)> {
795 let mut op_arg_schemas = Vec::with_capacity(arg_bindings.len());
796 let mut def_fp_builder = FieldDefFingerprintBuilder::new();
797 for arg_binding in arg_bindings.iter() {
798 let (analyzed_value, value_type, def_fp) =
799 analyze_value_mapping(&arg_binding.value, op_scope)?;
800 let op_arg_schema = OpArgSchema {
801 name: arg_binding.arg_name.clone(),
802 value_type,
803 analyzed_value: analyzed_value.clone(),
804 };
805 def_fp_builder.add(arg_binding.arg_name.0.as_deref(), def_fp)?;
806 op_arg_schemas.push(op_arg_schema);
807 }
808 Ok((op_arg_schemas, def_fp_builder.build()))
809}
810
811fn add_collector(
812 scope_name: &ScopeName,
813 collector_name: FieldName,
814 schema: CollectorSchema,
815 op_scope: &OpScope,
816 def_fp: FieldDefFingerprint,
817) -> Result<AnalyzedCollectorReference> {
818 let (scope_up_level, scope) = find_scope(scope_name, op_scope)?;
819 let local_ref = scope
820 .states
821 .lock()
822 .unwrap()
823 .add_collector(collector_name, schema, def_fp)?;
824 Ok(AnalyzedCollectorReference {
825 local: local_ref,
826 scope_up_level,
827 })
828}
829
/// Intermediate result of analyzing one export op's data fields, gathered
/// before the target-specific build step.
struct ExportDataFieldsInfo {
    local_collector_ref: AnalyzedLocalCollectorReference,
    primary_key_def: AnalyzedPrimaryKeyDef,
    primary_key_schema: Box<[FieldSchema]>,
    // Indices (into the collector schema) of non-key fields exported as values.
    value_fields_idx: Vec<u32>,
    // True when the auto-UUID field is part of the primary key, making values
    // stable for a given key.
    value_stable: bool,
    output_value_fingerprinter: Fingerprinter,
    def_fp: FieldDefFingerprint,
}
839
840impl AnalyzerContext {
    /// Analyzes an import (source) op: builds the source factory output,
    /// registers the op's output field in the scope, and returns a future that
    /// finishes constructing the executor.
    ///
    /// Executor construction is deferred to the returned future so it can run
    /// concurrently with other ops' preparation.
    pub(super) async fn analyze_import_op(
        &self,
        op_scope: &Arc<OpScope>,
        import_op: NamedSpec<ImportOpSpec>,
    ) -> Result<impl Future<Output = Result<AnalyzedImportOp>> + Send + use<>> {
        let source_factory = get_source_factory(&import_op.spec.source.kind)?;
        let (output_type, executor) = source_factory
            .build(
                &import_op.name,
                serde_json::Value::Object(import_op.spec.source.spec),
                self.flow_ctx.clone(),
            )
            .await?;

        let op_name = import_op.name;
        let primary_key_schema = Box::from(output_type.typ.key_schema());
        // An import op is its own (sole) source; its fingerprint is derived
        // from the op name alone.
        let def_fp = FieldDefFingerprint {
            source_op_names: HashSet::from([op_name.clone()]),
            fingerprint: Fingerprinter::default()
                .with(&("import", &op_name))?
                .into_fingerprint(),
        };
        let output = op_scope.add_op_output(op_name.clone(), output_type, def_fp)?;

        let concur_control_options = import_op
            .spec
            .execution_options
            .get_concur_control_options();
        let global_concurrency_controller = self.lib_ctx.global_concurrency_controller.clone();
        let result_fut = async move {
            trace!("Start building executor for source op `{op_name}`");
            let executor = executor
                .await
                .with_context(|| format!("Preparing for source op: {op_name}"))?;
            trace!("Finished building executor for source op `{op_name}`");
            Ok(AnalyzedImportOp {
                executor,
                output,
                primary_key_schema,
                name: op_name,
                refresh_options: import_op.spec.refresh_options,
                concurrency_controller: concur_control::CombinedConcurrencyController::new(
                    &concur_control_options,
                    global_concurrency_controller,
                ),
            })
        };
        Ok(result_fut)
    }
890
    /// Analyzes one reactive op (Transform / ForEach / Collect) and returns a
    /// boxed future that finishes building the corresponding executable op.
    ///
    /// Schema and scope bookkeeping happens eagerly here; only executor
    /// construction is deferred to the returned future.
    pub(super) async fn analyze_reactive_op(
        &self,
        op_scope: &Arc<OpScope>,
        reactive_op: &NamedSpec<ReactiveOpSpec>,
    ) -> Result<BoxFuture<'static, Result<AnalyzedReactiveOp>>> {
        let reactive_op_clone = reactive_op.clone();
        let reactive_op_name = reactive_op.name.clone();
        let result_fut = match reactive_op_clone.spec {
            ReactiveOpSpec::Transform(op) => {
                // Resolve input bindings against the current scope.
                let (input_field_schemas, input_def_fp) =
                    analyze_input_fields(&op.inputs, op_scope).with_context(|| {
                        format!("Preparing inputs for transform op: {}", reactive_op_name)
                    })?;
                let spec = serde_json::Value::Object(op.op.spec.clone());

                let fn_executor = get_function_factory(&op.op.kind)?;
                let input_value_mappings = input_field_schemas
                    .iter()
                    .map(|field| field.analyzed_value.clone())
                    .collect();
                let build_output = fn_executor
                    .build(spec, input_field_schemas, self.flow_ctx.clone())
                    .await?;
                let output_type = build_output.output_type.typ.clone();
                // Fingerprint of the op's logic, later used for cache keying.
                let logic_fingerprinter = Fingerprinter::default()
                    .with(&op.op)?
                    .with(&build_output.output_type.without_attrs())?
                    .with(&build_output.behavior_version)?;

                let def_fp = FieldDefFingerprint {
                    source_op_names: input_def_fp.source_op_names,
                    fingerprint: Fingerprinter::default()
                        .with(&(
                            "transform",
                            &op.op,
                            &input_def_fp.fingerprint,
                            &build_output.behavior_version,
                        ))?
                        .into_fingerprint(),
                };
                let output = op_scope.add_op_output(
                    reactive_op_name.clone(),
                    build_output.output_type,
                    def_fp,
                )?;
                let op_name = reactive_op_name.clone();
                let op_kind = op.op.kind.clone();

                let execution_options_timeout = op.execution_options.timeout;

                let behavior_version = build_output.behavior_version;
                async move {
                    trace!("Start building executor for transform op `{op_name}`");
                    let executor = build_output.executor.await.with_context(|| {
                        format!("Preparing for transform op: {op_name}")
                    })?;
                    let enable_cache = executor.enable_cache();
                    // Timeout precedence: executor-provided, then the op's
                    // execution options, then the global default threshold.
                    let timeout = executor.timeout()
                        .or(execution_options_timeout)
                        .or(Some(TIMEOUT_THRESHOLD));
                    trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}");
                    let function_exec_info = AnalyzedFunctionExecInfo {
                        enable_cache,
                        timeout,
                        behavior_version,
                        fingerprinter: logic_fingerprinter,
                        output_type
                    };
                    // Caching requires a behavior version so stale entries can
                    // be distinguished from current ones.
                    if function_exec_info.enable_cache
                        && function_exec_info.behavior_version.is_none()
                    {
                        api_bail!(
                            "When caching is enabled, behavior version must be specified for transform op: {op_name}"
                        );
                    }
                    Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp {
                        name: op_name,
                        op_kind,
                        inputs: input_value_mappings,
                        function_exec_info,
                        executor,
                        output,
                    }))
                }
                .boxed()
            }

            ReactiveOpSpec::ForEach(foreach_op) => {
                // Create the child scope over the iterated collection field.
                let (local_field_ref, sub_op_scope) = op_scope.new_foreach_op_scope(
                    foreach_op.op_scope.name.clone(),
                    &foreach_op.field_path,
                )?;
                let analyzed_op_scope_fut = {
                    let analyzed_op_scope_fut = self
                        .analyze_op_scope(&sub_op_scope, &foreach_op.op_scope.ops)
                        .boxed_local()
                        .await?;
                    // Record the child scope's schema on the parent scope.
                    let sub_op_scope_schema =
                        sub_op_scope.states.lock().unwrap().build_op_scope_schema();
                    op_scope
                        .states
                        .lock()
                        .unwrap()
                        .sub_scopes
                        .insert(reactive_op_name.clone(), Arc::new(sub_op_scope_schema));
                    analyzed_op_scope_fut
                };
                let op_name = reactive_op_name.clone();

                let concur_control_options =
                    foreach_op.execution_options.get_concur_control_options();
                async move {
                    Ok(AnalyzedReactiveOp::ForEach(AnalyzedForEachOp {
                        local_field_ref,
                        op_scope: analyzed_op_scope_fut
                            .await
                            .with_context(|| format!("Preparing for foreach op: {op_name}"))?,
                        name: op_name,
                        concurrency_controller: concur_control::ConcurrencyController::new(
                            &concur_control_options,
                        ),
                    }))
                }
                .boxed()
            }

            ReactiveOpSpec::Collect(op) => {
                let (struct_mapping, fields_schema, mut def_fp) =
                    analyze_struct_mapping(&op.input, op_scope)?;
                let has_auto_uuid_field = op.auto_uuid_field.is_some();
                def_fp.fingerprint = Fingerprinter::default()
                    .with(&(
                        "collect",
                        &def_fp.fingerprint,
                        &fields_schema,
                        &has_auto_uuid_field,
                    ))?
                    .into_fingerprint();
                let fingerprinter = Fingerprinter::default().with(&fields_schema)?;

                let input_field_names: Vec<FieldName> =
                    fields_schema.iter().map(|f| f.name.clone()).collect();
                // Register this collect() on the (possibly ancestor) scope's
                // collector; schemas from multiple collect() calls are merged.
                let collector_ref = add_collector(
                    &op.scope_name,
                    op.collector_name.clone(),
                    CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
                    op_scope,
                    def_fp,
                )?;
                let op_scope = op_scope.clone();
                async move {
                    // Read the collector's merged schema; NOTE(review): this
                    // presumably reflects all collect() calls by the time the
                    // future runs — confirm against the analysis driver.
                    let collector_schema: Arc<CollectorSchema> = {
                        let scope = find_scope(&op.scope_name, &op_scope)?.1;
                        let states = scope.states.lock().unwrap();
                        let collector = states.collectors.get(&op.collector_name).unwrap();
                        collector.schema.clone()
                    };

                    // Map each merged-schema field back to this call's input
                    // position (None when another collect() supplied it).
                    let field_name_to_index: HashMap<&FieldName, usize> = input_field_names
                        .iter()
                        .enumerate()
                        .map(|(i, n)| (n, i))
                        .collect();
                    let field_index_mapping = collector_schema
                        .fields
                        .iter()
                        .map(|field| field_name_to_index.get(&field.name).copied())
                        .collect::<Vec<Option<usize>>>();

                    let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
                        name: reactive_op_name,
                        has_auto_uuid_field,
                        input: struct_mapping,
                        input_field_names,
                        collector_schema,
                        collector_ref,
                        field_index_mapping,
                        fingerprinter,
                    });
                    Ok(collect_op)
                }
                .boxed()
            }
        };
        Ok(result_fut)
    }
1079
    /// Analyzes the group of export ops (plus standalone target declarations) that all
    /// write to the same target kind (`target_kind`).
    ///
    /// For each export op in the group this:
    /// - consumes its collector from `op_scope` and splits the collector's fields into
    ///   primary-key fields and value fields (primary-key fields are required);
    /// - builds an `ExportDataCollectionSpec` that is handed to the target factory in a
    ///   single `build` call covering the whole group;
    /// - records the target's analyzed setup state into `targets_analyzed_ss[idx]`
    ///   (indexed by the export op's position in `flow_inst.export_ops`).
    ///
    /// Declarations contribute additional setup states appended to
    /// `declarations_analyzed_ss` but produce no export ops.
    ///
    /// Returns one future per export op; each future awaits the factory-provided
    /// export context (which may perform I/O) and yields the final `AnalyzedExportOp`.
    #[allow(clippy::too_many_arguments)]
    async fn analyze_export_op_group(
        &self,
        target_kind: &str,
        op_scope: &Arc<OpScope>,
        flow_inst: &FlowInstanceSpec,
        export_op_group: &AnalyzedExportTargetOpGroup,
        declarations: Vec<serde_json::Value>,
        targets_analyzed_ss: &mut [Option<exec_ctx::AnalyzedTargetSetupState>],
        declarations_analyzed_ss: &mut Vec<exec_ctx::AnalyzedTargetSetupState>,
    ) -> Result<Vec<impl Future<Output = Result<AnalyzedExportOp>> + Send + use<>>> {
        let mut collection_specs = Vec::<interface::ExportDataCollectionSpec>::new();
        let mut data_fields_infos = Vec::<ExportDataFieldsInfo>::new();
        for idx in export_op_group.op_idx.iter() {
            let export_op = &flow_inst.export_ops[*idx];
            // Consuming the collector detaches it from the scope state; presumably each
            // collector can feed at most one export op — confirm against
            // `consume_collector`'s contract.
            let (local_collector_ref, collector_schema, def_fp) =
                op_scope
                    .states
                    .lock()
                    .unwrap()
                    .consume_collector(&export_op.spec.collector_name)?;
            let (value_fields_schema, data_collection_info) =
                match &export_op.spec.index_options.primary_key_fields {
                    Some(fields) => {
                        // Resolve each configured primary-key field name to its index in
                        // the collector schema; unknown names are a client error.
                        let pk_fields_idx = fields
                            .iter()
                            .map(|f| {
                                collector_schema
                                    .fields
                                    .iter()
                                    .position(|field| &field.name == f)
                                    .ok_or_else(|| client_error!("field not found: {}", f))
                            })
                            .collect::<Result<Vec<_>>>()?;

                        let primary_key_schema = pk_fields_idx
                            .iter()
                            .map(|idx| collector_schema.fields[*idx].without_attrs())
                            .collect::<Box<[_]>>();
                        // Every collector field not in the primary key becomes a value
                        // field, preserving schema order.
                        let mut value_fields_schema: Vec<FieldSchema> = vec![];
                        let mut value_fields_idx = vec![];
                        for (idx, field) in collector_schema.fields.iter().enumerate() {
                            if !pk_fields_idx.contains(&idx) {
                                value_fields_schema.push(field.without_attrs());
                                value_fields_idx.push(idx as u32);
                            }
                        }
                        // `value_stable` is true only when the auto-UUID field is part of
                        // the primary key. NOTE(review): presumably a UUID-bearing key
                        // implies the row's values never change for that key — confirm
                        // against consumers of `value_stable`.
                        let value_stable = collector_schema
                            .auto_uuid_field_idx
                            .as_ref()
                            .map(|uuid_idx| pk_fields_idx.contains(uuid_idx))
                            .unwrap_or(false);
                        let output_value_fingerprinter =
                            Fingerprinter::default().with(&value_fields_schema)?;
                        (
                            value_fields_schema,
                            ExportDataFieldsInfo {
                                local_collector_ref,
                                primary_key_def: AnalyzedPrimaryKeyDef::Fields(pk_fields_idx),
                                primary_key_schema,
                                value_fields_idx,
                                value_stable,
                                output_value_fingerprinter,
                                def_fp,
                            },
                        )
                    }
                    None => {
                        api_bail!("Primary key fields must be specified")
                    }
                };
            collection_specs.push(interface::ExportDataCollectionSpec {
                name: export_op.name.clone(),
                spec: serde_json::Value::Object(export_op.spec.target.spec.clone()),
                key_fields_schema: data_collection_info.primary_key_schema.clone(),
                value_fields_schema,
                index_options: export_op.spec.index_options.clone(),
            });
            data_fields_infos.push(data_collection_info);
        }
        // One factory call covers every collection and declaration of this target
        // kind; outputs are positionally aligned with `collection_specs`.
        let (data_collections_output, declarations_output) = export_op_group
            .target_factory
            .clone()
            .build(collection_specs, declarations, self.flow_ctx.clone())
            .await?;
        let analyzed_export_ops = export_op_group
            .op_idx
            .iter()
            .zip(data_collections_output.into_iter())
            .zip(data_fields_infos.into_iter())
            .map(|((idx, data_coll_output), data_fields_info)| {
                let export_op = &flow_inst.export_ops[*idx];
                let op_name = export_op.name.clone();
                let export_target_factory = export_op_group.target_factory.clone();

                // Resolve each attachment spec into a (kind + factory-provided setup
                // key) -> setup-state entry.
                let attachments = export_op
                    .spec
                    .attachments
                    .iter()
                    .map(|attachment| {
                        let attachment_factory = get_attachment_factory(&attachment.kind)?;
                        let attachment_state = attachment_factory.get_state(
                            &op_name,
                            &export_op.spec.target.spec,
                            serde_json::Value::Object(attachment.spec.clone()),
                        )?;
                        Ok((
                            interface::AttachmentSetupKey(
                                attachment.kind.clone(),
                                attachment_state.setup_key,
                            ),
                            attachment_state.setup_state,
                        ))
                    })
                    .collect::<Result<IndexMap<_, _>>>()?;

                // Record this export op's setup state at its original index in the
                // flow's export-op list.
                let export_op_ss = exec_ctx::AnalyzedTargetSetupState {
                    target_kind: target_kind.to_string(),
                    setup_key: data_coll_output.setup_key,
                    desired_setup_state: data_coll_output.desired_setup_state,
                    setup_by_user: export_op.spec.setup_by_user,
                    key_type: Some(
                        data_fields_info
                            .primary_key_schema
                            .iter()
                            .map(|field| field.value_type.typ.clone())
                            .collect::<Box<[_]>>(),
                    ),
                    attachments,
                };
                targets_analyzed_ss[*idx] = Some(export_op_ss);

                // Definition fingerprint: combine the upstream collector's fingerprint
                // with the target spec, under an "export" tag.
                let def_fp = FieldDefFingerprint {
                    source_op_names: data_fields_info.def_fp.source_op_names,
                    fingerprint: Fingerprinter::default()
                        .with("export")?
                        .with(&data_fields_info.def_fp.fingerprint)?
                        .with(&export_op.spec.target)?
                        .into_fingerprint(),
                };
                Ok(async move {
                    trace!("Start building executor for export op `{op_name}`");
                    // The export context future is produced by the factory and may do
                    // I/O; await it lazily inside this returned future.
                    let export_context = data_coll_output
                        .export_context
                        .await
                        .with_context(|| format!("Preparing for export op: {op_name}"))?;
                    trace!("Finished building executor for export op `{op_name}`");
                    Ok(AnalyzedExportOp {
                        name: op_name,
                        input: data_fields_info.local_collector_ref,
                        export_target_factory,
                        export_context,
                        primary_key_def: data_fields_info.primary_key_def,
                        primary_key_schema: data_fields_info.primary_key_schema,
                        value_fields: data_fields_info.value_fields_idx,
                        value_stable: data_fields_info.value_stable,
                        output_value_fingerprinter: data_fields_info.output_value_fingerprinter,
                        def_fp,
                    })
                })
            })
            .collect::<Result<Vec<_>>>()?;
        // Declarations correspond to no export op; they only contribute setup states.
        for (setup_key, desired_setup_state) in declarations_output {
            let decl_ss = exec_ctx::AnalyzedTargetSetupState {
                target_kind: target_kind.to_string(),
                setup_key,
                desired_setup_state,
                setup_by_user: false,
                key_type: None,
                attachments: IndexMap::new(),
            };
            declarations_analyzed_ss.push(decl_ss);
        }
        Ok(analyzed_export_ops)
    }
1256
1257 async fn analyze_op_scope(
1258 &self,
1259 op_scope: &Arc<OpScope>,
1260 reactive_ops: &[NamedSpec<ReactiveOpSpec>],
1261 ) -> Result<impl Future<Output = Result<AnalyzedOpScope>> + Send + use<>> {
1262 let mut op_futs = Vec::with_capacity(reactive_ops.len());
1263 for reactive_op in reactive_ops.iter() {
1264 op_futs.push(self.analyze_reactive_op(op_scope, reactive_op).await?);
1265 }
1266 let collector_len = op_scope.states.lock().unwrap().collectors.len();
1267 let scope_qualifier = self.build_scope_qualifier(op_scope);
1268 let result_fut = async move {
1269 Ok(AnalyzedOpScope {
1270 reactive_ops: try_join_all(op_futs).await?,
1271 collector_len,
1272 scope_qualifier,
1273 })
1274 };
1275 Ok(result_fut)
1276 }
1277
1278 fn build_scope_qualifier(&self, op_scope: &Arc<OpScope>) -> String {
1279 let mut scope_names = Vec::new();
1280 let mut current_scope = op_scope.as_ref();
1281
1282 while let Some((parent, _)) = ¤t_scope.parent {
1284 scope_names.push(current_scope.name.as_str());
1285 current_scope = parent.as_ref();
1286 }
1287
1288 scope_names.reverse();
1290
1291 let mut result = String::new();
1293 for name in scope_names {
1294 result.push_str(name);
1295 result.push('.');
1296 }
1297 result
1298 }
1299}
1300
1301pub fn build_flow_instance_context(flow_inst_name: &str) -> Arc<FlowInstanceContext> {
1302 Arc::new(FlowInstanceContext {
1303 flow_instance_name: flow_inst_name.to_string(),
1304 auth_registry: get_auth_registry().clone(),
1305 })
1306}
1307
1308fn build_flow_schema(root_op_scope: &OpScope) -> Result<FlowSchema> {
1309 let schema = (&root_op_scope.data.lock().unwrap().data).try_into()?;
1310 let root_op_scope_schema = root_op_scope.states.lock().unwrap().build_op_scope_schema();
1311 Ok(FlowSchema {
1312 schema,
1313 root_op_scope: root_op_scope_schema,
1314 })
1315}
1316
/// Analyzes a persistent flow instance, producing:
/// - the flow's `FlowSchema`,
/// - the analyzed setup state for all targets and declarations, and
/// - a future that finishes building the `ExecutionPlan` (executor construction may
///   perform I/O, so it is deferred until the returned future is awaited).
pub async fn analyze_flow(
    flow_inst: &FlowInstanceSpec,
    flow_ctx: Arc<FlowInstanceContext>,
) -> Result<(
    FlowSchema,
    AnalyzedSetupState,
    impl Future<Output = Result<ExecutionPlan>> + Send + use<>,
)> {
    let analyzer_ctx = AnalyzerContext {
        lib_ctx: get_lib_context().await?,
        flow_ctx,
    };
    let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new()));
    let root_op_scope = OpScope::new(
        ROOT_SCOPE_NAME.to_string(),
        None,
        root_data_scope,
        FieldDefFingerprint::default(),
    );
    // Analyze import ops first: analysis mutates the root scope, registering fields
    // that downstream reactive ops reference. Each call yields a deferred builder
    // future collected for the plan phase.
    let mut import_ops_futs = Vec::with_capacity(flow_inst.import_ops.len());
    for import_op in flow_inst.import_ops.iter() {
        import_ops_futs.push(
            analyzer_ctx
                .analyze_import_op(&root_op_scope, import_op.clone())
                .await
                .with_context(|| format!("Preparing for import op: {}", import_op.name))?,
        );
    }
    let op_scope_fut = analyzer_ctx
        .analyze_op_scope(&root_op_scope, &flow_inst.reactive_ops)
        .await?;

    // Group export ops and declarations by target kind so each target factory is
    // invoked exactly once with everything it owns.
    #[derive(Default)]
    struct TargetOpGroup {
        export_op_ids: Vec<usize>,
        declarations: Vec<serde_json::Value>,
    }
    let mut target_op_group = IndexMap::<String, TargetOpGroup>::new();
    for (idx, export_op) in flow_inst.export_ops.iter().enumerate() {
        target_op_group
            .entry(export_op.spec.target.kind.clone())
            .or_default()
            .export_op_ids
            .push(idx);
    }
    for declaration in flow_inst.declarations.iter() {
        target_op_group
            .entry(declaration.kind.clone())
            .or_default()
            .declarations
            .push(serde_json::Value::Object(declaration.spec.clone()));
    }

    let mut export_ops_futs = vec![];
    let mut analyzed_target_op_groups = vec![];

    // Setup states are written by index (one slot per export op), so the vector is
    // pre-filled with `None` and validated below.
    let mut targets_analyzed_ss = Vec::with_capacity(flow_inst.export_ops.len());
    targets_analyzed_ss.resize_with(flow_inst.export_ops.len(), || None);

    let mut declarations_analyzed_ss = Vec::with_capacity(flow_inst.declarations.len());

    for (target_kind, op_ids) in target_op_group.into_iter() {
        let target_factory = get_target_factory(&target_kind)?;
        let analyzed_target_op_group = AnalyzedExportTargetOpGroup {
            target_factory,
            target_kind: target_kind.clone(),
            op_idx: op_ids.export_op_ids,
        };
        export_ops_futs.extend(
            analyzer_ctx
                .analyze_export_op_group(
                    target_kind.as_str(),
                    &root_op_scope,
                    flow_inst,
                    &analyzed_target_op_group,
                    op_ids.declarations,
                    &mut targets_analyzed_ss,
                    &mut declarations_analyzed_ss,
                )
                .await
                .with_context(|| format!("Analyzing export ops for target `{target_kind}`"))?,
        );
        analyzed_target_op_groups.push(analyzed_target_op_group);
    }

    let flow_schema = build_flow_schema(&root_op_scope)?;
    // Every export-op slot must have been filled by its target group; a remaining
    // `None` indicates an internal bookkeeping bug, not a user error.
    let analyzed_ss = exec_ctx::AnalyzedSetupState {
        targets: targets_analyzed_ss
            .into_iter()
            .enumerate()
            .map(|(idx, v)| v.ok_or_else(|| internal_error!("target op `{}` not found", idx)))
            .collect::<Result<Vec<_>>>()?,
        declarations: declarations_analyzed_ss,
    };

    // v1 legacy fingerprint: hashes the entire flow spec plus schema wholesale.
    let legacy_fingerprint_v1 = Fingerprinter::default()
        .with(&flow_inst)?
        .with(&flow_schema.schema)?
        .into_fingerprint();

    /// Recursively folds a reactive-op scope into the fingerprinter. Transform specs
    /// are deliberately skipped here — their behavior is fingerprinted separately
    /// via each op's `function_exec_info` inside `plan_fut` below.
    fn append_reactive_op_scope(
        mut fingerprinter: Fingerprinter,
        reactive_ops: &[NamedSpec<ReactiveOpSpec>],
    ) -> Result<Fingerprinter> {
        fingerprinter = fingerprinter.with(&reactive_ops.len())?;
        for reactive_op in reactive_ops.iter() {
            fingerprinter = fingerprinter.with(&reactive_op.name)?;
            match &reactive_op.spec {
                ReactiveOpSpec::Transform(_) => {}
                ReactiveOpSpec::ForEach(foreach_op) => {
                    fingerprinter = fingerprinter.with(&foreach_op.field_path)?;
                    fingerprinter =
                        append_reactive_op_scope(fingerprinter, &foreach_op.op_scope.ops)?;
                }
                ReactiveOpSpec::Collect(collect_op) => {
                    fingerprinter = fingerprinter.with(collect_op)?;
                }
            }
        }
        Ok(fingerprinter)
    }
    let current_fingerprinter =
        append_reactive_op_scope(Fingerprinter::default(), &flow_inst.reactive_ops)?
            .with(&flow_inst.export_ops)?
            .with(&flow_inst.declarations)?
            .with(&flow_schema.schema)?;
    let plan_fut = async move {
        // Finish building all executors concurrently: imports, the reactive op
        // scope, and exports.
        let (import_ops, op_scope, export_ops) = try_join3(
            try_join_all(import_ops_futs),
            op_scope_fut,
            try_join_all(export_ops_futs),
        )
        .await?;

        /// Recursively folds each transform op's behavior fingerprint (from its
        /// analyzed `function_exec_info`) into the fingerprinter, descending into
        /// for-each sub-scopes.
        fn append_function_behavior(
            mut fingerprinter: Fingerprinter,
            reactive_ops: &[AnalyzedReactiveOp],
        ) -> Result<Fingerprinter> {
            for reactive_op in reactive_ops.iter() {
                match reactive_op {
                    AnalyzedReactiveOp::Transform(transform_op) => {
                        fingerprinter = fingerprinter.with(&transform_op.name)?.with(
                            &transform_op
                                .function_exec_info
                                .fingerprinter
                                .clone()
                                .into_fingerprint(),
                        )?;
                    }
                    AnalyzedReactiveOp::ForEach(foreach_op) => {
                        fingerprinter = append_function_behavior(
                            fingerprinter,
                            &foreach_op.op_scope.reactive_ops,
                        )?;
                    }
                    _ => {}
                }
            }
            Ok(fingerprinter)
        }
        // v2 legacy fingerprint: the spec/schema fingerprint extended with
        // per-transform behavior fingerprints (only available post-analysis).
        let legacy_fingerprint_v2 =
            append_function_behavior(current_fingerprinter, &op_scope.reactive_ops)?
                .into_fingerprint();
        Ok(ExecutionPlan {
            legacy_fingerprint: vec![legacy_fingerprint_v1, legacy_fingerprint_v2],
            import_ops,
            op_scope,
            export_ops,
            export_op_groups: analyzed_target_op_groups,
        })
    };

    Ok((flow_schema, analyzed_ss, plan_fut))
}
1491
1492pub async fn analyze_transient_flow<'a>(
1493 flow_inst: &TransientFlowSpec,
1494 flow_ctx: Arc<FlowInstanceContext>,
1495) -> Result<(
1496 EnrichedValueType,
1497 FlowSchema,
1498 impl Future<Output = Result<TransientExecutionPlan>> + Send + 'a,
1499)> {
1500 let mut root_data_scope = DataScopeBuilder::new();
1501 let analyzer_ctx = AnalyzerContext {
1502 lib_ctx: get_lib_context().await?,
1503 flow_ctx,
1504 };
1505 let mut input_fields = vec![];
1506 for field in flow_inst.input_fields.iter() {
1507 let analyzed_field = root_data_scope.add_field(
1508 field.name.clone(),
1509 &field.value_type,
1510 FieldDefFingerprint::default(),
1511 )?;
1512 input_fields.push(analyzed_field);
1513 }
1514 let root_op_scope = OpScope::new(
1515 ROOT_SCOPE_NAME.to_string(),
1516 None,
1517 Arc::new(Mutex::new(root_data_scope)),
1518 FieldDefFingerprint::default(),
1519 );
1520 let op_scope_fut = analyzer_ctx
1521 .analyze_op_scope(&root_op_scope, &flow_inst.reactive_ops)
1522 .await?;
1523 let (output_value, output_type, _) =
1524 analyze_value_mapping(&flow_inst.output_value, &root_op_scope)?;
1525 let data_schema = build_flow_schema(&root_op_scope)?;
1526 let plan_fut = async move {
1527 let op_scope = op_scope_fut.await?;
1528 Ok(TransientExecutionPlan {
1529 input_fields,
1530 op_scope,
1531 output_value,
1532 })
1533 };
1534 Ok((output_type, data_schema, plan_fut))
1535}