Skip to main content

osp_cli/dsl/
engine.rs

1//! Self-contained executor body for the canonical DSL.
2//!
3//! The document-first executor keeps semantic JSON canonical through the
4//! pipeline and lowers to rows/groups only where verb semantics need that
5//! substrate.
6//!
7//! Rule of thumb:
8//! - selector verbs narrow or rewrite addressed structure
9//! - collection verbs operate on row/group collections
10//! - the semantic payload stays canonical JSON until a stage intentionally
11//!   degrades it
12//!
13//! Example:
14//! - `help | P commands[].name` stays on the semantic path and rebuilds
15//!   `{"commands": [{"name": ...}, ...]}`
16//! - `... | VALUE name` then transforms that narrowed structure into value rows
17//! - `... | G value` crosses onto the row/group substrate on purpose
18//!
19//! Keep that boundary explicit. If a selector verb starts looking like a custom
20//! row/group traversal, it usually belongs in `verbs::selector` or `verbs::json`
21//! instead of growing new engine-side special cases.
22//!
23//! Caller rule of thumb:
24//!
25//! - [`apply_pipeline`] is the friendly "I already have rows" entrypoint
26//! - [`apply_output_pipeline`] is the continuation path when output already has
27//!   semantic-document or metadata state attached
28//! - [`execute_pipeline_streaming`] is the iterator-oriented path when callers
29//!   want streamable stages to avoid eager materialization
30
31use crate::core::{
32    output_model::{
33        OutputDocument, OutputItems, OutputMeta, OutputResult, RenderRecommendation,
34        output_items_from_value,
35    },
36    row::Row,
37};
38use anyhow::{Result, anyhow};
39
40use super::value as value_stage;
41use crate::dsl::verbs::{
42    aggregate, collapse, copy, filter, group, jq, limit, project, question, quick, sort, unroll,
43    values,
44};
45use crate::dsl::{
46    compiled::{CompiledPipeline, CompiledStage, SemanticEffect},
47    eval::context::RowContext,
48    parse::pipeline::parse_stage_list,
49};
50
51/// Apply a pipeline to plain row output.
52///
53/// Use this when a command has already produced `Vec<Row>` and you want the
54/// ordinary `osp` pipeline behavior without thinking about existing output
55/// metadata.
56///
57/// This starts with `wants_copy = false` because there is no prior output meta
58/// to preserve.
59///
60/// # Examples
61///
62/// ```
63/// use osp_cli::dsl::apply_pipeline;
64/// use osp_cli::row;
65///
66/// let output = apply_pipeline(
67///     vec![
68///         row! { "uid" => "alice", "team" => "ops" },
69///         row! { "uid" => "bob", "team" => "infra" },
70///     ],
71///     &["F team=ops".to_string(), "P uid".to_string()],
72/// )?;
73///
74/// let rows = output.as_rows().unwrap();
75/// assert_eq!(rows.len(), 1);
76/// assert_eq!(rows[0]["uid"], "alice");
77/// assert!(!rows[0].contains_key("team"));
78/// # Ok::<(), anyhow::Error>(())
79/// ```
80pub fn apply_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
81    apply_output_pipeline(OutputResult::from_rows(rows), stages)
82}
83
84/// Apply a pipeline to existing output without flattening grouped data first.
85///
86/// Unlike `apply_pipeline`, this preserves the incoming `OutputMeta.wants_copy`
87/// bit when continuing an existing output flow.
88///
89/// Use this when the command already produced an [`OutputResult`] and later
90/// stages should inherit its render/document metadata instead of starting from
91/// scratch.
92///
93/// # Examples
94///
95/// ```
96/// use osp_cli::core::output_model::OutputResult;
97/// use osp_cli::dsl::apply_output_pipeline;
98/// use osp_cli::row;
99///
100/// let mut output = OutputResult::from_rows(vec![
101///     row! { "uid" => "alice" },
102///     row! { "uid" => "bob" },
103/// ]);
104/// output.meta.wants_copy = true;
105///
106/// let limited = apply_output_pipeline(output, &["L 1".to_string()])?;
107///
108/// assert!(limited.meta.wants_copy);
109/// assert_eq!(limited.as_rows().unwrap().len(), 1);
110/// # Ok::<(), anyhow::Error>(())
111/// ```
112pub fn apply_output_pipeline(output: OutputResult, stages: &[String]) -> Result<OutputResult> {
113    execute_pipeline_items(
114        output.items,
115        output.document,
116        output.meta.wants_copy,
117        output.meta.render_recommendation,
118        stages,
119    )
120}
121
122/// Execute a pipeline starting from plain rows.
123///
124/// This is the lower-level row entrypoint used by tests and internal helpers.
125/// Like `apply_pipeline`, it starts with `wants_copy = false`.
126///
127/// Prefer [`apply_pipeline`] for the common "rows in, output out" path. This
128/// entrypoint is more useful when you want the execution wording to match the
129/// streaming variant below.
130///
131/// # Examples
132///
133/// ```
134/// use osp_cli::dsl::execute_pipeline;
135/// use osp_cli::row;
136///
137/// let output = execute_pipeline(
138///     vec![
139///         row! { "uid" => "bob" },
140///         row! { "uid" => "alice" },
141///     ],
142///     &["S uid".to_string()],
143/// )?;
144///
145/// assert_eq!(output.as_rows().unwrap()[0]["uid"], "alice");
146/// # Ok::<(), anyhow::Error>(())
147/// ```
148pub fn execute_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
149    execute_pipeline_streaming(rows, stages)
150}
151
152/// Execute a pipeline from any row iterator.
153///
154/// This keeps flat row stages on an iterator-backed path until a stage
155/// requires full materialization (for example sort/group/aggregate/jq).
156///
157/// Use this when rows come from an iterator and you want streamable stages like
158/// `F`, `P`, `U`, and head-only `L` to stay incremental for as long as
159/// possible.
160///
161/// # Examples
162///
163/// ```
164/// use osp_cli::dsl::execute_pipeline_streaming;
165/// use osp_cli::row;
166///
167/// let output = execute_pipeline_streaming(
168///     vec![
169///         row! { "uid" => "alice" },
170///         row! { "uid" => "bob" },
171///     ],
172///     &["L 1".to_string()],
173/// )?;
174///
175/// assert_eq!(output.as_rows().unwrap()[0]["uid"], "alice");
176/// assert_eq!(output.as_rows().unwrap().len(), 1);
177/// # Ok::<(), anyhow::Error>(())
178/// ```
179pub fn execute_pipeline_streaming<I>(rows: I, stages: &[String]) -> Result<OutputResult>
180where
181    I: IntoIterator<Item = Row>,
182    I::IntoIter: 'static,
183{
184    let parsed = parse_stage_list(stages)?;
185    let compiled = CompiledPipeline::from_parsed(parsed)?;
186    PipelineExecutor::new_stream(rows.into_iter(), false, compiled).run()
187}
188
189fn execute_pipeline_items(
190    items: OutputItems,
191    initial_document: Option<OutputDocument>,
192    initial_wants_copy: bool,
193    initial_render_recommendation: Option<RenderRecommendation>,
194    stages: &[String],
195) -> Result<OutputResult> {
196    let parsed = parse_stage_list(stages)?;
197    let compiled = CompiledPipeline::from_parsed(parsed)?;
198    PipelineExecutor::new(
199        items,
200        initial_document,
201        initial_wants_copy,
202        initial_render_recommendation,
203        compiled,
204    )
205    .run()
206}
207
/// Boxed fallible-row iterator used by the streaming execution path.
type RowStream = Box<dyn Iterator<Item = Result<Row>>>;

