1use crate::core::{
32 output_model::{
33 OutputDocument, OutputItems, OutputMeta, OutputResult, RenderRecommendation,
34 output_items_from_value,
35 },
36 row::Row,
37};
38use anyhow::{Result, anyhow};
39
40use super::value as value_stage;
41use crate::dsl::verbs::{
42 aggregate, collapse, copy, filter, group, jq, limit, project, question, quick, sort, unroll,
43 values,
44};
45use crate::dsl::{
46 compiled::{CompiledPipeline, CompiledStage, SemanticEffect, StageBehavior},
47 eval::context::RowContext,
48 parse::pipeline::parse_stage_list,
49};
50
51pub fn apply_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
81 apply_output_pipeline(OutputResult::from_rows(rows), stages)
82}
83
84pub fn apply_output_pipeline(output: OutputResult, stages: &[String]) -> Result<OutputResult> {
113 execute_pipeline_items(
114 output.items,
115 output.document,
116 output.meta.wants_copy,
117 output.meta.render_recommendation,
118 stages,
119 )
120}
121
122pub fn execute_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
149 execute_pipeline_streaming(rows, stages)
150}
151
152pub fn execute_pipeline_streaming<I>(rows: I, stages: &[String]) -> Result<OutputResult>
180where
181 I: IntoIterator<Item = Row>,
182 I::IntoIter: 'static,
183{
184 let parsed = parse_stage_list(stages)?;
185 let compiled = CompiledPipeline::from_parsed(parsed)?;
186 PipelineExecutor::new_stream(rows.into_iter(), false, compiled).run()
187}
188
189fn execute_pipeline_items(
190 items: OutputItems,
191 initial_document: Option<OutputDocument>,
192 initial_wants_copy: bool,
193 initial_render_recommendation: Option<RenderRecommendation>,
194 stages: &[String],
195) -> Result<OutputResult> {
196 let parsed = parse_stage_list(stages)?;
197 let compiled = CompiledPipeline::from_parsed(parsed)?;
198 PipelineExecutor::new(
199 items,
200 initial_document,
201 initial_wants_copy,
202 initial_render_recommendation,
203 compiled,
204 )
205 .run()
206}
207
/// Boxed, type-erased iterator of fallible rows used while a pipeline is
/// still in its streaming phase.
type RowStream = Box<dyn Iterator<Item = Result<Row>>>;
213
/// The payload currently flowing through the executor.
///
/// The active variant determines how the next stage is routed (see
/// `resolve_stage_execution_route`).
enum PipelineItems {
    /// Rows behind a boxed iterator; streamable stages wrap it in further
    /// adaptors instead of collecting.
    RowStream(RowStream),
    /// Rows or groups held as a complete `OutputItems` value.
    Materialized(OutputItems),
    /// A JSON value that stages transform through `value_stage::apply_stage`.
    Semantic(serde_json::Value),
}
219
/// How a single stage is going to be executed, as decided by
/// `resolve_stage_execution_route`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StageExecutionRoute {
    /// Run against the semantic JSON payload; carries the stage's declared
    /// effect on the attached document.
    Semantic(SemanticEffect),
    /// Extend the current row stream without collecting it.
    Stream,
    /// Collect (if necessary) and run against materialized items.
    Materialized,
}
226
/// Mutable state threaded through one run of a compiled pipeline.
struct PipelineExecutor {
    /// Current payload; its variant decides how the next stage is routed.
    items: PipelineItems,
    /// Optional document carried alongside the items. It is re-synced after
    /// materialized and semantic stages and dropped when a semantic stage
    /// reports `SemanticEffect::Degrade`.
    document: Option<OutputDocument>,
    /// Whether copy was requested, either by the initial value or because a
    /// `Copy` stage was applied; reported in the final `OutputMeta`.
    wants_copy: bool,
    /// Render hint that is cleared by any stage whose behavior does not
    /// preserve it.
    render_recommendation: Option<RenderRecommendation>,
    /// The compiled stages that `run` applies in order.
    compiled: CompiledPipeline,
}
234
235impl PipelineExecutor {
236 fn new(
237 items: OutputItems,
238 document: Option<OutputDocument>,
239 wants_copy: bool,
240 render_recommendation: Option<RenderRecommendation>,
241 compiled: CompiledPipeline,
242 ) -> Self {
243 let items = if let Some(document) = document.as_ref() {
244 PipelineItems::Semantic(document.value.clone())
248 } else {
249 match items {
250 OutputItems::Rows(rows) => {
251 PipelineItems::RowStream(Box::new(rows.into_iter().map(Ok)))
252 }
253 OutputItems::Groups(groups) => {
254 PipelineItems::Materialized(OutputItems::Groups(groups))
255 }
256 }
257 };
258 Self {
259 items,
260 document,
261 wants_copy,
262 render_recommendation,
263 compiled,
264 }
265 }
266
267 fn new_stream<I>(rows: I, wants_copy: bool, compiled: CompiledPipeline) -> Self
268 where
269 I: Iterator<Item = Row> + 'static,
270 {
271 Self {
272 items: PipelineItems::RowStream(Box::new(rows.map(Ok))),
273 document: None,
274 wants_copy,
275 render_recommendation: None,
276 compiled,
277 }
278 }
279
280 fn run(mut self) -> Result<OutputResult> {
281 let stages = self.compiled.stages.clone();
282 for stage in &stages {
283 self.apply_stage(stage)?;
284 }
285 self.into_output_result()
286 }
287
288 fn apply_stage(&mut self, stage: &CompiledStage) -> Result<()> {
289 let behavior = stage.behavior();
290 self.apply_stage_side_effects(stage);
291 if !behavior.preserves_render_recommendation {
292 self.render_recommendation = None;
293 }
294
295 match resolve_stage_execution_route(&self.items, behavior) {
296 StageExecutionRoute::Semantic(semantic_effect) => {
297 self.apply_semantic_stage(stage, semantic_effect)
298 }
299 StageExecutionRoute::Stream => self.apply_stream_stage(stage),
300 StageExecutionRoute::Materialized => {
301 let items = self.materialize_items()?;
302 self.items = PipelineItems::Materialized(self.apply_flat_stage(items, stage)?);
303 self.sync_document_to_items();
304 Ok(())
305 }
306 }
307 }
308
309 fn apply_stage_side_effects(&mut self, stage: &CompiledStage) {
310 if matches!(stage, CompiledStage::Copy) {
311 self.wants_copy = true;
312 }
313 }
314
315 fn apply_semantic_stage(
316 &mut self,
317 stage: &CompiledStage,
318 semantic_effect: SemanticEffect,
319 ) -> Result<()> {
320 let items = std::mem::replace(
321 &mut self.items,
322 PipelineItems::Semantic(serde_json::Value::Null),
323 );
324 let PipelineItems::Semantic(value) = items else {
325 self.items = items;
326 return Err(anyhow!("semantic stage dispatch requires semantic items"));
327 };
328
329 let transformed = value_stage::apply_stage(value, stage)?;
330 self.items = PipelineItems::Semantic(transformed);
331 match semantic_effect {
332 SemanticEffect::Preserve | SemanticEffect::Transform => {
336 self.sync_document_to_items();
337 }
338 SemanticEffect::Degrade => {
341 self.document = None;
342 }
343 }
344 Ok(())
345 }
346
347 fn apply_stream_stage(&mut self, stage: &CompiledStage) -> Result<()> {
348 let stream = match std::mem::replace(
349 &mut self.items,
350 PipelineItems::RowStream(Box::new(std::iter::empty())),
351 ) {
352 PipelineItems::RowStream(stream) => stream,
353 PipelineItems::Materialized(items) => {
354 debug_assert!(
355 false,
356 "apply_stream_stage called after pipeline had already materialized"
357 );
358 self.items = PipelineItems::Materialized(items);
359 return Ok(());
360 }
361 PipelineItems::Semantic(value) => {
362 debug_assert!(
363 false,
364 "apply_stream_stage called for semantic payload execution"
365 );
366 self.items = PipelineItems::Semantic(value);
367 return Ok(());
368 }
369 };
370
371 if let Some(plan) = stage.quick_plan().cloned() {
372 self.items =
373 PipelineItems::RowStream(Box::new(quick::stream_rows_with_plan(stream, plan)));
374 return Ok(());
375 }
376
377 self.items = PipelineItems::RowStream(match stage {
378 CompiledStage::Filter(plan) => {
379 let plan = plan.clone();
380 Box::new(stream.filter_map(move |row| match row {
381 Ok(row) if plan.matches(&row) => Some(Ok(row)),
382 Ok(_) => None,
383 Err(err) => Some(Err(err)),
384 }))
385 }
386 CompiledStage::Project(plan) => {
387 let plan = plan.clone();
388 stream_row_fanout_result(stream, move |row| plan.project_row(&row))
389 }
390 CompiledStage::Unroll(plan) => {
391 let plan = plan.clone();
392 stream_row_fanout_result(stream, move |row| plan.expand_row(&row))
393 }
394 CompiledStage::Values(plan) => {
395 let plan = plan.clone();
396 stream_row_fanout(stream, move |row| plan.extract_row(&row))
397 }
398 CompiledStage::Limit(spec) => {
399 debug_assert!(spec.is_head_only());
400 Box::new(
401 stream
402 .skip(spec.offset as usize)
403 .take(spec.count.max(0) as usize),
404 )
405 }
406 CompiledStage::Copy => stream,
407 CompiledStage::Clean => Box::new(stream.filter_map(|row| match row {
408 Ok(row) => question::clean_row(row).map(Ok),
409 Err(err) => Some(Err(err)),
410 })),
411 other => {
412 return Err(anyhow!(
413 "stream stage not implemented for compiled stage: {:?}",
414 other
415 ));
416 }
417 });
418 Ok(())
419 }
420
421 fn apply_flat_stage(
422 &mut self,
423 items: OutputItems,
424 stage: &CompiledStage,
425 ) -> Result<OutputItems> {
426 if let Some(plan) = stage.quick_plan() {
427 return match items {
428 OutputItems::Rows(rows) => {
429 quick::apply_with_plan(rows, plan).map(OutputItems::Rows)
430 }
431 OutputItems::Groups(groups) => {
432 quick::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
433 }
434 };
435 }
436
437 match stage {
438 CompiledStage::Filter(plan) => match items {
439 OutputItems::Rows(rows) => {
440 filter::apply_with_plan(rows, plan).map(OutputItems::Rows)
441 }
442 OutputItems::Groups(groups) => {
443 filter::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
444 }
445 },
446 CompiledStage::Project(plan) => match items {
447 OutputItems::Rows(rows) => {
448 project::apply_with_plan(rows, plan).map(OutputItems::Rows)
449 }
450 OutputItems::Groups(groups) => {
451 project::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
452 }
453 },
454 CompiledStage::Unroll(plan) => match items {
455 OutputItems::Rows(rows) => {
456 unroll::apply_with_plan(rows, plan).map(OutputItems::Rows)
457 }
458 OutputItems::Groups(groups) => {
459 unroll::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
460 }
461 },
462 CompiledStage::Values(plan) => match items {
463 OutputItems::Rows(rows) => {
464 values::apply_with_plan(rows, plan).map(OutputItems::Rows)
465 }
466 OutputItems::Groups(groups) => {
467 values::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
468 }
469 },
470 CompiledStage::Limit(spec) => match items {
471 OutputItems::Rows(rows) => {
472 Ok(OutputItems::Rows(limit::apply_with_spec(rows, *spec)))
473 }
474 OutputItems::Groups(groups) => {
475 Ok(OutputItems::Groups(limit::apply_with_spec(groups, *spec)))
476 }
477 },
478 CompiledStage::Sort(plan) => sort::apply_with_plan(items, plan),
479 CompiledStage::Group(spec) => match items {
480 OutputItems::Rows(rows) => Ok(OutputItems::Groups(group::group_rows_with_plan(
481 rows, spec,
482 )?)),
483 OutputItems::Groups(groups) => Ok(OutputItems::Groups(
484 group::regroup_groups_with_plan(groups, spec)?,
485 )),
486 },
487 CompiledStage::Aggregate(plan) => aggregate::apply_with_plan(items, plan),
488 CompiledStage::Collapse => collapse::apply(items),
489 CompiledStage::CountMacro => aggregate::count_macro(items, ""),
490 CompiledStage::Copy => Ok(match items {
491 OutputItems::Rows(rows) => OutputItems::Rows(copy::apply(rows)),
492 OutputItems::Groups(groups) => OutputItems::Groups(groups),
493 }),
494 CompiledStage::Clean => Ok(question::clean_items(items)),
495 CompiledStage::Jq(expr) => jq::apply_with_expr(items, expr),
496 CompiledStage::Quick(_)
497 | CompiledStage::Question(_)
498 | CompiledStage::ValueQuick(_)
499 | CompiledStage::KeyQuick(_) => Err(anyhow!(
500 "quick family should have been handled before flat-stage dispatch"
501 )),
502 }
503 }
504
505 fn materialize_items(&mut self) -> Result<OutputItems> {
506 match std::mem::replace(
507 &mut self.items,
508 PipelineItems::Materialized(OutputItems::Rows(Vec::new())),
509 ) {
510 PipelineItems::RowStream(stream) => {
511 let rows = materialize_row_stream(stream)?;
512 Ok(OutputItems::Rows(rows))
513 }
514 PipelineItems::Materialized(items) => Ok(items),
515 PipelineItems::Semantic(value) => Ok(output_items_from_value(value)),
516 }
517 }
518
519 fn finish_items(&mut self) -> Result<OutputItems> {
520 self.materialize_items()
521 }
522
523 fn into_output_result(mut self) -> Result<OutputResult> {
524 let semantic_value = if let PipelineItems::Semantic(ref v) = self.items {
527 Some(v.clone())
528 } else {
529 None
530 };
531 let items = self.finish_items()?;
532 let meta = self.build_output_meta(&items);
533 let document = match semantic_value {
534 Some(value) => self.document.map(|document| OutputDocument {
535 kind: document.kind,
536 value,
537 }),
538 None => self.document,
539 };
540
541 Ok(OutputResult {
542 items,
543 document,
544 meta,
545 })
546 }
547
548 fn sync_document_to_items(&mut self) {
549 let Some(document) = self.document.as_mut() else {
550 return;
551 };
552 match &self.items {
553 PipelineItems::Materialized(items) => {
554 *document = document.project_over_items(items);
555 }
556 PipelineItems::Semantic(value) => {
557 document.value = value.clone();
558 }
559 PipelineItems::RowStream(_) => {}
560 }
561 }
562
563 fn build_output_meta(&self, items: &OutputItems) -> OutputMeta {
564 let key_index = match items {
565 OutputItems::Rows(rows) => RowContext::from_rows(rows).key_index().to_vec(),
566 OutputItems::Groups(groups) => {
567 let headers = groups.iter().map(merged_group_header).collect::<Vec<_>>();
568 RowContext::from_rows(&headers).key_index().to_vec()
569 }
570 };
571
572 OutputMeta {
573 key_index,
574 column_align: Vec::new(),
575 wants_copy: self.wants_copy,
576 grouped: matches!(items, OutputItems::Groups(_)),
577 render_recommendation: self.render_recommendation,
578 }
579 }
580}
581
582fn materialize_row_stream(stream: RowStream) -> Result<Vec<Row>> {
583 stream.collect()
584}
585
586fn stream_row_fanout<I, F>(stream: RowStream, fanout: F) -> RowStream
587where
588 I: IntoIterator<Item = Row>,
589 F: Fn(Row) -> I + 'static,
590{
591 Box::new(stream.flat_map(move |row| {
592 match row {
593 Ok(row) => fanout(row)
594 .into_iter()
595 .map(Ok)
596 .collect::<Vec<_>>()
597 .into_iter(),
598 Err(err) => vec![Err(err)].into_iter(),
599 }
600 }))
601}
602
603fn stream_row_fanout_result<I, F>(stream: RowStream, fanout: F) -> RowStream
604where
605 I: IntoIterator<Item = Row>,
606 F: Fn(Row) -> Result<I> + 'static,
607{
608 Box::new(stream.flat_map(move |row| match row {
609 Ok(row) => match fanout(row) {
610 Ok(rows) => rows.into_iter().map(Ok).collect::<Vec<_>>().into_iter(),
611 Err(err) => vec![Err(err)].into_iter(),
612 },
613 Err(err) => vec![Err(err)].into_iter(),
614 }))
615}
616
617fn merged_group_header(group: &crate::core::output_model::Group) -> Row {
618 let mut row = group.groups.clone();
619 row.extend(group.aggregates.clone());
620 row
621}
622
623fn resolve_stage_execution_route(
624 items: &PipelineItems,
625 behavior: StageBehavior,
626) -> StageExecutionRoute {
627 match items {
628 PipelineItems::Semantic(_) => StageExecutionRoute::Semantic(behavior.semantic_effect),
629 PipelineItems::RowStream(_) if behavior.can_stream => StageExecutionRoute::Stream,
630 PipelineItems::RowStream(_) | PipelineItems::Materialized(_) => {
631 StageExecutionRoute::Materialized
632 }
633 }
634}
635
636#[cfg(test)]
637#[path = "tests/engine.rs"]
638mod tests;