Skip to main content

dynoxide/partiql/
executor.rs

1//! PartiQL statement executor.
2//!
3//! Maps parsed PartiQL statements to internal DynamoDB operations.
4
5use crate::errors::{DynoxideError, Result};
6use crate::partiql::parser::{
7    CompOp, PartiqlValue, SetValue, Statement, WhereClause, WhereCondition,
8};
9use crate::storage::Storage;
10use crate::types::{AttributeValue, Item};
11use std::collections::HashMap;
12
13/// Execute a parsed PartiQL statement.
14///
15/// Returns `Some(items)` for SELECT (may be empty), `None` for write operations.
16/// An optional `limit` restricts how many items a SELECT returns.
17pub fn execute(
18    storage: &Storage,
19    stmt: &Statement,
20    parameters: &[AttributeValue],
21    limit: Option<usize>,
22) -> Result<Option<Vec<Item>>> {
23    match stmt {
24        Statement::Select {
25            table_name,
26            projections,
27            where_clause,
28        } => execute_select(
29            storage,
30            table_name,
31            projections,
32            where_clause.as_ref(),
33            parameters,
34            limit,
35        ),
36        Statement::Insert {
37            table_name,
38            item,
39            if_not_exists,
40        } => {
41            execute_insert(storage, table_name, item, parameters, *if_not_exists)?;
42            Ok(None)
43        }
44        Statement::Update {
45            table_name,
46            set_clauses,
47            remove_paths,
48            where_clause,
49        } => {
50            execute_update(
51                storage,
52                table_name,
53                set_clauses,
54                remove_paths,
55                where_clause.as_ref(),
56                parameters,
57            )?;
58            Ok(None)
59        }
60        Statement::Delete {
61            table_name,
62            where_clause,
63        } => {
64            execute_delete(storage, table_name, where_clause.as_ref(), parameters)?;
65            Ok(None)
66        }
67    }
68}
69
70/// Insert a projected value into a result item.
71///
72/// For dotted paths (e.g. `a.b.c`), DynamoDB PartiQL returns the resolved value
73/// keyed by the leaf segment name (`c`), not the full path or reconstructed
74/// nested structure. For simple paths and array index paths, the key is used as-is.
75fn insert_nested_projection(result: &mut Item, path: &str, val: AttributeValue) {
76    let parts: Vec<&str> = path.split('.').collect();
77    // Use the leaf segment as the key
78    let key = parts.last().unwrap();
79    result.insert(key.to_string(), val);
80}
81
82fn execute_select(
83    storage: &Storage,
84    table_name: &str,
85    projections: &[String],
86    where_clause: Option<&WhereClause>,
87    parameters: &[AttributeValue],
88    limit: Option<usize>,
89) -> Result<Option<Vec<Item>>> {
90    let meta = require_table(storage, table_name)?;
91    let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
92
93    // Check for COUNT(*) projection
94    if projections.len() == 1 && projections[0] == "COUNT(*)" {
95        let items = collect_matching_items(
96            storage,
97            table_name,
98            where_clause,
99            parameters,
100            &key_schema,
101            None,
102        )?;
103        let count = items.len();
104        let mut result = HashMap::new();
105        result.insert("Count".to_string(), AttributeValue::N(count.to_string()));
106        return Ok(Some(vec![result]));
107    }
108
109    let items = collect_matching_items(
110        storage,
111        table_name,
112        where_clause,
113        parameters,
114        &key_schema,
115        limit,
116    )?;
117
118    // Apply projections
119    let items = if projections.is_empty() {
120        items
121    } else {
122        items
123            .into_iter()
124            .map(|item| {
125                let mut projected = HashMap::new();
126                for proj in projections {
127                    if let Some(val) = resolve_nested_path(&item, proj) {
128                        insert_nested_projection(&mut projected, proj, val.clone());
129                    }
130                }
131                projected
132            })
133            .collect()
134    };
135
136    Ok(Some(items))
137}
138
139/// Collect items that match the WHERE clause, optionally limited.
140fn collect_matching_items(
141    storage: &Storage,
142    table_name: &str,
143    where_clause: Option<&WhereClause>,
144    parameters: &[AttributeValue],
145    key_schema: &crate::actions::helpers::KeySchema,
146    limit: Option<usize>,
147) -> Result<Vec<Item>> {
148    // Try to use Query if the WHERE clause constrains the partition key
149    let pk_condition = where_clause.and_then(|wc| find_pk_condition(wc, &key_schema.partition_key));
150
151    let items: Vec<Item> = if let Some(pk_cond) = pk_condition {
152        let pk_val = resolve_value(&pk_cond.value, parameters)?;
153        let pk_str = pk_val
154            .to_key_string()
155            .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
156
157        let rows = storage.query_items(table_name, &pk_str, &Default::default())?;
158
159        let iter = rows
160            .into_iter()
161            .filter_map(|(_, _, json)| serde_json::from_str::<Item>(&json).ok())
162            .filter(|item| matches_where(item, where_clause, parameters));
163
164        if let Some(lim) = limit {
165            iter.take(lim).collect()
166        } else {
167            iter.collect()
168        }
169    } else {
170        let rows = storage.scan_items(table_name, &Default::default())?;
171
172        let iter = rows
173            .into_iter()
174            .filter_map(|(_, _, json)| serde_json::from_str::<Item>(&json).ok())
175            .filter(|item| matches_where(item, where_clause, parameters));
176
177        if let Some(lim) = limit {
178            iter.take(lim).collect()
179        } else {
180            iter.collect()
181        }
182    };
183
184    Ok(items)
185}
186
187/// Find a partition key equality condition, searching across all OR groups.
188fn find_pk_condition<'a>(
189    wc: &'a WhereClause,
190    pk_name: &str,
191) -> Option<&'a crate::partiql::parser::Condition> {
192    // Only optimise to a Query when there is a single OR group
193    // (multi-group OR with pk in only one group would need a union approach).
194    if wc.groups.len() == 1 {
195        wc.groups[0].iter().find_map(|c| match c {
196            WhereCondition::Comparison(cond) if cond.path == pk_name && cond.op == CompOp::Eq => {
197                Some(cond)
198            }
199            _ => None,
200        })
201    } else {
202        None
203    }
204}
205
206fn execute_insert(
207    storage: &Storage,
208    table_name: &str,
209    item_template: &HashMap<String, PartiqlValue>,
210    parameters: &[AttributeValue],
211    if_not_exists: bool,
212) -> Result<()> {
213    // Resolve any parameter placeholders in the item
214    let mut item = HashMap::new();
215    for (k, v) in item_template {
216        let resolved = match v {
217            PartiqlValue::Literal(av) => av.clone(),
218            PartiqlValue::Parameter(idx) => parameters.get(*idx).cloned().ok_or_else(|| {
219                DynoxideError::ValidationException(format!(
220                    "Parameter index {idx} out of range (have {} parameters)",
221                    parameters.len()
222                ))
223            })?,
224        };
225        item.insert(k.clone(), resolved);
226    }
227
228    let meta = require_table(storage, table_name)?;
229    let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
230
231    // Validate keys present
232    crate::actions::helpers::validate_item_keys(&item, &key_schema, &meta)?;
233    crate::validation::validate_item_attribute_values(&item)?;
234
235    // Deduplicate sets
236    crate::validation::normalize_item_sets(&mut item);
237
238    let (pk, sk) = crate::actions::helpers::extract_key_strings(&item, &key_schema)?;
239
240    // PartiQL INSERT must reject duplicates (unlike PutItem which overwrites)
241    let existing = storage.get_item(table_name, &pk, &sk)?;
242    if existing.is_some() {
243        if if_not_exists {
244            // Silently succeed — no-op
245            return Ok(());
246        }
247        return Err(DynoxideError::DuplicateItemException(
248            "Duplicate primary key exists in table".to_string(),
249        ));
250    }
251
252    let item_json = serde_json::to_string(&item)
253        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
254    let item_size = crate::types::item_size(&item);
255
256    let hash_prefix = item
257        .get(&key_schema.partition_key)
258        .map(crate::storage::compute_hash_prefix)
259        .unwrap_or_default();
260    let old_json =
261        storage.put_item_with_hash(table_name, &pk, &sk, &item_json, item_size, &hash_prefix)?;
262
263    // GSI maintenance
264    let table_sk_attr = key_schema.sort_key.as_deref();
265    let _ = crate::actions::gsi::maintain_gsis_after_write(
266        storage,
267        table_name,
268        &meta,
269        &pk,
270        &sk,
271        &item,
272        &key_schema.partition_key,
273        table_sk_attr,
274    )?;
275
276    // LSI maintenance
277    crate::actions::lsi::maintain_lsis_after_write(
278        storage,
279        table_name,
280        &meta,
281        &pk,
282        &sk,
283        &item,
284        &key_schema.partition_key,
285        table_sk_attr,
286    )?;
287
288    // Stream record
289    let old_item: Option<Item> = old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
290    crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item))?;
291
292    Ok(())
293}
294
295fn execute_update(
296    storage: &Storage,
297    table_name: &str,
298    set_clauses: &[crate::partiql::parser::SetClause],
299    remove_paths: &[String],
300    where_clause: Option<&WhereClause>,
301    parameters: &[AttributeValue],
302) -> Result<()> {
303    let meta = require_table(storage, table_name)?;
304    let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
305
306    // WHERE clause is required for UPDATE to identify the item
307    let wc = where_clause.ok_or_else(|| {
308        DynoxideError::ValidationException("UPDATE requires a WHERE clause".to_string())
309    })?;
310
311    // DynamoDB does not support OR in UPDATE WHERE clauses
312    if wc.groups.len() > 1 {
313        return Err(DynoxideError::ValidationException(
314            "UPDATE does not support OR conditions in WHERE clause".to_string(),
315        ));
316    }
317
318    // Extract partition key from WHERE (must be in first/only group for key lookup)
319    let pk_cond =
320        find_comparison_in_groups(&wc.groups, &key_schema.partition_key).ok_or_else(|| {
321            DynoxideError::ValidationException(
322                "UPDATE WHERE must include partition key equality".to_string(),
323            )
324        })?;
325
326    let pk_val = resolve_value(&pk_cond.value, parameters)?;
327    let pk_str = pk_val
328        .to_key_string()
329        .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
330
331    let sk_str = if let Some(ref sk_name) = key_schema.sort_key {
332        let sk_cond = find_comparison_in_groups(&wc.groups, sk_name);
333        if sk_cond.is_none() {
334            return Err(DynoxideError::ValidationException(
335                "Where clause does not contain a mandatory equality on all key attributes"
336                    .to_string(),
337            ));
338        }
339        sk_cond
340            .map(|c| resolve_value(&c.value, parameters))
341            .transpose()?
342            .and_then(|v| v.to_key_string())
343            .unwrap_or_default()
344    } else {
345        String::new()
346    };
347
348    // Get existing item
349    let existing_json = storage.get_item(table_name, &pk_str, &sk_str)?;
350    let mut item: Item = existing_json
351        .as_ref()
352        .and_then(|j| serde_json::from_str(j).ok())
353        .unwrap_or_default();
354
355    let old_item = item.clone();
356
357    // Apply SET clauses with nested path support
358    for clause in set_clauses {
359        let val = resolve_set_value(&clause.value, &item, parameters)?;
360        set_nested_value(&mut item, &clause.path, val)?;
361    }
362
363    // Apply REMOVE clauses
364    for path in remove_paths {
365        remove_nested_value(&mut item, path);
366    }
367
368    // Ensure keys are present
369    if item.is_empty() {
370        return Ok(());
371    }
372
373    // Validate attribute values after SET clauses applied
374    crate::validation::validate_item_attribute_values(&item)?;
375    crate::validation::normalize_item_sets(&mut item);
376
377    let item_json = serde_json::to_string(&item)
378        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
379    let item_size = crate::types::item_size(&item);
380
381    let hash_prefix = item
382        .get(&key_schema.partition_key)
383        .map(crate::storage::compute_hash_prefix)
384        .unwrap_or_default();
385    storage.put_item_with_hash(
386        table_name,
387        &pk_str,
388        &sk_str,
389        &item_json,
390        item_size,
391        &hash_prefix,
392    )?;
393
394    // GSI maintenance
395    let table_sk_attr = key_schema.sort_key.as_deref();
396    let _ = crate::actions::gsi::maintain_gsis_after_write(
397        storage,
398        table_name,
399        &meta,
400        &pk_str,
401        &sk_str,
402        &item,
403        &key_schema.partition_key,
404        table_sk_attr,
405    )?;
406
407    // LSI maintenance
408    crate::actions::lsi::maintain_lsis_after_write(
409        storage,
410        table_name,
411        &meta,
412        &pk_str,
413        &sk_str,
414        &item,
415        &key_schema.partition_key,
416        table_sk_attr,
417    )?;
418
419    // Stream record
420    let old_ref = if existing_json.is_some() {
421        Some(&old_item)
422    } else {
423        None
424    };
425    crate::streams::record_stream_event(storage, &meta, old_ref, Some(&item))?;
426
427    Ok(())
428}
429
430fn execute_delete(
431    storage: &Storage,
432    table_name: &str,
433    where_clause: Option<&WhereClause>,
434    parameters: &[AttributeValue],
435) -> Result<()> {
436    let meta = require_table(storage, table_name)?;
437    let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
438
439    let wc = where_clause.ok_or_else(|| {
440        DynoxideError::ValidationException("DELETE requires a WHERE clause".to_string())
441    })?;
442
443    // DynamoDB does not support OR in DELETE WHERE clauses
444    if wc.groups.len() > 1 {
445        return Err(DynoxideError::ValidationException(
446            "DELETE does not support OR conditions in WHERE clause".to_string(),
447        ));
448    }
449
450    let pk_cond =
451        find_comparison_in_groups(&wc.groups, &key_schema.partition_key).ok_or_else(|| {
452            DynoxideError::ValidationException(
453                "DELETE WHERE must include partition key equality".to_string(),
454            )
455        })?;
456
457    let pk_val = resolve_value(&pk_cond.value, parameters)?;
458    let pk_str = pk_val
459        .to_key_string()
460        .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
461
462    // I15: Validate that the sort key is present in the WHERE clause if the table has one
463    if let Some(ref sk_name) = key_schema.sort_key {
464        let has_sk_condition = wc.groups.iter().any(|group| {
465            group.iter().any(|c| match c {
466                WhereCondition::Comparison(comp) => comp.path == *sk_name && comp.op == CompOp::Eq,
467                _ => false,
468            })
469        });
470        if !has_sk_condition {
471            return Err(DynoxideError::ValidationException(
472                "Where clause does not contain a mandatory equality on all key attributes"
473                    .to_string(),
474            ));
475        }
476    }
477
478    let sk_str = if let Some(ref sk_name) = key_schema.sort_key {
479        find_comparison_in_groups(&wc.groups, sk_name)
480            .map(|c| resolve_value(&c.value, parameters))
481            .transpose()?
482            .and_then(|v| v.to_key_string())
483            .unwrap_or_default()
484    } else {
485        String::new()
486    };
487
488    let old_json = storage.delete_item(table_name, &pk_str, &sk_str)?;
489
490    // GSI maintenance
491    let _ = crate::actions::gsi::maintain_gsis_after_delete(
492        storage, table_name, &meta, &pk_str, &sk_str,
493    )?;
494
495    // LSI maintenance
496    crate::actions::lsi::maintain_lsis_after_delete(storage, table_name, &meta, &pk_str, &sk_str)?;
497
498    // Stream record
499    let old_item: Option<Item> = old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
500    if old_item.is_some() {
501        crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None)?;
502    }
503
504    Ok(())
505}
506
507// ---------------------------------------------------------------------------
508// Helpers
509// ---------------------------------------------------------------------------
510
511fn require_table(storage: &Storage, table_name: &str) -> Result<crate::storage::TableMetadata> {
512    crate::actions::helpers::require_table(storage, table_name)
513}
514
515/// Find a comparison condition matching a given path with Eq operator,
516/// searching across all OR groups.
517fn find_comparison_in_groups<'a>(
518    groups: &'a [Vec<WhereCondition>],
519    path: &str,
520) -> Option<&'a crate::partiql::parser::Condition> {
521    for group in groups {
522        if let Some(cond) = find_comparison(group, path) {
523            return Some(cond);
524        }
525    }
526    None
527}
528
529/// Find a comparison condition matching a given path with Eq operator.
530fn find_comparison<'a>(
531    conditions: &'a [WhereCondition],
532    path: &str,
533) -> Option<&'a crate::partiql::parser::Condition> {
534    conditions.iter().find_map(|c| match c {
535        WhereCondition::Comparison(cond) if cond.path == path && cond.op == CompOp::Eq => {
536            Some(cond)
537        }
538        _ => None,
539    })
540}
541
542/// Resolve a PartiqlValue to a concrete AttributeValue.
543fn resolve_value(val: &PartiqlValue, parameters: &[AttributeValue]) -> Result<AttributeValue> {
544    match val {
545        PartiqlValue::Literal(av) => Ok(av.clone()),
546        PartiqlValue::Parameter(idx) => parameters.get(*idx).cloned().ok_or_else(|| {
547            DynoxideError::ValidationException(format!(
548                "Parameter index {idx} out of range (have {} parameters)",
549                parameters.len()
550            ))
551        }),
552    }
553}
554
555/// Resolve a SetValue to a concrete AttributeValue, potentially using the current item.
556fn resolve_set_value(
557    val: &SetValue,
558    item: &Item,
559    parameters: &[AttributeValue],
560) -> Result<AttributeValue> {
561    match val {
562        SetValue::Simple(pv) => resolve_value(pv, parameters),
563        SetValue::Add(attr, pv) => {
564            let current = resolve_nested_path(item, attr);
565            let operand = resolve_value(pv, parameters)?;
566            match (current, &operand) {
567                (Some(AttributeValue::N(cur)), AttributeValue::N(add)) => {
568                    use bigdecimal::BigDecimal;
569                    use std::str::FromStr;
570                    let a = BigDecimal::from_str(cur).map_err(|e| {
571                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
572                    })?;
573                    let b = BigDecimal::from_str(add).map_err(|e| {
574                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
575                    })?;
576                    let result = a + b;
577                    Ok(AttributeValue::N(format_bigdecimal(&result)))
578                }
579                (None, AttributeValue::N(_)) => {
580                    // Attribute doesn't exist yet — use the operand value
581                    Ok(operand)
582                }
583                _ => Err(DynoxideError::ValidationException(
584                    "SET expression add requires numeric attribute and operand".to_string(),
585                )),
586            }
587        }
588        SetValue::Sub(attr, pv) => {
589            let current = resolve_nested_path(item, attr);
590            let operand = resolve_value(pv, parameters)?;
591            match (current, &operand) {
592                (Some(AttributeValue::N(cur)), AttributeValue::N(sub)) => {
593                    use bigdecimal::BigDecimal;
594                    use std::str::FromStr;
595                    let a = BigDecimal::from_str(cur).map_err(|e| {
596                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
597                    })?;
598                    let b = BigDecimal::from_str(sub).map_err(|e| {
599                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
600                    })?;
601                    let result = a - b;
602                    Ok(AttributeValue::N(format_bigdecimal(&result)))
603                }
604                (None, AttributeValue::N(sub)) => {
605                    // Attribute doesn't exist yet — treat as 0 - operand
606                    use bigdecimal::BigDecimal;
607                    use std::str::FromStr;
608                    let b = BigDecimal::from_str(sub).map_err(|e| {
609                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
610                    })?;
611                    let result = -b;
612                    Ok(AttributeValue::N(format_bigdecimal(&result)))
613                }
614                _ => Err(DynoxideError::ValidationException(
615                    "SET expression subtract requires numeric attribute and operand".to_string(),
616                )),
617            }
618        }
619        SetValue::ListAppend(first, second) => {
620            let a = resolve_value(first, parameters)?;
621            let b = resolve_value(second, parameters)?;
622            // At least one should be a list. If an attribute name was given,
623            // resolve it from the item.
624            let list_a = match &a {
625                AttributeValue::S(name) => resolve_nested_path(item, name)
626                    .cloned()
627                    .unwrap_or(AttributeValue::L(Vec::new())),
628                other => other.clone(),
629            };
630            let list_b = match &b {
631                AttributeValue::S(name) => resolve_nested_path(item, name)
632                    .cloned()
633                    .unwrap_or(AttributeValue::L(Vec::new())),
634                other => other.clone(),
635            };
636            match (list_a, list_b) {
637                (AttributeValue::L(mut la), AttributeValue::L(lb)) => {
638                    la.extend(lb);
639                    Ok(AttributeValue::L(la))
640                }
641                _ => Err(DynoxideError::ValidationException(
642                    "list_append requires list operands".to_string(),
643                )),
644            }
645        }
646    }
647}
648
649/// Set a value at a potentially nested path (e.g. `address.city`).
650fn set_nested_value(item: &mut Item, path: &str, val: AttributeValue) -> Result<()> {
651    let parts: Vec<&str> = path.split('.').collect();
652    if parts.len() == 1 {
653        item.insert(path.to_string(), val);
654        return Ok(());
655    }
656    // Navigate into nested maps, creating them if needed
657    let mut current = item;
658    for part in &parts[..parts.len() - 1] {
659        let entry = current
660            .entry(part.to_string())
661            .or_insert_with(|| AttributeValue::M(HashMap::new()));
662        match entry {
663            AttributeValue::M(map) => {
664                current = map;
665            }
666            _ => {
667                return Err(DynoxideError::ValidationException(
668                    "The document path provided in the update expression is invalid for update"
669                        .to_string(),
670                ));
671            }
672        }
673    }
674    current.insert(parts.last().unwrap().to_string(), val);
675    Ok(())
676}
677
678/// Remove a value at a potentially nested path (e.g. `address.city`).
679fn remove_nested_value(item: &mut Item, path: &str) {
680    let parts: Vec<&str> = path.split('.').collect();
681    if parts.len() == 1 {
682        item.remove(path);
683        return;
684    }
685    // Navigate into nested maps
686    let mut current = item;
687    for part in &parts[..parts.len() - 1] {
688        match current.get_mut(*part) {
689            Some(AttributeValue::M(map)) => {
690                current = map;
691            }
692            _ => return, // Path doesn't exist or isn't a map — nothing to remove
693        }
694    }
695    current.remove(*parts.last().unwrap());
696}
697
698/// Check if an item matches a WHERE clause (with OR-group support).
699fn matches_where(
700    item: &Item,
701    where_clause: Option<&WhereClause>,
702    parameters: &[AttributeValue],
703) -> bool {
704    let wc = match where_clause {
705        Some(wc) => wc,
706        None => return true,
707    };
708
709    // OR semantics: any group matching is sufficient
710    wc.groups
711        .iter()
712        .any(|group| matches_conditions(item, group, parameters))
713}
714
715/// Check if an item matches all conditions in a group (AND semantics).
716fn matches_conditions(
717    item: &Item,
718    conditions: &[WhereCondition],
719    parameters: &[AttributeValue],
720) -> bool {
721    for cond in conditions {
722        match cond {
723            WhereCondition::Comparison(c) => {
724                let item_val = match resolve_nested_path(item, &c.path) {
725                    Some(v) => v,
726                    None => return false,
727                };
728                let target = match resolve_value(&c.value, parameters) {
729                    Ok(v) => v,
730                    Err(_) => return false,
731                };
732                if !compare_values(item_val, &c.op, &target) {
733                    return false;
734                }
735            }
736            WhereCondition::Exists(path) | WhereCondition::IsNotMissing(path) => {
737                if resolve_nested_path(item, path).is_none() {
738                    return false;
739                }
740            }
741            WhereCondition::NotExists(path) | WhereCondition::IsMissing(path) => {
742                if resolve_nested_path(item, path).is_some() {
743                    return false;
744                }
745            }
746            WhereCondition::BeginsWith(path, prefix_val) => {
747                let item_val = match resolve_nested_path(item, path) {
748                    Some(v) => v,
749                    None => return false,
750                };
751                let prefix = match resolve_value(prefix_val, parameters) {
752                    Ok(v) => v,
753                    Err(_) => return false,
754                };
755                match (item_val, &prefix) {
756                    (AttributeValue::S(s), AttributeValue::S(p)) => {
757                        if !s.starts_with(p.as_str()) {
758                            return false;
759                        }
760                    }
761                    _ => return false,
762                }
763            }
764            WhereCondition::Between(path, low, high) => {
765                let item_val = match resolve_nested_path(item, path) {
766                    Some(v) => v,
767                    None => return false,
768                };
769                let low_val = match resolve_value(low, parameters) {
770                    Ok(v) => v,
771                    Err(_) => return false,
772                };
773                let high_val = match resolve_value(high, parameters) {
774                    Ok(v) => v,
775                    Err(_) => return false,
776                };
777                if !compare_values(item_val, &CompOp::Ge, &low_val)
778                    || !compare_values(item_val, &CompOp::Le, &high_val)
779                {
780                    return false;
781                }
782            }
783            WhereCondition::In(path, values) => {
784                let item_val = match resolve_nested_path(item, path) {
785                    Some(v) => v,
786                    None => return false,
787                };
788                let matched = values.iter().any(|v| {
789                    resolve_value(v, parameters)
790                        .map(|target| compare_values(item_val, &CompOp::Eq, &target))
791                        .unwrap_or(false)
792                });
793                if !matched {
794                    return false;
795                }
796            }
797            WhereCondition::Contains(path, substr_val) => {
798                let item_val = match resolve_nested_path(item, path) {
799                    Some(v) => v,
800                    None => return false,
801                };
802                let substr = match resolve_value(substr_val, parameters) {
803                    Ok(v) => v,
804                    Err(_) => return false,
805                };
806                match (item_val, &substr) {
807                    (AttributeValue::S(s), AttributeValue::S(sub)) => {
808                        if !s.contains(sub.as_str()) {
809                            return false;
810                        }
811                    }
812                    (AttributeValue::SS(set), AttributeValue::S(val)) => {
813                        if !set.contains(val) {
814                            return false;
815                        }
816                    }
817                    (AttributeValue::NS(set), AttributeValue::N(val)) => {
818                        if !set.contains(val) {
819                            return false;
820                        }
821                    }
822                    (AttributeValue::L(list), target) => {
823                        if !list.contains(target) {
824                            return false;
825                        }
826                    }
827                    _ => return false,
828                }
829            }
830        }
831    }
832
833    true
834}
835
836/// Resolve a dotted/indexed path to a nested attribute value.
837///
838/// Supports paths like `"a"`, `"a.b.c"`, and `"a[0].b"`.
839fn resolve_nested_path<'a>(item: &'a Item, path: &str) -> Option<&'a AttributeValue> {
840    // Fast path: no dots or brackets means a simple top-level lookup
841    if !path.contains('.') && !path.contains('[') {
842        return item.get(path);
843    }
844
845    let segments = split_path_segments(path)?;
846    if segments.is_empty() {
847        return None;
848    }
849
850    // First segment must be a map key on the top-level item
851    let mut current = match &segments[0] {
852        PathSegment::Key(k) => item.get(*k)?,
853        PathSegment::Index(_) => return None,
854    };
855
856    for seg in &segments[1..] {
857        current = match seg {
858            PathSegment::Key(k) => match current {
859                AttributeValue::M(map) => map.get(*k)?,
860                _ => return None,
861            },
862            PathSegment::Index(idx) => match current {
863                AttributeValue::L(list) => list.get(*idx)?,
864                _ => return None,
865            },
866        };
867    }
868
869    Some(current)
870}
871
/// One step of a document path.
enum PathSegment<'a> {
    /// A map key, borrowed from the path string.
    Key(&'a str),
    /// A zero-based list index taken from a `[n]` expression.
    Index(usize),
}

