Skip to main content

osp_cli/dsl/stages/
quick.rs

1use std::collections::HashSet;
2
3use crate::core::row::Row;
4use anyhow::{Result, anyhow};
5use serde_json::Value;
6
7use crate::dsl::{
8    eval::{
9        flatten::{coalesce_flat_row, flatten_row},
10        matchers::{KeyMatches, match_row_keys_detailed, render_value},
11        resolve::{resolve_pairs, resolve_values_truthy},
12    },
13    parse::{
14        key_spec::ExactMode,
15        path::parse_path,
16        quick::{QuickScope, parse_quick_spec},
17    },
18};
19
/// Whether the stage is processing a single-row payload or a multi-row set.
///
/// Callers pick `Multi` when more than one row is present and `Single`
/// otherwise; the mode then decides whether a row is merely filtered or also
/// reshaped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MatchMode {
    /// Zero or one input row: the stage behaves like projection/reshaping.
    Single,
    /// More than one input row: the stage behaves like a row filter.
    Multi,
}
25
/// Outcome of matching one flattened row against a quick spec.
#[derive(Debug, Clone)]
struct MatchResult {
    /// Final verdict for the row, with the spec's negation already applied.
    matched: bool,
    /// Flattened keys that matched the token (exact matches are preferred
    /// over partial ones). May be cleared when the row is treated as a
    /// projection over synthetic entries.
    key_hits: Vec<String>,
    /// Keys of resolved pairs whose value matched the token, without
    /// duplicates and in first-seen order.
    value_hits: Vec<String>,
    /// Whether the row should be reshaped (projected) rather than passed
    /// through unchanged.
    is_projection: bool,
    /// Resolved pairs whose key does not exist in the flattened row.
    synthetic: Row,
}
34
/// A quick stage that has been parsed and validated, ready to be applied to
/// rows one at a time.
#[derive(Debug, Clone)]
pub(crate) struct QuickPlan {
    /// The parsed quick spec this plan applies.
    spec: crate::dsl::parse::quick::QuickSpec,
}
39
40impl QuickPlan {
41    fn apply_row(&self, row: Row, mode: MatchMode) -> Vec<Row> {
42        apply_row_with_mode(row, &self.spec, mode)
43    }
44}
45
46pub(crate) fn compile(raw_stage: &str) -> Result<QuickPlan> {
47    let spec = parse_quick_spec(raw_stage);
48    if spec.key_spec.token.trim().is_empty() {
49        return Err(anyhow!("quick stage requires a search token"));
50    }
51    Ok(QuickPlan { spec })
52}
53
54pub fn apply(rows: Vec<Row>, raw_stage: &str) -> Result<Vec<Row>> {
55    let plan = compile(raw_stage)?;
56
57    // Quick mode is intentionally dual-purpose:
58    // - multi-row input acts like a row filter
59    // - single-row input acts more like projection/reshaping
60    let mode = if rows.len() > 1 {
61        MatchMode::Multi
62    } else {
63        MatchMode::Single
64    };
65
66    let mut out = Vec::new();
67    for row in rows {
68        out.extend(plan.apply_row(row, mode));
69    }
70
71    Ok(out)
72}
73
74pub(crate) fn stream_rows_with_plan<I>(
75    rows: I,
76    plan: QuickPlan,
77) -> impl Iterator<Item = Result<Row>>
78where
79    I: IntoIterator<Item = Result<Row>>,
80{
81    let mut iter = rows.into_iter();
82    let first = iter.next();
83    let second = iter.next();
84
85    // Quick semantics depend on whether the current payload is a single row or
86    // a multi-row set. A two-row lookahead preserves that "magic" while still
87    // allowing the common multi-row path to continue as a stream.
88    let mode = if second.is_some() {
89        MatchMode::Multi
90    } else {
91        MatchMode::Single
92    };
93
94    let mut seed = Vec::new();
95    if let Some(row) = first {
96        match row {
97            Ok(row) => seed.extend(plan.apply_row(row, mode).into_iter().map(Ok)),
98            Err(err) => seed.push(Err(err)),
99        }
100    }
101    if let Some(row) = second {
102        match row {
103            Ok(row) => seed.extend(plan.apply_row(row, mode).into_iter().map(Ok)),
104            Err(err) => seed.push(Err(err)),
105        }
106    }
107
108    seed.into_iter().chain(iter.flat_map(move |row| {
109        match row {
110            Ok(row) => plan
111                .apply_row(row, mode)
112                .into_iter()
113                .map(Ok)
114                .collect::<Vec<_>>()
115                .into_iter(),
116            Err(err) => vec![Err(err)].into_iter(),
117        }
118    }))
119}
120
121fn apply_row_with_mode(
122    row: Row,
123    spec: &crate::dsl::parse::quick::QuickSpec,
124    mode: MatchMode,
125) -> Vec<Row> {
126    if spec.key_spec.existence {
127        let found = resolve_values_truthy(&row, &spec.key_spec.token, spec.key_spec.exact);
128        let matched = if spec.key_spec.negated { !found } else { found };
129        return if matched { vec![row] } else { Vec::new() };
130    }
131
132    let flat = flatten_row(&row);
133    let (pairs, _) = resolve_pairs(&flat, &spec.key_spec.token);
134    let synthetic = build_synthetic_map(&pairs, &flat);
135    let mut result = match_row(&flat, &pairs, synthetic, spec);
136
137    let keep = match spec.scope {
138        QuickScope::KeyOnly => {
139            if matches!(mode, MatchMode::Multi) {
140                result.matched
141            } else {
142                spec.key_spec.negated || result.matched
143            }
144        }
145        QuickScope::ValueOnly | QuickScope::KeyOrValue => {
146            if matches!(mode, MatchMode::Multi) {
147                result.matched
148            } else {
149                result.matched || spec.key_spec.negated
150            }
151        }
152    };
153
154    if !keep {
155        return Vec::new();
156    }
157
158    if matches!(mode, MatchMode::Multi) && !result.is_projection {
159        return vec![row];
160    }
161
162    transform_row(&flat, &mut result, spec).unwrap_or_default()
163}
164
165fn match_row(
166    flat: &Row,
167    pairs: &[(String, Value)],
168    synthetic: Row,
169    spec: &crate::dsl::parse::quick::QuickSpec,
170) -> MatchResult {
171    let matches = match_row_keys_detailed(flat, &spec.key_spec.token, spec.key_spec.exact);
172    let mut key_hits = prefer_exact_keys(&matches, spec.key_spec.exact);
173    let mut value_hits = Vec::new();
174    let mut seen_values = HashSet::new();
175
176    for (key, value) in pairs {
177        let matched = match value {
178            Value::Array(items) => items
179                .iter()
180                .any(|item| value_matches_token(item, &spec.key_spec.token, spec.key_spec.exact)),
181            scalar => value_matches_token(scalar, &spec.key_spec.token, spec.key_spec.exact),
182        };
183        if matched && seen_values.insert(key.as_str()) {
184            value_hits.push(key.clone());
185        }
186    }
187
188    let mut matched = match spec.scope {
189        QuickScope::KeyOnly => {
190            if spec.key_not_equals {
191                let key_set = key_hits.iter().collect::<HashSet<_>>();
192                flat.keys().any(|key| !key_set.contains(key))
193            } else {
194                !key_hits.is_empty()
195            }
196        }
197        QuickScope::ValueOnly => !value_hits.is_empty() || !synthetic.is_empty(),
198        QuickScope::KeyOrValue => {
199            !key_hits.is_empty() || !value_hits.is_empty() || !synthetic.is_empty()
200        }
201    };
202
203    if spec.key_spec.negated {
204        matched = !matched;
205    }
206
207    let mut is_projection = match spec.scope {
208        QuickScope::ValueOnly | QuickScope::KeyOrValue => !synthetic.is_empty(),
209        QuickScope::KeyOnly => false,
210    };
211
212    if key_hits_match_projection_token(&key_hits, &spec.key_spec.token) {
213        is_projection = true;
214    }
215
216    if is_projection && !synthetic.is_empty() && matches!(spec.scope, QuickScope::KeyOrValue) {
217        key_hits.clear();
218    }
219
220    MatchResult {
221        matched,
222        key_hits,
223        value_hits,
224        is_projection,
225        synthetic,
226    }
227}
228
/// Builds the output row(s) for a row that has already been accepted.
///
/// One of three shapes is produced, checked in this order:
///
/// 1. **Projection** (`result.is_projection` and the spec is not negated):
///    if there are synthetic entries, each one becomes its own row (in sorted
///    key order, coalesced and squeezed). If that yields nothing, a single
///    row is built from the key hits, value hits, and synthetic keys.
/// 2. **Negated**: start from the whole flattened row and remove what was
///    hit — matching array items are filtered out, matching scalars and key
///    hits are removed — then merge in whatever synthetic entries remain.
/// 3. **Plain filter**: keep only the hit keys and synthetic entries; arrays
///    under a value hit are narrowed to the items that match the token.
///
/// Returns `None` when the chosen shape ends up empty.
///
/// NOTE(review): `result` is taken as `&mut` but is only read in this body.
fn transform_row(
    flat: &Row,
    result: &mut MatchResult,
    spec: &crate::dsl::parse::quick::QuickSpec,
) -> Option<Vec<Row>> {
    let synthetic_keys = result.synthetic.keys().cloned().collect::<Vec<_>>();

    // Shape 1: projection.
    if result.is_projection && !spec.key_spec.negated {
        if !result.synthetic.is_empty() {
            // One output row per synthetic entry, in sorted key order.
            let mut rows = Vec::new();
            let mut keys = result.synthetic.keys().cloned().collect::<Vec<_>>();
            keys.sort();
            for key in keys {
                if let Some(value) = result.synthetic.get(&key) {
                    let mut projected = Row::new();
                    projected.insert(key.clone(), value.clone());
                    let mut coalesced = coalesce_flat_row(&projected);
                    coalesced = squeeze_single_entry(coalesced);
                    // Entries that squeeze down to nothing are skipped.
                    if !coalesced.is_empty() {
                        rows.push(coalesced);
                    }
                }
            }
            if !rows.is_empty() {
                return Some(rows);
            }
            // Every synthetic entry squeezed to empty: fall through to the
            // single-row projection below.
        }

        // Select key hits, then value hits, then synthetic keys, without
        // duplicates and preserving that order.
        let mut selected = Vec::new();
        let mut seen = HashSet::new();
        extend_unique(&mut selected, &mut seen, &result.key_hits);
        extend_unique(&mut selected, &mut seen, &result.value_hits);
        extend_unique(&mut selected, &mut seen, &synthetic_keys);

        let mut projected = Row::new();
        for key in selected {
            // Prefer the value from the flattened row; fall back to synthetic.
            if let Some(value) = flat
                .get(&key)
                .cloned()
                .or_else(|| result.synthetic.get(&key).cloned())
            {
                projected.insert(key, value);
            }
        }
        if projected.is_empty() {
            return None;
        }
        return Some(vec![coalesce_flat_row(&projected)]);
    }

    // Shape 2: negated — keep everything except what was hit.
    if spec.key_spec.negated {
        let mut new_row = flat.clone();
        let mut new_synthetic = result.synthetic.clone();
        let keys = union_keys(&result.key_hits, &result.value_hits);
        for key in keys {
            if let Some(value) = new_row.get(&key).cloned() {
                // The key exists in the flattened row.
                if result.value_hits.contains(&key) {
                    if let Value::Array(items) = value {
                        // Drop only the array items that match the token.
                        let remaining = items
                            .into_iter()
                            .filter(|item| {
                                !value_matches_token(
                                    item,
                                    &spec.key_spec.token,
                                    spec.key_spec.exact,
                                )
                            })
                            .collect::<Vec<_>>();
                        if remaining.is_empty() {
                            new_row.remove(&key);
                        } else {
                            new_row.insert(key.clone(), Value::Array(remaining));
                        }
                    } else if value_matches_token(&value, &spec.key_spec.token, spec.key_spec.exact)
                    {
                        new_row.remove(&key);
                    }
                } else if result.key_hits.contains(&key) {
                    // Hit by key only: remove the whole entry.
                    new_row.remove(&key);
                }
            } else if let Some(value) = new_synthetic.get(&key).cloned() {
                // The key only exists as a synthetic entry: trim it the same
                // way as a value hit above.
                if let Value::Array(items) = value {
                    let remaining = items
                        .into_iter()
                        .filter(|item| {
                            !value_matches_token(item, &spec.key_spec.token, spec.key_spec.exact)
                        })
                        .collect::<Vec<_>>();
                    if remaining.is_empty() {
                        new_synthetic.remove(&key);
                    } else {
                        new_synthetic.insert(key.clone(), Value::Array(remaining));
                    }
                } else if value_matches_token(&value, &spec.key_spec.token, spec.key_spec.exact) {
                    new_synthetic.remove(&key);
                }
            }
        }
        // Surviving synthetic entries are merged into the output row.
        for (key, value) in new_synthetic {
            new_row.insert(key, value);
        }
        if new_row.is_empty() {
            return None;
        }
        return Some(vec![coalesce_flat_row(&new_row)]);
    }

    // Shape 3: plain filter — keep only hit keys plus synthetic entries.
    let mut filtered = Row::new();
    let keys = union_keys(&result.key_hits, &result.value_hits);
    for key in keys.into_iter().chain(result.synthetic.keys().cloned()) {
        let Some(value) = flat
            .get(&key)
            .cloned()
            .or_else(|| result.synthetic.get(&key).cloned())
        else {
            continue;
        };
        // Arrays under a value hit are narrowed to the matching items; an
        // array with no matching item is left out entirely.
        if result.value_hits.contains(&key)
            && let Value::Array(items) = value
        {
            let filtered_values = items
                .into_iter()
                .filter(|item| value_matches_token(item, &spec.key_spec.token, spec.key_spec.exact))
                .collect::<Vec<_>>();
            if filtered_values.is_empty() {
                continue;
            }
            filtered.insert(key.clone(), Value::Array(filtered_values));
            continue;
        }
        filtered.insert(key, value);
    }

    if filtered.is_empty() {
        None
    } else {
        let mut coalesced = coalesce_flat_row(&filtered);
        // Remove null entries from arrays in the rebuilt row.
        compact_sparse_arrays_in_row(&mut coalesced);
        Some(vec![coalesced])
    }
}
370
371fn build_synthetic_map(pairs: &[(String, Value)], flat: &Row) -> Row {
372    let mut out = Row::new();
373    for (key, value) in pairs {
374        if !flat.contains_key(key) {
375            out.insert(key.clone(), value.clone());
376        }
377    }
378    out
379}
380
381fn prefer_exact_keys(matches: &KeyMatches, _exact: ExactMode) -> Vec<String> {
382    if !matches.exact.is_empty() {
383        matches.exact.clone()
384    } else {
385        matches.partial.clone()
386    }
387}
388
389fn key_hits_match_projection_token(key_hits: &[String], token: &str) -> bool {
390    let mut names = key_hits.iter().filter_map(|key| last_segment_name(key));
391    let Some(first) = names.next() else {
392        return false;
393    };
394
395    if !first.eq_ignore_ascii_case(token) {
396        return false;
397    }
398
399    names.all(|name| name.eq_ignore_ascii_case(&first))
400}
401
/// Appends to `out` every key from `keys` that is not yet in `seen`,
/// recording it in `seen` as it goes. Input order is preserved.
fn extend_unique(out: &mut Vec<String>, seen: &mut HashSet<String>, keys: &[String]) {
    out.extend(
        keys.iter()
            // `insert` returns true only the first time a key is seen.
            .filter(|candidate| seen.insert((*candidate).clone()))
            .cloned(),
    );
}
409
410fn union_keys(left: &[String], right: &[String]) -> Vec<String> {
411    let mut out = Vec::new();
412    let mut seen = HashSet::new();
413    extend_unique(&mut out, &mut seen, left);
414    extend_unique(&mut out, &mut seen, right);
415    out
416}
417
418fn value_matches_token(value: &Value, token: &str, exact: ExactMode) -> bool {
419    match exact {
420        ExactMode::CaseSensitive => {
421            if let Value::Array(values) = value {
422                return values
423                    .iter()
424                    .any(|item| value_matches_token(item, token, exact));
425            }
426            render_value(value) == token
427        }
428        ExactMode::CaseInsensitive => {
429            if let Value::Array(values) = value {
430                return values
431                    .iter()
432                    .any(|item| value_matches_token(item, token, exact));
433            }
434            render_value(value).eq_ignore_ascii_case(token)
435        }
436        ExactMode::None => {
437            if let Value::Array(values) = value {
438                return values
439                    .iter()
440                    .any(|item| value_matches_token(item, token, exact));
441            }
442            render_value(value)
443                .to_ascii_lowercase()
444                .contains(&token.to_ascii_lowercase())
445        }
446    }
447}
448
449fn last_segment_name(key: &str) -> Option<String> {
450    if let Ok(path) = parse_path(key)
451        && let Some(segment) = path.segments.last()
452        && let Some(name) = &segment.name
453    {
454        return Some(name.clone());
455    }
456    let last = key.rsplit('.').next().unwrap_or(key);
457    Some(last.split('[').next().unwrap_or(last).to_string())
458}
459
460fn squeeze_single_entry(row: Row) -> Row {
461    if row.len() != 1 {
462        return row;
463    }
464    let (only_key, only_val) = match row.iter().next() {
465        Some((key, value)) => (key.clone(), value.clone()),
466        None => return row,
467    };
468    match only_val {
469        Value::Array(items) => {
470            let cleaned = items
471                .into_iter()
472                .filter(|item| !item.is_null())
473                .collect::<Vec<_>>();
474            if cleaned.len() == 1
475                && let Value::Object(obj) = &cleaned[0]
476            {
477                return obj.clone();
478            }
479            if cleaned.is_empty() {
480                return Row::new();
481            }
482            let mut out = Row::new();
483            out.insert(only_key, Value::Array(cleaned));
484            out
485        }
486        Value::Object(obj) => obj,
487        _ => row,
488    }
489}
490
491fn compact_sparse_arrays_in_row(row: &mut Row) {
492    for value in row.values_mut() {
493        compact_sparse_arrays(value);
494    }
495}
496
497fn compact_sparse_arrays(value: &mut Value) {
498    match value {
499        Value::Array(items) => {
500            for item in items.iter_mut() {
501                compact_sparse_arrays(item);
502            }
503            if items.iter().any(|item| !item.is_null()) {
504                items.retain(|item| !item.is_null());
505            }
506        }
507        Value::Object(map) => {
508            for item in map.values_mut() {
509                compact_sparse_arrays(item);
510            }
511        }
512        _ => {}
513    }
514}
515
516#[cfg(test)]
517mod tests;