use crate::core::{
output_model::{
OutputDocument, OutputItems, OutputMeta, OutputResult, RenderRecommendation,
output_items_from_value,
},
row::Row,
};
use anyhow::{Result, anyhow};
use super::value as value_stage;
use crate::dsl::verbs::{
aggregate, collapse, copy, filter, group, jq, limit, project, question, quick, sort, unroll,
values,
};
use crate::dsl::{
compiled::{CompiledPipeline, CompiledStage, SemanticEffect, StageBehavior},
eval::context::RowContext,
parse::pipeline::parse_stage_list,
};
/// Runs `stages` over a plain row set, wrapping the rows in an
/// [`OutputResult`] first so the shared pipeline entry point can be reused.
pub fn apply_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
    let output = OutputResult::from_rows(rows);
    apply_output_pipeline(output, stages)
}
pub fn apply_output_pipeline(output: OutputResult, stages: &[String]) -> Result<OutputResult> {
execute_pipeline_items(
output.items,
output.document,
output.meta.wants_copy,
output.meta.render_recommendation,
stages,
)
}
/// Compatibility wrapper around [`execute_pipeline_streaming`] for callers
/// that already hold a materialized `Vec<Row>`.
pub fn execute_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
    execute_pipeline_streaming(rows.into_iter(), stages)
}
/// Parses and compiles `stages`, then drives them lazily over `rows`.
///
/// The `'static` bound on the iterator is needed because the executor boxes
/// the stream behind a `dyn Iterator` trait object.
pub fn execute_pipeline_streaming<I>(rows: I, stages: &[String]) -> Result<OutputResult>
where
    I: IntoIterator<Item = Row>,
    I::IntoIter: 'static,
{
    let compiled = CompiledPipeline::from_parsed(parse_stage_list(stages)?)?;
    PipelineExecutor::new_stream(rows.into_iter(), false, compiled).run()
}
/// Shared entry point for non-streaming callers: compiles `stages` and
/// executes them over already-materialized items plus carried-over state.
fn execute_pipeline_items(
    items: OutputItems,
    initial_document: Option<OutputDocument>,
    initial_wants_copy: bool,
    initial_render_recommendation: Option<RenderRecommendation>,
    stages: &[String],
) -> Result<OutputResult> {
    let compiled = CompiledPipeline::from_parsed(parse_stage_list(stages)?)?;
    let executor = PipelineExecutor::new(
        items,
        initial_document,
        initial_wants_copy,
        initial_render_recommendation,
        compiled,
    );
    executor.run()
}
/// Lazily evaluated stream of rows; errors travel as items so they surface
/// only when the stream is finally drained.
type RowStream = Box<dyn Iterator<Item = Result<Row>>>;
/// Current representation of the pipeline payload as stages execute.
enum PipelineItems {
    /// Rows flowing lazily; the cheapest representation.
    RowStream(RowStream),
    /// Rows or groups collected in memory (needed by stages that cannot stream).
    Materialized(OutputItems),
    /// A semantic JSON document transformed value-wise.
    Semantic(serde_json::Value),
}
/// How a single stage should be dispatched given the current payload shape.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StageExecutionRoute {
    /// Operate on the semantic JSON value, with the given document effect.
    Semantic(SemanticEffect),
    /// Apply lazily to the row stream.
    Stream,
    /// Materialize first, then apply to flat items.
    Materialized,
}
/// Drives a compiled pipeline over a payload while tracking output state.
struct PipelineExecutor {
    // Payload in its current representation (stream, materialized, semantic).
    items: PipelineItems,
    // Semantic document kept in sync with `items` while stages preserve it.
    document: Option<OutputDocument>,
    // Set when a `copy` stage is seen; surfaced in the output metadata.
    wants_copy: bool,
    // Renderer hint; cleared by stages that do not preserve it.
    render_recommendation: Option<RenderRecommendation>,
    // The compiled stages to execute.
    compiled: CompiledPipeline,
}
impl PipelineExecutor {
fn new(
items: OutputItems,
document: Option<OutputDocument>,
wants_copy: bool,
render_recommendation: Option<RenderRecommendation>,
compiled: CompiledPipeline,
) -> Self {
let items = if let Some(document) = document.as_ref() {
PipelineItems::Semantic(document.value.clone())
} else {
match items {
OutputItems::Rows(rows) => {
PipelineItems::RowStream(Box::new(rows.into_iter().map(Ok)))
}
OutputItems::Groups(groups) => {
PipelineItems::Materialized(OutputItems::Groups(groups))
}
}
};
Self {
items,
document,
wants_copy,
render_recommendation,
compiled,
}
}
fn new_stream<I>(rows: I, wants_copy: bool, compiled: CompiledPipeline) -> Self
where
I: Iterator<Item = Row> + 'static,
{
Self {
items: PipelineItems::RowStream(Box::new(rows.map(Ok))),
document: None,
wants_copy,
render_recommendation: None,
compiled,
}
}
fn run(mut self) -> Result<OutputResult> {
let stages = self.compiled.stages.clone();
for stage in &stages {
self.apply_stage(stage)?;
}
self.into_output_result()
}
    /// Applies one stage, routing it to semantic, streaming, or materialized
    /// execution based on the current payload shape and the stage behavior.
    fn apply_stage(&mut self, stage: &CompiledStage) -> Result<()> {
        let behavior = stage.behavior();
        // Side effects (currently only `copy`) fire regardless of routing.
        self.apply_stage_side_effects(stage);
        // A stage that does not preserve the render recommendation clears it
        // before executing.
        if !behavior.preserves_render_recommendation {
            self.render_recommendation = None;
        }
        match resolve_stage_execution_route(&self.items, behavior) {
            StageExecutionRoute::Semantic(semantic_effect) => {
                self.apply_semantic_stage(stage, semantic_effect)
            }
            StageExecutionRoute::Stream => self.apply_stream_stage(stage),
            StageExecutionRoute::Materialized => {
                // Drain any pending stream into flat items, apply the stage,
                // then keep the semantic document (if any) in sync.
                let items = self.materialize_items()?;
                self.items = PipelineItems::Materialized(self.apply_flat_stage(items, stage)?);
                self.sync_document_to_items();
                Ok(())
            }
        }
    }
fn apply_stage_side_effects(&mut self, stage: &CompiledStage) {
if matches!(stage, CompiledStage::Copy) {
self.wants_copy = true;
}
}
    /// Applies a stage to the semantic JSON payload in place.
    ///
    /// `semantic_effect` decides what happens to the tracked document:
    /// preserve/transform keep it synced to the new value; degrade drops it.
    fn apply_semantic_stage(
        &mut self,
        stage: &CompiledStage,
        semantic_effect: SemanticEffect,
    ) -> Result<()> {
        // Park a Null placeholder so the real value can be moved out of self.
        let items = std::mem::replace(
            &mut self.items,
            PipelineItems::Semantic(serde_json::Value::Null),
        );
        let PipelineItems::Semantic(value) = items else {
            // Restore the payload before failing; routing should make this
            // branch unreachable.
            self.items = items;
            return Err(anyhow!("semantic stage dispatch requires semantic items"));
        };
        let transformed = value_stage::apply_stage(value, stage)?;
        self.items = PipelineItems::Semantic(transformed);
        match semantic_effect {
            SemanticEffect::Preserve | SemanticEffect::Transform => {
                self.sync_document_to_items();
            }
            SemanticEffect::Degrade => {
                // The stage changed the payload in a way the original
                // document no longer describes; drop it.
                self.document = None;
            }
        }
        Ok(())
    }
fn apply_stream_stage(&mut self, stage: &CompiledStage) -> Result<()> {
let stream = match std::mem::replace(
&mut self.items,
PipelineItems::RowStream(Box::new(std::iter::empty())),
) {
PipelineItems::RowStream(stream) => stream,
PipelineItems::Materialized(items) => {
debug_assert!(
false,
"apply_stream_stage called after pipeline had already materialized"
);
self.items = PipelineItems::Materialized(items);
return Ok(());
}
PipelineItems::Semantic(value) => {
debug_assert!(
false,
"apply_stream_stage called for semantic payload execution"
);
self.items = PipelineItems::Semantic(value);
return Ok(());
}
};
if let Some(plan) = stage.quick_plan().cloned() {
self.items =
PipelineItems::RowStream(Box::new(quick::stream_rows_with_plan(stream, plan)));
return Ok(());
}
self.items = PipelineItems::RowStream(match stage {
CompiledStage::Filter(plan) => {
let plan = plan.clone();
Box::new(stream.filter_map(move |row| match row {
Ok(row) if plan.matches(&row) => Some(Ok(row)),
Ok(_) => None,
Err(err) => Some(Err(err)),
}))
}
CompiledStage::Project(plan) => {
let plan = plan.clone();
stream_row_fanout_result(stream, move |row| plan.project_row(&row))
}
CompiledStage::Unroll(plan) => {
let plan = plan.clone();
stream_row_fanout_result(stream, move |row| plan.expand_row(&row))
}
CompiledStage::Values(plan) => {
let plan = plan.clone();
stream_row_fanout(stream, move |row| plan.extract_row(&row))
}
CompiledStage::Limit(spec) => {
debug_assert!(spec.is_head_only());
Box::new(
stream
.skip(spec.offset as usize)
.take(spec.count.max(0) as usize),
)
}
CompiledStage::Copy => stream,
CompiledStage::Clean => Box::new(stream.filter_map(|row| match row {
Ok(row) => question::clean_row(row).map(Ok),
Err(err) => Some(Err(err)),
})),
other => {
return Err(anyhow!(
"stream stage not implemented for compiled stage: {:?}",
other
));
}
});
Ok(())
}
    /// Applies a stage to materialized items, dispatching by stage kind and
    /// by whether the items are flat rows or grouped.
    fn apply_flat_stage(
        &mut self,
        items: OutputItems,
        stage: &CompiledStage,
    ) -> Result<OutputItems> {
        // Quick-family stages share one plan-based implementation.
        if let Some(plan) = stage.quick_plan() {
            return match items {
                OutputItems::Rows(rows) => {
                    quick::apply_with_plan(rows, plan).map(OutputItems::Rows)
                }
                OutputItems::Groups(groups) => {
                    quick::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
                }
            };
        }
        match stage {
            CompiledStage::Filter(plan) => match items {
                OutputItems::Rows(rows) => {
                    filter::apply_with_plan(rows, plan).map(OutputItems::Rows)
                }
                OutputItems::Groups(groups) => {
                    filter::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
                }
            },
            CompiledStage::Project(plan) => match items {
                OutputItems::Rows(rows) => {
                    project::apply_with_plan(rows, plan).map(OutputItems::Rows)
                }
                OutputItems::Groups(groups) => {
                    project::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
                }
            },
            CompiledStage::Unroll(plan) => match items {
                OutputItems::Rows(rows) => {
                    unroll::apply_with_plan(rows, plan).map(OutputItems::Rows)
                }
                OutputItems::Groups(groups) => {
                    unroll::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
                }
            },
            CompiledStage::Values(plan) => match items {
                OutputItems::Rows(rows) => {
                    values::apply_with_plan(rows, plan).map(OutputItems::Rows)
                }
                OutputItems::Groups(groups) => {
                    values::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
                }
            },
            CompiledStage::Limit(spec) => match items {
                OutputItems::Rows(rows) => {
                    Ok(OutputItems::Rows(limit::apply_with_spec(rows, *spec)))
                }
                OutputItems::Groups(groups) => {
                    Ok(OutputItems::Groups(limit::apply_with_spec(groups, *spec)))
                }
            },
            CompiledStage::Sort(plan) => sort::apply_with_plan(items, plan),
            // Grouping rows creates groups; grouping groups regroups them.
            CompiledStage::Group(spec) => match items {
                OutputItems::Rows(rows) => Ok(OutputItems::Groups(group::group_rows_with_plan(
                    rows, spec,
                )?)),
                OutputItems::Groups(groups) => Ok(OutputItems::Groups(
                    group::regroup_groups_with_plan(groups, spec)?,
                )),
            },
            CompiledStage::Aggregate(plan) => aggregate::apply_with_plan(items, plan),
            CompiledStage::Collapse => collapse::apply(items),
            CompiledStage::CountMacro => aggregate::count_macro(items, ""),
            // `copy`'s metadata side effect is handled elsewhere; on groups it
            // is a no-op here.
            CompiledStage::Copy => Ok(match items {
                OutputItems::Rows(rows) => OutputItems::Rows(copy::apply(rows)),
                OutputItems::Groups(groups) => OutputItems::Groups(groups),
            }),
            CompiledStage::Clean => Ok(question::clean_items(items)),
            CompiledStage::Jq(expr) => jq::apply_with_expr(items, expr),
            // These are all intercepted by the quick_plan check above.
            CompiledStage::Quick(_)
            | CompiledStage::Question(_)
            | CompiledStage::ValueQuick(_)
            | CompiledStage::KeyQuick(_) => Err(anyhow!(
                "quick family should have been handled before flat-stage dispatch"
            )),
        }
    }
fn materialize_items(&mut self) -> Result<OutputItems> {
match std::mem::replace(
&mut self.items,
PipelineItems::Materialized(OutputItems::Rows(Vec::new())),
) {
PipelineItems::RowStream(stream) => {
let rows = materialize_row_stream(stream)?;
Ok(OutputItems::Rows(rows))
}
PipelineItems::Materialized(items) => Ok(items),
PipelineItems::Semantic(value) => Ok(output_items_from_value(value)),
}
}
    /// Final materialization hook; currently just drains into flat items.
    fn finish_items(&mut self) -> Result<OutputItems> {
        self.materialize_items()
    }
fn into_output_result(mut self) -> Result<OutputResult> {
let semantic_value = if let PipelineItems::Semantic(ref v) = self.items {
Some(v.clone())
} else {
None
};
let items = self.finish_items()?;
let meta = self.build_output_meta(&items);
let document = match semantic_value {
Some(value) => self.document.map(|document| OutputDocument {
kind: document.kind,
value,
}),
None => self.document,
};
Ok(OutputResult {
items,
document,
meta,
})
}
fn sync_document_to_items(&mut self) {
let Some(document) = self.document.as_mut() else {
return;
};
match &self.items {
PipelineItems::Materialized(items) => {
*document = document.project_over_items(items);
}
PipelineItems::Semantic(value) => {
document.value = value.clone();
}
PipelineItems::RowStream(_) => {}
}
}
    /// Derives the output metadata (key index, grouping flag, carried-over
    /// flags) from the final items.
    fn build_output_meta(&self, items: &OutputItems) -> OutputMeta {
        let key_index = match items {
            OutputItems::Rows(rows) => RowContext::from_rows(rows).key_index().to_vec(),
            OutputItems::Groups(groups) => {
                // Derive the key index from one synthetic header row per
                // group (key columns followed by aggregate columns).
                let headers = groups.iter().map(merged_group_header).collect::<Vec<_>>();
                RowContext::from_rows(&headers).key_index().to_vec()
            }
        };
        OutputMeta {
            key_index,
            column_align: Vec::new(),
            wants_copy: self.wants_copy,
            grouped: matches!(items, OutputItems::Groups(_)),
            render_recommendation: self.render_recommendation,
        }
    }
}
/// Drains a row stream into a `Vec`, failing on the first deferred row error.
fn materialize_row_stream(stream: RowStream) -> Result<Vec<Row>> {
    stream.collect()
}
fn stream_row_fanout<I, F>(stream: RowStream, fanout: F) -> RowStream
where
I: IntoIterator<Item = Row>,
F: Fn(Row) -> I + 'static,
{
Box::new(stream.flat_map(move |row| {
match row {
Ok(row) => fanout(row)
.into_iter()
.map(Ok)
.collect::<Vec<_>>()
.into_iter(),
Err(err) => vec![Err(err)].into_iter(),
}
}))
}
/// Like [`stream_row_fanout`], but the fanout itself may fail; a failure
/// becomes a single error item in the stream.
fn stream_row_fanout_result<I, F>(stream: RowStream, fanout: F) -> RowStream
where
    I: IntoIterator<Item = Row>,
    F: Fn(Row) -> Result<I> + 'static,
{
    Box::new(stream.flat_map(move |row| {
        // `and_then` collapses an upstream error and a fanout error into one
        // path; either becomes a single error item.
        let expanded: Vec<Result<Row>> = match row.and_then(|row| fanout(row)) {
            Ok(rows) => rows.into_iter().map(Ok).collect(),
            Err(err) => vec![Err(err)],
        };
        expanded.into_iter()
    }))
}
/// Builds a synthetic header row for a group: its key columns followed by
/// its aggregate columns.
fn merged_group_header(group: &crate::core::output_model::Group) -> Row {
    let mut header = group.groups.clone();
    header.extend(group.aggregates.clone());
    header
}
/// Picks the execution route for a stage given the current payload shape.
///
/// Semantic payloads always run value-wise; streams stay lazy only when the
/// stage supports streaming; everything else goes through materialization.
fn resolve_stage_execution_route(
    items: &PipelineItems,
    behavior: StageBehavior,
) -> StageExecutionRoute {
    match items {
        PipelineItems::Semantic(_) => StageExecutionRoute::Semantic(behavior.semantic_effect),
        PipelineItems::RowStream(_) => {
            if behavior.can_stream {
                StageExecutionRoute::Stream
            } else {
                StageExecutionRoute::Materialized
            }
        }
        PipelineItems::Materialized(_) => StageExecutionRoute::Materialized,
    }
}
#[cfg(test)]
#[path = "tests/engine.rs"]
mod tests;