1#[cfg(feature = "persistence")]
14use crate::setup::ObjectSetupChange;
15use crate::{base::schema::EnrichedValueType, builder::plan::FieldDefFingerprint, prelude::*};
16
17use recoco_utils::fingerprint::Fingerprinter;
18#[cfg(feature = "persistence")]
19use std::collections::btree_map;
20use std::ops::Deref;
21
22use super::analyzer::{
23 AnalyzerContext, CollectorBuilder, DataScopeBuilder, OpScope, ValueTypeBuilder,
24 build_flow_instance_context,
25};
26use crate::lib_context::FlowContext;
27use crate::{
28 base::{
29 schema::{CollectorSchema, FieldSchema},
30 spec::{FieldName, NamedSpec},
31 },
32 lib_context::LibContext,
33 ops::interface::FlowInstanceContext,
34};
35
/// Cloneable handle to an [`OpScope`]; a thin newtype over the shared `Arc`.
#[derive(Debug, Clone)]
pub struct OpScopeRef(Arc<OpScope>);
38
39impl From<Arc<OpScope>> for OpScopeRef {
40 fn from(scope: Arc<OpScope>) -> Self {
41 Self(scope)
42 }
43}
44
45impl Deref for OpScopeRef {
46 type Target = Arc<OpScope>;
47
48 fn deref(&self) -> &Self::Target {
49 &self.0
50 }
51}
52
53impl std::fmt::Display for OpScopeRef {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 write!(f, "{}", self.0)
56 }
57}
58
59impl OpScopeRef {
60 pub fn add_collector(&mut self, name: String) -> Result<DataCollector> {
61 let collector = DataCollector {
62 name,
63 scope: self.0.clone(),
64 collector: Mutex::new(None),
65 };
66 Ok(collector)
67 }
68}
69
/// Wrapper around an enriched value type describing a piece of flow data.
#[derive(Debug, Clone)]
pub struct DataType {
    /// The wrapped schema; handed out as a clone by `DataType::schema`.
    schema: schema::EnrichedValueType,
}
74
75impl From<schema::EnrichedValueType> for DataType {
76 fn from(schema: schema::EnrichedValueType) -> Self {
77 Self { schema }
78 }
79}
80
81impl DataType {
82 pub fn schema(&self) -> schema::EnrichedValueType {
83 self.schema.clone()
84 }
85}
86
/// A value (a field path or a constant) together with the op scope it is
/// associated with.
#[derive(Debug, Clone)]
pub struct DataSlice {
    /// Scope this slice is associated with.
    scope: Arc<OpScope>,
    /// How the value is obtained: a field mapping or a constant.
    value: Arc<spec::ValueMapping>,
}
92
93impl DataSlice {
94 pub fn data_type(&self) -> Result<DataType> {
95 Ok(DataType::from(self.value_type()?))
96 }
97
98 pub fn field(&self, field_name: &str) -> Result<Option<DataSlice>> {
99 let value_mapping = match self.value.as_ref() {
100 spec::ValueMapping::Field(spec::FieldMapping { scope, field_path }) => {
101 let data_scope_builder = self.scope.data.lock().unwrap();
102 let struct_schema = {
103 let (_, val_type, _) = data_scope_builder
104 .analyze_field_path(field_path, self.scope.base_value_def_fp.clone())?;
105 match &val_type.typ {
106 ValueTypeBuilder::Struct(struct_type) => struct_type,
107 _ => return Err(client_error!("expect struct type in field path")),
108 }
109 };
110 if struct_schema.find_field(field_name).is_none() {
111 return Ok(None);
112 }
113 spec::ValueMapping::Field(spec::FieldMapping {
114 scope: scope.clone(),
115 field_path: spec::FieldPath(
116 field_path
117 .iter()
118 .cloned()
119 .chain([field_name.to_string()])
120 .collect(),
121 ),
122 })
123 }
124
125 spec::ValueMapping::Constant { .. } => {
126 return Err(client_error!("field access not supported for literal",));
127 }
128 };
129 Ok(Some(DataSlice {
130 scope: self.scope.clone(),
131 value: Arc::new(value_mapping),
132 }))
133 }
134}
135
136impl DataSlice {
137 fn extract_value_mapping(&self) -> spec::ValueMapping {
138 match self.value.as_ref() {
139 spec::ValueMapping::Field(v) => spec::ValueMapping::Field(spec::FieldMapping {
140 field_path: v.field_path.clone(),
141 scope: v.scope.clone().or_else(|| Some(self.scope.name.clone())),
142 }),
143 v => v.clone(),
144 }
145 }
146
147 fn value_type(&self) -> Result<schema::EnrichedValueType> {
148 let result = match self.value.as_ref() {
149 spec::ValueMapping::Constant(c) => c.schema.clone(),
150 spec::ValueMapping::Field(v) => {
151 let data_scope_builder = self.scope.data.lock().unwrap();
152 let (_, val_type, _) = data_scope_builder
153 .analyze_field_path(&v.field_path, self.scope.base_value_def_fp.clone())?;
154 EnrichedValueType::from_alternative(val_type)?
155 }
156 };
157 Ok(result)
158 }
159}
160
161impl std::fmt::Display for DataSlice {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 write!(f, "DataSlice(")?;
164 match self.value_type() {
165 Ok(value_type) => write!(f, "{value_type}")?,
166 Err(e) => write!(f, "<error: {}>", e)?,
167 }
168 write!(f, "; {} {}) ", self.scope, self.value)?;
169 Ok(())
170 }
171}
172
/// A named collector bound to an op scope.
pub struct DataCollector {
    /// Collector name, recorded as `collector_name` in collect/export specs.
    name: String,
    /// Scope the collector was created on.
    scope: Arc<OpScope>,
    /// Builder holding the collected schema; `None` until the first
    /// `FlowBuilder::collect` call targeting this collector.
    collector: Mutex<Option<CollectorBuilder>>,
}
178
179impl std::fmt::Display for DataCollector {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 let collector = self.collector.lock().unwrap();
182 write!(f, "DataCollector \"{}\" ({}", self.name, self.scope)?;
183 if let Some(collector) = collector.as_ref() {
184 write!(f, ": {}", collector.schema)?;
185 if collector.is_used {
186 write!(f, " (used)")?;
187 }
188 }
189 write!(f, ")")?;
190 Ok(())
191 }
192}
193
/// Incrementally assembles a flow definition (sources, reactive ops, exports,
/// declarations, direct inputs/outputs) that can then be built into a
/// `Flow` or a `TransientFlow`.
pub struct FlowBuilder {
    /// Library context obtained from `get_lib_context()` when the builder is created.
    lib_context: Arc<LibContext>,
    /// Flow instance context produced by `build_flow_instance_context`.
    flow_inst_context: Arc<FlowInstanceContext>,

