// osp_cli/dsl/eval/engine.rs

use crate::core::{
    output_model::{OutputItems, OutputMeta, OutputResult},
    row::Row,
};
use anyhow::{Result, anyhow};

use crate::dsl::{
    eval::context::RowContext,
    model::{ParsedPipeline, ParsedStage, ParsedStageKind},
    parse::pipeline::parse_stage_list,
    stages::{
        aggregate, collapse, copy, filter, group, jq, limit, project, question, quick, sort, values,
    },
    verbs::stage_can_stream_rows,
};
16
17/// Apply a pipeline to plain row output.
18///
19/// This starts with `wants_copy = false` because there is no prior output meta
20/// to preserve.
21pub fn apply_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
22    apply_output_pipeline(OutputResult::from_rows(rows), stages)
23}
24
25/// Apply a pipeline to existing output without flattening grouped data first.
26///
27/// Unlike `apply_pipeline`, this preserves the incoming `OutputMeta.wants_copy`
28/// bit when continuing an existing output flow.
29pub fn apply_output_pipeline(output: OutputResult, stages: &[String]) -> Result<OutputResult> {
30    execute_pipeline_items(output.items, output.meta.wants_copy, stages)
31}
32
33/// Execute a pipeline starting from plain rows.
34///
35/// This is the lower-level row entrypoint used by tests and internal helpers.
36/// Like `apply_pipeline`, it starts with `wants_copy = false`.
37pub fn execute_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
38    execute_pipeline_streaming(rows, stages)
39}
40
41/// Execute a pipeline from any row iterator.
42///
43/// This keeps flat row stages on an iterator-backed path until a stage
44/// requires full materialization (for example sort/group/aggregate/jq).
45pub fn execute_pipeline_streaming<I>(rows: I, stages: &[String]) -> Result<OutputResult>
46where
47    I: IntoIterator<Item = Row>,
48    I::IntoIter: 'static,
49{
50    let parsed = parse_stage_list(stages)?;
51    PipelineExecutor::new_stream(rows.into_iter(), false, parsed).run()
52}
53
54fn execute_pipeline_items(
55    items: OutputItems,
56    initial_wants_copy: bool,
57    stages: &[String],
58) -> Result<OutputResult> {
59    let parsed = parse_stage_list(stages)?;
60    PipelineExecutor::new(items, initial_wants_copy, parsed).run()
61}
62
63/// Small stateful executor for one parsed pipeline.
64///
65/// Keeping execution state on a struct makes it easier to read the pipeline
66/// flow without carrying `items` / `wants_copy` through every helper.
67type RowStream = Box<dyn Iterator<Item = Result<Row>>>;
68
69enum PipelineItems {
70    RowStream(RowStream),
71    Materialized(OutputItems),
72}
73
74struct PipelineExecutor {
75    items: PipelineItems,
76    wants_copy: bool,
77    parsed: ParsedPipeline,
78}
79
impl PipelineExecutor {
    /// Build an executor from materialized output items.
    ///
    /// Plain rows are re-wrapped as a stream so streamable stages can still
    /// take the lazy path; grouped items stay materialized from the start.
    fn new(items: OutputItems, wants_copy: bool, parsed: ParsedPipeline) -> Self {
        Self {
            items: match items {
                OutputItems::Rows(rows) => {
                    PipelineItems::RowStream(Box::new(rows.into_iter().map(Ok)))
                }
                OutputItems::Groups(groups) => {
                    PipelineItems::Materialized(OutputItems::Groups(groups))
                }
            },
            wants_copy,
            parsed,
        }
    }

    /// Build an executor directly from a row iterator (streaming entrypoint).
    fn new_stream<I>(rows: I, wants_copy: bool, parsed: ParsedPipeline) -> Self
    where
        I: Iterator<Item = Row> + 'static,
    {
        Self {
            items: PipelineItems::RowStream(Box::new(rows.map(Ok))),
            wants_copy,
            parsed,
        }
    }

