Skip to main content

recoco_core/execution/
dumper.rs

1// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://cocoindex.io)
2// Original code from CocoIndex is copyrighted by CocoIndex
3// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
4// SPDX-FileContributor: CocoIndex Contributors
5//
6// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
7// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
8// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
9//
10// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
11// SPDX-License-Identifier: Apache-2.0
12
13use crate::execution::indexing_status::SourceLogicFingerprint;
14use crate::prelude::*;
15
16use futures::{StreamExt, future::try_join_all};
17use itertools::Itertools;
18use serde::ser::SerializeSeq;
19use sqlx::PgPool;
20use std::path::{Path, PathBuf};
21use yaml_rust2::YamlEmitter;
22
23use super::evaluator::SourceRowEvaluationContext;
24use super::memoization::EvaluationMemoryOptions;
25use super::row_indexer;
26use crate::base::{schema, value};
27use crate::builder::plan::{AnalyzedImportOp, ExecutionPlan};
28use crate::ops::interface::SourceExecutorReadOptions;
29use utils::yaml_ser::YamlSerializer;
30
/// Options controlling a one-off "evaluate and dump" run.
#[derive(Debug, Clone, Deserialize)]
pub struct EvaluateAndDumpOptions {
    // Directory where one YAML file per source row is written.
    pub output_dir: String,
    // When true, evaluation may reuse cached results instead of recomputing.
    pub use_cache: bool,
}
36
37const FILENAME_PREFIX_MAX_LENGTH: usize = 128;
38
/// Rows collected for one export target, keyed by primary key.
struct TargetExportData<'a> {
    // Field schema describing each row's values.
    schema: &'a Vec<schema::FieldSchema>,
    // The purpose is to make rows sorted by primary key.
    data: BTreeMap<value::KeyValue, &'a value::FieldValues>,
}
44
45impl Serialize for TargetExportData<'_> {
46    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
47    where
48        S: serde::Serializer,
49    {
50        let mut seq = serializer.serialize_seq(Some(self.data.len()))?;
51        for (_, values) in self.data.iter() {
52            seq.serialize_element(&value::TypedFieldsValue {
53                schema: self.schema,
54                values_iter: values.fields.iter(),
55            })?;
56        }
57        seq.end()
58    }
59}
60
/// Serialized form of a single source row's evaluation outcome.
#[derive(Serialize)]
struct SourceOutputData<'a> {
    // The source row's primary key, rendered together with its schema.
    key: value::TypedFieldsValue<'a, std::slice::Iter<'a, value::Value>>,

    // Rows produced per export target; `None` when evaluation yielded no
    // output or failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    exports: Option<IndexMap<&'a str, TargetExportData<'a>>>,

    // Debug-formatted evaluation error, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
}
71
/// Bundles everything needed to evaluate a flow's sources and dump the
/// results to YAML files.
struct Dumper<'a> {
    // The analyzed execution plan being evaluated.
    plan: &'a ExecutionPlan,
    // Setup/execution context for the flow (export ops, etc.).
    setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
    // Schema of the flow, used to render typed values.
    schema: &'a schema::FlowSchema,
    // Connection pool used by evaluation (e.g. for the cache).
    pool: &'a PgPool,
    // User-provided options (output directory, cache usage).
    options: EvaluateAndDumpOptions,
}
79
80impl<'a> Dumper<'a> {
81    async fn evaluate_source_entry<'b>(
82        &'a self,
83        import_op_idx: usize,
84        import_op: &'a AnalyzedImportOp,
85        key: &value::KeyValue,
86        key_aux_info: &serde_json::Value,
87        source_logic_fp: &SourceLogicFingerprint,
88        collected_values_buffer: &'b mut Vec<Vec<value::FieldValues>>,
89    ) -> Result<Option<IndexMap<&'b str, TargetExportData<'b>>>>
90    where
91        'a: 'b,
92    {
93        let data_builder = row_indexer::evaluate_source_entry_with_memory(
94            &SourceRowEvaluationContext {
95                plan: self.plan,
96                import_op,
97                schema: self.schema,
98                key,
99                import_op_idx,
100                source_logic_fp,
101            },
102            key_aux_info,
103            self.setup_execution_ctx,
104            EvaluationMemoryOptions {
105                enable_cache: self.options.use_cache,
106                evaluation_only: true,
107            },
108            self.pool,
109        )
110        .await?;
111
112        let data_builder = if let Some(data_builder) = data_builder {
113            data_builder
114        } else {
115            return Ok(None);
116        };
117
118        *collected_values_buffer = data_builder.collected_values;
119        let exports = self
120            .plan
121            .export_ops
122            .iter()
123            .map(|export_op| -> Result<_> {
124                let collector_idx = export_op.input.collector_idx as usize;
125                let entry = (
126                    export_op.name.as_str(),
127                    TargetExportData {
128                        schema: &self.schema.root_op_scope.collectors[collector_idx]
129                            .spec
130                            .fields,
131                        data: collected_values_buffer[collector_idx]
132                            .iter()
133                            .map(|v| -> Result<_> {
134                                let key = row_indexer::extract_primary_key_for_export(
135                                    &export_op.primary_key_def,
136                                    v,
137                                )?;
138                                Ok((key, v))
139                            })
140                            .collect::<Result<_>>()?,
141                    },
142                );
143                Ok(entry)
144            })
145            .collect::<Result<_>>()?;
146        Ok(Some(exports))
147    }
148
149    async fn evaluate_and_dump_source_entry(
150        &self,
151        import_op_idx: usize,
152        import_op: &AnalyzedImportOp,
153        key: value::KeyValue,
154        key_aux_info: serde_json::Value,
155        file_path: PathBuf,
156    ) -> Result<()> {
157        let source_logic_fp = SourceLogicFingerprint::new(
158            self.plan,
159            import_op_idx,
160            &self.setup_execution_ctx.export_ops,
161            self.plan.legacy_fingerprint.clone(),
162        )?;
163        let _permit = import_op
164            .concurrency_controller
165            .acquire(concur_control::BYTES_UNKNOWN_YET)
166            .await?;
167        let mut collected_values_buffer = Vec::new();
168        let (exports, error) = match self
169            .evaluate_source_entry(
170                import_op_idx,
171                import_op,
172                &key,
173                &key_aux_info,
174                &source_logic_fp,
175                &mut collected_values_buffer,
176            )
177            .await
178        {
179            Ok(exports) => (exports, None),
180            Err(e) => (None, Some(format!("{e:?}"))),
181        };
182        let key_values: Vec<value::Value> = key.into_iter().map(|v| v.into()).collect::<Vec<_>>();
183        let file_data = SourceOutputData {
184            key: value::TypedFieldsValue {
185                schema: &import_op.primary_key_schema,
186                values_iter: key_values.iter(),
187            },
188            exports,
189            error,
190        };
191
192        let yaml_output = {
193            let mut yaml_output = String::new();
194            let yaml_data = YamlSerializer::serialize(&file_data)?;
195            let mut yaml_emitter = YamlEmitter::new(&mut yaml_output);
196            yaml_emitter.multiline_strings(true);
197            yaml_emitter.compact(true);
198            yaml_emitter.dump(&yaml_data)?;
199            yaml_output
200        };
201        tokio::fs::write(file_path, yaml_output).await?;
202
203        Ok(())
204    }
205
206    async fn evaluate_and_dump_for_source(
207        &self,
208        import_op_idx: usize,
209        import_op: &AnalyzedImportOp,
210    ) -> Result<()> {
211        let mut keys_by_filename_prefix: IndexMap<
212            String,
213            Vec<(value::KeyValue, serde_json::Value)>,
214        > = IndexMap::new();
215
216        let mut rows_stream = import_op
217            .executor
218            .list(&SourceExecutorReadOptions {
219                include_ordinal: false,
220                include_content_version_fp: false,
221                include_value: false,
222            })
223            .await?;
224        while let Some(rows) = rows_stream.next().await {
225            for row in rows?.into_iter() {
226                let mut s = row
227                    .key
228                    .encode_to_strs()
229                    .into_iter()
230                    .map(|s| urlencoding::encode(&s).into_owned())
231                    .join(":");
232                s.truncate(
233                    (0..(FILENAME_PREFIX_MAX_LENGTH - import_op.name.as_str().len()))
234                        .rev()
235                        .find(|i| s.is_char_boundary(*i))
236                        .unwrap_or(0),
237                );
238                keys_by_filename_prefix
239                    .entry(s)
240                    .or_default()
241                    .push((row.key, row.key_aux_info));
242            }
243        }
244        let output_dir = Path::new(&self.options.output_dir);
245        let evaluate_futs =
246            keys_by_filename_prefix
247                .into_iter()
248                .flat_map(|(filename_prefix, keys)| {
249                    let num_keys = keys.len();
250                    keys.into_iter()
251                        .enumerate()
252                        .map(move |(i, (key, key_aux_info))| {
253                            let extra_id = if num_keys > 1 {
254                                Cow::Owned(format!(".{i}"))
255                            } else {
256                                Cow::Borrowed("")
257                            };
258                            let file_name =
259                                format!("{}@{}{}.yaml", import_op.name, filename_prefix, extra_id);
260                            let file_path = output_dir.join(Path::new(&file_name));
261                            self.evaluate_and_dump_source_entry(
262                                import_op_idx,
263                                import_op,
264                                key,
265                                key_aux_info,
266                                file_path,
267                            )
268                        })
269                });
270        try_join_all(evaluate_futs).await?;
271        Ok(())
272    }
273
274    async fn evaluate_and_dump(&self) -> Result<()> {
275        try_join_all(
276            self.plan
277                .import_ops
278                .iter()
279                .enumerate()
280                .map(|(idx, import_op)| self.evaluate_and_dump_for_source(idx, import_op)),
281        )
282        .await?;
283        Ok(())
284    }
285}
286
287pub async fn evaluate_and_dump(
288    plan: &ExecutionPlan,
289    setup_execution_ctx: &exec_ctx::FlowSetupExecutionContext,
290    schema: &schema::FlowSchema,
291    options: EvaluateAndDumpOptions,
292    pool: &PgPool,
293) -> Result<()> {
294    let output_dir = Path::new(&options.output_dir);
295    if output_dir.exists() {
296        if !output_dir.is_dir() {
297            return Err(client_error!("The path exists and is not a directory"));
298        }
299    } else {
300        tokio::fs::create_dir(output_dir).await?;
301    }
302
303    let dumper = Dumper {
304        plan,
305        setup_execution_ctx,
306        schema,
307        pool,
308        options,
309    };
310    dumper.evaluate_and_dump().await
311}