Skip to main content

osp_cli/dsl/
verbs.rs

1use crate::dsl::model::{ParsedStage, ParsedStageKind};
2
/// How a DSL verb interacts with row streaming.
///
/// Every entry of the verb registry carries one of these classifications, and
/// `render_streaming_badge` turns it into the optional badge shown next to a
/// verb.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerbStreaming {
    /// The verb works row by row (filter, projection, unroll, ...); rendered
    /// without a badge.
    Streamable,
    /// The verb streams for some forms or inputs and materializes for others
    /// (for example head limits versus tail/negative limits); rendered as
    /// `[conditional]`.
    Conditional,
    /// The verb needs the full input (or full groups) before it can produce
    /// output; rendered as `[materializes]`.
    Materializes,
    /// The verb is not part of data execution (the help stage); rendered
    /// without a badge and left out of the explicit-verb listing.
    Meta,
}
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub struct VerbInfo {
13    pub verb: &'static str,
14    pub summary: &'static str,
15    pub streaming: VerbStreaming,
16    pub streaming_note: &'static str,
17}
18
19const VERBS: &[VerbInfo] = &[
20    VerbInfo {
21        verb: "F",
22        summary: "Filter rows",
23        streaming: VerbStreaming::Streamable,
24        streaming_note: "row-by-row filter",
25    },
26    VerbInfo {
27        verb: "P",
28        summary: "Project columns",
29        streaming: VerbStreaming::Streamable,
30        streaming_note: "row-by-row projection/fanout",
31    },
32    VerbInfo {
33        verb: "S",
34        summary: "Sort rows",
35        streaming: VerbStreaming::Materializes,
36        streaming_note: "sorting needs the full input",
37    },
38    VerbInfo {
39        verb: "G",
40        summary: "Group rows",
41        streaming: VerbStreaming::Materializes,
42        streaming_note: "grouping needs the full input",
43    },
44    VerbInfo {
45        verb: "A",
46        summary: "Aggregate rows/groups",
47        streaming: VerbStreaming::Materializes,
48        streaming_note: "aggregation needs the full input or full groups",
49    },
50    VerbInfo {
51        verb: "L",
52        summary: "Limit rows",
53        streaming: VerbStreaming::Conditional,
54        streaming_note: "head limits stream; tail/negative forms materialize",
55    },
56    VerbInfo {
57        verb: "Z",
58        summary: "Collapse grouped output",
59        streaming: VerbStreaming::Materializes,
60        streaming_note: "collapse only runs after grouped output exists",
61    },
62    VerbInfo {
63        verb: "C",
64        summary: "Count rows",
65        streaming: VerbStreaming::Materializes,
66        streaming_note: "count needs the full input or full groups",
67    },
68    VerbInfo {
69        verb: "Y",
70        summary: "Mark output for copy",
71        streaming: VerbStreaming::Streamable,
72        streaming_note: "passthrough marker",
73    },
74    VerbInfo {
75        verb: "H",
76        summary: "Show DSL help",
77        streaming: VerbStreaming::Meta,
78        streaming_note: "help stage; not part of data execution",
79    },
80    VerbInfo {
81        verb: "V",
82        summary: "Value-only quick search",
83        streaming: VerbStreaming::Conditional,
84        streaming_note: "flat rows stream with a two-row lookahead; grouped input still materializes",
85    },
86    VerbInfo {
87        verb: "K",
88        summary: "Key-only quick search",
89        streaming: VerbStreaming::Conditional,
90        streaming_note: "flat rows stream with a two-row lookahead; grouped input still materializes",
91    },
92    VerbInfo {
93        verb: "?",
94        summary: "Clean rows / exists filter",
95        streaming: VerbStreaming::Conditional,
96        streaming_note: "flat rows stream; grouped input still materializes",
97    },
98    VerbInfo {
99        verb: "U",
100        summary: "Unroll list field",
101        streaming: VerbStreaming::Streamable,
102        streaming_note: "row-by-row selector fanout",
103    },
104    VerbInfo {
105        verb: "JQ",
106        summary: "Run jq expression",
107        streaming: VerbStreaming::Materializes,
108        streaming_note: "jq receives the full current payload",
109    },
110    VerbInfo {
111        verb: "VAL",
112        summary: "Extract values",
113        streaming: VerbStreaming::Streamable,
114        streaming_note: "row-by-row value extraction",
115    },
116    VerbInfo {
117        verb: "VALUE",
118        summary: "Extract values",
119        streaming: VerbStreaming::Streamable,
120        streaming_note: "row-by-row value extraction",
121    },
122];
123
124pub fn registered_verbs() -> &'static [VerbInfo] {
125    VERBS
126}
127
128pub fn registered_explicit_verbs() -> Vec<&'static str> {
129    VERBS
130        .iter()
131        .filter(|info| !matches!(info.streaming, VerbStreaming::Meta))
132        .map(|info| info.verb)
133        .collect()
134}
135
136pub fn verb_info(verb: &str) -> Option<&'static VerbInfo> {
137    VERBS
138        .iter()
139        .find(|info| info.verb.eq_ignore_ascii_case(verb))
140}
141
142pub fn is_registered_explicit_verb(verb: &str) -> bool {
143    VERBS
144        .iter()
145        .filter(|info| !matches!(info.streaming, VerbStreaming::Meta))
146        .any(|info| info.verb.eq_ignore_ascii_case(verb))
147}
148
149pub fn render_streaming_badge(streaming: VerbStreaming) -> Option<&'static str> {
150    match streaming {
151        VerbStreaming::Streamable | VerbStreaming::Meta => None,
152        VerbStreaming::Conditional => Some("[conditional]"),
153        VerbStreaming::Materializes => Some("[materializes]"),
154    }
155}
156
157pub(crate) fn stage_can_stream_rows(stage: &ParsedStage) -> bool {
158    if matches!(stage.kind, ParsedStageKind::Quick) {
159        return true;
160    }
161    if !matches!(stage.kind, ParsedStageKind::Explicit) {
162        return false;
163    }
164
165    match stage.verb.as_str() {
166        "F" | "P" | "VAL" | "VALUE" | "Y" | "U" | "V" | "K" | "?" => true,
167        "L" => crate::dsl::stages::limit::parse_limit_spec(&stage.spec)
168            .map(|spec| spec.is_head_only())
169            .unwrap_or(false),
170        _ => false,
171    }
172}
173
#[cfg(test)]
mod tests {
    use crate::dsl::model::{ParsedStage, ParsedStageKind};
    use crate::dsl::verbs::{
        VerbStreaming, is_registered_explicit_verb, registered_explicit_verbs,
        render_streaming_badge, stage_can_stream_rows, verb_info,
    };