    /// Root op scope of the flow; sources and direct inputs are attached here.
    root_op_scope: Arc<OpScope>,
    /// Name given to the flow at construction time.
    flow_instance_name: String,
    /// Reactive ops of the root scope; ops of nested scopes live inside the
    /// corresponding `ForEach` op's `op_scope.ops`.
    reactive_ops: Vec<NamedSpec<spec::ReactiveOpSpec>>,

    /// Fields registered through `add_direct_input` (used for transient flows).
    direct_input_fields: Vec<FieldSchema>,
    /// Value set through `set_direct_output` (used for transient flows).
    direct_output_value: Option<spec::ValueMapping>,

    /// Import (source) ops added through `add_source`.
    import_ops: Vec<NamedSpec<spec::ImportOpSpec>>,
    /// Export ops added through `export`.
    export_ops: Vec<NamedSpec<spec::ExportOpSpec>>,

    /// Op specs added through `declare`.
    declarations: Vec<spec::OpSpec>,

    /// Counter used to name generated ops (`.for_each.N`, `.collect.N`).
    next_generated_op_id: usize,
}
212
213impl FlowBuilder {
214 pub async fn new(name: &str) -> Result<Self> {
215 let _span = info_span!("flow_builder.new", flow_name = %name).entered();
216 let lib_context = get_lib_context().await?;
217 let root_op_scope = OpScope::new(
218 spec::ROOT_SCOPE_NAME.to_string(),
219 None,
220 Arc::new(Mutex::new(DataScopeBuilder::new())),
221 FieldDefFingerprint::default(),
222 );
223 let flow_inst_context = build_flow_instance_context(name);
224 let result = Self {
225 lib_context,
226 flow_inst_context,
227 root_op_scope,
228 flow_instance_name: name.to_string(),
229
230 reactive_ops: vec![],
231
232 import_ops: vec![],
233 export_ops: vec![],
234
235 direct_input_fields: vec![],
236 direct_output_value: None,
237
238 declarations: vec![],
239
240 next_generated_op_id: 0,
241 };
242 Ok(result)
243 }
244
245 pub fn root_scope(&self) -> OpScopeRef {
246 OpScopeRef(self.root_op_scope.clone())
247 }
248
249 pub async fn add_source(
250 &mut self,
251 kind: String,
252 op_spec: serde_json::Map<String, serde_json::Value>,
253 target_scope: Option<OpScopeRef>,
254 name: String,
255 refresh_options: Option<spec::SourceRefreshOptions>,
256 execution_options: Option<spec::ExecutionOptions>,
257 ) -> Result<DataSlice> {
258 let _span = info_span!("flow_builder.add_source", flow_name = %self.flow_instance_name, source_name = %name, source_kind = %kind).entered();
259 if let Some(target_scope) = target_scope
260 && *target_scope != self.root_op_scope
261 {
262 return Err(client_error!("source can only be added to the root scope",));
263 }
264 let import_op = spec::NamedSpec {
265 name,
266 spec: spec::ImportOpSpec {
267 source: spec::OpSpec {
268 kind,
269 spec: op_spec,
270 },
271 refresh_options: refresh_options.unwrap_or_default(),
272 execution_options: execution_options.unwrap_or_default(),
273 },
274 };
275 let analyzer_ctx = AnalyzerContext {
276 lib_ctx: self.lib_context.clone(),
277 flow_ctx: self.flow_inst_context.clone(),
278 };
279 let analyzed = analyzer_ctx
280 .analyze_import_op(&self.root_op_scope, import_op.clone())
281 .await?;
282 std::mem::drop(analyzed);
283
284 let result = Self::last_field_to_data_slice(&self.root_op_scope)?;
285 self.import_ops.push(import_op);
286 Ok(result)
287 }
288
289 pub fn constant(
290 &self,
291 value_type: schema::EnrichedValueType,
292 value: serde_json::Value,
293 ) -> Result<DataSlice> {
294 let schema = value_type;
295 let slice = DataSlice {
296 scope: self.root_op_scope.clone(),
297 value: Arc::new(spec::ValueMapping::Constant(spec::ConstantMapping {
298 schema: schema.clone(),
299 value,
300 })),
301 };
302 Ok(slice)
303 }
304
305 pub fn add_direct_input(
306 &mut self,
307 name: String,
308 value_type: schema::EnrichedValueType,
309 ) -> Result<DataSlice> {
310 {
311 let mut root_data_scope = self.root_op_scope.data.lock().unwrap();
312 root_data_scope.add_field(
313 name.clone(),
314 &value_type,
315 FieldDefFingerprint {
316 source_op_names: HashSet::from([name.clone()]),
317 fingerprint: Fingerprinter::default()
318 .with("input")
319 .map_err(Error::from)?
320 .with(&name)
321 .map_err(Error::from)?
322 .into_fingerprint(),
323 },
324 )?;
325 }
326 let result = Self::last_field_to_data_slice(&self.root_op_scope)?;
327 self.direct_input_fields.push(FieldSchema {
328 name,
329 value_type,
330 description: None,
331 });
332 Ok(result)
333 }
334
335 pub fn set_direct_output(&mut self, data_slice: DataSlice) -> Result<()> {
336 if data_slice.scope != self.root_op_scope {
337 return Err(client_error!(
338 "direct output must be value in the root scope",
339 ));
340 }
341 self.direct_output_value = Some(data_slice.extract_value_mapping());
342 Ok(())
343 }
344
345 pub fn for_each(
346 &mut self,
347 data_slice: DataSlice,
348 execution_options: Option<spec::ExecutionOptions>,
349 ) -> Result<OpScopeRef> {
350 let parent_scope = &data_slice.scope;
351 let field_path = match data_slice.value.as_ref() {
352 spec::ValueMapping::Field(v) => &v.field_path,
353 _ => return Err(client_error!("expect field path")),
354 };
355 let num_parent_layers = parent_scope.ancestors().count();
356 let scope_name = format!(
357 "{}_{}",
358 field_path.last().map_or("", |s| s.as_str()),
359 num_parent_layers
360 );
361 let (_, child_op_scope) =
362 parent_scope.new_foreach_op_scope(scope_name.clone(), field_path)?;
363
364 let reactive_op = spec::NamedSpec {
365 name: format!(".for_each.{}", self.next_generated_op_id),
366 spec: spec::ReactiveOpSpec::ForEach(spec::ForEachOpSpec {
367 field_path: field_path.clone(),
368 op_scope: spec::ReactiveOpScope {
369 name: scope_name,
370 ops: vec![],
371 },
372 execution_options: execution_options.unwrap_or_default(),
373 }),
374 };
375 self.next_generated_op_id += 1;
376 self.get_mut_reactive_ops(parent_scope)?.push(reactive_op);
377
378 Ok(OpScopeRef(child_op_scope))
379 }
380
381 pub async fn transform(
382 &mut self,
383 kind: String,
384 op_spec: serde_json::Map<String, serde_json::Value>,
385 args: Vec<(DataSlice, Option<String>)>,
386 target_scope: Option<OpScopeRef>,
387 name: String,
388 ) -> Result<DataSlice> {
389 let _span = info_span!("flow_builder.transform", flow_name = %self.flow_instance_name, op_name = %name, op_kind = %kind).entered();
390 let spec = spec::OpSpec {
391 kind,
392 spec: op_spec,
393 };
394 let op_scope = Self::minimum_common_scope(
395 args.iter().map(|(ds, _)| &ds.scope),
396 target_scope.as_ref().map(|s| &s.0),
397 )?;
398
399 let reactive_op = spec::NamedSpec {
400 name,
401 spec: spec::ReactiveOpSpec::Transform(spec::TransformOpSpec {
402 inputs: args
403 .iter()
404 .map(|(ds, arg_name)| spec::OpArgBinding {
405 arg_name: spec::OpArgName(arg_name.clone()),
406 value: ds.extract_value_mapping(),
407 })
408 .collect(),
409 op: spec,
410 execution_options: Default::default(),
411 }),
412 };
413
414 let analyzer_ctx = AnalyzerContext {
415 lib_ctx: self.lib_context.clone(),
416 flow_ctx: self.flow_inst_context.clone(),
417 };
418 let analyzed = analyzer_ctx
419 .analyze_reactive_op(op_scope, &reactive_op)
420 .await?;
421 std::mem::drop(analyzed);
422
423 self.get_mut_reactive_ops(op_scope)?.push(reactive_op);
424
425 let result = Self::last_field_to_data_slice(op_scope)?;
426 Ok(result)
427 }
428
429 pub async fn collect(
430 &mut self,
431 collector: &DataCollector,
432 fields: Vec<(FieldName, DataSlice)>,
433 auto_uuid_field: Option<FieldName>,
434 ) -> Result<()> {
435 let _span = info_span!("flow_builder.collect", flow_name = %self.flow_instance_name, collector_name = %collector.name).entered();
436 let common_scope =
437 Self::minimum_common_scope(fields.iter().map(|(_, ds)| &ds.scope), None)?;
438 let name = format!(".collect.{}", self.next_generated_op_id);
439 self.next_generated_op_id += 1;
440
441 let reactive_op = spec::NamedSpec {
442 name,
443 spec: spec::ReactiveOpSpec::Collect(spec::CollectOpSpec {
444 input: spec::StructMapping {
445 fields: fields
446 .iter()
447 .map(|(name, ds)| NamedSpec {
448 name: name.clone(),
449 spec: ds.extract_value_mapping(),
450 })
451 .collect(),
452 },
453 scope_name: collector.scope.name.clone(),
454 collector_name: collector.name.clone(),
455 auto_uuid_field: auto_uuid_field.clone(),
456 }),
457 };
458
459 let analyzer_ctx = AnalyzerContext {
460 lib_ctx: self.lib_context.clone(),
461 flow_ctx: self.flow_inst_context.clone(),
462 };
463 let analyzed = analyzer_ctx
464 .analyze_reactive_op(common_scope, &reactive_op)
465 .await?;
466 std::mem::drop(analyzed);
467
468 self.get_mut_reactive_ops(common_scope)?.push(reactive_op);
469
470 let collector_schema = CollectorSchema::from_fields(
471 fields
472 .into_iter()
473 .map(|(name, ds)| {
474 Ok(FieldSchema {
475 name,
476 value_type: ds.value_type()?,
477 description: None,
478 })
479 })
480 .collect::<Result<Vec<FieldSchema>>>()?,
481 auto_uuid_field,
482 );
483 {
484 let mut collector = collector.collector.lock().unwrap();
486 if let Some(collector) = collector.as_mut() {
487 collector.collect(&collector_schema, FieldDefFingerprint::default())?;
488 } else {
489 *collector = Some(CollectorBuilder::new(
490 Arc::new(collector_schema),
491 FieldDefFingerprint::default(),
492 ));
493 }
494 }
495
496 Ok(())
497 }
498
499 pub fn export(
500 &mut self,
501 name: String,
502 kind: String,
503 op_spec: serde_json::Map<String, serde_json::Value>,
504 attachments: Vec<spec::OpSpec>,
505 index_options: spec::IndexOptions,
506 input: &DataCollector,
507 setup_by_user: bool,
508 ) -> Result<()> {
509 let _span = info_span!("flow_builder.export", flow_name = %self.flow_instance_name, export_name = %name, target_kind = %kind).entered();
510 let spec = spec::OpSpec {
511 kind,
512 spec: op_spec,
513 };
514
515 if input.scope != self.root_op_scope {
516 return Err(client_error!(
517 "Export can only work on collectors belonging to the root scope.",
518 ));
519 }
520 self.export_ops.push(spec::NamedSpec {
521 name,
522 spec: spec::ExportOpSpec {
523 collector_name: input.name.clone(),
524 target: spec,
525 attachments,
526 index_options,
527 setup_by_user,
528 },
529 });
530 Ok(())
531 }
532
    /// Appends `op_spec` to the flow's declarations.
    ///
    /// This implementation never produces an `Err`.
    pub fn declare(&mut self, op_spec: spec::OpSpec) -> Result<()> {
        self.declarations.push(op_spec);
        Ok(())
    }
537
538 pub fn scope_field(&self, scope: OpScopeRef, field_name: &str) -> Result<Option<DataSlice>> {
539 {
540 let scope_builder = scope.0.data.lock().unwrap();
541 if scope_builder.data.find_field(field_name).is_none() {
542 return Err(client_error!("field {field_name} not found"));
543 }
544 }
545 Ok(Some(DataSlice {
546 scope: scope.0,
547 value: Arc::new(spec::ValueMapping::Field(spec::FieldMapping {
548 scope: None,
549 field_path: spec::FieldPath(vec![field_name.to_string()]),
550 })),
551 }))
552 }
553
    /// Builds the persistent flow from everything recorded so far and registers
    /// it in the library context under the flow instance name.
    ///
    /// Steps: analyze the accumulated spec, create a `FlowContext` against the
    /// stored setup state for this flow name, apply setup changes when they are
    /// internal-only, then insert the context into `lib_context.flows`.
    ///
    /// # Errors
    ///
    /// Fails when analysis fails, when no persistence context is available,
    /// when the flow context cannot be created, when applying the setup changes
    /// fails, or when a flow with the same name is already registered.
    ///
    /// NOTE(review): the duplicate-name check only happens at the very end,
    /// after analysis and after any internal setup changes were applied -
    /// confirm this ordering is intended.
    #[cfg(feature = "persistence")]
    pub async fn build_flow(&self) -> Result<Flow> {
        // NOTE(review): this entered guard is held across the `.await`s below; if
        // `info_span!` is tracing's macro, its docs advise against that in async
        // code (prefer `Instrument::instrument`) - confirm.
        let _span =
            info_span!("flow_builder.build_flow", flow_name = %self.flow_instance_name).entered();
        // Snapshot the builder state into an owned spec.
        let spec = spec::FlowInstanceSpec {
            name: self.flow_instance_name.clone(),
            import_ops: self.import_ops.clone(),
            reactive_ops: self.reactive_ops.clone(),
            export_ops: self.export_ops.clone(),
            declarations: self.declarations.clone(),
        };
        let flow_instance_ctx = self.flow_inst_context.clone();

        let flow_ctx = {
            let analyzed_flow =
                super::AnalyzedFlow::from_flow_instance(spec, flow_instance_ctx).await?;
            let persistence_ctx = self.lib_context.require_persistence_ctx()?;
            let flow_ctx = {
                // The read guard on the setup context is dropped at the end of this
                // block, before the write guard is requested further down.
                let flow_setup_ctx = persistence_ctx.setup_ctx.read().await;
                FlowContext::new(
                    Arc::new(analyzed_flow),
                    flow_setup_ctx
                        .all_setup_states
                        .flows
                        .get(&self.flow_instance_name),
                )
                .await?
            };

            {
                let mut flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().write().await;
                // Only changes that are internal-only are applied here; anything
                // involving external changes is left untouched by this method.
                if flow_exec_ctx.setup_change.has_internal_changes()
                    && !flow_exec_ctx.setup_change.has_external_changes()
                {
                    let mut lib_setup_ctx = persistence_ctx.setup_ctx.write().await;
                    // Output of the setup run is captured and only emitted via `trace!`.
                    let mut output_buffer = Vec::<u8>::new();
                    setup::apply_changes_for_flow_ctx(
                        setup::FlowSetupChangeAction::Setup,
                        &flow_ctx,
                        &mut flow_exec_ctx,
                        &mut lib_setup_ctx,
                        &persistence_ctx.builtin_db_pool,
                        &mut output_buffer,
                    )
                    .await?;
                    trace!(
                        "Applied internal-only change for flow {}:\n{}",
                        self.flow_instance_name,
                        String::from_utf8_lossy(&output_buffer)
                    );
                }
            }

            Ok::<_, Error>(flow_ctx)
        }?;

        // Register the flow; an already-registered name is an error.
        let mut flow_ctxs = self.lib_context.flows.lock().unwrap();
        let flow_ctx = match flow_ctxs.entry(self.flow_instance_name.clone()) {
            btree_map::Entry::Occupied(_) => {
                return Err(client_error!(
                    "flow instance name already exists: {}",
                    self.flow_instance_name
                ));
            }
            btree_map::Entry::Vacant(entry) => {
                let flow_ctx = Arc::new(flow_ctx);
                entry.insert(flow_ctx.clone());
                flow_ctx
            }
        };
        Ok(Flow(flow_ctx))
    }
627
628 pub async fn build_transient_flow(&self) -> Result<TransientFlow> {
629 if self.direct_input_fields.is_empty() {
630 return Err(client_error!("expect at least one direct input"));
631 }
632 let direct_output_value = if let Some(direct_output_value) = &self.direct_output_value {
633 direct_output_value
634 } else {
635 return Err(client_error!("expect direct output"));
636 };
637 let spec = spec::TransientFlowSpec {
638 name: self.flow_instance_name.clone(),
639 input_fields: self.direct_input_fields.clone(),
640 reactive_ops: self.reactive_ops.clone(),
641 output_value: direct_output_value.clone(),
642 };
643
644 let analyzed_flow = super::AnalyzedTransientFlow::from_transient_flow(spec).await?;
645
646 Ok(TransientFlow(Arc::new(analyzed_flow)))
647 }
648}
649
650impl std::fmt::Display for FlowBuilder {
651 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
652 write!(f, "Flow instance name: {}\n\n", self.flow_instance_name)?;
653 for op in self.import_ops.iter() {
654 write!(
655 f,
656 "Source op {}\n{}\n",
657 op.name,
658 serde_json::to_string_pretty(&op.spec).unwrap_or_default()
659 )?;
660 }
661 for field in self.direct_input_fields.iter() {
662 writeln!(f, "Direct input {}: {}", field.name, field.value_type)?;
663 }
664 if !self.direct_input_fields.is_empty() {
665 writeln!(f)?;
666 }
667 for op in self.reactive_ops.iter() {
668 write!(
669 f,
670 "Reactive op {}\n{}\n",
671 op.name,
672 serde_json::to_string_pretty(&op.spec).unwrap_or_default()
673 )?;
674 }
675 for op in self.export_ops.iter() {
676 write!(
677 f,
678 "Export op {}\n{}\n",
679 op.name,
680 serde_json::to_string_pretty(&op.spec).unwrap_or_default()
681 )?;
682 }
683 if let Some(output) = &self.direct_output_value {
684 write!(f, "Direct output: {output}\n\n")?;
685 }
686 Ok(())
687 }
688}
689
690impl FlowBuilder {
691 fn last_field_to_data_slice(op_scope: &Arc<OpScope>) -> Result<DataSlice> {
692 let data_scope = op_scope.data.lock().unwrap();
693 let last_field = data_scope.last_field().unwrap();
694 let result = DataSlice {
695 scope: op_scope.clone(),
696 value: Arc::new(spec::ValueMapping::Field(spec::FieldMapping {
697 scope: None,
698 field_path: spec::FieldPath(vec![last_field.name.clone()]),
699 })),
700 };
701 Ok(result)
702 }
703
704 fn minimum_common_scope<'a>(
705 scopes: impl Iterator<Item = &'a Arc<OpScope>>,
706 target_scope: Option<&'a Arc<OpScope>>,
707 ) -> Result<&'a Arc<OpScope>> {
708 let mut scope_iter = scopes;
709 let mut common_scope = scope_iter
710 .next()
711 .ok_or_else(|| api_error!("expect at least one input"))?;
712 for scope in scope_iter {
713 if scope.is_op_scope_descendant(common_scope) {
714 common_scope = scope;
715 } else if !common_scope.is_op_scope_descendant(scope) {
716 api_bail!(
717 "expect all arguments share the common scope, got {} and {} exclusive to each other",
718 common_scope,
719 scope
720 );
721 }
722 }
723 if let Some(target_scope) = target_scope {
724 if !target_scope.is_op_scope_descendant(common_scope) {
725 api_bail!(
726 "the field can only be attached to a scope or sub-scope of the input value. Target scope: {}, input scope: {}",
727 target_scope,
728 common_scope
729 );
730 }
731 common_scope = target_scope;
732 }
733 Ok(common_scope)
734 }
735
    /// Returns the mutable list of reactive ops that belongs to `op_scope`,
    /// resolved from this builder's root-level op list.
    ///
    /// Delegates to `get_mut_reactive_ops_internal`; see there for the failure
    /// condition.
    fn get_mut_reactive_ops<'a>(
        &'a mut self,
        op_scope: &OpScope,
    ) -> Result<&'a mut Vec<spec::NamedSpec<spec::ReactiveOpSpec>>> {
        Self::get_mut_reactive_ops_internal(op_scope, &mut self.reactive_ops)
    }