/// The substrate the pipeline is currently operating on.
enum PipelineItems {
    // Lazy iterator of rows; stays incremental until a stage forces
    // materialization.
    RowStream(RowStream),
    // Fully materialized rows or groups.
    Materialized(OutputItems),
    // Canonical semantic JSON payload (document-first path).
    Semantic(serde_json::Value),
}

/// Small stateful executor for one parsed pipeline.
///
/// Keeping execution state on a struct makes it easier to read the pipeline
/// flow without carrying `items` / `wants_copy` through every helper.
struct PipelineExecutor {
    // Current data substrate (stream, materialized, or semantic JSON).
    items: PipelineItems,
    // Semantic document attached to the output, if any; kept in sync with
    // `items` as stages run.
    document: Option<OutputDocument>,
    // Whether a `C` (copy) stage has been seen; surfaced in the final meta.
    wants_copy: bool,
    // Render hint carried forward only through stages that preserve it.
    render_recommendation: Option<RenderRecommendation>,
    // The compiled stages to execute, in order.
    compiled: CompiledPipeline,
}
227
impl PipelineExecutor {
    /// Build an executor from already-produced output items.
    ///
    /// If a semantic document is attached, execution starts on the
    /// canonical-JSON path; otherwise plain rows start on the streaming path
    /// and groups start fully materialized.
    fn new(
        items: OutputItems,
        document: Option<OutputDocument>,
        wants_copy: bool,
        render_recommendation: Option<RenderRecommendation>,
        compiled: CompiledPipeline,
    ) -> Self {
        let items = if let Some(document) = document.as_ref() {
            // Semantic payloads stay canonical as JSON through the DSL.
            // Generic rows/groups are derived only when the pipeline needs to
            // emit the final `OutputResult`.
            PipelineItems::Semantic(document.value.clone())
        } else {
            match items {
                OutputItems::Rows(rows) => {
                    PipelineItems::RowStream(Box::new(rows.into_iter().map(Ok)))
                }
                OutputItems::Groups(groups) => {
                    PipelineItems::Materialized(OutputItems::Groups(groups))
                }
            }
        };
        Self {
            items,
            document,
            wants_copy,
            render_recommendation,
            compiled,
        }
    }

