1#[cfg(test)]
2use crate::dsl::model::{ParsedStage, ParsedStageKind};
3
/// Classifies how a DSL verb interacts with row streaming.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerbStreaming {
    /// The verb handles rows one at a time and never needs the whole input.
    Streamable,
    /// The verb streams for some stage forms or input shapes and
    /// materializes for others.
    Conditional,
    /// The verb needs the full input (or full groups) before it can run.
    Materializes,
    /// The verb is not part of data execution (for example the help verb).
    Meta,
}
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub struct VerbInfo {
20 pub verb: &'static str,
22 pub summary: &'static str,
24 pub streaming: VerbStreaming,
26 pub streaming_note: &'static str,
28}
29
30const VERBS: &[VerbInfo] = &[
31 VerbInfo {
32 verb: "F",
33 summary: "Filter rows",
34 streaming: VerbStreaming::Streamable,
35 streaming_note: "row-by-row filter",
36 },
37 VerbInfo {
38 verb: "P",
39 summary: "Project columns",
40 streaming: VerbStreaming::Streamable,
41 streaming_note: "row-by-row projection/fanout",
42 },
43 VerbInfo {
44 verb: "S",
45 summary: "Sort rows",
46 streaming: VerbStreaming::Materializes,
47 streaming_note: "sorting needs the full input",
48 },
49 VerbInfo {
50 verb: "G",
51 summary: "Group rows",
52 streaming: VerbStreaming::Materializes,
53 streaming_note: "grouping needs the full input",
54 },
55 VerbInfo {
56 verb: "A",
57 summary: "Aggregate rows/groups",
58 streaming: VerbStreaming::Materializes,
59 streaming_note: "aggregation needs the full input or full groups",
60 },
61 VerbInfo {
62 verb: "L",
63 summary: "Limit rows",
64 streaming: VerbStreaming::Conditional,
65 streaming_note: "head limits stream; tail/negative forms materialize",
66 },
67 VerbInfo {
68 verb: "Z",
69 summary: "Collapse grouped output",
70 streaming: VerbStreaming::Materializes,
71 streaming_note: "collapse only runs after grouped output exists",
72 },
73 VerbInfo {
74 verb: "C",
75 summary: "Count rows",
76 streaming: VerbStreaming::Materializes,
77 streaming_note: "count needs the full input or full groups",
78 },
79 VerbInfo {
80 verb: "Y",
81 summary: "Mark output for copy",
82 streaming: VerbStreaming::Streamable,
83 streaming_note: "passthrough marker",
84 },
85 VerbInfo {
86 verb: "H",
87 summary: "Show DSL help",
88 streaming: VerbStreaming::Meta,
89 streaming_note: "help stage; not part of data execution",
90 },
91 VerbInfo {
92 verb: "V",
93 summary: "Value-only quick search",
94 streaming: VerbStreaming::Conditional,
95 streaming_note: "flat rows stream with a two-row lookahead; grouped input still materializes",
96 },
97 VerbInfo {
98 verb: "K",
99 summary: "Key-only quick search",
100 streaming: VerbStreaming::Conditional,
101 streaming_note: "flat rows stream with a two-row lookahead; grouped input still materializes",
102 },
103 VerbInfo {
104 verb: "?",
105 summary: "Clean rows / exists filter",
106 streaming: VerbStreaming::Conditional,
107 streaming_note: "flat rows stream; grouped input still materializes",
108 },
109 VerbInfo {
110 verb: "U",
111 summary: "Unroll list field",
112 streaming: VerbStreaming::Streamable,
113 streaming_note: "row-by-row selector fanout",
114 },
115 VerbInfo {
116 verb: "JQ",
117 summary: "Run jq-like expression",
118 streaming: VerbStreaming::Materializes,
119 streaming_note: "jq receives the full current payload",
120 },
121 VerbInfo {
122 verb: "VAL",
123 summary: "Extract values",
124 streaming: VerbStreaming::Streamable,
125 streaming_note: "row-by-row value extraction",
126 },
127 VerbInfo {
128 verb: "VALUE",
129 summary: "Extract values",
130 streaming: VerbStreaming::Streamable,
131 streaming_note: "row-by-row value extraction",
132 },
133];
134
135pub fn registered_verbs() -> &'static [VerbInfo] {
137 VERBS
138}
139
/// Lists the tokens of every registered verb that is not a meta verb,
/// keeping the registry order.
#[cfg(test)]
pub fn registered_explicit_verbs() -> Vec<&'static str> {
    VERBS
        .iter()
        .filter_map(|entry| (entry.streaming != VerbStreaming::Meta).then_some(entry.verb))
        .collect()
}
149
150pub fn verb_info(verb: &str) -> Option<&'static VerbInfo> {
152 VERBS
153 .iter()
154 .find(|info| info.verb.eq_ignore_ascii_case(verb))
155}
156
157pub fn is_registered_explicit_verb(verb: &str) -> bool {
159 VERBS
160 .iter()
161 .filter(|info| !matches!(info.streaming, VerbStreaming::Meta))
162 .any(|info| info.verb.eq_ignore_ascii_case(verb))
163}
164
165pub fn render_streaming_badge(streaming: VerbStreaming) -> Option<&'static str> {
167 match streaming {
168 VerbStreaming::Streamable | VerbStreaming::Meta => None,
169 VerbStreaming::Conditional => Some("[conditional]"),
170 VerbStreaming::Materializes => Some("[materializes]"),
171 }
172}
173
/// Reports whether a parsed stage can process rows as a stream.
///
/// * Quick stages always stream.
/// * Explicit stages stream when their verb is one of the row-wise verbs
///   listed below (matched case-sensitively); `L` streams only when its spec
///   parses to a head-only limit, and an unparseable spec counts as "no".
/// * Every other stage kind or verb does not stream.
#[cfg(test)]
pub(crate) fn stage_can_stream_rows(stage: &ParsedStage) -> bool {
    // Evaluated lazily so the limit spec is parsed only for explicit stages.
    let explicit_verb_streams = || match stage.verb.as_str() {
        "L" => crate::dsl::verbs::limit::parse_limit_spec(&stage.spec)
            .map(|limit| limit.is_head_only())
            .unwrap_or(false),
        "F" | "P" | "VAL" | "VALUE" | "Y" | "U" | "V" | "K" | "?" => true,
        _ => false,
    };

    matches!(stage.kind, ParsedStageKind::Quick)
        || (matches!(stage.kind, ParsedStageKind::Explicit) && explicit_verb_streams())
}
191
#[cfg(test)]
mod tests {
    use crate::dsl::{
        model::{ParsedStage, ParsedStageKind},
        verb_info::{
            VerbStreaming, is_registered_explicit_verb, registered_explicit_verbs,
            render_streaming_badge, stage_can_stream_rows, verb_info,
        },
    };

