1use crate::errors::{DynoxideError, Result};
6use crate::partiql::parser::{
7 CompOp, PartiqlValue, SetValue, Statement, WhereClause, WhereCondition,
8};
9use crate::storage_backend::StorageBackend;
10use crate::types::{AttributeValue, Item};
11use std::collections::HashMap;
12
13pub async fn execute<S: StorageBackend>(
18 storage: &S,
19 stmt: &Statement,
20 parameters: &[AttributeValue],
21 limit: Option<usize>,
22) -> Result<Option<Vec<Item>>> {
23 Ok(execute_measured(storage, stmt, parameters, limit).await?.0)
24}
25
26pub async fn execute_measured<S: StorageBackend>(
31 storage: &S,
32 stmt: &Statement,
33 parameters: &[AttributeValue],
34 limit: Option<usize>,
35) -> Result<(Option<Vec<Item>>, usize)> {
36 match stmt {
37 Statement::Select {
38 table_name,
39 projections,
40 where_clause,
41 } => {
42 let items = execute_select(
43 storage,
44 table_name,
45 projections,
46 where_clause.as_ref(),
47 parameters,
48 limit,
49 )
50 .await?;
51 let size = items
52 .as_ref()
53 .map(|rows| rows.iter().map(crate::types::item_size).sum())
54 .unwrap_or(0);
55 Ok((items, size))
56 }
57 Statement::Insert {
58 table_name,
59 item,
60 if_not_exists,
61 } => {
62 let size =
63 execute_insert(storage, table_name, item, parameters, *if_not_exists).await?;
64 Ok((None, size))
65 }
66 Statement::Update {
67 table_name,
68 set_clauses,
69 remove_paths,
70 where_clause,
71 } => {
72 let size = execute_update(
73 storage,
74 table_name,
75 set_clauses,
76 remove_paths,
77 where_clause.as_ref(),
78 parameters,
79 )
80 .await?;
81 Ok((None, size))
82 }
83 Statement::Delete {
84 table_name,
85 where_clause,
86 } => {
87 let size =
88 execute_delete(storage, table_name, where_clause.as_ref(), parameters).await?;
89 Ok((None, size))
90 }
91 }
92}
93
94fn insert_nested_projection(result: &mut Item, path: &str, val: AttributeValue) {
100 let parts: Vec<&str> = path.split('.').collect();
101 let key = parts.last().unwrap();
103 result.insert(key.to_string(), val);
104}
105
106async fn execute_select<S: StorageBackend>(
107 storage: &S,
108 table_name: &str,
109 projections: &[String],
110 where_clause: Option<&WhereClause>,
111 parameters: &[AttributeValue],
112 limit: Option<usize>,
113) -> Result<Option<Vec<Item>>> {
114 let meta = require_table(storage, table_name).await?;
115 let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
116
117 if projections.len() == 1 && projections[0] == "COUNT(*)" {
119 let items = collect_matching_items(
120 storage,
121 table_name,
122 where_clause,
123 parameters,
124 &key_schema,
125 None,
126 )
127 .await?;
128 let count = items.len();
129 let mut result = HashMap::new();
130 result.insert("Count".to_string(), AttributeValue::N(count.to_string()));
131 return Ok(Some(vec![result]));
132 }
133
134 let items = collect_matching_items(
135 storage,
136 table_name,
137 where_clause,
138 parameters,
139 &key_schema,
140 limit,
141 )
142 .await?;
143
144 let items = if projections.is_empty() {
146 items
147 } else {
148 items
149 .into_iter()
150 .map(|item| {
151 let mut projected = HashMap::new();
152 for proj in projections {
153 if let Some(val) = resolve_nested_path(&item, proj) {
154 insert_nested_projection(&mut projected, proj, val.clone());
155 }
156 }
157 projected
158 })
159 .collect()
160 };
161
162 Ok(Some(items))
163}
164
165async fn collect_matching_items<S: StorageBackend>(
167 storage: &S,
168 table_name: &str,
169 where_clause: Option<&WhereClause>,
170 parameters: &[AttributeValue],
171 key_schema: &crate::actions::helpers::KeySchema,
172 limit: Option<usize>,
173) -> Result<Vec<Item>> {
174 let pk_condition = where_clause.and_then(|wc| find_pk_condition(wc, &key_schema.partition_key));
176
177 let items: Vec<Item> = if let Some(pk_cond) = pk_condition {
178 let pk_val = resolve_value(&pk_cond.value, parameters)?;
179 let pk_str = pk_val
180 .to_key_string()
181 .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
182
183 let rows = storage
184 .query_items(table_name, &pk_str, &Default::default())
185 .await?;
186
187 let iter = rows
188 .into_iter()
189 .filter_map(|(_, _, json)| serde_json::from_str::<Item>(&json).ok())
190 .filter(|item| matches_where(item, where_clause, parameters));
191
192 if let Some(lim) = limit {
193 iter.take(lim).collect()
194 } else {
195 iter.collect()
196 }
197 } else {
198 let rows = storage.scan_items(table_name, &Default::default()).await?;
199
200 let iter = rows
201 .into_iter()
202 .filter_map(|(_, _, json)| serde_json::from_str::<Item>(&json).ok())
203 .filter(|item| matches_where(item, where_clause, parameters));
204
205 if let Some(lim) = limit {
206 iter.take(lim).collect()
207 } else {
208 iter.collect()
209 }
210 };
211
212 Ok(items)
213}
214
215fn find_pk_condition<'a>(
217 wc: &'a WhereClause,
218 pk_name: &str,
219) -> Option<&'a crate::partiql::parser::Condition> {
220 if wc.groups.len() == 1 {
223 wc.groups[0].iter().find_map(|c| match c {
224 WhereCondition::Comparison(cond) if cond.path == pk_name && cond.op == CompOp::Eq => {
225 Some(cond)
226 }
227 _ => None,
228 })
229 } else {
230 None
231 }
232}
233
234async fn execute_insert<S: StorageBackend>(
237 storage: &S,
238 table_name: &str,
239 item_template: &HashMap<String, PartiqlValue>,
240 parameters: &[AttributeValue],
241 if_not_exists: bool,
242) -> Result<usize> {
243 let mut item = HashMap::new();
245 for (k, v) in item_template {
246 let resolved = match v {
247 PartiqlValue::Literal(av) => av.clone(),
248 PartiqlValue::Parameter(idx) => parameters.get(*idx).cloned().ok_or_else(|| {
249 DynoxideError::ValidationException(format!(
250 "Parameter index {idx} out of range (have {} parameters)",
251 parameters.len()
252 ))
253 })?,
254 };
255 item.insert(k.clone(), resolved);
256 }
257
258 let meta = require_table(storage, table_name).await?;
259 let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
260
261 crate::actions::helpers::validate_item_keys(&item, &key_schema, &meta)?;
263 crate::validation::validate_item_attribute_values(&item)?;
264
265 crate::validation::normalize_item_sets(&mut item);
267
268 let (pk, sk) = crate::actions::helpers::extract_key_strings(&item, &key_schema)?;
270
271 let existing = storage.get_item(table_name, &pk, &sk).await?;
273 if existing.is_some() {
274 if if_not_exists {
275 return Ok(0);
277 }
278 return Err(DynoxideError::DuplicateItemException(
279 "Duplicate primary key exists in table".to_string(),
280 ));
281 }
282
283 let item_json = serde_json::to_string(&item)
284 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
285 let item_size = crate::types::item_size(&item);
286
287 let hash_prefix = item
288 .get(&key_schema.partition_key)
289 .map(crate::storage::compute_hash_prefix)
290 .unwrap_or_default();
291 let old_json = storage
292 .put_item_with_hash(table_name, &pk, &sk, &item_json, item_size, &hash_prefix)
293 .await?;
294
295 let table_sk_attr = key_schema.sort_key.as_deref();
297 let _ = crate::actions::gsi::maintain_gsis_after_write(
298 storage,
299 table_name,
300 &meta,
301 &pk,
302 &sk,
303 &item,
304 &key_schema.partition_key,
305 table_sk_attr,
306 )
307 .await?;
308
309 crate::actions::lsi::maintain_lsis_after_write(
311 storage,
312 table_name,
313 &meta,
314 &pk,
315 &sk,
316 &item,
317 &key_schema.partition_key,
318 table_sk_attr,
319 )
320 .await?;
321
322 let old_item: Option<Item> = old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
324 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item)).await?;
325
326 Ok(item_size)
327}
328
329async fn execute_update<S: StorageBackend>(
332 storage: &S,
333 table_name: &str,
334 set_clauses: &[crate::partiql::parser::SetClause],
335 remove_paths: &[String],
336 where_clause: Option<&WhereClause>,
337 parameters: &[AttributeValue],
338) -> Result<usize> {
339 let meta = require_table(storage, table_name).await?;
340 let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
341
342 let wc = where_clause.ok_or_else(|| {
344 DynoxideError::ValidationException("UPDATE requires a WHERE clause".to_string())
345 })?;
346
347 if wc.groups.len() > 1 {
349 return Err(DynoxideError::ValidationException(
350 "UPDATE does not support OR conditions in WHERE clause".to_string(),
351 ));
352 }
353
354 let pk_cond =
356 find_comparison_in_groups(&wc.groups, &key_schema.partition_key).ok_or_else(|| {
357 DynoxideError::ValidationException(
358 "Where clause does not contain a mandatory equality on all key attributes"
359 .to_string(),
360 )
361 })?;
362
363 let pk_val = resolve_value(&pk_cond.value, parameters)?;
364 let pk_str = pk_val
365 .to_key_string()
366 .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
367
368 let sk_str = if let Some(ref sk_name) = key_schema.sort_key {
369 let sk_cond = find_comparison_in_groups(&wc.groups, sk_name);
370 if sk_cond.is_none() {
371 return Err(DynoxideError::ValidationException(
372 "Where clause does not contain a mandatory equality on all key attributes"
373 .to_string(),
374 ));
375 }
376 sk_cond
377 .map(|c| resolve_value(&c.value, parameters))
378 .transpose()?
379 .and_then(|v| v.to_key_string())
380 .unwrap_or_default()
381 } else {
382 String::new()
383 };
384
385 let existing_json = storage.get_item(table_name, &pk_str, &sk_str).await?;
387 let mut item: Item = existing_json
388 .as_ref()
389 .and_then(|j| serde_json::from_str(j).ok())
390 .unwrap_or_default();
391
392 let old_item = item.clone();
393
394 if existing_json.is_some() && !matches_where(&old_item, where_clause, parameters) {
399 return Err(DynoxideError::ConditionalCheckFailedException(
400 "The conditional request failed".to_string(),
401 None,
402 ));
403 }
404
405 let before_item = item.clone();
406
407 for clause in set_clauses {
409 let val = resolve_set_value(&clause.value, &item, parameters)?;
410 set_nested_value(&mut item, &clause.path, val)?;
411 }
412
413 for path in remove_paths {
415 remove_nested_value(&mut item, path);
416 }
417
418 if item.is_empty() {
420 return Ok(0);
421 }
422
423 crate::validation::validate_item_attribute_values(&item)?;
425 crate::validation::normalize_item_sets(&mut item);
426
427 crate::actions::helpers::validate_updated_index_keys(&before_item, &item, &meta)?;
429
430 let item_json = serde_json::to_string(&item)
431 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
432 let item_size = crate::types::item_size(&item);
433
434 let hash_prefix = item
435 .get(&key_schema.partition_key)
436 .map(crate::storage::compute_hash_prefix)
437 .unwrap_or_default();
438 storage
439 .put_item_with_hash(
440 table_name,
441 &pk_str,
442 &sk_str,
443 &item_json,
444 item_size,
445 &hash_prefix,
446 )
447 .await?;
448
449 let table_sk_attr = key_schema.sort_key.as_deref();
451 let _ = crate::actions::gsi::maintain_gsis_after_write(
452 storage,
453 table_name,
454 &meta,
455 &pk_str,
456 &sk_str,
457 &item,
458 &key_schema.partition_key,
459 table_sk_attr,
460 )
461 .await?;
462
463 crate::actions::lsi::maintain_lsis_after_write(
465 storage,
466 table_name,
467 &meta,
468 &pk_str,
469 &sk_str,
470 &item,
471 &key_schema.partition_key,
472 table_sk_attr,
473 )
474 .await?;
475
476 let old_ref = if existing_json.is_some() {
478 Some(&old_item)
479 } else {
480 None
481 };
482 crate::streams::record_stream_event(storage, &meta, old_ref, Some(&item)).await?;
483
484 Ok(item_size)
485}
486
487async fn execute_delete<S: StorageBackend>(
490 storage: &S,
491 table_name: &str,
492 where_clause: Option<&WhereClause>,
493 parameters: &[AttributeValue],
494) -> Result<usize> {
495 let meta = require_table(storage, table_name).await?;
496 let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
497
498 let wc = where_clause.ok_or_else(|| {
499 DynoxideError::ValidationException("DELETE requires a WHERE clause".to_string())
500 })?;
501
502 if wc.groups.len() > 1 {
504 return Err(DynoxideError::ValidationException(
505 "DELETE does not support OR conditions in WHERE clause".to_string(),
506 ));
507 }
508
509 let pk_cond =
510 find_comparison_in_groups(&wc.groups, &key_schema.partition_key).ok_or_else(|| {
511 DynoxideError::ValidationException(
512 "Where clause does not contain a mandatory equality on all key attributes"
513 .to_string(),
514 )
515 })?;
516
517 let pk_val = resolve_value(&pk_cond.value, parameters)?;
518 let pk_str = pk_val
519 .to_key_string()
520 .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
521
522 if let Some(ref sk_name) = key_schema.sort_key {
524 let has_sk_condition = wc.groups.iter().any(|group| {
525 group.iter().any(|c| match c {
526 WhereCondition::Comparison(comp) => comp.path == *sk_name && comp.op == CompOp::Eq,
527 _ => false,
528 })
529 });
530 if !has_sk_condition {
531 return Err(DynoxideError::ValidationException(
532 "Where clause does not contain a mandatory equality on all key attributes"
533 .to_string(),
534 ));
535 }
536 }
537
538 let sk_str = if let Some(ref sk_name) = key_schema.sort_key {
539 find_comparison_in_groups(&wc.groups, sk_name)
540 .map(|c| resolve_value(&c.value, parameters))
541 .transpose()?
542 .and_then(|v| v.to_key_string())
543 .unwrap_or_default()
544 } else {
545 String::new()
546 };
547
548 if let Some(json) = storage.get_item(table_name, &pk_str, &sk_str).await? {
555 let existing: Item = serde_json::from_str(&json)
556 .map_err(|e| DynoxideError::InternalServerError(format!("Bad item JSON: {e}")))?;
557 if !matches_where(&existing, where_clause, parameters) {
558 return Err(DynoxideError::ConditionalCheckFailedException(
559 "The conditional request failed".to_string(),
560 None,
561 ));
562 }
563 }
564
565 let old_json = storage.delete_item(table_name, &pk_str, &sk_str).await?;
566
567 let _ = crate::actions::gsi::maintain_gsis_after_delete(
569 storage, table_name, &meta, &pk_str, &sk_str,
570 )
571 .await?;
572
573 crate::actions::lsi::maintain_lsis_after_delete(storage, table_name, &meta, &pk_str, &sk_str)
575 .await?;
576
577 let old_item: Option<Item> = old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
579 if old_item.is_some() {
580 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None).await?;
581 }
582
583 let deleted_size = old_item.as_ref().map(crate::types::item_size).unwrap_or(0);
586 Ok(deleted_size)
587}
588
589async fn require_table<S: StorageBackend>(
594 storage: &S,
595 table_name: &str,
596) -> Result<crate::storage::TableMetadata> {
597 crate::actions::helpers::require_table(storage, table_name).await
598}
599
600fn find_comparison_in_groups<'a>(
603 groups: &'a [Vec<WhereCondition>],
604 path: &str,
605) -> Option<&'a crate::partiql::parser::Condition> {
606 for group in groups {
607 if let Some(cond) = find_comparison(group, path) {
608 return Some(cond);
609 }
610 }
611 None
612}
613
614fn find_comparison<'a>(
616 conditions: &'a [WhereCondition],
617 path: &str,
618) -> Option<&'a crate::partiql::parser::Condition> {
619 conditions.iter().find_map(|c| match c {
620 WhereCondition::Comparison(cond) if cond.path == path && cond.op == CompOp::Eq => {
621 Some(cond)
622 }
623 _ => None,
624 })
625}
626
627fn resolve_value(val: &PartiqlValue, parameters: &[AttributeValue]) -> Result<AttributeValue> {
629 match val {
630 PartiqlValue::Literal(av) => Ok(av.clone()),
631 PartiqlValue::Parameter(idx) => parameters.get(*idx).cloned().ok_or_else(|| {
632 DynoxideError::ValidationException(format!(
633 "Parameter index {idx} out of range (have {} parameters)",
634 parameters.len()
635 ))
636 }),
637 }
638}
639
640fn resolve_set_value(
642 val: &SetValue,
643 item: &Item,
644 parameters: &[AttributeValue],
645) -> Result<AttributeValue> {
646 match val {
647 SetValue::Simple(pv) => resolve_value(pv, parameters),
648 SetValue::Add(attr, pv) => {
649 let current = resolve_nested_path(item, attr);
650 let operand = resolve_value(pv, parameters)?;
651 match (current, &operand) {
652 (Some(AttributeValue::N(cur)), AttributeValue::N(add)) => {
653 use bigdecimal::BigDecimal;
654 use std::str::FromStr;
655 let a = BigDecimal::from_str(cur).map_err(|e| {
656 DynoxideError::ValidationException(format!("Invalid number: {e}"))
657 })?;
658 let b = BigDecimal::from_str(add).map_err(|e| {
659 DynoxideError::ValidationException(format!("Invalid number: {e}"))
660 })?;
661 let result = a + b;
662 Ok(AttributeValue::N(format_bigdecimal(&result)))
663 }
664 (None, AttributeValue::N(_)) => {
665 Ok(operand)
667 }
668 _ => Err(DynoxideError::ValidationException(
669 "SET expression add requires numeric attribute and operand".to_string(),
670 )),
671 }
672 }
673 SetValue::Sub(attr, pv) => {
674 let current = resolve_nested_path(item, attr);
675 let operand = resolve_value(pv, parameters)?;
676 match (current, &operand) {
677 (Some(AttributeValue::N(cur)), AttributeValue::N(sub)) => {
678 use bigdecimal::BigDecimal;
679 use std::str::FromStr;
680 let a = BigDecimal::from_str(cur).map_err(|e| {
681 DynoxideError::ValidationException(format!("Invalid number: {e}"))
682 })?;
683 let b = BigDecimal::from_str(sub).map_err(|e| {
684 DynoxideError::ValidationException(format!("Invalid number: {e}"))
685 })?;
686 let result = a - b;
687 Ok(AttributeValue::N(format_bigdecimal(&result)))
688 }
689 (None, AttributeValue::N(sub)) => {
690 use bigdecimal::BigDecimal;
692 use std::str::FromStr;
693 let b = BigDecimal::from_str(sub).map_err(|e| {
694 DynoxideError::ValidationException(format!("Invalid number: {e}"))
695 })?;
696 let result = -b;
697 Ok(AttributeValue::N(format_bigdecimal(&result)))
698 }
699 _ => Err(DynoxideError::ValidationException(
700 "SET expression subtract requires numeric attribute and operand".to_string(),
701 )),
702 }
703 }
704 SetValue::ListAppend(first, second) => {
705 let a = resolve_value(first, parameters)?;
706 let b = resolve_value(second, parameters)?;
707 let list_a = match &a {
710 AttributeValue::S(name) => resolve_nested_path(item, name)
711 .cloned()
712 .unwrap_or(AttributeValue::L(Vec::new())),
713 other => other.clone(),
714 };
715 let list_b = match &b {
716 AttributeValue::S(name) => resolve_nested_path(item, name)
717 .cloned()
718 .unwrap_or(AttributeValue::L(Vec::new())),
719 other => other.clone(),
720 };
721 match (list_a, list_b) {
722 (AttributeValue::L(mut la), AttributeValue::L(lb)) => {
723 la.extend(lb);
724 Ok(AttributeValue::L(la))
725 }
726 _ => Err(DynoxideError::ValidationException(
727 "list_append requires list operands".to_string(),
728 )),
729 }
730 }
731 }
732}
733
734fn set_nested_value(item: &mut Item, path: &str, val: AttributeValue) -> Result<()> {
736 let parts: Vec<&str> = path.split('.').collect();
737 if parts.len() == 1 {
738 item.insert(path.to_string(), val);
739 return Ok(());
740 }
741 let mut current = item;
743 for part in &parts[..parts.len() - 1] {
744 let entry = current
745 .entry(part.to_string())
746 .or_insert_with(|| AttributeValue::M(HashMap::new()));
747 match entry {
748 AttributeValue::M(map) => {
749 current = map;
750 }
751 _ => {
752 return Err(DynoxideError::ValidationException(
753 "The document path provided in the update expression is invalid for update"
754 .to_string(),
755 ));
756 }
757 }
758 }
759 current.insert(parts.last().unwrap().to_string(), val);
760 Ok(())
761}
762
763fn remove_nested_value(item: &mut Item, path: &str) {
765 let parts: Vec<&str> = path.split('.').collect();
766 if parts.len() == 1 {
767 item.remove(path);
768 return;
769 }
770 let mut current = item;
772 for part in &parts[..parts.len() - 1] {
773 match current.get_mut(*part) {
774 Some(AttributeValue::M(map)) => {
775 current = map;
776 }
777 _ => return, }
779 }
780 current.remove(*parts.last().unwrap());
781}
782
783fn matches_where(
785 item: &Item,
786 where_clause: Option<&WhereClause>,
787 parameters: &[AttributeValue],
788) -> bool {
789 let wc = match where_clause {
790 Some(wc) => wc,
791 None => return true,
792 };
793
794 wc.groups
796 .iter()
797 .any(|group| matches_conditions(item, group, parameters))
798}
799
800fn matches_conditions(
802 item: &Item,
803 conditions: &[WhereCondition],
804 parameters: &[AttributeValue],
805) -> bool {
806 for cond in conditions {
807 match cond {
808 WhereCondition::Comparison(c) => {
809 let item_val = match resolve_nested_path(item, &c.path) {
810 Some(v) => v,
811 None => return false,
812 };
813 let target = match resolve_value(&c.value, parameters) {
814 Ok(v) => v,
815 Err(_) => return false,
816 };
817 if !compare_values(item_val, &c.op, &target) {
818 return false;
819 }
820 }
821 WhereCondition::Exists(path) | WhereCondition::IsNotMissing(path) => {
822 if resolve_nested_path(item, path).is_none() {
823 return false;
824 }
825 }
826 WhereCondition::NotExists(path) | WhereCondition::IsMissing(path) => {
827 if resolve_nested_path(item, path).is_some() {
828 return false;
829 }
830 }
831 WhereCondition::BeginsWith(path, prefix_val) => {
832 let item_val = match resolve_nested_path(item, path) {
833 Some(v) => v,
834 None => return false,
835 };
836 let prefix = match resolve_value(prefix_val, parameters) {
837 Ok(v) => v,
838 Err(_) => return false,
839 };
840 match (item_val, &prefix) {
841 (AttributeValue::S(s), AttributeValue::S(p)) => {
842 if !s.starts_with(p.as_str()) {
843 return false;
844 }
845 }
846 _ => return false,
847 }
848 }
849 WhereCondition::NotBeginsWith(path, prefix_val) => {
850 if let Some(item_val) = resolve_nested_path(item, path) {
855 let prefix = match resolve_value(prefix_val, parameters) {
856 Ok(v) => v,
857 Err(_) => return false,
858 };
859 if let (AttributeValue::S(s), AttributeValue::S(p)) = (item_val, &prefix) {
860 if s.starts_with(p.as_str()) {
861 return false;
862 }
863 }
864 }
865 }
866 WhereCondition::Between(path, low, high) => {
867 let item_val = match resolve_nested_path(item, path) {
868 Some(v) => v,
869 None => return false,
870 };
871 let low_val = match resolve_value(low, parameters) {
872 Ok(v) => v,
873 Err(_) => return false,
874 };
875 let high_val = match resolve_value(high, parameters) {
876 Ok(v) => v,
877 Err(_) => return false,
878 };
879 if !compare_values(item_val, &CompOp::Ge, &low_val)
880 || !compare_values(item_val, &CompOp::Le, &high_val)
881 {
882 return false;
883 }
884 }
885 WhereCondition::In(path, values) => {
886 let item_val = match resolve_nested_path(item, path) {
887 Some(v) => v,
888 None => return false,
889 };
890 let matched = values.iter().any(|v| {
891 resolve_value(v, parameters)
892 .map(|target| compare_values(item_val, &CompOp::Eq, &target))
893 .unwrap_or(false)
894 });
895 if !matched {
896 return false;
897 }
898 }
899 WhereCondition::Contains(path, substr_val) => {
900 let item_val = match resolve_nested_path(item, path) {
901 Some(v) => v,
902 None => return false,
903 };
904 let substr = match resolve_value(substr_val, parameters) {
905 Ok(v) => v,
906 Err(_) => return false,
907 };
908 match (item_val, &substr) {
909 (AttributeValue::S(s), AttributeValue::S(sub)) => {
910 if !s.contains(sub.as_str()) {
911 return false;
912 }
913 }
914 (AttributeValue::SS(set), AttributeValue::S(val)) => {
915 if !set.contains(val) {
916 return false;
917 }
918 }
919 (AttributeValue::NS(set), AttributeValue::N(val)) => {
920 if !set.contains(val) {
921 return false;
922 }
923 }
924 (AttributeValue::L(list), target) => {
925 if !list.contains(target) {
926 return false;
927 }
928 }
929 _ => return false,
930 }
931 }
932 }
933 }
934
935 true
936}
937
938fn resolve_nested_path<'a>(item: &'a Item, path: &str) -> Option<&'a AttributeValue> {
942 if !path.contains('.') && !path.contains('[') {
944 return item.get(path);
945 }
946
947 let segments = split_path_segments(path)?;
948 if segments.is_empty() {
949 return None;
950 }
951
952 let mut current = match &segments[0] {
954 PathSegment::Key(k) => item.get(*k)?,
955 PathSegment::Index(_) => return None,
956 };
957
958 for seg in &segments[1..] {
959 current = match seg {
960 PathSegment::Key(k) => match current {
961 AttributeValue::M(map) => map.get(*k)?,
962 _ => return None,
963 },
964 PathSegment::Index(idx) => match current {
965 AttributeValue::L(list) => list.get(*idx)?,
966 _ => return None,
967 },
968 };
969 }
970
971 Some(current)
972}
973
974enum PathSegment<'a> {
975 Key(&'a str),
976 Index(usize),
977}
978
979fn split_path_segments(path: &str) -> Option<Vec<PathSegment<'_>>> {
982 let mut segments = Vec::new();
983 let bytes = path.as_bytes();
984 let mut start = 0;
985 let mut i = 0;
986
987 while i < bytes.len() {
988 match bytes[i] {
989 b'.' => {
990 if start < i {
991 segments.push(PathSegment::Key(&path[start..i]));
992 }
993 i += 1;
994 start = i;
995 }
996 b'[' => {
997 if start < i {
998 segments.push(PathSegment::Key(&path[start..i]));
999 }
1000 i += 1;
1001 let idx_start = i;
1002 while i < bytes.len() && bytes[i] != b']' {
1003 i += 1;
1004 }
1005 let idx = path[idx_start..i].parse::<usize>().ok()?;
1006 segments.push(PathSegment::Index(idx));
1007 if i < bytes.len() {
1008 i += 1; }
1010 start = i;
1011 if i < bytes.len() && bytes[i] == b'.' {
1013 i += 1;
1014 start = i;
1015 }
1016 }
1017 _ => {
1018 i += 1;
1019 }
1020 }
1021 }
1022
1023 if start < bytes.len() {
1024 segments.push(PathSegment::Key(&path[start..]));
1025 }
1026
1027 Some(segments)
1028}
1029
1030fn compare_values(left: &AttributeValue, op: &CompOp, right: &AttributeValue) -> bool {
1032 match (left, right) {
1033 (AttributeValue::S(a), AttributeValue::S(b)) => compare_ord(a, op, b),
1034 (AttributeValue::N(a), AttributeValue::N(b)) => {
1035 use bigdecimal::BigDecimal;
1036 use std::str::FromStr;
1037 match (BigDecimal::from_str(a), BigDecimal::from_str(b)) {
1038 (Ok(da), Ok(db)) => compare_ord(&da, op, &db),
1039 _ => false,
1040 }
1041 }
1042 (AttributeValue::BOOL(a), AttributeValue::BOOL(b)) => match op {
1043 CompOp::Eq => a == b,
1044 CompOp::Ne => a != b,
1045 _ => false,
1046 },
1047 _ => match op {
1048 CompOp::Eq => false,
1049 CompOp::Ne => true,
1050 _ => false,
1051 },
1052 }
1053}
1054
1055fn format_bigdecimal(n: &bigdecimal::BigDecimal) -> String {
1057 let normalized = n.normalized();
1058 if normalized.as_bigint_and_exponent().1 < 0 {
1059 normalized.with_scale(0).to_string()
1060 } else {
1061 normalized.to_string()
1062 }
1063}
1064
1065fn compare_ord<T: PartialOrd>(a: &T, op: &CompOp, b: &T) -> bool {
1066 match op {
1067 CompOp::Eq => a == b,
1068 CompOp::Ne => a != b,
1069 CompOp::Lt => a < b,
1070 CompOp::Le => a <= b,
1071 CompOp::Gt => a > b,
1072 CompOp::Ge => a >= b,
1073 }
1074}