1use crate::core::{
2 output_model::{OutputItems, OutputMeta, OutputResult},
3 row::Row,
4};
5use anyhow::{Result, anyhow};
6
7use crate::dsl::{
8 eval::context::RowContext,
9 model::{ParsedPipeline, ParsedStage, ParsedStageKind},
10 parse::pipeline::parse_stage_list,
11 stages::{
12 aggregate, collapse, copy, filter, group, jq, limit, project, question, quick, sort, values,
13 },
14 verbs::stage_can_stream_rows,
15};
16
17pub fn apply_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
22 apply_output_pipeline(OutputResult::from_rows(rows), stages)
23}
24
25pub fn apply_output_pipeline(output: OutputResult, stages: &[String]) -> Result<OutputResult> {
30 execute_pipeline_items(output.items, output.meta.wants_copy, stages)
31}
32
33pub fn execute_pipeline(rows: Vec<Row>, stages: &[String]) -> Result<OutputResult> {
38 execute_pipeline_streaming(rows, stages)
39}
40
41pub fn execute_pipeline_streaming<I>(rows: I, stages: &[String]) -> Result<OutputResult>
46where
47 I: IntoIterator<Item = Row>,
48 I::IntoIter: 'static,
49{
50 let parsed = parse_stage_list(stages)?;
51 PipelineExecutor::new_stream(rows.into_iter(), false, parsed).run()
52}
53
54fn execute_pipeline_items(
55 items: OutputItems,
56 initial_wants_copy: bool,
57 stages: &[String],
58) -> Result<OutputResult> {
59 let parsed = parse_stage_list(stages)?;
60 PipelineExecutor::new(items, initial_wants_copy, parsed).run()
61}
62
/// A lazily-evaluated stream of rows. Each item may carry an error produced
/// by an upstream stage; errors surface when the stream is materialized.
type RowStream = Box<dyn Iterator<Item = Result<Row>>>;

