Skip to main content

dynoxide/partiql/
executor.rs

1//! PartiQL statement executor.
2//!
3//! Maps parsed PartiQL statements to internal DynamoDB operations.
4
5use crate::errors::{DynoxideError, Result};
6use crate::partiql::parser::{
7    CompOp, PartiqlValue, SetValue, Statement, WhereClause, WhereCondition,
8};
9use crate::storage_backend::StorageBackend;
10use crate::types::{AttributeValue, Item};
11use std::collections::HashMap;
12
13/// Execute a parsed PartiQL statement.
14///
15/// Returns `Some(items)` for SELECT (may be empty), `None` for write operations.
16/// An optional `limit` restricts how many items a SELECT returns.
17pub async fn execute<S: StorageBackend>(
18    storage: &S,
19    stmt: &Statement,
20    parameters: &[AttributeValue],
21    limit: Option<usize>,
22) -> Result<Option<Vec<Item>>> {
23    Ok(execute_measured(storage, stmt, parameters, limit).await?.0)
24}
25
26/// Like [`execute`], but also returns the total item byte size the statement
27/// touched, for `ConsumedCapacity` accounting. SELECT reports the summed size of
28/// the rows returned; INSERT/UPDATE/DELETE report the affected item's size (0
29/// when the statement was a no-op, e.g. a missing DELETE target).
30pub async fn execute_measured<S: StorageBackend>(
31    storage: &S,
32    stmt: &Statement,
33    parameters: &[AttributeValue],
34    limit: Option<usize>,
35) -> Result<(Option<Vec<Item>>, usize)> {
36    match stmt {
37        Statement::Select {
38            table_name,
39            projections,
40            where_clause,
41        } => {
42            let items = execute_select(
43                storage,
44                table_name,
45                projections,
46                where_clause.as_ref(),
47                parameters,
48                limit,
49            )
50            .await?;
51            let size = items
52                .as_ref()
53                .map(|rows| rows.iter().map(crate::types::item_size).sum())
54                .unwrap_or(0);
55            Ok((items, size))
56        }
57        Statement::Insert {
58            table_name,
59            item,
60            if_not_exists,
61        } => {
62            let size =
63                execute_insert(storage, table_name, item, parameters, *if_not_exists).await?;
64            Ok((None, size))
65        }
66        Statement::Update {
67            table_name,
68            set_clauses,
69            remove_paths,
70            where_clause,
71        } => {
72            let size = execute_update(
73                storage,
74                table_name,
75                set_clauses,
76                remove_paths,
77                where_clause.as_ref(),
78                parameters,
79            )
80            .await?;
81            Ok((None, size))
82        }
83        Statement::Delete {
84            table_name,
85            where_clause,
86        } => {
87            let size =
88                execute_delete(storage, table_name, where_clause.as_ref(), parameters).await?;
89            Ok((None, size))
90        }
91    }
92}
93
94/// Insert a projected value into a result item.
95///
96/// For dotted paths (e.g. `a.b.c`), DynamoDB PartiQL returns the resolved value
97/// keyed by the leaf segment name (`c`), not the full path or reconstructed
98/// nested structure. For simple paths and array index paths, the key is used as-is.
99fn insert_nested_projection(result: &mut Item, path: &str, val: AttributeValue) {
100    let parts: Vec<&str> = path.split('.').collect();
101    // Use the leaf segment as the key
102    let key = parts.last().unwrap();
103    result.insert(key.to_string(), val);
104}
105
106async fn execute_select<S: StorageBackend>(
107    storage: &S,
108    table_name: &str,
109    projections: &[String],
110    where_clause: Option<&WhereClause>,
111    parameters: &[AttributeValue],
112    limit: Option<usize>,
113) -> Result<Option<Vec<Item>>> {
114    let meta = require_table(storage, table_name).await?;
115    let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
116
117    // Check for COUNT(*) projection
118    if projections.len() == 1 && projections[0] == "COUNT(*)" {
119        let items = collect_matching_items(
120            storage,
121            table_name,
122            where_clause,
123            parameters,
124            &key_schema,
125            None,
126        )
127        .await?;
128        let count = items.len();
129        let mut result = HashMap::new();
130        result.insert("Count".to_string(), AttributeValue::N(count.to_string()));
131        return Ok(Some(vec![result]));
132    }
133
134    let items = collect_matching_items(
135        storage,
136        table_name,
137        where_clause,
138        parameters,
139        &key_schema,
140        limit,
141    )
142    .await?;
143
144    // Apply projections
145    let items = if projections.is_empty() {
146        items
147    } else {
148        items
149            .into_iter()
150            .map(|item| {
151                let mut projected = HashMap::new();
152                for proj in projections {
153                    if let Some(val) = resolve_nested_path(&item, proj) {
154                        insert_nested_projection(&mut projected, proj, val.clone());
155                    }
156                }
157                projected
158            })
159            .collect()
160    };
161
162    Ok(Some(items))
163}
164
165/// Collect items that match the WHERE clause, optionally limited.
166async fn collect_matching_items<S: StorageBackend>(
167    storage: &S,
168    table_name: &str,
169    where_clause: Option<&WhereClause>,
170    parameters: &[AttributeValue],
171    key_schema: &crate::actions::helpers::KeySchema,
172    limit: Option<usize>,
173) -> Result<Vec<Item>> {
174    // Try to use Query if the WHERE clause constrains the partition key
175    let pk_condition = where_clause.and_then(|wc| find_pk_condition(wc, &key_schema.partition_key));
176
177    let items: Vec<Item> = if let Some(pk_cond) = pk_condition {
178        let pk_val = resolve_value(&pk_cond.value, parameters)?;
179        let pk_str = pk_val
180            .to_key_string()
181            .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
182
183        let rows = storage
184            .query_items(table_name, &pk_str, &Default::default())
185            .await?;
186
187        let iter = rows
188            .into_iter()
189            .filter_map(|(_, _, json)| serde_json::from_str::<Item>(&json).ok())
190            .filter(|item| matches_where(item, where_clause, parameters));
191
192        if let Some(lim) = limit {
193            iter.take(lim).collect()
194        } else {
195            iter.collect()
196        }
197    } else {
198        let rows = storage.scan_items(table_name, &Default::default()).await?;
199
200        let iter = rows
201            .into_iter()
202            .filter_map(|(_, _, json)| serde_json::from_str::<Item>(&json).ok())
203            .filter(|item| matches_where(item, where_clause, parameters));
204
205        if let Some(lim) = limit {
206            iter.take(lim).collect()
207        } else {
208            iter.collect()
209        }
210    };
211
212    Ok(items)
213}
214
215/// Find a partition key equality condition, searching across all OR groups.
216fn find_pk_condition<'a>(
217    wc: &'a WhereClause,
218    pk_name: &str,
219) -> Option<&'a crate::partiql::parser::Condition> {
220    // Only optimise to a Query when there is a single OR group
221    // (multi-group OR with pk in only one group would need a union approach).
222    if wc.groups.len() == 1 {
223        wc.groups[0].iter().find_map(|c| match c {
224            WhereCondition::Comparison(cond) if cond.path == pk_name && cond.op == CompOp::Eq => {
225                Some(cond)
226            }
227            _ => None,
228        })
229    } else {
230        None
231    }
232}
233
234/// Returns the inserted item's size in bytes (0 when an `if_not_exists`
235/// duplicate makes the insert a no-op), for `ConsumedCapacity` accounting.
236async fn execute_insert<S: StorageBackend>(
237    storage: &S,
238    table_name: &str,
239    item_template: &HashMap<String, PartiqlValue>,
240    parameters: &[AttributeValue],
241    if_not_exists: bool,
242) -> Result<usize> {
243    // Resolve any parameter placeholders in the item
244    let mut item = HashMap::new();
245    for (k, v) in item_template {
246        let resolved = match v {
247            PartiqlValue::Literal(av) => av.clone(),
248            PartiqlValue::Parameter(idx) => parameters.get(*idx).cloned().ok_or_else(|| {
249                DynoxideError::ValidationException(format!(
250                    "Parameter index {idx} out of range (have {} parameters)",
251                    parameters.len()
252                ))
253            })?,
254        };
255        item.insert(k.clone(), resolved);
256    }
257
258    let meta = require_table(storage, table_name).await?;
259    let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
260
261    // Validate keys present
262    crate::actions::helpers::validate_item_keys(&item, &key_schema, &meta)?;
263    crate::validation::validate_item_attribute_values(&item)?;
264
265    // Deduplicate sets
266    crate::validation::normalize_item_sets(&mut item);
267
268    // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
269    let (pk, sk) = crate::actions::helpers::extract_key_strings(&item, &key_schema)?;
270
271    // PartiQL INSERT must reject duplicates (unlike PutItem which overwrites)
272    let existing = storage.get_item(table_name, &pk, &sk).await?;
273    if existing.is_some() {
274        if if_not_exists {
275            // Silently succeed — no-op
276            return Ok(0);
277        }
278        return Err(DynoxideError::DuplicateItemException(
279            "Duplicate primary key exists in table".to_string(),
280        ));
281    }
282
283    let item_json = serde_json::to_string(&item)
284        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
285    let item_size = crate::types::item_size(&item);
286
287    let hash_prefix = item
288        .get(&key_schema.partition_key)
289        .map(crate::storage::compute_hash_prefix)
290        .unwrap_or_default();
291    let old_json = storage
292        .put_item_with_hash(table_name, &pk, &sk, &item_json, item_size, &hash_prefix)
293        .await?;
294
295    // GSI maintenance
296    let table_sk_attr = key_schema.sort_key.as_deref();
297    let _ = crate::actions::gsi::maintain_gsis_after_write(
298        storage,
299        table_name,
300        &meta,
301        &pk,
302        &sk,
303        &item,
304        &key_schema.partition_key,
305        table_sk_attr,
306    )
307    .await?;
308
309    // LSI maintenance
310    crate::actions::lsi::maintain_lsis_after_write(
311        storage,
312        table_name,
313        &meta,
314        &pk,
315        &sk,
316        &item,
317        &key_schema.partition_key,
318        table_sk_attr,
319    )
320    .await?;
321
322    // Stream record
323    let old_item: Option<Item> = old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
324    crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item)).await?;
325
326    Ok(item_size)
327}
328
329/// Returns the updated item's new size in bytes (0 when the update resolves to
330/// an empty item and is skipped), for `ConsumedCapacity` accounting.
331async fn execute_update<S: StorageBackend>(
332    storage: &S,
333    table_name: &str,
334    set_clauses: &[crate::partiql::parser::SetClause],
335    remove_paths: &[String],
336    where_clause: Option<&WhereClause>,
337    parameters: &[AttributeValue],
338) -> Result<usize> {
339    let meta = require_table(storage, table_name).await?;
340    let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
341
342    // WHERE clause is required for UPDATE to identify the item
343    let wc = where_clause.ok_or_else(|| {
344        DynoxideError::ValidationException("UPDATE requires a WHERE clause".to_string())
345    })?;
346
347    // DynamoDB does not support OR in UPDATE WHERE clauses
348    if wc.groups.len() > 1 {
349        return Err(DynoxideError::ValidationException(
350            "UPDATE does not support OR conditions in WHERE clause".to_string(),
351        ));
352    }
353
354    // Extract partition key from WHERE (must be in first/only group for key lookup)
355    let pk_cond =
356        find_comparison_in_groups(&wc.groups, &key_schema.partition_key).ok_or_else(|| {
357            DynoxideError::ValidationException(
358                "Where clause does not contain a mandatory equality on all key attributes"
359                    .to_string(),
360            )
361        })?;
362
363    let pk_val = resolve_value(&pk_cond.value, parameters)?;
364    let pk_str = pk_val
365        .to_key_string()
366        .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
367
368    let sk_str = if let Some(ref sk_name) = key_schema.sort_key {
369        let sk_cond = find_comparison_in_groups(&wc.groups, sk_name);
370        if sk_cond.is_none() {
371            return Err(DynoxideError::ValidationException(
372                "Where clause does not contain a mandatory equality on all key attributes"
373                    .to_string(),
374            ));
375        }
376        sk_cond
377            .map(|c| resolve_value(&c.value, parameters))
378            .transpose()?
379            .and_then(|v| v.to_key_string())
380            .unwrap_or_default()
381    } else {
382        String::new()
383    };
384
385    // Get existing item
386    let existing_json = storage.get_item(table_name, &pk_str, &sk_str).await?;
387    let mut item: Item = existing_json
388        .as_ref()
389        .and_then(|j| serde_json::from_str(j).ok())
390        .unwrap_or_default();
391
392    let old_item = item.clone();
393
394    // Non-key WHERE predicates act as a condition on the existing item, like a
395    // conditional write. When the item exists but the condition is false, AWS
396    // raises ConditionalCheckFailedException; a missing item is not a condition
397    // failure and falls through to the existing create/no-op behaviour below.
398    if existing_json.is_some() && !matches_where(&old_item, where_clause, parameters) {
399        return Err(DynoxideError::ConditionalCheckFailedException(
400            "The conditional request failed".to_string(),
401            None,
402        ));
403    }
404
405    let before_item = item.clone();
406
407    // Apply SET clauses with nested path support
408    for clause in set_clauses {
409        let val = resolve_set_value(&clause.value, &item, parameters)?;
410        set_nested_value(&mut item, &clause.path, val)?;
411    }
412
413    // Apply REMOVE clauses
414    for path in remove_paths {
415        remove_nested_value(&mut item, path);
416    }
417
418    // Ensure keys are present
419    if item.is_empty() {
420        return Ok(0);
421    }
422
423    // Validate attribute values after SET clauses applied
424    crate::validation::validate_item_attribute_values(&item)?;
425    crate::validation::normalize_item_sets(&mut item);
426
427    // Reject an index key this update set to an invalid value (see helpers).
428    crate::actions::helpers::validate_updated_index_keys(&before_item, &item, &meta)?;
429
430    let item_json = serde_json::to_string(&item)
431        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
432    let item_size = crate::types::item_size(&item);
433
434    let hash_prefix = item
435        .get(&key_schema.partition_key)
436        .map(crate::storage::compute_hash_prefix)
437        .unwrap_or_default();
438    storage
439        .put_item_with_hash(
440            table_name,
441            &pk_str,
442            &sk_str,
443            &item_json,
444            item_size,
445            &hash_prefix,
446        )
447        .await?;
448
449    // GSI maintenance
450    let table_sk_attr = key_schema.sort_key.as_deref();
451    let _ = crate::actions::gsi::maintain_gsis_after_write(
452        storage,
453        table_name,
454        &meta,
455        &pk_str,
456        &sk_str,
457        &item,
458        &key_schema.partition_key,
459        table_sk_attr,
460    )
461    .await?;
462
463    // LSI maintenance
464    crate::actions::lsi::maintain_lsis_after_write(
465        storage,
466        table_name,
467        &meta,
468        &pk_str,
469        &sk_str,
470        &item,
471        &key_schema.partition_key,
472        table_sk_attr,
473    )
474    .await?;
475
476    // Stream record
477    let old_ref = if existing_json.is_some() {
478        Some(&old_item)
479    } else {
480        None
481    };
482    crate::streams::record_stream_event(storage, &meta, old_ref, Some(&item)).await?;
483
484    Ok(item_size)
485}
486
487/// Returns the deleted item's size in bytes (0 when the target was missing and
488/// the delete was a no-op), for `ConsumedCapacity` accounting.
489async fn execute_delete<S: StorageBackend>(
490    storage: &S,
491    table_name: &str,
492    where_clause: Option<&WhereClause>,
493    parameters: &[AttributeValue],
494) -> Result<usize> {
495    let meta = require_table(storage, table_name).await?;
496    let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
497
498    let wc = where_clause.ok_or_else(|| {
499        DynoxideError::ValidationException("DELETE requires a WHERE clause".to_string())
500    })?;
501
502    // DynamoDB does not support OR in DELETE WHERE clauses
503    if wc.groups.len() > 1 {
504        return Err(DynoxideError::ValidationException(
505            "DELETE does not support OR conditions in WHERE clause".to_string(),
506        ));
507    }
508
509    let pk_cond =
510        find_comparison_in_groups(&wc.groups, &key_schema.partition_key).ok_or_else(|| {
511            DynoxideError::ValidationException(
512                "Where clause does not contain a mandatory equality on all key attributes"
513                    .to_string(),
514            )
515        })?;
516
517    let pk_val = resolve_value(&pk_cond.value, parameters)?;
518    let pk_str = pk_val
519        .to_key_string()
520        .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
521
522    // I15: Validate that the sort key is present in the WHERE clause if the table has one
523    if let Some(ref sk_name) = key_schema.sort_key {
524        let has_sk_condition = wc.groups.iter().any(|group| {
525            group.iter().any(|c| match c {
526                WhereCondition::Comparison(comp) => comp.path == *sk_name && comp.op == CompOp::Eq,
527                _ => false,
528            })
529        });
530        if !has_sk_condition {
531            return Err(DynoxideError::ValidationException(
532                "Where clause does not contain a mandatory equality on all key attributes"
533                    .to_string(),
534            ));
535        }
536    }
537
538    let sk_str = if let Some(ref sk_name) = key_schema.sort_key {
539        find_comparison_in_groups(&wc.groups, sk_name)
540            .map(|c| resolve_value(&c.value, parameters))
541            .transpose()?
542            .and_then(|v| v.to_key_string())
543            .unwrap_or_default()
544    } else {
545        String::new()
546    };
547
548    // Non-key WHERE predicates act as a condition on the existing item, like a
549    // conditional write. AWS raises ConditionalCheckFailedException when the item
550    // is present but the condition is false, and a missing item is a silent
551    // no-op (the condition is never evaluated). Re-running the full WHERE via
552    // matches_where covers both the key equality (always true for the fetched
553    // item) and any extra predicates.
554    if let Some(json) = storage.get_item(table_name, &pk_str, &sk_str).await? {
555        let existing: Item = serde_json::from_str(&json)
556            .map_err(|e| DynoxideError::InternalServerError(format!("Bad item JSON: {e}")))?;
557        if !matches_where(&existing, where_clause, parameters) {
558            return Err(DynoxideError::ConditionalCheckFailedException(
559                "The conditional request failed".to_string(),
560                None,
561            ));
562        }
563    }
564
565    let old_json = storage.delete_item(table_name, &pk_str, &sk_str).await?;
566
567    // GSI maintenance
568    let _ = crate::actions::gsi::maintain_gsis_after_delete(
569        storage, table_name, &meta, &pk_str, &sk_str,
570    )
571    .await?;
572
573    // LSI maintenance
574    crate::actions::lsi::maintain_lsis_after_delete(storage, table_name, &meta, &pk_str, &sk_str)
575        .await?;
576
577    // Stream record
578    let old_item: Option<Item> = old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
579    if old_item.is_some() {
580        crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None).await?;
581    }
582
583    // A delete is charged for the size of the item it removed; a no-op delete
584    // (missing target) reports 0.
585    let deleted_size = old_item.as_ref().map(crate::types::item_size).unwrap_or(0);
586    Ok(deleted_size)
587}
588
589// ---------------------------------------------------------------------------
590// Helpers
591// ---------------------------------------------------------------------------
592
593async fn require_table<S: StorageBackend>(
594    storage: &S,
595    table_name: &str,
596) -> Result<crate::storage::TableMetadata> {
597    crate::actions::helpers::require_table(storage, table_name).await
598}
599
600/// Find a comparison condition matching a given path with Eq operator,
601/// searching across all OR groups.
602fn find_comparison_in_groups<'a>(
603    groups: &'a [Vec<WhereCondition>],
604    path: &str,
605) -> Option<&'a crate::partiql::parser::Condition> {
606    for group in groups {
607        if let Some(cond) = find_comparison(group, path) {
608            return Some(cond);
609        }
610    }
611    None
612}
613
614/// Find a comparison condition matching a given path with Eq operator.
615fn find_comparison<'a>(
616    conditions: &'a [WhereCondition],
617    path: &str,
618) -> Option<&'a crate::partiql::parser::Condition> {
619    conditions.iter().find_map(|c| match c {
620        WhereCondition::Comparison(cond) if cond.path == path && cond.op == CompOp::Eq => {
621            Some(cond)
622        }
623        _ => None,
624    })
625}
626
627/// Resolve a PartiqlValue to a concrete AttributeValue.
628fn resolve_value(val: &PartiqlValue, parameters: &[AttributeValue]) -> Result<AttributeValue> {
629    match val {
630        PartiqlValue::Literal(av) => Ok(av.clone()),
631        PartiqlValue::Parameter(idx) => parameters.get(*idx).cloned().ok_or_else(|| {
632            DynoxideError::ValidationException(format!(
633                "Parameter index {idx} out of range (have {} parameters)",
634                parameters.len()
635            ))
636        }),
637    }
638}
639
640/// Resolve a SetValue to a concrete AttributeValue, potentially using the current item.
641fn resolve_set_value(
642    val: &SetValue,
643    item: &Item,
644    parameters: &[AttributeValue],
645) -> Result<AttributeValue> {
646    match val {
647        SetValue::Simple(pv) => resolve_value(pv, parameters),
648        SetValue::Add(attr, pv) => {
649            let current = resolve_nested_path(item, attr);
650            let operand = resolve_value(pv, parameters)?;
651            match (current, &operand) {
652                (Some(AttributeValue::N(cur)), AttributeValue::N(add)) => {
653                    use bigdecimal::BigDecimal;
654                    use std::str::FromStr;
655                    let a = BigDecimal::from_str(cur).map_err(|e| {
656                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
657                    })?;
658                    let b = BigDecimal::from_str(add).map_err(|e| {
659                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
660                    })?;
661                    let result = a + b;
662                    Ok(AttributeValue::N(format_bigdecimal(&result)))
663                }
664                (None, AttributeValue::N(_)) => {
665                    // Attribute doesn't exist yet — use the operand value
666                    Ok(operand)
667                }
668                _ => Err(DynoxideError::ValidationException(
669                    "SET expression add requires numeric attribute and operand".to_string(),
670                )),
671            }
672        }
673        SetValue::Sub(attr, pv) => {
674            let current = resolve_nested_path(item, attr);
675            let operand = resolve_value(pv, parameters)?;
676            match (current, &operand) {
677                (Some(AttributeValue::N(cur)), AttributeValue::N(sub)) => {
678                    use bigdecimal::BigDecimal;
679                    use std::str::FromStr;
680                    let a = BigDecimal::from_str(cur).map_err(|e| {
681                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
682                    })?;
683                    let b = BigDecimal::from_str(sub).map_err(|e| {
684                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
685                    })?;
686                    let result = a - b;
687                    Ok(AttributeValue::N(format_bigdecimal(&result)))
688                }
689                (None, AttributeValue::N(sub)) => {
690                    // Attribute doesn't exist yet — treat as 0 - operand
691                    use bigdecimal::BigDecimal;
692                    use std::str::FromStr;
693                    let b = BigDecimal::from_str(sub).map_err(|e| {
694                        DynoxideError::ValidationException(format!("Invalid number: {e}"))
695                    })?;
696                    let result = -b;
697                    Ok(AttributeValue::N(format_bigdecimal(&result)))
698                }
699                _ => Err(DynoxideError::ValidationException(
700                    "SET expression subtract requires numeric attribute and operand".to_string(),
701                )),
702            }
703        }
704        SetValue::ListAppend(first, second) => {
705            let a = resolve_value(first, parameters)?;
706            let b = resolve_value(second, parameters)?;
707            // At least one should be a list. If an attribute name was given,
708            // resolve it from the item.
709            let list_a = match &a {
710                AttributeValue::S(name) => resolve_nested_path(item, name)
711                    .cloned()
712                    .unwrap_or(AttributeValue::L(Vec::new())),
713                other => other.clone(),
714            };
715            let list_b = match &b {
716                AttributeValue::S(name) => resolve_nested_path(item, name)
717                    .cloned()
718                    .unwrap_or(AttributeValue::L(Vec::new())),
719                other => other.clone(),
720            };
721            match (list_a, list_b) {
722                (AttributeValue::L(mut la), AttributeValue::L(lb)) => {
723                    la.extend(lb);
724                    Ok(AttributeValue::L(la))
725                }
726                _ => Err(DynoxideError::ValidationException(
727                    "list_append requires list operands".to_string(),
728                )),
729            }
730        }
731    }
732}
733
734/// Set a value at a potentially nested path (e.g. `address.city`).
735fn set_nested_value(item: &mut Item, path: &str, val: AttributeValue) -> Result<()> {
736    let parts: Vec<&str> = path.split('.').collect();
737    if parts.len() == 1 {
738        item.insert(path.to_string(), val);
739        return Ok(());
740    }
741    // Navigate into nested maps, creating them if needed
742    let mut current = item;
743    for part in &parts[..parts.len() - 1] {
744        let entry = current
745            .entry(part.to_string())
746            .or_insert_with(|| AttributeValue::M(HashMap::new()));
747        match entry {
748            AttributeValue::M(map) => {
749                current = map;
750            }
751            _ => {
752                return Err(DynoxideError::ValidationException(
753                    "The document path provided in the update expression is invalid for update"
754                        .to_string(),
755                ));
756            }
757        }
758    }
759    current.insert(parts.last().unwrap().to_string(), val);
760    Ok(())
761}
762
763/// Remove a value at a potentially nested path (e.g. `address.city`).
764fn remove_nested_value(item: &mut Item, path: &str) {
765    let parts: Vec<&str> = path.split('.').collect();
766    if parts.len() == 1 {
767        item.remove(path);
768        return;
769    }
770    // Navigate into nested maps
771    let mut current = item;
772    for part in &parts[..parts.len() - 1] {
773        match current.get_mut(*part) {
774            Some(AttributeValue::M(map)) => {
775                current = map;
776            }
777            _ => return, // Path doesn't exist or isn't a map — nothing to remove
778        }
779    }
780    current.remove(*parts.last().unwrap());
781}
782
783/// Check if an item matches a WHERE clause (with OR-group support).
784fn matches_where(
785    item: &Item,
786    where_clause: Option<&WhereClause>,
787    parameters: &[AttributeValue],
788) -> bool {
789    let wc = match where_clause {
790        Some(wc) => wc,
791        None => return true,
792    };
793
794    // OR semantics: any group matching is sufficient
795    wc.groups
796        .iter()
797        .any(|group| matches_conditions(item, group, parameters))
798}
799
800/// Check if an item matches all conditions in a group (AND semantics).
801fn matches_conditions(
802    item: &Item,
803    conditions: &[WhereCondition],
804    parameters: &[AttributeValue],
805) -> bool {
806    for cond in conditions {
807        match cond {
808            WhereCondition::Comparison(c) => {
809                let item_val = match resolve_nested_path(item, &c.path) {
810                    Some(v) => v,
811                    None => return false,
812                };
813                let target = match resolve_value(&c.value, parameters) {
814                    Ok(v) => v,
815                    Err(_) => return false,
816                };
817                if !compare_values(item_val, &c.op, &target) {
818                    return false;
819                }
820            }
821            WhereCondition::Exists(path) | WhereCondition::IsNotMissing(path) => {
822                if resolve_nested_path(item, path).is_none() {
823                    return false;
824                }
825            }
826            WhereCondition::NotExists(path) | WhereCondition::IsMissing(path) => {
827                if resolve_nested_path(item, path).is_some() {
828                    return false;
829                }
830            }
831            WhereCondition::BeginsWith(path, prefix_val) => {
832                let item_val = match resolve_nested_path(item, path) {
833                    Some(v) => v,
834                    None => return false,
835                };
836                let prefix = match resolve_value(prefix_val, parameters) {
837                    Ok(v) => v,
838                    Err(_) => return false,
839                };
840                match (item_val, &prefix) {
841                    (AttributeValue::S(s), AttributeValue::S(p)) => {
842                        if !s.starts_with(p.as_str()) {
843                            return false;
844                        }
845                    }
846                    _ => return false,
847                }
848            }
849            WhereCondition::NotBeginsWith(path, prefix_val) => {
850                // Logical negation of begins_with: the row matches unless the
851                // value is a string that starts with the prefix. A missing or
852                // non-string attribute does not begin with the prefix, so it is
853                // kept.
854                if let Some(item_val) = resolve_nested_path(item, path) {
855                    let prefix = match resolve_value(prefix_val, parameters) {
856                        Ok(v) => v,
857                        Err(_) => return false,
858                    };
859                    if let (AttributeValue::S(s), AttributeValue::S(p)) = (item_val, &prefix) {
860                        if s.starts_with(p.as_str()) {
861                            return false;
862                        }
863                    }
864                }
865            }
866            WhereCondition::Between(path, low, high) => {
867                let item_val = match resolve_nested_path(item, path) {
868                    Some(v) => v,
869                    None => return false,
870                };
871                let low_val = match resolve_value(low, parameters) {
872                    Ok(v) => v,
873                    Err(_) => return false,
874                };
875                let high_val = match resolve_value(high, parameters) {
876                    Ok(v) => v,
877                    Err(_) => return false,
878                };
879                if !compare_values(item_val, &CompOp::Ge, &low_val)
880                    || !compare_values(item_val, &CompOp::Le, &high_val)
881                {
882                    return false;
883                }
884            }
885            WhereCondition::In(path, values) => {
886                let item_val = match resolve_nested_path(item, path) {
887                    Some(v) => v,
888                    None => return false,
889                };
890                let matched = values.iter().any(|v| {
891                    resolve_value(v, parameters)
892                        .map(|target| compare_values(item_val, &CompOp::Eq, &target))
893                        .unwrap_or(false)
894                });
895                if !matched {
896                    return false;
897                }
898            }
899            WhereCondition::Contains(path, substr_val) => {
900                let item_val = match resolve_nested_path(item, path) {
901                    Some(v) => v,
902                    None => return false,
903                };
904                let substr = match resolve_value(substr_val, parameters) {
905                    Ok(v) => v,
906                    Err(_) => return false,
907                };
908                match (item_val, &substr) {
909                    (AttributeValue::S(s), AttributeValue::S(sub)) => {
910                        if !s.contains(sub.as_str()) {
911                            return false;
912                        }
913                    }
914                    (AttributeValue::SS(set), AttributeValue::S(val)) => {
915                        if !set.contains(val) {
916                            return false;
917                        }
918                    }
919                    (AttributeValue::NS(set), AttributeValue::N(val)) => {
920                        if !set.contains(val) {
921                            return false;
922                        }
923                    }
924                    (AttributeValue::L(list), target) => {
925                        if !list.contains(target) {
926                            return false;
927                        }
928                    }
929                    _ => return false,
930                }
931            }
932        }
933    }
934
935    true
936}
937
938/// Resolve a dotted/indexed path to a nested attribute value.
939///
940/// Supports paths like `"a"`, `"a.b.c"`, and `"a[0].b"`.
941fn resolve_nested_path<'a>(item: &'a Item, path: &str) -> Option<&'a AttributeValue> {
942    // Fast path: no dots or brackets means a simple top-level lookup
943    if !path.contains('.') && !path.contains('[') {
944        return item.get(path);
945    }
946
947    let segments = split_path_segments(path)?;
948    if segments.is_empty() {
949        return None;
950    }
951
952    // First segment must be a map key on the top-level item
953    let mut current = match &segments[0] {
954        PathSegment::Key(k) => item.get(*k)?,
955        PathSegment::Index(_) => return None,
956    };
957
958    for seg in &segments[1..] {
959        current = match seg {
960            PathSegment::Key(k) => match current {
961                AttributeValue::M(map) => map.get(*k)?,
962                _ => return None,
963            },
964            PathSegment::Index(idx) => match current {
965                AttributeValue::L(list) => list.get(*idx)?,
966                _ => return None,
967            },
968        };
969    }
970
971    Some(current)
972}
973
/// One step of a parsed attribute path.
enum PathSegment<'a> {
    /// A map key / attribute name, borrowed from the path string.
    Key(&'a str),
    /// A zero-based list index taken from a `[n]` suffix.
    Index(usize),
}

