1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage_backend::StorageBackend;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9const MAX_RESPONSE_SIZE: usize = 1_048_576;
11
12#[derive(Debug, Default, Deserialize)]
14struct QueryRequestRaw {
15 #[serde(rename = "TableName", default)]
16 table_name: Option<String>,
17 #[serde(rename = "KeyConditionExpression", default)]
18 key_condition_expression: Option<String>,
19 #[serde(rename = "FilterExpression", default)]
20 filter_expression: Option<String>,
21 #[serde(rename = "ProjectionExpression", default)]
22 projection_expression: Option<String>,
23 #[serde(rename = "ExpressionAttributeNames", default)]
24 expression_attribute_names: Option<HashMap<String, String>>,
25 #[serde(rename = "ExpressionAttributeValues", default)]
26 expression_attribute_values: Option<HashMap<String, AttributeValue>>,
27 #[serde(rename = "ScanIndexForward", default = "default_true")]
28 scan_index_forward: bool,
29 #[serde(rename = "Limit", default)]
30 limit: Option<usize>,
31 #[serde(rename = "ExclusiveStartKey", default)]
32 exclusive_start_key: Option<serde_json::Value>,
33 #[serde(rename = "Select", default)]
34 select: Option<String>,
35 #[serde(rename = "ConsistentRead", default)]
36 consistent_read: Option<bool>,
37 #[serde(rename = "IndexName", default)]
38 index_name: Option<String>,
39 #[serde(rename = "ReturnConsumedCapacity", default)]
40 return_consumed_capacity: Option<String>,
41 #[serde(rename = "KeyConditions", default)]
42 key_conditions: Option<serde_json::Value>,
43 #[serde(rename = "AttributesToGet", default)]
44 attributes_to_get: Option<Vec<String>>,
45 #[serde(rename = "QueryFilter", default)]
46 query_filter: Option<serde_json::Value>,
47 #[serde(rename = "ConditionalOperator", default)]
48 conditional_operator: Option<String>,
49}
50
51fn default_true() -> bool {
52 true
53}
54
55#[derive(Debug, Default)]
56pub struct QueryRequest {
57 pub table_name: String,
58 pub key_condition_expression: Option<String>,
59 pub filter_expression: Option<String>,
60 pub projection_expression: Option<String>,
61 pub expression_attribute_names: Option<HashMap<String, String>>,
62 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
63 pub scan_index_forward: bool,
64 pub limit: Option<usize>,
65 pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
66 pub select: Option<String>,
67 pub consistent_read: Option<bool>,
68 pub index_name: Option<String>,
69 pub return_consumed_capacity: Option<String>,
70 pub key_conditions: Option<serde_json::Value>,
71 pub attributes_to_get: Option<Vec<String>>,
72 pub query_filter: Option<serde_json::Value>,
73 pub conditional_operator: Option<String>,
74 pub exclusive_start_key_raw: Option<serde_json::Value>,
79}
80
81impl<'de> serde::Deserialize<'de> for QueryRequest {
82 fn deserialize<D: serde::Deserializer<'de>>(
83 deserializer: D,
84 ) -> std::result::Result<Self, D::Error> {
85 let raw = QueryRequestRaw::deserialize(deserializer)?;
86 use crate::validation::{
87 TableNameContext, format_validation_errors, table_name_constraint_errors,
88 };
89
90 let mut errors = Vec::new();
91 errors.extend(table_name_constraint_errors(
92 raw.table_name.as_deref(),
93 TableNameContext::ReadWrite,
94 ));
95 let table_name = raw.table_name.unwrap_or_default();
96
97 if let Some(ref rcc) = raw.return_consumed_capacity {
99 if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
100 errors.push(format!(
101 "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
102 Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
103 rcc
104 ));
105 }
106 }
107
108 if let Some(ref sel) = raw.select {
110 if ![
111 "ALL_ATTRIBUTES",
112 "ALL_PROJECTED_ATTRIBUTES",
113 "COUNT",
114 "SPECIFIC_ATTRIBUTES",
115 ]
116 .contains(&sel.as_str())
117 {
118 errors.push(format!(
119 "Value '{}' at 'select' failed to satisfy constraint: \
120 Member must satisfy enum value set: [SPECIFIC_ATTRIBUTES, COUNT, ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES]",
121 sel
122 ));
123 }
124 }
125
126 if let Some(limit) = raw.limit {
131 if limit == 0 {
132 errors.push(
133 "Value at 'Limit' failed to satisfy constraint: \
134 Member must have value greater than or equal to 1"
135 .to_string(),
136 );
137 }
138 }
139
140 if let Some(msg) = format_validation_errors(&errors) {
141 return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
142 }
143
144 Ok(QueryRequest {
145 table_name,
146 key_condition_expression: raw.key_condition_expression,
147 filter_expression: raw.filter_expression,
148 projection_expression: raw.projection_expression,
149 expression_attribute_names: raw.expression_attribute_names,
150 expression_attribute_values: raw.expression_attribute_values,
151 scan_index_forward: raw.scan_index_forward,
152 limit: raw.limit,
153 exclusive_start_key: None,
154 select: raw.select,
155 consistent_read: raw.consistent_read,
156 index_name: raw.index_name,
157 return_consumed_capacity: raw.return_consumed_capacity,
158 key_conditions: raw.key_conditions,
159 attributes_to_get: raw.attributes_to_get,
160 query_filter: raw.query_filter,
161 conditional_operator: raw.conditional_operator,
162 exclusive_start_key_raw: raw.exclusive_start_key,
163 })
164 }
165}
166
167#[derive(Debug, Default, Serialize)]
168pub struct QueryResponse {
169 #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
170 pub items: Option<Vec<Item>>,
171 #[serde(rename = "Count")]
172 pub count: usize,
173 #[serde(rename = "ScannedCount")]
174 pub scanned_count: usize,
175 #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
176 pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
177 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
178 pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
179}
180
181pub async fn execute<S: StorageBackend>(
182 storage: &S,
183 mut request: QueryRequest,
184) -> Result<QueryResponse> {
185 crate::validation::validate_table_name(&request.table_name)?;
187
188 {
191 let mut non_expr = Vec::new();
192 let mut expr = Vec::new();
193 if request.attributes_to_get.is_some() {
194 non_expr.push("AttributesToGet");
195 }
196 if request.query_filter.is_some()
197 && request.query_filter.as_ref().is_some_and(|v| !v.is_null())
198 {
199 non_expr.push("QueryFilter");
200 }
201 if request.conditional_operator.is_some() {
202 non_expr.push("ConditionalOperator");
203 }
204 if request.key_conditions.is_some()
205 && request
206 .key_conditions
207 .as_ref()
208 .is_some_and(|v| !v.is_null())
209 {
210 non_expr.push("KeyConditions");
211 }
212 if request.projection_expression.is_some() {
213 expr.push("ProjectionExpression");
214 }
215 if request.filter_expression.is_some() {
216 expr.push("FilterExpression");
217 }
218 if request.key_condition_expression.is_some() {
219 expr.push("KeyConditionExpression");
220 }
221 let no_raw_eav: Option<serde_json::Value> = None;
222 let ctx = helpers::ExpressionParamContext {
223 non_expression_params: non_expr,
224 expression_params: expr,
225 all_expression_param_names: vec!["FilterExpression", "KeyConditionExpression"],
226 expression_attribute_names: &request.expression_attribute_names,
227 expression_attribute_values: &request.expression_attribute_values,
228 expression_attribute_values_raw: &no_raw_eav,
229 };
230 helpers::validate_expression_params(&ctx)?;
231 }
232
233 helpers::validate_filter_conditions_raw(request.query_filter.as_ref(), "QueryFilter")?;
235 helpers::validate_filter_conditions_raw(request.key_conditions.as_ref(), "KeyConditions")?;
236
237 helpers::validate_filter_condition_args(request.query_filter.as_ref())?;
239 helpers::validate_filter_condition_args(request.key_conditions.as_ref())?;
240
241 if let Some(ref attrs) = request.attributes_to_get {
243 helpers::validate_attributes_to_get_no_duplicates(attrs)?;
244 }
245
246 let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
248 Some(helpers::parse_exclusive_start_key(esk_val)?)
249 } else {
250 request.exclusive_start_key.clone()
251 };
252
253 if let Some(ref kce) = request.key_condition_expression {
257 if kce.is_empty() {
258 return Err(DynoxideError::ValidationException(
259 "Invalid KeyConditionExpression: The expression can not be empty;".to_string(),
260 ));
261 }
262 }
263 if let Some(ref fe) = request.filter_expression {
264 if fe.is_empty() {
265 if request.query_filter.is_none() || request.filter_expression.as_deref() == Some("") {
266 return Err(DynoxideError::ValidationException(
267 "Invalid FilterExpression: The expression can not be empty;".to_string(),
268 ));
269 }
270 } else {
271 let parsed_fe = expressions::condition::parse(fe).map_err(|e| {
272 DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
273 })?;
274 if let Err(e) = expressions::condition::validate_name_refs(
277 &parsed_fe,
278 &request.expression_attribute_names,
279 ) {
280 return Err(DynoxideError::ValidationException(format!(
281 "Invalid FilterExpression: {e}"
282 )));
283 }
284 if let Err(e) = expressions::condition::validate_operand_semantics(
285 &parsed_fe,
286 &request.expression_attribute_names,
287 &request.expression_attribute_values,
288 ) {
289 return Err(DynoxideError::ValidationException(format!(
290 "Invalid FilterExpression: {e}"
291 )));
292 }
293 }
294 }
295 if let Some(ref pe) = request.projection_expression {
296 if pe.is_empty() {
297 return Err(DynoxideError::ValidationException(
298 "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
299 ));
300 }
301 }
302
303 if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
305 && request.projection_expression.is_none()
306 && request.attributes_to_get.is_none()
307 {
308 return Err(DynoxideError::ValidationException(
309 "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
310 .to_string(),
311 ));
312 }
313 if let Some(ref kce) = request.key_condition_expression {
315 if !kce.is_empty() {
316 let temp_tracker = crate::expressions::TrackedExpressionAttributes::new(
318 &request.expression_attribute_names,
319 &request.expression_attribute_values,
320 );
321 if let Err(e) = expressions::key_condition::parse(kce, &temp_tracker) {
322 return Err(DynoxideError::ValidationException(e));
323 }
324 }
325 }
326
327 let meta = helpers::require_table_for_item_op(storage, &request.table_name).await?;
328 let table_key_schema = helpers::parse_key_schema(&meta)?;
329
330 let effective_pk_for_kc = if let Some(ref index_name) = request.index_name {
334 if let Some((pk, _)) = request
335 .index_name
336 .as_ref()
337 .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok())
338 {
339 pk
340 } else if let Ok((pk, _)) = super::gsi::parse_gsi_key_schema(&meta, index_name) {
341 pk
342 } else {
343 table_key_schema.partition_key.clone()
344 }
345 } else {
346 table_key_schema.partition_key.clone()
347 };
348
349 if request.key_condition_expression.is_none() {
351 if let Some(ref kc_val) = request.key_conditions {
352 if let Ok(kc) =
353 serde_json::from_value::<HashMap<String, helpers::KeyCondition>>(kc_val.clone())
354 {
355 if !kc.is_empty() {
356 let converted =
357 helpers::convert_key_conditions(&kc, Some(&effective_pk_for_kc))?;
358 request.key_condition_expression = Some(converted.expression);
359 let expr_values = request
360 .expression_attribute_values
361 .get_or_insert_with(HashMap::new);
362 expr_values.extend(converted.attribute_values);
363 let expr_names = request
364 .expression_attribute_names
365 .get_or_insert_with(HashMap::new);
366 expr_names.extend(converted.attribute_names);
367 }
368 }
369 }
370 }
371
372 if request.filter_expression.is_none() {
374 if let Some(ref qf_val) = request.query_filter {
375 if let Ok(qf) =
376 serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(qf_val.clone())
377 {
378 if !qf.is_empty() {
379 let converted = helpers::convert_filter_conditions(
380 &qf,
381 request.conditional_operator.as_deref(),
382 )?;
383 if !converted.expression.is_empty() {
384 request.filter_expression = Some(converted.expression);
385 let expr_values = request
386 .expression_attribute_values
387 .get_or_insert_with(HashMap::new);
388 expr_values.extend(converted.attribute_values);
389 let expr_names = request
390 .expression_attribute_names
391 .get_or_insert_with(HashMap::new);
392 expr_names.extend(converted.attribute_names);
393 }
394 }
395 }
396 }
397 }
398
399 let legacy_projection = if request.projection_expression.is_none() {
401 request
402 .attributes_to_get
403 .as_ref()
404 .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
405 } else {
406 None
407 };
408
409 let key_condition_expression = request.key_condition_expression.as_deref().ok_or_else(|| {
411 DynoxideError::ValidationException(
412 "Either the KeyConditions or KeyConditionExpression parameter must be specified in the request."
413 .to_string(),
414 )
415 })?;
416 let key_condition_expression = key_condition_expression.to_string();
417
418 let lsi_keys = request
420 .index_name
421 .as_ref()
422 .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
423 let is_lsi = lsi_keys.is_some();
424
425 if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
427 return Err(DynoxideError::ValidationException(
428 "Consistent reads are not supported on global secondary indexes".to_string(),
429 ));
430 }
431
432 let index_projection_type = if let Some(ref index_name) = request.index_name {
434 if is_lsi {
435 super::lsi::parse_lsi_defs(&meta)?
436 .into_iter()
437 .find(|l| l.index_name == *index_name)
438 .map(|l| l.projection_type)
439 } else {
440 super::gsi::parse_gsi_defs(&meta)?
441 .into_iter()
442 .find(|g| g.index_name == *index_name)
443 .map(|g| g.projection_type)
444 }
445 } else {
446 None
447 };
448
449 let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
450 if let Some(keys) = lsi_keys {
451 keys
452 } else {
453 super::gsi::parse_gsi_key_schema(&meta, index_name)?
454 }
455 } else {
456 (
457 table_key_schema.partition_key.clone(),
458 table_key_schema.sort_key.clone(),
459 )
460 };
461
462 if let Some(ref esk) = exclusive_start_key {
464 helpers::validate_esk_count_and_index_keys(
466 esk,
467 &meta,
468 request.index_name.as_deref(),
469 "The provided starting key is invalid",
470 )?;
471 helpers::validate_esk_table_keys(esk, &meta)?;
473 }
474
475 let tracker = crate::expressions::TrackedExpressionAttributes::new(
477 &request.expression_attribute_names,
478 &request.expression_attribute_values,
479 );
480
481 let key_cond = expressions::key_condition::parse(&key_condition_expression, &tracker)
483 .map_err(DynoxideError::ValidationException)?;
484
485 if key_cond.pk_name != effective_pk {
487 return Err(DynoxideError::ValidationException(format!(
488 "Query condition missed key schema element: {}",
489 effective_pk
490 )));
491 }
492
493 let resolved = expressions::key_condition::resolve_values(&key_cond, &tracker)
495 .map_err(DynoxideError::ValidationException)?;
496
497 let pk_str = resolved.pk_value.to_key_string().ok_or_else(|| {
499 DynoxideError::ValidationException(
500 "Cannot convert partition key value to string".to_string(),
501 )
502 })?;
503
504 let mut sk_sql_parts = Vec::new();
506 let mut sk_param_values = Vec::new();
507
508 if let Some(ref sk_cond) = resolved.sk_condition {
509 if let Some(ref eff_sk) = effective_sk {
511 if sk_cond.sk_name() != eff_sk {
512 return Err(DynoxideError::ValidationException(format!(
513 "Query condition missed key schema element: {eff_sk}"
514 )));
515 }
516 } else {
517 return Err(DynoxideError::ValidationException(
518 "Query filter contains a sort key condition but the table has no sort key"
519 .to_string(),
520 ));
521 }
522
523 let conditions = sk_cond.to_sql_conditions();
524 for (i, (op, val)) in conditions.iter().enumerate() {
525 let param_idx = i + 2; if op == "LIKE" {
527 sk_sql_parts.push(format!("AND sk LIKE ?{param_idx} ESCAPE '\\'"));
528 } else {
529 sk_sql_parts.push(format!("AND sk {op} ?{param_idx}"));
530 }
531 sk_param_values.push(val.clone());
532 }
533 }
534
535 let mut effective_key_attrs = vec![effective_pk.clone()];
538 if let Some(ref sk) = effective_sk {
539 effective_key_attrs.push(sk.clone());
540 }
541
542 if let Some(ref qf_val) = request.query_filter {
544 if let Some(obj) = qf_val.as_object() {
545 for attr_name in obj.keys() {
546 if effective_key_attrs.contains(attr_name) {
547 return Err(DynoxideError::ValidationException(format!(
548 "QueryFilter can only contain non-primary key attributes: \
549 Primary key attribute: {attr_name}"
550 )));
551 }
552 }
553 }
554 }
555
556 if request.query_filter.is_none() {
559 if let Some(ref fe) = request.filter_expression {
560 if let Ok(parsed_fe) = expressions::condition::parse(fe) {
561 let top_attrs = expressions::condition::extract_top_level_attributes(
562 &parsed_fe,
563 &request.expression_attribute_names,
564 );
565 for attr in &top_attrs {
566 if effective_key_attrs.contains(attr) {
567 return Err(DynoxideError::ValidationException(format!(
568 "Filter Expression can only contain non-primary key attributes: \
569 Primary key attribute: {attr}"
570 )));
571 }
572 }
573 let mut index_key_attrs = Vec::new();
576 if request.index_name.is_some() {
577 if !effective_key_attrs
579 .iter()
580 .any(|k| k == &table_key_schema.partition_key)
581 {
582 }
584 for k in &effective_key_attrs {
586 if ![table_key_schema.partition_key.clone()]
587 .iter()
588 .chain(table_key_schema.sort_key.iter())
589 .any(|tk| tk == k)
590 {
591 index_key_attrs.push(k.clone());
592 }
593 }
594 }
595 let base_key_attrs: Vec<String> = {
596 let mut v = vec![table_key_schema.partition_key.clone()];
597 if let Some(ref sk) = table_key_schema.sort_key {
598 v.push(sk.clone());
599 }
600 v
601 };
602 if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
603 &parsed_fe,
604 &request.expression_attribute_names,
605 &base_key_attrs,
606 &index_key_attrs,
607 ) {
608 let prefix = if is_index { "IndexKey" } else { "Key" };
609 return Err(DynoxideError::ValidationException(format!(
610 "Key attributes must be scalars; \
611 list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
612 )));
613 }
614 }
615 }
616 }
617
618 let is_index_query = request.index_name.is_some();
619
620 let start_sk = if let Some(ref esk) = exclusive_start_key {
624 if let Some(ref sk_name) = effective_sk {
625 esk.get(sk_name).and_then(|v| v.to_key_string())
626 } else if is_index_query {
627 Some(String::new())
629 } else {
630 None
631 }
632 } else {
633 None
634 };
635
636 let (start_base_pk, start_base_sk) = if is_index_query {
639 if let Some(ref esk) = exclusive_start_key {
640 let base_pk = esk
641 .get(&table_key_schema.partition_key)
642 .and_then(|v| v.to_key_string());
643 let base_sk = if let Some(sk_name) = table_key_schema.sort_key.as_ref() {
651 esk.get(sk_name).and_then(|v| v.to_key_string())
652 } else {
653 Some(String::new())
654 };
655 (base_pk, base_sk)
656 } else {
657 (None, None)
658 }
659 } else {
660 (None, None)
661 };
662
663 let is_select_all_attributes = request
666 .select
667 .as_deref()
668 .map(|s| s.eq_ignore_ascii_case("ALL_ATTRIBUTES"))
669 .unwrap_or(false);
670 let fetch_from_base_table = if is_select_all_attributes {
671 if let Some(ref proj_type) = index_projection_type {
672 if *proj_type != crate::types::ProjectionType::ALL {
673 if !is_lsi {
674 return Err(DynoxideError::ValidationException(format!(
675 "One or more parameter values were invalid: \
676 Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
677 because its projection type is not ALL",
678 request.index_name.as_deref().unwrap_or("")
679 )));
680 }
681 true
683 } else {
684 false
685 }
686 } else {
687 false
688 }
689 } else {
690 false
691 };
692
693 let sk_condition_sql = if sk_sql_parts.is_empty() {
695 None
696 } else {
697 Some(sk_sql_parts.join(" "))
698 };
699
700 let fetch_limit = request.limit;
701 let sk_params_refs: Vec<&str> = sk_param_values.iter().map(|s| s.as_str()).collect();
702
703 let query_params = crate::storage::QueryParams {
705 sk_condition: sk_condition_sql.as_deref(),
706 sk_params: &sk_params_refs,
707 forward: request.scan_index_forward,
708 limit: fetch_limit,
709 exclusive_start_sk: start_sk.as_deref(),
710 exclusive_start_base_pk: start_base_pk.as_deref(),
711 exclusive_start_base_sk: start_base_sk.as_deref(),
712 };
713 let rows = if let Some(ref index_name) = request.index_name {
714 if is_lsi {
715 storage
716 .query_lsi_items(&request.table_name, index_name, &pk_str, &query_params)
717 .await?
718 } else {
719 storage
720 .query_gsi_items(&request.table_name, index_name, &pk_str, &query_params)
721 .await?
722 }
723 } else {
724 storage
725 .query_items(&request.table_name, &pk_str, &query_params)
726 .await?
727 };
728
729 let filter_expr = request
731 .filter_expression
732 .as_ref()
733 .map(|expr| expressions::condition::parse(expr))
734 .transpose()
735 .map_err(DynoxideError::ValidationException)?;
736
737 let projection = if let Some(ref proj_expr) = request.projection_expression {
739 Some(
740 expressions::projection::parse(proj_expr)
741 .map_err(DynoxideError::ValidationException)?,
742 )
743 } else {
744 legacy_projection.clone()
745 };
746
747 if let Some(ref filter) = filter_expr {
749 tracker.track_condition_expr(filter);
750 }
751 if let Some(ref proj) = projection {
752 tracker.track_projection_expr(proj);
753 }
754
755 let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
757 &request.expression_attribute_names,
758 &request.expression_attribute_values,
759 );
760
761 let is_count = request
763 .select
764 .as_deref()
765 .map(|s| s.eq_ignore_ascii_case("COUNT"))
766 .unwrap_or(false);
767
768 let mut key_attrs = vec![effective_pk.clone()];
770 if let Some(ref sk) = effective_sk {
771 key_attrs.push(sk.clone());
772 }
773 if request.index_name.is_some() {
775 if !key_attrs.contains(&table_key_schema.partition_key) {
776 key_attrs.push(table_key_schema.partition_key.clone());
777 }
778 if let Some(ref sk) = table_key_schema.sort_key {
779 if !key_attrs.contains(sk) {
780 key_attrs.push(sk.clone());
781 }
782 }
783 }
784
785 let mut items = Vec::new();
786 let mut scanned_count = 0;
787 let mut filtered_count = 0;
788 let mut cumulative_size = 0;
789 let mut last_evaluated_item: Option<Item> = None;
790 let mut truncated_by_size = false;
791
792 let mut base_table_cumulative_size = 0usize;
795 let mut index_cumulative_size = 0usize;
796
797 for (_pk, _sk, item_json) in &rows {
798 let index_item: Item = serde_json::from_str(item_json).map_err(|e| {
799 DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
800 })?;
801
802 index_cumulative_size += crate::types::item_size(&index_item);
806 let item = if fetch_from_base_table {
807 let base_pk = index_item
808 .get(&table_key_schema.partition_key)
809 .and_then(|v| v.to_key_string())
810 .unwrap_or_default();
811 let base_sk = table_key_schema
812 .sort_key
813 .as_ref()
814 .and_then(|sk_name| index_item.get(sk_name))
815 .and_then(|v| v.to_key_string())
816 .unwrap_or_default();
817 if let Some(full_json) = storage
818 .get_item(&request.table_name, &base_pk, &base_sk)
819 .await?
820 {
821 let full_item: Item = serde_json::from_str(&full_json).map_err(|e| {
822 DynoxideError::InternalServerError(format!("Bad item JSON: {e}"))
823 })?;
824 base_table_cumulative_size += crate::types::item_size(&full_item);
825 full_item
826 } else {
827 index_item.clone()
828 }
829 } else {
830 index_item.clone()
831 };
832
833 scanned_count += 1;
834
835 let item_size = crate::types::item_size(&item);
838 if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
839 truncated_by_size = true;
840 break;
841 }
842 cumulative_size += item_size;
843
844 if let Some(ref filter) = filter_expr {
846 let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
847 .map_err(DynoxideError::ValidationException)?;
848 if !passes {
849 last_evaluated_item = Some(index_item);
850 continue;
851 }
852 }
853
854 filtered_count += 1;
855
856 let result_item = if let Some(ref proj) = projection {
859 let no_keys: &[String] = &[];
860 expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
861 .map_err(DynoxideError::ValidationException)?
862 } else {
863 item
864 };
865
866 last_evaluated_item = Some(index_item);
867 if !is_count {
868 items.push(result_item);
869 }
870 }
871
872 tracker.check_unused()?;
874
875 let count = if is_count {
876 filtered_count
877 } else {
878 items.len()
879 };
880
881 let has_more = truncated_by_size
884 || (fetch_limit.is_some() && scanned_count >= fetch_limit.unwrap_or(usize::MAX));
885
886 let is_gsi_query = request.index_name.is_some() && !is_lsi;
891 let last_evaluated_key = if has_more {
892 last_evaluated_item.map(|item| {
893 let mut key = build_last_evaluated_key(&item, &effective_pk, effective_sk.as_deref());
894 if is_lsi {
896 if let Some(tsk) = table_key_schema.sort_key.as_deref() {
897 if !key.contains_key(tsk) {
898 if let Some(v) = item.get(tsk) {
899 key.insert(tsk.to_string(), v.clone());
900 }
901 }
902 }
903 }
904 if is_gsi_query {
906 if !key.contains_key(&table_key_schema.partition_key) {
907 if let Some(v) = item.get(&table_key_schema.partition_key) {
908 key.insert(table_key_schema.partition_key.clone(), v.clone());
909 }
910 }
911 if let Some(ref tsk) = table_key_schema.sort_key {
912 if !key.contains_key(tsk) {
913 if let Some(v) = item.get(tsk) {
914 key.insert(tsk.clone(), v.clone());
915 }
916 }
917 }
918 }
919 key
920 })
921 } else {
922 None
923 };
924
925 let is_gsi = is_gsi_query;
927 let consistent = request.consistent_read.unwrap_or(false);
928 let consumed_capacity = if is_gsi {
929 let mut gsi_units = std::collections::HashMap::new();
930 gsi_units.insert(
931 request.index_name.as_ref().unwrap().clone(),
932 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
933 );
934 crate::types::consumed_capacity_with_indexes(
935 &request.table_name,
936 0.0,
937 &gsi_units,
938 &request.return_consumed_capacity,
939 )
940 } else if is_lsi {
941 let (table_cap, lsi_cap) = if fetch_from_base_table {
944 let table_rcu = crate::types::read_capacity_units_with_consistency(
945 base_table_cumulative_size,
946 consistent,
947 );
948 let lsi_rcu = crate::types::read_capacity_units_with_consistency(
949 index_cumulative_size,
950 consistent,
951 );
952 (table_rcu, lsi_rcu)
953 } else {
954 (
955 0.0,
956 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
957 )
958 };
959 let mut lsi_units = std::collections::HashMap::new();
960 lsi_units.insert(request.index_name.as_ref().unwrap().clone(), lsi_cap);
961 crate::types::consumed_capacity_with_secondary_indexes(
962 &request.table_name,
963 table_cap,
964 &std::collections::HashMap::new(),
965 &lsi_units,
966 &request.return_consumed_capacity,
967 )
968 } else {
969 crate::types::consumed_capacity(
970 &request.table_name,
971 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
972 &request.return_consumed_capacity,
973 )
974 };
975
976 Ok(QueryResponse {
977 items: if is_count { None } else { Some(items) },
978 count,
979 scanned_count,
980 last_evaluated_key,
981 consumed_capacity,
982 })
983}
984
985fn build_last_evaluated_key(
986 item: &Item,
987 pk_name: &str,
988 sk_name: Option<&str>,
989) -> HashMap<String, AttributeValue> {
990 let mut key = HashMap::new();
991 if let Some(pk_val) = item.get(pk_name) {
992 key.insert(pk_name.to_string(), pk_val.clone());
993 }
994 if let Some(sk) = sk_name {
995 if let Some(sk_val) = item.get(sk) {
996 key.insert(sk.to_string(), sk_val.clone());
997 }
998 }
999 key
1000}