/// Current shape of the data flowing through the executor.
enum PipelineItems {
    /// Rows still flowing lazily; streamable stages wrap this iterator.
    RowStream(RowStream),
    /// Fully collected output (rows or groups), as required by
    /// non-streaming stages such as sort, group, and aggregate.
    Materialized(OutputItems),
}
73
/// Drives a parsed DSL pipeline over row/group data, keeping rows streaming
/// through streamable stages and materializing only when a stage demands it.
struct PipelineExecutor {
    /// Data in its current form (lazy stream or materialized output).
    items: PipelineItems,
    /// Set when a `Y` (copy) stage runs; propagated into the output meta.
    wants_copy: bool,
    /// The parsed stage list to execute, in order.
    parsed: ParsedPipeline,
}
79
80impl PipelineExecutor {
81 fn new(items: OutputItems, wants_copy: bool, parsed: ParsedPipeline) -> Self {
82 Self {
83 items: match items {
84 OutputItems::Rows(rows) => {
85 PipelineItems::RowStream(Box::new(rows.into_iter().map(Ok)))
86 }
87 OutputItems::Groups(groups) => {
88 PipelineItems::Materialized(OutputItems::Groups(groups))
89 }
90 },
91 wants_copy,
92 parsed,
93 }
94 }
95
96 fn new_stream<I>(rows: I, wants_copy: bool, parsed: ParsedPipeline) -> Self
97 where
98 I: Iterator<Item = Row> + 'static,
99 {
100 Self {
101 items: PipelineItems::RowStream(Box::new(rows.map(Ok))),
102 wants_copy,
103 parsed,
104 }
105 }
106
107 fn run(mut self) -> Result<OutputResult> {
108 let stages = self.parsed.stages.clone();
109 for stage in &stages {
110 if stage.verb.is_empty() {
111 continue;
112 }
113 self.apply_stage(stage)?;
114 }
115
116 let items = self.finish_items()?;
117 let meta = self.build_output_meta(&items);
118
119 Ok(OutputResult { meta, items })
120 }
121
122 fn apply_stage(&mut self, stage: &ParsedStage) -> Result<()> {
123 if stage_can_stream_rows(stage)
124 && let PipelineItems::RowStream(_) = self.items
125 {
126 self.apply_stream_stage(stage)?;
127 return Ok(());
128 }
129
130 let items = self.materialize_items()?;
131 self.items = PipelineItems::Materialized(match stage.kind {
132 ParsedStageKind::Quick => self.apply_quick_stage(items, stage)?,
133 ParsedStageKind::UnknownExplicit => {
134 return Err(anyhow!("unknown DSL verb: {}", stage.verb));
135 }
136 ParsedStageKind::Explicit => self.apply_explicit_stage(items, stage)?,
137 });
138 Ok(())
139 }
140
141 fn apply_stream_stage(&mut self, stage: &ParsedStage) -> Result<()> {
142 let stream = match std::mem::replace(
143 &mut self.items,
144 PipelineItems::RowStream(Box::new(std::iter::empty())),
145 ) {
146 PipelineItems::RowStream(stream) => stream,
147 PipelineItems::Materialized(items) => {
148 debug_assert!(
149 false,
150 "apply_stream_stage called after pipeline had already materialized"
151 );
152 self.items = PipelineItems::Materialized(items);
153 return Ok(());
154 }
155 };
156
157 self.items = PipelineItems::RowStream(match stage.verb.as_str() {
158 _ if matches!(stage.kind, ParsedStageKind::Quick) => {
159 let plan = quick::compile(&stage.raw)?;
160 Box::new(quick::stream_rows_with_plan(stream, plan))
161 }
162 "F" => {
163 let plan = filter::compile(&stage.spec)?;
164 Box::new(stream.filter_map(move |row| match row {
165 Ok(row) if plan.matches(&row) => Some(Ok(row)),
166 Ok(_) => None,
167 Err(err) => Some(Err(err)),
168 }))
169 }
170 "P" => {
171 let plan = project::compile(&stage.spec)?;
172 stream_row_fanout(stream, move |row| plan.project_row(&row))
173 }
174 "VAL" | "VALUE" => {
175 let plan = values::compile(&stage.spec);
176 stream_row_fanout(stream, move |row| plan.extract_row(&row))
177 }
178 "L" => {
179 let spec = limit::parse_limit_spec(&stage.spec)?;
180 debug_assert!(spec.is_head_only());
181 Box::new(
182 stream
183 .skip(spec.offset as usize)
184 .take(spec.count.max(0) as usize),
185 )
186 }
187 "Y" => {
188 self.wants_copy = true;
189 stream
190 }
191 "V" | "K" => {
192 let plan = quick::compile(&format!(
193 "{}{}{}",
194 stage.verb,
195 if stage.spec.is_empty() { "" } else { " " },
196 stage.spec
197 ))?;
198 Box::new(quick::stream_rows_with_plan(stream, plan))
199 }
200 "U" => {
201 let field = stage.spec.trim();
202 if field.is_empty() {
203 return Err(anyhow!("U: missing field name to unroll"));
204 }
205 let plan = project::compile(&format!("{field}[]"))?;
206 stream_row_fanout(stream, move |row| plan.project_row(&row))
207 }
208 "?" => {
209 if stage.spec.trim().is_empty() {
210 Box::new(stream.filter_map(|row| match row {
211 Ok(row) => question::clean_row(row).map(Ok),
212 Err(err) => Some(Err(err)),
213 }))
214 } else {
215 let plan = quick::compile(&format!("? {}", stage.spec))?;
216 Box::new(quick::stream_rows_with_plan(stream, plan))
217 }
218 }
219 other => return Err(anyhow!("stream stage not implemented for verb: {other}")),
220 });
221 Ok(())
222 }
223
224 fn apply_quick_stage(&self, items: OutputItems, stage: &ParsedStage) -> Result<OutputItems> {
225 map_rows(items, |rows| quick::apply(rows, &stage.raw))
226 }
227
228 fn apply_explicit_stage(
229 &mut self,
230 items: OutputItems,
231 stage: &ParsedStage,
232 ) -> Result<OutputItems> {
233 match stage.verb.as_str() {
234 "P" => self.project(items, stage),
235 "V" => self.apply_quick_alias(items, stage, "V"),
237 "K" => self.apply_quick_alias(items, stage, "K"),
238 "VAL" | "VALUE" => map_rows(items, |rows| values::apply(rows, &stage.spec)),
240 "F" => self.filter(items, stage),
241 "G" => self.group(items, stage),
242 "A" => aggregate::apply(items, &stage.spec),
243 "S" => sort::apply(items, &stage.spec),
244 "L" => self.limit(items, stage),
245 "Z" => Ok(collapse::apply(items)),
246 "C" => aggregate::count_macro(items, &stage.spec),
247 "Y" => self.copy(items, stage),
248 "U" => self.unroll(items, stage),
249 "?" => question::apply(items, &stage.spec),
250 "JQ" => jq::apply(items, &stage.spec),
251 _ => Err(anyhow!("unknown DSL verb: {}", stage.verb)),
252 }
253 }
254
255 fn project(&self, items: OutputItems, stage: &ParsedStage) -> Result<OutputItems> {
256 match items {
257 OutputItems::Rows(rows) => Ok(OutputItems::Rows(project::apply(rows, &stage.spec)?)),
258 OutputItems::Groups(groups) => Ok(OutputItems::Groups(project::apply_groups(
259 groups,
260 &stage.spec,
261 )?)),
262 }
263 }
264
265 fn filter(&self, items: OutputItems, stage: &ParsedStage) -> Result<OutputItems> {
266 match items {
267 OutputItems::Rows(rows) => Ok(OutputItems::Rows(filter::apply(rows, &stage.spec)?)),
268 OutputItems::Groups(groups) => Ok(OutputItems::Groups(filter::apply_groups(
269 groups,
270 &stage.spec,
271 )?)),
272 }
273 }
274
275 fn group(&self, items: OutputItems, stage: &ParsedStage) -> Result<OutputItems> {
276 match items {
277 OutputItems::Rows(rows) => {
278 Ok(OutputItems::Groups(group::group_rows(rows, &stage.spec)?))
279 }
280 OutputItems::Groups(groups) => Ok(OutputItems::Groups(group::regroup_groups(
281 groups,
282 &stage.spec,
283 )?)),
284 }
285 }
286
287 fn limit(&self, items: OutputItems, stage: &ParsedStage) -> Result<OutputItems> {
288 match items {
289 OutputItems::Rows(rows) => Ok(OutputItems::Rows(limit::apply(rows, &stage.spec)?)),
290 OutputItems::Groups(groups) => {
291 Ok(OutputItems::Groups(limit::apply(groups, &stage.spec)?))
292 }
293 }
294 }
295
296 fn copy(&mut self, items: OutputItems, _stage: &ParsedStage) -> Result<OutputItems> {
297 self.wants_copy = true;
298 map_rows(items, |rows| Ok(copy::apply(rows)))
299 }
300
301 fn unroll(&self, items: OutputItems, stage: &ParsedStage) -> Result<OutputItems> {
302 let field = stage.spec.trim();
303 if field.is_empty() {
304 return Err(anyhow!("U: missing field name to unroll"));
305 }
306
307 let selector = format!("{field}[]");
308 match items {
309 OutputItems::Rows(rows) => Ok(OutputItems::Rows(project::apply(rows, &selector)?)),
310 OutputItems::Groups(groups) => Ok(OutputItems::Groups(project::apply_groups(
311 groups, &selector,
312 )?)),
313 }
314 }
315
316 fn apply_quick_alias(
317 &self,
318 items: OutputItems,
319 stage: &ParsedStage,
320 alias: &str,
321 ) -> Result<OutputItems> {
322 let quick_spec = if stage.spec.is_empty() {
323 alias.to_string()
324 } else {
325 format!("{alias} {}", stage.spec)
326 };
327 map_rows(items, |rows| quick::apply(rows, &quick_spec))
328 }
329
330 fn materialize_items(&mut self) -> Result<OutputItems> {
331 match std::mem::replace(
332 &mut self.items,
333 PipelineItems::Materialized(OutputItems::Rows(Vec::new())),
334 ) {
335 PipelineItems::RowStream(stream) => {
336 let rows = materialize_row_stream(stream)?;
337 Ok(OutputItems::Rows(rows))
338 }
339 PipelineItems::Materialized(items) => Ok(items),
340 }
341 }
342
343 fn finish_items(&mut self) -> Result<OutputItems> {
344 self.materialize_items()
345 }
346
347 fn build_output_meta(&self, items: &OutputItems) -> OutputMeta {
348 let key_index = match items {
349 OutputItems::Rows(rows) => RowContext::from_rows(rows).key_index().to_vec(),
350 OutputItems::Groups(groups) => {
351 let headers = groups.iter().map(merged_group_header).collect::<Vec<_>>();
352 RowContext::from_rows(&headers).key_index().to_vec()
353 }
354 };
355
356 OutputMeta {
357 key_index,
358 column_align: Vec::new(),
359 wants_copy: self.wants_copy,
360 grouped: matches!(items, OutputItems::Groups(_)),
361 }
362 }
363}
364
365fn materialize_row_stream(stream: RowStream) -> Result<Vec<Row>> {
366 stream.collect()
367}
368
369fn stream_row_fanout<I, F>(stream: RowStream, fanout: F) -> RowStream
370where
371 I: IntoIterator<Item = Row>,
372 F: Fn(Row) -> I + 'static,
373{
374 Box::new(stream.flat_map(move |row| {
375 match row {
376 Ok(row) => fanout(row)
377 .into_iter()
378 .map(Ok)
379 .collect::<Vec<_>>()
380 .into_iter(),
381 Err(err) => vec![Err(err)].into_iter(),
382 }
383 }))
384}
385
386fn merged_group_header(group: &crate::core::output_model::Group) -> Row {
387 let mut row = group.groups.clone();
388 row.extend(group.aggregates.clone());
389 row
390}
391
392fn map_rows(
393 items: OutputItems,
394 map_fn: impl FnOnce(Vec<Row>) -> Result<Vec<Row>>,
395) -> Result<OutputItems> {
396 match items {
397 OutputItems::Rows(rows) => map_fn(rows).map(OutputItems::Rows),
398 OutputItems::Groups(groups) => Ok(OutputItems::Groups(groups)),
402 }
403}
404
#[cfg(test)]
mod tests {
    // Unit tests covering the eager (`apply_pipeline`/`apply_output_pipeline`)
    // and streaming (`execute_pipeline`/`execute_pipeline_streaming`) entry
    // points, including grouped-output pass-through behavior and error cases.
    use crate::core::output_model::{OutputItems, OutputResult};
    use serde_json::json;

