1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage_backend::StorageBackend;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Upper bound, in bytes (1 MiB), on the cumulative size of the items processed for a
/// single Query page. `execute` stops reading rows once adding the next item (as measured by
/// `crate::types::item_size`) would exceed this value, provided at least one item has
/// already been processed.
const MAX_RESPONSE_SIZE: usize = 1_048_576;
11
12#[derive(Debug, Default, Deserialize)]
14struct QueryRequestRaw {
15 #[serde(rename = "TableName", default)]
16 table_name: Option<String>,
17 #[serde(rename = "KeyConditionExpression", default)]
18 key_condition_expression: Option<String>,
19 #[serde(rename = "FilterExpression", default)]
20 filter_expression: Option<String>,
21 #[serde(rename = "ProjectionExpression", default)]
22 projection_expression: Option<String>,
23 #[serde(rename = "ExpressionAttributeNames", default)]
24 expression_attribute_names: Option<HashMap<String, String>>,
25 #[serde(rename = "ExpressionAttributeValues", default)]
26 expression_attribute_values: Option<HashMap<String, AttributeValue>>,
27 #[serde(rename = "ScanIndexForward", default = "default_true")]
28 scan_index_forward: bool,
29 #[serde(rename = "Limit", default)]
30 limit: Option<usize>,
31 #[serde(rename = "ExclusiveStartKey", default)]
32 exclusive_start_key: Option<serde_json::Value>,
33 #[serde(rename = "Select", default)]
34 select: Option<String>,
35 #[serde(rename = "ConsistentRead", default)]
36 consistent_read: Option<bool>,
37 #[serde(rename = "IndexName", default)]
38 index_name: Option<String>,
39 #[serde(rename = "ReturnConsumedCapacity", default)]
40 return_consumed_capacity: Option<String>,
41 #[serde(rename = "KeyConditions", default)]
42 key_conditions: Option<serde_json::Value>,
43 #[serde(rename = "AttributesToGet", default)]
44 attributes_to_get: Option<Vec<String>>,
45 #[serde(rename = "QueryFilter", default)]
46 query_filter: Option<serde_json::Value>,
47 #[serde(rename = "ConditionalOperator", default)]
48 conditional_operator: Option<String>,
49}
50
/// Serde default provider that always yields `true`.
///
/// Referenced by `#[serde(default = "default_true")]` so that an omitted
/// `ScanIndexForward` deserializes as `true`.
fn default_true() -> bool {
    const DEFAULT: bool = true;
    DEFAULT
}
54
/// Validated Query request consumed by [`execute`].
///
/// Instances are normally produced by the custom `Deserialize` implementation, which reads
/// a `QueryRequestRaw` and rejects invalid enum values and a zero `Limit`.
///
/// Note: the derived `Default` sets `scan_index_forward` to `false`, whereas a deserialized
/// request that omits `ScanIndexForward` gets `true`.
#[derive(Debug, Default)]
pub struct QueryRequest {
    /// Name of the table to query (`TableName`). Deserialization stores an empty string when
    /// the key is absent.
    pub table_name: String,
    /// `KeyConditionExpression`. When `None`, `execute` derives it from `key_conditions`.
    pub key_condition_expression: Option<String>,
    /// `FilterExpression`. When `None`, `execute` derives it from `query_filter`.
    pub filter_expression: Option<String>,
    /// `ProjectionExpression`. When `None`, `execute` falls back to `attributes_to_get`.
    pub projection_expression: Option<String>,
    /// `ExpressionAttributeNames`; `execute` extends this map when it converts legacy
    /// conditions into expressions.
    pub expression_attribute_names: Option<HashMap<String, String>>,
    /// `ExpressionAttributeValues`; `execute` extends this map when it converts legacy
    /// conditions into expressions.
    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
    /// `ScanIndexForward`; forwarded to the storage query as its `forward` flag.
    pub scan_index_forward: bool,
    /// `Limit`; forwarded to the storage query as the maximum number of rows to read.
    pub limit: Option<usize>,
    /// Already-parsed start key. Deserialization always leaves this `None`; `execute` only
    /// consults it when `exclusive_start_key_raw` is `None`.
    pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
    /// `Select`; deserialization accepts `ALL_ATTRIBUTES`, `ALL_PROJECTED_ATTRIBUTES`,
    /// `COUNT` and `SPECIFIC_ATTRIBUTES`.
    pub select: Option<String>,
    /// `ConsistentRead`; treated as `false` when absent.
    pub consistent_read: Option<bool>,
    /// `IndexName`; when set, `execute` resolves it as a local secondary index first and as a
    /// global secondary index otherwise.
    pub index_name: Option<String>,
    /// `ReturnConsumedCapacity`; deserialization accepts `INDEXES`, `TOTAL` and `NONE`.
    pub return_consumed_capacity: Option<String>,
    /// Legacy `KeyConditions`, kept as untyped JSON.
    pub key_conditions: Option<serde_json::Value>,
    /// Legacy `AttributesToGet`.
    pub attributes_to_get: Option<Vec<String>>,
    /// Legacy `QueryFilter`, kept as untyped JSON.
    pub query_filter: Option<serde_json::Value>,
    /// Legacy `ConditionalOperator`, passed along when `query_filter` is converted.
    pub conditional_operator: Option<String>,
    /// `ExclusiveStartKey` exactly as received; `execute` parses it with
    /// `helpers::parse_exclusive_start_key` and prefers it over `exclusive_start_key`.
    pub exclusive_start_key_raw: Option<serde_json::Value>,
}
80
81impl<'de> serde::Deserialize<'de> for QueryRequest {
82 fn deserialize<D: serde::Deserializer<'de>>(
83 deserializer: D,
84 ) -> std::result::Result<Self, D::Error> {
85 let raw = QueryRequestRaw::deserialize(deserializer)?;
86 use crate::validation::{
87 TableNameContext, format_validation_errors, table_name_constraint_errors,
88 };
89
90 let mut errors = Vec::new();
91 errors.extend(table_name_constraint_errors(
92 raw.table_name.as_deref(),
93 TableNameContext::ReadWrite,
94 ));
95 let table_name = raw.table_name.unwrap_or_default();
96
97 if let Some(ref rcc) = raw.return_consumed_capacity {
99 if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
100 errors.push(format!(
101 "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
102 Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
103 rcc
104 ));
105 }
106 }
107
108 if let Some(ref sel) = raw.select {
110 if ![
111 "ALL_ATTRIBUTES",
112 "ALL_PROJECTED_ATTRIBUTES",
113 "COUNT",
114 "SPECIFIC_ATTRIBUTES",
115 ]
116 .contains(&sel.as_str())
117 {
118 errors.push(format!(
119 "Value '{}' at 'select' failed to satisfy constraint: \
120 Member must satisfy enum value set: [SPECIFIC_ATTRIBUTES, COUNT, ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES]",
121 sel
122 ));
123 }
124 }
125
126 if let Some(limit) = raw.limit {
131 if limit == 0 {
132 errors.push(
133 "Value at 'Limit' failed to satisfy constraint: \
134 Member must have value greater than or equal to 1"
135 .to_string(),
136 );
137 }
138 }
139
140 if let Some(msg) = format_validation_errors(&errors) {
141 return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
142 }
143
144 Ok(QueryRequest {
145 table_name,
146 key_condition_expression: raw.key_condition_expression,
147 filter_expression: raw.filter_expression,
148 projection_expression: raw.projection_expression,
149 expression_attribute_names: raw.expression_attribute_names,
150 expression_attribute_values: raw.expression_attribute_values,
151 scan_index_forward: raw.scan_index_forward,
152 limit: raw.limit,
153 exclusive_start_key: None,
154 select: raw.select,
155 consistent_read: raw.consistent_read,
156 index_name: raw.index_name,
157 return_consumed_capacity: raw.return_consumed_capacity,
158 key_conditions: raw.key_conditions,
159 attributes_to_get: raw.attributes_to_get,
160 query_filter: raw.query_filter,
161 conditional_operator: raw.conditional_operator,
162 exclusive_start_key_raw: raw.exclusive_start_key,
163 })
164 }
165}
166
167#[derive(Debug, Default, Serialize)]
168pub struct QueryResponse {
169 #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
170 pub items: Option<Vec<Item>>,
171 #[serde(rename = "Count")]
172 pub count: usize,
173 #[serde(rename = "ScannedCount")]
174 pub scanned_count: usize,
175 #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
176 pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
177 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
178 pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
179}
180
181pub async fn execute<S: StorageBackend>(
182 storage: &S,
183 mut request: QueryRequest,
184) -> Result<QueryResponse> {
185 crate::validation::validate_table_name(&request.table_name)?;
187
188 {
191 let mut non_expr = Vec::new();
192 let mut expr = Vec::new();
193 if request.attributes_to_get.is_some() {
194 non_expr.push("AttributesToGet");
195 }
196 if request.query_filter.is_some()
197 && request.query_filter.as_ref().is_some_and(|v| !v.is_null())
198 {
199 non_expr.push("QueryFilter");
200 }
201 if request.conditional_operator.is_some() {
202 non_expr.push("ConditionalOperator");
203 }
204 if request.key_conditions.is_some()
205 && request
206 .key_conditions
207 .as_ref()
208 .is_some_and(|v| !v.is_null())
209 {
210 non_expr.push("KeyConditions");
211 }
212 if request.projection_expression.is_some() {
213 expr.push("ProjectionExpression");
214 }
215 if request.filter_expression.is_some() {
216 expr.push("FilterExpression");
217 }
218 if request.key_condition_expression.is_some() {
219 expr.push("KeyConditionExpression");
220 }
221 let no_raw_eav: Option<serde_json::Value> = None;
222 let ctx = helpers::ExpressionParamContext {
223 non_expression_params: non_expr,
224 expression_params: expr,
225 all_expression_param_names: vec!["FilterExpression", "KeyConditionExpression"],
226 expression_attribute_names: &request.expression_attribute_names,
227 expression_attribute_values: &request.expression_attribute_values,
228 expression_attribute_values_raw: &no_raw_eav,
229 };
230 helpers::validate_expression_params(&ctx)?;
231 }
232
233 helpers::validate_filter_conditions_raw(request.query_filter.as_ref(), "QueryFilter")?;
235 helpers::validate_filter_conditions_raw(request.key_conditions.as_ref(), "KeyConditions")?;
236
237 helpers::validate_filter_condition_args(request.query_filter.as_ref())?;
239 helpers::validate_filter_condition_args(request.key_conditions.as_ref())?;
240
241 if let Some(ref attrs) = request.attributes_to_get {
243 helpers::validate_attributes_to_get_no_duplicates(attrs)?;
244 }
245
246 let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
248 Some(helpers::parse_exclusive_start_key(esk_val)?)
249 } else {
250 request.exclusive_start_key.clone()
251 };
252
253 if let Some(ref kce) = request.key_condition_expression {
257 if kce.is_empty() {
258 return Err(DynoxideError::ValidationException(
259 "Invalid KeyConditionExpression: The expression can not be empty;".to_string(),
260 ));
261 }
262 }
263 if let Some(ref fe) = request.filter_expression {
264 if fe.is_empty() {
265 if request.query_filter.is_none() || request.filter_expression.as_deref() == Some("") {
266 return Err(DynoxideError::ValidationException(
267 "Invalid FilterExpression: The expression can not be empty;".to_string(),
268 ));
269 }
270 } else {
271 let parsed_fe = expressions::condition::parse(fe).map_err(|e| {
272 DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
273 })?;
274 if let Err(e) = expressions::condition::validate_name_refs(
277 &parsed_fe,
278 &request.expression_attribute_names,
279 ) {
280 return Err(DynoxideError::ValidationException(format!(
281 "Invalid FilterExpression: {e}"
282 )));
283 }
284 if let Err(e) = expressions::condition::validate_operand_semantics(
285 &parsed_fe,
286 &request.expression_attribute_names,
287 &request.expression_attribute_values,
288 ) {
289 return Err(DynoxideError::ValidationException(format!(
290 "Invalid FilterExpression: {e}"
291 )));
292 }
293 }
294 }
295 if let Some(ref pe) = request.projection_expression {
296 if pe.is_empty() {
297 return Err(DynoxideError::ValidationException(
298 "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
299 ));
300 }
301 }
302
303 if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
305 && request.projection_expression.is_none()
306 && request.attributes_to_get.is_none()
307 {
308 return Err(DynoxideError::ValidationException(
309 "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
310 .to_string(),
311 ));
312 }
313
314 if request.projection_expression.is_some() {
316 if let Some(select) = request.select.as_deref() {
317 if select != "SPECIFIC_ATTRIBUTES" {
318 let target = if select == "COUNT" {
320 "only the Count"
321 } else {
322 select
323 };
324 return Err(DynoxideError::ValidationException(format!(
325 "Cannot specify the ProjectionExpression when choosing to get {target}"
326 )));
327 }
328 }
329 }
330
331 if request.select.as_deref() == Some("ALL_PROJECTED_ATTRIBUTES") && request.index_name.is_none()
333 {
334 return Err(DynoxideError::ValidationException(
335 "ALL_PROJECTED_ATTRIBUTES can be used only when Querying using an IndexName"
336 .to_string(),
337 ));
338 }
339
340 if let Some(ref kce) = request.key_condition_expression {
342 if !kce.is_empty() {
343 let temp_tracker = crate::expressions::TrackedExpressionAttributes::new(
345 &request.expression_attribute_names,
346 &request.expression_attribute_values,
347 );
348 if let Err(e) = expressions::key_condition::parse(kce, &temp_tracker) {
349 return Err(DynoxideError::ValidationException(e));
350 }
351 }
352 }
353
354 let meta = helpers::require_table_for_item_op(storage, &request.table_name).await?;
355 let table_key_schema = helpers::parse_key_schema(&meta)?;
356
357 let effective_pk_for_kc = if let Some(ref index_name) = request.index_name {
361 if let Some((pk, _)) = request
362 .index_name
363 .as_ref()
364 .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok())
365 {
366 pk
367 } else if let Ok((pk, _)) = super::gsi::parse_gsi_key_schema(&meta, index_name) {
368 pk
369 } else {
370 table_key_schema.partition_key.clone()
371 }
372 } else {
373 table_key_schema.partition_key.clone()
374 };
375
376 if request.key_condition_expression.is_none() {
378 if let Some(ref kc_val) = request.key_conditions {
379 if let Ok(kc) =
380 serde_json::from_value::<HashMap<String, helpers::KeyCondition>>(kc_val.clone())
381 {
382 if !kc.is_empty() {
383 let converted =
384 helpers::convert_key_conditions(&kc, Some(&effective_pk_for_kc))?;
385 request.key_condition_expression = Some(converted.expression);
386 let expr_values = request
387 .expression_attribute_values
388 .get_or_insert_with(HashMap::new);
389 expr_values.extend(converted.attribute_values);
390 let expr_names = request
391 .expression_attribute_names
392 .get_or_insert_with(HashMap::new);
393 expr_names.extend(converted.attribute_names);
394 }
395 }
396 }
397 }
398
399 if request.filter_expression.is_none() {
401 if let Some(ref qf_val) = request.query_filter {
402 if let Ok(qf) =
403 serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(qf_val.clone())
404 {
405 if !qf.is_empty() {
406 let converted = helpers::convert_filter_conditions(
407 &qf,
408 request.conditional_operator.as_deref(),
409 )?;
410 if !converted.expression.is_empty() {
411 request.filter_expression = Some(converted.expression);
412 let expr_values = request
413 .expression_attribute_values
414 .get_or_insert_with(HashMap::new);
415 expr_values.extend(converted.attribute_values);
416 let expr_names = request
417 .expression_attribute_names
418 .get_or_insert_with(HashMap::new);
419 expr_names.extend(converted.attribute_names);
420 }
421 }
422 }
423 }
424 }
425
426 let legacy_projection = if request.projection_expression.is_none() {
428 request
429 .attributes_to_get
430 .as_ref()
431 .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
432 } else {
433 None
434 };
435
436 let key_condition_expression = request.key_condition_expression.as_deref().ok_or_else(|| {
438 DynoxideError::ValidationException(
439 "Either the KeyConditions or KeyConditionExpression parameter must be specified in the request."
440 .to_string(),
441 )
442 })?;
443 let key_condition_expression = key_condition_expression.to_string();
444
445 let lsi_keys = request
447 .index_name
448 .as_ref()
449 .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
450 let is_lsi = lsi_keys.is_some();
451
452 if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
454 return Err(DynoxideError::ValidationException(
455 "Consistent reads are not supported on global secondary indexes".to_string(),
456 ));
457 }
458
459 let index_projection_type = if let Some(ref index_name) = request.index_name {
461 if is_lsi {
462 super::lsi::parse_lsi_defs(&meta)?
463 .into_iter()
464 .find(|l| l.index_name == *index_name)
465 .map(|l| l.projection_type)
466 } else {
467 super::gsi::parse_gsi_defs(&meta)?
468 .into_iter()
469 .find(|g| g.index_name == *index_name)
470 .map(|g| g.projection_type)
471 }
472 } else {
473 None
474 };
475
476 let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
477 if let Some(keys) = lsi_keys {
478 keys
479 } else {
480 super::gsi::parse_gsi_key_schema(&meta, index_name)?
481 }
482 } else {
483 (
484 table_key_schema.partition_key.clone(),
485 table_key_schema.sort_key.clone(),
486 )
487 };
488
489 if let Some(ref esk) = exclusive_start_key {
491 helpers::validate_esk_count_and_index_keys(
493 esk,
494 &meta,
495 request.index_name.as_deref(),
496 "The provided starting key is invalid",
497 )?;
498 helpers::validate_esk_table_keys(esk, &meta)?;
500 }
501
502 let tracker = crate::expressions::TrackedExpressionAttributes::new(
504 &request.expression_attribute_names,
505 &request.expression_attribute_values,
506 );
507
508 let key_cond = expressions::key_condition::parse(&key_condition_expression, &tracker)
510 .map_err(DynoxideError::ValidationException)?;
511
512 if key_cond.pk_name != effective_pk {
514 return Err(DynoxideError::ValidationException(format!(
515 "Query condition missed key schema element: {}",
516 effective_pk
517 )));
518 }
519
520 let resolved = expressions::key_condition::resolve_values(&key_cond, &tracker)
522 .map_err(DynoxideError::ValidationException)?;
523
524 let pk_str = resolved.pk_value.to_key_string().ok_or_else(|| {
526 DynoxideError::ValidationException(
527 "Cannot convert partition key value to string".to_string(),
528 )
529 })?;
530
531 let mut sk_sql_parts = Vec::new();
533 let mut sk_param_values = Vec::new();
534
535 if let Some(ref sk_cond) = resolved.sk_condition {
536 if let Some(ref eff_sk) = effective_sk {
538 if sk_cond.sk_name() != eff_sk {
539 return Err(DynoxideError::ValidationException(format!(
540 "Query condition missed key schema element: {eff_sk}"
541 )));
542 }
543 } else {
544 return Err(DynoxideError::ValidationException(
545 "Query filter contains a sort key condition but the table has no sort key"
546 .to_string(),
547 ));
548 }
549
550 let conditions = sk_cond.to_sql_conditions();
551 for (i, (op, val)) in conditions.iter().enumerate() {
552 let param_idx = i + 2; if op == "LIKE" {
554 sk_sql_parts.push(format!("AND sk LIKE ?{param_idx} ESCAPE '\\'"));
555 } else {
556 sk_sql_parts.push(format!("AND sk {op} ?{param_idx}"));
557 }
558 sk_param_values.push(val.clone());
559 }
560 }
561
562 let mut effective_key_attrs = vec![effective_pk.clone()];
565 if let Some(ref sk) = effective_sk {
566 effective_key_attrs.push(sk.clone());
567 }
568
569 if let Some(ref qf_val) = request.query_filter {
571 if let Some(obj) = qf_val.as_object() {
572 for attr_name in obj.keys() {
573 if effective_key_attrs.contains(attr_name) {
574 return Err(DynoxideError::ValidationException(format!(
575 "QueryFilter can only contain non-primary key attributes: \
576 Primary key attribute: {attr_name}"
577 )));
578 }
579 }
580 }
581 }
582
583 if request.query_filter.is_none() {
586 if let Some(ref fe) = request.filter_expression {
587 if let Ok(parsed_fe) = expressions::condition::parse(fe) {
588 let top_attrs = expressions::condition::extract_top_level_attributes(
589 &parsed_fe,
590 &request.expression_attribute_names,
591 );
592 for attr in &top_attrs {
593 if effective_key_attrs.contains(attr) {
594 return Err(DynoxideError::ValidationException(format!(
595 "Filter Expression can only contain non-primary key attributes: \
596 Primary key attribute: {attr}"
597 )));
598 }
599 }
600 let mut index_key_attrs = Vec::new();
603 if request.index_name.is_some() {
604 if !effective_key_attrs
606 .iter()
607 .any(|k| k == &table_key_schema.partition_key)
608 {
609 }
611 for k in &effective_key_attrs {
613 if ![table_key_schema.partition_key.clone()]
614 .iter()
615 .chain(table_key_schema.sort_key.iter())
616 .any(|tk| tk == k)
617 {
618 index_key_attrs.push(k.clone());
619 }
620 }
621 }
622 let base_key_attrs: Vec<String> = {
623 let mut v = vec![table_key_schema.partition_key.clone()];
624 if let Some(ref sk) = table_key_schema.sort_key {
625 v.push(sk.clone());
626 }
627 v
628 };
629 if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
630 &parsed_fe,
631 &request.expression_attribute_names,
632 &base_key_attrs,
633 &index_key_attrs,
634 ) {
635 let prefix = if is_index { "IndexKey" } else { "Key" };
636 return Err(DynoxideError::ValidationException(format!(
637 "Key attributes must be scalars; \
638 list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
639 )));
640 }
641 }
642 }
643 }
644
645 let is_index_query = request.index_name.is_some();
646
647 let start_sk = if let Some(ref esk) = exclusive_start_key {
651 if let Some(ref sk_name) = effective_sk {
652 esk.get(sk_name).and_then(|v| v.to_key_string())
653 } else if is_index_query {
654 Some(String::new())
656 } else {
657 None
658 }
659 } else {
660 None
661 };
662
663 let (start_base_pk, start_base_sk) = if is_index_query {
666 if let Some(ref esk) = exclusive_start_key {
667 let base_pk = esk
668 .get(&table_key_schema.partition_key)
669 .and_then(|v| v.to_key_string());
670 let base_sk = if let Some(sk_name) = table_key_schema.sort_key.as_ref() {
678 esk.get(sk_name).and_then(|v| v.to_key_string())
679 } else {
680 Some(String::new())
681 };
682 (base_pk, base_sk)
683 } else {
684 (None, None)
685 }
686 } else {
687 (None, None)
688 };
689
690 let is_select_all_attributes = request
693 .select
694 .as_deref()
695 .map(|s| s.eq_ignore_ascii_case("ALL_ATTRIBUTES"))
696 .unwrap_or(false);
697 let fetch_from_base_table = if is_select_all_attributes {
698 if let Some(ref proj_type) = index_projection_type {
699 if *proj_type != crate::types::ProjectionType::ALL {
700 if !is_lsi {
701 return Err(DynoxideError::ValidationException(format!(
702 "One or more parameter values were invalid: \
703 Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
704 because its projection type is not ALL",
705 request.index_name.as_deref().unwrap_or("")
706 )));
707 }
708 true
710 } else {
711 false
712 }
713 } else {
714 false
715 }
716 } else {
717 false
718 };
719
720 let sk_condition_sql = if sk_sql_parts.is_empty() {
722 None
723 } else {
724 Some(sk_sql_parts.join(" "))
725 };
726
727 let fetch_limit = request.limit;
728 let sk_params_refs: Vec<&str> = sk_param_values.iter().map(|s| s.as_str()).collect();
729
730 let query_params = crate::storage::QueryParams {
732 sk_condition: sk_condition_sql.as_deref(),
733 sk_params: &sk_params_refs,
734 forward: request.scan_index_forward,
735 limit: fetch_limit,
736 exclusive_start_sk: start_sk.as_deref(),
737 exclusive_start_base_pk: start_base_pk.as_deref(),
738 exclusive_start_base_sk: start_base_sk.as_deref(),
739 };
740 let rows = if let Some(ref index_name) = request.index_name {
741 if is_lsi {
742 storage
743 .query_lsi_items(&request.table_name, index_name, &pk_str, &query_params)
744 .await?
745 } else {
746 storage
747 .query_gsi_items(&request.table_name, index_name, &pk_str, &query_params)
748 .await?
749 }
750 } else {
751 storage
752 .query_items(&request.table_name, &pk_str, &query_params)
753 .await?
754 };
755
756 let filter_expr = request
758 .filter_expression
759 .as_ref()
760 .map(|expr| expressions::condition::parse(expr))
761 .transpose()
762 .map_err(DynoxideError::ValidationException)?;
763
764 let projection = if let Some(ref proj_expr) = request.projection_expression {
766 Some(
767 expressions::projection::parse(proj_expr)
768 .map_err(DynoxideError::ValidationException)?,
769 )
770 } else {
771 legacy_projection.clone()
772 };
773
774 if let Some(ref filter) = filter_expr {
776 tracker.track_condition_expr(filter);
777 }
778 if let Some(ref proj) = projection {
779 tracker.track_projection_expr(proj);
780 }
781
782 let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
784 &request.expression_attribute_names,
785 &request.expression_attribute_values,
786 );
787
788 let is_count = request
790 .select
791 .as_deref()
792 .map(|s| s.eq_ignore_ascii_case("COUNT"))
793 .unwrap_or(false);
794
795 let mut key_attrs = vec![effective_pk.clone()];
797 if let Some(ref sk) = effective_sk {
798 key_attrs.push(sk.clone());
799 }
800 if request.index_name.is_some() {
802 if !key_attrs.contains(&table_key_schema.partition_key) {
803 key_attrs.push(table_key_schema.partition_key.clone());
804 }
805 if let Some(ref sk) = table_key_schema.sort_key {
806 if !key_attrs.contains(sk) {
807 key_attrs.push(sk.clone());
808 }
809 }
810 }
811
812 let mut items = Vec::new();
813 let mut scanned_count = 0;
814 let mut filtered_count = 0;
815 let mut cumulative_size = 0;
816 let mut last_evaluated_item: Option<Item> = None;
817 let mut truncated_by_size = false;
818
819 let mut base_table_cumulative_size = 0usize;
822 let mut index_cumulative_size = 0usize;
823
824 for (_pk, _sk, item_json) in &rows {
825 let index_item: Item = serde_json::from_str(item_json).map_err(|e| {
826 DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
827 })?;
828
829 index_cumulative_size += crate::types::item_size(&index_item);
833 let item = if fetch_from_base_table {
834 let base_pk = index_item
835 .get(&table_key_schema.partition_key)
836 .and_then(|v| v.to_key_string())
837 .unwrap_or_default();
838 let base_sk = table_key_schema
839 .sort_key
840 .as_ref()
841 .and_then(|sk_name| index_item.get(sk_name))
842 .and_then(|v| v.to_key_string())
843 .unwrap_or_default();
844 if let Some(full_json) = storage
845 .get_item(&request.table_name, &base_pk, &base_sk)
846 .await?
847 {
848 let full_item: Item = serde_json::from_str(&full_json).map_err(|e| {
849 DynoxideError::InternalServerError(format!("Bad item JSON: {e}"))
850 })?;
851 base_table_cumulative_size += crate::types::item_size(&full_item);
852 full_item
853 } else {
854 index_item.clone()
855 }
856 } else {
857 index_item.clone()
858 };
859
860 scanned_count += 1;
861
862 let item_size = crate::types::item_size(&item);
865 if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
866 truncated_by_size = true;
867 break;
868 }
869 cumulative_size += item_size;
870
871 if let Some(ref filter) = filter_expr {
873 let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
874 .map_err(DynoxideError::ValidationException)?;
875 if !passes {
876 last_evaluated_item = Some(index_item);
877 continue;
878 }
879 }
880
881 filtered_count += 1;
882
883 let result_item = if let Some(ref proj) = projection {
886 let no_keys: &[String] = &[];
887 expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
888 .map_err(DynoxideError::ValidationException)?
889 } else {
890 item
891 };
892
893 last_evaluated_item = Some(index_item);
894 if !is_count {
895 items.push(result_item);
896 }
897 }
898
899 tracker.check_unused()?;
901
902 let count = if is_count {
903 filtered_count
904 } else {
905 items.len()
906 };
907
908 let has_more = truncated_by_size
911 || (fetch_limit.is_some() && scanned_count >= fetch_limit.unwrap_or(usize::MAX));
912
913 let is_gsi_query = request.index_name.is_some() && !is_lsi;
918 let last_evaluated_key = if has_more {
919 last_evaluated_item.map(|item| {
920 let mut key = build_last_evaluated_key(&item, &effective_pk, effective_sk.as_deref());
921 if is_lsi {
923 if let Some(tsk) = table_key_schema.sort_key.as_deref() {
924 if !key.contains_key(tsk) {
925 if let Some(v) = item.get(tsk) {
926 key.insert(tsk.to_string(), v.clone());
927 }
928 }
929 }
930 }
931 if is_gsi_query {
933 if !key.contains_key(&table_key_schema.partition_key) {
934 if let Some(v) = item.get(&table_key_schema.partition_key) {
935 key.insert(table_key_schema.partition_key.clone(), v.clone());
936 }
937 }
938 if let Some(ref tsk) = table_key_schema.sort_key {
939 if !key.contains_key(tsk) {
940 if let Some(v) = item.get(tsk) {
941 key.insert(tsk.clone(), v.clone());
942 }
943 }
944 }
945 }
946 key
947 })
948 } else {
949 None
950 };
951
952 let is_gsi = is_gsi_query;
954 let consistent = request.consistent_read.unwrap_or(false);
955 let consumed_capacity = if is_gsi {
956 let mut gsi_units = std::collections::HashMap::new();
957 gsi_units.insert(
958 request.index_name.as_ref().unwrap().clone(),
959 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
960 );
961 crate::types::consumed_capacity_with_indexes(
962 &request.table_name,
963 0.0,
964 &gsi_units,
965 &request.return_consumed_capacity,
966 )
967 } else if is_lsi {
968 let (table_cap, lsi_cap) = if fetch_from_base_table {
971 let table_rcu = crate::types::read_capacity_units_with_consistency(
972 base_table_cumulative_size,
973 consistent,
974 );
975 let lsi_rcu = crate::types::read_capacity_units_with_consistency(
976 index_cumulative_size,
977 consistent,
978 );
979 (table_rcu, lsi_rcu)
980 } else {
981 (
982 0.0,
983 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
984 )
985 };
986 let mut lsi_units = std::collections::HashMap::new();
987 lsi_units.insert(request.index_name.as_ref().unwrap().clone(), lsi_cap);
988 crate::types::consumed_capacity_with_secondary_indexes(
989 &request.table_name,
990 table_cap,
991 &std::collections::HashMap::new(),
992 &lsi_units,
993 &request.return_consumed_capacity,
994 )
995 } else {
996 crate::types::consumed_capacity(
997 &request.table_name,
998 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
999 &request.return_consumed_capacity,
1000 )
1001 };
1002
1003 Ok(QueryResponse {
1004 items: if is_count { None } else { Some(items) },
1005 count,
1006 scanned_count,
1007 last_evaluated_key,
1008 consumed_capacity,
1009 })
1010}
1011
1012fn build_last_evaluated_key(
1013 item: &Item,
1014 pk_name: &str,
1015 sk_name: Option<&str>,
1016) -> HashMap<String, AttributeValue> {
1017 let mut key = HashMap::new();
1018 if let Some(pk_val) = item.get(pk_name) {
1019 key.insert(pk_name.to_string(), pk_val.clone());
1020 }
1021 if let Some(sk) = sk_name {
1022 if let Some(sk_val) = item.get(sk) {
1023 key.insert(sk.to_string(), sk_val.clone());
1024 }
1025 }
1026 key
1027}