1use std::collections::HashSet;
2
3use crate::core::row::Row;
4use anyhow::{Result, anyhow};
5use serde_json::Value;
6
7use crate::dsl::{
8 eval::{
9 flatten::{coalesce_flat_row, flatten_row},
10 matchers::{KeyMatches, match_row_keys_detailed, render_value},
11 resolve::{resolve_pairs, resolve_values_truthy},
12 },
13 parse::{
14 key_spec::ExactMode,
15 path::parse_path,
16 quick::{QuickScope, parse_quick_spec},
17 },
18};
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21enum MatchMode {
22 Single,
23 Multi,
24}
25
26#[derive(Debug, Clone)]
27struct MatchResult {
28 matched: bool,
29 key_hits: Vec<String>,
30 value_hits: Vec<String>,
31 is_projection: bool,
32 synthetic: Row,
33}
34
35#[derive(Debug, Clone)]
36pub(crate) struct QuickPlan {
37 spec: crate::dsl::parse::quick::QuickSpec,
38}
39
40impl QuickPlan {
41 fn apply_row(&self, row: Row, mode: MatchMode) -> Vec<Row> {
42 apply_row_with_mode(row, &self.spec, mode)
43 }
44}
45
46pub(crate) fn compile(raw_stage: &str) -> Result<QuickPlan> {
47 let spec = parse_quick_spec(raw_stage);
48 if spec.key_spec.token.trim().is_empty() {
49 return Err(anyhow!("quick stage requires a search token"));
50 }
51 Ok(QuickPlan { spec })
52}
53
54pub fn apply(rows: Vec<Row>, raw_stage: &str) -> Result<Vec<Row>> {
55 let plan = compile(raw_stage)?;
56
57 let mode = if rows.len() > 1 {
61 MatchMode::Multi
62 } else {
63 MatchMode::Single
64 };
65
66 let mut out = Vec::new();
67 for row in rows {
68 out.extend(plan.apply_row(row, mode));
69 }
70
71 Ok(out)
72}
73
74pub(crate) fn stream_rows_with_plan<I>(
75 rows: I,
76 plan: QuickPlan,
77) -> impl Iterator<Item = Result<Row>>
78where
79 I: IntoIterator<Item = Result<Row>>,
80{
81 let mut iter = rows.into_iter();
82 let first = iter.next();
83 let second = iter.next();
84
85 let mode = if second.is_some() {
89 MatchMode::Multi
90 } else {
91 MatchMode::Single
92 };
93
94 let mut seed = Vec::new();
95 if let Some(row) = first {
96 match row {
97 Ok(row) => seed.extend(plan.apply_row(row, mode).into_iter().map(Ok)),
98 Err(err) => seed.push(Err(err)),
99 }
100 }
101 if let Some(row) = second {
102 match row {
103 Ok(row) => seed.extend(plan.apply_row(row, mode).into_iter().map(Ok)),
104 Err(err) => seed.push(Err(err)),
105 }
106 }
107
108 seed.into_iter().chain(iter.flat_map(move |row| {
109 match row {
110 Ok(row) => plan
111 .apply_row(row, mode)
112 .into_iter()
113 .map(Ok)
114 .collect::<Vec<_>>()
115 .into_iter(),
116 Err(err) => vec![Err(err)].into_iter(),
117 }
118 }))
119}
120
121fn apply_row_with_mode(
122 row: Row,
123 spec: &crate::dsl::parse::quick::QuickSpec,
124 mode: MatchMode,
125) -> Vec<Row> {
126 if spec.key_spec.existence {
127 let found = resolve_values_truthy(&row, &spec.key_spec.token, spec.key_spec.exact);
128 let matched = if spec.key_spec.negated { !found } else { found };
129 return if matched { vec![row] } else { Vec::new() };
130 }
131
132 let flat = flatten_row(&row);
133 let (pairs, _) = resolve_pairs(&flat, &spec.key_spec.token);
134 let synthetic = build_synthetic_map(&pairs, &flat);
135 let mut result = match_row(&flat, &pairs, synthetic, spec);
136
137 let keep = match spec.scope {
138 QuickScope::KeyOnly => {
139 if matches!(mode, MatchMode::Multi) {
140 result.matched
141 } else {
142 spec.key_spec.negated || result.matched
143 }
144 }
145 QuickScope::ValueOnly | QuickScope::KeyOrValue => {
146 if matches!(mode, MatchMode::Multi) {
147 result.matched
148 } else {
149 result.matched || spec.key_spec.negated
150 }
151 }
152 };
153
154 if !keep {
155 return Vec::new();
156 }
157
158 if matches!(mode, MatchMode::Multi) && !result.is_projection {
159 return vec![row];
160 }
161
162 transform_row(&flat, &mut result, spec).unwrap_or_default()
163}
164
165fn match_row(
166 flat: &Row,
167 pairs: &[(String, Value)],
168 synthetic: Row,
169 spec: &crate::dsl::parse::quick::QuickSpec,
170) -> MatchResult {
171 let matches = match_row_keys_detailed(flat, &spec.key_spec.token, spec.key_spec.exact);
172 let mut key_hits = prefer_exact_keys(&matches, spec.key_spec.exact);
173 let mut value_hits = Vec::new();
174 let mut seen_values = HashSet::new();
175
176 for (key, value) in pairs {
177 let matched = match value {
178 Value::Array(items) => items
179 .iter()
180 .any(|item| value_matches_token(item, &spec.key_spec.token, spec.key_spec.exact)),
181 scalar => value_matches_token(scalar, &spec.key_spec.token, spec.key_spec.exact),
182 };
183 if matched && seen_values.insert(key.as_str()) {
184 value_hits.push(key.clone());
185 }
186 }
187
188 let mut matched = match spec.scope {
189 QuickScope::KeyOnly => {
190 if spec.key_not_equals {
191 let key_set = key_hits.iter().collect::<HashSet<_>>();
192 flat.keys().any(|key| !key_set.contains(key))
193 } else {
194 !key_hits.is_empty()
195 }
196 }
197 QuickScope::ValueOnly => !value_hits.is_empty() || !synthetic.is_empty(),
198 QuickScope::KeyOrValue => {
199 !key_hits.is_empty() || !value_hits.is_empty() || !synthetic.is_empty()
200 }
201 };
202
203 if spec.key_spec.negated {
204 matched = !matched;
205 }
206
207 let mut is_projection = match spec.scope {
208 QuickScope::ValueOnly | QuickScope::KeyOrValue => !synthetic.is_empty(),
209 QuickScope::KeyOnly => false,
210 };
211
212 if key_hits_match_projection_token(&key_hits, &spec.key_spec.token) {
213 is_projection = true;
214 }
215
216 if is_projection && !synthetic.is_empty() && matches!(spec.scope, QuickScope::KeyOrValue) {
217 key_hits.clear();
218 }
219
220 MatchResult {
221 matched,
222 key_hits,
223 value_hits,
224 is_projection,
225 synthetic,
226 }
227}
228
229fn transform_row(
230 flat: &Row,
231 result: &mut MatchResult,
232 spec: &crate::dsl::parse::quick::QuickSpec,
233) -> Option<Vec<Row>> {
234 let synthetic_keys = result.synthetic.keys().cloned().collect::<Vec<_>>();
235
236 if result.is_projection && !spec.key_spec.negated {
237 if !result.synthetic.is_empty() {
238 let mut rows = Vec::new();
239 let mut keys = result.synthetic.keys().cloned().collect::<Vec<_>>();
240 keys.sort();
241 for key in keys {
242 if let Some(value) = result.synthetic.get(&key) {
243 let mut projected = Row::new();
244 projected.insert(key.clone(), value.clone());
245 let mut coalesced = coalesce_flat_row(&projected);
246 coalesced = squeeze_single_entry(coalesced);
247 if !coalesced.is_empty() {
248 rows.push(coalesced);
249 }
250 }
251 }
252 if !rows.is_empty() {
253 return Some(rows);
254 }
255 }
256
257 let mut selected = Vec::new();
258 let mut seen = HashSet::new();
259 extend_unique(&mut selected, &mut seen, &result.key_hits);
260 extend_unique(&mut selected, &mut seen, &result.value_hits);
261 extend_unique(&mut selected, &mut seen, &synthetic_keys);
262
263 let mut projected = Row::new();
264 for key in selected {
265 if let Some(value) = flat
266 .get(&key)
267 .cloned()
268 .or_else(|| result.synthetic.get(&key).cloned())
269 {
270 projected.insert(key, value);
271 }
272 }
273 if projected.is_empty() {
274 return None;
275 }
276 return Some(vec![coalesce_flat_row(&projected)]);
277 }
278
279 if spec.key_spec.negated {
280 let mut new_row = flat.clone();
281 let mut new_synthetic = result.synthetic.clone();
282 let keys = union_keys(&result.key_hits, &result.value_hits);
283 for key in keys {
284 if let Some(value) = new_row.get(&key).cloned() {
285 if result.value_hits.contains(&key) {
286 if let Value::Array(items) = value {
287 let remaining = items
288 .into_iter()
289 .filter(|item| {
290 !value_matches_token(
291 item,
292 &spec.key_spec.token,
293 spec.key_spec.exact,
294 )
295 })
296 .collect::<Vec<_>>();
297 if remaining.is_empty() {
298 new_row.remove(&key);
299 } else {
300 new_row.insert(key.clone(), Value::Array(remaining));
301 }
302 } else if value_matches_token(&value, &spec.key_spec.token, spec.key_spec.exact)
303 {
304 new_row.remove(&key);
305 }
306 } else if result.key_hits.contains(&key) {
307 new_row.remove(&key);
308 }
309 } else if let Some(value) = new_synthetic.get(&key).cloned() {
310 if let Value::Array(items) = value {
311 let remaining = items
312 .into_iter()
313 .filter(|item| {
314 !value_matches_token(item, &spec.key_spec.token, spec.key_spec.exact)
315 })
316 .collect::<Vec<_>>();
317 if remaining.is_empty() {
318 new_synthetic.remove(&key);
319 } else {
320 new_synthetic.insert(key.clone(), Value::Array(remaining));
321 }
322 } else if value_matches_token(&value, &spec.key_spec.token, spec.key_spec.exact) {
323 new_synthetic.remove(&key);
324 }
325 }
326 }
327 for (key, value) in new_synthetic {
328 new_row.insert(key, value);
329 }
330 if new_row.is_empty() {
331 return None;
332 }
333 return Some(vec![coalesce_flat_row(&new_row)]);
334 }
335
336 let mut filtered = Row::new();
337 let keys = union_keys(&result.key_hits, &result.value_hits);
338 for key in keys.into_iter().chain(result.synthetic.keys().cloned()) {
339 let Some(value) = flat
340 .get(&key)
341 .cloned()
342 .or_else(|| result.synthetic.get(&key).cloned())
343 else {
344 continue;
345 };
346 if result.value_hits.contains(&key)
347 && let Value::Array(items) = value
348 {
349 let filtered_values = items
350 .into_iter()
351 .filter(|item| value_matches_token(item, &spec.key_spec.token, spec.key_spec.exact))
352 .collect::<Vec<_>>();
353 if filtered_values.is_empty() {
354 continue;
355 }
356 filtered.insert(key.clone(), Value::Array(filtered_values));
357 continue;
358 }
359 filtered.insert(key, value);
360 }
361
362 if filtered.is_empty() {
363 None
364 } else {
365 let mut coalesced = coalesce_flat_row(&filtered);
366 compact_sparse_arrays_in_row(&mut coalesced);
367 Some(vec![coalesced])
368 }
369}
370
371fn build_synthetic_map(pairs: &[(String, Value)], flat: &Row) -> Row {
372 let mut out = Row::new();
373 for (key, value) in pairs {
374 if !flat.contains_key(key) {
375 out.insert(key.clone(), value.clone());
376 }
377 }
378 out
379}
380
381fn prefer_exact_keys(matches: &KeyMatches, _exact: ExactMode) -> Vec<String> {
382 if !matches.exact.is_empty() {
383 matches.exact.clone()
384 } else {
385 matches.partial.clone()
386 }
387}
388
389fn key_hits_match_projection_token(key_hits: &[String], token: &str) -> bool {
390 let mut names = key_hits.iter().filter_map(|key| last_segment_name(key));
391 let Some(first) = names.next() else {
392 return false;
393 };
394
395 if !first.eq_ignore_ascii_case(token) {
396 return false;
397 }
398
399 names.all(|name| name.eq_ignore_ascii_case(&first))
400}
401
402fn extend_unique(out: &mut Vec<String>, seen: &mut HashSet<String>, keys: &[String]) {
403 for key in keys {
404 if seen.insert(key.clone()) {
405 out.push(key.clone());
406 }
407 }
408}
409
410fn union_keys(left: &[String], right: &[String]) -> Vec<String> {
411 let mut out = Vec::new();
412 let mut seen = HashSet::new();
413 extend_unique(&mut out, &mut seen, left);
414 extend_unique(&mut out, &mut seen, right);
415 out
416}
417
418fn value_matches_token(value: &Value, token: &str, exact: ExactMode) -> bool {
419 match exact {
420 ExactMode::CaseSensitive => {
421 if let Value::Array(values) = value {
422 return values
423 .iter()
424 .any(|item| value_matches_token(item, token, exact));
425 }
426 render_value(value) == token
427 }
428 ExactMode::CaseInsensitive => {
429 if let Value::Array(values) = value {
430 return values
431 .iter()
432 .any(|item| value_matches_token(item, token, exact));
433 }
434 render_value(value).eq_ignore_ascii_case(token)
435 }
436 ExactMode::None => {
437 if let Value::Array(values) = value {
438 return values
439 .iter()
440 .any(|item| value_matches_token(item, token, exact));
441 }
442 render_value(value)
443 .to_ascii_lowercase()
444 .contains(&token.to_ascii_lowercase())
445 }
446 }
447}
448
449fn last_segment_name(key: &str) -> Option<String> {
450 if let Ok(path) = parse_path(key)
451 && let Some(segment) = path.segments.last()
452 && let Some(name) = &segment.name
453 {
454 return Some(name.clone());
455 }
456 let last = key.rsplit('.').next().unwrap_or(key);
457 Some(last.split('[').next().unwrap_or(last).to_string())
458}
459
460fn squeeze_single_entry(row: Row) -> Row {
461 if row.len() != 1 {
462 return row;
463 }
464 let (only_key, only_val) = match row.iter().next() {
465 Some((key, value)) => (key.clone(), value.clone()),
466 None => return row,
467 };
468 match only_val {
469 Value::Array(items) => {
470 let cleaned = items
471 .into_iter()
472 .filter(|item| !item.is_null())
473 .collect::<Vec<_>>();
474 if cleaned.len() == 1
475 && let Value::Object(obj) = &cleaned[0]
476 {
477 return obj.clone();
478 }
479 if cleaned.is_empty() {
480 return Row::new();
481 }
482 let mut out = Row::new();
483 out.insert(only_key, Value::Array(cleaned));
484 out
485 }
486 Value::Object(obj) => obj,
487 _ => row,
488 }
489}
490
491fn compact_sparse_arrays_in_row(row: &mut Row) {
492 for value in row.values_mut() {
493 compact_sparse_arrays(value);
494 }
495}
496
497fn compact_sparse_arrays(value: &mut Value) {
498 match value {
499 Value::Array(items) => {
500 for item in items.iter_mut() {
501 compact_sparse_arrays(item);
502 }
503 if items.iter().any(|item| !item.is_null()) {
504 items.retain(|item| !item.is_null());
505 }
506 }
507 Value::Object(map) => {
508 for item in map.values_mut() {
509 compact_sparse_arrays(item);
510 }
511 }
512 _ => {}
513 }
514}
515
516#[cfg(test)]
517mod tests;