    use super::{
        apply_output_pipeline, apply_pipeline, execute_pipeline, execute_pipeline_streaming,
    };

    // Helper: unwraps flat-row output, panicking on grouped output.
    fn output_rows(output: &OutputResult) -> &[crate::core::row::Row] {
        output.as_rows().expect("expected row output")
    }

    #[test]
    fn project_then_filter_pipeline_works() {
        let rows = vec![
            json!({"uid": "oistes", "cn": "Oistein"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "andreasd", "cn": "Andreas"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let stages = vec!["P uid,cn".to_string(), "F uid=oistes".to_string()];
        let output = apply_pipeline(rows, &stages).expect("pipeline should pass");

        assert_eq!(output_rows(&output).len(), 1);
        assert_eq!(
            output_rows(&output)[0]
                .get("uid")
                .and_then(|value| value.as_str()),
            Some("oistes")
        );
    }

    #[test]
    fn bare_quick_stage_without_verb_still_works() {
        let rows = vec![
            json!({"uid": "oistes"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "andreasd"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        // A bare token like "oist" is treated as a quick match stage.
        let stages = vec!["oist".to_string()];
        let output = apply_pipeline(rows, &stages).expect("pipeline should pass");
        assert_eq!(output_rows(&output).len(), 1);
    }

    #[test]
    fn unknown_single_letter_verb_errors() {
        let rows = vec![
            json!({"uid": "oistes"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let err =
            apply_pipeline(rows, &["R oist".to_string()]).expect_err("unknown verb should fail");
        assert!(err.to_string().contains("unknown DSL verb"));
    }

    #[test]
    fn copy_stage_sets_meta_flag() {
        let rows = vec![
            json!({"uid": "oistes"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let stages = vec!["Y".to_string()];
        let output = execute_pipeline(rows, &stages).expect("pipeline should pass");

        assert!(output.meta.wants_copy);
    }

    #[test]
    fn value_scope_alias_filters_by_value() {
        let rows = vec![
            json!({"uid": "oistes"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "andreasd"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let stages = vec!["V oist".to_string()];
        let output = apply_pipeline(rows, &stages).expect("pipeline should pass");
        assert_eq!(output_rows(&output).len(), 1);
        assert_eq!(
            output_rows(&output)[0]
                .get("uid")
                .and_then(|value| value.as_str()),
            Some("oistes")
        );
    }

    #[test]
    fn question_stage_cleans_empty_fields() {
        // Bare "?" drops empty strings, empty arrays, and nulls per row.
        let rows = vec![
            json!({"uid": "oistes", "note": "", "tags": []})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "andreasd", "note": "ok", "extra": null})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let output = apply_pipeline(rows, &["?".to_string()]).expect("pipeline should pass");
        assert_eq!(output_rows(&output).len(), 2);
        assert!(output_rows(&output)[0].contains_key("uid"));
        assert!(!output_rows(&output)[0].contains_key("note"));
        assert!(!output_rows(&output)[0].contains_key("tags"));
        assert!(output_rows(&output)[1].contains_key("note"));
        assert!(!output_rows(&output)[1].contains_key("extra"));
    }

    #[test]
    fn question_stage_with_spec_filters_existence() {
        let rows = vec![
            json!({"uid": "oistes"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"cn": "Andreas"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let output = apply_pipeline(rows, &["? uid".to_string()]).expect("pipeline should pass");
        assert_eq!(output_rows(&output).len(), 1);
        assert!(output_rows(&output)[0].contains_key("uid"));
    }

    #[test]
    fn streaming_executor_matches_eager_for_streamable_row_pipeline() {
        let rows = vec![
            json!({"uid": "alice", "active": true, "members": ["a", "b"]})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "bob", "active": false, "members": ["c"]})
                .as_object()
                .cloned()
                .expect("object"),
        ];
        let stages = vec![
            "F active=true".to_string(),
            "P uid,members[]".to_string(),
            "L 2".to_string(),
        ];

        // Streaming and eager execution must produce identical output.
        let eager = apply_pipeline(rows.clone(), &stages).expect("eager pipeline should pass");
        let streaming =
            execute_pipeline_streaming(rows, &stages).expect("streaming pipeline should pass");

        assert_eq!(streaming, eager);
    }

    #[test]
    fn streaming_executor_matches_eager_for_quick_hot_path() {
        let rows = vec![
            json!({"uid": "alice", "mail": "alice@example.org"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "bob", "mail": "bob@example.org"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "carol", "mail": "carol@example.org"})
                .as_object()
                .cloned()
                .expect("object"),
        ];
        let stages = vec!["alice".to_string()];

        let eager = apply_pipeline(rows.clone(), &stages).expect("eager pipeline should pass");
        let streaming =
            execute_pipeline_streaming(rows, &stages).expect("streaming pipeline should pass");

        assert_eq!(streaming, eager);
    }

    #[test]
    fn streaming_executor_preserves_single_row_quick_magic() {
        let rows = vec![
            json!({"uid": "alice", "members": ["eng", "ops"]})
                .as_object()
                .cloned()
                .expect("object"),
        ];
        let stages = vec!["members".to_string()];

        let eager = apply_pipeline(rows.clone(), &stages).expect("eager pipeline should pass");
        let streaming =
            execute_pipeline_streaming(rows, &stages).expect("streaming pipeline should pass");

        assert_eq!(streaming, eager);
        assert_eq!(output_rows(&streaming).len(), 1);
    }

    #[test]
    fn streaming_executor_preserves_copy_flag_and_value_fanout() {
        let rows = vec![
            json!({"uid": "alice", "roles": ["eng", "ops"]})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let output =
            execute_pipeline_streaming(rows, &["Y".to_string(), "VALUE roles".to_string()])
                .expect("streaming pipeline should pass");

        assert!(output.meta.wants_copy);
        assert_eq!(output_rows(&output).len(), 2);
    }

    #[test]
    fn unroll_stage_expands_list_field() {
        let rows = vec![
            json!({"members": ["a", "b"], "cn": "grp"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let output =
            apply_pipeline(rows, &["U members".to_string()]).expect("pipeline should pass");

        assert_eq!(output_rows(&output).len(), 2);
        assert_eq!(
            output_rows(&output)
                .iter()
                .map(|row| row.get("members").cloned().expect("member"))
                .collect::<Vec<_>>(),
            vec![json!("a"), json!("b")]
        );
    }

    #[test]
    fn unroll_requires_field_name() {
        let rows = vec![
            json!({"members": ["a", "b"]})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let err = apply_pipeline(rows, &["U".to_string()]).expect_err("pipeline should fail");
        assert!(err.to_string().contains("missing field name"));
    }

    #[test]
    fn grouped_output_meta_uses_group_headers() {
        // key_index for grouped output comes from merged group+aggregate keys.
        let output = apply_output_pipeline(
            OutputResult {
                items: OutputItems::Groups(vec![crate::core::output_model::Group {
                    groups: json!({"dept": "sales"})
                        .as_object()
                        .cloned()
                        .expect("object"),
                    aggregates: json!({"total": 2}).as_object().cloned().expect("object"),
                    rows: vec![],
                }]),
                meta: Default::default(),
            },
            &[],
        )
        .expect("pipeline should pass");

        assert_eq!(output.meta.key_index, vec!["dept", "total"]);
        assert!(output.meta.grouped);
    }

    #[test]
    fn grouped_rows_ignore_flat_row_only_projection_and_copy_preserves_flag() {
        let grouped = OutputResult {
            items: OutputItems::Groups(vec![crate::core::output_model::Group {
                groups: json!({"dept": "sales"})
                    .as_object()
                    .cloned()
                    .expect("object"),
                aggregates: json!({"total": 2}).as_object().cloned().expect("object"),
                rows: vec![
                    json!({"uid": "alice"})
                        .as_object()
                        .cloned()
                        .expect("object"),
                ],
            }]),
            meta: Default::default(),
        };

        let projected =
            apply_output_pipeline(grouped.clone(), &["P uid".to_string()]).expect("pipeline works");
        assert_eq!(projected.items, grouped.items);

        let copied = apply_output_pipeline(grouped, &["Y".to_string()]).expect("copy works");
        assert!(copied.meta.wants_copy);
        assert!(copied.meta.grouped);
    }

    #[test]
    fn streaming_materializes_cleanly_at_sort_barrier() {
        // Sort cannot stream, so the executor must materialize first.
        let rows = vec![
            json!({"uid": "bob"}).as_object().cloned().expect("object"),
            json!({"uid": "alice"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let output = execute_pipeline_streaming(rows, &["S uid".to_string()])
            .expect("streaming pipeline should pass");

        assert_eq!(
            output_rows(&output)
                .iter()
                .map(|row| row
                    .get("uid")
                    .and_then(|value| value.as_str())
                    .unwrap_or_default())
                .collect::<Vec<_>>(),
            vec!["alice", "bob"]
        );
    }

    #[test]
    fn grouped_output_pipeline_covers_flat_row_only_explicit_stage_aliases_unit() {
        let grouped = OutputResult {
            items: OutputItems::Groups(vec![crate::core::output_model::Group {
                groups: json!({"team": "ops"}).as_object().cloned().expect("object"),
                aggregates: json!({"count": 2}).as_object().cloned().expect("object"),
                rows: vec![
                    json!({"uid": "alice", "roles": ["eng", "ops"]})
                        .as_object()
                        .cloned()
                        .expect("object"),
                ],
            }]),
            meta: Default::default(),
        };

        // Row-only stages must leave grouped output grouped (and not error).
        for stage in [
            "V alice",
            "K alice",
            "VALUE uid",
            "F uid=alice",
            "? uid",
            "Y",
        ] {
            let output = apply_output_pipeline(grouped.clone(), &[stage.to_string()])
                .expect("grouped pipeline should succeed");
            assert!(matches!(output.items, OutputItems::Groups(_)));
        }
    }

    #[test]
    fn grouped_output_pipeline_covers_group_limit_and_unroll_paths_unit() {
        let grouped = OutputResult {
            items: OutputItems::Groups(vec![
                crate::core::output_model::Group {
                    groups: json!({"team": "ops"}).as_object().cloned().expect("object"),
                    aggregates: json!({"count": 2}).as_object().cloned().expect("object"),
                    rows: vec![
                        json!({"uid": "alice", "roles": ["eng", "ops"]})
                            .as_object()
                            .cloned()
                            .expect("object"),
                    ],
                },
                crate::core::output_model::Group {
                    groups: json!({"team": "eng"}).as_object().cloned().expect("object"),
                    aggregates: json!({"count": 1}).as_object().cloned().expect("object"),
                    rows: vec![
                        json!({"uid": "bob", "roles": ["ops"]})
                            .as_object()
                            .cloned()
                            .expect("object"),
                    ],
                },
            ]),
            meta: Default::default(),
        };

        let regrouped = apply_output_pipeline(grouped.clone(), &["G team".to_string()])
            .expect("group regroup should succeed");
        assert!(matches!(regrouped.items, OutputItems::Groups(_)));

        // Limit on grouped output limits the number of groups, not rows.
        let limited = apply_output_pipeline(grouped.clone(), &["L 1".to_string()])
            .expect("group limit should succeed");
        let OutputItems::Groups(limited_groups) = limited.items else {
            panic!("expected grouped output");
        };
        assert_eq!(limited_groups.len(), 1);

        let unrolled = apply_output_pipeline(grouped, &["U roles".to_string()])
            .expect("group unroll should succeed");
        assert!(matches!(unrolled.items, OutputItems::Groups(_)));
    }

    #[test]
    fn streaming_pipeline_covers_stream_stage_variants_and_errors_unit() {
        let rows = vec![
            json!({"uid": "alice", "active": true, "roles": ["eng", "ops"]})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "bob", "active": false, "roles": ["ops"]})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let value_output = execute_pipeline_streaming(rows.clone(), &["VALUE uid".to_string()])
            .expect("streaming values should succeed");
        assert_eq!(output_rows(&value_output).len(), 2);

        let filtered = execute_pipeline_streaming(rows.clone(), &["? uid".to_string()])
            .expect("question filter should stream");
        assert_eq!(output_rows(&filtered).len(), 2);

        let cleaned = execute_pipeline_streaming(rows.clone(), &["?".to_string()])
            .expect("question clean should stream");
        assert_eq!(output_rows(&cleaned).len(), 2);

        let limited = execute_pipeline_streaming(rows.clone(), &["L 1".to_string()])
            .expect("head limit should stream");
        assert_eq!(output_rows(&limited).len(), 1);

        let unrolled = execute_pipeline_streaming(rows.clone(), &["U roles".to_string()])
            .expect("unroll should stream");
        assert_eq!(output_rows(&unrolled).len(), 3);

        let err = execute_pipeline_streaming(rows, &["U".to_string()])
            .expect_err("missing unroll field should fail");
        assert!(err.to_string().contains("missing field name"));
    }

    #[test]
    fn apply_output_pipeline_covers_explicit_materializing_row_stages_unit() {
        let rows = vec![
            json!({"uid": "bob", "dept": "ops"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "alice", "dept": "ops"})
                .as_object()
                .cloned()
                .expect("object"),
            json!({"uid": "carol", "dept": "eng"})
                .as_object()
                .cloned()
                .expect("object"),
        ];

        let sorted = apply_pipeline(rows.clone(), &["S uid".to_string()]).expect("sort works");
        assert_eq!(
            output_rows(&sorted)[0]
                .get("uid")
                .and_then(|value| value.as_str()),
            Some("alice")
        );

        let grouped = apply_pipeline(rows.clone(), &["G dept".to_string()]).expect("group works");
        assert!(grouped.meta.grouped);

        let aggregated =
            apply_pipeline(rows.clone(), &["A count total".to_string()]).expect("aggregate works");
        assert!(!output_rows(&aggregated).is_empty());

        let counted = apply_pipeline(rows.clone(), &["C".to_string()]).expect("count works");
        assert_eq!(output_rows(&counted).len(), 1);

        // Z collapses grouped output back into flat rows.
        let collapsed = apply_pipeline(rows.clone(), &["G dept".to_string(), "Z".to_string()])
            .expect("collapse works");
        assert!(matches!(collapsed.items, OutputItems::Rows(_)));

        let err = apply_pipeline(rows, &["R nope".to_string()])
            .expect_err("unknown explicit stage should fail");
        assert!(err.to_string().contains("unknown DSL verb"));
    }
}