Skip to main content

osp_cli/dsl/
engine.rs

1//! Self-contained executor body for the canonical DSL.
2//!
3//! The document-first executor keeps semantic JSON canonical through the
4//! pipeline and lowers to rows/groups only where verb semantics need that
5//! substrate.
6//!
7//! Rule of thumb:
8//! - selector verbs narrow or rewrite addressed structure
9//! - collection verbs operate on row/group collections
10//! - the semantic payload stays canonical JSON until a stage intentionally
11//!   degrades it
12//!
13//! Example:
14//! - `help | P commands[].name` stays on the semantic path and rebuilds
15//!   `{"commands": [{"name": ...}, ...]}`
16//! - `... | VALUE name` then transforms that narrowed structure into value rows
17//! - `... | G value` crosses onto the row/group substrate on purpose
18//!
19//! Keep that boundary explicit. If a selector verb starts looking like a custom
20//! row/group traversal, it usually belongs in `verbs::selector` or `verbs::json`
21//! instead of growing new engine-side special cases.
22
23use crate::core::{
24    output_model::{
25        OutputDocument, OutputItems, OutputMeta, OutputResult, RenderRecommendation,
26        output_items_from_value,
27    },
28    row::Row,
29};
30use anyhow::{Result, anyhow};
31
32use super::value as value_stage;
33use crate::dsl::verbs::{
34    aggregate, collapse, copy, filter, group, jq, limit, project, question, quick, sort, unroll,
35    values,
36};
37use crate::dsl::{
38    compiled::{CompiledPipeline, CompiledStage, SemanticEffect},
39    eval::context::RowContext,
40    parse::pipeline::parse_stage_list,
41};
42
43/// Apply a pipeline to plain row output.
44///
45/// This starts with `wants_copy = false` because there is no prior output meta
46/// to preserve.
47pub fn apply_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
48    apply_output_pipeline(OutputResult::from_rows(rows), stages)
49}
50
51/// Apply a pipeline to existing output without flattening grouped data first.
52///
53/// Unlike `apply_pipeline`, this preserves the incoming `OutputMeta.wants_copy`
54/// bit when continuing an existing output flow.
55pub fn apply_output_pipeline(output: OutputResult, stages: &[String]) -> Result<OutputResult> {
56    execute_pipeline_items(
57        output.items,
58        output.document,
59        output.meta.wants_copy,
60        output.meta.render_recommendation,
61        stages,
62    )
63}
64
65/// Execute a pipeline starting from plain rows.
66///
67/// This is the lower-level row entrypoint used by tests and internal helpers.
68/// Like `apply_pipeline`, it starts with `wants_copy = false`.
69pub fn execute_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
70    execute_pipeline_streaming(rows, stages)
71}
72
73/// Execute a pipeline from any row iterator.
74///
75/// This keeps flat row stages on an iterator-backed path until a stage
76/// requires full materialization (for example sort/group/aggregate/jq).
77pub fn execute_pipeline_streaming<I>(rows: I, stages: &[String]) -> Result<OutputResult>
78where
79    I: IntoIterator<Item = Row>,
80    I::IntoIter: 'static,
81{
82    let parsed = parse_stage_list(stages)?;
83    let compiled = CompiledPipeline::from_parsed(parsed)?;
84    PipelineExecutor::new_stream(rows.into_iter(), false, compiled).run()
85}
86
87fn execute_pipeline_items(
88    items: OutputItems,
89    initial_document: Option<OutputDocument>,
90    initial_wants_copy: bool,
91    initial_render_recommendation: Option<RenderRecommendation>,
92    stages: &[String],
93) -> Result<OutputResult> {
94    let parsed = parse_stage_list(stages)?;
95    let compiled = CompiledPipeline::from_parsed(parsed)?;
96    PipelineExecutor::new(
97        items,
98        initial_document,
99        initial_wants_copy,
100        initial_render_recommendation,
101        compiled,
102    )
103    .run()
104}
105
106/// Small stateful executor for one parsed pipeline.
107///
108/// Keeping execution state on a struct makes it easier to read the pipeline
109/// flow without carrying `items` / `wants_copy` through every helper.
110type RowStream = Box<dyn Iterator<Item = Result<Row>>>;
111
112enum PipelineItems {
113    RowStream(RowStream),
114    Materialized(OutputItems),
115    Semantic(serde_json::Value),
116}
117
118struct PipelineExecutor {
119    items: PipelineItems,
120    document: Option<OutputDocument>,
121    wants_copy: bool,
122    render_recommendation: Option<RenderRecommendation>,
123    compiled: CompiledPipeline,
124}
125
126impl PipelineExecutor {
127    fn new(
128        items: OutputItems,
129        document: Option<OutputDocument>,
130        wants_copy: bool,
131        render_recommendation: Option<RenderRecommendation>,
132        compiled: CompiledPipeline,
133    ) -> Self {
134        let items = if let Some(document) = document.as_ref() {
135            // Semantic payloads stay canonical as JSON through the DSL.
136            // Generic rows/groups are derived only when the pipeline needs to
137            // emit the final `OutputResult`.
138            PipelineItems::Semantic(document.value.clone())
139        } else {
140            match items {
141                OutputItems::Rows(rows) => {
142                    PipelineItems::RowStream(Box::new(rows.into_iter().map(Ok)))
143                }
144                OutputItems::Groups(groups) => {
145                    PipelineItems::Materialized(OutputItems::Groups(groups))
146                }
147            }
148        };
149        Self {
150            items,
151            document,
152            wants_copy,
153            render_recommendation,
154            compiled,
155        }
156    }
157
158    fn new_stream<I>(rows: I, wants_copy: bool, compiled: CompiledPipeline) -> Self
159    where
160        I: Iterator<Item = Row> + 'static,
161    {
162        Self {
163            items: PipelineItems::RowStream(Box::new(rows.map(Ok))),
164            document: None,
165            wants_copy,
166            render_recommendation: None,
167            compiled,
168        }
169    }
170
171    fn run(mut self) -> Result<OutputResult> {
172        let stages = self.compiled.stages.clone();
173        for stage in &stages {
174            self.apply_stage(stage)?;
175        }
176        self.into_output_result()
177    }
178
179    fn apply_stage(&mut self, stage: &CompiledStage) -> Result<()> {
180        if !stage.preserves_render_recommendation() {
181            self.render_recommendation = None;
182        }
183
184        if matches!(self.items, PipelineItems::Semantic(_)) {
185            self.apply_semantic_stage(stage)?;
186            return Ok(());
187        }
188
189        if stage.can_stream()
190            && let PipelineItems::RowStream(_) = self.items
191        {
192            self.apply_stream_stage(stage)?;
193            return Ok(());
194        }
195
196        let items = self.materialize_items()?;
197        self.items = PipelineItems::Materialized(self.apply_flat_stage(items, stage)?);
198        self.sync_document_to_items();
199        Ok(())
200    }
201
202    fn apply_semantic_stage(&mut self, stage: &CompiledStage) -> Result<()> {
203        let PipelineItems::Semantic(value) = std::mem::replace(
204            &mut self.items,
205            PipelineItems::Semantic(serde_json::Value::Null),
206        ) else {
207            unreachable!("semantic stage dispatch requires semantic items");
208        };
209
210        if matches!(stage, CompiledStage::Copy) {
211            self.wants_copy = true;
212        }
213
214        let transformed = value_stage::apply_stage(value, stage)?;
215        self.items = PipelineItems::Semantic(transformed);
216        match stage.semantic_effect() {
217            // Preserve/transform both keep the semantic payload attached. The
218            // renderer decides later whether the transformed JSON still
219            // restores as the original semantic kind.
220            SemanticEffect::Preserve | SemanticEffect::Transform => {
221                self.sync_document_to_items();
222            }
223            // Destructive stages like `C`, `Z`, and `JQ` intentionally stop
224            // claiming the result is still guide/help-shaped semantic output.
225            SemanticEffect::Degrade => {
226                self.document = None;
227            }
228        }
229        Ok(())
230    }
231
232    fn apply_stream_stage(&mut self, stage: &CompiledStage) -> Result<()> {
233        let stream = match std::mem::replace(
234            &mut self.items,
235            PipelineItems::RowStream(Box::new(std::iter::empty())),
236        ) {
237            PipelineItems::RowStream(stream) => stream,
238            PipelineItems::Materialized(items) => {
239                debug_assert!(
240                    false,
241                    "apply_stream_stage called after pipeline had already materialized"
242                );
243                self.items = PipelineItems::Materialized(items);
244                return Ok(());
245            }
246            PipelineItems::Semantic(value) => {
247                debug_assert!(
248                    false,
249                    "apply_stream_stage called for semantic payload execution"
250                );
251                self.items = PipelineItems::Semantic(value);
252                return Ok(());
253            }
254        };
255
256        self.items = PipelineItems::RowStream(match stage {
257            CompiledStage::Quick(plan) => {
258                Box::new(quick::stream_rows_with_plan(stream, plan.clone()))
259            }
260            CompiledStage::Filter(plan) => {
261                let plan = plan.clone();
262                Box::new(stream.filter_map(move |row| match row {
263                    Ok(row) if plan.matches(&row) => Some(Ok(row)),
264                    Ok(_) => None,
265                    Err(err) => Some(Err(err)),
266                }))
267            }
268            CompiledStage::Project(plan) => {
269                let plan = plan.clone();
270                stream_row_fanout_result(stream, move |row| plan.project_row(&row))
271            }
272            CompiledStage::Unroll(plan) => {
273                let plan = plan.clone();
274                stream_row_fanout_result(stream, move |row| plan.expand_row(&row))
275            }
276            CompiledStage::Values(plan) => {
277                let plan = plan.clone();
278                stream_row_fanout(stream, move |row| plan.extract_row(&row))
279            }
280            CompiledStage::Limit(spec) => {
281                debug_assert!(spec.is_head_only());
282                Box::new(
283                    stream
284                        .skip(spec.offset as usize)
285                        .take(spec.count.max(0) as usize),
286                )
287            }
288            CompiledStage::Copy => {
289                self.wants_copy = true;
290                stream
291            }
292            CompiledStage::ValueQuick(plan)
293            | CompiledStage::KeyQuick(plan)
294            | CompiledStage::Question(plan) => {
295                Box::new(quick::stream_rows_with_plan(stream, plan.clone()))
296            }
297            CompiledStage::Clean => Box::new(stream.filter_map(|row| match row {
298                Ok(row) => question::clean_row(row).map(Ok),
299                Err(err) => Some(Err(err)),
300            })),
301            other => {
302                return Err(anyhow!(
303                    "stream stage not implemented for compiled stage: {:?}",
304                    other
305                ));
306            }
307        });
308        Ok(())
309    }
310
311    fn apply_flat_stage(
312        &mut self,
313        items: OutputItems,
314        stage: &CompiledStage,
315    ) -> Result<OutputItems> {
316        match stage {
317            CompiledStage::Quick(plan) => match items {
318                OutputItems::Rows(rows) => {
319                    quick::apply_with_plan(rows, plan).map(OutputItems::Rows)
320                }
321                OutputItems::Groups(groups) => {
322                    quick::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
323                }
324            },
325            CompiledStage::Filter(plan) => match items {
326                OutputItems::Rows(rows) => {
327                    filter::apply_with_plan(rows, plan).map(OutputItems::Rows)
328                }
329                OutputItems::Groups(groups) => {
330                    filter::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
331                }
332            },
333            CompiledStage::Project(plan) => match items {
334                OutputItems::Rows(rows) => {
335                    project::apply_with_plan(rows, plan).map(OutputItems::Rows)
336                }
337                OutputItems::Groups(groups) => {
338                    project::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
339                }
340            },
341            CompiledStage::Unroll(plan) => match items {
342                OutputItems::Rows(rows) => {
343                    unroll::apply_with_plan(rows, plan).map(OutputItems::Rows)
344                }
345                OutputItems::Groups(groups) => {
346                    unroll::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
347                }
348            },
349            CompiledStage::Values(plan) => match items {
350                OutputItems::Rows(rows) => {
351                    values::apply_with_plan(rows, plan).map(OutputItems::Rows)
352                }
353                OutputItems::Groups(groups) => {
354                    values::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
355                }
356            },
357            CompiledStage::ValueQuick(plan)
358            | CompiledStage::KeyQuick(plan)
359            | CompiledStage::Question(plan) => match items {
360                OutputItems::Rows(rows) => {
361                    quick::apply_with_plan(rows, plan).map(OutputItems::Rows)
362                }
363                OutputItems::Groups(groups) => {
364                    quick::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
365                }
366            },
367            CompiledStage::Limit(spec) => match items {
368                OutputItems::Rows(rows) => {
369                    Ok(OutputItems::Rows(limit::apply_with_spec(rows, *spec)))
370                }
371                OutputItems::Groups(groups) => {
372                    Ok(OutputItems::Groups(limit::apply_with_spec(groups, *spec)))
373                }
374            },
375            CompiledStage::Sort(plan) => sort::apply_with_plan(items, plan),
376            CompiledStage::Group(spec) => match items {
377                OutputItems::Rows(rows) => Ok(OutputItems::Groups(group::group_rows_with_plan(
378                    rows, spec,
379                )?)),
380                OutputItems::Groups(groups) => Ok(OutputItems::Groups(
381                    group::regroup_groups_with_plan(groups, spec)?,
382                )),
383            },
384            CompiledStage::Aggregate(plan) => aggregate::apply_with_plan(items, plan),
385            CompiledStage::Collapse => collapse::apply(items),
386            CompiledStage::CountMacro => aggregate::count_macro(items, ""),
387            CompiledStage::Copy => {
388                self.wants_copy = true;
389                Ok(match items {
390                    OutputItems::Rows(rows) => OutputItems::Rows(copy::apply(rows)),
391                    OutputItems::Groups(groups) => OutputItems::Groups(groups),
392                })
393            }
394            CompiledStage::Clean => Ok(question::clean_items(items)),
395            CompiledStage::Jq(expr) => jq::apply_with_expr(items, expr),
396        }
397    }
398
399    fn materialize_items(&mut self) -> Result<OutputItems> {
400        match std::mem::replace(
401            &mut self.items,
402            PipelineItems::Materialized(OutputItems::Rows(Vec::new())),
403        ) {
404            PipelineItems::RowStream(stream) => {
405                let rows = materialize_row_stream(stream)?;
406                Ok(OutputItems::Rows(rows))
407            }
408            PipelineItems::Materialized(items) => Ok(items),
409            PipelineItems::Semantic(value) => Ok(output_items_from_value(value)),
410        }
411    }
412
413    fn finish_items(&mut self) -> Result<OutputItems> {
414        self.materialize_items()
415    }
416
417    fn into_output_result(mut self) -> Result<OutputResult> {
418        // Capture the semantic value before finish_items replaces self.items via
419        // mem::replace — after that call self.items is always Materialized.
420        let semantic_value = if let PipelineItems::Semantic(ref v) = self.items {
421            Some(v.clone())
422        } else {
423            None
424        };
425        let items = self.finish_items()?;
426        let meta = self.build_output_meta(&items);
427        let document = match semantic_value {
428            Some(value) => self.document.map(|document| OutputDocument {
429                kind: document.kind,
430                value,
431            }),
432            None => self.document,
433        };
434
435        Ok(OutputResult {
436            items,
437            document,
438            meta,
439        })
440    }
441
442    fn sync_document_to_items(&mut self) {
443        let Some(document) = self.document.as_mut() else {
444            return;
445        };
446        match &self.items {
447            PipelineItems::Materialized(items) => {
448                *document = document.project_over_items(items);
449            }
450            PipelineItems::Semantic(value) => {
451                document.value = value.clone();
452            }
453            PipelineItems::RowStream(_) => {}
454        }
455    }
456
457    fn build_output_meta(&self, items: &OutputItems) -> OutputMeta {
458        let key_index = match items {
459            OutputItems::Rows(rows) => RowContext::from_rows(rows).key_index().to_vec(),
460            OutputItems::Groups(groups) => {
461                let headers = groups.iter().map(merged_group_header).collect::<Vec<_>>();
462                RowContext::from_rows(&headers).key_index().to_vec()
463            }
464        };
465
466        OutputMeta {
467            key_index,
468            column_align: Vec::new(),
469            wants_copy: self.wants_copy,
470            grouped: matches!(items, OutputItems::Groups(_)),
471            render_recommendation: self.render_recommendation,
472        }
473    }
474}
475
476fn materialize_row_stream(stream: RowStream) -> Result<Vec<Row>> {
477    stream.collect()
478}
479
480fn stream_row_fanout<I, F>(stream: RowStream, fanout: F) -> RowStream
481where
482    I: IntoIterator<Item = Row>,
483    F: Fn(Row) -> I + 'static,
484{
485    Box::new(stream.flat_map(move |row| {
486        match row {
487            Ok(row) => fanout(row)
488                .into_iter()
489                .map(Ok)
490                .collect::<Vec<_>>()
491                .into_iter(),
492            Err(err) => vec![Err(err)].into_iter(),
493        }
494    }))
495}
496
497fn stream_row_fanout_result<I, F>(stream: RowStream, fanout: F) -> RowStream
498where
499    I: IntoIterator<Item = Row>,
500    F: Fn(Row) -> Result<I> + 'static,
501{
502    Box::new(stream.flat_map(move |row| match row {
503        Ok(row) => match fanout(row) {
504            Ok(rows) => rows.into_iter().map(Ok).collect::<Vec<_>>().into_iter(),
505            Err(err) => vec![Err(err)].into_iter(),
506        },
507        Err(err) => vec![Err(err)].into_iter(),
508    }))
509}
510
511fn merged_group_header(group: &crate::core::output_model::Group) -> Row {
512    let mut row = group.groups.clone();
513    row.extend(group.aggregates.clone());
514    row
515}
516
517#[cfg(test)]
518#[path = "tests/engine.rs"]
519mod tests;