    /// Run every stage in order, then materialize the result and compute the
    /// output meta.
    fn run(mut self) -> Result<OutputResult> {
        // Clone the stage list so iterating it does not hold a borrow of
        // `self` while `apply_stage` needs `&mut self`.
        let stages = self.parsed.stages.clone();
        for stage in &stages {
            // Empty verbs come from blank stage entries; skip them.
            if stage.verb.is_empty() {
                continue;
            }
            self.apply_stage(stage)?;
        }

        let items = self.finish_items()?;
        let meta = self.build_output_meta(&items);

        Ok(OutputResult { meta, items })
    }

    /// Dispatch one stage: stream it when both the stage and the current
    /// items allow it, otherwise materialize and apply the eager variant.
    fn apply_stage(&mut self, stage: &ParsedStage) -> Result<()> {
        if stage_can_stream_rows(stage)
            && let PipelineItems::RowStream(_) = self.items
        {
            self.apply_stream_stage(stage)?;
            return Ok(());
        }

        // Streaming is no longer possible (or not allowed for this verb):
        // collect all rows, run the eager stage, and stay materialized.
        let items = self.materialize_items()?;
        self.items = PipelineItems::Materialized(match stage.kind {
            ParsedStageKind::Quick => self.apply_quick_stage(items, stage)?,
            ParsedStageKind::UnknownExplicit => {
                return Err(anyhow!("unknown DSL verb: {}", stage.verb));
            }
            ParsedStageKind::Explicit => self.apply_explicit_stage(items, stage)?,
        });
        Ok(())
    }

    /// Apply a stage on the lazy row-stream path.
    ///
    /// Each arm wraps the current stream in another lazy adapter; nothing is
    /// evaluated until the pipeline finally materializes.
    fn apply_stream_stage(&mut self, stage: &ParsedStage) -> Result<()> {
        // Temporarily swap an empty stream into `self.items` so we can take
        // ownership of the real stream out of the enum.
        let stream = match std::mem::replace(
            &mut self.items,
            PipelineItems::RowStream(Box::new(std::iter::empty())),
        ) {
            PipelineItems::RowStream(stream) => stream,
            PipelineItems::Materialized(items) => {
                // `apply_stage` only routes here while still streaming, so
                // this branch indicates a caller bug; restore state and bail.
                debug_assert!(
                    false,
                    "apply_stream_stage called after pipeline had already materialized"
                );
                self.items = PipelineItems::Materialized(items);
                return Ok(());
            }
        };

        self.items = PipelineItems::RowStream(match stage.verb.as_str() {
            // Quick stages match on `kind`, not on the verb string.
            _ if matches!(stage.kind, ParsedStageKind::Quick) => {
                let plan = quick::compile(&stage.raw)?;
                Box::new(quick::stream_rows_with_plan(stream, plan))
            }
            // F: keep only rows matched by the compiled filter plan;
            // upstream errors pass through untouched.
            "F" => {
                let plan = filter::compile(&stage.spec)?;
                Box::new(stream.filter_map(move |row| match row {
                    Ok(row) if plan.matches(&row) => Some(Ok(row)),
                    Ok(_) => None,
                    Err(err) => Some(Err(err)),
                }))
            }
            // P: projection can fan one row out into several (e.g. `field[]`).
            "P" => {
                let plan = project::compile(&stage.spec)?;
                stream_row_fanout(stream, move |row| plan.project_row(&row))
            }
            // VAL/VALUE: extract values; also a per-row fanout.
            "VAL" | "VALUE" => {
                let plan = values::compile(&stage.spec);
                stream_row_fanout(stream, move |row| plan.extract_row(&row))
            }
            // L: streamed limit only supports the head-only form
            // (offset + count from the front), hence the debug_assert.
            "L" => {
                let spec = limit::parse_limit_spec(&stage.spec)?;
                debug_assert!(spec.is_head_only());
                Box::new(
                    stream
                        .skip(spec.offset as usize)
                        .take(spec.count.max(0) as usize),
                )
            }
            // Y: only flips the copy flag; rows flow through unchanged.
            "Y" => {
                self.wants_copy = true;
                stream
            }
            // V / K: scope aliases re-expressed as a quick-search spec.
            "V" | "K" => {
                let plan = quick::compile(&format!(
                    "{}{}{}",
                    stage.verb,
                    if stage.spec.is_empty() { "" } else { " " },
                    stage.spec
                ))?;
                Box::new(quick::stream_rows_with_plan(stream, plan))
            }
            // U: unroll a list field — implemented as a `field[]` projection.
            "U" => {
                let field = stage.spec.trim();
                if field.is_empty() {
                    return Err(anyhow!("U: missing field name to unroll"));
                }
                let plan = project::compile(&format!("{field}[]"))?;
                stream_row_fanout(stream, move |row| plan.project_row(&row))
            }
            // ?: bare form cleans rows (dropping rows cleaned to nothing);
            // with a spec it becomes a quick existence filter.
            "?" => {
                if stage.spec.trim().is_empty() {
                    Box::new(stream.filter_map(|row| match row {
                        Ok(row) => question::clean_row(row).map(Ok),
                        Err(err) => Some(Err(err)),
                    }))
                } else {
                    let plan = quick::compile(&format!("? {}", stage.spec))?;
                    Box::new(quick::stream_rows_with_plan(stream, plan))
                }
            }
            // Anything else should have been routed to the eager path by
            // `stage_can_stream_rows`; reaching here is an internal error.
            other => return Err(anyhow!("stream stage not implemented for verb: {other}")),
        });
        Ok(())
    }