    /// Builds an explicit stage from its verb, spec and raw text.
    fn explicit(verb: &'static str, spec: &'static str, raw: &'static str) -> ParsedStage {
        ParsedStage::new(ParsedStageKind::Explicit, verb, spec, raw)
    }

    #[test]
    fn stage_streamability_matches_real_barriers_unit() {
        // Stages that are expected to stream.
        let filter = explicit("F", "uid=alice", "F uid=alice");
        assert!(stage_can_stream_rows(&filter));

        let head_limit = explicit("L", "10 0", "L 10 0");
        assert!(stage_can_stream_rows(&head_limit));

        // Stages that are expected to act as barriers.
        let tail_limit = explicit("L", "-2", "L -2");
        assert!(!stage_can_stream_rows(&tail_limit));

        let aggregate = explicit("A", "count", "A count");
        assert!(!stage_can_stream_rows(&aggregate));

        // Quick stages stream regardless of their verb.
        let quick = ParsedStage::new(ParsedStageKind::Quick, "UID", "", "uid");
        assert!(stage_can_stream_rows(&quick));
    }

    #[test]
    fn verb_metadata_exposes_streaming_annotations_unit() {
        let aggregate = verb_info("A").expect("aggregate verb should exist");
        assert_eq!(aggregate.streaming, VerbStreaming::Materializes);
        let aggregate_badge = render_streaming_badge(aggregate.streaming);
        assert_eq!(aggregate_badge, Some("[materializes]"));

        let filter = verb_info("F").expect("filter verb should exist");
        assert_eq!(filter.streaming, VerbStreaming::Streamable);
        let filter_badge = render_streaming_badge(filter.streaming);
        assert_eq!(filter_badge, None);
    }

    #[test]
    fn explicit_verb_registration_is_derived_from_metadata_unit() {
        let verbs = registered_explicit_verbs();
        for expected in ["F", "JQ"] {
            assert!(verbs.contains(&expected));
        }
        // The help verb is `Meta` and must not be listed as explicit.
        assert!(!verbs.contains(&"H"));

        // Lookup by name ignores ASCII case.
        assert!(is_registered_explicit_verb("val"));
        assert!(!is_registered_explicit_verb("h"));
    }
}