1use crate::dsl::model::{ParsedStage, ParsedStageKind};
2
/// Streaming classification recorded for each verb in the registry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerbStreaming {
    /// Registered as working row by row (for example the `F` filter entry).
    Streamable,
    /// Streams only in some situations; the entry's `streaming_note` spells out which.
    Conditional,
    /// Registered as needing the full input (or full groups) before it can run, e.g. `S`.
    Materializes,
    /// Not part of data execution; used by the `H` help entry.
    Meta,
}
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub struct VerbInfo {
13 pub verb: &'static str,
14 pub summary: &'static str,
15 pub streaming: VerbStreaming,
16 pub streaming_note: &'static str,
17}
18
/// Registry of the DSL verbs known to this module, with help text and streaming metadata.
///
/// This table backs [`registered_verbs`], [`registered_explicit_verbs`], [`verb_info`] and
/// [`is_registered_explicit_verb`]. Entries classified as [`VerbStreaming::Meta`] are left
/// out of the "explicit" views.
const VERBS: &[VerbInfo] = &[
    VerbInfo {
        verb: "F",
        summary: "Filter rows",
        streaming: VerbStreaming::Streamable,
        streaming_note: "row-by-row filter",
    },
    VerbInfo {
        verb: "P",
        summary: "Project columns",
        streaming: VerbStreaming::Streamable,
        streaming_note: "row-by-row projection/fanout",
    },
    VerbInfo {
        verb: "S",
        summary: "Sort rows",
        streaming: VerbStreaming::Materializes,
        streaming_note: "sorting needs the full input",
    },
    VerbInfo {
        verb: "G",
        summary: "Group rows",
        streaming: VerbStreaming::Materializes,
        streaming_note: "grouping needs the full input",
    },
    VerbInfo {
        verb: "A",
        summary: "Aggregate rows/groups",
        streaming: VerbStreaming::Materializes,
        streaming_note: "aggregation needs the full input or full groups",
    },
    VerbInfo {
        verb: "L",
        summary: "Limit rows",
        streaming: VerbStreaming::Conditional,
        streaming_note: "head limits stream; tail/negative forms materialize",
    },
    VerbInfo {
        verb: "Z",
        summary: "Collapse grouped output",
        streaming: VerbStreaming::Materializes,
        streaming_note: "collapse only runs after grouped output exists",
    },
    VerbInfo {
        verb: "C",
        summary: "Count rows",
        streaming: VerbStreaming::Materializes,
        streaming_note: "count needs the full input or full groups",
    },
    VerbInfo {
        verb: "Y",
        summary: "Mark output for copy",
        streaming: VerbStreaming::Streamable,
        streaming_note: "passthrough marker",
    },
    // The only `Meta` entry: it is skipped by the explicit-verb helpers in this module.
    VerbInfo {
        verb: "H",
        summary: "Show DSL help",
        streaming: VerbStreaming::Meta,
        streaming_note: "help stage; not part of data execution",
    },
    VerbInfo {
        verb: "V",
        summary: "Value-only quick search",
        streaming: VerbStreaming::Conditional,
        streaming_note: "flat rows stream with a two-row lookahead; grouped input still materializes",
    },
    VerbInfo {
        verb: "K",
        summary: "Key-only quick search",
        streaming: VerbStreaming::Conditional,
        streaming_note: "flat rows stream with a two-row lookahead; grouped input still materializes",
    },
    VerbInfo {
        verb: "?",
        summary: "Clean rows / exists filter",
        streaming: VerbStreaming::Conditional,
        streaming_note: "flat rows stream; grouped input still materializes",
    },
    VerbInfo {
        verb: "U",
        summary: "Unroll list field",
        streaming: VerbStreaming::Streamable,
        streaming_note: "row-by-row selector fanout",
    },
    VerbInfo {
        verb: "JQ",
        summary: "Run jq expression",
        streaming: VerbStreaming::Materializes,
        streaming_note: "jq receives the full current payload",
    },
    // `VAL` and `VALUE` are registered as two separate entries with identical metadata.
    VerbInfo {
        verb: "VAL",
        summary: "Extract values",
        streaming: VerbStreaming::Streamable,
        streaming_note: "row-by-row value extraction",
    },
    VerbInfo {
        verb: "VALUE",
        summary: "Extract values",
        streaming: VerbStreaming::Streamable,
        streaming_note: "row-by-row value extraction",
    },
];
123
/// Returns every entry of the verb registry, including the `Meta` help entry, in table order.
pub fn registered_verbs() -> &'static [VerbInfo] {
    VERBS
}
127
128pub fn registered_explicit_verbs() -> Vec<&'static str> {
129 VERBS
130 .iter()
131 .filter(|info| !matches!(info.streaming, VerbStreaming::Meta))
132 .map(|info| info.verb)
133 .collect()
134}
135
136pub fn verb_info(verb: &str) -> Option<&'static VerbInfo> {
137 VERBS
138 .iter()
139 .find(|info| info.verb.eq_ignore_ascii_case(verb))
140}
141
142pub fn is_registered_explicit_verb(verb: &str) -> bool {
143 VERBS
144 .iter()
145 .filter(|info| !matches!(info.streaming, VerbStreaming::Meta))
146 .any(|info| info.verb.eq_ignore_ascii_case(verb))
147}
148
149pub fn render_streaming_badge(streaming: VerbStreaming) -> Option<&'static str> {
150 match streaming {
151 VerbStreaming::Streamable | VerbStreaming::Meta => None,
152 VerbStreaming::Conditional => Some("[conditional]"),
153 VerbStreaming::Materializes => Some("[materializes]"),
154 }
155}
156
157pub(crate) fn stage_can_stream_rows(stage: &ParsedStage) -> bool {
158 if matches!(stage.kind, ParsedStageKind::Quick) {
159 return true;
160 }
161 if !matches!(stage.kind, ParsedStageKind::Explicit) {
162 return false;
163 }
164
165 match stage.verb.as_str() {
166 "F" | "P" | "VAL" | "VALUE" | "Y" | "U" | "V" | "K" | "?" => true,
167 "L" => crate::dsl::stages::limit::parse_limit_spec(&stage.spec)
168 .map(|spec| spec.is_head_only())
169 .unwrap_or(false),
170 _ => false,
171 }
172}
173
#[cfg(test)]
mod tests {
    use crate::dsl::{
        model::{ParsedStage, ParsedStageKind},
        verbs::{
            VerbStreaming, is_registered_explicit_verb, registered_explicit_verbs,
            render_streaming_badge, stage_can_stream_rows, verb_info,
        },
    };