742
743 fn get_mut_reactive_ops_internal<'a>(
744 op_scope: &OpScope,
745 root_reactive_ops: &'a mut Vec<spec::NamedSpec<spec::ReactiveOpSpec>>,
746 ) -> Result<&'a mut Vec<spec::NamedSpec<spec::ReactiveOpSpec>>> {
747 let result = match &op_scope.parent {
748 None => root_reactive_ops,
749 Some((parent_op_scope, field_path)) => {
750 let parent_reactive_ops =
751 Self::get_mut_reactive_ops_internal(parent_op_scope, root_reactive_ops)?;
752 match parent_reactive_ops.last() {
754 Some(spec::NamedSpec {
755 spec: spec::ReactiveOpSpec::ForEach(foreach_spec),
756 ..
757 }) if &foreach_spec.field_path == field_path
758 && foreach_spec.op_scope.name == op_scope.name => {}
759
760 _ => {
761 api_bail!("already out of op scope `{}`", op_scope.name);
762 }
763 }
764 match &mut parent_reactive_ops.last_mut().unwrap().spec {
765 spec::ReactiveOpSpec::ForEach(foreach_spec) => &mut foreach_spec.op_scope.ops,
766 _ => unreachable!(),
767 }
768 }
769 };
770 Ok(result)
771 }
772}
773
/// A built flow: a public newtype over the shared [`FlowContext`].
pub struct Flow(pub Arc<FlowContext>);
775
impl Flow {
    /// Currently does nothing and always returns `Ok(())`.
    pub async fn run(&self) -> Result<()> {
        Ok(())
    }
}
782
/// A built transient flow: a public newtype over the shared analyzed transient flow.
pub struct TransientFlow(pub Arc<super::AnalyzedTransientFlow>);