    /// Eager quick-search stage; grouped items pass through `map_rows`
    /// unchanged.
    fn apply_quick_stage(&self, items: OutputItems, stage: &ParsedStage) -> Result<OutputItems> {
        map_rows(items, |rows| quick::apply(rows, &stage.raw))
    }

    /// Eager dispatch table for explicit verbs.
    fn apply_explicit_stage(
        &mut self,
        items: OutputItems,
        stage: &ParsedStage,
    ) -> Result<OutputItems> {
        match stage.verb.as_str() {
            "P" => self.project(items, stage),
            // `V` is a quick-search scope alias ("value-only"), not a values stage.
            "V" => self.apply_quick_alias(items, stage, "V"),
            "K" => self.apply_quick_alias(items, stage, "K"),
            // `VAL` / `VALUE` produce explicit `{"value": ...}` rows.
            "VAL" | "VALUE" => map_rows(items, |rows| values::apply(rows, &stage.spec)),
            "F" => self.filter(items, stage),
            "G" => self.group(items, stage),
            "A" => aggregate::apply(items, &stage.spec),
            "S" => sort::apply(items, &stage.spec),
            "L" => self.limit(items, stage),
            "Z" => Ok(collapse::apply(items)),
            "C" => aggregate::count_macro(items, &stage.spec),
            "Y" => self.copy(items, stage),
            "U" => self.unroll(items, stage),
            "?" => question::apply(items, &stage.spec),
            "JQ" => jq::apply(items, &stage.spec),
            _ => Err(anyhow!("unknown DSL verb: {}", stage.verb)),
        }
    }

    /// Eager projection; works on flat rows and inside groups.
    fn project(&self, items: OutputItems, stage: &ParsedStage) -> Result<OutputItems> {
        match items {
            OutputItems::Rows(rows) => Ok(OutputItems::Rows(project::apply(rows, &stage.spec)?)),
            OutputItems::Groups(groups) => Ok(OutputItems::Groups(project::apply_groups(
                groups,
                &stage.spec,
            )?)),
        }
    }

    /// Eager filter; works on flat rows and inside groups.
    fn filter(&self, items: OutputItems, stage: &ParsedStage) -> Result<OutputItems> {
        match items {
            OutputItems::Rows(rows) => Ok(OutputItems::Rows(filter::apply(rows, &stage.spec)?)),
            OutputItems::Groups(groups) => Ok(OutputItems::Groups(filter::apply_groups(
                groups,
                &stage.spec,
            )?)),
        }
    }

    /// Group flat rows, or regroup already-grouped output.
    fn group(&self, items: OutputItems, stage: &ParsedStage) -> Result<OutputItems> {
        match items {
            OutputItems::Rows(rows) => {
                Ok(OutputItems::Groups(group::group_rows(rows, &stage.spec)?))
            }
            OutputItems::Groups(groups) => Ok(OutputItems::Groups(group::regroup_groups(
                groups,
                &stage.spec,
            )?)),
        }
    }

