Skip to main content

recoco_core/execution/
evaluator.rs

1// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://CocoIndex)
2// Original code from CocoIndex is copyrighted by CocoIndex
3// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
4// SPDX-FileContributor: CocoIndex Contributors
5//
6// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
7// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
8// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
9//
10// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
11// SPDX-License-Identifier: Apache-2.0
12
13#[cfg(feature = "persistence")]
14use crate::execution::indexing_status::SourceLogicFingerprint;
15use crate::prelude::*;
16
17use futures::future::try_join_all;
18use tokio::time::Duration;
19
20use crate::base::value::EstimatedByteSize;
21use crate::base::{schema, value};
22use crate::builder::{AnalyzedTransientFlow, plan::*};
23use utils::immutable::RefList;
24
25use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, evaluate_with_cell};
26
/// Timeout applied to a function evaluation when the op does not configure one (30 minutes).
const DEFAULT_TIMEOUT_THRESHOLD: Duration = Duration::from_secs(30 * 60);
/// Lower bound for the delay after which a still-running evaluation is reported with a warning.
const MIN_WARNING_THRESHOLD: Duration = Duration::from_secs(30);
29
30#[derive(Debug)]
31pub struct ScopeValueBuilder {
32    // TODO: Share the same lock for values produced in the same execution scope, for stricter atomicity.
33    pub fields: Vec<OnceLock<value::Value<ScopeValueBuilder>>>,
34}
35
36impl value::EstimatedByteSize for ScopeValueBuilder {
37    fn estimated_detached_byte_size(&self) -> usize {
38        self.fields
39            .iter()
40            .map(|f| f.get().map_or(0, |v| v.estimated_byte_size()))
41            .sum()
42    }
43}
44
45impl From<&ScopeValueBuilder> for value::ScopeValue {
46    fn from(val: &ScopeValueBuilder) -> Self {
47        value::ScopeValue(value::FieldValues {
48            fields: val
49                .fields
50                .iter()
51                .map(|f| value::Value::from_alternative_ref(f.get().unwrap()))
52                .collect(),
53        })
54    }
55}
56
57impl From<ScopeValueBuilder> for value::ScopeValue {
58    fn from(val: ScopeValueBuilder) -> Self {
59        value::ScopeValue(value::FieldValues {
60            fields: val
61                .fields
62                .into_iter()
63                .map(|f| value::Value::from_alternative(f.into_inner().unwrap()))
64                .collect(),
65        })
66    }
67}
68
69impl ScopeValueBuilder {
70    fn new(num_fields: usize) -> Self {
71        let mut fields = Vec::with_capacity(num_fields);
72        fields.resize_with(num_fields, OnceLock::new);
73        Self { fields }
74    }
75
76    fn augmented_from(source: &value::ScopeValue, schema: &schema::TableSchema) -> Result<Self> {
77        let val_index_base = schema.key_schema().len();
78        let len = schema.row.fields.len() - val_index_base;
79
80        let mut builder = Self::new(len);
81
82        let value::ScopeValue(source_fields) = source;
83        for ((v, t), r) in source_fields
84            .fields
85            .iter()
86            .zip(schema.row.fields[val_index_base..(val_index_base + len)].iter())
87            .zip(&mut builder.fields)
88        {
89            r.set(augmented_value(v, &t.value_type.typ)?)
90                .map_err(|_| internal_error!("Value of field `{}` is already set", t.name))?;
91        }
92        Ok(builder)
93    }
94}
95
96fn augmented_value(
97    val: &value::Value,
98    val_type: &schema::ValueType,
99) -> Result<value::Value<ScopeValueBuilder>> {
100    let value = match (val, val_type) {
101        (value::Value::Null, _) => value::Value::Null,
102        (value::Value::Basic(v), _) => value::Value::Basic(v.clone()),
103        (value::Value::Struct(v), schema::ValueType::Struct(t)) => {
104            value::Value::Struct(value::FieldValues {
105                fields: v
106                    .fields
107                    .iter()
108                    .enumerate()
109                    .map(|(i, v)| augmented_value(v, &t.fields[i].value_type.typ))
110                    .collect::<Result<Vec<_>>>()?,
111            })
112        }
113        (value::Value::UTable(v), schema::ValueType::Table(t)) => value::Value::UTable(
114            v.iter()
115                .map(|v| ScopeValueBuilder::augmented_from(v, t))
116                .collect::<Result<Vec<_>>>()?,
117        ),
118        (value::Value::KTable(v), schema::ValueType::Table(t)) => value::Value::KTable(
119            v.iter()
120                .map(|(k, v)| Ok((k.clone(), ScopeValueBuilder::augmented_from(v, t)?)))
121                .collect::<Result<BTreeMap<_, _>>>()?,
122        ),
123        (value::Value::LTable(v), schema::ValueType::Table(t)) => value::Value::LTable(
124            v.iter()
125                .map(|v| ScopeValueBuilder::augmented_from(v, t))
126                .collect::<Result<Vec<_>>>()?,
127        ),
128        (val, _) => internal_bail!("Value kind doesn't match the type {val_type}: {val:?}"),
129    };
130    Ok(value)
131}
132
133enum ScopeKey<'a> {
134    /// For root struct and UTable.
135    None,
136    /// For KTable row.
137    MapKey(&'a value::KeyValue),
138    /// For LTable row.
139    ListIndex(usize),
140}
141
142impl<'a> ScopeKey<'a> {
143    pub fn key(&self) -> Option<Cow<'a, value::KeyValue>> {
144        match self {
145            ScopeKey::None => None,
146            ScopeKey::MapKey(k) => Some(Cow::Borrowed(k)),
147            ScopeKey::ListIndex(i) => {
148                Some(Cow::Owned(value::KeyValue::from_single_part(*i as i64)))
149            }
150        }
151    }
152
153    pub fn value_field_index_base(&self) -> usize {
154        match *self {
155            ScopeKey::None => 0,
156            ScopeKey::MapKey(v) => v.len(),
157            ScopeKey::ListIndex(_) => 0,
158        }
159    }
160}
161
162impl std::fmt::Display for ScopeKey<'_> {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        match self {
165            ScopeKey::None => write!(f, "()"),
166            ScopeKey::MapKey(k) => write!(f, "{k}"),
167            ScopeKey::ListIndex(i) => write!(f, "[{i}]"),
168        }
169    }
170}
171
172struct ScopeEntry<'a> {
173    key: ScopeKey<'a>,
174    value: &'a ScopeValueBuilder,
175    schema: &'a schema::StructSchema,
176    collected_values: Vec<Mutex<Vec<value::FieldValues>>>,
177}
178
179impl<'a> ScopeEntry<'a> {
180    fn new(
181        key: ScopeKey<'a>,
182        value: &'a ScopeValueBuilder,
183        schema: &'a schema::StructSchema,
184        analyzed_op_scope: &AnalyzedOpScope,
185    ) -> Self {
186        let mut collected_values = Vec::with_capacity(analyzed_op_scope.collector_len);
187        collected_values.resize_with(analyzed_op_scope.collector_len, Default::default);
188
189        Self {
190            key,
191            value,
192            schema,
193            collected_values,
194        }
195    }
196
197    fn get_local_field_schema<'b>(
198        schema: &'b schema::StructSchema,
199        indices: &[u32],
200    ) -> Result<&'b schema::FieldSchema> {
201        let field_idx = indices[0] as usize;
202        let field_schema = &schema.fields[field_idx];
203        let result = if indices.len() == 1 {
204            field_schema
205        } else {
206            let struct_field_schema = match &field_schema.value_type.typ {
207                schema::ValueType::Struct(s) => s,
208                _ => internal_bail!("Expect struct field"),
209            };
210            Self::get_local_field_schema(struct_field_schema, &indices[1..])?
211        };
212        Ok(result)
213    }
214
215    fn get_local_key_field<'b>(
216        key_val: &'b value::KeyPart,
217        indices: &'_ [u32],
218    ) -> Result<&'b value::KeyPart> {
219        let result = if indices.is_empty() {
220            key_val
221        } else if let value::KeyPart::Struct(fields) = key_val {
222            Self::get_local_key_field(&fields[indices[0] as usize], &indices[1..])?
223        } else {
224            internal_bail!("Only struct can be accessed by sub field");
225        };
226        Ok(result)
227    }
228
229    fn get_local_field<'b>(
230        val: &'b value::Value<ScopeValueBuilder>,
231        indices: &'_ [u32],
232    ) -> Result<&'b value::Value<ScopeValueBuilder>> {
233        let result = if indices.is_empty() {
234            val
235        } else if let value::Value::Null = val {
236            val
237        } else if let value::Value::Struct(fields) = val {
238            Self::get_local_field(&fields.fields[indices[0] as usize], &indices[1..])?
239        } else {
240            internal_bail!("Only struct can be accessed by sub field");
241        };
242        Ok(result)
243    }
244
245    fn get_value_field_builder(
246        &self,
247        field_ref: &AnalyzedLocalFieldReference,
248    ) -> Result<&value::Value<ScopeValueBuilder>> {
249        let first_index = field_ref.fields_idx[0] as usize;
250        let index_base = self.key.value_field_index_base();
251        let val = self.value.fields[first_index - index_base]
252            .get()
253            .ok_or_else(|| internal_error!("Field {} is not set", first_index))?;
254        Self::get_local_field(val, &field_ref.fields_idx[1..])
255    }
256
257    fn get_field(&self, field_ref: &AnalyzedLocalFieldReference) -> Result<value::Value> {
258        let first_index = field_ref.fields_idx[0] as usize;
259        let index_base = self.key.value_field_index_base();
260        let result = if first_index < index_base {
261            let key_val = self
262                .key
263                .key()
264                .ok_or_else(|| internal_error!("Key is not set"))?;
265            let key_part =
266                Self::get_local_key_field(&key_val[first_index], &field_ref.fields_idx[1..])?;
267            key_part.clone().into()
268        } else {
269            let val = self.value.fields[first_index - index_base]
270                .get()
271                .ok_or_else(|| internal_error!("Field {} is not set", first_index))?;
272            let val_part = Self::get_local_field(val, &field_ref.fields_idx[1..])?;
273            value::Value::from_alternative_ref(val_part)
274        };
275        Ok(result)
276    }
277
278    fn get_field_schema(
279        &self,
280        field_ref: &AnalyzedLocalFieldReference,
281    ) -> Result<&schema::FieldSchema> {
282        Self::get_local_field_schema(self.schema, &field_ref.fields_idx)
283    }
284
285    fn define_field_w_builder(
286        &self,
287        output_field: &AnalyzedOpOutput,
288        val: value::Value<ScopeValueBuilder>,
289    ) -> Result<()> {
290        let field_index = output_field.field_idx as usize;
291        let index_base = self.key.value_field_index_base();
292        self.value.fields[field_index - index_base].set(val).map_err(|_| {
293            internal_error!("Field {field_index} for scope is already set, violating single-definition rule.")
294        })?;
295        Ok(())
296    }
297
298    fn define_field(&self, output_field: &AnalyzedOpOutput, val: &value::Value) -> Result<()> {
299        let field_index = output_field.field_idx as usize;
300        let field_schema = &self.schema.fields[field_index];
301        let val = augmented_value(val, &field_schema.value_type.typ)?;
302        self.define_field_w_builder(output_field, val)?;
303        Ok(())
304    }
305}
306
307fn assemble_value(
308    value_mapping: &AnalyzedValueMapping,
309    scoped_entries: RefList<'_, &ScopeEntry<'_>>,
310) -> Result<value::Value> {
311    let result = match value_mapping {
312        AnalyzedValueMapping::Constant { value } => value.clone(),
313        AnalyzedValueMapping::Field(field_ref) => scoped_entries
314            .headn(field_ref.scope_up_level as usize)
315            .ok_or_else(|| internal_error!("Invalid scope_up_level: {}", field_ref.scope_up_level))?
316            .get_field(&field_ref.local)?,
317        AnalyzedValueMapping::Struct(mapping) => {
318            let fields = mapping
319                .fields
320                .iter()
321                .map(|f| assemble_value(f, scoped_entries))
322                .collect::<Result<Vec<_>>>()?;
323            value::Value::Struct(value::FieldValues { fields })
324        }
325    };
326    Ok(result)
327}
328
329fn assemble_input_values<'a>(
330    value_mappings: &'a [AnalyzedValueMapping],
331    scoped_entries: RefList<'a, &ScopeEntry<'a>>,
332) -> impl Iterator<Item = Result<value::Value>> + 'a {
333    value_mappings
334        .iter()
335        .map(move |value_mapping| assemble_value(value_mapping, scoped_entries))
336}
337
338async fn evaluate_child_op_scope(
339    op_scope: &AnalyzedOpScope,
340    scoped_entries: RefList<'_, &ScopeEntry<'_>>,
341    child_scope_entry: ScopeEntry<'_>,
342    concurrency_controller: &concur_control::ConcurrencyController,
343    memory: &EvaluationMemory,
344    operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
345) -> Result<()> {
346    let _permit = concurrency_controller
347        .acquire(Some(|| {
348            child_scope_entry
349                .value
350                .fields
351                .iter()
352                .map(|f| f.get().map_or(0, |v| v.estimated_byte_size()))
353                .sum()
354        }))
355        .await?;
356    evaluate_op_scope(
357        op_scope,
358        scoped_entries.prepend(&child_scope_entry),
359        memory,
360        operation_in_process_stats,
361    )
362    .await
363    .with_context(|| {
364        format!(
365            "Evaluating in scope with key {}",
366            match child_scope_entry.key.key() {
367                Some(k) => k.to_string(),
368                None => "()".to_string(),
369            }
370        )
371    })
372}
373
374async fn evaluate_with_timeout_and_warning<F, T>(
375    eval_future: F,
376    timeout_duration: Duration,
377    warn_duration: Duration,
378    op_kind: String,
379    op_name: String,
380) -> Result<T>
381where
382    F: std::future::Future<Output = Result<T>>,
383{
384    let mut eval_future = Box::pin(eval_future);
385    let mut to_warn = warn_duration < timeout_duration;
386    let timeout_future = tokio::time::sleep(timeout_duration);
387    tokio::pin!(timeout_future);
388
389    loop {
390        tokio::select! {
391            res = &mut eval_future => {
392                return res;
393            }
394            _ = &mut timeout_future => {
395                return Err(internal_error!(
396                    "Function '{}' ({}) timed out after {} seconds",
397                    op_kind, op_name, timeout_duration.as_secs()
398                ));
399            }
400            _ = tokio::time::sleep(warn_duration), if to_warn => {
401                warn!(
402                    "Function '{}' ({}) is taking longer than {}s (will be timed out after {}s)",
403                    op_kind, op_name, warn_duration.as_secs(), timeout_duration.as_secs()
404                );
405                to_warn = false;
406            }
407        }
408    }
409}
410
411async fn evaluate_op_scope(
412    op_scope: &AnalyzedOpScope,
413    scoped_entries: RefList<'_, &ScopeEntry<'_>>,
414    memory: &EvaluationMemory,
415    operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
416) -> Result<()> {
417    let head_scope = *scoped_entries.head().unwrap();
418    for reactive_op in op_scope.reactive_ops.iter() {
419        match reactive_op {
420            AnalyzedReactiveOp::Transform(op) => {
421                // Track transform operation start
422                if let Some(op_stats) = operation_in_process_stats {
423                    let transform_key =
424                        format!("transform/{}{}", op_scope.scope_qualifier, op.name);
425                    op_stats.start_processing(&transform_key, 1);
426                }
427
428                let mut input_values = Vec::with_capacity(op.inputs.len());
429                for value in assemble_input_values(&op.inputs, scoped_entries) {
430                    input_values.push(value?);
431                }
432
433                let timeout_duration = op
434                    .function_exec_info
435                    .timeout
436                    .unwrap_or(DEFAULT_TIMEOUT_THRESHOLD);
437                let warn_duration = std::cmp::max(timeout_duration / 2, MIN_WARNING_THRESHOLD);
438
439                let op_name_for_warning = op.name.clone();
440                let op_kind_for_warning = op.op_kind.clone();
441
442                let result = if op.function_exec_info.enable_cache {
443                    let output_value_cell = memory.get_cache_entry(
444                        || -> Result<_> {
445                            Ok(op
446                                .function_exec_info
447                                .fingerprinter
448                                .clone()
449                                .with(&input_values)
450                                .map(|fp| fp.into_fingerprint())?)
451                        },
452                        &op.function_exec_info.output_type,
453                        /*ttl=*/ None,
454                    )?;
455
456                    let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || {
457                        op.executor.evaluate(input_values)
458                    });
459                    let v = evaluate_with_timeout_and_warning(
460                        eval_future,
461                        timeout_duration,
462                        warn_duration,
463                        op_kind_for_warning,
464                        op_name_for_warning,
465                    )
466                    .await?;
467
468                    head_scope.define_field(&op.output, &v)
469                } else {
470                    let eval_future = op.executor.evaluate(input_values);
471                    let v = evaluate_with_timeout_and_warning(
472                        eval_future,
473                        timeout_duration,
474                        warn_duration,
475                        op_kind_for_warning,
476                        op_name_for_warning,
477                    )
478                    .await?;
479
480                    head_scope.define_field(&op.output, &v)
481                };
482
483                // Track transform operation completion
484                if let Some(op_stats) = operation_in_process_stats {
485                    let transform_key =
486                        format!("transform/{}{}", op_scope.scope_qualifier, op.name);
487                    op_stats.finish_processing(&transform_key, 1);
488                }
489
490                result.with_context(|| format!("Evaluating Transform op `{}`", op.name))?
491            }
492
493            AnalyzedReactiveOp::ForEach(op) => {
494                let target_field_schema = head_scope.get_field_schema(&op.local_field_ref)?;
495                let table_schema = match &target_field_schema.value_type.typ {
496                    schema::ValueType::Table(cs) => cs,
497                    _ => internal_bail!("Expect target field to be a table"),
498                };
499
500                let target_field = head_scope.get_value_field_builder(&op.local_field_ref)?;
501                let task_futs = match target_field {
502                    value::Value::Null => vec![],
503                    value::Value::UTable(v) => v
504                        .iter()
505                        .map(|item| {
506                            evaluate_child_op_scope(
507                                &op.op_scope,
508                                scoped_entries,
509                                ScopeEntry::new(
510                                    ScopeKey::None,
511                                    item,
512                                    &table_schema.row,
513                                    &op.op_scope,
514                                ),
515                                &op.concurrency_controller,
516                                memory,
517                                operation_in_process_stats,
518                            )
519                        })
520                        .collect::<Vec<_>>(),
521                    value::Value::KTable(v) => v
522                        .iter()
523                        .map(|(k, v)| {
524                            evaluate_child_op_scope(
525                                &op.op_scope,
526                                scoped_entries,
527                                ScopeEntry::new(
528                                    ScopeKey::MapKey(k),
529                                    v,
530                                    &table_schema.row,
531                                    &op.op_scope,
532                                ),
533                                &op.concurrency_controller,
534                                memory,
535                                operation_in_process_stats,
536                            )
537                        })
538                        .collect::<Vec<_>>(),
539                    value::Value::LTable(v) => v
540                        .iter()
541                        .enumerate()
542                        .map(|(i, item)| {
543                            evaluate_child_op_scope(
544                                &op.op_scope,
545                                scoped_entries,
546                                ScopeEntry::new(
547                                    ScopeKey::ListIndex(i),
548                                    item,
549                                    &table_schema.row,
550                                    &op.op_scope,
551                                ),
552                                &op.concurrency_controller,
553                                memory,
554                                operation_in_process_stats,
555                            )
556                        })
557                        .collect::<Vec<_>>(),
558                    _ => {
559                        internal_bail!("Target field type is expected to be a table");
560                    }
561                };
562                try_join_all(task_futs)
563                    .await
564                    .with_context(|| format!("Evaluating ForEach op `{}`", op.name,))?;
565            }
566
567            AnalyzedReactiveOp::Collect(op) => {
568                let mut field_values = Vec::with_capacity(
569                    op.input.fields.len() + if op.has_auto_uuid_field { 1 } else { 0 },
570                );
571                let field_values_iter = assemble_input_values(&op.input.fields, scoped_entries);
572                if op.has_auto_uuid_field {
573                    field_values.push(value::Value::Null);
574                    for value in field_values_iter {
575                        field_values.push(value?);
576                    }
577                    let uuid = memory.next_uuid(
578                        op.fingerprinter
579                            .clone()
580                            .with(&field_values[1..])?
581                            .into_fingerprint(),
582                    )?;
583                    field_values[0] = value::Value::Basic(value::BasicValue::Uuid(uuid));
584                } else {
585                    for value in field_values_iter {
586                        field_values.push(value?);
587                    }
588                };
589                let collector_entry = scoped_entries
590                    .headn(op.collector_ref.scope_up_level as usize)
591                    .ok_or_else(|| internal_error!("Collector level out of bound"))?;
592
593                // Assemble input values
594                let input_values: Vec<value::Value> =
595                    assemble_input_values(&op.input.fields, scoped_entries)
596                        .collect::<Result<Vec<_>>>()?;
597
598                // Create field_values vector for all fields in the merged schema
599                let mut field_values = op
600                    .field_index_mapping
601                    .iter()
602                    .map(|idx| {
603                        idx.map_or(value::Value::Null, |input_idx| {
604                            input_values[input_idx].clone()
605                        })
606                    })
607                    .collect::<Vec<_>>();
608
609                // Handle auto_uuid_field (assumed to be at position 0 for efficiency)
610                if op.has_auto_uuid_field
611                    && let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx
612                {
613                    let uuid = memory.next_uuid(
614                        op.fingerprinter
615                            .clone()
616                            .with(
617                                &field_values
618                                    .iter()
619                                    .enumerate()
620                                    .filter(|(i, _)| *i != uuid_idx)
621                                    .map(|(_, v)| v)
622                                    .collect::<Vec<_>>(),
623                            )?
624                            .into_fingerprint(),
625                    )?;
626                    field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid));
627                }
628
629                {
630                    let mut collected_records = collector_entry.collected_values
631                        [op.collector_ref.local.collector_idx as usize]
632                        .lock()
633                        .unwrap();
634                    collected_records.push(value::FieldValues {
635                        fields: field_values,
636                    });
637                }
638            }
639        }
640    }
641    Ok(())
642}
643
644pub struct SourceRowEvaluationContext<'a> {
645    pub plan: &'a ExecutionPlan,
646    pub import_op: &'a AnalyzedImportOp,
647    pub schema: &'a schema::FlowSchema,
648    pub key: &'a value::KeyValue,
649    pub import_op_idx: usize,
650    #[cfg(feature = "persistence")]
651    pub source_logic_fp: &'a SourceLogicFingerprint,
652}
653
654#[derive(Debug)]
655pub struct EvaluateSourceEntryOutput {
656    pub data_scope: ScopeValueBuilder,
657    pub collected_values: Vec<Vec<value::FieldValues>>,
658}
659
660#[instrument(name = "evaluate_source_entry", skip_all, fields(source_name = %src_eval_ctx.import_op.name))]
661pub async fn evaluate_source_entry(
662    src_eval_ctx: &SourceRowEvaluationContext<'_>,
663    source_value: value::FieldValues,
664    memory: &EvaluationMemory,
665    operation_in_process_stats: Option<&execution::stats::OperationInProcessStats>,
666) -> Result<EvaluateSourceEntryOutput> {
667    let _permit = src_eval_ctx
668        .import_op
669        .concurrency_controller
670        .acquire_bytes_with_reservation(|| source_value.estimated_byte_size())
671        .await?;
672    let root_schema = &src_eval_ctx.schema.schema;
673    let root_scope_value = ScopeValueBuilder::new(root_schema.fields.len());
674    let root_scope_entry = ScopeEntry::new(
675        ScopeKey::None,
676        &root_scope_value,
677        root_schema,
678        &src_eval_ctx.plan.op_scope,
679    );
680
681    let table_schema = match &root_schema.fields[src_eval_ctx.import_op.output.field_idx as usize]
682        .value_type
683        .typ
684    {
685        schema::ValueType::Table(cs) => cs,
686        _ => {
687            internal_bail!("Expect source output to be a table")
688        }
689    };
690
691    let scope_value =
692        ScopeValueBuilder::augmented_from(&value::ScopeValue(source_value), table_schema)?;
693    root_scope_entry.define_field_w_builder(
694        &src_eval_ctx.import_op.output,
695        value::Value::KTable(BTreeMap::from([(src_eval_ctx.key.clone(), scope_value)])),
696    )?;
697
698    // Fill other source fields with empty tables
699    for import_op in src_eval_ctx.plan.import_ops.iter() {
700        let field_idx = import_op.output.field_idx;
701        if field_idx != src_eval_ctx.import_op.output.field_idx {
702            root_scope_entry.define_field(
703                &AnalyzedOpOutput { field_idx },
704                &value::Value::KTable(BTreeMap::new()),
705            )?;
706        }
707    }
708
709    evaluate_op_scope(
710        &src_eval_ctx.plan.op_scope,
711        RefList::Nil.prepend(&root_scope_entry),
712        memory,
713        operation_in_process_stats,
714    )
715    .await?;
716    let collected_values = root_scope_entry
717        .collected_values
718        .into_iter()
719        .map(|v| v.into_inner().unwrap())
720        .collect::<Vec<_>>();
721    Ok(EvaluateSourceEntryOutput {
722        data_scope: root_scope_value,
723        collected_values,
724    })
725}
726
727#[instrument(name = "evaluate_transient_flow", skip_all, fields(flow_name = %flow.transient_flow_instance.name))]
728pub async fn evaluate_transient_flow(
729    flow: &AnalyzedTransientFlow,
730    input_values: &Vec<value::Value>,
731) -> Result<value::Value> {
732    let root_schema = &flow.data_schema.schema;
733    let root_scope_value = ScopeValueBuilder::new(root_schema.fields.len());
734    let root_scope_entry = ScopeEntry::new(
735        ScopeKey::None,
736        &root_scope_value,
737        root_schema,
738        &flow.execution_plan.op_scope,
739    );
740
741    if input_values.len() != flow.execution_plan.input_fields.len() {
742        client_bail!(
743            "Input values length mismatch: expect {}, got {}",
744            flow.execution_plan.input_fields.len(),
745            input_values.len()
746        );
747    }
748    for (field, value) in flow.execution_plan.input_fields.iter().zip(input_values) {
749        root_scope_entry.define_field(field, value)?;
750    }
751    let eval_memory = EvaluationMemory::new(
752        chrono::Utc::now(),
753        None,
754        EvaluationMemoryOptions {
755            enable_cache: false,
756            evaluation_only: true,
757        },
758    );
759    evaluate_op_scope(
760        &flow.execution_plan.op_scope,
761        RefList::Nil.prepend(&root_scope_entry),
762        &eval_memory,
763        None, // No operation stats for transient flows
764    )
765    .await?;
766    let output_value = assemble_value(
767        &flow.execution_plan.output_value,
768        RefList::Nil.prepend(&root_scope_entry),
769    )?;
770    Ok(output_value)
771}