Skip to main content

recoco_core/builder/
flow_builder.rs

1// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://CocoIndex)
2// Original code from CocoIndex is copyrighted by CocoIndex
3// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
4// SPDX-FileContributor: CocoIndex Contributors
5//
6// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
7// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
8// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
9//
10// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
11// SPDX-License-Identifier: Apache-2.0
12
#[cfg(feature = "persistence")]
use crate::setup::ObjectSetupChange;
use crate::{base::schema::EnrichedValueType, builder::plan::FieldDefFingerprint, prelude::*};

use recoco_utils::fingerprint::Fingerprinter;
#[cfg(feature = "persistence")]
use std::collections::btree_map;
use std::ops::Deref;
use tracing::Instrument;

use super::analyzer::{
    AnalyzerContext, CollectorBuilder, DataScopeBuilder, OpScope, ValueTypeBuilder,
    build_flow_instance_context,
};
use crate::lib_context::FlowContext;
use crate::{
    base::{
        schema::{CollectorSchema, FieldSchema},
        spec::{FieldName, NamedSpec},
    },
    lib_context::LibContext,
    ops::interface::FlowInstanceContext,
};
35
/// Cloneable handle around a shared `Arc<OpScope>`.
///
/// Cloning the handle clones the `Arc`, not the scope itself.
#[derive(Debug, Clone)]
pub struct OpScopeRef(Arc<OpScope>);
38
39impl From<Arc<OpScope>> for OpScopeRef {
40    fn from(scope: Arc<OpScope>) -> Self {
41        Self(scope)
42    }
43}
44
45impl Deref for OpScopeRef {
46    type Target = Arc<OpScope>;
47
48    fn deref(&self) -> &Self::Target {
49        &self.0
50    }
51}
52
53impl std::fmt::Display for OpScopeRef {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        write!(f, "{}", self.0)
56    }
57}
58
59impl OpScopeRef {
60    pub fn add_collector(&mut self, name: String) -> Result<DataCollector> {
61        let collector = DataCollector {
62            name,
63            scope: self.0.clone(),
64            collector: Mutex::new(None),
65        };
66        Ok(collector)
67    }
68}
69
/// Thin wrapper around an enriched value type schema.
#[derive(Debug, Clone)]
pub struct DataType {
    // The wrapped schema; exposed (as a clone) through `DataType::schema`.
    schema: schema::EnrichedValueType,
}
74
75impl From<schema::EnrichedValueType> for DataType {
76    fn from(schema: schema::EnrichedValueType) -> Self {
77        Self { schema }
78    }
79}
80
81impl DataType {
82    pub fn schema(&self) -> schema::EnrichedValueType {
83        self.schema.clone()
84    }
85}
86
/// A value reference (field path or constant) together with the op scope it
/// was created in.
#[derive(Debug, Clone)]
pub struct DataSlice {
    // Scope this slice belongs to.
    scope: Arc<OpScope>,
    // Either a field mapping or a constant mapping.
    value: Arc<spec::ValueMapping>,
}
92
93impl DataSlice {
94    pub fn data_type(&self) -> Result<DataType> {
95        Ok(DataType::from(self.value_type()?))
96    }
97
98    pub fn field(&self, field_name: &str) -> Result<Option<DataSlice>> {
99        let value_mapping = match self.value.as_ref() {
100            spec::ValueMapping::Field(spec::FieldMapping { scope, field_path }) => {
101                let data_scope_builder = self.scope.data.lock().unwrap();
102                let struct_schema = {
103                    let (_, val_type, _) = data_scope_builder
104                        .analyze_field_path(field_path, self.scope.base_value_def_fp.clone())?;
105                    match &val_type.typ {
106                        ValueTypeBuilder::Struct(struct_type) => struct_type,
107                        _ => return Err(client_error!("expect struct type in field path")),
108                    }
109                };
110                if struct_schema.find_field(field_name).is_none() {
111                    return Ok(None);
112                }
113                spec::ValueMapping::Field(spec::FieldMapping {
114                    scope: scope.clone(),
115                    field_path: spec::FieldPath(
116                        field_path
117                            .iter()
118                            .cloned()
119                            .chain([field_name.to_string()])
120                            .collect(),
121                    ),
122                })
123            }
124
125            spec::ValueMapping::Constant { .. } => {
126                return Err(client_error!("field access not supported for literal",));
127            }
128        };
129        Ok(Some(DataSlice {
130            scope: self.scope.clone(),
131            value: Arc::new(value_mapping),
132        }))
133    }
134}
135
136impl DataSlice {
137    fn extract_value_mapping(&self) -> spec::ValueMapping {
138        match self.value.as_ref() {
139            spec::ValueMapping::Field(v) => spec::ValueMapping::Field(spec::FieldMapping {
140                field_path: v.field_path.clone(),
141                scope: v.scope.clone().or_else(|| Some(self.scope.name.clone())),
142            }),
143            v => v.clone(),
144        }
145    }
146
147    fn value_type(&self) -> Result<schema::EnrichedValueType> {
148        let result = match self.value.as_ref() {
149            spec::ValueMapping::Constant(c) => c.schema.clone(),
150            spec::ValueMapping::Field(v) => {
151                let data_scope_builder = self.scope.data.lock().unwrap();
152                let (_, val_type, _) = data_scope_builder
153                    .analyze_field_path(&v.field_path, self.scope.base_value_def_fp.clone())?;
154                EnrichedValueType::from_alternative(val_type)?
155            }
156        };
157        Ok(result)
158    }
159}
160
161impl std::fmt::Display for DataSlice {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        write!(f, "DataSlice(")?;
164        match self.value_type() {
165            Ok(value_type) => write!(f, "{value_type}")?,
166            Err(e) => write!(f, "<error: {}>", e)?,
167        }
168        write!(f, "; {} {}) ", self.scope, self.value)?;
169        Ok(())
170    }
171}
172
/// A named collector attached to an op scope.
pub struct DataCollector {
    // Collector name, used in collect/export op specs.
    name: String,
    // Scope the collector was created in.
    scope: Arc<OpScope>,
    // `None` until the first `FlowBuilder::collect` call populates it.
    collector: Mutex<Option<CollectorBuilder>>,
}
178
179impl std::fmt::Display for DataCollector {
180    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181        let collector = self.collector.lock().unwrap();
182        write!(f, "DataCollector \"{}\" ({}", self.name, self.scope)?;
183        if let Some(collector) = collector.as_ref() {
184            write!(f, ": {}", collector.schema)?;
185            if collector.is_used {
186                write!(f, " (used)")?;
187            }
188        }
189        write!(f, ")")?;
190        Ok(())
191    }
192}
193
/// Incrementally assembles a flow spec (sources, reactive ops, exports,
/// direct inputs/outputs) while analyzing each op as it is added.
pub struct FlowBuilder {
    // Library-wide context obtained from `get_lib_context()`.
    lib_context: Arc<LibContext>,
    // Per-flow instance context built from the flow name.
    flow_inst_context: Arc<FlowInstanceContext>,