    /// Eager limit; on grouped output it limits the groups themselves.
    fn limit(&self, items: OutputItems, stage: &ParsedStage) -> Result<OutputItems> {
        match items {
            OutputItems::Rows(rows) => Ok(OutputItems::Rows(limit::apply(rows, &stage.spec)?)),
            OutputItems::Groups(groups) => {
                Ok(OutputItems::Groups(limit::apply(groups, &stage.spec)?))
            }
        }
    }

    /// Y: set the copy flag; `copy::apply` only runs on flat rows.
    fn copy(&mut self, items: OutputItems, _stage: &ParsedStage) -> Result<OutputItems> {
        self.wants_copy = true;
        map_rows(items, |rows| Ok(copy::apply(rows)))
    }

    /// U: unroll a list field eagerly via a `field[]` projection.
    fn unroll(&self, items: OutputItems, stage: &ParsedStage) -> Result<OutputItems> {
        let field = stage.spec.trim();
        if field.is_empty() {
            return Err(anyhow!("U: missing field name to unroll"));
        }

        let selector = format!("{field}[]");
        match items {
            OutputItems::Rows(rows) => Ok(OutputItems::Rows(project::apply(rows, &selector)?)),
            OutputItems::Groups(groups) => Ok(OutputItems::Groups(project::apply_groups(
                groups, &selector,
            )?)),
        }
    }

    /// Rewrite a `V` / `K` alias stage as an equivalent quick-search spec and
    /// apply it to flat rows.
    fn apply_quick_alias(
        &self,
        items: OutputItems,
        stage: &ParsedStage,
        alias: &str,
    ) -> Result<OutputItems> {
        let quick_spec = if stage.spec.is_empty() {
            alias.to_string()
        } else {
            format!("{alias} {}", stage.spec)
        };
        map_rows(items, |rows| quick::apply(rows, &quick_spec))
    }

    /// Take the current items out of `self`, draining the stream into a
    /// `Vec<Row>` if still on the lazy path. Leaves an empty materialized
    /// placeholder behind.
    fn materialize_items(&mut self) -> Result<OutputItems> {
        match std::mem::replace(
            &mut self.items,
            PipelineItems::Materialized(OutputItems::Rows(Vec::new())),
        ) {
            PipelineItems::RowStream(stream) => {
                let rows = materialize_row_stream(stream)?;
                Ok(OutputItems::Rows(rows))
            }
            PipelineItems::Materialized(items) => Ok(items),
        }
    }

    /// Final materialization after all stages have run.
    fn finish_items(&mut self) -> Result<OutputItems> {
        self.materialize_items()
    }

    /// Derive the output meta from the final items: key order comes from the
    /// rows themselves (or from merged group headers for grouped output).
    fn build_output_meta(&self, items: &OutputItems) -> OutputMeta {
        let key_index = match items {
            OutputItems::Rows(rows) => RowContext::from_rows(rows).key_index().to_vec(),
            OutputItems::Groups(groups) => {
                // Group key columns followed by aggregate columns stand in
                // for "rows" when computing the key index.
                let headers = groups.iter().map(merged_group_header).collect::<Vec<_>>();
                RowContext::from_rows(&headers).key_index().to_vec()
            }
        };

        OutputMeta {
            key_index,
            column_align: Vec::new(),
            wants_copy: self.wants_copy,
            grouped: matches!(items, OutputItems::Groups(_)),
        }
    }
}
364
365fn materialize_row_stream(stream: RowStream) -> Result<Vec<Row>> {
366    stream.collect()
367}
368
369fn stream_row_fanout<I, F>(stream: RowStream, fanout: F) -> RowStream
370where
371    I: IntoIterator<Item = Row>,
372    F: Fn(Row) -> I + 'static,
373{
374    Box::new(stream.flat_map(move |row| {
375        match row {
376            Ok(row) => fanout(row)
377                .into_iter()
378                .map(Ok)
379                .collect::<Vec<_>>()
380                .into_iter(),
381            Err(err) => vec![Err(err)].into_iter(),
382        }
383    }))
384}
385
386fn merged_group_header(group: &crate::core::output_model::Group) -> Row {
387    let mut row = group.groups.clone();
388    row.extend(group.aggregates.clone());
389    row
390}
391
392fn map_rows(
393    items: OutputItems,
394    map_fn: impl FnOnce(Vec<Row>) -> Result<Vec<Row>>,
395) -> Result<OutputItems> {
396    match items {
397        OutputItems::Rows(rows) => map_fn(rows).map(OutputItems::Rows),
398        // These stages only make sense on flat rows. When the pipeline is
399        // already grouped, leave groups unchanged instead of flattening them
400        // implicitly behind the caller's back.
401        OutputItems::Groups(groups) => Ok(OutputItems::Groups(groups)),
402    }
403}
404
#[cfg(test)]
mod tests {
    use crate::core::output_model::{OutputItems, OutputResult};
    use serde_json::json;

