1use crate::core::{
24 output_model::{
25 OutputDocument, OutputItems, OutputMeta, OutputResult, RenderRecommendation,
26 output_items_from_value,
27 },
28 row::Row,
29};
30use anyhow::{Result, anyhow};
31
32use super::value as value_stage;
33use crate::dsl::verbs::{
34 aggregate, collapse, copy, filter, group, jq, limit, project, question, quick, sort, unroll,
35 values,
36};
37use crate::dsl::{
38 compiled::{CompiledPipeline, CompiledStage, SemanticEffect},
39 eval::context::RowContext,
40 parse::pipeline::parse_stage_list,
41};
42
43pub fn apply_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
48 apply_output_pipeline(OutputResult::from_rows(rows), stages)
49}
50
51pub fn apply_output_pipeline(output: OutputResult, stages: &[String]) -> Result<OutputResult> {
56 execute_pipeline_items(
57 output.items,
58 output.document,
59 output.meta.wants_copy,
60 output.meta.render_recommendation,
61 stages,
62 )
63}
64
65pub fn execute_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
70 execute_pipeline_streaming(rows, stages)
71}
72
73pub fn execute_pipeline_streaming<I>(rows: I, stages: &[String]) -> Result<OutputResult>
78where
79 I: IntoIterator<Item = Row>,
80 I::IntoIter: 'static,
81{
82 let parsed = parse_stage_list(stages)?;
83 let compiled = CompiledPipeline::from_parsed(parsed)?;
84 PipelineExecutor::new_stream(rows.into_iter(), false, compiled).run()
85}
86
87fn execute_pipeline_items(
88 items: OutputItems,
89 initial_document: Option<OutputDocument>,
90 initial_wants_copy: bool,
91 initial_render_recommendation: Option<RenderRecommendation>,
92 stages: &[String],
93) -> Result<OutputResult> {
94 let parsed = parse_stage_list(stages)?;
95 let compiled = CompiledPipeline::from_parsed(parsed)?;
96 PipelineExecutor::new(
97 items,
98 initial_document,
99 initial_wants_copy,
100 initial_render_recommendation,
101 compiled,
102 )
103 .run()
104}
105
/// Boxed iterator of fallible rows; the representation rows take while stages
/// are applied as iterator adapters rather than over collected items.
type RowStream = Box<dyn Iterator<Item = Result<Row>>>;
111
/// The payload the executor currently holds, in one of three representations.
enum PipelineItems {
    /// Rows still flowing through a boxed iterator (see `RowStream`).
    RowStream(RowStream),
    /// Rows or groups held as concrete `OutputItems`.
    Materialized(OutputItems),
    /// A JSON value; `PipelineExecutor::new` selects this representation when
    /// a document is supplied.
    Semantic(serde_json::Value),
}
117
/// Mutable state carried through a single pipeline run.
struct PipelineExecutor {
    /// Current payload, in whichever representation the last stage left it.
    items: PipelineItems,
    /// Optional document; dropped when a semantic stage reports
    /// `SemanticEffect::Degrade`.
    document: Option<OutputDocument>,
    /// Seeded by the caller and set to `true` whenever a `Copy` stage is applied.
    wants_copy: bool,
    /// Cleared by any stage whose `preserves_render_recommendation()` is false.
    render_recommendation: Option<RenderRecommendation>,
    /// Compiled stages that `run` applies in order.
    compiled: CompiledPipeline,
}
125
126impl PipelineExecutor {
127 fn new(
128 items: OutputItems,
129 document: Option<OutputDocument>,
130 wants_copy: bool,
131 render_recommendation: Option<RenderRecommendation>,
132 compiled: CompiledPipeline,
133 ) -> Self {
134 let items = if let Some(document) = document.as_ref() {
135 PipelineItems::Semantic(document.value.clone())
139 } else {
140 match items {
141 OutputItems::Rows(rows) => {
142 PipelineItems::RowStream(Box::new(rows.into_iter().map(Ok)))
143 }
144 OutputItems::Groups(groups) => {
145 PipelineItems::Materialized(OutputItems::Groups(groups))
146 }
147 }
148 };
149 Self {
150 items,
151 document,
152 wants_copy,
153 render_recommendation,
154 compiled,
155 }
156 }
157
158 fn new_stream<I>(rows: I, wants_copy: bool, compiled: CompiledPipeline) -> Self
159 where
160 I: Iterator<Item = Row> + 'static,
161 {
162 Self {
163 items: PipelineItems::RowStream(Box::new(rows.map(Ok))),
164 document: None,
165 wants_copy,
166 render_recommendation: None,
167 compiled,
168 }
169 }
170
171 fn run(mut self) -> Result<OutputResult> {
172 let stages = self.compiled.stages.clone();
173 for stage in &stages {
174 self.apply_stage(stage)?;
175 }
176 self.into_output_result()
177 }
178
179 fn apply_stage(&mut self, stage: &CompiledStage) -> Result<()> {
180 if !stage.preserves_render_recommendation() {
181 self.render_recommendation = None;
182 }
183
184 if matches!(self.items, PipelineItems::Semantic(_)) {
185 self.apply_semantic_stage(stage)?;
186 return Ok(());
187 }
188
189 if stage.can_stream()
190 && let PipelineItems::RowStream(_) = self.items
191 {
192 self.apply_stream_stage(stage)?;
193 return Ok(());
194 }
195
196 let items = self.materialize_items()?;
197 self.items = PipelineItems::Materialized(self.apply_flat_stage(items, stage)?);
198 self.sync_document_to_items();
199 Ok(())
200 }
201
202 fn apply_semantic_stage(&mut self, stage: &CompiledStage) -> Result<()> {
203 let PipelineItems::Semantic(value) = std::mem::replace(
204 &mut self.items,
205 PipelineItems::Semantic(serde_json::Value::Null),
206 ) else {
207 unreachable!("semantic stage dispatch requires semantic items");
208 };
209
210 if matches!(stage, CompiledStage::Copy) {
211 self.wants_copy = true;
212 }
213
214 let transformed = value_stage::apply_stage(value, stage)?;
215 self.items = PipelineItems::Semantic(transformed);
216 match stage.semantic_effect() {
217 SemanticEffect::Preserve | SemanticEffect::Transform => {
221 self.sync_document_to_items();
222 }
223 SemanticEffect::Degrade => {
226 self.document = None;
227 }
228 }
229 Ok(())
230 }
231
232 fn apply_stream_stage(&mut self, stage: &CompiledStage) -> Result<()> {
233 let stream = match std::mem::replace(
234 &mut self.items,
235 PipelineItems::RowStream(Box::new(std::iter::empty())),
236 ) {
237 PipelineItems::RowStream(stream) => stream,
238 PipelineItems::Materialized(items) => {
239 debug_assert!(
240 false,
241 "apply_stream_stage called after pipeline had already materialized"
242 );
243 self.items = PipelineItems::Materialized(items);
244 return Ok(());
245 }
246 PipelineItems::Semantic(value) => {
247 debug_assert!(
248 false,
249 "apply_stream_stage called for semantic payload execution"
250 );
251 self.items = PipelineItems::Semantic(value);
252 return Ok(());
253 }
254 };
255
256 self.items = PipelineItems::RowStream(match stage {
257 CompiledStage::Quick(plan) => {
258 Box::new(quick::stream_rows_with_plan(stream, plan.clone()))
259 }
260 CompiledStage::Filter(plan) => {
261 let plan = plan.clone();
262 Box::new(stream.filter_map(move |row| match row {
263 Ok(row) if plan.matches(&row) => Some(Ok(row)),
264 Ok(_) => None,
265 Err(err) => Some(Err(err)),
266 }))
267 }
268 CompiledStage::Project(plan) => {
269 let plan = plan.clone();
270 stream_row_fanout_result(stream, move |row| plan.project_row(&row))
271 }
272 CompiledStage::Unroll(plan) => {
273 let plan = plan.clone();
274 stream_row_fanout_result(stream, move |row| plan.expand_row(&row))
275 }
276 CompiledStage::Values(plan) => {
277 let plan = plan.clone();
278 stream_row_fanout(stream, move |row| plan.extract_row(&row))
279 }
280 CompiledStage::Limit(spec) => {
281 debug_assert!(spec.is_head_only());
282 Box::new(
283 stream
284 .skip(spec.offset as usize)
285 .take(spec.count.max(0) as usize),
286 )
287 }
288 CompiledStage::Copy => {
289 self.wants_copy = true;
290 stream
291 }
292 CompiledStage::ValueQuick(plan)
293 | CompiledStage::KeyQuick(plan)
294 | CompiledStage::Question(plan) => {
295 Box::new(quick::stream_rows_with_plan(stream, plan.clone()))
296 }
297 CompiledStage::Clean => Box::new(stream.filter_map(|row| match row {
298 Ok(row) => question::clean_row(row).map(Ok),
299 Err(err) => Some(Err(err)),
300 })),
301 other => {
302 return Err(anyhow!(
303 "stream stage not implemented for compiled stage: {:?}",
304 other
305 ));
306 }
307 });
308 Ok(())
309 }
310
311 fn apply_flat_stage(
312 &mut self,
313 items: OutputItems,
314 stage: &CompiledStage,
315 ) -> Result<OutputItems> {
316 match stage {
317 CompiledStage::Quick(plan) => match items {
318 OutputItems::Rows(rows) => {
319 quick::apply_with_plan(rows, plan).map(OutputItems::Rows)
320 }
321 OutputItems::Groups(groups) => {
322 quick::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
323 }
324 },
325 CompiledStage::Filter(plan) => match items {
326 OutputItems::Rows(rows) => {
327 filter::apply_with_plan(rows, plan).map(OutputItems::Rows)
328 }
329 OutputItems::Groups(groups) => {
330 filter::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
331 }
332 },
333 CompiledStage::Project(plan) => match items {
334 OutputItems::Rows(rows) => {
335 project::apply_with_plan(rows, plan).map(OutputItems::Rows)
336 }
337 OutputItems::Groups(groups) => {
338 project::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
339 }
340 },
341 CompiledStage::Unroll(plan) => match items {
342 OutputItems::Rows(rows) => {
343 unroll::apply_with_plan(rows, plan).map(OutputItems::Rows)
344 }
345 OutputItems::Groups(groups) => {
346 unroll::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
347 }
348 },
349 CompiledStage::Values(plan) => match items {
350 OutputItems::Rows(rows) => {
351 values::apply_with_plan(rows, plan).map(OutputItems::Rows)
352 }
353 OutputItems::Groups(groups) => {
354 values::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
355 }
356 },
357 CompiledStage::ValueQuick(plan)
358 | CompiledStage::KeyQuick(plan)
359 | CompiledStage::Question(plan) => match items {
360 OutputItems::Rows(rows) => {
361 quick::apply_with_plan(rows, plan).map(OutputItems::Rows)
362 }
363 OutputItems::Groups(groups) => {
364 quick::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
365 }
366 },
367 CompiledStage::Limit(spec) => match items {
368 OutputItems::Rows(rows) => {
369 Ok(OutputItems::Rows(limit::apply_with_spec(rows, *spec)))
370 }
371 OutputItems::Groups(groups) => {
372 Ok(OutputItems::Groups(limit::apply_with_spec(groups, *spec)))
373 }
374 },
375 CompiledStage::Sort(plan) => sort::apply_with_plan(items, plan),
376 CompiledStage::Group(spec) => match items {
377 OutputItems::Rows(rows) => Ok(OutputItems::Groups(group::group_rows_with_plan(
378 rows, spec,
379 )?)),
380 OutputItems::Groups(groups) => Ok(OutputItems::Groups(
381 group::regroup_groups_with_plan(groups, spec)?,
382 )),
383 },
384 CompiledStage::Aggregate(plan) => aggregate::apply_with_plan(items, plan),
385 CompiledStage::Collapse => collapse::apply(items),
386 CompiledStage::CountMacro => aggregate::count_macro(items, ""),
387 CompiledStage::Copy => {
388 self.wants_copy = true;
389 Ok(match items {
390 OutputItems::Rows(rows) => OutputItems::Rows(copy::apply(rows)),
391 OutputItems::Groups(groups) => OutputItems::Groups(groups),
392 })
393 }
394 CompiledStage::Clean => Ok(question::clean_items(items)),
395 CompiledStage::Jq(expr) => jq::apply_with_expr(items, expr),
396 }
397 }
398
399 fn materialize_items(&mut self) -> Result<OutputItems> {
400 match std::mem::replace(
401 &mut self.items,
402 PipelineItems::Materialized(OutputItems::Rows(Vec::new())),
403 ) {
404 PipelineItems::RowStream(stream) => {
405 let rows = materialize_row_stream(stream)?;
406 Ok(OutputItems::Rows(rows))
407 }
408 PipelineItems::Materialized(items) => Ok(items),
409 PipelineItems::Semantic(value) => Ok(output_items_from_value(value)),
410 }
411 }
412
413 fn finish_items(&mut self) -> Result<OutputItems> {
414 self.materialize_items()
415 }
416
417 fn into_output_result(mut self) -> Result<OutputResult> {
418 let semantic_value = if let PipelineItems::Semantic(ref v) = self.items {
421 Some(v.clone())
422 } else {
423 None
424 };
425 let items = self.finish_items()?;
426 let meta = self.build_output_meta(&items);
427 let document = match semantic_value {
428 Some(value) => self.document.map(|document| OutputDocument {
429 kind: document.kind,
430 value,
431 }),
432 None => self.document,
433 };
434
435 Ok(OutputResult {
436 items,
437 document,
438 meta,
439 })
440 }
441
442 fn sync_document_to_items(&mut self) {
443 let Some(document) = self.document.as_mut() else {
444 return;
445 };
446 match &self.items {
447 PipelineItems::Materialized(items) => {
448 *document = document.project_over_items(items);
449 }
450 PipelineItems::Semantic(value) => {
451 document.value = value.clone();
452 }
453 PipelineItems::RowStream(_) => {}
454 }
455 }
456
457 fn build_output_meta(&self, items: &OutputItems) -> OutputMeta {
458 let key_index = match items {
459 OutputItems::Rows(rows) => RowContext::from_rows(rows).key_index().to_vec(),
460 OutputItems::Groups(groups) => {
461 let headers = groups.iter().map(merged_group_header).collect::<Vec<_>>();
462 RowContext::from_rows(&headers).key_index().to_vec()
463 }
464 };
465
466 OutputMeta {
467 key_index,
468 column_align: Vec::new(),
469 wants_copy: self.wants_copy,
470 grouped: matches!(items, OutputItems::Groups(_)),
471 render_recommendation: self.render_recommendation,
472 }
473 }
474}
475
476fn materialize_row_stream(stream: RowStream) -> Result<Vec<Row>> {
477 stream.collect()
478}
479
480fn stream_row_fanout<I, F>(stream: RowStream, fanout: F) -> RowStream
481where
482 I: IntoIterator<Item = Row>,
483 F: Fn(Row) -> I + 'static,
484{
485 Box::new(stream.flat_map(move |row| {
486 match row {
487 Ok(row) => fanout(row)
488 .into_iter()
489 .map(Ok)
490 .collect::<Vec<_>>()
491 .into_iter(),
492 Err(err) => vec![Err(err)].into_iter(),
493 }
494 }))
495}
496
497fn stream_row_fanout_result<I, F>(stream: RowStream, fanout: F) -> RowStream
498where
499 I: IntoIterator<Item = Row>,
500 F: Fn(Row) -> Result<I> + 'static,
501{
502 Box::new(stream.flat_map(move |row| match row {
503 Ok(row) => match fanout(row) {
504 Ok(rows) => rows.into_iter().map(Ok).collect::<Vec<_>>().into_iter(),
505 Err(err) => vec![Err(err)].into_iter(),
506 },
507 Err(err) => vec![Err(err)].into_iter(),
508 }))
509}
510
511fn merged_group_header(group: &crate::core::output_model::Group) -> Row {
512 let mut row = group.groups.clone();
513 row.extend(group.aggregates.clone());
514 row
515}
516
517#[cfg(test)]
518#[path = "tests/engine.rs"]
519mod tests;