    // Root op scope; sources, direct inputs and exports must target it.
    root_op_scope: Arc<OpScope>,
    flow_instance_name: String,
    // Reactive ops of the root scope; nested scopes live inside ForEach specs.
    reactive_ops: Vec<NamedSpec<spec::ReactiveOpSpec>>,

    // Inputs/outputs used by `build_transient_flow`.
    direct_input_fields: Vec<FieldSchema>,
    direct_output_value: Option<spec::ValueMapping>,

    import_ops: Vec<NamedSpec<spec::ImportOpSpec>>,
    export_ops: Vec<NamedSpec<spec::ExportOpSpec>>,

    declarations: Vec<spec::OpSpec>,

    // Counter used to generate names such as `.for_each.N` and `.collect.N`.
    next_generated_op_id: usize,
}
212
213impl FlowBuilder {
214    pub async fn new(name: &str) -> Result<Self> {
215        let _span = info_span!("flow_builder.new", flow_name = %name).entered();
216        let lib_context = get_lib_context().await?;
217        let root_op_scope = OpScope::new(
218            spec::ROOT_SCOPE_NAME.to_string(),
219            None,
220            Arc::new(Mutex::new(DataScopeBuilder::new())),
221            FieldDefFingerprint::default(),
222        );
223        let flow_inst_context = build_flow_instance_context(name);
224        let result = Self {
225            lib_context,
226            flow_inst_context,
227            root_op_scope,
228            flow_instance_name: name.to_string(),
229
230            reactive_ops: vec![],
231
232            import_ops: vec![],
233            export_ops: vec![],
234
235            direct_input_fields: vec![],
236            direct_output_value: None,
237
238            declarations: vec![],
239
240            next_generated_op_id: 0,
241        };
242        Ok(result)
243    }
244
245    pub fn root_scope(&self) -> OpScopeRef {
246        OpScopeRef(self.root_op_scope.clone())
247    }
248
249    pub async fn add_source(
250        &mut self,
251        kind: String,
252        op_spec: serde_json::Map<String, serde_json::Value>,
253        target_scope: Option<OpScopeRef>,
254        name: String,
255        refresh_options: Option<spec::SourceRefreshOptions>,
256        execution_options: Option<spec::ExecutionOptions>,
257    ) -> Result<DataSlice> {
258        let _span = info_span!("flow_builder.add_source", flow_name = %self.flow_instance_name, source_name = %name, source_kind = %kind).entered();
259        if let Some(target_scope) = target_scope
260            && *target_scope != self.root_op_scope
261        {
262            return Err(client_error!("source can only be added to the root scope",));
263        }
264        let import_op = spec::NamedSpec {
265            name,
266            spec: spec::ImportOpSpec {
267                source: spec::OpSpec {
268                    kind,
269                    spec: op_spec,
270                },
271                refresh_options: refresh_options.unwrap_or_default(),
272                execution_options: execution_options.unwrap_or_default(),
273            },
274        };
275        let analyzer_ctx = AnalyzerContext {
276            lib_ctx: self.lib_context.clone(),
277            flow_ctx: self.flow_inst_context.clone(),
278        };
279        let analyzed = analyzer_ctx
280            .analyze_import_op(&self.root_op_scope, import_op.clone())
281            .await?;
282        std::mem::drop(analyzed);
283
284        let result = Self::last_field_to_data_slice(&self.root_op_scope)?;
285        self.import_ops.push(import_op);
286        Ok(result)
287    }
288
289    pub fn constant(
290        &self,
291        value_type: schema::EnrichedValueType,
292        value: serde_json::Value,
293    ) -> Result<DataSlice> {
294        let schema = value_type;
295        let slice = DataSlice {
296            scope: self.root_op_scope.clone(),
297            value: Arc::new(spec::ValueMapping::Constant(spec::ConstantMapping {
298                schema: schema.clone(),
299                value,
300            })),
301        };
302        Ok(slice)
303    }
304
305    pub fn add_direct_input(
306        &mut self,
307        name: String,
308        value_type: schema::EnrichedValueType,
309    ) -> Result<DataSlice> {
310        {
311            let mut root_data_scope = self.root_op_scope.data.lock().unwrap();
312            root_data_scope.add_field(
313                name.clone(),
314                &value_type,
315                FieldDefFingerprint {
316                    source_op_names: HashSet::from([name.clone()]),
317                    fingerprint: Fingerprinter::default()
318                        .with("input")
319                        .map_err(Error::from)?
320                        .with(&name)
321                        .map_err(Error::from)?
322                        .into_fingerprint(),
323                },
324            )?;
325        }
326        let result = Self::last_field_to_data_slice(&self.root_op_scope)?;
327        self.direct_input_fields.push(FieldSchema {
328            name,
329            value_type,
330            description: None,
331        });
332        Ok(result)
333    }
334
335    pub fn set_direct_output(&mut self, data_slice: DataSlice) -> Result<()> {
336        if data_slice.scope != self.root_op_scope {
337            return Err(client_error!(
338                "direct output must be value in the root scope",
339            ));
340        }
341        self.direct_output_value = Some(data_slice.extract_value_mapping());
342        Ok(())
343    }
344
345    pub fn for_each(
346        &mut self,
347        data_slice: DataSlice,
348        execution_options: Option<spec::ExecutionOptions>,
349    ) -> Result<OpScopeRef> {
350        let parent_scope = &data_slice.scope;
351        let field_path = match data_slice.value.as_ref() {
352            spec::ValueMapping::Field(v) => &v.field_path,
353            _ => return Err(client_error!("expect field path")),
354        };
355        let num_parent_layers = parent_scope.ancestors().count();
356        let scope_name = format!(
357            "{}_{}",
358            field_path.last().map_or("", |s| s.as_str()),
359            num_parent_layers
360        );
361        let (_, child_op_scope) =
362            parent_scope.new_foreach_op_scope(scope_name.clone(), field_path)?;
363
364        let reactive_op = spec::NamedSpec {
365            name: format!(".for_each.{}", self.next_generated_op_id),
366            spec: spec::ReactiveOpSpec::ForEach(spec::ForEachOpSpec {
367                field_path: field_path.clone(),
368                op_scope: spec::ReactiveOpScope {
369                    name: scope_name,
370                    ops: vec![],
371                },
372                execution_options: execution_options.unwrap_or_default(),
373            }),
374        };
375        self.next_generated_op_id += 1;
376        self.get_mut_reactive_ops(parent_scope)?.push(reactive_op);
377
378        Ok(OpScopeRef(child_op_scope))
379    }
380
381    pub async fn transform(
382        &mut self,
383        kind: String,
384        op_spec: serde_json::Map<String, serde_json::Value>,
385        args: Vec<(DataSlice, Option<String>)>,
386        target_scope: Option<OpScopeRef>,
387        name: String,
388    ) -> Result<DataSlice> {
389        let _span = info_span!("flow_builder.transform", flow_name = %self.flow_instance_name, op_name = %name, op_kind = %kind).entered();
390        let spec = spec::OpSpec {
391            kind,
392            spec: op_spec,
393        };
394        let op_scope = Self::minimum_common_scope(
395            args.iter().map(|(ds, _)| &ds.scope),
396            target_scope.as_ref().map(|s| &s.0),
397        )?;
398
399        let reactive_op = spec::NamedSpec {
400            name,
401            spec: spec::ReactiveOpSpec::Transform(spec::TransformOpSpec {
402                inputs: args
403                    .iter()
404                    .map(|(ds, arg_name)| spec::OpArgBinding {
405                        arg_name: spec::OpArgName(arg_name.clone()),
406                        value: ds.extract_value_mapping(),
407                    })
408                    .collect(),
409                op: spec,
410                execution_options: Default::default(),
411            }),
412        };
413
414        let analyzer_ctx = AnalyzerContext {
415            lib_ctx: self.lib_context.clone(),
416            flow_ctx: self.flow_inst_context.clone(),
417        };
418        let analyzed = analyzer_ctx
419            .analyze_reactive_op(op_scope, &reactive_op)
420            .await?;
421        std::mem::drop(analyzed);
422
423        self.get_mut_reactive_ops(op_scope)?.push(reactive_op);
424
425        let result = Self::last_field_to_data_slice(op_scope)?;
426        Ok(result)
427    }
428
429    pub async fn collect(
430        &mut self,
431        collector: &DataCollector,
432        fields: Vec<(FieldName, DataSlice)>,
433        auto_uuid_field: Option<FieldName>,
434    ) -> Result<()> {
435        let _span = info_span!("flow_builder.collect", flow_name = %self.flow_instance_name, collector_name = %collector.name).entered();
436        let common_scope =
437            Self::minimum_common_scope(fields.iter().map(|(_, ds)| &ds.scope), None)?;
438        let name = format!(".collect.{}", self.next_generated_op_id);
439        self.next_generated_op_id += 1;
440
441        let reactive_op = spec::NamedSpec {
442            name,
443            spec: spec::ReactiveOpSpec::Collect(spec::CollectOpSpec {
444                input: spec::StructMapping {
445                    fields: fields
446                        .iter()
447                        .map(|(name, ds)| NamedSpec {
448                            name: name.clone(),
449                            spec: ds.extract_value_mapping(),
450                        })
451                        .collect(),
452                },
453                scope_name: collector.scope.name.clone(),
454                collector_name: collector.name.clone(),
455                auto_uuid_field: auto_uuid_field.clone(),
456            }),
457        };
458
459        let analyzer_ctx = AnalyzerContext {
460            lib_ctx: self.lib_context.clone(),
461            flow_ctx: self.flow_inst_context.clone(),
462        };
463        let analyzed = analyzer_ctx
464            .analyze_reactive_op(common_scope, &reactive_op)
465            .await?;
466        std::mem::drop(analyzed);
467
468        self.get_mut_reactive_ops(common_scope)?.push(reactive_op);
469
470        let collector_schema = CollectorSchema::from_fields(
471            fields
472                .into_iter()
473                .map(|(name, ds)| {
474                    Ok(FieldSchema {
475                        name,
476                        value_type: ds.value_type()?,
477                        description: None,
478                    })
479                })
480                .collect::<Result<Vec<FieldSchema>>>()?,
481            auto_uuid_field,
482        );
483        {
484            // TODO: Pass in the right field def fingerprint
485            let mut collector = collector.collector.lock().unwrap();
486            if let Some(collector) = collector.as_mut() {
487                collector.collect(&collector_schema, FieldDefFingerprint::default())?;
488            } else {
489                *collector = Some(CollectorBuilder::new(
490                    Arc::new(collector_schema),
491                    FieldDefFingerprint::default(),
492                ));
493            }
494        }
495
496        Ok(())
497    }
498
499    pub fn export(
500        &mut self,
501        name: String,
502        kind: String,
503        op_spec: serde_json::Map<String, serde_json::Value>,
504        attachments: Vec<spec::OpSpec>,
505        index_options: spec::IndexOptions,
506        input: &DataCollector,
507        setup_by_user: bool,
508    ) -> Result<()> {
509        let _span = info_span!("flow_builder.export", flow_name = %self.flow_instance_name, export_name = %name, target_kind = %kind).entered();
510        let spec = spec::OpSpec {
511            kind,
512            spec: op_spec,
513        };
514
515        if input.scope != self.root_op_scope {
516            return Err(client_error!(
517                "Export can only work on collectors belonging to the root scope.",
518            ));
519        }
520        self.export_ops.push(spec::NamedSpec {
521            name,
522            spec: spec::ExportOpSpec {
523                collector_name: input.name.clone(),
524                target: spec,
525                attachments,
526                index_options,
527                setup_by_user,
528            },
529        });
530        Ok(())
531    }
532
    /// Appends `op_spec` to the flow's declarations.
    ///
    /// As written, this never returns an error.
    pub fn declare(&mut self, op_spec: spec::OpSpec) -> Result<()> {
        self.declarations.push(op_spec);
        Ok(())
    }
537
538    pub fn scope_field(&self, scope: OpScopeRef, field_name: &str) -> Result<Option<DataSlice>> {
539        {
540            let scope_builder = scope.0.data.lock().unwrap();
541            if scope_builder.data.find_field(field_name).is_none() {
542                return Err(client_error!("field {field_name} not found"));
543            }
544        }
545        Ok(Some(DataSlice {
546            scope: scope.0,
547            value: Arc::new(spec::ValueMapping::Field(spec::FieldMapping {
548                scope: None,
549                field_path: spec::FieldPath(vec![field_name.to_string()]),
550            })),
551        }))
552    }
553
554    #[cfg(feature = "persistence")]
555    pub async fn build_flow(&self) -> Result<Flow> {
556        let _span =
557            info_span!("flow_builder.build_flow", flow_name = %self.flow_instance_name).entered();
558        let spec = spec::FlowInstanceSpec {
559            name: self.flow_instance_name.clone(),
560            import_ops: self.import_ops.clone(),
561            reactive_ops: self.reactive_ops.clone(),
562            export_ops: self.export_ops.clone(),
563            declarations: self.declarations.clone(),
564        };
565        let flow_instance_ctx = self.flow_inst_context.clone();
566
567        let flow_ctx = {
568            let analyzed_flow =
569                super::AnalyzedFlow::from_flow_instance(spec, flow_instance_ctx).await?;
570            let persistence_ctx = self.lib_context.require_persistence_ctx()?;
571            let flow_ctx = {
572                let flow_setup_ctx = persistence_ctx.setup_ctx.read().await;
573                FlowContext::new(
574                    Arc::new(analyzed_flow),
575                    flow_setup_ctx
576                        .all_setup_states
577                        .flows
578                        .get(&self.flow_instance_name),
579                )
580                .await?
581            };
582
583            // Apply internal-only changes if any.
584            {
585                let mut flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().write().await;
586                if flow_exec_ctx.setup_change.has_internal_changes()
587                    && !flow_exec_ctx.setup_change.has_external_changes()
588                {
589                    let mut lib_setup_ctx = persistence_ctx.setup_ctx.write().await;
590                    let mut output_buffer = Vec::<u8>::new();
591                    setup::apply_changes_for_flow_ctx(
592                        setup::FlowSetupChangeAction::Setup,
593                        &flow_ctx,
594                        &mut flow_exec_ctx,
595                        &mut lib_setup_ctx,
596                        &persistence_ctx.builtin_db_pool,
597                        &mut output_buffer,
598                    )
599                    .await?;
600                    trace!(
601                        "Applied internal-only change for flow {}:\n{}",
602                        self.flow_instance_name,
603                        String::from_utf8_lossy(&output_buffer)
604                    );
605                }
606            }
607
608            Ok::<_, Error>(flow_ctx)
609        }?;
610
611        let mut flow_ctxs = self.lib_context.flows.lock().unwrap();
612        let flow_ctx = match flow_ctxs.entry(self.flow_instance_name.clone()) {
613            btree_map::Entry::Occupied(_) => {
614                return Err(client_error!(
615                    "flow instance name already exists: {}",
616                    self.flow_instance_name
617                ));
618            }
619            btree_map::Entry::Vacant(entry) => {
620                let flow_ctx = Arc::new(flow_ctx);
621                entry.insert(flow_ctx.clone());
622                flow_ctx
623            }
624        };
625        Ok(Flow(flow_ctx))
626    }
627
628    pub async fn build_transient_flow(&self) -> Result<TransientFlow> {
629        if self.direct_input_fields.is_empty() {
630            return Err(client_error!("expect at least one direct input"));
631        }
632        let direct_output_value = if let Some(direct_output_value) = &self.direct_output_value {
633            direct_output_value
634        } else {
635            return Err(client_error!("expect direct output"));
636        };
637        let spec = spec::TransientFlowSpec {
638            name: self.flow_instance_name.clone(),
639            input_fields: self.direct_input_fields.clone(),
640            reactive_ops: self.reactive_ops.clone(),
641            output_value: direct_output_value.clone(),
642        };
643
644        let analyzed_flow = super::AnalyzedTransientFlow::from_transient_flow(spec).await?;
645
646        Ok(TransientFlow(Arc::new(analyzed_flow)))
647    }
648}
649
650impl std::fmt::Display for FlowBuilder {
651    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
652        write!(f, "Flow instance name: {}\n\n", self.flow_instance_name)?;
653        for op in self.import_ops.iter() {
654            write!(
655                f,
656                "Source op {}\n{}\n",
657                op.name,
658                serde_json::to_string_pretty(&op.spec).unwrap_or_default()
659            )?;
660        }
661        for field in self.direct_input_fields.iter() {
662            writeln!(f, "Direct input {}: {}", field.name, field.value_type)?;
663        }
664        if !self.direct_input_fields.is_empty() {
665            writeln!(f)?;
666        }
667        for op in self.reactive_ops.iter() {
668            write!(
669                f,
670                "Reactive op {}\n{}\n",
671                op.name,
672                serde_json::to_string_pretty(&op.spec).unwrap_or_default()
673            )?;
674        }
675        for op in self.export_ops.iter() {
676            write!(
677                f,
678                "Export op {}\n{}\n",
679                op.name,
680                serde_json::to_string_pretty(&op.spec).unwrap_or_default()
681            )?;
682        }
683        if let Some(output) = &self.direct_output_value {
684            write!(f, "Direct output: {output}\n\n")?;
685        }
686        Ok(())
687    }
688}
689
690impl FlowBuilder {
691    fn last_field_to_data_slice(op_scope: &Arc<OpScope>) -> Result<DataSlice> {
692        let data_scope = op_scope.data.lock().unwrap();
693        let last_field = data_scope.last_field().unwrap();
694        let result = DataSlice {
695            scope: op_scope.clone(),
696            value: Arc::new(spec::ValueMapping::Field(spec::FieldMapping {
697                scope: None,
698                field_path: spec::FieldPath(vec![last_field.name.clone()]),
699            })),
700        };
701        Ok(result)
702    }
703
704    fn minimum_common_scope<'a>(
705        scopes: impl Iterator<Item = &'a Arc<OpScope>>,
706        target_scope: Option<&'a Arc<OpScope>>,
707    ) -> Result<&'a Arc<OpScope>> {
708        let mut scope_iter = scopes;
709        let mut common_scope = scope_iter
710            .next()
711            .ok_or_else(|| api_error!("expect at least one input"))?;
712        for scope in scope_iter {
713            if scope.is_op_scope_descendant(common_scope) {
714                common_scope = scope;
715            } else if !common_scope.is_op_scope_descendant(scope) {
716                api_bail!(
717                    "expect all arguments share the common scope, got {} and {} exclusive to each other",
718                    common_scope,
719                    scope
720                );
721            }
722        }
723        if let Some(target_scope) = target_scope {
724            if !target_scope.is_op_scope_descendant(common_scope) {
725                api_bail!(
726                    "the field can only be attached to a scope or sub-scope of the input value. Target scope: {}, input scope: {}",
727                    target_scope,
728                    common_scope
729                );
730            }
731            common_scope = target_scope;
732        }
733        Ok(common_scope)
734    }
735
    /// Returns the mutable op list that belongs to `op_scope`, starting the
    /// lookup from this builder's root reactive ops.
    ///
    /// See `get_mut_reactive_ops_internal` for the failure condition.
    fn get_mut_reactive_ops<'a>(
        &'a mut self,
        op_scope: &OpScope,
    ) -> Result<&'a mut Vec<spec::NamedSpec<spec::ReactiveOpSpec>>> {
        Self::get_mut_reactive_ops_internal(op_scope, &mut self.reactive_ops)
    }
