use crate::execution::indexing_status::SourceLogicFingerprint;
use crate::prelude::*;

use futures::{StreamExt, future::try_join_all};
use itertools::Itertools;
use serde::ser::SerializeSeq;
use sqlx::PgPool;
use std::path::{Path, PathBuf};
use yaml_rust2::YamlEmitter;

use super::evaluator::SourceRowEvaluationContext;
use super::memoization::EvaluationMemoryOptions;
use super::row_indexer;
use crate::base::{schema, value};
use crate::builder::plan::{AnalyzedImportOp, ExecutionPlan};
use crate::ops::interface::SourceExecutorReadOptions;
use utils::yaml_ser::YamlSerializer;

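/// Options for a one-off "evaluate and dump" run: where to write the YAML
/// output files and whether cached evaluation results may be reused.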
#[derive(Debug, Clone, Deserialize)]
pub struct EvaluateAndDumpOptions {
    pub output_dir: String,
    pub use_cache: bool,
}

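/// Maximum length of the key-derived prefix of an output file name.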
const FILENAME_PREFIX_MAX_LENGTH: usize = 128;

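/// Rows collected for a single export target, ordered by their extracted
/// primary key, together with the collector's field schema.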
struct TargetExportData<'a> {
    schema: &'a Vec<schema::FieldSchema>,
    data: BTreeMap<value::KeyValue, &'a value::FieldValues>,
}

impl Serialize for TargetExportData<'_> {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut seq = serializer.serialize_seq(Some(self.data.len()))?;
        for values in self.data.values() {
            seq.serialize_element(&value::TypedFieldsValue {
                schema: self.schema,
                values_iter: values.fields.iter(),
            })?;
        }
        seq.end()
    }
}

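/// The payload written for one source row: its primary key, plus either the
/// per-target exports or the error message if evaluation failed.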
#[derive(Serialize)]
struct SourceOutputData<'a> {
    key: value::TypedFieldsValue<'a, std::slice::Iter<'a, value::Value>>,

    #[serde(skip_serializing_if = "Option::is_none")]
    exports: Option<IndexMap<&'a str, TargetExportData<'a>>>,

    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
}

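/// Evaluates every source row in an execution plan and dumps the results as
/// one YAML file per row.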
struct Dumper<'a> {
    plan: &'a ExecutionPlan,
    setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
    schema: &'a schema::FlowSchema,
    pool: &'a PgPool,
    options: EvaluateAndDumpOptions,
}

impl<'a> Dumper<'a> {
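    /// Evaluates a single source row and groups its collected values by
    /// export target. Returns `Ok(None)` when the evaluation produces no
    /// output for the row.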
    async fn evaluate_source_entry<'b>(
        &'a self,
        import_op_idx: usize,
        import_op: &'a AnalyzedImportOp,
        key: &value::KeyValue,
        key_aux_info: &serde_json::Value,
        source_logic_fp: &SourceLogicFingerprint,
        collected_values_buffer: &'b mut Vec<Vec<value::FieldValues>>,
    ) -> Result<Option<IndexMap<&'b str, TargetExportData<'b>>>>
    where
        'a: 'b,
    {
        let data_builder = row_indexer::evaluate_source_entry_with_memory(
            &SourceRowEvaluationContext {
                plan: self.plan,
                import_op,
                schema: self.schema,
                key,
                import_op_idx,
                source_logic_fp,
            },
            key_aux_info,
            self.setup_execution_ctx,
            EvaluationMemoryOptions {
                enable_cache: self.options.use_cache,
                evaluation_only: true,
            },
            self.pool,
        )
        .await?;
        let Some(data_builder) = data_builder else {
            return Ok(None);
        };

        *collected_values_buffer = data_builder.collected_values;
        let exports = self
            .plan
            .export_ops
            .iter()
            .map(|export_op| -> Result<_> {
                let collector_idx = export_op.input.collector_idx as usize;
                let entry = (
                    export_op.name.as_str(),
                    TargetExportData {
                        schema: &self.schema.root_op_scope.collectors[collector_idx]
                            .spec
                            .fields,
                        data: collected_values_buffer[collector_idx]
                            .iter()
                            .map(|v| -> Result<_> {
                                let key = row_indexer::extract_primary_key_for_export(
                                    &export_op.primary_key_def,
                                    v,
                                )?;
                                Ok((key, v))
                            })
                            .collect::<Result<_>>()?,
                    },
                );
                Ok(entry)
            })
            .collect::<Result<_>>()?;
        Ok(Some(exports))
    }

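    /// Evaluates one source row and writes the result, or the evaluation
    /// error, to `file_path` as YAML.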
    async fn evaluate_and_dump_source_entry(
        &self,
        import_op_idx: usize,
        import_op: &AnalyzedImportOp,
        key: value::KeyValue,
        key_aux_info: serde_json::Value,
        file_path: PathBuf,
    ) -> Result<()> {
        let source_logic_fp = SourceLogicFingerprint::new(
            self.plan,
            import_op_idx,
            &self.setup_execution_ctx.export_ops,
            self.plan.legacy_fingerprint.clone(),
        )?;
        let _permit = import_op
            .concurrency_controller
            .acquire(concur_control::BYTES_UNKNOWN_YET)
            .await?;
        let mut collected_values_buffer = Vec::new();
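        // Capture evaluation failures instead of propagating them, so the error
        // is recorded in the row's output file.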
        let (exports, error) = match self
            .evaluate_source_entry(
                import_op_idx,
                import_op,
                &key,
                &key_aux_info,
                &source_logic_fp,
                &mut collected_values_buffer,
            )
            .await
        {
            Ok(exports) => (exports, None),
            Err(e) => (None, Some(format!("{e:?}"))),
        };
        let key_values: Vec<value::Value> = key.into_iter().map(|v| v.into()).collect();
        let file_data = SourceOutputData {
            key: value::TypedFieldsValue {
                schema: &import_op.primary_key_schema,
                values_iter: key_values.iter(),
            },
            exports,
            error,
        };

        let yaml_output = {
            let mut yaml_output = String::new();
            let yaml_data = YamlSerializer::serialize(&file_data)?;
            let mut yaml_emitter = YamlEmitter::new(&mut yaml_output);
            yaml_emitter.multiline_strings(true);
            yaml_emitter.compact(true);
            yaml_emitter.dump(&yaml_data)?;
            yaml_output
        };
        tokio::fs::write(file_path, yaml_output).await?;

        Ok(())
    }

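    /// Lists every row of one import source, derives a unique file name for
    /// each row from its URL-encoded key, and evaluates and dumps all rows
    /// concurrently.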
    async fn evaluate_and_dump_for_source(
        &self,
        import_op_idx: usize,
        import_op: &AnalyzedImportOp,
    ) -> Result<()> {
        let mut keys_by_filename_prefix: IndexMap<
            String,
            Vec<(value::KeyValue, serde_json::Value)>,
        > = IndexMap::new();

        let mut rows_stream = import_op
            .executor
            .list(&SourceExecutorReadOptions {
                include_ordinal: false,
                include_content_version_fp: false,
                include_value: false,
            })
            .await?;
        while let Some(rows) = rows_stream.next().await {
            for row in rows?.into_iter() {
                let mut s = row
                    .key
                    .encode_to_strs()
                    .into_iter()
                    .map(|s| urlencoding::encode(&s).into_owned())
                    .join(":");
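                // Truncate at a char boundary so that the prefix, together with
                // the import op's name, stays within FILENAME_PREFIX_MAX_LENGTH.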
                s.truncate(
                    (0..(FILENAME_PREFIX_MAX_LENGTH - import_op.name.as_str().len()))
                        .rev()
                        .find(|i| s.is_char_boundary(*i))
                        .unwrap_or(0),
                );
                keys_by_filename_prefix
                    .entry(s)
                    .or_default()
                    .push((row.key, row.key_aux_info));
            }
        }
        let output_dir = Path::new(&self.options.output_dir);
        let evaluate_futs =
            keys_by_filename_prefix
                .into_iter()
                .flat_map(|(filename_prefix, keys)| {
                    let num_keys = keys.len();
                    keys.into_iter()
                        .enumerate()
                        .map(move |(i, (key, key_aux_info))| {
                            let extra_id = if num_keys > 1 {
                                Cow::Owned(format!(".{i}"))
                            } else {
                                Cow::Borrowed("")
                            };
                            let file_name =
                                format!("{}@{}{}.yaml", import_op.name, filename_prefix, extra_id);
                            let file_path = output_dir.join(&file_name);
                            self.evaluate_and_dump_source_entry(
                                import_op_idx,
                                import_op,
                                key,
                                key_aux_info,
                                file_path,
                            )
                        })
                });
        try_join_all(evaluate_futs).await?;
        Ok(())
    }

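    /// Runs the dump for every import source in the plan concurrently.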
    async fn evaluate_and_dump(&self) -> Result<()> {
        try_join_all(
            self.plan
                .import_ops
                .iter()
                .enumerate()
                .map(|(idx, import_op)| self.evaluate_and_dump_for_source(idx, import_op)),
        )
        .await?;
        Ok(())
    }
}

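/// Evaluates every source row of `plan` and dumps the results as YAML files
/// under `options.output_dir`, creating the directory if it does not exist.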
pub async fn evaluate_and_dump(
    plan: &ExecutionPlan,
    setup_execution_ctx: &exec_ctx::FlowSetupExecutionContext,
    schema: &schema::FlowSchema,
    options: EvaluateAndDumpOptions,
    pool: &PgPool,
) -> Result<()> {
    let output_dir = Path::new(&options.output_dir);
    if output_dir.exists() {
        if !output_dir.is_dir() {
            return Err(client_error!("The output path exists and is not a directory"));
        }
    } else {
        tokio::fs::create_dir(output_dir).await?;
    }

    let dumper = Dumper {
        plan,
        setup_execution_ctx,
        schema,
        pool,
        options,
    };
    dumper.evaluate_and_dump().await
}