    /// Build an executor directly from a row iterator, with no prior document
    /// or render metadata attached.
    fn new_stream<I>(rows: I, wants_copy: bool, compiled: CompiledPipeline) -> Self
    where
        I: Iterator<Item = Row> + 'static,
    {
        Self {
            items: PipelineItems::RowStream(Box::new(rows.map(Ok))),
            document: None,
            wants_copy,
            render_recommendation: None,
            compiled,
        }
    }

    /// Run every compiled stage in order, then assemble the final output.
    fn run(mut self) -> Result<OutputResult> {
        // Clone the stage list so `apply_stage` can borrow `self` mutably
        // while we iterate.
        let stages = self.compiled.stages.clone();
        for stage in &stages {
            self.apply_stage(stage)?;
        }
        self.into_output_result()
    }

    /// Dispatch a single stage to the semantic, streaming, or materialized
    /// execution path, in that order of preference.
    fn apply_stage(&mut self, stage: &CompiledStage) -> Result<()> {
        // Most stages invalidate any carried render hint.
        if !stage.preserves_render_recommendation() {
            self.render_recommendation = None;
        }

        if matches!(self.items, PipelineItems::Semantic(_)) {
            self.apply_semantic_stage(stage)?;
            return Ok(());
        }

        if stage.can_stream()
            && let PipelineItems::RowStream(_) = self.items
        {
            self.apply_stream_stage(stage)?;
            return Ok(());
        }

        // Fallback: materialize whatever we have and run the flat
        // (non-streaming) implementation of the stage.
        let items = self.materialize_items()?;
        self.items = PipelineItems::Materialized(self.apply_flat_stage(items, stage)?);
        self.sync_document_to_items();
        Ok(())
    }

