Skip to main content

osp_cli/dsl/
engine.rs

1//! Self-contained executor body for the canonical DSL.
2//!
3//! The document-first executor keeps semantic JSON canonical through the
4//! pipeline and lowers to rows/groups only where verb semantics need that
5//! substrate.
6//!
7//! Rule of thumb:
8//! - selector verbs narrow or rewrite addressed structure
9//! - collection verbs operate on row/group collections
10//! - the semantic payload stays canonical JSON until a stage intentionally
11//!   degrades it
12//!
13//! Example:
14//! - `help | P commands[].name` stays on the semantic path and rebuilds
15//!   `{"commands": [{"name": ...}, ...]}`
16//! - `... | VALUE name` then transforms that narrowed structure into value rows
17//! - `... | G value` crosses onto the row/group substrate on purpose
18//!
19//! Keep that boundary explicit. If a selector verb starts looking like a custom
20//! row/group traversal, it usually belongs in `verbs::selector` or `verbs::json`
21//! instead of growing new engine-side special cases.
22//!
23//! Caller rule of thumb:
24//!
25//! - [`apply_pipeline`] is the friendly "I already have rows" entrypoint
26//! - [`apply_output_pipeline`] is the continuation path when output already has
27//!   semantic-document or metadata state attached
28//! - [`execute_pipeline_streaming`] is the iterator-oriented path when callers
29//!   want streamable stages to avoid eager materialization
30
31use crate::core::{
32    output_model::{
33        OutputDocument, OutputItems, OutputMeta, OutputResult, RenderRecommendation,
34        output_items_from_value,
35    },
36    row::Row,
37};
38use anyhow::{Result, anyhow};
39
40use super::value as value_stage;
41use crate::dsl::verbs::{
42    aggregate, collapse, copy, filter, group, jq, limit, project, question, quick, sort, unroll,
43    values,
44};
45use crate::dsl::{
46    compiled::{CompiledPipeline, CompiledStage, SemanticEffect, StageBehavior},
47    eval::context::RowContext,
48    parse::pipeline::parse_stage_list,
49};
50
51/// Apply a pipeline to plain row output.
52///
53/// Use this when a command has already produced `Vec<Row>` and you want the
54/// ordinary `osp` pipeline behavior without thinking about existing output
55/// metadata.
56///
57/// This starts with `wants_copy = false` because there is no prior output meta
58/// to preserve.
59///
60/// # Examples
61///
62/// ```
63/// use osp_cli::dsl::apply_pipeline;
64/// use osp_cli::row;
65///
66/// let output = apply_pipeline(
67///     vec![
68///         row! { "uid" => "alice", "team" => "ops" },
69///         row! { "uid" => "bob", "team" => "infra" },
70///     ],
71///     &["F team=ops".to_string(), "P uid".to_string()],
72/// )?;
73///
74/// let rows = output.as_rows().unwrap();
75/// assert_eq!(rows.len(), 1);
76/// assert_eq!(rows[0]["uid"], "alice");
77/// assert!(!rows[0].contains_key("team"));
78/// # Ok::<(), anyhow::Error>(())
79/// ```
80pub fn apply_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
81    apply_output_pipeline(OutputResult::from_rows(rows), stages)
82}
83
84/// Apply a pipeline to existing output without flattening grouped data first.
85///
86/// Unlike `apply_pipeline`, this preserves the incoming `OutputMeta.wants_copy`
87/// bit when continuing an existing output flow.
88///
89/// Use this when the command already produced an [`OutputResult`] and later
90/// stages should inherit its render/document metadata instead of starting from
91/// scratch.
92///
93/// # Examples
94///
95/// ```
96/// use osp_cli::core::output_model::OutputResult;
97/// use osp_cli::dsl::apply_output_pipeline;
98/// use osp_cli::row;
99///
100/// let mut output = OutputResult::from_rows(vec![
101///     row! { "uid" => "alice" },
102///     row! { "uid" => "bob" },
103/// ]);
104/// output.meta.wants_copy = true;
105///
106/// let limited = apply_output_pipeline(output, &["L 1".to_string()])?;
107///
108/// assert!(limited.meta.wants_copy);
109/// assert_eq!(limited.as_rows().unwrap().len(), 1);
110/// # Ok::<(), anyhow::Error>(())
111/// ```
112pub fn apply_output_pipeline(output: OutputResult, stages: &[String]) -> Result<OutputResult> {
113    execute_pipeline_items(
114        output.items,
115        output.document,
116        output.meta.wants_copy,
117        output.meta.render_recommendation,
118        stages,
119    )
120}
121
122/// Execute a pipeline starting from plain rows.
123///
124/// This is the lower-level row entrypoint used by tests and internal helpers.
125/// Like `apply_pipeline`, it starts with `wants_copy = false`.
126///
127/// Prefer [`apply_pipeline`] for the common "rows in, output out" path. This
128/// entrypoint is more useful when you want the execution wording to match the
129/// streaming variant below.
130///
131/// # Examples
132///
133/// ```
134/// use osp_cli::dsl::execute_pipeline;
135/// use osp_cli::row;
136///
137/// let output = execute_pipeline(
138///     vec![
139///         row! { "uid" => "bob" },
140///         row! { "uid" => "alice" },
141///     ],
142///     &["S uid".to_string()],
143/// )?;
144///
145/// assert_eq!(output.as_rows().unwrap()[0]["uid"], "alice");
146/// # Ok::<(), anyhow::Error>(())
147/// ```
148pub fn execute_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
149    execute_pipeline_streaming(rows, stages)
150}
151
152/// Execute a pipeline from any row iterator.
153///
154/// This keeps flat row stages on an iterator-backed path until a stage
155/// requires full materialization (for example sort/group/aggregate/jq).
156///
157/// Use this when rows come from an iterator and you want streamable stages like
158/// `F`, `P`, `U`, and head-only `L` to stay incremental for as long as
159/// possible.
160///
161/// # Examples
162///
163/// ```
164/// use osp_cli::dsl::execute_pipeline_streaming;
165/// use osp_cli::row;
166///
167/// let output = execute_pipeline_streaming(
168///     vec![
169///         row! { "uid" => "alice" },
170///         row! { "uid" => "bob" },
171///     ],
172///     &["L 1".to_string()],
173/// )?;
174///
175/// assert_eq!(output.as_rows().unwrap()[0]["uid"], "alice");
176/// assert_eq!(output.as_rows().unwrap().len(), 1);
177/// # Ok::<(), anyhow::Error>(())
178/// ```
179pub fn execute_pipeline_streaming<I>(rows: I, stages: &[String]) -> Result<OutputResult>
180where
181    I: IntoIterator<Item = Row>,
182    I::IntoIter: 'static,
183{
184    let parsed = parse_stage_list(stages)?;
185    let compiled = CompiledPipeline::from_parsed(parsed)?;
186    PipelineExecutor::new_stream(rows.into_iter(), false, compiled).run()
187}
188
189fn execute_pipeline_items(
190    items: OutputItems,
191    initial_document: Option<OutputDocument>,
192    initial_wants_copy: bool,
193    initial_render_recommendation: Option<RenderRecommendation>,
194    stages: &[String],
195) -> Result<OutputResult> {
196    let parsed = parse_stage_list(stages)?;
197    let compiled = CompiledPipeline::from_parsed(parsed)?;
198    PipelineExecutor::new(
199        items,
200        initial_document,
201        initial_wants_copy,
202        initial_render_recommendation,
203        compiled,
204    )
205    .run()
206}
207
208/// Small stateful executor for one parsed pipeline.
209///
210/// Keeping execution state on a struct makes it easier to read the pipeline
211/// flow without carrying `items` / `wants_copy` through every helper.
212type RowStream = Box<dyn Iterator<Item = Result<Row>>>;
213
214enum PipelineItems {
215    RowStream(RowStream),
216    Materialized(OutputItems),
217    Semantic(serde_json::Value),
218}
219
220#[derive(Debug, Clone, Copy, PartialEq, Eq)]
221enum StageExecutionRoute {
222    Semantic(SemanticEffect),
223    Stream,
224    Materialized,
225}
226
227struct PipelineExecutor {
228    items: PipelineItems,
229    document: Option<OutputDocument>,
230    wants_copy: bool,
231    render_recommendation: Option<RenderRecommendation>,
232    compiled: CompiledPipeline,
233}
234
235impl PipelineExecutor {
236    fn new(
237        items: OutputItems,
238        document: Option<OutputDocument>,
239        wants_copy: bool,
240        render_recommendation: Option<RenderRecommendation>,
241        compiled: CompiledPipeline,
242    ) -> Self {
243        let items = if let Some(document) = document.as_ref() {
244            // Semantic payloads stay canonical as JSON through the DSL.
245            // Generic rows/groups are derived only when the pipeline needs to
246            // emit the final `OutputResult`.
247            PipelineItems::Semantic(document.value.clone())
248        } else {
249            match items {
250                OutputItems::Rows(rows) => {
251                    PipelineItems::RowStream(Box::new(rows.into_iter().map(Ok)))
252                }
253                OutputItems::Groups(groups) => {
254                    PipelineItems::Materialized(OutputItems::Groups(groups))
255                }
256            }
257        };
258        Self {
259            items,
260            document,
261            wants_copy,
262            render_recommendation,
263            compiled,
264        }
265    }
266
267    fn new_stream<I>(rows: I, wants_copy: bool, compiled: CompiledPipeline) -> Self
268    where
269        I: Iterator<Item = Row> + 'static,
270    {
271        Self {
272            items: PipelineItems::RowStream(Box::new(rows.map(Ok))),
273            document: None,
274            wants_copy,
275            render_recommendation: None,
276            compiled,
277        }
278    }
279
280    fn run(mut self) -> Result<OutputResult> {
281        let stages = self.compiled.stages.clone();
282        for stage in &stages {
283            self.apply_stage(stage)?;
284        }
285        self.into_output_result()
286    }
287
288    fn apply_stage(&mut self, stage: &CompiledStage) -> Result<()> {
289        let behavior = stage.behavior();
290        self.apply_stage_side_effects(stage);
291        if !behavior.preserves_render_recommendation {
292            self.render_recommendation = None;
293        }
294
295        match resolve_stage_execution_route(&self.items, behavior) {
296            StageExecutionRoute::Semantic(semantic_effect) => {
297                self.apply_semantic_stage(stage, semantic_effect)
298            }
299            StageExecutionRoute::Stream => self.apply_stream_stage(stage),
300            StageExecutionRoute::Materialized => {
301                let items = self.materialize_items()?;
302                self.items = PipelineItems::Materialized(self.apply_flat_stage(items, stage)?);
303                self.sync_document_to_items();
304                Ok(())
305            }
306        }
307    }
308
309    fn apply_stage_side_effects(&mut self, stage: &CompiledStage) {
310        if matches!(stage, CompiledStage::Copy) {
311            self.wants_copy = true;
312        }
313    }
314
315    fn apply_semantic_stage(
316        &mut self,
317        stage: &CompiledStage,
318        semantic_effect: SemanticEffect,
319    ) -> Result<()> {
320        let items = std::mem::replace(
321            &mut self.items,
322            PipelineItems::Semantic(serde_json::Value::Null),
323        );
324        let PipelineItems::Semantic(value) = items else {
325            self.items = items;
326            return Err(anyhow!("semantic stage dispatch requires semantic items"));
327        };
328
329        let transformed = value_stage::apply_stage(value, stage)?;
330        self.items = PipelineItems::Semantic(transformed);
331        match semantic_effect {
332            // Preserve/transform both keep the semantic payload attached. The
333            // renderer decides later whether the transformed JSON still
334            // restores as the original semantic kind.
335            SemanticEffect::Preserve | SemanticEffect::Transform => {
336                self.sync_document_to_items();
337            }
338            // Destructive stages like `C`, `Z`, and `JQ` intentionally stop
339            // claiming the result is still guide/help-shaped semantic output.
340            SemanticEffect::Degrade => {
341                self.document = None;
342            }
343        }
344        Ok(())
345    }
346
347    fn apply_stream_stage(&mut self, stage: &CompiledStage) -> Result<()> {
348        let stream = match std::mem::replace(
349            &mut self.items,
350            PipelineItems::RowStream(Box::new(std::iter::empty())),
351        ) {
352            PipelineItems::RowStream(stream) => stream,
353            PipelineItems::Materialized(items) => {
354                debug_assert!(
355                    false,
356                    "apply_stream_stage called after pipeline had already materialized"
357                );
358                self.items = PipelineItems::Materialized(items);
359                return Ok(());
360            }
361            PipelineItems::Semantic(value) => {
362                debug_assert!(
363                    false,
364                    "apply_stream_stage called for semantic payload execution"
365                );
366                self.items = PipelineItems::Semantic(value);
367                return Ok(());
368            }
369        };
370
371        if let Some(plan) = stage.quick_plan().cloned() {
372            self.items =
373                PipelineItems::RowStream(Box::new(quick::stream_rows_with_plan(stream, plan)));
374            return Ok(());
375        }
376
377        self.items = PipelineItems::RowStream(match stage {
378            CompiledStage::Filter(plan) => {
379                let plan = plan.clone();
380                Box::new(stream.filter_map(move |row| match row {
381                    Ok(row) if plan.matches(&row) => Some(Ok(row)),
382                    Ok(_) => None,
383                    Err(err) => Some(Err(err)),
384                }))
385            }
386            CompiledStage::Project(plan) => {
387                let plan = plan.clone();
388                stream_row_fanout_result(stream, move |row| plan.project_row(&row))
389            }
390            CompiledStage::Unroll(plan) => {
391                let plan = plan.clone();
392                stream_row_fanout_result(stream, move |row| plan.expand_row(&row))
393            }
394            CompiledStage::Values(plan) => {
395                let plan = plan.clone();
396                stream_row_fanout(stream, move |row| plan.extract_row(&row))
397            }
398            CompiledStage::Limit(spec) => {
399                debug_assert!(spec.is_head_only());
400                Box::new(
401                    stream
402                        .skip(spec.offset as usize)
403                        .take(spec.count.max(0) as usize),
404                )
405            }
406            CompiledStage::Copy => stream,
407            CompiledStage::Clean => Box::new(stream.filter_map(|row| match row {
408                Ok(row) => question::clean_row(row).map(Ok),
409                Err(err) => Some(Err(err)),
410            })),
411            other => {
412                return Err(anyhow!(
413                    "stream stage not implemented for compiled stage: {:?}",
414                    other
415                ));
416            }
417        });
418        Ok(())
419    }
420
421    fn apply_flat_stage(
422        &mut self,
423        items: OutputItems,
424        stage: &CompiledStage,
425    ) -> Result<OutputItems> {
426        if let Some(plan) = stage.quick_plan() {
427            return match items {
428                OutputItems::Rows(rows) => {
429                    quick::apply_with_plan(rows, plan).map(OutputItems::Rows)
430                }
431                OutputItems::Groups(groups) => {
432                    quick::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
433                }
434            };
435        }
436
437        match stage {
438            CompiledStage::Filter(plan) => match items {
439                OutputItems::Rows(rows) => {
440                    filter::apply_with_plan(rows, plan).map(OutputItems::Rows)
441                }
442                OutputItems::Groups(groups) => {
443                    filter::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
444                }
445            },
446            CompiledStage::Project(plan) => match items {
447                OutputItems::Rows(rows) => {
448                    project::apply_with_plan(rows, plan).map(OutputItems::Rows)
449                }
450                OutputItems::Groups(groups) => {
451                    project::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
452                }
453            },
454            CompiledStage::Unroll(plan) => match items {
455                OutputItems::Rows(rows) => {
456                    unroll::apply_with_plan(rows, plan).map(OutputItems::Rows)
457                }
458                OutputItems::Groups(groups) => {
459                    unroll::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
460                }
461            },
462            CompiledStage::Values(plan) => match items {
463                OutputItems::Rows(rows) => {
464                    values::apply_with_plan(rows, plan).map(OutputItems::Rows)
465                }
466                OutputItems::Groups(groups) => {
467                    values::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
468                }
469            },
470            CompiledStage::Limit(spec) => match items {
471                OutputItems::Rows(rows) => {
472                    Ok(OutputItems::Rows(limit::apply_with_spec(rows, *spec)))
473                }
474                OutputItems::Groups(groups) => {
475                    Ok(OutputItems::Groups(limit::apply_with_spec(groups, *spec)))
476                }
477            },
478            CompiledStage::Sort(plan) => sort::apply_with_plan(items, plan),
479            CompiledStage::Group(spec) => match items {
480                OutputItems::Rows(rows) => Ok(OutputItems::Groups(group::group_rows_with_plan(
481                    rows, spec,
482                )?)),
483                OutputItems::Groups(groups) => Ok(OutputItems::Groups(
484                    group::regroup_groups_with_plan(groups, spec)?,
485                )),
486            },
487            CompiledStage::Aggregate(plan) => aggregate::apply_with_plan(items, plan),
488            CompiledStage::Collapse => collapse::apply(items),
489            CompiledStage::CountMacro => aggregate::count_macro(items, ""),
490            CompiledStage::Copy => Ok(match items {
491                OutputItems::Rows(rows) => OutputItems::Rows(copy::apply(rows)),
492                OutputItems::Groups(groups) => OutputItems::Groups(groups),
493            }),
494            CompiledStage::Clean => Ok(question::clean_items(items)),
495            CompiledStage::Jq(expr) => jq::apply_with_expr(items, expr),
496            CompiledStage::Quick(_)
497            | CompiledStage::Question(_)
498            | CompiledStage::ValueQuick(_)
499            | CompiledStage::KeyQuick(_) => Err(anyhow!(
500                "quick family should have been handled before flat-stage dispatch"
501            )),
502        }
503    }
504
505    fn materialize_items(&mut self) -> Result<OutputItems> {
506        match std::mem::replace(
507            &mut self.items,
508            PipelineItems::Materialized(OutputItems::Rows(Vec::new())),
509        ) {
510            PipelineItems::RowStream(stream) => {
511                let rows = materialize_row_stream(stream)?;
512                Ok(OutputItems::Rows(rows))
513            }
514            PipelineItems::Materialized(items) => Ok(items),
515            PipelineItems::Semantic(value) => Ok(output_items_from_value(value)),
516        }
517    }
518
519    fn finish_items(&mut self) -> Result<OutputItems> {
520        self.materialize_items()
521    }
522
523    fn into_output_result(mut self) -> Result<OutputResult> {
524        // Capture the semantic value before finish_items replaces self.items via
525        // mem::replace — after that call self.items is always Materialized.
526        let semantic_value = if let PipelineItems::Semantic(ref v) = self.items {
527            Some(v.clone())
528        } else {
529            None
530        };
531        let items = self.finish_items()?;
532        let meta = self.build_output_meta(&items);
533        let document = match semantic_value {
534            Some(value) => self.document.map(|document| OutputDocument {
535                kind: document.kind,
536                value,
537            }),
538            None => self.document,
539        };
540
541        Ok(OutputResult {
542            items,
543            document,
544            meta,
545        })
546    }
547
548    fn sync_document_to_items(&mut self) {
549        let Some(document) = self.document.as_mut() else {
550            return;
551        };
552        match &self.items {
553            PipelineItems::Materialized(items) => {
554                *document = document.project_over_items(items);
555            }
556            PipelineItems::Semantic(value) => {
557                document.value = value.clone();
558            }
559            PipelineItems::RowStream(_) => {}
560        }
561    }
562
563    fn build_output_meta(&self, items: &OutputItems) -> OutputMeta {
564        let key_index = match items {
565            OutputItems::Rows(rows) => RowContext::from_rows(rows).key_index().to_vec(),
566            OutputItems::Groups(groups) => {
567                let headers = groups.iter().map(merged_group_header).collect::<Vec<_>>();
568                RowContext::from_rows(&headers).key_index().to_vec()
569            }
570        };
571
572        OutputMeta {
573            key_index,
574            column_align: Vec::new(),
575            wants_copy: self.wants_copy,
576            grouped: matches!(items, OutputItems::Groups(_)),
577            render_recommendation: self.render_recommendation,
578        }
579    }
580}
581
582fn materialize_row_stream(stream: RowStream) -> Result<Vec<Row>> {
583    stream.collect()
584}
585
586fn stream_row_fanout<I, F>(stream: RowStream, fanout: F) -> RowStream
587where
588    I: IntoIterator<Item = Row>,
589    F: Fn(Row) -> I + 'static,
590{
591    Box::new(stream.flat_map(move |row| {
592        match row {
593            Ok(row) => fanout(row)
594                .into_iter()
595                .map(Ok)
596                .collect::<Vec<_>>()
597                .into_iter(),
598            Err(err) => vec![Err(err)].into_iter(),
599        }
600    }))
601}
602
603fn stream_row_fanout_result<I, F>(stream: RowStream, fanout: F) -> RowStream
604where
605    I: IntoIterator<Item = Row>,
606    F: Fn(Row) -> Result<I> + 'static,
607{
608    Box::new(stream.flat_map(move |row| match row {
609        Ok(row) => match fanout(row) {
610            Ok(rows) => rows.into_iter().map(Ok).collect::<Vec<_>>().into_iter(),
611            Err(err) => vec![Err(err)].into_iter(),
612        },
613        Err(err) => vec![Err(err)].into_iter(),
614    }))
615}
616
617fn merged_group_header(group: &crate::core::output_model::Group) -> Row {
618    let mut row = group.groups.clone();
619    row.extend(group.aggregates.clone());
620    row
621}
622
623fn resolve_stage_execution_route(
624    items: &PipelineItems,
625    behavior: StageBehavior,
626) -> StageExecutionRoute {
627    match items {
628        PipelineItems::Semantic(_) => StageExecutionRoute::Semantic(behavior.semantic_effect),
629        PipelineItems::RowStream(_) if behavior.can_stream => StageExecutionRoute::Stream,
630        PipelineItems::RowStream(_) | PipelineItems::Materialized(_) => {
631            StageExecutionRoute::Materialized
632        }
633    }
634}
635
636#[cfg(test)]
637#[path = "tests/engine.rs"]
638mod tests;