    use super::{
        apply_output_pipeline, apply_pipeline, execute_pipeline, execute_pipeline_streaming,
    };

    // Helper: assert the output is flat rows and borrow them.
    fn output_rows(output: &OutputResult) -> &[crate::core::row::Row] {
        output.as_rows().expect("expected row output")
    }

    // P then F compose: projection keeps both keys, filter keeps one row.
    #[test]
    fn project_then_filter_pipeline_works() {
        let rows = vec![
            json!({"uid": "oistes", "cn": "Oistein"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "andreasd", "cn": "Andreas"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let stages = vec!["P uid,cn".to_string(), "F uid=oistes".to_string()];
        let output = apply_pipeline(rows, &stages).expect("pipeline should pass");

        assert_eq!(output_rows(&output).len(), 1);
        assert_eq!(
            output_rows(&output)[0]
                .get("uid")
                .and_then(|value| value.as_str()),
            Some("oistes")
        );
    }

    // A stage with no verb is treated as a quick search term.
    #[test]
    fn bare_quick_stage_without_verb_still_works() {
        let rows = vec![
            json!({"uid": "oistes"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "andreasd"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let stages = vec!["oist".to_string()];
        let output = apply_pipeline(rows, &stages).expect("pipeline should pass");
        assert_eq!(output_rows(&output).len(), 1);
    }

    // Unrecognized single-letter verbs must error, not fall back to quick.
    #[test]
    fn unknown_single_letter_verb_errors() {
        let rows = vec![
            json!({"uid": "oistes"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let err =
            apply_pipeline(rows, &["R oist".to_string()]).expect_err("unknown verb should fail");
        assert!(err.to_string().contains("unknown DSL verb"));
    }

    // Y sets `wants_copy` on the output meta.
    #[test]
    fn copy_stage_sets_meta_flag() {
        let rows = vec![
            json!({"uid": "oistes"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let stages = vec!["Y".to_string()];
        let output = execute_pipeline(rows, &stages).expect("pipeline should pass");

        assert!(output.meta.wants_copy);
    }

    // `V` narrows quick search to values only.
    #[test]
    fn value_scope_alias_filters_by_value() {
        let rows = vec![
            json!({"uid": "oistes"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "andreasd"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let stages = vec!["V oist".to_string()];
        let output = apply_pipeline(rows, &stages).expect("pipeline should pass");
        assert_eq!(output_rows(&output).len(), 1);
        assert_eq!(
            output_rows(&output)[0]
                .get("uid")
                .and_then(|value| value.as_str()),
            Some("oistes")
        );
    }

    // Bare `?` strips empty-string, empty-list, and null fields per row.
    #[test]
    fn question_stage_cleans_empty_fields() {
        let rows = vec![
            json!({"uid": "oistes", "note": "", "tags": []})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "andreasd", "note": "ok", "extra": null})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let output = apply_pipeline(rows, &["?".to_string()]).expect("pipeline should pass");
        assert_eq!(output_rows(&output).len(), 2);
        assert!(output_rows(&output)[0].contains_key("uid"));
        assert!(!output_rows(&output)[0].contains_key("note"));
        assert!(!output_rows(&output)[0].contains_key("tags"));
        assert!(output_rows(&output)[1].contains_key("note"));
        assert!(!output_rows(&output)[1].contains_key("extra"));
    }

    // `? field` keeps only rows where the field exists.
    #[test]
    fn question_stage_with_spec_filters_existence() {
        let rows = vec![
            json!({"uid": "oistes"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"cn": "Andreas"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let output = apply_pipeline(rows, &["? uid".to_string()]).expect("pipeline should pass");
        assert_eq!(output_rows(&output).len(), 1);
        assert!(output_rows(&output)[0].contains_key("uid"));
    }

    // Streaming and eager paths must agree for F / P-with-fanout / L.
    #[test]
    fn streaming_executor_matches_eager_for_streamable_row_pipeline() {
        let rows = vec![
            json!({"uid": "alice", "active": true, "members": ["a", "b"]})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "bob", "active": false, "members": ["c"]})
                .as_object()
                .cloned()
                .expect("object"),
        ];
        let stages = vec![
            "F active=true".to_string(),
            "P uid,members[]".to_string(),
            "L 2".to_string(),
        ];

        let eager = apply_pipeline(rows.clone(), &stages).expect("eager pipeline should pass");
        let streaming =
            execute_pipeline_streaming(rows, &stages).expect("streaming pipeline should pass");

        assert_eq!(streaming, eager);
    }

    // Streaming and eager paths must agree for bare quick-search stages.
    #[test]
    fn streaming_executor_matches_eager_for_quick_hot_path() {
        let rows = vec![
            json!({"uid": "alice", "mail": "alice@example.org"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "bob", "mail": "bob@example.org"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "carol", "mail": "carol@example.org"})
                .as_object()
                .cloned()
                .expect("object"),
        ];
        let stages = vec!["alice".to_string()];

        let eager = apply_pipeline(rows.clone(), &stages).expect("eager pipeline should pass");
        let streaming =
            execute_pipeline_streaming(rows, &stages).expect("streaming pipeline should pass");

        assert_eq!(streaming, eager);
    }

    // Quick search on a single row keeps its special-case behavior when
    // streamed.
    #[test]
    fn streaming_executor_preserves_single_row_quick_magic() {
        let rows = vec![
            json!({"uid": "alice", "members": ["eng", "ops"]})
                .as_object()
                .cloned()
                .expect("object"),
        ];
        let stages = vec!["members".to_string()];

        let eager = apply_pipeline(rows.clone(), &stages).expect("eager pipeline should pass");
        let streaming =
            execute_pipeline_streaming(rows, &stages).expect("streaming pipeline should pass");

        assert_eq!(streaming, eager);
        assert_eq!(output_rows(&streaming).len(), 1);
    }

    // Streamed Y sets the copy flag and streamed VALUE fans one row out
    // into one row per value.
    #[test]
    fn streaming_executor_preserves_copy_flag_and_value_fanout() {
        let rows = vec![
            json!({"uid": "alice", "roles": ["eng", "ops"]})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let output =
            execute_pipeline_streaming(rows, &["Y".to_string(), "VALUE roles".to_string()])
                .expect("streaming pipeline should pass");

        assert!(output.meta.wants_copy);
        assert_eq!(output_rows(&output).len(), 2);
    }

    // U expands a list field into one row per element.
    #[test]
    fn unroll_stage_expands_list_field() {
        let rows = vec![
            json!({"members": ["a", "b"], "cn": "grp"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let output =
            apply_pipeline(rows, &["U members".to_string()]).expect("pipeline should pass");

        assert_eq!(output_rows(&output).len(), 2);
        assert_eq!(
            output_rows(&output)
                .iter()
                .map(|row| row.get("members").cloned().expect("member"))
                .collect::<Vec<_>>(),
            vec![json!("a"), json!("b")]
        );
    }

    // U without a field name is an error.
    #[test]
    fn unroll_requires_field_name() {
        let rows = vec![
            json!({"members": ["a", "b"]})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let err = apply_pipeline(rows, &["U".to_string()]).expect_err("pipeline should fail");
        assert!(err.to_string().contains("missing field name"));
    }

    // Grouped output derives its key index from merged group headers
    // (group keys then aggregates).
    #[test]
    fn grouped_output_meta_uses_group_headers() {
        let output = apply_output_pipeline(
            OutputResult {
                items: OutputItems::Groups(vec![crate::core::output_model::Group {
                    groups: json!({"dept": "sales"})
                        .as_object()
                        .cloned()
                        .expect("object"),
                    aggregates: json!({"total": 2}).as_object().cloned().expect("object"),
                    rows: vec![],
                }]),
                meta: Default::default(),
            },
            &[],
        )
        .expect("pipeline should pass");

        assert_eq!(output.meta.key_index, vec!["dept", "total"]);
        assert!(output.meta.grouped);
    }

    // Group-aware P applies inside groups; Y preserves both the copy flag
    // and the grouped flag.
    #[test]
    fn grouped_rows_ignore_flat_row_only_projection_and_copy_preserves_flag() {
        let grouped = OutputResult {
            items: OutputItems::Groups(vec![crate::core::output_model::Group {
                groups: json!({"dept": "sales"})
                    .as_object()
                    .cloned()
                    .expect("object"),
                aggregates: json!({"total": 2}).as_object().cloned().expect("object"),
                rows: vec![
                    json!({"uid": "alice"})
                        .as_object()
                        .cloned()
                        .expect("object"),
                ],
            }]),
            meta: Default::default(),
        };

        let projected =
            apply_output_pipeline(grouped.clone(), &["P uid".to_string()]).expect("pipeline works");
        assert_eq!(projected.items, grouped.items);

        let copied = apply_output_pipeline(grouped, &["Y".to_string()]).expect("copy works");
        assert!(copied.meta.wants_copy);
        assert!(copied.meta.grouped);
    }

    // A sort stage forces materialization; streamed input still sorts.
    #[test]
    fn streaming_materializes_cleanly_at_sort_barrier() {
        let rows = vec![
            json!({"uid": "bob"}).as_object().cloned().expect("object"),
            json!({"uid": "alice"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let output = execute_pipeline_streaming(rows, &["S uid".to_string()])
            .expect("streaming pipeline should pass");

        assert_eq!(
            output_rows(&output)
                .iter()
                .map(|row| row
                    .get("uid")
                    .and_then(|value| value.as_str())
                    .unwrap_or_default())
                .collect::<Vec<_>>(),
            vec!["alice", "bob"]
        );
    }

    // Row-only stages leave grouped output grouped (pass-through path).
    #[test]
    fn grouped_output_pipeline_covers_flat_row_only_explicit_stage_aliases_unit() {
        let grouped = OutputResult {
            items: OutputItems::Groups(vec![crate::core::output_model::Group {
                groups: json!({"team": "ops"}).as_object().cloned().expect("object"),
                aggregates: json!({"count": 2}).as_object().cloned().expect("object"),
                rows: vec![
                    json!({"uid": "alice", "roles": ["eng", "ops"]})
                        .as_object()
                        .cloned()
                        .expect("object"),
                ],
            }]),
            meta: Default::default(),
        };

        for stage in [
            "V alice",
            "K alice",
            "VALUE uid",
            "F uid=alice",
            "? uid",
            "Y",
        ] {
            let output = apply_output_pipeline(grouped.clone(), &[stage.to_string()])
                .expect("grouped pipeline should succeed");
            assert!(matches!(output.items, OutputItems::Groups(_)));
        }
    }

    // G regroups, L limits the groups themselves, and U unrolls inside
    // groups — all keeping the output grouped.
    #[test]
    fn grouped_output_pipeline_covers_group_limit_and_unroll_paths_unit() {
        let grouped = OutputResult {
            items: OutputItems::Groups(vec![
                crate::core::output_model::Group {
                    groups: json!({"team": "ops"}).as_object().cloned().expect("object"),
                    aggregates: json!({"count": 2}).as_object().cloned().expect("object"),
                    rows: vec![
                        json!({"uid": "alice", "roles": ["eng", "ops"]})
                            .as_object()
                            .cloned()
                            .expect("object"),
                    ],
                },
                crate::core::output_model::Group {
                    groups: json!({"team": "eng"}).as_object().cloned().expect("object"),
                    aggregates: json!({"count": 1}).as_object().cloned().expect("object"),
                    rows: vec![
                        json!({"uid": "bob", "roles": ["ops"]})
                            .as_object()
                            .cloned()
                            .expect("object"),
                    ],
                },
            ]),
            meta: Default::default(),
        };

        let regrouped = apply_output_pipeline(grouped.clone(), &["G team".to_string()])
            .expect("group regroup should succeed");
        assert!(matches!(regrouped.items, OutputItems::Groups(_)));

        let limited = apply_output_pipeline(grouped.clone(), &["L 1".to_string()])
            .expect("group limit should succeed");
        let OutputItems::Groups(limited_groups) = limited.items else {
            panic!("expected grouped output");
        };
        assert_eq!(limited_groups.len(), 1);

        let unrolled = apply_output_pipeline(grouped, &["U roles".to_string()])
            .expect("group unroll should succeed");
        assert!(matches!(unrolled.items, OutputItems::Groups(_)));
    }

    // Exercise every streamed verb arm plus the streamed error path.
    #[test]
    fn streaming_pipeline_covers_stream_stage_variants_and_errors_unit() {
        let rows = vec![
            json!({"uid": "alice", "active": true, "roles": ["eng", "ops"]})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "bob", "active": false, "roles": ["ops"]})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let value_output = execute_pipeline_streaming(rows.clone(), &["VALUE uid".to_string()])
            .expect("streaming values should succeed");
        assert_eq!(output_rows(&value_output).len(), 2);

        let filtered = execute_pipeline_streaming(rows.clone(), &["? uid".to_string()])
            .expect("question filter should stream");
        assert_eq!(output_rows(&filtered).len(), 2);

        let cleaned = execute_pipeline_streaming(rows.clone(), &["?".to_string()])
            .expect("question clean should stream");
        assert_eq!(output_rows(&cleaned).len(), 2);

        let limited = execute_pipeline_streaming(rows.clone(), &["L 1".to_string()])
            .expect("head limit should stream");
        assert_eq!(output_rows(&limited).len(), 1);

        let unrolled = execute_pipeline_streaming(rows.clone(), &["U roles".to_string()])
            .expect("unroll should stream");
        assert_eq!(output_rows(&unrolled).len(), 3);

        let err = execute_pipeline_streaming(rows, &["U".to_string()])
            .expect_err("missing unroll field should fail");
        assert!(err.to_string().contains("missing field name"));
    }

    // Exercise the eager-only stages: S, G, A, C, Z, and the unknown-verb
    // error.
    #[test]
    fn apply_output_pipeline_covers_explicit_materializing_row_stages_unit() {
        let rows = vec![
            json!({"uid": "bob", "dept": "ops"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "alice", "dept": "ops"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "carol", "dept": "eng"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let sorted = apply_pipeline(rows.clone(), &["S uid".to_string()]).expect("sort works");
        assert_eq!(
            output_rows(&sorted)[0]
                .get("uid")
                .and_then(|value| value.as_str()),
            Some("alice")
        );

        let grouped = apply_pipeline(rows.clone(), &["G dept".to_string()]).expect("group works");
        assert!(grouped.meta.grouped);

        let aggregated =
            apply_pipeline(rows.clone(), &["A count total".to_string()]).expect("aggregate works");
        assert!(!output_rows(&aggregated).is_empty());

        let counted = apply_pipeline(rows.clone(), &["C".to_string()]).expect("count works");
        assert_eq!(output_rows(&counted).len(), 1);

        let collapsed = apply_pipeline(rows.clone(), &["G dept".to_string(), "Z".to_string()])
            .expect("collapse works");
        assert!(matches!(collapsed.items, OutputItems::Rows(_)));

        let err = apply_pipeline(rows, &["R nope".to_string()])
            .expect_err("unknown explicit stage should fail");
        assert!(err.to_string().contains("unknown DSL verb"));
    }
}
904}