    /// Apply one stage on the canonical semantic-JSON path.
    ///
    /// Temporarily swaps `self.items` for a null placeholder so the JSON value
    /// can be moved into the stage transform without cloning.
    fn apply_semantic_stage(&mut self, stage: &CompiledStage) -> Result<()> {
        let items = std::mem::replace(
            &mut self.items,
            PipelineItems::Semantic(serde_json::Value::Null),
        );
        let PipelineItems::Semantic(value) = items else {
            // Defensive: callers only dispatch here when items are semantic.
            // Restore whatever we took out before erroring.
            self.items = items;
            return Err(anyhow!("semantic stage dispatch requires semantic items"));
        };

        if matches!(stage, CompiledStage::Copy) {
            self.wants_copy = true;
        }

        let transformed = value_stage::apply_stage(value, stage)?;
        self.items = PipelineItems::Semantic(transformed);
        match stage.semantic_effect() {
            // Preserve/transform both keep the semantic payload attached. The
            // renderer decides later whether the transformed JSON still
            // restores as the original semantic kind.
            SemanticEffect::Preserve | SemanticEffect::Transform => {
                self.sync_document_to_items();
            }
            // Destructive stages like `C`, `Z`, and `JQ` intentionally stop
            // claiming the result is still guide/help-shaped semantic output.
            SemanticEffect::Degrade => {
                self.document = None;
            }
        }
        Ok(())
    }

    /// Apply one streamable stage by wrapping the current row stream in a new
    /// lazy adaptor; nothing is materialized here.
    fn apply_stream_stage(&mut self, stage: &CompiledStage) -> Result<()> {
        // Take ownership of the current stream, leaving an empty placeholder.
        let stream = match std::mem::replace(
            &mut self.items,
            PipelineItems::RowStream(Box::new(std::iter::empty())),
        ) {
            PipelineItems::RowStream(stream) => stream,
            PipelineItems::Materialized(items) => {
                // Dispatch bug if reached: `apply_stage` only routes stream
                // items here. Restore state and continue in release builds.
                debug_assert!(
                    false,
                    "apply_stream_stage called after pipeline had already materialized"
                );
                self.items = PipelineItems::Materialized(items);
                return Ok(());
            }
            PipelineItems::Semantic(value) => {
                debug_assert!(
                    false,
                    "apply_stream_stage called for semantic payload execution"
                );
                self.items = PipelineItems::Semantic(value);
                return Ok(());
            }
        };

        self.items = PipelineItems::RowStream(match stage {
            CompiledStage::Quick(plan) => {
                Box::new(quick::stream_rows_with_plan(stream, plan.clone()))
            }
            CompiledStage::Filter(plan) => {
                let plan = plan.clone();
                // Keep matching rows; pass errors through unchanged.
                Box::new(stream.filter_map(move |row| match row {
                    Ok(row) if plan.matches(&row) => Some(Ok(row)),
                    Ok(_) => None,
                    Err(err) => Some(Err(err)),
                }))
            }
            CompiledStage::Project(plan) => {
                let plan = plan.clone();
                stream_row_fanout_result(stream, move |row| plan.project_row(&row))
            }
            CompiledStage::Unroll(plan) => {
                let plan = plan.clone();
                stream_row_fanout_result(stream, move |row| plan.expand_row(&row))
            }
            CompiledStage::Values(plan) => {
                let plan = plan.clone();
                stream_row_fanout(stream, move |row| plan.extract_row(&row))
            }
            CompiledStage::Limit(spec) => {
                // Only head-only limits are streamable (tail limits need the
                // full row count).
                debug_assert!(spec.is_head_only());
                Box::new(
                    stream
                        .skip(spec.offset as usize)
                        .take(spec.count.max(0) as usize),
                )
            }
            CompiledStage::Copy => {
                // Copy is a metadata-only stage on the stream path.
                self.wants_copy = true;
                stream
            }
            CompiledStage::ValueQuick(plan)
            | CompiledStage::KeyQuick(plan)
            | CompiledStage::Question(plan) => {
                Box::new(quick::stream_rows_with_plan(stream, plan.clone()))
            }
            CompiledStage::Clean => Box::new(stream.filter_map(|row| match row {
                Ok(row) => question::clean_row(row).map(Ok),
                Err(err) => Some(Err(err)),
            })),
            other => {
                // `can_stream()` and this match must agree; reaching here
                // means a streamable stage has no streaming implementation.
                return Err(anyhow!(
                    "stream stage not implemented for compiled stage: {:?}",
                    other
                ));
            }
        });
        Ok(())
    }

