1#[cfg(test)]
2use crate::dsl::model::{ParsedStage, ParsedStageKind};
3
/// Streaming classification attached to every registered DSL verb.
///
/// The classification decides which badge, if any, is rendered next to the
/// verb and whether the verb counts as an explicit (non-meta) verb.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerbStreaming {
    /// The verb works row by row (filters, projections, passthrough markers).
    Streamable,
    /// The verb streams for some forms or inputs and materializes for others.
    Conditional,
    /// The verb needs the full input (or full groups) before producing output.
    Materializes,
    /// The verb is not part of data execution (for example, the help stage).
    Meta,
}
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub struct VerbInfo {
20 pub verb: &'static str,
22 pub summary: &'static str,
24 pub streaming: VerbStreaming,
26 pub streaming_note: &'static str,
28}
29
30const VERBS: &[VerbInfo] = &[
31 VerbInfo {
32 verb: "F",
33 summary: "Filter rows",
34 streaming: VerbStreaming::Streamable,
35 streaming_note: "row-by-row filter",
36 },
37 VerbInfo {
38 verb: "P",
39 summary: "Project columns",
40 streaming: VerbStreaming::Streamable,
41 streaming_note: "row-by-row projection/fanout",
42 },
43 VerbInfo {
44 verb: "S",
45 summary: "Sort rows",
46 streaming: VerbStreaming::Materializes,
47 streaming_note: "sorting needs the full input",
48 },
49 VerbInfo {
50 verb: "G",
51 summary: "Group rows",
52 streaming: VerbStreaming::Materializes,
53 streaming_note: "grouping needs the full input",
54 },
55 VerbInfo {
56 verb: "A",
57 summary: "Aggregate rows/groups",
58 streaming: VerbStreaming::Materializes,
59 streaming_note: "aggregation needs the full input or full groups",
60 },
61 VerbInfo {
62 verb: "L",
63 summary: "Limit rows",
64 streaming: VerbStreaming::Conditional,
65 streaming_note: "head limits stream; tail/negative forms materialize",
66 },
67 VerbInfo {
68 verb: "Z",
69 summary: "Collapse grouped output",
70 streaming: VerbStreaming::Materializes,
71 streaming_note: "collapse only runs after grouped output exists",
72 },
73 VerbInfo {
74 verb: "C",
75 summary: "Count rows",
76 streaming: VerbStreaming::Materializes,
77 streaming_note: "count needs the full input or full groups",
78 },
79 VerbInfo {
80 verb: "Y",
81 summary: "Mark output for copy",
82 streaming: VerbStreaming::Streamable,
83 streaming_note: "passthrough marker",
84 },
85 VerbInfo {
86 verb: "H",
87 summary: "Show DSL help",
88 streaming: VerbStreaming::Meta,
89 streaming_note: "help stage; not part of data execution",
90 },
91 VerbInfo {
92 verb: "V",
93 summary: "Value-only quick search",
94 streaming: VerbStreaming::Conditional,
95 streaming_note: "flat rows stream with a two-row lookahead; grouped input still materializes",
96 },
97 VerbInfo {
98 verb: "K",
99 summary: "Key-only quick search",
100 streaming: VerbStreaming::Conditional,
101 streaming_note: "flat rows stream with a two-row lookahead; grouped input still materializes",
102 },
103 VerbInfo {
104 verb: "?",
105 summary: "Clean rows / exists filter",
106 streaming: VerbStreaming::Conditional,
107 streaming_note: "flat rows stream; grouped input still materializes",
108 },
109 VerbInfo {
110 verb: "U",
111 summary: "Unroll list field",
112 streaming: VerbStreaming::Streamable,
113 streaming_note: "row-by-row selector fanout",
114 },
115 VerbInfo {
116 verb: "JQ",
117 summary: "Run jq-like expression",
118 streaming: VerbStreaming::Materializes,
119 streaming_note: "jq receives the full current payload",
120 },
121 VerbInfo {
122 verb: "VAL",
123 summary: "Extract values",
124 streaming: VerbStreaming::Streamable,
125 streaming_note: "row-by-row value extraction",
126 },
127 VerbInfo {
128 verb: "VALUE",
129 summary: "Extract values",
130 streaming: VerbStreaming::Streamable,
131 streaming_note: "row-by-row value extraction",
132 },
133];
134
135pub fn registered_verbs() -> &'static [VerbInfo] {
137 VERBS
138}
139
/// Collects the verb tokens of every registry entry that is not classified as
/// [`VerbStreaming::Meta`], preserving registry order.
///
/// Only compiled for tests.
#[cfg(test)]
pub fn registered_explicit_verbs() -> Vec<&'static str> {
    VERBS
        .iter()
        .filter_map(|entry| (entry.streaming != VerbStreaming::Meta).then_some(entry.verb))
        .collect()
}
149
150pub fn verb_info(verb: &str) -> Option<&'static VerbInfo> {
152 VERBS
153 .iter()
154 .find(|info| info.verb.eq_ignore_ascii_case(verb))
155}
156
157pub fn is_registered_explicit_verb(verb: &str) -> bool {
159 VERBS
160 .iter()
161 .filter(|info| !matches!(info.streaming, VerbStreaming::Meta))
162 .any(|info| info.verb.eq_ignore_ascii_case(verb))
163}
164
165pub fn render_streaming_badge(streaming: VerbStreaming) -> Option<&'static str> {
167 match streaming {
168 VerbStreaming::Streamable | VerbStreaming::Meta => None,
169 VerbStreaming::Conditional => Some("[conditional]"),
170 VerbStreaming::Materializes => Some("[materializes]"),
171 }
172}
173
/// Reports whether a parsed stage can stream rows.
///
/// Only quick and explicit stages are considered; every other stage kind is
/// reported as non-streaming. For those two kinds the stage is compiled and
/// the compiled stage's `behavior().can_stream` flag is returned. If
/// `CompiledStage::from_parsed` does not yield a compiled stage, the result is
/// `false`.
///
/// Only compiled for tests.
#[cfg(test)]
pub(crate) fn stage_can_stream_rows(stage: &ParsedStage) -> bool {
    match stage.kind {
        ParsedStageKind::Quick | ParsedStageKind::Explicit => {
            crate::dsl::compiled::CompiledStage::from_parsed(stage)
                .map(|compiled| compiled.behavior().can_stream)
                .unwrap_or(false)
        }
        _ => false,
    }
}
187
#[cfg(test)]
mod tests {
    use crate::dsl::model::{ParsedStage, ParsedStageKind};
    use crate::dsl::verb_info::{
        VerbStreaming, is_registered_explicit_verb, registered_explicit_verbs,
        render_streaming_badge, stage_can_stream_rows, verb_info,
    };

