1use crate::errors::{DynoxideError, Result};
6use crate::partiql::parser::{
7 CompOp, PartiqlValue, SetValue, Statement, WhereClause, WhereCondition,
8};
9use crate::storage::Storage;
10use crate::types::{AttributeValue, Item};
11use std::collections::HashMap;
12
13pub fn execute(
18 storage: &Storage,
19 stmt: &Statement,
20 parameters: &[AttributeValue],
21 limit: Option<usize>,
22) -> Result<Option<Vec<Item>>> {
23 match stmt {
24 Statement::Select {
25 table_name,
26 projections,
27 where_clause,
28 } => execute_select(
29 storage,
30 table_name,
31 projections,
32 where_clause.as_ref(),
33 parameters,
34 limit,
35 ),
36 Statement::Insert {
37 table_name,
38 item,
39 if_not_exists,
40 } => {
41 execute_insert(storage, table_name, item, parameters, *if_not_exists)?;
42 Ok(None)
43 }
44 Statement::Update {
45 table_name,
46 set_clauses,
47 remove_paths,
48 where_clause,
49 } => {
50 execute_update(
51 storage,
52 table_name,
53 set_clauses,
54 remove_paths,
55 where_clause.as_ref(),
56 parameters,
57 )?;
58 Ok(None)
59 }
60 Statement::Delete {
61 table_name,
62 where_clause,
63 } => {
64 execute_delete(storage, table_name, where_clause.as_ref(), parameters)?;
65 Ok(None)
66 }
67 }
68}
69
70fn insert_nested_projection(result: &mut Item, path: &str, val: AttributeValue) {
76 let parts: Vec<&str> = path.split('.').collect();
77 let key = parts.last().unwrap();
79 result.insert(key.to_string(), val);
80}
81
82fn execute_select(
83 storage: &Storage,
84 table_name: &str,
85 projections: &[String],
86 where_clause: Option<&WhereClause>,
87 parameters: &[AttributeValue],
88 limit: Option<usize>,
89) -> Result<Option<Vec<Item>>> {
90 let meta = require_table(storage, table_name)?;
91 let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
92
93 if projections.len() == 1 && projections[0] == "COUNT(*)" {
95 let items = collect_matching_items(
96 storage,
97 table_name,
98 where_clause,
99 parameters,
100 &key_schema,
101 None,
102 )?;
103 let count = items.len();
104 let mut result = HashMap::new();
105 result.insert("Count".to_string(), AttributeValue::N(count.to_string()));
106 return Ok(Some(vec![result]));
107 }
108
109 let items = collect_matching_items(
110 storage,
111 table_name,
112 where_clause,
113 parameters,
114 &key_schema,
115 limit,
116 )?;
117
118 let items = if projections.is_empty() {
120 items
121 } else {
122 items
123 .into_iter()
124 .map(|item| {
125 let mut projected = HashMap::new();
126 for proj in projections {
127 if let Some(val) = resolve_nested_path(&item, proj) {
128 insert_nested_projection(&mut projected, proj, val.clone());
129 }
130 }
131 projected
132 })
133 .collect()
134 };
135
136 Ok(Some(items))
137}
138
139fn collect_matching_items(
141 storage: &Storage,
142 table_name: &str,
143 where_clause: Option<&WhereClause>,
144 parameters: &[AttributeValue],
145 key_schema: &crate::actions::helpers::KeySchema,
146 limit: Option<usize>,
147) -> Result<Vec<Item>> {
148 let pk_condition = where_clause.and_then(|wc| find_pk_condition(wc, &key_schema.partition_key));
150
151 let items: Vec<Item> = if let Some(pk_cond) = pk_condition {
152 let pk_val = resolve_value(&pk_cond.value, parameters)?;
153 let pk_str = pk_val
154 .to_key_string()
155 .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
156
157 let rows = storage.query_items(table_name, &pk_str, &Default::default())?;
158
159 let iter = rows
160 .into_iter()
161 .filter_map(|(_, _, json)| serde_json::from_str::<Item>(&json).ok())
162 .filter(|item| matches_where(item, where_clause, parameters));
163
164 if let Some(lim) = limit {
165 iter.take(lim).collect()
166 } else {
167 iter.collect()
168 }
169 } else {
170 let rows = storage.scan_items(table_name, &Default::default())?;
171
172 let iter = rows
173 .into_iter()
174 .filter_map(|(_, _, json)| serde_json::from_str::<Item>(&json).ok())
175 .filter(|item| matches_where(item, where_clause, parameters));
176
177 if let Some(lim) = limit {
178 iter.take(lim).collect()
179 } else {
180 iter.collect()
181 }
182 };
183
184 Ok(items)
185}
186
187fn find_pk_condition<'a>(
189 wc: &'a WhereClause,
190 pk_name: &str,
191) -> Option<&'a crate::partiql::parser::Condition> {
192 if wc.groups.len() == 1 {
195 wc.groups[0].iter().find_map(|c| match c {
196 WhereCondition::Comparison(cond) if cond.path == pk_name && cond.op == CompOp::Eq => {
197 Some(cond)
198 }
199 _ => None,
200 })
201 } else {
202 None
203 }
204}
205
206fn execute_insert(
207 storage: &Storage,
208 table_name: &str,
209 item_template: &HashMap<String, PartiqlValue>,
210 parameters: &[AttributeValue],
211 if_not_exists: bool,
212) -> Result<()> {
213 let mut item = HashMap::new();
215 for (k, v) in item_template {
216 let resolved = match v {
217 PartiqlValue::Literal(av) => av.clone(),
218 PartiqlValue::Parameter(idx) => parameters.get(*idx).cloned().ok_or_else(|| {
219 DynoxideError::ValidationException(format!(
220 "Parameter index {idx} out of range (have {} parameters)",
221 parameters.len()
222 ))
223 })?,
224 };
225 item.insert(k.clone(), resolved);
226 }
227
228 let meta = require_table(storage, table_name)?;
229 let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
230
231 crate::actions::helpers::validate_item_keys(&item, &key_schema, &meta)?;
233 crate::validation::validate_item_attribute_values(&item)?;
234
235 crate::validation::normalize_item_sets(&mut item);
237
238 let (pk, sk) = crate::actions::helpers::extract_key_strings(&item, &key_schema)?;
239
240 let existing = storage.get_item(table_name, &pk, &sk)?;
242 if existing.is_some() {
243 if if_not_exists {
244 return Ok(());
246 }
247 return Err(DynoxideError::DuplicateItemException(
248 "Duplicate primary key exists in table".to_string(),
249 ));
250 }
251
252 let item_json = serde_json::to_string(&item)
253 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
254 let item_size = crate::types::item_size(&item);
255
256 let hash_prefix = item
257 .get(&key_schema.partition_key)
258 .map(crate::storage::compute_hash_prefix)
259 .unwrap_or_default();
260 let old_json =
261 storage.put_item_with_hash(table_name, &pk, &sk, &item_json, item_size, &hash_prefix)?;
262
263 let table_sk_attr = key_schema.sort_key.as_deref();
265 let _ = crate::actions::gsi::maintain_gsis_after_write(
266 storage,
267 table_name,
268 &meta,
269 &pk,
270 &sk,
271 &item,
272 &key_schema.partition_key,
273 table_sk_attr,
274 )?;
275
276 crate::actions::lsi::maintain_lsis_after_write(
278 storage,
279 table_name,
280 &meta,
281 &pk,
282 &sk,
283 &item,
284 &key_schema.partition_key,
285 table_sk_attr,
286 )?;
287
288 let old_item: Option<Item> = old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
290 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item))?;
291
292 Ok(())
293}
294
295fn execute_update(
296 storage: &Storage,
297 table_name: &str,
298 set_clauses: &[crate::partiql::parser::SetClause],
299 remove_paths: &[String],
300 where_clause: Option<&WhereClause>,
301 parameters: &[AttributeValue],
302) -> Result<()> {
303 let meta = require_table(storage, table_name)?;
304 let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
305
306 let wc = where_clause.ok_or_else(|| {
308 DynoxideError::ValidationException("UPDATE requires a WHERE clause".to_string())
309 })?;
310
311 if wc.groups.len() > 1 {
313 return Err(DynoxideError::ValidationException(
314 "UPDATE does not support OR conditions in WHERE clause".to_string(),
315 ));
316 }
317
318 let pk_cond =
320 find_comparison_in_groups(&wc.groups, &key_schema.partition_key).ok_or_else(|| {
321 DynoxideError::ValidationException(
322 "UPDATE WHERE must include partition key equality".to_string(),
323 )
324 })?;
325
326 let pk_val = resolve_value(&pk_cond.value, parameters)?;
327 let pk_str = pk_val
328 .to_key_string()
329 .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
330
331 let sk_str = if let Some(ref sk_name) = key_schema.sort_key {
332 let sk_cond = find_comparison_in_groups(&wc.groups, sk_name);
333 if sk_cond.is_none() {
334 return Err(DynoxideError::ValidationException(
335 "Where clause does not contain a mandatory equality on all key attributes"
336 .to_string(),
337 ));
338 }
339 sk_cond
340 .map(|c| resolve_value(&c.value, parameters))
341 .transpose()?
342 .and_then(|v| v.to_key_string())
343 .unwrap_or_default()
344 } else {
345 String::new()
346 };
347
348 let existing_json = storage.get_item(table_name, &pk_str, &sk_str)?;
350 let mut item: Item = existing_json
351 .as_ref()
352 .and_then(|j| serde_json::from_str(j).ok())
353 .unwrap_or_default();
354
355 let old_item = item.clone();
356
357 for clause in set_clauses {
359 let val = resolve_set_value(&clause.value, &item, parameters)?;
360 set_nested_value(&mut item, &clause.path, val)?;
361 }
362
363 for path in remove_paths {
365 remove_nested_value(&mut item, path);
366 }
367
368 if item.is_empty() {
370 return Ok(());
371 }
372
373 crate::validation::validate_item_attribute_values(&item)?;
375 crate::validation::normalize_item_sets(&mut item);
376
377 let item_json = serde_json::to_string(&item)
378 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
379 let item_size = crate::types::item_size(&item);
380
381 let hash_prefix = item
382 .get(&key_schema.partition_key)
383 .map(crate::storage::compute_hash_prefix)
384 .unwrap_or_default();
385 storage.put_item_with_hash(
386 table_name,
387 &pk_str,
388 &sk_str,
389 &item_json,
390 item_size,
391 &hash_prefix,
392 )?;
393
394 let table_sk_attr = key_schema.sort_key.as_deref();
396 let _ = crate::actions::gsi::maintain_gsis_after_write(
397 storage,
398 table_name,
399 &meta,
400 &pk_str,
401 &sk_str,
402 &item,
403 &key_schema.partition_key,
404 table_sk_attr,
405 )?;
406
407 crate::actions::lsi::maintain_lsis_after_write(
409 storage,
410 table_name,
411 &meta,
412 &pk_str,
413 &sk_str,
414 &item,
415 &key_schema.partition_key,
416 table_sk_attr,
417 )?;
418
419 let old_ref = if existing_json.is_some() {
421 Some(&old_item)
422 } else {
423 None
424 };
425 crate::streams::record_stream_event(storage, &meta, old_ref, Some(&item))?;
426
427 Ok(())
428}
429
430fn execute_delete(
431 storage: &Storage,
432 table_name: &str,
433 where_clause: Option<&WhereClause>,
434 parameters: &[AttributeValue],
435) -> Result<()> {
436 let meta = require_table(storage, table_name)?;
437 let key_schema = crate::actions::helpers::parse_key_schema(&meta)?;
438
439 let wc = where_clause.ok_or_else(|| {
440 DynoxideError::ValidationException("DELETE requires a WHERE clause".to_string())
441 })?;
442
443 if wc.groups.len() > 1 {
445 return Err(DynoxideError::ValidationException(
446 "DELETE does not support OR conditions in WHERE clause".to_string(),
447 ));
448 }
449
450 let pk_cond =
451 find_comparison_in_groups(&wc.groups, &key_schema.partition_key).ok_or_else(|| {
452 DynoxideError::ValidationException(
453 "DELETE WHERE must include partition key equality".to_string(),
454 )
455 })?;
456
457 let pk_val = resolve_value(&pk_cond.value, parameters)?;
458 let pk_str = pk_val
459 .to_key_string()
460 .ok_or_else(|| DynoxideError::ValidationException("Invalid key value".to_string()))?;
461
462 if let Some(ref sk_name) = key_schema.sort_key {
464 let has_sk_condition = wc.groups.iter().any(|group| {
465 group.iter().any(|c| match c {
466 WhereCondition::Comparison(comp) => comp.path == *sk_name && comp.op == CompOp::Eq,
467 _ => false,
468 })
469 });
470 if !has_sk_condition {
471 return Err(DynoxideError::ValidationException(
472 "Where clause does not contain a mandatory equality on all key attributes"
473 .to_string(),
474 ));
475 }
476 }
477
478 let sk_str = if let Some(ref sk_name) = key_schema.sort_key {
479 find_comparison_in_groups(&wc.groups, sk_name)
480 .map(|c| resolve_value(&c.value, parameters))
481 .transpose()?
482 .and_then(|v| v.to_key_string())
483 .unwrap_or_default()
484 } else {
485 String::new()
486 };
487
488 let old_json = storage.delete_item(table_name, &pk_str, &sk_str)?;
489
490 let _ = crate::actions::gsi::maintain_gsis_after_delete(
492 storage, table_name, &meta, &pk_str, &sk_str,
493 )?;
494
495 crate::actions::lsi::maintain_lsis_after_delete(storage, table_name, &meta, &pk_str, &sk_str)?;
497
498 let old_item: Option<Item> = old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
500 if old_item.is_some() {
501 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None)?;
502 }
503
504 Ok(())
505}
506
507fn require_table(storage: &Storage, table_name: &str) -> Result<crate::storage::TableMetadata> {
512 crate::actions::helpers::require_table(storage, table_name)
513}
514
515fn find_comparison_in_groups<'a>(
518 groups: &'a [Vec<WhereCondition>],
519 path: &str,
520) -> Option<&'a crate::partiql::parser::Condition> {
521 for group in groups {
522 if let Some(cond) = find_comparison(group, path) {
523 return Some(cond);
524 }
525 }
526 None
527}
528
529fn find_comparison<'a>(
531 conditions: &'a [WhereCondition],
532 path: &str,
533) -> Option<&'a crate::partiql::parser::Condition> {
534 conditions.iter().find_map(|c| match c {
535 WhereCondition::Comparison(cond) if cond.path == path && cond.op == CompOp::Eq => {
536 Some(cond)
537 }
538 _ => None,
539 })
540}
541
542fn resolve_value(val: &PartiqlValue, parameters: &[AttributeValue]) -> Result<AttributeValue> {
544 match val {
545 PartiqlValue::Literal(av) => Ok(av.clone()),
546 PartiqlValue::Parameter(idx) => parameters.get(*idx).cloned().ok_or_else(|| {
547 DynoxideError::ValidationException(format!(
548 "Parameter index {idx} out of range (have {} parameters)",
549 parameters.len()
550 ))
551 }),
552 }
553}
554
555fn resolve_set_value(
557 val: &SetValue,
558 item: &Item,
559 parameters: &[AttributeValue],
560) -> Result<AttributeValue> {
561 match val {
562 SetValue::Simple(pv) => resolve_value(pv, parameters),
563 SetValue::Add(attr, pv) => {
564 let current = resolve_nested_path(item, attr);
565 let operand = resolve_value(pv, parameters)?;
566 match (current, &operand) {
567 (Some(AttributeValue::N(cur)), AttributeValue::N(add)) => {
568 use bigdecimal::BigDecimal;
569 use std::str::FromStr;
570 let a = BigDecimal::from_str(cur).map_err(|e| {
571 DynoxideError::ValidationException(format!("Invalid number: {e}"))
572 })?;
573 let b = BigDecimal::from_str(add).map_err(|e| {
574 DynoxideError::ValidationException(format!("Invalid number: {e}"))
575 })?;
576 let result = a + b;
577 Ok(AttributeValue::N(format_bigdecimal(&result)))
578 }
579 (None, AttributeValue::N(_)) => {
580 Ok(operand)
582 }
583 _ => Err(DynoxideError::ValidationException(
584 "SET expression add requires numeric attribute and operand".to_string(),
585 )),
586 }
587 }
588 SetValue::Sub(attr, pv) => {
589 let current = resolve_nested_path(item, attr);
590 let operand = resolve_value(pv, parameters)?;
591 match (current, &operand) {
592 (Some(AttributeValue::N(cur)), AttributeValue::N(sub)) => {
593 use bigdecimal::BigDecimal;
594 use std::str::FromStr;
595 let a = BigDecimal::from_str(cur).map_err(|e| {
596 DynoxideError::ValidationException(format!("Invalid number: {e}"))
597 })?;
598 let b = BigDecimal::from_str(sub).map_err(|e| {
599 DynoxideError::ValidationException(format!("Invalid number: {e}"))
600 })?;
601 let result = a - b;
602 Ok(AttributeValue::N(format_bigdecimal(&result)))
603 }
604 (None, AttributeValue::N(sub)) => {
605 use bigdecimal::BigDecimal;
607 use std::str::FromStr;
608 let b = BigDecimal::from_str(sub).map_err(|e| {
609 DynoxideError::ValidationException(format!("Invalid number: {e}"))
610 })?;
611 let result = -b;
612 Ok(AttributeValue::N(format_bigdecimal(&result)))
613 }
614 _ => Err(DynoxideError::ValidationException(
615 "SET expression subtract requires numeric attribute and operand".to_string(),
616 )),
617 }
618 }
619 SetValue::ListAppend(first, second) => {
620 let a = resolve_value(first, parameters)?;
621 let b = resolve_value(second, parameters)?;
622 let list_a = match &a {
625 AttributeValue::S(name) => resolve_nested_path(item, name)
626 .cloned()
627 .unwrap_or(AttributeValue::L(Vec::new())),
628 other => other.clone(),
629 };
630 let list_b = match &b {
631 AttributeValue::S(name) => resolve_nested_path(item, name)
632 .cloned()
633 .unwrap_or(AttributeValue::L(Vec::new())),
634 other => other.clone(),
635 };
636 match (list_a, list_b) {
637 (AttributeValue::L(mut la), AttributeValue::L(lb)) => {
638 la.extend(lb);
639 Ok(AttributeValue::L(la))
640 }
641 _ => Err(DynoxideError::ValidationException(
642 "list_append requires list operands".to_string(),
643 )),
644 }
645 }
646 }
647}
648
649fn set_nested_value(item: &mut Item, path: &str, val: AttributeValue) -> Result<()> {
651 let parts: Vec<&str> = path.split('.').collect();
652 if parts.len() == 1 {
653 item.insert(path.to_string(), val);
654 return Ok(());
655 }
656 let mut current = item;
658 for part in &parts[..parts.len() - 1] {
659 let entry = current
660 .entry(part.to_string())
661 .or_insert_with(|| AttributeValue::M(HashMap::new()));
662 match entry {
663 AttributeValue::M(map) => {
664 current = map;
665 }
666 _ => {
667 return Err(DynoxideError::ValidationException(
668 "The document path provided in the update expression is invalid for update"
669 .to_string(),
670 ));
671 }
672 }
673 }
674 current.insert(parts.last().unwrap().to_string(), val);
675 Ok(())
676}
677
678fn remove_nested_value(item: &mut Item, path: &str) {
680 let parts: Vec<&str> = path.split('.').collect();
681 if parts.len() == 1 {
682 item.remove(path);
683 return;
684 }
685 let mut current = item;
687 for part in &parts[..parts.len() - 1] {
688 match current.get_mut(*part) {
689 Some(AttributeValue::M(map)) => {
690 current = map;
691 }
692 _ => return, }
694 }
695 current.remove(*parts.last().unwrap());
696}
697
698fn matches_where(
700 item: &Item,
701 where_clause: Option<&WhereClause>,
702 parameters: &[AttributeValue],
703) -> bool {
704 let wc = match where_clause {
705 Some(wc) => wc,
706 None => return true,
707 };
708
709 wc.groups
711 .iter()
712 .any(|group| matches_conditions(item, group, parameters))
713}
714
715fn matches_conditions(
717 item: &Item,
718 conditions: &[WhereCondition],
719 parameters: &[AttributeValue],
720) -> bool {
721 for cond in conditions {
722 match cond {
723 WhereCondition::Comparison(c) => {
724 let item_val = match resolve_nested_path(item, &c.path) {
725 Some(v) => v,
726 None => return false,
727 };
728 let target = match resolve_value(&c.value, parameters) {
729 Ok(v) => v,
730 Err(_) => return false,
731 };
732 if !compare_values(item_val, &c.op, &target) {
733 return false;
734 }
735 }
736 WhereCondition::Exists(path) | WhereCondition::IsNotMissing(path) => {
737 if resolve_nested_path(item, path).is_none() {
738 return false;
739 }
740 }
741 WhereCondition::NotExists(path) | WhereCondition::IsMissing(path) => {
742 if resolve_nested_path(item, path).is_some() {
743 return false;
744 }
745 }
746 WhereCondition::BeginsWith(path, prefix_val) => {
747 let item_val = match resolve_nested_path(item, path) {
748 Some(v) => v,
749 None => return false,
750 };
751 let prefix = match resolve_value(prefix_val, parameters) {
752 Ok(v) => v,
753 Err(_) => return false,
754 };
755 match (item_val, &prefix) {
756 (AttributeValue::S(s), AttributeValue::S(p)) => {
757 if !s.starts_with(p.as_str()) {
758 return false;
759 }
760 }
761 _ => return false,
762 }
763 }
764 WhereCondition::Between(path, low, high) => {
765 let item_val = match resolve_nested_path(item, path) {
766 Some(v) => v,
767 None => return false,
768 };
769 let low_val = match resolve_value(low, parameters) {
770 Ok(v) => v,
771 Err(_) => return false,
772 };
773 let high_val = match resolve_value(high, parameters) {
774 Ok(v) => v,
775 Err(_) => return false,
776 };
777 if !compare_values(item_val, &CompOp::Ge, &low_val)
778 || !compare_values(item_val, &CompOp::Le, &high_val)
779 {
780 return false;
781 }
782 }
783 WhereCondition::In(path, values) => {
784 let item_val = match resolve_nested_path(item, path) {
785 Some(v) => v,
786 None => return false,
787 };
788 let matched = values.iter().any(|v| {
789 resolve_value(v, parameters)
790 .map(|target| compare_values(item_val, &CompOp::Eq, &target))
791 .unwrap_or(false)
792 });
793 if !matched {
794 return false;
795 }
796 }
797 WhereCondition::Contains(path, substr_val) => {
798 let item_val = match resolve_nested_path(item, path) {
799 Some(v) => v,
800 None => return false,
801 };
802 let substr = match resolve_value(substr_val, parameters) {
803 Ok(v) => v,
804 Err(_) => return false,
805 };
806 match (item_val, &substr) {
807 (AttributeValue::S(s), AttributeValue::S(sub)) => {
808 if !s.contains(sub.as_str()) {
809 return false;
810 }
811 }
812 (AttributeValue::SS(set), AttributeValue::S(val)) => {
813 if !set.contains(val) {
814 return false;
815 }
816 }
817 (AttributeValue::NS(set), AttributeValue::N(val)) => {
818 if !set.contains(val) {
819 return false;
820 }
821 }
822 (AttributeValue::L(list), target) => {
823 if !list.contains(target) {
824 return false;
825 }
826 }
827 _ => return false,
828 }
829 }
830 }
831 }
832
833 true
834}
835
836fn resolve_nested_path<'a>(item: &'a Item, path: &str) -> Option<&'a AttributeValue> {
840 if !path.contains('.') && !path.contains('[') {
842 return item.get(path);
843 }
844
845 let segments = split_path_segments(path)?;
846 if segments.is_empty() {
847 return None;
848 }
849
850 let mut current = match &segments[0] {
852 PathSegment::Key(k) => item.get(*k)?,
853 PathSegment::Index(_) => return None,
854 };
855
856 for seg in &segments[1..] {
857 current = match seg {
858 PathSegment::Key(k) => match current {
859 AttributeValue::M(map) => map.get(*k)?,
860 _ => return None,
861 },
862 PathSegment::Index(idx) => match current {
863 AttributeValue::L(list) => list.get(*idx)?,
864 _ => return None,
865 },
866 };
867 }
868
869 Some(current)
870}
871
872enum PathSegment<'a> {
873 Key(&'a str),
874 Index(usize),
875}
876
877fn split_path_segments(path: &str) -> Option<Vec<PathSegment<'_>>> {
880 let mut segments = Vec::new();
881 let bytes = path.as_bytes();
882 let mut start = 0;
883 let mut i = 0;
884
885 while i < bytes.len() {
886 match bytes[i] {
887 b'.' => {
888 if start < i {
889 segments.push(PathSegment::Key(&path[start..i]));
890 }
891 i += 1;
892 start = i;
893 }
894 b'[' => {
895 if start < i {
896 segments.push(PathSegment::Key(&path[start..i]));
897 }
898 i += 1;
899 let idx_start = i;
900 while i < bytes.len() && bytes[i] != b']' {
901 i += 1;
902 }
903 let idx = path[idx_start..i].parse::<usize>().ok()?;
904 segments.push(PathSegment::Index(idx));
905 if i < bytes.len() {
906 i += 1; }
908 start = i;
909 if i < bytes.len() && bytes[i] == b'.' {
911 i += 1;
912 start = i;
913 }
914 }
915 _ => {
916 i += 1;
917 }
918 }
919 }
920
921 if start < bytes.len() {
922 segments.push(PathSegment::Key(&path[start..]));
923 }
924
925 Some(segments)
926}
927
928fn compare_values(left: &AttributeValue, op: &CompOp, right: &AttributeValue) -> bool {
930 match (left, right) {
931 (AttributeValue::S(a), AttributeValue::S(b)) => compare_ord(a, op, b),
932 (AttributeValue::N(a), AttributeValue::N(b)) => {
933 use bigdecimal::BigDecimal;
934 use std::str::FromStr;
935 match (BigDecimal::from_str(a), BigDecimal::from_str(b)) {
936 (Ok(da), Ok(db)) => compare_ord(&da, op, &db),
937 _ => false,
938 }
939 }
940 (AttributeValue::BOOL(a), AttributeValue::BOOL(b)) => match op {
941 CompOp::Eq => a == b,
942 CompOp::Ne => a != b,
943 _ => false,
944 },
945 _ => match op {
946 CompOp::Eq => false,
947 CompOp::Ne => true,
948 _ => false,
949 },
950 }
951}
952
953fn format_bigdecimal(n: &bigdecimal::BigDecimal) -> String {
955 let normalized = n.normalized();
956 if normalized.as_bigint_and_exponent().1 < 0 {
957 normalized.with_scale(0).to_string()
958 } else {
959 normalized.to_string()
960 }
961}
962
963fn compare_ord<T: PartialOrd>(a: &T, op: &CompOp, b: &T) -> bool {
964 match op {
965 CompOp::Eq => a == b,
966 CompOp::Ne => a != b,
967 CompOp::Lt => a < b,
968 CompOp::Le => a <= b,
969 CompOp::Gt => a > b,
970 CompOp::Ge => a >= b,
971 }
972}