    /// Apply one stage against fully materialized rows or groups, delegating
    /// to the per-verb implementations.
    fn apply_flat_stage(
        &mut self,
        items: OutputItems,
        stage: &CompiledStage,
    ) -> Result<OutputItems> {
        match stage {
            CompiledStage::Quick(plan) => match items {
                OutputItems::Rows(rows) => {
                    quick::apply_with_plan(rows, plan).map(OutputItems::Rows)
                }
                OutputItems::Groups(groups) => {
                    quick::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
                }
            },
            CompiledStage::Filter(plan) => match items {
                OutputItems::Rows(rows) => {
                    filter::apply_with_plan(rows, plan).map(OutputItems::Rows)
                }
                OutputItems::Groups(groups) => {
                    filter::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
                }
            },
            CompiledStage::Project(plan) => match items {
                OutputItems::Rows(rows) => {
                    project::apply_with_plan(rows, plan).map(OutputItems::Rows)
                }
                OutputItems::Groups(groups) => {
                    project::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
                }
            },
            CompiledStage::Unroll(plan) => match items {
                OutputItems::Rows(rows) => {
                    unroll::apply_with_plan(rows, plan).map(OutputItems::Rows)
                }
                OutputItems::Groups(groups) => {
                    unroll::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
                }
            },
            CompiledStage::Values(plan) => match items {
                OutputItems::Rows(rows) => {
                    values::apply_with_plan(rows, plan).map(OutputItems::Rows)
                }
                OutputItems::Groups(groups) => {
                    values::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
                }
            },
            CompiledStage::ValueQuick(plan)
            | CompiledStage::KeyQuick(plan)
            | CompiledStage::Question(plan) => match items {
                OutputItems::Rows(rows) => {
                    quick::apply_with_plan(rows, plan).map(OutputItems::Rows)
                }
                OutputItems::Groups(groups) => {
                    quick::apply_groups_with_plan(groups, plan).map(OutputItems::Groups)
                }
            },
            CompiledStage::Limit(spec) => match items {
                OutputItems::Rows(rows) => {
                    Ok(OutputItems::Rows(limit::apply_with_spec(rows, *spec)))
                }
                OutputItems::Groups(groups) => {
                    Ok(OutputItems::Groups(limit::apply_with_spec(groups, *spec)))
                }
            },
            CompiledStage::Sort(plan) => sort::apply_with_plan(items, plan),
            CompiledStage::Group(spec) => match items {
                // Grouping rows produces groups; grouping groups regroups.
                OutputItems::Rows(rows) => Ok(OutputItems::Groups(group::group_rows_with_plan(
                    rows, spec,
                )?)),
                OutputItems::Groups(groups) => Ok(OutputItems::Groups(
                    group::regroup_groups_with_plan(groups, spec)?,
                )),
            },
            CompiledStage::Aggregate(plan) => aggregate::apply_with_plan(items, plan),
            CompiledStage::Collapse => collapse::apply(items),
            CompiledStage::CountMacro => aggregate::count_macro(items, ""),
            CompiledStage::Copy => {
                self.wants_copy = true;
                Ok(match items {
                    OutputItems::Rows(rows) => OutputItems::Rows(copy::apply(rows)),
                    OutputItems::Groups(groups) => OutputItems::Groups(groups),
                })
            }
            CompiledStage::Clean => Ok(question::clean_items(items)),
            CompiledStage::Jq(expr) => jq::apply_with_expr(items, expr),
        }
    }

    /// Collapse whatever substrate is current into concrete `OutputItems`,
    /// leaving `self.items` as an empty materialized placeholder.
    fn materialize_items(&mut self) -> Result<OutputItems> {
        match std::mem::replace(
            &mut self.items,
            PipelineItems::Materialized(OutputItems::Rows(Vec::new())),
        ) {
            PipelineItems::RowStream(stream) => {
                let rows = materialize_row_stream(stream)?;
                Ok(OutputItems::Rows(rows))
            }
            PipelineItems::Materialized(items) => Ok(items),
            // Semantic JSON is lowered to rows/groups only here, at the edge.
            PipelineItems::Semantic(value) => Ok(output_items_from_value(value)),
        }
    }