    /// Checks streamability for a handful of representative stages.
    #[test]
    fn stage_streamability_matches_real_barriers_unit() {
        // Each row: stage kind, the three string arguments handed to
        // `ParsedStage::new` in order, and the expected streamability.
        let cases = [
            (ParsedStageKind::Explicit, "F", "uid=alice", "F uid=alice", true),
            (ParsedStageKind::Explicit, "L", "10 0", "L 10 0", true),
            (ParsedStageKind::Explicit, "L", "-2", "L -2", false),
            (ParsedStageKind::Explicit, "A", "count", "A count", false),
            (ParsedStageKind::Quick, "UID", "", "uid", true),
        ];

        for (kind, verb, spec, raw, expected) in cases {
            let stage = ParsedStage::new(kind, verb, spec, raw);
            assert_eq!(stage_can_stream_rows(&stage), expected, "stage `{raw}`");
        }
    }

    /// Checks that registry lookups expose the classification and its badge.
    #[test]
    fn verb_metadata_exposes_streaming_annotations_unit() {
        let aggregate_info = verb_info("A").expect("aggregate verb should exist");
        assert_eq!(aggregate_info.streaming, VerbStreaming::Materializes);
        assert_eq!(
            render_streaming_badge(aggregate_info.streaming),
            Some("[materializes]")
        );

        let filter_info = verb_info("F").expect("filter verb should exist");
        assert_eq!(filter_info.streaming, VerbStreaming::Streamable);
        assert_eq!(render_streaming_badge(filter_info.streaming), None);
    }

    /// Checks that explicit-verb registration excludes meta verbs and ignores
    /// ASCII case.
    #[test]
    fn explicit_verb_registration_is_derived_from_metadata_unit() {
        let explicit = registered_explicit_verbs();
        assert!(explicit.contains(&"F"));
        assert!(explicit.contains(&"JQ"));
        assert!(!explicit.contains(&"H"));

        assert!(is_registered_explicit_verb("val"));
        assert!(!is_registered_explicit_verb("h"));
    }
}