/// Split a path like `"a.b[0].c"` into segments.
///
/// Empty key segments (e.g. from a leading or doubled `.`) are skipped.
///
/// Returns `None` if the path contains a malformed bracket expression: a
/// non-numeric or empty index (`a[xyz]`, `a[]`), a signed index (`a[+1]`), an
/// index that does not fit in `usize`, or a bracket that is never closed
/// (`a[0`).
fn split_path_segments(path: &str) -> Option<Vec<PathSegment<'_>>> {
    let mut segments = Vec::new();
    let bytes = path.as_bytes();
    // `start` marks the beginning of the key currently being scanned.
    let mut start = 0;
    let mut i = 0;

    while i < bytes.len() {
        match bytes[i] {
            b'.' => {
                if start < i {
                    segments.push(PathSegment::Key(&path[start..i]));
                }
                i += 1;
                start = i;
            }
            b'[' => {
                if start < i {
                    segments.push(PathSegment::Key(&path[start..i]));
                }
                // '[' is ASCII, so `i + 1` is always a valid char boundary.
                let digits_start = i + 1;
                // An unterminated bracket is malformed: bail out instead of
                // treating the rest of the string as the index.
                let close = digits_start + path[digits_start..].find(']')?;
                let digits = &path[digits_start..close];
                // `usize::from_str` accepts a leading '+', so insist on plain
                // ASCII digits before parsing.
                if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
                    return None;
                }
                segments.push(PathSegment::Index(digits.parse::<usize>().ok()?));
                i = close + 1;
                // Skip a dot directly after ']' (e.g. `a[0].b`).
                if bytes.get(i) == Some(&b'.') {
                    i += 1;
                }
                start = i;
            }
            _ => {
                i += 1;
            }
        }
    }

    // Whatever remains after the last separator is a trailing key.
    if start < bytes.len() {
        segments.push(PathSegment::Key(&path[start..]));
    }

    Some(segments)
}
1029
1030/// Compare two AttributeValues using a comparison operator.
1031fn compare_values(left: &AttributeValue, op: &CompOp, right: &AttributeValue) -> bool {
1032    match (left, right) {
1033        (AttributeValue::S(a), AttributeValue::S(b)) => compare_ord(a, op, b),
1034        (AttributeValue::N(a), AttributeValue::N(b)) => {
1035            use bigdecimal::BigDecimal;
1036            use std::str::FromStr;
1037            match (BigDecimal::from_str(a), BigDecimal::from_str(b)) {
1038                (Ok(da), Ok(db)) => compare_ord(&da, op, &db),
1039                _ => false,
1040            }
1041        }
1042        (AttributeValue::BOOL(a), AttributeValue::BOOL(b)) => match op {
1043            CompOp::Eq => a == b,
1044            CompOp::Ne => a != b,
1045            _ => false,
1046        },
1047        _ => match op {
1048            CompOp::Eq => false,
1049            CompOp::Ne => true,
1050            _ => false,
1051        },
1052    }
1053}
1054
1055/// Format a BigDecimal number, stripping unnecessary trailing zeros.
1056fn format_bigdecimal(n: &bigdecimal::BigDecimal) -> String {
1057    let normalized = n.normalized();
1058    if normalized.as_bigint_and_exponent().1 < 0 {
1059        normalized.with_scale(0).to_string()
1060    } else {
1061        normalized.to_string()
1062    }
1063}
1064
1065fn compare_ord<T: PartialOrd>(a: &T, op: &CompOp, b: &T) -> bool {
1066    match op {
1067        CompOp::Eq => a == b,
1068        CompOp::Ne => a != b,
1069        CompOp::Lt => a < b,
1070        CompOp::Le => a <= b,
1071        CompOp::Gt => a > b,
1072        CompOp::Ge => a >= b,
1073    }
1074}