    /// Final materialization step when the pipeline is done.
    fn finish_items(&mut self) -> Result<OutputItems> {
        self.materialize_items()
    }

    /// Assemble the final `OutputResult` from the executor's state.
    fn into_output_result(mut self) -> Result<OutputResult> {
        // Capture the semantic value before finish_items replaces self.items via
        // mem::replace — after that call self.items is always Materialized.
        let semantic_value = if let PipelineItems::Semantic(ref v) = self.items {
            Some(v.clone())
        } else {
            None
        };
        let items = self.finish_items()?;
        let meta = self.build_output_meta(&items);
        let document = match semantic_value {
            // Keep the document's kind but attach the final semantic value.
            Some(value) => self.document.map(|document| OutputDocument {
                kind: document.kind,
                value,
            }),
            None => self.document,
        };

        Ok(OutputResult {
            items,
            document,
            meta,
        })
    }

    /// Keep the attached semantic document consistent with the current items;
    /// a no-op when no document is attached or items are still streaming.
    fn sync_document_to_items(&mut self) {
        let Some(document) = self.document.as_mut() else {
            return;
        };
        match &self.items {
            PipelineItems::Materialized(items) => {
                *document = document.project_over_items(items);
            }
            PipelineItems::Semantic(value) => {
                document.value = value.clone();
            }
            // Streams are untouched; the document syncs on materialization.
            PipelineItems::RowStream(_) => {}
        }
    }

    /// Derive output metadata (key index, grouped flag, copy/render bits)
    /// from the final items.
    fn build_output_meta(&self, items: &OutputItems) -> OutputMeta {
        let key_index = match items {
            OutputItems::Rows(rows) => RowContext::from_rows(rows).key_index().to_vec(),
            OutputItems::Groups(groups) => {
                // Index groups by their merged key + aggregate header rows.
                let headers = groups.iter().map(merged_group_header).collect::<Vec<_>>();
                RowContext::from_rows(&headers).key_index().to_vec()
            }
        };

        OutputMeta {
            key_index,
            column_align: Vec::new(),
            wants_copy: self.wants_copy,
            grouped: matches!(items, OutputItems::Groups(_)),
            render_recommendation: self.render_recommendation,
        }
    }
}
579
580fn materialize_row_stream(stream: RowStream) -> Result<Vec<Row>> {
581    stream.collect()
582}
583
584fn stream_row_fanout<I, F>(stream: RowStream, fanout: F) -> RowStream
585where
586    I: IntoIterator<Item = Row>,
587    F: Fn(Row) -> I + 'static,
588{
589    Box::new(stream.flat_map(move |row| {
590        match row {
591            Ok(row) => fanout(row)
592                .into_iter()
593                .map(Ok)
594                .collect::<Vec<_>>()
595                .into_iter(),
596            Err(err) => vec![Err(err)].into_iter(),
597        }
598    }))
599}
600
601fn stream_row_fanout_result<I, F>(stream: RowStream, fanout: F) -> RowStream
602where
603    I: IntoIterator<Item = Row>,
604    F: Fn(Row) -> Result<I> + 'static,
605{
606    Box::new(stream.flat_map(move |row| match row {
607        Ok(row) => match fanout(row) {
608            Ok(rows) => rows.into_iter().map(Ok).collect::<Vec<_>>().into_iter(),
609            Err(err) => vec![Err(err)].into_iter(),
610        },
611        Err(err) => vec![Err(err)].into_iter(),
612    }))
613}
614
615fn merged_group_header(group: &crate::core::output_model::Group) -> Row {
616    let mut row = group.groups.clone();
617    row.extend(group.aggregates.clone());
618    row
619}
620
621#[cfg(test)]
622#[path = "tests/engine.rs"]
623mod tests;