1use crate::core::{output_model::Group, row::Row};
2use anyhow::{Result, anyhow};
3use serde_json::{Map, Value};
4
5use crate::dsl::{
6 eval::{
7 flatten::{coalesce_flat_row, flatten_row},
8 matchers::match_row_keys,
9 resolve::evaluate_path,
10 },
11 parse::{
12 key_spec::KeySpec,
13 path::{PathExpression, Selector, parse_path},
14 },
15 stages::common::parse_stage_words,
16};
17
18#[derive(Debug, Clone)]
19struct Pattern {
20 key_spec: KeySpec,
21 path: Option<PathExpression>,
22 dotted: bool,
23}
24
25pub(crate) struct ProjectPlan {
26 keepers: Vec<Pattern>,
27 droppers: Vec<Pattern>,
28}
29
30impl ProjectPlan {
31 pub(crate) fn project_row(&self, row: &Row) -> Vec<Row> {
32 project_single_row(row, &self.keepers, &self.droppers)
33 }
34}
35
36pub(crate) fn compile(spec: &str) -> Result<ProjectPlan> {
37 let (keepers, droppers) = parse_patterns(spec)?;
38 if keepers.is_empty() && droppers.is_empty() {
39 return Err(anyhow!("P requires one or more keys"));
40 }
41
42 Ok(ProjectPlan { keepers, droppers })
43}
44
45pub fn apply(rows: Vec<Row>, spec: &str) -> Result<Vec<Row>> {
46 let plan = compile(spec)?;
47
48 let mut out = Vec::new();
49 for row in rows {
50 out.extend(plan.project_row(&row));
51 }
52 Ok(out)
53}
54
55pub fn apply_groups(groups: Vec<Group>, spec: &str) -> Result<Vec<Group>> {
56 let plan = compile(spec)?;
57
58 let mut out = Vec::with_capacity(groups.len());
59 for group in groups {
60 let mut projected_rows = Vec::new();
61 for row in &group.rows {
62 projected_rows.extend(plan.project_row(row));
63 }
64
65 if !projected_rows.is_empty() || !group.aggregates.is_empty() {
66 out.push(Group {
67 groups: group.groups,
68 aggregates: group.aggregates,
69 rows: projected_rows,
70 });
71 }
72 }
73 Ok(out)
74}
75
76fn parse_patterns(spec: &str) -> Result<(Vec<Pattern>, Vec<Pattern>)> {
77 let trimmed = spec.trim();
78 if trimmed.is_empty() {
79 return Ok((Vec::new(), Vec::new()));
80 }
81
82 let mut keepers = Vec::new();
83 let mut droppers = Vec::new();
84 for token in parse_stage_words(trimmed)? {
85 for chunk in token.split(',') {
86 let text = chunk.trim();
87 if text.is_empty() {
88 continue;
89 }
90
91 let drop = text.starts_with('!');
92 let key_spec = KeySpec::parse(text);
93 let path = parse_path(&key_spec.token).ok();
94 let dotted = key_spec.token.contains('.')
95 || key_spec.token.contains('[')
96 || key_spec.token.contains(']')
97 || path
98 .as_ref()
99 .is_some_and(|expr| expr.absolute || has_selectors(expr));
100
101 let pattern = Pattern {
102 key_spec,
103 path,
104 dotted,
105 };
106
107 if drop {
108 droppers.push(pattern);
109 } else {
110 keepers.push(pattern);
111 }
112 }
113 }
114
115 Ok((keepers, droppers))
116}
117
118fn project_single_row(row: &Row, keepers: &[Pattern], droppers: &[Pattern]) -> Vec<Row> {
119 let flattened = flatten_row(row);
120 let nested = Value::Object(row.clone());
121
122 let mut static_flat = if keepers.is_empty() {
123 flattened.clone()
124 } else {
125 Map::new()
126 };
127 let mut dynamic_columns: Vec<(String, Vec<Value>)> = Vec::new();
128
129 for pattern in keepers {
130 if pattern.dotted && collect_dynamic_column(&nested, &mut dynamic_columns, pattern) {
131 continue;
132 }
133
134 for key in matched_flat_keys(&flattened, pattern) {
135 if let Some(value) = flattened.get(&key) {
136 static_flat.insert(key, value.clone());
137 }
138 }
139 }
140
141 for pattern in droppers {
142 dynamic_columns.retain(|(dynamic_label, _)| !dynamic_label_matches(pattern, dynamic_label));
143
144 for key in matched_flat_keys(&flattened, pattern) {
145 static_flat.remove(&key);
146 }
147 }
148
149 let mut rows = build_rows_from_dynamic(static_flat, dynamic_columns);
150 if rows.is_empty() && keepers.is_empty() {
151 rows.push(coalesce_flat_row(&Map::new()));
152 }
153 rows
154}
155
156fn build_rows_from_dynamic(
157 static_flat: Row,
158 dynamic_columns: Vec<(String, Vec<Value>)>,
159) -> Vec<Row> {
160 if dynamic_columns.is_empty() {
161 if static_flat.is_empty() {
162 return Vec::new();
163 }
164 return vec![coalesce_flat_row(&static_flat)];
165 }
166
167 let row_count = dynamic_columns
168 .iter()
169 .map(|(_, values)| values.len())
170 .max()
171 .unwrap_or(0);
172 if row_count == 0 {
173 return if static_flat.is_empty() {
174 Vec::new()
175 } else {
176 vec![coalesce_flat_row(&static_flat)]
177 };
178 }
179
180 let mut rows = Vec::new();
181 for index in 0..row_count {
182 let mut flat = static_flat.clone();
183 for (label, values) in &dynamic_columns {
184 if let Some(value) = values.get(index) {
185 match value {
186 Value::Object(map) => {
187 for (key, nested_value) in map {
188 flat.insert(key.clone(), nested_value.clone());
189 }
190 }
191 scalar => {
192 flat.insert(label.clone(), scalar.clone());
193 }
194 }
195 } else {
196 flat.insert(label.clone(), Value::Null);
197 }
198 }
199
200 let projected = coalesce_flat_row(&flat);
201 if !projected.is_empty() {
202 rows.push(projected);
203 }
204 }
205
206 rows
207}
208
209fn collect_dynamic_column(
210 nested_row: &Value,
211 dynamic_columns: &mut Vec<(String, Vec<Value>)>,
212 pattern: &Pattern,
213) -> bool {
214 let Some(path) = &pattern.path else {
215 return false;
216 };
217
218 if !has_selectors(path) {
219 return false;
220 }
221
222 let values = evaluate_path(nested_row, path);
223 if values.is_empty() {
224 return false;
225 }
226
227 let label = pattern_label(pattern);
228 dynamic_columns.push((label, values));
229 true
230}
231
232fn matched_flat_keys(flat_row: &Row, pattern: &Pattern) -> Vec<String> {
233 if let Some(path) = &pattern.path
234 && path.absolute
235 && !has_selectors(path)
236 {
237 let exact = flatten_path_without_absolute(path);
238 if exact.is_empty() {
239 return Vec::new();
240 }
241 return flat_row
242 .keys()
243 .filter(|key| *key == &exact)
244 .cloned()
245 .collect();
246 }
247
248 match_row_keys(flat_row, &pattern.key_spec.token, pattern.key_spec.exact)
249 .into_iter()
250 .map(ToOwned::to_owned)
251 .collect()
252}
253
254fn flatten_path_without_absolute(path: &PathExpression) -> String {
255 let mut out = String::new();
256 for (segment_index, segment) in path.segments.iter().enumerate() {
257 if segment_index > 0 {
258 out.push('.');
259 }
260 if let Some(name) = &segment.name {
261 out.push_str(name);
262 } else {
263 return String::new();
264 }
265 for selector in &segment.selectors {
266 match selector {
267 Selector::Index(index) if *index >= 0 => {
268 out.push('[');
269 out.push_str(&index.to_string());
270 out.push(']');
271 }
272 _ => return String::new(),
273 }
274 }
275 }
276 out
277}
278
279fn has_selectors(path: &PathExpression) -> bool {
280 path.segments
281 .iter()
282 .any(|segment| !segment.selectors.is_empty())
283}
284
285fn pattern_label(pattern: &Pattern) -> String {
286 if let Some(path) = &pattern.path
287 && let Some(segment) = path.segments.last()
288 && let Some(name) = &segment.name
289 {
290 return name.clone();
291 }
292
293 let token = pattern.key_spec.token.as_str();
294 let last = token.rsplit('.').next().unwrap_or(token);
295 let head = last.split('[').next().unwrap_or(last);
296 if head.is_empty() {
297 "value".to_string()
298 } else {
299 head.to_string()
300 }
301}
302
303fn dynamic_label_matches(pattern: &Pattern, label: &str) -> bool {
304 if pattern_label(pattern) == label {
305 return true;
306 }
307
308 let mut row = Row::new();
309 row.insert(label.to_string(), Value::Null);
310 !match_row_keys(&row, &pattern.key_spec.token, pattern.key_spec.exact).is_empty()
311}
312
313#[cfg(test)]
314mod tests {
315 use crate::core::output_model::Group;
316 use serde_json::json;
317
318 use super::{apply, apply_groups};
319
320 #[test]
321 fn keeps_requested_columns() {
322 let rows = vec![
323 json!({"uid": "oistes", "cn": "Oistein", "mail": "o@uio.no"})
324 .as_object()
325 .cloned()
326 .expect("object"),
327 ];
328
329 let projected = apply(rows, "uid cn").expect("project should work");
330 assert!(projected[0].contains_key("uid"));
331 assert!(projected[0].contains_key("cn"));
332 assert!(!projected[0].contains_key("mail"));
333 }
334
335 #[test]
336 fn drops_column_with_prefix() {
337 let rows = vec![
338 json!({"uid": "oistes", "status": "active"})
339 .as_object()
340 .cloned()
341 .expect("object"),
342 ];
343
344 let projected = apply(rows, "!status").expect("project should work");
345 assert!(projected[0].contains_key("uid"));
346 assert!(!projected[0].contains_key("status"));
347 }
348
349 #[test]
350 fn supports_selector_fanout() {
351 let rows = vec![
352 json!({
353 "interfaces": [
354 {"mac": "aa:bb"},
355 {"mac": "cc:dd"}
356 ]
357 })
358 .as_object()
359 .cloned()
360 .expect("object"),
361 ];
362
363 let projected = apply(rows, "interfaces[].mac").expect("project should work");
364 assert_eq!(projected.len(), 2);
365 assert_eq!(projected[0].get("mac"), Some(&json!("aa:bb")));
366 assert_eq!(projected[1].get("mac"), Some(&json!("cc:dd")));
367 }
368
369 #[test]
370 fn keeps_all_exact_nested_matches() {
371 let rows = vec![
372 json!({
373 "id": 55753,
374 "txts": {"id": 27994},
375 "ipaddresses": [{"id": 57171}, {"id": 57172}],
376 "metadata": {"asset": {"id": 42}}
377 })
378 .as_object()
379 .cloned()
380 .expect("object"),
381 ];
382
383 let projected = apply(rows, "id").expect("project should work");
384 assert_eq!(
385 projected,
386 vec![
387 json!({
388 "id": 55753,
389 "txts": {"id": 27994},
390 "ipaddresses": [{"id": 57171}, {"id": 57172}],
391 "metadata": {"asset": {"id": 42}}
392 })
393 .as_object()
394 .cloned()
395 .expect("object")
396 ]
397 );
398 }
399
400 #[test]
401 fn absolute_path_projection_keeps_only_exact_nested_key() {
402 let rows = vec![
403 json!({"id": 1, "nested": {"id": 2}, "other": {"id": 3}})
404 .as_object()
405 .cloned()
406 .expect("object"),
407 ];
408
409 let projected = apply(rows, ".nested.id").expect("project should work");
410 assert_eq!(
411 projected,
412 vec![
413 json!({"nested": {"id": 2}})
414 .as_object()
415 .cloned()
416 .expect("object")
417 ]
418 );
419 }
420
421 #[test]
422 fn apply_groups_keeps_aggregate_only_groups_even_when_rows_drop_out() {
423 let groups = vec![Group {
424 groups: json!({"dept": "eng"}).as_object().cloned().expect("object"),
425 aggregates: json!({"count": 2}).as_object().cloned().expect("object"),
426 rows: vec![
427 json!({"uid": "alice"})
428 .as_object()
429 .cloned()
430 .expect("object"),
431 json!({"uid": "bob"}).as_object().cloned().expect("object"),
432 ],
433 }];
434
435 let projected = apply_groups(groups, "missing").expect("group project should work");
436 assert_eq!(projected.len(), 1);
437 assert!(projected[0].rows.is_empty());
438 assert_eq!(projected[0].aggregates.get("count"), Some(&json!(2)));
439 }
440
441 #[test]
442 fn empty_project_spec_is_rejected() {
443 let err = apply(
444 vec![
445 json!({"uid": "alice"})
446 .as_object()
447 .cloned()
448 .expect("object"),
449 ],
450 " ",
451 )
452 .expect_err("empty spec should fail");
453
454 assert!(err.to_string().contains("requires one or more keys"));
455 }
456
457 #[test]
458 fn dropping_dynamic_projection_label_removes_fanout_column() {
459 let rows = vec![
460 json!({
461 "uid": "alice",
462 "interfaces": [{"mac": "aa:bb"}, {"mac": "cc:dd"}]
463 })
464 .as_object()
465 .cloned()
466 .expect("object"),
467 ];
468
469 let projected = apply(rows, "uid interfaces[].mac !mac").expect("project should work");
470 assert_eq!(
471 projected,
472 vec![
473 json!({"uid": "alice"})
474 .as_object()
475 .cloned()
476 .expect("object")
477 ]
478 );
479 }
480}