742
    /// Recursively resolves the op list for `op_scope` inside
    /// `root_reactive_ops`.
    ///
    /// The root scope (no parent) maps to `root_reactive_ops` itself. For a
    /// child scope, the parent's list is resolved first, and its *last* op
    /// must be the `ForEach` whose field path and scope name match
    /// `op_scope`; that op's nested list is returned.
    ///
    /// # Errors
    /// Returns an error ("already out of op scope") when the parent's last op
    /// is not the matching `ForEach`, i.e. other ops were appended to the
    /// parent after this scope was opened, or the parent list is empty.
    fn get_mut_reactive_ops_internal<'a>(
        op_scope: &OpScope,
        root_reactive_ops: &'a mut Vec<spec::NamedSpec<spec::ReactiveOpSpec>>,
    ) -> Result<&'a mut Vec<spec::NamedSpec<spec::ReactiveOpSpec>>> {
        let result = match &op_scope.parent {
            None => root_reactive_ops,
            Some((parent_op_scope, field_path)) => {
                let parent_reactive_ops =
                    Self::get_mut_reactive_ops_internal(parent_op_scope, root_reactive_ops)?;
                // Only the parent's most recent op can be targeted: it must be
                // the ForEach opened for this scope, otherwise bail out.
                match parent_reactive_ops.last() {
                    Some(spec::NamedSpec {
                        spec: spec::ReactiveOpSpec::ForEach(foreach_spec),
                        ..
                    }) if &foreach_spec.field_path == field_path
                        && foreach_spec.op_scope.name == op_scope.name => {}

                    _ => {
                        api_bail!("already out of op scope `{}`", op_scope.name);
                    }
                }
                // The shared-borrow check above guarantees the last op exists
                // and is a ForEach, so the mutable re-borrow cannot fail.
                match &mut parent_reactive_ops.last_mut().unwrap().spec {
                    spec::ReactiveOpSpec::ForEach(foreach_spec) => &mut foreach_spec.op_scope.ops,
                    _ => unreachable!(),
                }
            }
        };
        Ok(result)
    }
772}
773
/// A built flow, wrapping the shared `FlowContext` produced by
/// `FlowBuilder::build_flow`.
pub struct Flow(pub Arc<FlowContext>);
775
impl Flow {
    /// Placeholder entry point for running the flow.
    ///
    /// Currently does nothing and always returns `Ok(())`.
    pub async fn run(&self) -> Result<()> {
        // Placeholder for run implementation
        Ok(())
    }
}
782
/// A built transient flow, wrapping the shared analyzed transient flow
/// produced by `FlowBuilder::build_transient_flow`.
pub struct TransientFlow(pub Arc<super::AnalyzedTransientFlow>);