/// Split a path like `"a.b[0].c"` into segments.
///
/// `.` separates keys and `[n]` contributes an index segment. Empty keys (for
/// example from a leading dot or from `..`) are dropped rather than reported.
///
/// Returns `None` if the path contains a malformed bracket expression: a
/// non-numeric index such as `a[xyz]`, an empty index such as `a[]`, or a
/// bracket that is never closed such as `a[0`.
fn split_path_segments(path: &str) -> Option<Vec<PathSegment<'_>>> {
    let mut segments = Vec::new();
    let bytes = path.as_bytes();
    // Start of the key currently being scanned, and the scan position. Both only
    // ever land right after an ASCII delimiter, so slicing `path` there is safe.
    let mut key_start = 0;
    let mut pos = 0;

    while pos < bytes.len() {
        match bytes[pos] {
            b'.' => {
                if key_start < pos {
                    segments.push(PathSegment::Key(&path[key_start..pos]));
                }
                pos += 1;
                key_start = pos;
            }
            b'[' => {
                if key_start < pos {
                    segments.push(PathSegment::Key(&path[key_start..pos]));
                }
                let index_start = pos + 1;
                // A bracket without its closing `]` is malformed.
                let close = index_start + path[index_start..].find(']')?;
                let index = path[index_start..close].parse::<usize>().ok()?;
                segments.push(PathSegment::Index(index));
                // Continue after `]`; a following `.` is handled by the arm above.
                pos = close + 1;
                key_start = pos;
            }
            _ => pos += 1,
        }
    }

    // Trailing key after the last delimiter, if any.
    if key_start < bytes.len() {
        segments.push(PathSegment::Key(&path[key_start..]));
    }

    Some(segments)
}
927
928/// Compare two AttributeValues using a comparison operator.
929fn compare_values(left: &AttributeValue, op: &CompOp, right: &AttributeValue) -> bool {
930    match (left, right) {
931        (AttributeValue::S(a), AttributeValue::S(b)) => compare_ord(a, op, b),
932        (AttributeValue::N(a), AttributeValue::N(b)) => {
933            use bigdecimal::BigDecimal;
934            use std::str::FromStr;
935            match (BigDecimal::from_str(a), BigDecimal::from_str(b)) {
936                (Ok(da), Ok(db)) => compare_ord(&da, op, &db),
937                _ => false,
938            }
939        }
940        (AttributeValue::BOOL(a), AttributeValue::BOOL(b)) => match op {
941            CompOp::Eq => a == b,
942            CompOp::Ne => a != b,
943            _ => false,
944        },
945        _ => match op {
946            CompOp::Eq => false,
947            CompOp::Ne => true,
948            _ => false,
949        },
950    }
951}
952
953/// Format a BigDecimal number, stripping unnecessary trailing zeros.
954fn format_bigdecimal(n: &bigdecimal::BigDecimal) -> String {
955    let normalized = n.normalized();
956    if normalized.as_bigint_and_exponent().1 < 0 {
957        normalized.with_scale(0).to_string()
958    } else {
959        normalized.to_string()
960    }
961}
962
/// Apply a comparison operator to two values of the same partially ordered type.
///
/// Each `CompOp` variant maps directly onto the corresponding Rust operator
/// (`==`, `!=`, `<`, `<=`, `>`, `>=`) of `T`.
fn compare_ord<T: PartialOrd>(a: &T, op: &CompOp, b: &T) -> bool {
    match op {
        CompOp::Eq => a == b,
        CompOp::Ne => a != b,
        CompOp::Lt => a < b,
        CompOp::Le => a <= b,
        CompOp::Gt => a > b,
        CompOp::Ge => a >= b,
    }
}