    /// Each case is `(kind, verb, spec, raw, expected streamability)`.
    #[test]
    fn stage_streamability_matches_real_barriers_unit() {
        let cases = [
            (ParsedStageKind::Explicit, "F", "uid=alice", "F uid=alice", true),
            (ParsedStageKind::Explicit, "L", "10 0", "L 10 0", true),
            (ParsedStageKind::Explicit, "L", "-2", "L -2", false),
            (ParsedStageKind::Explicit, "A", "count", "A count", false),
            (ParsedStageKind::Quick, "UID", "", "uid", true),
        ];

        for (kind, verb, spec, raw, expected) in cases {
            let stage = ParsedStage::new(kind, verb, spec, raw);
            assert_eq!(
                stage_can_stream_rows(&stage),
                expected,
                "unexpected streamability for stage `{raw}`"
            );
        }
    }

    /// Each case is `(verb, message if missing, expected classification, expected badge)`.
    #[test]
    fn verb_metadata_exposes_streaming_annotations_unit() {
        let cases = [
            (
                "A",
                "aggregate verb should exist",
                VerbStreaming::Materializes,
                Some("[materializes]"),
            ),
            (
                "F",
                "filter verb should exist",
                VerbStreaming::Streamable,
                None,
            ),
        ];

        for (verb, missing, streaming, badge) in cases {
            let info = verb_info(verb).expect(missing);
            assert_eq!(info.streaming, streaming);
            assert_eq!(render_streaming_badge(info.streaming), badge);
        }
    }

    #[test]
    fn explicit_verb_registration_is_derived_from_metadata_unit() {
        let explicit = registered_explicit_verbs();
        for present in ["F", "JQ"] {
            assert!(explicit.contains(&present));
        }
        // The help verb is metadata-only and must not be listed as explicit.
        assert!(!explicit.contains(&"H"));

        assert!(is_registered_explicit_verb("val"));
        assert!(!is_registered_explicit_verb("h"));
    }
}