Skip to main content

dynoxide/partiql/
executor.rs

1//! PartiQL statement executor.
2//!
3//! Maps parsed PartiQL statements to internal DynamoDB operations.
4
5use crate::errors::{DynoxideError, Result};
6use crate::partiql::parser::{
7    CompOp, PartiqlValue, SetValue, Statement, WhereClause, WhereCondition,
8};
9use crate::storage::Storage;
10use crate::types::{AttributeValue, Item};
11use std::collections::HashMap;
12
13/// Execute a parsed PartiQL statement.
14///
15/// Returns `Some(items)` for SELECT (may be empty), `None` for write operations.
16/// An optional `limit` restricts how many items a SELECT returns.
17pub fn execute(
18    storage: &Storage,
19    stmt: &Statement,
20    parameters: &[AttributeValue],
21    limit: Option<usize>,
22) -> Result<Option<Vec<Item>>> {
23    match stmt {
24        Statement::Select {
25            table_name,
26            projections,
27            where_clause,
28        } => execute_select(
29            storage,
30            table_name,
31            projections,
32            where_clause.as_ref(),
33            parameters,
34            limit,
35        ),
36        Statement::Insert {
37            table_name,
38            item,
39            if_not_exists,
40        } => {
41            execute_insert(storage, table_name, item, parameters, *if_not_exists)?;
42            Ok(None)
43        }
44        Statement::Update {
45            table_name,
46            set_clauses,
47            remove_paths,
48            where_clause,
49        } => {
50            execute_update(
51                storage,
52                table_name,
53                set_clauses,
54                remove_paths,
55                where_clause.as_ref(),
56                parameters,
57            )?;
58            Ok(None)
59        }
60        Statement::Delete {
61            table_name,
62            where_clause,
63        } => {
64            execute_delete(storage, table_name, where_clause.as_ref(), parameters)?;
65            Ok(None)
66        }
67    }
68}
69
70/// Insert a projected value into a result item.
71///
72/// For dotted paths (e.g. `a.b.c`), DynamoDB PartiQL returns the resolved value
73/// keyed by the leaf segment name (`c`), not the full path or reconstructed
74/// nested structure. For simple paths and array index paths, the key is used as-is.
75fn insert_nested_projection(result: &mut Item, path: &str, val: AttributeValue) {
76    let parts: Vec<&str> = path.split('.').collect();
77    // Use the leaf segment as the key
78    let key = parts.last().unwrap();
79    result.insert(key.to_string(), val);
80}
81
82fn execute_select(
83    storage: &Storage,
84    table_name: &str,
85    projections: &[String],
86    where_clause: Option<&WhereClause>,
87    parameters: &[AttributeValue],
88    limit: Option<usize>,
89) -> Result<Option<Vec<Item>>> {
90    let meta = require_table(storage, table_name)?;
91    let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
92
93    // Check for COUNT(*) projection
94    if projections.len() == 1 && projections[0] == "COUNT(*)" {
95        let items = collect_matching_items(
96            storage,
97            table_name,
98            where_clause,
99            parameters,
100            &key_schema,
101            None,
102        )?;
103        let count = items.len();
104        let mut result = HashMap::new();
105        result.insert("Count".to_string(), AttributeValue::N(count.to_string()));
106        return Ok(Some(vec![result]));
107    }
108
109    let items = collect_matching_items(
110        storage,
111        table_name,
112        where_clause,
113        parameters,
114        &key_schema,
115        limit,
116    )?;
117
118    // Apply projections
119    let items = if projections.is_empty() {
120        items
121    } else {
122        items
123            .into_iter()
124            .map(|item| {
125                let mut projected = HashMap::new();
126                for proj in projections {
127                    if let Some(val) = resolve_nested_path(&item, proj) {
128                        insert_nested_projection(&mut projected, proj, val.clone());
129                    }
130                }
131                projected
132            })
133            .collect()
134    };
135
136    Ok(Some(items))
137}
138
139/// Collect items that match the WHERE clause, optionally limited.
140fn collect_matching_items(
141    storage: &Storage,
142    table_name: &str,
143    where_clause: Option<&WhereClause>,
144    parameters: &[AttributeValue],
145    key_schema: &crate::actions::helpers::KeySchema,
146    limit: Option<usize>,
147) -> Result<Vec<Item>> {
148    // Try to use Query if the WHERE clause constrains the partition key
149    let pk_condition = where_clause.and_then(|wc| find_pk_condition(wc, &key_schema.partition_key));
150
151    let items: Vec<Item> = if let Some(pk_cond) = pk_condition {
152        let pk_val = resolve_value(&pk_cond.value, parameters)?;
153        let pk_str = pk_val
154            .to_key_string()
155            .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
156
157        let rows = storage.query_items(table_name, &pk_str, &Default::default())?;
158
159        let iter = rows
160            .into_iter()
161            .filter_map(|(_, _, json)| serde_json::from_str::<Item>(&json).ok())
162            .filter(|item| matches_where(item, where_clause, parameters));
163
164        if let Some(lim) = limit {
165            iter.take(lim).collect()
166        } else {
167            iter.collect()
168        }
169    } else {
170        let rows = storage.scan_items(table_name, &Default::default())?;
171
172        let iter = rows
173            .into_iter()
174            .filter_map(|(_, _, json)| serde_json::from_str::<Item>(&json).ok())
175            .filter(|item| matches_where(item, where_clause, parameters));
176
177        if let Some(lim) = limit {
178            iter.take(lim).collect()
179        } else {
180            iter.collect()
181        }
182    };
183
184    Ok(items)
185}
186
187/// Find a partition key equality condition, searching across all OR groups.
188fn find_pk_condition<'a>(
189    wc: &'a WhereClause,
190    pk_name: &str,
191) -> Option<&'a crate::partiql::parser::Condition> {
192    // Only optimise to a Query when there is a single OR group
193    // (multi-group OR with pk in only one group would need a union approach).
194    if wc.groups.len() == 1 {
195        wc.groups[0].iter().find_map(|c| match c {
196            WhereCondition::Comparison(cond) if cond.path == pk_name && cond.op == CompOp::Eq => {
197                Some(cond)
198            }
199            _ => None,
200        })
201    } else {
202        None
203    }
204}
205
206fn execute_insert(
207    storage: &Storage,
208    table_name: &str,
209    item_template: &HashMap<String, PartiqlValue>,
210    parameters: &[AttributeValue],
211    if_not_exists: bool,
212) -> Result<()> {
213    // Resolve any parameter placeholders in the item
214    let mut item = HashMap::new();
215    for (k, v) in item_template {
216        let resolved = match v {
217            PartiqlValue::Literal(av) => av.clone(),
218            PartiqlValue::Parameter(idx) => parameters.get(*idx).cloned().ok_or_else(|| {
219                DynoxideError::ValidationException(format!(
220                    "Parameter index {idx} out of range (have {} parameters)",
221                    parameters.len()
222                ))
223            })?,
224        };
225        item.insert(k.clone(), resolved);
226    }
227
228    let meta = require_table(storage, table_name)?;
229    let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
230
231    // Validate keys present
232    crate::actions::helpers::validate_item_keys(&item, &key_schema, &meta)?;
233    crate::validation::validate_item_attribute_values(&item)?;
234
235    // Deduplicate sets
236    crate::validation::normalize_item_sets(&mut item);
237
238    // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
239    let (pk, sk) = crate::actions::helpers::extract_key_strings(&item, &key_schema)?;
240
241    // PartiQL INSERT must reject duplicates (unlike PutItem which overwrites)
242    let existing = storage.get_item(table_name, &pk, &sk)?;
243    if existing.is_some() {
244        if if_not_exists {
245            // Silently succeed — no-op
246            return Ok(());
247        }
248        return Err(DynoxideError::DuplicateItemException(
249            "Duplicate primary key exists in table".to_string(),
250        ));
251    }
252
253    let item_json = serde_json::to_string(&item)
254        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
255    let item_size = crate::types::item_size(&item);
256
257    let hash_prefix = item
258        .get(&key_schema.partition_key)
259        .map(crate::storage::compute_hash_prefix)
260        .unwrap_or_default();
261    let old_json =
262        storage.put_item_with_hash(table_name, &pk, &sk, &item_json, item_size, &hash_prefix)?;
263
264    // GSI maintenance
265    let table_sk_attr = key_schema.sort_key.as_deref();
266    let _ = crate::actions::gsi::maintain_gsis_after_write(
267        storage,
268        table_name,
269        &meta,
270        &pk,
271        &sk,
272        &item,
273        &key_schema.partition_key,
274        table_sk_attr,
275    )?;
276
277    // LSI maintenance
278    crate::actions::lsi::maintain_lsis_after_write(
279        storage,
280        table_name,
281        &meta,
282        &pk,
283        &sk,
284        &item,
285        &key_schema.partition_key,
286        table_sk_attr,
287    )?;
288
289    // Stream record
290    let old_item: Option<Item> = old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
291    crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item))?;
292
293    Ok(())
294}
295
296fn execute_update(
297    storage: &Storage,
298    table_name: &str,
299    set_clauses: &[crate::partiql::parser::SetClause],
300    remove_paths: &[String],
301    where_clause: Option<&WhereClause>,
302    parameters: &[AttributeValue],
303) -> Result<()> {
304    let meta = require_table(storage, table_name)?;
305    let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
306
307    // WHERE clause is required for UPDATE to identify the item
308    let wc = where_clause.ok_or_else(|| {
309        DynoxideError::ValidationException("UPDATE requires a WHERE clause".to_string())
310    })?;
311
312    // DynamoDB does not support OR in UPDATE WHERE clauses
313    if wc.groups.len() > 1 {
314        return Err(DynoxideError::ValidationException(
315            "UPDATE does not support OR conditions in WHERE clause".to_string(),
316        ));
317    }
318
319    // Extract partition key from WHERE (must be in first/only group for key lookup)
320    let pk_cond =
321        find_comparison_in_groups(&wc.groups, &key_schema.partition_key).ok_or_else(|| {
322            DynoxideError::ValidationException(
323                "UPDATE WHERE must include partition key equality".to_string(),
324            )
325        })?;
326
327    let pk_val = resolve_value(&pk_cond.value, parameters)?;
328    let pk_str = pk_val
329        .to_key_string()
330        .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
331
332    let sk_str = if let Some(ref sk_name) = key_schema.sort_key {
333        let sk_cond = find_comparison_in_groups(&wc.groups, sk_name);
334        if sk_cond.is_none() {
335            return Err(DynoxideError::ValidationException(
336                "Where clause does not contain a mandatory equality on all key attributes"
337                    .to_string(),
338            ));
339        }
340        sk_cond
341            .map(|c| resolve_value(&c.value, parameters))
342            .transpose()?
343            .and_then(|v| v.to_key_string())
344            .unwrap_or_default()
345    } else {
346        String::new()
347    };
348
349    // Get existing item
350    let existing_json = storage.get_item(table_name, &pk_str, &sk_str)?;
351    let mut item: Item = existing_json
352        .as_ref()
353        .and_then(|j| serde_json::from_str(j).ok())
354        .unwrap_or_default();
355
356    let old_item = item.clone();
357
358    // Apply SET clauses with nested path support
359    for clause in set_clauses {
360        let val = resolve_set_value(&clause.value, &item, parameters)?;
361        set_nested_value(&mut item, &clause.path, val)?;
362    }
363
364    // Apply REMOVE clauses
365    for path in remove_paths {
366        remove_nested_value(&mut item, path);
367    }
368
369    // Ensure keys are present
370    if item.is_empty() {
371        return Ok(());
372    }
373
374    // Validate attribute values after SET clauses applied
375    crate::validation::validate_item_attribute_values(&item)?;
376    crate::validation::normalize_item_sets(&mut item);
377
378    let item_json = serde_json::to_string(&item)
379        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
380    let item_size = crate::types::item_size(&item);
381
382    let hash_prefix = item
383        .get(&key_schema.partition_key)
384        .map(crate::storage::compute_hash_prefix)
385        .unwrap_or_default();
386    storage.put_item_with_hash(
387        table_name,
388        &pk_str,
389        &sk_str,
390        &item_json,
391        item_size,
392        &hash_prefix,
393    )?;
394
395    // GSI maintenance
396    let table_sk_attr = key_schema.sort_key.as_deref();
397    let _ = crate::actions::gsi::maintain_gsis_after_write(
398        storage,
399        table_name,
400        &meta,
401        &pk_str,
402        &sk_str,
403        &item,
404        &key_schema.partition_key,
405        table_sk_attr,
406    )?;
407
408    // LSI maintenance
409    crate::actions::lsi::maintain_lsis_after_write(
410        storage,
411        table_name,
412        &meta,
413        &pk_str,
414        &sk_str,
415        &item,
416        &key_schema.partition_key,
417        table_sk_attr,
418    )?;
419
420    // Stream record
421    let old_ref = if existing_json.is_some() {
422        Some(&old_item)
423    } else {
424        None
425    };
426    crate::streams::record_stream_event(storage, &meta, old_ref, Some(&item))?;
427
428    Ok(())
429}
430
431fn execute_delete(
432    storage: &Storage,
433    table_name: &str,
434    where_clause: Option<&WhereClause>,
435    parameters: &[AttributeValue],
436) -> Result<()> {
437    let meta = require_table(storage, table_name)?;
438    let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
439
440    let wc = where_clause.ok_or_else(|| {
441        DynoxideError::ValidationException("DELETE requires a WHERE clause".to_string())
442    })?;
443
444    // DynamoDB does not support OR in DELETE WHERE clauses
445    if wc.groups.len() > 1 {
446        return Err(DynoxideError::ValidationException(
447            "DELETE does not support OR conditions in WHERE clause".to_string(),
448        ));
449    }
450
451    let pk_cond =
452        find_comparison_in_groups(&wc.groups, &key_schema.partition_key).ok_or_else(|| {
453            DynoxideError::ValidationException(
454                "DELETE WHERE must include partition key equality".to_string(),
455            )
456        })?;
457
458    let pk_val = resolve_value(&pk_cond.value, parameters)?;
459    let pk_str = pk_val
460        .to_key_string()
461        .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
462
463    // I15: Validate that the sort key is present in the WHERE clause if the table has one
464    if let Some(ref sk_name) = key_schema.sort_key {
465        let has_sk_condition = wc.groups.iter().any(|group| {
466            group.iter().any(|c| match c {
467                WhereCondition::Comparison(comp) => comp.path == *sk_name && comp.op == CompOp::Eq,
468                _ => false,
469            })
470        });
471        if !has_sk_condition {
472            return Err(DynoxideError::ValidationException(
473                "Where clause does not contain a mandatory equality on all key attributes"
474                    .to_string(),
475            ));
476        }
477    }
478
479    let sk_str = if let Some(ref sk_name) = key_schema.sort_key {
480        find_comparison_in_groups(&wc.groups, sk_name)
481            .map(|c| resolve_value(&c.value, parameters))
482            .transpose()?
483            .and_then(|v| v.to_key_string())
484            .unwrap_or_default()
485    } else {
486        String::new()
487    };
488
489    let old_json = storage.delete_item(table_name, &pk_str, &sk_str)?;
490
491    // GSI maintenance
492    let _ = crate::actions::gsi::maintain_gsis_after_delete(
493        storage, table_name, &meta, &pk_str, &sk_str,
494    )?;
495
496    // LSI maintenance
497    crate::actions::lsi::maintain_lsis_after_delete(storage, table_name, &meta, &pk_str, &sk_str)?;
498
499    // Stream record
500    let old_item: Option<Item> = old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
501    if old_item.is_some() {
502        crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None)?;
503    }
504
505    Ok(())
506}
507
508// ---------------------------------------------------------------------------
509// Helpers
510// ---------------------------------------------------------------------------
511
512fn require_table(storage: &Storage, table_name: &str) -> Result<crate::storage::TableMetadata> {
513    crate::actions::helpers::require_table(storage, table_name)
514}
515
516/// Find a comparison condition matching a given path with Eq operator,
517/// searching across all OR groups.
518fn find_comparison_in_groups<'a>(
519    groups: &'a [Vec<WhereCondition>],
520    path: &str,
521) -> Option<&'a crate::partiql::parser::Condition> {
522    for group in groups {
523        if let Some(cond) = find_comparison(group, path) {
524            return Some(cond);
525        }
526    }
527    None
528}
529
530/// Find a comparison condition matching a given path with Eq operator.
531fn find_comparison<'a>(
532    conditions: &'a [WhereCondition],
533    path: &str,
534) -> Option<&'a crate::partiql::parser::Condition> {
535    conditions.iter().find_map(|c| match c {
536        WhereCondition::Comparison(cond) if cond.path == path && cond.op == CompOp::Eq => {
537            Some(cond)
538        }
539        _ => None,
540    })
541}
542
543/// Resolve a PartiqlValue to a concrete AttributeValue.
544fn resolve_value(val: &PartiqlValue, parameters: &[AttributeValue]) -> Result<AttributeValue> {
545    match val {
546        PartiqlValue::Literal(av) => Ok(av.clone()),
547        PartiqlValue::Parameter(idx) => parameters.get(*idx).cloned().ok_or_else(|| {
548            DynoxideError::ValidationException(format!(
549                "Parameter index {idx} out of range (have {} parameters)",
550                parameters.len()
551            ))
552        }),
553    }
554}
555
556/// Resolve a SetValue to a concrete AttributeValue, potentially using the current item.
557fn resolve_set_value(
558    val: &SetValue,
559    item: &Item,
560    parameters: &[AttributeValue],
561) -> Result<AttributeValue> {
562    match val {
563        SetValue::Simple(pv) => resolve_value(pv, parameters),
564        SetValue::Add(attr, pv) => {
565            let current = resolve_nested_path(item, attr);
566            let operand = resolve_value(pv, parameters)?;
567            match (current, &operand) {
568                (Some(AttributeValue::N(cur)), AttributeValue::N(add)) => {
569                    use bigdecimal::BigDecimal;
570                    use std::str::FromStr;
571                    let a = BigDecimal::from_str(cur).map_err(|e| {
572                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
573                    })?;
574                    let b = BigDecimal::from_str(add).map_err(|e| {
575                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
576                    })?;
577                    let result = a + b;
578                    Ok(AttributeValue::N(format_bigdecimal(&result)))
579                }
580                (None, AttributeValue::N(_)) => {
581                    // Attribute doesn't exist yet — use the operand value
582                    Ok(operand)
583                }
584                _ => Err(DynoxideError::ValidationException(
585                    "SET expression add requires numeric attribute and operand".to_string(),
586                )),
587            }
588        }
589        SetValue::Sub(attr, pv) => {
590            let current = resolve_nested_path(item, attr);
591            let operand = resolve_value(pv, parameters)?;
592            match (current, &operand) {
593                (Some(AttributeValue::N(cur)), AttributeValue::N(sub)) => {
594                    use bigdecimal::BigDecimal;
595                    use std::str::FromStr;
596                    let a = BigDecimal::from_str(cur).map_err(|e| {
597                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
598                    })?;
599                    let b = BigDecimal::from_str(sub).map_err(|e| {
600                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
601                    })?;
602                    let result = a - b;
603                    Ok(AttributeValue::N(format_bigdecimal(&result)))
604                }
605                (None, AttributeValue::N(sub)) => {
606                    // Attribute doesn't exist yet — treat as 0 - operand
607                    use bigdecimal::BigDecimal;
608                    use std::str::FromStr;
609                    let b = BigDecimal::from_str(sub).map_err(|e| {
610                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
611                    })?;
612                    let result = -b;
613                    Ok(AttributeValue::N(format_bigdecimal(&result)))
614                }
615                _ => Err(DynoxideError::ValidationException(
616                    "SET expression subtract requires numeric attribute and operand".to_string(),
617                )),
618            }
619        }
620        SetValue::ListAppend(first, second) => {
621            let a = resolve_value(first, parameters)?;
622            let b = resolve_value(second, parameters)?;
623            // At least one should be a list. If an attribute name was given,
624            // resolve it from the item.
625            let list_a = match &a {
626                AttributeValue::S(name) => resolve_nested_path(item, name)
627                    .cloned()
628                    .unwrap_or(AttributeValue::L(Vec::new())),
629                other => other.clone(),
630            };
631            let list_b = match &b {
632                AttributeValue::S(name) => resolve_nested_path(item, name)
633                    .cloned()
634                    .unwrap_or(AttributeValue::L(Vec::new())),
635                other => other.clone(),
636            };
637            match (list_a, list_b) {
638                (AttributeValue::L(mut la), AttributeValue::L(lb)) => {
639                    la.extend(lb);
640                    Ok(AttributeValue::L(la))
641                }
642                _ => Err(DynoxideError::ValidationException(
643                    "list_append requires list operands".to_string(),
644                )),
645            }
646        }
647    }
648}
649
650/// Set a value at a potentially nested path (e.g. `address.city`).
651fn set_nested_value(item: &mut Item, path: &str, val: AttributeValue) -> Result<()> {
652    let parts: Vec<&str> = path.split('.').collect();
653    if parts.len() == 1 {
654        item.insert(path.to_string(), val);
655        return Ok(());
656    }
657    // Navigate into nested maps, creating them if needed
658    let mut current = item;
659    for part in &parts[..parts.len() - 1] {
660        let entry = current
661            .entry(part.to_string())
662            .or_insert_with(|| AttributeValue::M(HashMap::new()));
663        match entry {
664            AttributeValue::M(map) => {
665                current = map;
666            }
667            _ => {
668                return Err(DynoxideError::ValidationException(
669                    "The document path provided in the update expression is invalid for update"
670                        .to_string(),
671                ));
672            }
673        }
674    }
675    current.insert(parts.last().unwrap().to_string(), val);
676    Ok(())
677}
678
679/// Remove a value at a potentially nested path (e.g. `address.city`).
680fn remove_nested_value(item: &mut Item, path: &str) {
681    let parts: Vec<&str> = path.split('.').collect();
682    if parts.len() == 1 {
683        item.remove(path);
684        return;
685    }
686    // Navigate into nested maps
687    let mut current = item;
688    for part in &parts[..parts.len() - 1] {
689        match current.get_mut(*part) {
690            Some(AttributeValue::M(map)) => {
691                current = map;
692            }
693            _ => return, // Path doesn't exist or isn't a map — nothing to remove
694        }
695    }
696    current.remove(*parts.last().unwrap());
697}
698
699/// Check if an item matches a WHERE clause (with OR-group support).
700fn matches_where(
701    item: &Item,
702    where_clause: Option<&WhereClause>,
703    parameters: &[AttributeValue],
704) -> bool {
705    let wc = match where_clause {
706        Some(wc) => wc,
707        None => return true,
708    };
709
710    // OR semantics: any group matching is sufficient
711    wc.groups
712        .iter()
713        .any(|group| matches_conditions(item, group, parameters))
714}
715
716/// Check if an item matches all conditions in a group (AND semantics).
717fn matches_conditions(
718    item: &Item,
719    conditions: &[WhereCondition],
720    parameters: &[AttributeValue],
721) -> bool {
722    for cond in conditions {
723        match cond {
724            WhereCondition::Comparison(c) => {
725                let item_val = match resolve_nested_path(item, &c.path) {
726                    Some(v) => v,
727                    None => return false,
728                };
729                let target = match resolve_value(&c.value, parameters) {
730                    Ok(v) => v,
731                    Err(_) => return false,
732                };
733                if !compare_values(item_val, &c.op, &target) {
734                    return false;
735                }
736            }
737            WhereCondition::Exists(path) | WhereCondition::IsNotMissing(path) => {
738                if resolve_nested_path(item, path).is_none() {
739                    return false;
740                }
741            }
742            WhereCondition::NotExists(path) | WhereCondition::IsMissing(path) => {
743                if resolve_nested_path(item, path).is_some() {
744                    return false;
745                }
746            }
747            WhereCondition::BeginsWith(path, prefix_val) => {
748                let item_val = match resolve_nested_path(item, path) {
749                    Some(v) => v,
750                    None => return false,
751                };
752                let prefix = match resolve_value(prefix_val, parameters) {
753                    Ok(v) => v,
754                    Err(_) => return false,
755                };
756                match (item_val, &prefix) {
757                    (AttributeValue::S(s), AttributeValue::S(p)) => {
758                        if !s.starts_with(p.as_str()) {
759                            return false;
760                        }
761                    }
762                    _ => return false,
763                }
764            }
765            WhereCondition::Between(path, low, high) => {
766                let item_val = match resolve_nested_path(item, path) {
767                    Some(v) => v,
768                    None => return false,
769                };
770                let low_val = match resolve_value(low, parameters) {
771                    Ok(v) => v,
772                    Err(_) => return false,
773                };
774                let high_val = match resolve_value(high, parameters) {
775                    Ok(v) => v,
776                    Err(_) => return false,
777                };
778                if !compare_values(item_val, &CompOp::Ge, &low_val)
779                    || !compare_values(item_val, &CompOp::Le, &high_val)
780                {
781                    return false;
782                }
783            }
784            WhereCondition::In(path, values) => {
785                let item_val = match resolve_nested_path(item, path) {
786                    Some(v) => v,
787                    None => return false,
788                };
789                let matched = values.iter().any(|v| {
790                    resolve_value(v, parameters)
791                        .map(|target| compare_values(item_val, &CompOp::Eq, &target))
792                        .unwrap_or(false)
793                });
794                if !matched {
795                    return false;
796                }
797            }
798            WhereCondition::Contains(path, substr_val) => {
799                let item_val = match resolve_nested_path(item, path) {
800                    Some(v) => v,
801                    None => return false,
802                };
803                let substr = match resolve_value(substr_val, parameters) {
804                    Ok(v) => v,
805                    Err(_) => return false,
806                };
807                match (item_val, &substr) {
808                    (AttributeValue::S(s), AttributeValue::S(sub)) => {
809                        if !s.contains(sub.as_str()) {
810                            return false;
811                        }
812                    }
813                    (AttributeValue::SS(set), AttributeValue::S(val)) => {
814                        if !set.contains(val) {
815                            return false;
816                        }
817                    }
818                    (AttributeValue::NS(set), AttributeValue::N(val)) => {
819                        if !set.contains(val) {
820                            return false;
821                        }
822                    }
823                    (AttributeValue::L(list), target) => {
824                        if !list.contains(target) {
825                            return false;
826                        }
827                    }
828                    _ => return false,
829                }
830            }
831        }
832    }
833
834    true
835}
836
837/// Resolve a dotted/indexed path to a nested attribute value.
838///
839/// Supports paths like `"a"`, `"a.b.c"`, and `"a[0].b"`.
840fn resolve_nested_path<'a>(item: &'a Item, path: &str) -> Option<&'a AttributeValue> {
841    // Fast path: no dots or brackets means a simple top-level lookup
842    if !path.contains('.') && !path.contains('[') {
843        return item.get(path);
844    }
845
846    let segments = split_path_segments(path)?;
847    if segments.is_empty() {
848        return None;
849    }
850
851    // First segment must be a map key on the top-level item
852    let mut current = match &segments[0] {
853        PathSegment::Key(k) => item.get(*k)?,
854        PathSegment::Index(_) => return None,
855    };
856
857    for seg in &segments[1..] {
858        current = match seg {
859            PathSegment::Key(k) => match current {
860                AttributeValue::M(map) => map.get(*k)?,
861                _ => return None,
862            },
863            PathSegment::Index(idx) => match current {
864                AttributeValue::L(list) => list.get(*idx)?,
865                _ => return None,
866            },
867        };
868    }
869
870    Some(current)
871}
872
873enum PathSegment<'a> {
874    Key(&'a str),
875    Index(usize),
876}
877
878/// Split a path like `"a.b[0].c"` into segments.
879/// Returns None if the path contains malformed bracket expressions (e.g. `a[xyz]`).
880fn split_path_segments(path: &str) -> Option<Vec<PathSegment<'_>>> {
881    let mut segments = Vec::new();
882    let bytes = path.as_bytes();
883    let mut start = 0;
884    let mut i = 0;
885
886    while i < bytes.len() {
887        match bytes[i] {
888            b'.' => {
889                if start < i {
890                    segments.push(PathSegment::Key(&path[start..i]));
891                }
892                i += 1;
893                start = i;
894            }
895            b'[' => {
896                if start < i {
897                    segments.push(PathSegment::Key(&path[start..i]));
898                }
899                i += 1;
900                let idx_start = i;
901                while i < bytes.len() && bytes[i] != b']' {
902                    i += 1;
903                }
904                let idx = path[idx_start..i].parse::<usize>().ok()?;
905                segments.push(PathSegment::Index(idx));
906                if i < bytes.len() {
907                    i += 1; // skip ']'
908                }
909                start = i;
910                // Skip a trailing dot after ']' (e.g. `a[0].b`)
911                if i < bytes.len() && bytes[i] == b'.' {
912                    i += 1;
913                    start = i;
914                }
915            }
916            _ => {
917                i += 1;
918            }
919        }
920    }
921
922    if start < bytes.len() {
923        segments.push(PathSegment::Key(&path[start..]));
924    }
925
926    Some(segments)
927}
928
929/// Compare two AttributeValues using a comparison operator.
930fn compare_values(left: &AttributeValue, op: &CompOp, right: &AttributeValue) -> bool {
931    match (left, right) {
932        (AttributeValue::S(a), AttributeValue::S(b)) => compare_ord(a, op, b),
933        (AttributeValue::N(a), AttributeValue::N(b)) => {
934            use bigdecimal::BigDecimal;
935            use std::str::FromStr;
936            match (BigDecimal::from_str(a), BigDecimal::from_str(b)) {
937                (Ok(da), Ok(db)) => compare_ord(&da, op, &db),
938                _ => false,
939            }
940        }
941        (AttributeValue::BOOL(a), AttributeValue::BOOL(b)) => match op {
942            CompOp::Eq => a == b,
943            CompOp::Ne => a != b,
944            _ => false,
945        },
946        _ => match op {
947            CompOp::Eq => false,
948            CompOp::Ne => true,
949            _ => false,
950        },
951    }
952}
953
954/// Format a BigDecimal number, stripping unnecessary trailing zeros.
955fn format_bigdecimal(n: &bigdecimal::BigDecimal) -> String {
956    let normalized = n.normalized();
957    if normalized.as_bigint_and_exponent().1 < 0 {
958        normalized.with_scale(0).to_string()
959    } else {
960        normalized.to_string()
961    }
962}
963
964fn compare_ord<T: PartialOrd>(a: &T, op: &CompOp, b: &T) -> bool {
965    match op {
966        CompOp::Eq => a == b,
967        CompOp::Ne => a != b,
968        CompOp::Lt => a < b,
969        CompOp::Le => a <= b,
970        CompOp::Gt => a > b,
971        CompOp::Ge => a >= b,
972    }
973}