    /// Checks streamability for a representative stage of each flavour.
    #[test]
    fn stage_streamability_matches_real_barriers_unit() {
        // (kind, verb, spec, raw, expected streamability)
        let cases = [
            (ParsedStageKind::Explicit, "F", "uid=alice", "F uid=alice", true),
            (ParsedStageKind::Explicit, "L", "10 0", "L 10 0", true),
            (ParsedStageKind::Explicit, "L", "-2", "L -2", false),
            (ParsedStageKind::Explicit, "A", "count", "A count", false),
            (ParsedStageKind::Quick, "UID", "", "uid", true),
        ];

        for (kind, verb, spec, raw, expected) in cases {
            let stage = ParsedStage::new(kind, verb, spec, raw);
            assert_eq!(stage_can_stream_rows(&stage), expected, "stage `{raw}`");
        }
    }

    /// Checks that lookups expose the streaming class and its badge.
    #[test]
    fn verb_metadata_exposes_streaming_annotations_unit() {
        let aggregate_streaming = verb_info("A")
            .expect("aggregate verb should exist")
            .streaming;
        assert_eq!(aggregate_streaming, VerbStreaming::Materializes);
        assert_eq!(
            render_streaming_badge(aggregate_streaming),
            Some("[materializes]")
        );

        let filter_streaming = verb_info("F").expect("filter verb should exist").streaming;
        assert_eq!(filter_streaming, VerbStreaming::Streamable);
        assert_eq!(render_streaming_badge(filter_streaming), None);
    }

    /// Checks that the explicit verb list excludes meta verbs.
    #[test]
    fn explicit_verb_registration_is_derived_from_metadata_unit() {
        let explicit = registered_explicit_verbs();
        for expected in ["F", "JQ"] {
            assert!(explicit.contains(&expected));
        }
        assert!(!explicit.contains(&"H"));

        assert!(is_registered_explicit_verb("val"));
        assert!(!is_registered_explicit_verb("h"));
    }
}