1use crate::core::{
32 output_model::{
33 OutputDocument, OutputItems, OutputMeta, OutputResult, RenderRecommendation,
34 output_items_from_value,
35 },
36 row::Row,
37};
38use anyhow::{Result, anyhow};
39
40use super::value as value_stage;
41use crate::dsl::verbs::{
42 aggregate, collapse, copy, filter, group, jq, limit, project, question, quick, sort, unroll,
43 values,
44};
45use crate::dsl::{
46 compiled::{CompiledPipeline, CompiledStage, SemanticEffect},
47 eval::context::RowContext,
48 parse::pipeline::parse_stage_list,
49};
50
51pub fn apply_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
81 apply_output_pipeline(OutputResult::from_rows(rows), stages)
82}
83
84pub fn apply_output_pipeline(output: OutputResult, stages: &[String]) -> Result<OutputResult> {
113 execute_pipeline_items(
114 output.items,
115 output.document,
116 output.meta.wants_copy,
117 output.meta.render_recommendation,
118 stages,
119 )
120}
121
122pub fn execute_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
149 execute_pipeline_streaming(rows, stages)
150}
151
152pub fn execute_pipeline_streaming<I>(rows: I, stages: &[String]) -> Result<OutputResult>
180where
181 I: IntoIterator<Item = Row>,
182 I::IntoIter: 'static,
183{
184 let parsed = parse_stage_list(stages)?;
185 let compiled = CompiledPipeline::from_parsed(parsed)?;
186 PipelineExecutor::new_stream(rows.into_iter(), false, compiled).run()
187}
188
189fn execute_pipeline_items(
190 items: OutputItems,
191 initial_document: Option<OutputDocument>,
192 initial_wants_copy: bool,
193 initial_render_recommendation: Option<RenderRecommendation>,
194 stages: &[String],
195) -> Result<OutputResult> {
196 let parsed = parse_stage_list(stages)?;
197 let compiled = CompiledPipeline::from_parsed(parsed)?;
198 PipelineExecutor::new(
199 items,
200 initial_document,
201 initial_wants_copy,
202 initial_render_recommendation,
203 compiled,
204 )
205 .run()
206}
207
208type RowStream = Box<dyn Iterator<Item = Result<Row>>>;
213
214enum PipelineItems {
215 RowStream(RowStream),
216 Materialized(OutputItems),
217 Semantic(serde_json::Value),
218}
219
220struct PipelineExecutor {
221 items: PipelineItems,
222 document: Option<OutputDocument>,
223 wants_copy: bool,
224 render_recommendation: Option<RenderRecommendation>,
225 compiled: CompiledPipeline,
226}
227
228impl PipelineExecutor {
229 fn new(
230 items: OutputItems,
231 document: Option<OutputDocument>,
232 wants_copy: bool,
233 render_recommendation: Option<RenderRecommendation>,
234 compiled: CompiledPipeline,
235 ) -> Self {
236 let items = if let Some(document) = document.as_ref() {
237 PipelineItems::Semantic(document.value.clone())
241 } else {
242 match items {
243 OutputItems::Rows(rows) => {
244 PipelineItems::RowStream(Box::new(rows.into_iter().map(Ok)))
245 }
246 OutputItems::Groups(groups) => {
247 PipelineItems::Materialized(OutputItems::Groups(groups))
248 }
249 }
250 };
251 Self {
252 items,
253 document,
254 wants_copy,
255 render_recommendation,
256 compiled,
257 }
258 }
259
260 fn new_stream<I>(rows: I, wants_copy: bool, compiled: CompiledPipeline) -> Self
261 where
262 I: Iterator<Item = Row> + 'static,
263 {
264 Self {
265 items: PipelineItems::RowStream(Box::new(rows.map(Ok))),
266 document: None,
267 wants_copy,
268 render_recommendation: None,
269 compiled,
270 }
271 }
272
273 fn run(mut self) -> Result<OutputResult> {
274 let stages = self.compiled.stages.clone();
275 for stage in &stages {
276 self.apply_stage(stage)?;
277 }
278 self.into_output_result()
279 }
280
281 fn apply_stage(&mut self, stage: &CompiledStage) -> Result<()> {
282 if !stage.preserves_render_recommendation() {
283 self.render_recommendation = None;
284 }
285
286 if matches!(self.items, PipelineItems::Semantic(_)) {
287 self.apply_semantic_stage(stage)?;
288 return Ok(());
289 }
290
291 if stage.can_stream()
292 && let PipelineItems::RowStream(_) = self.items
293 {
294 self.apply_stream_stage(stage)?;
295 return Ok(());
296 }
297
298 let items = self.materialize_items()?;
299 self.items = PipelineItems::Materialized(self.apply_flat_stage(items, stage)?);
300 self.sync_document_to_items();
301 Ok(())
302 }
303
304 fn apply_semantic_stage(&mut self, stage: &CompiledStage) -> Result<()> {
305 let items = std::mem::replace(
306 &mut self.items,
307 PipelineItems::Semantic(serde_json::Value::Null),
308 );
309 let PipelineItems::Semantic(value) = items else {
310 self.items = items;
311 return Err(anyhow!("semantic stage dispatch requires semantic items"));
312 };
313
314 if matches!(stage, CompiledStage::Copy) {
315 self.wants_copy = true;
316 }
317
318 let transformed = value_stage::apply_stage(value, stage)?;
319 self.items = PipelineItems::Semantic(transformed);
320 match stage.semantic_effect() {
321 SemanticEffect::Preserve | SemanticEffect::Transform => {
325 self.sync_document_to_items();
326 }
327 SemanticEffect::Degrade => {
330 self.document = None;
331 }
332 }
333 Ok(())
334 }
335
336 fn apply_stream_stage(&mut self, stage: &CompiledStage) -> Result<()> {
337 let stream = match std::mem::replace(
338 &mut self.items,
339 PipelineItems::RowStream(Box::new(std::iter::empty())),
340 ) {
341 PipelineItems::RowStream(stream) => stream,
342 PipelineItems::Materialized(items) => {
343 debug_assert!(
344 false,
345 "apply_stream_stage called after pipeline had already materialized"
346 );
347 self.items = PipelineItems::Materialized(items);
348 return Ok(());
349 }
350 PipelineItems::Semantic(value) => {
351 debug_assert!(
352 false,
353 "apply_stream_stage called for semantic payload execution"
354 );
355 self.items = PipelineItems::Semantic(value);
356 return Ok(());
357 }
358 };
359
360 self.items = PipelineItems::RowStream(match stage {
361 CompiledStage::Quick(plan) => {
362 Box::new(quick::stream_rows_with_plan(stream, plan.clone()))
363 }
364 CompiledStage::Filter(plan) => {
365 let plan = plan.clone();
366 Box::new(stream.filter_map(move |row| match row {
367 Ok(row) if plan.matches(&row) => Some(Ok(row)),
368 Ok(_) => None,
369 Err(err) => Some(Err(err)),
370 }))
371 }
372 CompiledStage::Project(plan) => {
373 let plan = plan.clone();
374 stream_row_fanout_result(stream, move |row| plan.project_row(&row))
375 }
376 CompiledStage::Unroll(plan) => {
377 let plan = plan.clone();
378 stream_row_fanout_result(stream, move |row| plan.expand_row(&row))
379 }
380 CompiledStage::Values(plan) => {
381 let plan = plan.clone();
382 stream_row_fanout(stream, move |row| plan.extract_row(&row))
383 }
384 CompiledStage::Limit(spec) => {
385 debug_assert!(spec.is_head_only());
386 Box::new(
387 stream
388 .skip(spec.offset as usize)
389 .take(spec.count.max(0) as usize),
390 )
391 }
392 CompiledStage::Copy => {
393 self.wants_copy = true;
394 stream
395 }
396 CompiledStage::ValueQuick(plan)
397 | CompiledStage::KeyQuick(plan)
398 | CompiledStage::Question(plan) => {
399 Box::new(quick::stream_rows_with_plan(stream, plan.clone()))
400 }
401 CompiledStage::Clean => Box::new(stream.filter_map(|row| match row {
402 Ok(row) => question::clean_row(row).map(Ok),
403 Err(err) => Some(Err(err)),
404 })),
405 other => {
406 return Err(anyhow!(
407 "stream stage not implemented for compiled stage: {:?}",
408 other
409 ));
410 }
411 });
412 Ok(())
413 }
414
415 fn apply_flat_stage(
416 &mut self,
417 items: OutputItems,
418 stage: &CompiledStage,
419 ) -> Result<OutputItems> {
420 match stage {
421 CompiledStage::Quick(plan) => match items {
422 OutputItems::Rows(rows) => {
423 quick::apply_with_plan(rows, plan).map(OutputItems::Rows)
424 }
425 OutputItems::Groups(groups) => {
426 quick::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
427 }
428 },
429 CompiledStage::Filter(plan) => match items {
430 OutputItems::Rows(rows) => {
431 filter::apply_with_plan(rows, plan).map(OutputItems::Rows)
432 }
433 OutputItems::Groups(groups) => {
434 filter::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
435 }
436 },
437 CompiledStage::Project(plan) => match items {
438 OutputItems::Rows(rows) => {
439 project::apply_with_plan(rows, plan).map(OutputItems::Rows)
440 }
441 OutputItems::Groups(groups) => {
442 project::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
443 }
444 },
445 CompiledStage::Unroll(plan) => match items {
446 OutputItems::Rows(rows) => {
447 unroll::apply_with_plan(rows, plan).map(OutputItems::Rows)
448 }
449 OutputItems::Groups(groups) => {
450 unroll::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
451 }
452 },
453 CompiledStage::Values(plan) => match items {
454 OutputItems::Rows(rows) => {
455 values::apply_with_plan(rows, plan).map(OutputItems::Rows)
456 }
457 OutputItems::Groups(groups) => {
458 values::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
459 }
460 },
461 CompiledStage::ValueQuick(plan)
462 | CompiledStage::KeyQuick(plan)
463 | CompiledStage::Question(plan) => match items {
464 OutputItems::Rows(rows) => {
465 quick::apply_with_plan(rows, plan).map(OutputItems::Rows)
466 }
467 OutputItems::Groups(groups) => {
468 quick::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
469 }
470 },
471 CompiledStage::Limit(spec) => match items {
472 OutputItems::Rows(rows) => {
473 Ok(OutputItems::Rows(limit::apply_with_spec(rows, *spec)))
474 }
475 OutputItems::Groups(groups) => {
476 Ok(OutputItems::Groups(limit::apply_with_spec(groups, *spec)))
477 }
478 },
479 CompiledStage::Sort(plan) => sort::apply_with_plan(items, plan),
480 CompiledStage::Group(spec) => match items {
481 OutputItems::Rows(rows) => Ok(OutputItems::Groups(group::group_rows_with_plan(
482 rows, spec,
483 )?)),
484 OutputItems::Groups(groups) => Ok(OutputItems::Groups(
485 group::regroup_groups_with_plan(groups, spec)?,
486 )),
487 },
488 CompiledStage::Aggregate(plan) => aggregate::apply_with_plan(items, plan),
489 CompiledStage::Collapse => collapse::apply(items),
490 CompiledStage::CountMacro => aggregate::count_macro(items, ""),
491 CompiledStage::Copy => {
492 self.wants_copy = true;
493 Ok(match items {
494 OutputItems::Rows(rows) => OutputItems::Rows(copy::apply(rows)),
495 OutputItems::Groups(groups) => OutputItems::Groups(groups),
496 })
497 }
498 CompiledStage::Clean => Ok(question::clean_items(items)),
499 CompiledStage::Jq(expr) => jq::apply_with_expr(items, expr),
500 }
501 }
502
503 fn materialize_items(&mut self) -> Result<OutputItems> {
504 match std::mem::replace(
505 &mut self.items,
506 PipelineItems::Materialized(OutputItems::Rows(Vec::new())),
507 ) {
508 PipelineItems::RowStream(stream) => {
509 let rows = materialize_row_stream(stream)?;
510 Ok(OutputItems::Rows(rows))
511 }
512 PipelineItems::Materialized(items) => Ok(items),
513 PipelineItems::Semantic(value) => Ok(output_items_from_value(value)),
514 }
515 }
516
517 fn finish_items(&mut self) -> Result<OutputItems> {
518 self.materialize_items()
519 }
520
521 fn into_output_result(mut self) -> Result<OutputResult> {
522 let semantic_value = if let PipelineItems::Semantic(ref v) = self.items {
525 Some(v.clone())
526 } else {
527 None
528 };
529 let items = self.finish_items()?;
530 let meta = self.build_output_meta(&items);
531 let document = match semantic_value {
532 Some(value) => self.document.map(|document| OutputDocument {
533 kind: document.kind,
534 value,
535 }),
536 None => self.document,
537 };
538
539 Ok(OutputResult {
540 items,
541 document,
542 meta,
543 })
544 }
545
546 fn sync_document_to_items(&mut self) {
547 let Some(document) = self.document.as_mut() else {
548 return;
549 };
550 match &self.items {
551 PipelineItems::Materialized(items) => {
552 *document = document.project_over_items(items);
553 }
554 PipelineItems::Semantic(value) => {
555 document.value = value.clone();
556 }
557 PipelineItems::RowStream(_) => {}
558 }
559 }
560
561 fn build_output_meta(&self, items: &OutputItems) -> OutputMeta {
562 let key_index = match items {
563 OutputItems::Rows(rows) => RowContext::from_rows(rows).key_index().to_vec(),
564 OutputItems::Groups(groups) => {
565 let headers = groups.iter().map(merged_group_header).collect::<Vec<_>>();
566 RowContext::from_rows(&headers).key_index().to_vec()
567 }
568 };
569
570 OutputMeta {
571 key_index,
572 column_align: Vec::new(),
573 wants_copy: self.wants_copy,
574 grouped: matches!(items, OutputItems::Groups(_)),
575 render_recommendation: self.render_recommendation,
576 }
577 }
578}
579
580fn materialize_row_stream(stream: RowStream) -> Result<Vec<Row>> {
581 stream.collect()
582}
583
584fn stream_row_fanout<I, F>(stream: RowStream, fanout: F) -> RowStream
585where
586 I: IntoIterator<Item = Row>,
587 F: Fn(Row) -> I + 'static,
588{
589 Box::new(stream.flat_map(move |row| {
590 match row {
591 Ok(row) => fanout(row)
592 .into_iter()
593 .map(Ok)
594 .collect::<Vec<_>>()
595 .into_iter(),
596 Err(err) => vec![Err(err)].into_iter(),
597 }
598 }))
599}
600
601fn stream_row_fanout_result<I, F>(stream: RowStream, fanout: F) -> RowStream
602where
603 I: IntoIterator<Item = Row>,
604 F: Fn(Row) -> Result<I> + 'static,
605{
606 Box::new(stream.flat_map(move |row| match row {
607 Ok(row) => match fanout(row) {
608 Ok(rows) => rows.into_iter().map(Ok).collect::<Vec<_>>().into_iter(),
609 Err(err) => vec![Err(err)].into_iter(),
610 },
611 Err(err) => vec![Err(err)].into_iter(),
612 }))
613}
614
615fn merged_group_header(group: &crate::core::output_model::Group) -> Row {
616 let mut row = group.groups.clone();
617 row.extend(group.aggregates.clone());
618 row
619}
620
// Unit tests for this module are kept in a separate file, `tests/engine.rs`.
#[cfg(test)]
#[path = "tests/engine.rs"]
mod tests;