1#[cfg(feature = "persistence")]
14use crate::execution::indexing_status::SourceLogicFingerprint;
15use crate::prelude::*;
16
17use futures::future::try_join_all;
18use tokio::time::Duration;
19
20use crate::base::value::EstimatedByteSize;
21use crate::base::{schema, value};
22use crate::builder::{AnalyzedTransientFlow, plan::*};
23use utils::immutable::RefList;
24
25use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, evaluate_with_cell};
26
27const DEFAULT_TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800);
28const MIN_WARNING_THRESHOLD: Duration = Duration::from_secs(30);
29
30#[derive(Debug)]
31pub struct ScopeValueBuilder {
32 pub fields: Vec<OnceLock<value::Value<ScopeValueBuilder>>>,
34}
35
36impl value::EstimatedByteSize for ScopeValueBuilder {
37 fn estimated_detached_byte_size(&self) -> usize {
38 self.fields
39 .iter()
40 .map(|f| f.get().map_or(0, |v| v.estimated_byte_size()))
41 .sum()
42 }
43}
44
45impl From<&ScopeValueBuilder> for value::ScopeValue {
46 fn from(val: &ScopeValueBuilder) -> Self {
47 value::ScopeValue(value::FieldValues {
48 fields: val
49 .fields
50 .iter()
51 .map(|f| value::Value::from_alternative_ref(f.get().unwrap()))
52 .collect(),
53 })
54 }
55}
56
57impl From<ScopeValueBuilder> for value::ScopeValue {
58 fn from(val: ScopeValueBuilder) -> Self {
59 value::ScopeValue(value::FieldValues {
60 fields: val
61 .fields
62 .into_iter()
63 .map(|f| value::Value::from_alternative(f.into_inner().unwrap()))
64 .collect(),
65 })
66 }
67}
68
69impl ScopeValueBuilder {
70 fn new(num_fields: usize) -> Self {
71 let mut fields = Vec::with_capacity(num_fields);
72 fields.resize_with(num_fields, OnceLock::new);
73 Self { fields }
74 }
75
76 fn augmented_from(source: &value::ScopeValue, schema: &schema::TableSchema) -> Result<Self> {
77 let val_index_base = schema.key_schema().len();
78 let len = schema.row.fields.len() - val_index_base;
79
80 let mut builder = Self::new(len);
81
82 let value::ScopeValue(source_fields) = source;
83 for ((v, t), r) in source_fields
84 .fields
85 .iter()
86 .zip(schema.row.fields[val_index_base..(val_index_base + len)].iter())
87 .zip(&mut builder.fields)
88 {
89 r.set(augmented_value(v, &t.value_type.typ)?)
90 .map_err(|_| internal_error!("Value of field `{}` is already set", t.name))?;
91 }
92 Ok(builder)
93 }
94}
95
96fn augmented_value(
97 val: &value::Value,
98 val_type: &schema::ValueType,
99) -> Result<value::Value<ScopeValueBuilder>> {
100 let value = match (val, val_type) {
101 (value::Value::Null, _) => value::Value::Null,
102 (value::Value::Basic(v), _) => value::Value::Basic(v.clone()),
103 (value::Value::Struct(v), schema::ValueType::Struct(t)) => {
104 value::Value::Struct(value::FieldValues {
105 fields: v
106 .fields
107 .iter()
108 .enumerate()
109 .map(|(i, v)| augmented_value(v, &t.fields[i].value_type.typ))
110 .collect::<Result<Vec<_>>>()?,
111 })
112 }
113 (value::Value::UTable(v), schema::ValueType::Table(t)) => value::Value::UTable(
114 v.iter()
115 .map(|v| ScopeValueBuilder::augmented_from(v, t))
116 .collect::<Result<Vec<_>>>()?,
117 ),
118 (value::Value::KTable(v), schema::ValueType::Table(t)) => value::Value::KTable(
119 v.iter()
120 .map(|(k, v)| Ok((k.clone(), ScopeValueBuilder::augmented_from(v, t)?)))
121 .collect::<Result<BTreeMap<_, _>>>()?,
122 ),
123 (value::Value::LTable(v), schema::ValueType::Table(t)) => value::Value::LTable(
124 v.iter()
125 .map(|v| ScopeValueBuilder::augmented_from(v, t))
126 .collect::<Result<Vec<_>>>()?,
127 ),
128 (val, _) => internal_bail!("Value kind doesn't match the type {val_type}: {val:?}"),
129 };
130 Ok(value)
131}
132
133enum ScopeKey<'a> {
134 None,
136 MapKey(&'a value::KeyValue),
138 ListIndex(usize),
140}
141
142impl<'a> ScopeKey<'a> {
143 pub fn key(&self) -> Option<Cow<'a, value::KeyValue>> {
144 match self {
145 ScopeKey::None => None,
146 ScopeKey::MapKey(k) => Some(Cow::Borrowed(k)),
147 ScopeKey::ListIndex(i) => {
148 Some(Cow::Owned(value::KeyValue::from_single_part(*i as i64)))
149 }
150 }
151 }
152
153 pub fn value_field_index_base(&self) -> usize {
154 match *self {
155 ScopeKey::None => 0,
156 ScopeKey::MapKey(v) => v.len(),
157 ScopeKey::ListIndex(_) => 0,
158 }
159 }
160}
161
162impl std::fmt::Display for ScopeKey<'_> {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 match self {
165 ScopeKey::None => write!(f, "()"),
166 ScopeKey::MapKey(k) => write!(f, "{k}"),
167 ScopeKey::ListIndex(i) => write!(f, "[{i}]"),
168 }
169 }
170}
171
172struct ScopeEntry<'a> {
173 key: ScopeKey<'a>,
174 value: &'a ScopeValueBuilder,
175 schema: &'a schema::StructSchema,
176 collected_values: Vec<Mutex<Vec<value::FieldValues>>>,
177}
178
179impl<'a> ScopeEntry<'a> {
180 fn new(
181 key: ScopeKey<'a>,
182 value: &'a ScopeValueBuilder,
183 schema: &'a schema::StructSchema,
184 analyzed_op_scope: &AnalyzedOpScope,
185 ) -> Self {
186 let mut collected_values = Vec::with_capacity(analyzed_op_scope.collector_len);
187 collected_values.resize_with(analyzed_op_scope.collector_len, Default::default);
188
189 Self {
190 key,
191 value,
192 schema,
193 collected_values,
194 }
195 }
196
197 fn get_local_field_schema<'b>(
198 schema: &'b schema::StructSchema,
199 indices: &[u32],
200 ) -> Result<&'b schema::FieldSchema> {
201 let field_idx = indices[0] as usize;
202 let field_schema = &schema.fields[field_idx];
203 let result = if indices.len() == 1 {
204 field_schema
205 } else {
206 let struct_field_schema = match &field_schema.value_type.typ {
207 schema::ValueType::Struct(s) => s,
208 _ => internal_bail!("Expect struct field"),
209 };
210 Self::get_local_field_schema(struct_field_schema, &indices[1..])?
211 };
212 Ok(result)
213 }
214
215 fn get_local_key_field<'b>(
216 key_val: &'b value::KeyPart,
217 indices: &'_ [u32],
218 ) -> Result<&'b value::KeyPart> {
219 let result = if indices.is_empty() {
220 key_val
221 } else if let value::KeyPart::Struct(fields) = key_val {
222 Self::get_local_key_field(&fields[indices[0] as usize], &indices[1..])?
223 } else {
224 internal_bail!("Only struct can be accessed by sub field");
225 };
226 Ok(result)
227 }
228
229 fn get_local_field<'b>(
230 val: &'b value::Value<ScopeValueBuilder>,
231 indices: &'_ [u32],
232 ) -> Result<&'b value::Value<ScopeValueBuilder>> {
233 let result = if indices.is_empty() {
234 val
235 } else if let value::Value::Null = val {
236 val
237 } else if let value::Value::Struct(fields) = val {
238 Self::get_local_field(&fields.fields[indices[0] as usize], &indices[1..])?
239 } else {
240 internal_bail!("Only struct can be accessed by sub field");
241 };
242 Ok(result)
243 }
244
245 fn get_value_field_builder(
246 &self,
247 field_ref: &AnalyzedLocalFieldReference,
248 ) -> Result<&value::Value<ScopeValueBuilder>> {
249 let first_index = field_ref.fields_idx[0] as usize;
250 let index_base = self.key.value_field_index_base();
251 let val = self.value.fields[first_index - index_base]
252 .get()
253 .ok_or_else(|| internal_error!("Field {} is not set", first_index))?;
254 Self::get_local_field(val, &field_ref.fields_idx[1..])
255 }
256
257 fn get_field(&self, field_ref: &AnalyzedLocalFieldReference) -> Result<value::Value> {
258 let first_index = field_ref.fields_idx[0] as usize;
259 let index_base = self.key.value_field_index_base();
260 let result = if first_index < index_base {
261 let key_val = self
262 .key
263 .key()
264 .ok_or_else(|| internal_error!("Key is not set"))?;
265 let key_part =
266 Self::get_local_key_field(&key_val[first_index], &field_ref.fields_idx[1..])?;
267 key_part.clone().into()
268 } else {
269 let val = self.value.fields[first_index - index_base]
270 .get()
271 .ok_or_else(|| internal_error!("Field {} is not set", first_index))?;
272 let val_part = Self::get_local_field(val, &field_ref.fields_idx[1..])?;
273 value::Value::from_alternative_ref(val_part)
274 };
275 Ok(result)
276 }
277
278 fn get_field_schema(
279 &self,
280 field_ref: &AnalyzedLocalFieldReference,
281 ) -> Result<&schema::FieldSchema> {
282 Self::get_local_field_schema(self.schema, &field_ref.fields_idx)
283 }
284
285 fn define_field_w_builder(
286 &self,
287 output_field: &AnalyzedOpOutput,
288 val: value::Value<ScopeValueBuilder>,
289 ) -> Result<()> {
290 let field_index = output_field.field_idx as usize;
291 let index_base = self.key.value_field_index_base();
292 self.value.fields[field_index - index_base].set(val).map_err(|_| {
293 internal_error!("Field {field_index} for scope is already set, violating single-definition rule.")
294 })?;
295 Ok(())
296 }
297
298 fn define_field(&self, output_field: &AnalyzedOpOutput, val: &value::Value) -> Result<()> {
299 let field_index = output_field.field_idx as usize;
300 let field_schema = &self.schema.fields[field_index];
301 let val = augmented_value(val, &field_schema.value_type.typ)?;
302 self.define_field_w_builder(output_field, val)?;
303 Ok(())
304 }
305}
306
307fn assemble_value(
308 value_mapping: &AnalyzedValueMapping,
309 scoped_entries: RefList<'_, &ScopeEntry<'_>>,
310) -> Result<value::Value> {
311 let result = match value_mapping {
312 AnalyzedValueMapping::Constant { value } => value.clone(),
313 AnalyzedValueMapping::Field(field_ref) => scoped_entries
314 .headn(field_ref.scope_up_level as usize)
315 .ok_or_else(|| internal_error!("Invalid scope_up_level: {}", field_ref.scope_up_level))?
316 .get_field(&field_ref.local)?,
317 AnalyzedValueMapping::Struct(mapping) => {
318 let fields = mapping
319 .fields
320 .iter()
321 .map(|f| assemble_value(f, scoped_entries))
322 .collect::<Result<Vec<_>>>()?;
323 value::Value::Struct(value::FieldValues { fields })
324 }
325 };
326 Ok(result)
327}
328
329fn assemble_input_values<'a>(
330 value_mappings: &'a [AnalyzedValueMapping],
331 scoped_entries: RefList<'a, &ScopeEntry<'a>>,
332) -> impl Iterator<Item = Result<value::Value>> + 'a {
333 value_mappings
334 .iter()
335 .map(move |value_mapping| assemble_value(value_mapping, scoped_entries))
336}
337
338async fn evaluate_child_op_scope(
339 op_scope: &AnalyzedOpScope,
340 scoped_entries: RefList<'_, &ScopeEntry<'_>>,
341 child_scope_entry: ScopeEntry<'_>,
342 concurrency_controller: &concur_control::ConcurrencyController,
343 memory: &EvaluationMemory,
344 operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
345) -> Result<()> {
346 let _permit = concurrency_controller
347 .acquire(Some(|| {
348 child_scope_entry
349 .value
350 .fields
351 .iter()
352 .map(|f| f.get().map_or(0, |v| v.estimated_byte_size()))
353 .sum()
354 }))
355 .await?;
356 evaluate_op_scope(
357 op_scope,
358 scoped_entries.prepend(&child_scope_entry),
359 memory,
360 operation_in_process_stats,
361 )
362 .await
363 .with_context(|| {
364 format!(
365 "Evaluating in scope with key {}",
366 match child_scope_entry.key.key() {
367 Some(k) => k.to_string(),
368 None => "()".to_string(),
369 }
370 )
371 })
372}
373
374async fn evaluate_with_timeout_and_warning<F, T>(
375 eval_future: F,
376 timeout_duration: Duration,
377 warn_duration: Duration,
378 op_kind: String,
379 op_name: String,
380) -> Result<T>
381where
382 F: std::future::Future<Output = Result<T>>,
383{
384 let mut eval_future = Box::pin(eval_future);
385 let mut to_warn = warn_duration < timeout_duration;
386 let timeout_future = tokio::time::sleep(timeout_duration);
387 tokio::pin!(timeout_future);
388
389 loop {
390 tokio::select! {
391 res = &mut eval_future => {
392 return res;
393 }
394 _ = &mut timeout_future => {
395 return Err(internal_error!(
396 "Function '{}' ({}) timed out after {} seconds",
397 op_kind, op_name, timeout_duration.as_secs()
398 ));
399 }
400 _ = tokio::time::sleep(warn_duration), if to_warn => {
401 warn!(
402 "Function '{}' ({}) is taking longer than {}s (will be timed out after {}s)",
403 op_kind, op_name, warn_duration.as_secs(), timeout_duration.as_secs()
404 );
405 to_warn = false;
406 }
407 }
408 }
409}
410
411async fn evaluate_op_scope(
412 op_scope: &AnalyzedOpScope,
413 scoped_entries: RefList<'_, &ScopeEntry<'_>>,
414 memory: &EvaluationMemory,
415 operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
416) -> Result<()> {
417 let head_scope = *scoped_entries.head().unwrap();
418 for reactive_op in op_scope.reactive_ops.iter() {
419 match reactive_op {
420 AnalyzedReactiveOp::Transform(op) => {
421 if let Some(op_stats) = operation_in_process_stats {
423 let transform_key =
424 format!("transform/{}{}", op_scope.scope_qualifier, op.name);
425 op_stats.start_processing(&transform_key, 1);
426 }
427
428 let mut input_values = Vec::with_capacity(op.inputs.len());
429 for value in assemble_input_values(&op.inputs, scoped_entries) {
430 input_values.push(value?);
431 }
432
433 let timeout_duration = op
434 .function_exec_info
435 .timeout
436 .unwrap_or(DEFAULT_TIMEOUT_THRESHOLD);
437 let warn_duration = std::cmp::max(timeout_duration / 2, MIN_WARNING_THRESHOLD);
438
439 let op_name_for_warning = op.name.clone();
440 let op_kind_for_warning = op.op_kind.clone();
441
442 let result = if op.function_exec_info.enable_cache {
443 let output_value_cell = memory.get_cache_entry(
444 || -> Result<_> {
445 Ok(op
446 .function_exec_info
447 .fingerprinter
448 .clone()
449 .with(&input_values)
450 .map(|fp| fp.into_fingerprint())?)
451 },
452 &op.function_exec_info.output_type,
453 None,
454 )?;
455
456 let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || {
457 op.executor.evaluate(input_values)
458 });
459 let v = evaluate_with_timeout_and_warning(
460 eval_future,
461 timeout_duration,
462 warn_duration,
463 op_kind_for_warning,
464 op_name_for_warning,
465 )
466 .await?;
467
468 head_scope.define_field(&op.output, &v)
469 } else {
470 let eval_future = op.executor.evaluate(input_values);
471 let v = evaluate_with_timeout_and_warning(
472 eval_future,
473 timeout_duration,
474 warn_duration,
475 op_kind_for_warning,
476 op_name_for_warning,
477 )
478 .await?;
479
480 head_scope.define_field(&op.output, &v)
481 };
482
483 if let Some(op_stats) = operation_in_process_stats {
485 let transform_key =
486 format!("transform/{}{}", op_scope.scope_qualifier, op.name);
487 op_stats.finish_processing(&transform_key, 1);
488 }
489
490 result.with_context(|| format!("Evaluating Transform op `{}`", op.name))?
491 }
492
493 AnalyzedReactiveOp::ForEach(op) => {
494 let target_field_schema = head_scope.get_field_schema(&op.local_field_ref)?;
495 let table_schema = match &target_field_schema.value_type.typ {
496 schema::ValueType::Table(cs) => cs,
497 _ => internal_bail!("Expect target field to be a table"),
498 };
499
500 let target_field = head_scope.get_value_field_builder(&op.local_field_ref)?;
501 let task_futs = match target_field {
502 value::Value::Null => vec![],
503 value::Value::UTable(v) => v
504 .iter()
505 .map(|item| {
506 evaluate_child_op_scope(
507 &op.op_scope,
508 scoped_entries,
509 ScopeEntry::new(
510 ScopeKey::None,
511 item,
512 &table_schema.row,
513 &op.op_scope,
514 ),
515 &op.concurrency_controller,
516 memory,
517 operation_in_process_stats,
518 )
519 })
520 .collect::<Vec<_>>(),
521 value::Value::KTable(v) => v
522 .iter()
523 .map(|(k, v)| {
524 evaluate_child_op_scope(
525 &op.op_scope,
526 scoped_entries,
527 ScopeEntry::new(
528 ScopeKey::MapKey(k),
529 v,
530 &table_schema.row,
531 &op.op_scope,
532 ),
533 &op.concurrency_controller,
534 memory,
535 operation_in_process_stats,
536 )
537 })
538 .collect::<Vec<_>>(),
539 value::Value::LTable(v) => v
540 .iter()
541 .enumerate()
542 .map(|(i, item)| {
543 evaluate_child_op_scope(
544 &op.op_scope,
545 scoped_entries,
546 ScopeEntry::new(
547 ScopeKey::ListIndex(i),
548 item,
549 &table_schema.row,
550 &op.op_scope,
551 ),
552 &op.concurrency_controller,
553 memory,
554 operation_in_process_stats,
555 )
556 })
557 .collect::<Vec<_>>(),
558 _ => {
559 internal_bail!("Target field type is expected to be a table");
560 }
561 };
562 try_join_all(task_futs)
563 .await
564 .with_context(|| format!("Evaluating ForEach op `{}`", op.name,))?;
565 }
566
567 AnalyzedReactiveOp::Collect(op) => {
568 let mut field_values = Vec::with_capacity(
569 op.input.fields.len() + if op.has_auto_uuid_field { 1 } else { 0 },
570 );
571 let field_values_iter = assemble_input_values(&op.input.fields, scoped_entries);
572 if op.has_auto_uuid_field {
573 field_values.push(value::Value::Null);
574 for value in field_values_iter {
575 field_values.push(value?);
576 }
577 let uuid = memory.next_uuid(
578 op.fingerprinter
579 .clone()
580 .with(&field_values[1..])?
581 .into_fingerprint(),
582 )?;
583 field_values[0] = value::Value::Basic(value::BasicValue::Uuid(uuid));
584 } else {
585 for value in field_values_iter {
586 field_values.push(value?);
587 }
588 };
589 let collector_entry = scoped_entries
590 .headn(op.collector_ref.scope_up_level as usize)
591 .ok_or_else(|| internal_error!("Collector level out of bound"))?;
592
593 let input_values: Vec<value::Value> =
595 assemble_input_values(&op.input.fields, scoped_entries)
596 .collect::<Result<Vec<_>>>()?;
597
598 let mut field_values = op
600 .field_index_mapping
601 .iter()
602 .map(|idx| {
603 idx.map_or(value::Value::Null, |input_idx| {
604 input_values[input_idx].clone()
605 })
606 })
607 .collect::<Vec<_>>();
608
609 if op.has_auto_uuid_field
611 && let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx
612 {
613 let uuid = memory.next_uuid(
614 op.fingerprinter
615 .clone()
616 .with(
617 &field_values
618 .iter()
619 .enumerate()
620 .filter(|(i, _)| *i != uuid_idx)
621 .map(|(_, v)| v)
622 .collect::<Vec<_>>(),
623 )?
624 .into_fingerprint(),
625 )?;
626 field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid));
627 }
628
629 {
630 let mut collected_records = collector_entry.collected_values
631 [op.collector_ref.local.collector_idx as usize]
632 .lock()
633 .unwrap();
634 collected_records.push(value::FieldValues {
635 fields: field_values,
636 });
637 }
638 }
639 }
640 }
641 Ok(())
642}
643
644pub struct SourceRowEvaluationContext<'a> {
645 pub plan: &'a ExecutionPlan,
646 pub import_op: &'a AnalyzedImportOp,
647 pub schema: &'a schema::FlowSchema,
648 pub key: &'a value::KeyValue,
649 pub import_op_idx: usize,
650 #[cfg(feature = "persistence")]
651 pub source_logic_fp: &'a SourceLogicFingerprint,
652}
653
654#[derive(Debug)]
655pub struct EvaluateSourceEntryOutput {
656 pub data_scope: ScopeValueBuilder,
657 pub collected_values: Vec<Vec<value::FieldValues>>,
658}
659
660#[instrument(name = "evaluate_source_entry", skip_all, fields(source_name = %src_eval_ctx.import_op.name))]
661pub async fn evaluate_source_entry(
662 src_eval_ctx: &SourceRowEvaluationContext<'_>,
663 source_value: value::FieldValues,
664 memory: &EvaluationMemory,
665 operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
666) -> Result<EvaluateSourceEntryOutput> {
667 let _permit = src_eval_ctx
668 .import_op
669 .concurrency_controller
670 .acquire_bytes_with_reservation(|| source_value.estimated_byte_size())
671 .await?;
672 let root_schema = &src_eval_ctx.schema.schema;
673 let root_scope_value = ScopeValueBuilder::new(root_schema.fields.len());
674 let root_scope_entry = ScopeEntry::new(
675 ScopeKey::None,
676 &root_scope_value,
677 root_schema,
678 &src_eval_ctx.plan.op_scope,
679 );
680
681 let table_schema = match &root_schema.fields[src_eval_ctx.import_op.output.field_idx as usize]
682 .value_type
683 .typ
684 {
685 schema::ValueType::Table(cs) => cs,
686 _ => {
687 internal_bail!("Expect source output to be a table")
688 }
689 };
690
691 let scope_value =
692 ScopeValueBuilder::augmented_from(&value::ScopeValue(source_value), table_schema)?;
693 root_scope_entry.define_field_w_builder(
694 &src_eval_ctx.import_op.output,
695 value::Value::KTable(BTreeMap::from([(src_eval_ctx.key.clone(), scope_value)])),
696 )?;
697
698 for import_op in src_eval_ctx.plan.import_ops.iter() {
700 let field_idx = import_op.output.field_idx;
701 if field_idx != src_eval_ctx.import_op.output.field_idx {
702 root_scope_entry.define_field(
703 &AnalyzedOpOutput { field_idx },
704 &value::Value::KTable(BTreeMap::new()),
705 )?;
706 }
707 }
708
709 evaluate_op_scope(
710 &src_eval_ctx.plan.op_scope,
711 RefList::Nil.prepend(&root_scope_entry),
712 memory,
713 operation_in_process_stats,
714 )
715 .await?;
716 let collected_values = root_scope_entry
717 .collected_values
718 .into_iter()
719 .map(|v| v.into_inner().unwrap())
720 .collect::<Vec<_>>();
721 Ok(EvaluateSourceEntryOutput {
722 data_scope: root_scope_value,
723 collected_values,
724 })
725}
726
727#[instrument(name = "evaluate_transient_flow", skip_all, fields(flow_name = %flow.transient_flow_instance.name))]
728pub async fn evaluate_transient_flow(
729 flow: &AnalyzedTransientFlow,
730 input_values: &Vec<value::Value>,
731) -> Result<value::Value> {
732 let root_schema = &flow.data_schema.schema;
733 let root_scope_value = ScopeValueBuilder::new(root_schema.fields.len());
734 let root_scope_entry = ScopeEntry::new(
735 ScopeKey::None,
736 &root_scope_value,
737 root_schema,
738 &flow.execution_plan.op_scope,
739 );
740
741 if input_values.len() != flow.execution_plan.input_fields.len() {
742 client_bail!(
743 "Input values length mismatch: expect {}, got {}",
744 flow.execution_plan.input_fields.len(),
745 input_values.len()
746 );
747 }
748 for (field, value) in flow.execution_plan.input_fields.iter().zip(input_values) {
749 root_scope_entry.define_field(field, value)?;
750 }
751 let eval_memory = EvaluationMemory::new(
752 chrono::Utc::now(),
753 None,
754 EvaluationMemoryOptions {
755 enable_cache: false,
756 evaluation_only: true,
757 },
758 );
759 evaluate_op_scope(
760 &flow.execution_plan.op_scope,
761 RefList::Nil.prepend(&root_scope_entry),
762 &eval_memory,
763 None, )
765 .await?;
766 let output_value = assemble_value(
767 &flow.execution_plan.output_value,
768 RefList::Nil.prepend(&root_scope_entry),
769 )?;
770 Ok(output_value)
771}