1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage::Storage;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Ceiling on the cumulative `crate::types::item_size` of the rows a single
/// `execute` call admits (1_048_576 = 2^20). Once adding another row would
/// exceed it, `execute` stops reading rows and reports a continuation key;
/// the first row of a call is always admitted regardless of its size.
const MAX_RESPONSE_SIZE: usize = 1_048_576;
11
12#[derive(Debug, Default, Deserialize)]
14struct QueryRequestRaw {
15 #[serde(rename = "TableName", default)]
16 table_name: Option<String>,
17 #[serde(rename = "KeyConditionExpression", default)]
18 key_condition_expression: Option<String>,
19 #[serde(rename = "FilterExpression", default)]
20 filter_expression: Option<String>,
21 #[serde(rename = "ProjectionExpression", default)]
22 projection_expression: Option<String>,
23 #[serde(rename = "ExpressionAttributeNames", default)]
24 expression_attribute_names: Option<HashMap<String, String>>,
25 #[serde(rename = "ExpressionAttributeValues", default)]
26 expression_attribute_values: Option<HashMap<String, AttributeValue>>,
27 #[serde(rename = "ScanIndexForward", default = "default_true")]
28 scan_index_forward: bool,
29 #[serde(rename = "Limit", default)]
30 limit: Option<usize>,
31 #[serde(rename = "ExclusiveStartKey", default)]
32 exclusive_start_key: Option<serde_json::Value>,
33 #[serde(rename = "Select", default)]
34 select: Option<String>,
35 #[serde(rename = "ConsistentRead", default)]
36 consistent_read: Option<bool>,
37 #[serde(rename = "IndexName", default)]
38 index_name: Option<String>,
39 #[serde(rename = "ReturnConsumedCapacity", default)]
40 return_consumed_capacity: Option<String>,
41 #[serde(rename = "KeyConditions", default)]
42 key_conditions: Option<serde_json::Value>,
43 #[serde(rename = "AttributesToGet", default)]
44 attributes_to_get: Option<Vec<String>>,
45 #[serde(rename = "QueryFilter", default)]
46 query_filter: Option<serde_json::Value>,
47 #[serde(rename = "ConditionalOperator", default)]
48 conditional_operator: Option<String>,
49}
50
/// Serde default provider for boolean fields that must read as `true` when
/// the member is absent from the request (used for `ScanIndexForward`).
fn default_true() -> bool {
    true
}
54
/// A Query request after the field-level validation performed by its
/// `Deserialize` impl (table name constraints, `ReturnConsumedCapacity` and
/// `Select` enum membership, `Limit` >= 1).
///
/// Note: the derived `Default` yields `scan_index_forward == false`, whereas a
/// deserialized request with no `ScanIndexForward` member gets `true`.
#[derive(Debug, Default)]
pub struct QueryRequest {
    /// Value of `TableName`; the deserializer substitutes an empty string when
    /// the member is absent and validation did not reject the request.
    pub table_name: String,
    /// `KeyConditionExpression`; `execute` also fills this in from
    /// `key_conditions` when only the legacy form was supplied.
    pub key_condition_expression: Option<String>,
    /// `FilterExpression`; `execute` also fills this in from `query_filter`
    /// when only the legacy form was supplied.
    pub filter_expression: Option<String>,
    /// `ProjectionExpression`.
    pub projection_expression: Option<String>,
    /// `ExpressionAttributeNames` placeholder map.
    pub expression_attribute_names: Option<HashMap<String, String>>,
    /// `ExpressionAttributeValues` placeholder map.
    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
    /// `ScanIndexForward`; passed to storage as the `forward` flag.
    pub scan_index_forward: bool,
    /// `Limit`; passed to storage as the maximum number of rows to read.
    pub limit: Option<usize>,
    /// Already-parsed start key. The deserializer always leaves this `None`;
    /// `execute` consults it only when `exclusive_start_key_raw` is `None`.
    pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
    /// `Select`.
    pub select: Option<String>,
    /// `ConsistentRead`.
    pub consistent_read: Option<bool>,
    /// `IndexName`; when set, `execute` resolves it as an LSI first and as a
    /// GSI otherwise.
    pub index_name: Option<String>,
    /// `ReturnConsumedCapacity`.
    pub return_consumed_capacity: Option<String>,
    /// Legacy `KeyConditions`, as untyped JSON.
    pub key_conditions: Option<serde_json::Value>,
    /// Legacy `AttributesToGet`.
    pub attributes_to_get: Option<Vec<String>>,
    /// Legacy `QueryFilter`, as untyped JSON.
    pub query_filter: Option<serde_json::Value>,
    /// Legacy `ConditionalOperator`, forwarded when converting `query_filter`.
    pub conditional_operator: Option<String>,
    /// `ExclusiveStartKey` exactly as received; parsed by `execute` via
    /// `helpers::parse_exclusive_start_key`.
    pub exclusive_start_key_raw: Option<serde_json::Value>,
}
80
81impl<'de> serde::Deserialize<'de> for QueryRequest {
82 fn deserialize<D: serde::Deserializer<'de>>(
83 deserializer: D,
84 ) -> std::result::Result<Self, D::Error> {
85 let raw = QueryRequestRaw::deserialize(deserializer)?;
86 use crate::validation::{format_validation_errors, table_name_constraint_errors};
87
88 let mut errors = Vec::new();
89 errors.extend(table_name_constraint_errors(raw.table_name.as_deref()));
90 let table_name = raw.table_name.unwrap_or_default();
91
92 if let Some(ref rcc) = raw.return_consumed_capacity {
94 if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
95 errors.push(format!(
96 "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
97 Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
98 rcc
99 ));
100 }
101 }
102
103 if let Some(ref sel) = raw.select {
105 if ![
106 "ALL_ATTRIBUTES",
107 "ALL_PROJECTED_ATTRIBUTES",
108 "COUNT",
109 "SPECIFIC_ATTRIBUTES",
110 ]
111 .contains(&sel.as_str())
112 {
113 errors.push(format!(
114 "Value '{}' at 'select' failed to satisfy constraint: \
115 Member must satisfy enum value set: [ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES, COUNT, SPECIFIC_ATTRIBUTES]",
116 sel
117 ));
118 }
119 }
120
121 if let Some(limit) = raw.limit {
123 if limit == 0 {
124 errors.push(
125 "Value '0' at 'Limit' failed to satisfy constraint: \
126 Member must have value greater than or equal to 1"
127 .to_string(),
128 );
129 }
130 }
131
132 if let Some(msg) = format_validation_errors(&errors) {
133 return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
134 }
135
136 Ok(QueryRequest {
137 table_name,
138 key_condition_expression: raw.key_condition_expression,
139 filter_expression: raw.filter_expression,
140 projection_expression: raw.projection_expression,
141 expression_attribute_names: raw.expression_attribute_names,
142 expression_attribute_values: raw.expression_attribute_values,
143 scan_index_forward: raw.scan_index_forward,
144 limit: raw.limit,
145 exclusive_start_key: None,
146 select: raw.select,
147 consistent_read: raw.consistent_read,
148 index_name: raw.index_name,
149 return_consumed_capacity: raw.return_consumed_capacity,
150 key_conditions: raw.key_conditions,
151 attributes_to_get: raw.attributes_to_get,
152 query_filter: raw.query_filter,
153 conditional_operator: raw.conditional_operator,
154 exclusive_start_key_raw: raw.exclusive_start_key,
155 })
156 }
157}
158
159#[derive(Debug, Default, Serialize)]
160pub struct QueryResponse {
161 #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
162 pub items: Option<Vec<Item>>,
163 #[serde(rename = "Count")]
164 pub count: usize,
165 #[serde(rename = "ScannedCount")]
166 pub scanned_count: usize,
167 #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
168 pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
169 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
170 pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
171}
172
173pub fn execute(storage: &Storage, mut request: QueryRequest) -> Result<QueryResponse> {
174 crate::validation::validate_table_name(&request.table_name)?;
176
177 {
180 let mut non_expr = Vec::new();
181 let mut expr = Vec::new();
182 if request.attributes_to_get.is_some() {
183 non_expr.push("AttributesToGet");
184 }
185 if request.query_filter.is_some()
186 && request.query_filter.as_ref().is_some_and(|v| !v.is_null())
187 {
188 non_expr.push("QueryFilter");
189 }
190 if request.conditional_operator.is_some() {
191 non_expr.push("ConditionalOperator");
192 }
193 if request.key_conditions.is_some()
194 && request
195 .key_conditions
196 .as_ref()
197 .is_some_and(|v| !v.is_null())
198 {
199 non_expr.push("KeyConditions");
200 }
201 if request.projection_expression.is_some() {
202 expr.push("ProjectionExpression");
203 }
204 if request.filter_expression.is_some() {
205 expr.push("FilterExpression");
206 }
207 if request.key_condition_expression.is_some() {
208 expr.push("KeyConditionExpression");
209 }
210 let no_raw_eav: Option<serde_json::Value> = None;
211 let ctx = helpers::ExpressionParamContext {
212 non_expression_params: non_expr,
213 expression_params: expr,
214 all_expression_param_names: vec!["FilterExpression", "KeyConditionExpression"],
215 expression_attribute_names: &request.expression_attribute_names,
216 expression_attribute_values: &request.expression_attribute_values,
217 expression_attribute_values_raw: &no_raw_eav,
218 };
219 helpers::validate_expression_params(&ctx)?;
220 }
221
222 helpers::validate_filter_conditions_raw(request.query_filter.as_ref(), "QueryFilter")?;
224 helpers::validate_filter_conditions_raw(request.key_conditions.as_ref(), "KeyConditions")?;
225
226 helpers::validate_filter_condition_args(request.query_filter.as_ref())?;
228 helpers::validate_filter_condition_args(request.key_conditions.as_ref())?;
229
230 if let Some(ref attrs) = request.attributes_to_get {
232 helpers::validate_attributes_to_get_no_duplicates(attrs)?;
233 }
234
235 let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
237 Some(helpers::parse_exclusive_start_key(esk_val)?)
238 } else {
239 request.exclusive_start_key.clone()
240 };
241
242 if let Some(ref kce) = request.key_condition_expression {
246 if kce.is_empty() {
247 return Err(DynoxideError::ValidationException(
248 "Invalid KeyConditionExpression: The expression can not be empty;".to_string(),
249 ));
250 }
251 }
252 if let Some(ref fe) = request.filter_expression {
253 if fe.is_empty() {
254 if request.query_filter.is_none() || request.filter_expression.as_deref() == Some("") {
255 return Err(DynoxideError::ValidationException(
256 "Invalid FilterExpression: The expression can not be empty;".to_string(),
257 ));
258 }
259 } else {
260 let parsed_fe = expressions::condition::parse(fe).map_err(|e| {
261 DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
262 })?;
263 if let Err(e) = expressions::condition::validate_name_refs(
266 &parsed_fe,
267 &request.expression_attribute_names,
268 ) {
269 return Err(DynoxideError::ValidationException(format!(
270 "Invalid FilterExpression: {e}"
271 )));
272 }
273 }
274 }
275 if let Some(ref pe) = request.projection_expression {
276 if pe.is_empty() {
277 return Err(DynoxideError::ValidationException(
278 "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
279 ));
280 }
281 }
282
283 if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
285 && request.projection_expression.is_none()
286 && request.attributes_to_get.is_none()
287 {
288 return Err(DynoxideError::ValidationException(
289 "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
290 .to_string(),
291 ));
292 }
293 if let Some(ref kce) = request.key_condition_expression {
295 if !kce.is_empty() {
296 let temp_tracker = crate::expressions::TrackedExpressionAttributes::new(
298 &request.expression_attribute_names,
299 &request.expression_attribute_values,
300 );
301 if let Err(e) = expressions::key_condition::parse(kce, &temp_tracker) {
302 return Err(DynoxideError::ValidationException(e));
303 }
304 }
305 }
306
307 let meta = helpers::require_table_for_item_op(storage, &request.table_name)?;
308 let table_key_schema = helpers::parse_key_schema(&meta)?;
309
310 let effective_pk_for_kc = if let Some(ref index_name) = request.index_name {
314 if let Some((pk, _)) = request
315 .index_name
316 .as_ref()
317 .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok())
318 {
319 pk
320 } else if let Ok((pk, _)) = super::gsi::parse_gsi_key_schema(&meta, index_name) {
321 pk
322 } else {
323 table_key_schema.partition_key.clone()
324 }
325 } else {
326 table_key_schema.partition_key.clone()
327 };
328
329 if request.key_condition_expression.is_none() {
331 if let Some(ref kc_val) = request.key_conditions {
332 if let Ok(kc) =
333 serde_json::from_value::<HashMap<String, helpers::KeyCondition>>(kc_val.clone())
334 {
335 if !kc.is_empty() {
336 let converted =
337 helpers::convert_key_conditions(&kc, Some(&effective_pk_for_kc))?;
338 request.key_condition_expression = Some(converted.expression);
339 let expr_values = request
340 .expression_attribute_values
341 .get_or_insert_with(HashMap::new);
342 expr_values.extend(converted.attribute_values);
343 let expr_names = request
344 .expression_attribute_names
345 .get_or_insert_with(HashMap::new);
346 expr_names.extend(converted.attribute_names);
347 }
348 }
349 }
350 }
351
352 if request.filter_expression.is_none() {
354 if let Some(ref qf_val) = request.query_filter {
355 if let Ok(qf) =
356 serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(qf_val.clone())
357 {
358 if !qf.is_empty() {
359 let converted = helpers::convert_filter_conditions(
360 &qf,
361 request.conditional_operator.as_deref(),
362 )?;
363 if !converted.expression.is_empty() {
364 request.filter_expression = Some(converted.expression);
365 let expr_values = request
366 .expression_attribute_values
367 .get_or_insert_with(HashMap::new);
368 expr_values.extend(converted.attribute_values);
369 let expr_names = request
370 .expression_attribute_names
371 .get_or_insert_with(HashMap::new);
372 expr_names.extend(converted.attribute_names);
373 }
374 }
375 }
376 }
377 }
378
379 let legacy_projection = if request.projection_expression.is_none() {
381 request
382 .attributes_to_get
383 .as_ref()
384 .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
385 } else {
386 None
387 };
388
389 let key_condition_expression = request.key_condition_expression.as_deref().ok_or_else(|| {
391 DynoxideError::ValidationException(
392 "Either the KeyConditions or KeyConditionExpression parameter must be specified in the request."
393 .to_string(),
394 )
395 })?;
396 let key_condition_expression = key_condition_expression.to_string();
397
398 let lsi_keys = request
400 .index_name
401 .as_ref()
402 .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
403 let is_lsi = lsi_keys.is_some();
404
405 if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
407 return Err(DynoxideError::ValidationException(
408 "Consistent reads are not supported on global secondary indexes".to_string(),
409 ));
410 }
411
412 let index_projection_type = if let Some(ref index_name) = request.index_name {
414 if is_lsi {
415 super::lsi::parse_lsi_defs(&meta)?
416 .into_iter()
417 .find(|l| l.index_name == *index_name)
418 .map(|l| l.projection_type)
419 } else {
420 super::gsi::parse_gsi_defs(&meta)?
421 .into_iter()
422 .find(|g| g.index_name == *index_name)
423 .map(|g| g.projection_type)
424 }
425 } else {
426 None
427 };
428
429 let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
430 if let Some(keys) = lsi_keys {
431 keys
432 } else {
433 super::gsi::parse_gsi_key_schema(&meta, index_name)?
434 }
435 } else {
436 (
437 table_key_schema.partition_key.clone(),
438 table_key_schema.sort_key.clone(),
439 )
440 };
441
442 if let Some(ref esk) = exclusive_start_key {
444 helpers::validate_esk_count_and_index_keys(
446 esk,
447 &meta,
448 request.index_name.as_deref(),
449 "The provided starting key is invalid",
450 )?;
451 helpers::validate_esk_table_keys(esk, &meta)?;
453 }
454
455 let tracker = crate::expressions::TrackedExpressionAttributes::new(
457 &request.expression_attribute_names,
458 &request.expression_attribute_values,
459 );
460
461 let key_cond = expressions::key_condition::parse(&key_condition_expression, &tracker)
463 .map_err(DynoxideError::ValidationException)?;
464
465 if key_cond.pk_name != effective_pk {
467 return Err(DynoxideError::ValidationException(format!(
468 "Query condition missed key schema element: {}",
469 effective_pk
470 )));
471 }
472
473 let resolved = expressions::key_condition::resolve_values(&key_cond, &tracker)
475 .map_err(DynoxideError::ValidationException)?;
476
477 let pk_str = resolved.pk_value.to_key_string().ok_or_else(|| {
479 DynoxideError::ValidationException(
480 "Cannot convert partition key value to string".to_string(),
481 )
482 })?;
483
484 let mut sk_sql_parts = Vec::new();
486 let mut sk_param_values = Vec::new();
487
488 if let Some(ref sk_cond) = resolved.sk_condition {
489 if let Some(ref eff_sk) = effective_sk {
491 if sk_cond.sk_name() != eff_sk {
492 return Err(DynoxideError::ValidationException(format!(
493 "Query condition missed key schema element: {eff_sk}"
494 )));
495 }
496 } else {
497 return Err(DynoxideError::ValidationException(
498 "Query filter contains a sort key condition but the table has no sort key"
499 .to_string(),
500 ));
501 }
502
503 let conditions = sk_cond.to_sql_conditions();
504 for (i, (op, val)) in conditions.iter().enumerate() {
505 let param_idx = i + 2; if op == "LIKE" {
507 sk_sql_parts.push(format!("AND sk LIKE ?{param_idx} ESCAPE '\\'"));
508 } else {
509 sk_sql_parts.push(format!("AND sk {op} ?{param_idx}"));
510 }
511 sk_param_values.push(val.clone());
512 }
513 }
514
515 let mut effective_key_attrs = vec![effective_pk.clone()];
518 if let Some(ref sk) = effective_sk {
519 effective_key_attrs.push(sk.clone());
520 }
521
522 if let Some(ref qf_val) = request.query_filter {
524 if let Some(obj) = qf_val.as_object() {
525 for attr_name in obj.keys() {
526 if effective_key_attrs.contains(attr_name) {
527 return Err(DynoxideError::ValidationException(format!(
528 "QueryFilter can only contain non-primary key attributes: \
529 Primary key attribute: {attr_name}"
530 )));
531 }
532 }
533 }
534 }
535
536 if request.query_filter.is_none() {
539 if let Some(ref fe) = request.filter_expression {
540 if let Ok(parsed_fe) = expressions::condition::parse(fe) {
541 let top_attrs = expressions::condition::extract_top_level_attributes(
542 &parsed_fe,
543 &request.expression_attribute_names,
544 );
545 for attr in &top_attrs {
546 if effective_key_attrs.contains(attr) {
547 return Err(DynoxideError::ValidationException(format!(
548 "Filter Expression can only contain non-primary key attributes: \
549 Primary key attribute: {attr}"
550 )));
551 }
552 }
553 let mut index_key_attrs = Vec::new();
556 if request.index_name.is_some() {
557 if !effective_key_attrs
559 .iter()
560 .any(|k| k == &table_key_schema.partition_key)
561 {
562 }
564 for k in &effective_key_attrs {
566 if ![table_key_schema.partition_key.clone()]
567 .iter()
568 .chain(table_key_schema.sort_key.iter())
569 .any(|tk| tk == k)
570 {
571 index_key_attrs.push(k.clone());
572 }
573 }
574 }
575 let base_key_attrs: Vec<String> = {
576 let mut v = vec![table_key_schema.partition_key.clone()];
577 if let Some(ref sk) = table_key_schema.sort_key {
578 v.push(sk.clone());
579 }
580 v
581 };
582 if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
583 &parsed_fe,
584 &request.expression_attribute_names,
585 &base_key_attrs,
586 &index_key_attrs,
587 ) {
588 let prefix = if is_index { "IndexKey" } else { "Key" };
589 return Err(DynoxideError::ValidationException(format!(
590 "Key attributes must be scalars; \
591 list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
592 )));
593 }
594 }
595 }
596 }
597
598 let is_index_query = request.index_name.is_some();
599
600 let start_sk = if let Some(ref esk) = exclusive_start_key {
604 if let Some(ref sk_name) = effective_sk {
605 esk.get(sk_name).and_then(|v| v.to_key_string())
606 } else if is_index_query {
607 Some(String::new())
609 } else {
610 None
611 }
612 } else {
613 None
614 };
615
616 let (start_base_pk, start_base_sk) = if is_index_query {
619 if let Some(ref esk) = exclusive_start_key {
620 let base_pk = esk
621 .get(&table_key_schema.partition_key)
622 .and_then(|v| v.to_key_string());
623 let base_sk = table_key_schema
624 .sort_key
625 .as_ref()
626 .and_then(|sk_name| esk.get(sk_name))
627 .and_then(|v| v.to_key_string());
628 (base_pk, base_sk)
629 } else {
630 (None, None)
631 }
632 } else {
633 (None, None)
634 };
635
636 let is_select_all_attributes = request
639 .select
640 .as_deref()
641 .map(|s| s.eq_ignore_ascii_case("ALL_ATTRIBUTES"))
642 .unwrap_or(false);
643 let fetch_from_base_table = if is_select_all_attributes {
644 if let Some(ref proj_type) = index_projection_type {
645 if *proj_type != crate::types::ProjectionType::ALL {
646 if !is_lsi {
647 return Err(DynoxideError::ValidationException(format!(
648 "One or more parameter values were invalid: \
649 Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
650 because its projection type is not ALL",
651 request.index_name.as_deref().unwrap_or("")
652 )));
653 }
654 true
656 } else {
657 false
658 }
659 } else {
660 false
661 }
662 } else {
663 false
664 };
665
666 let sk_condition_sql = if sk_sql_parts.is_empty() {
668 None
669 } else {
670 Some(sk_sql_parts.join(" "))
671 };
672
673 let fetch_limit = request.limit;
674 let sk_params_refs: Vec<&str> = sk_param_values.iter().map(|s| s.as_str()).collect();
675
676 let query_params = crate::storage::QueryParams {
678 sk_condition: sk_condition_sql.as_deref(),
679 sk_params: &sk_params_refs,
680 forward: request.scan_index_forward,
681 limit: fetch_limit,
682 exclusive_start_sk: start_sk.as_deref(),
683 exclusive_start_base_pk: start_base_pk.as_deref(),
684 exclusive_start_base_sk: start_base_sk.as_deref(),
685 };
686 let rows = if let Some(ref index_name) = request.index_name {
687 if is_lsi {
688 storage.query_lsi_items(&request.table_name, index_name, &pk_str, &query_params)?
689 } else {
690 storage.query_gsi_items(&request.table_name, index_name, &pk_str, &query_params)?
691 }
692 } else {
693 storage.query_items(&request.table_name, &pk_str, &query_params)?
694 };
695
696 let filter_expr = request
698 .filter_expression
699 .as_ref()
700 .map(|expr| expressions::condition::parse(expr))
701 .transpose()
702 .map_err(DynoxideError::ValidationException)?;
703
704 let projection = if let Some(ref proj_expr) = request.projection_expression {
706 Some(
707 expressions::projection::parse(proj_expr)
708 .map_err(DynoxideError::ValidationException)?,
709 )
710 } else {
711 legacy_projection.clone()
712 };
713
714 if let Some(ref filter) = filter_expr {
716 tracker.track_condition_expr(filter);
717 }
718 if let Some(ref proj) = projection {
719 tracker.track_projection_expr(proj);
720 }
721
722 let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
724 &request.expression_attribute_names,
725 &request.expression_attribute_values,
726 );
727
728 let is_count = request
730 .select
731 .as_deref()
732 .map(|s| s.eq_ignore_ascii_case("COUNT"))
733 .unwrap_or(false);
734
735 let mut key_attrs = vec![effective_pk.clone()];
737 if let Some(ref sk) = effective_sk {
738 key_attrs.push(sk.clone());
739 }
740 if request.index_name.is_some() {
742 if !key_attrs.contains(&table_key_schema.partition_key) {
743 key_attrs.push(table_key_schema.partition_key.clone());
744 }
745 if let Some(ref sk) = table_key_schema.sort_key {
746 if !key_attrs.contains(sk) {
747 key_attrs.push(sk.clone());
748 }
749 }
750 }
751
752 let mut items = Vec::new();
753 let mut scanned_count = 0;
754 let mut filtered_count = 0;
755 let mut cumulative_size = 0;
756 let mut last_evaluated_item: Option<Item> = None;
757 let mut truncated_by_size = false;
758
759 let mut base_table_cumulative_size = 0usize;
762 let mut index_cumulative_size = 0usize;
763
764 for (_pk, _sk, item_json) in &rows {
765 let index_item: Item = serde_json::from_str(item_json).map_err(|e| {
766 DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
767 })?;
768
769 index_cumulative_size += crate::types::item_size(&index_item);
773 let item = if fetch_from_base_table {
774 let base_pk = index_item
775 .get(&table_key_schema.partition_key)
776 .and_then(|v| v.to_key_string())
777 .unwrap_or_default();
778 let base_sk = table_key_schema
779 .sort_key
780 .as_ref()
781 .and_then(|sk_name| index_item.get(sk_name))
782 .and_then(|v| v.to_key_string())
783 .unwrap_or_default();
784 if let Some(full_json) = storage.get_item(&request.table_name, &base_pk, &base_sk)? {
785 let full_item: Item = serde_json::from_str(&full_json).map_err(|e| {
786 DynoxideError::InternalServerError(format!("Bad item JSON: {e}"))
787 })?;
788 base_table_cumulative_size += crate::types::item_size(&full_item);
789 full_item
790 } else {
791 index_item.clone()
792 }
793 } else {
794 index_item.clone()
795 };
796
797 scanned_count += 1;
798
799 let item_size = crate::types::item_size(&item);
802 if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
803 truncated_by_size = true;
804 break;
805 }
806 cumulative_size += item_size;
807
808 if let Some(ref filter) = filter_expr {
810 let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
811 .map_err(DynoxideError::ValidationException)?;
812 if !passes {
813 last_evaluated_item = Some(index_item);
814 continue;
815 }
816 }
817
818 filtered_count += 1;
819
820 let result_item = if let Some(ref proj) = projection {
823 let no_keys: &[String] = &[];
824 expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
825 .map_err(DynoxideError::ValidationException)?
826 } else {
827 item
828 };
829
830 last_evaluated_item = Some(index_item);
831 if !is_count {
832 items.push(result_item);
833 }
834 }
835
836 tracker.check_unused()?;
838
839 let count = if is_count {
840 filtered_count
841 } else {
842 items.len()
843 };
844
845 let has_more = truncated_by_size
848 || (fetch_limit.is_some() && scanned_count >= fetch_limit.unwrap_or(usize::MAX));
849
850 let is_gsi_query = request.index_name.is_some() && !is_lsi;
855 let last_evaluated_key = if has_more {
856 last_evaluated_item.map(|item| {
857 let mut key = build_last_evaluated_key(&item, &effective_pk, effective_sk.as_deref());
858 if is_lsi {
860 if let Some(tsk) = table_key_schema.sort_key.as_deref() {
861 if !key.contains_key(tsk) {
862 if let Some(v) = item.get(tsk) {
863 key.insert(tsk.to_string(), v.clone());
864 }
865 }
866 }
867 }
868 if is_gsi_query {
870 if !key.contains_key(&table_key_schema.partition_key) {
871 if let Some(v) = item.get(&table_key_schema.partition_key) {
872 key.insert(table_key_schema.partition_key.clone(), v.clone());
873 }
874 }
875 if let Some(ref tsk) = table_key_schema.sort_key {
876 if !key.contains_key(tsk) {
877 if let Some(v) = item.get(tsk) {
878 key.insert(tsk.clone(), v.clone());
879 }
880 }
881 }
882 }
883 key
884 })
885 } else {
886 None
887 };
888
889 let is_gsi = is_gsi_query;
891 let consistent = request.consistent_read.unwrap_or(false);
892 let consumed_capacity = if is_gsi {
893 let mut gsi_units = std::collections::HashMap::new();
894 gsi_units.insert(
895 request.index_name.as_ref().unwrap().clone(),
896 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
897 );
898 crate::types::consumed_capacity_with_indexes(
899 &request.table_name,
900 0.0,
901 &gsi_units,
902 &request.return_consumed_capacity,
903 )
904 } else if is_lsi {
905 let (table_cap, lsi_cap) = if fetch_from_base_table {
908 let table_rcu = crate::types::read_capacity_units_with_consistency(
909 base_table_cumulative_size,
910 consistent,
911 );
912 let lsi_rcu = crate::types::read_capacity_units_with_consistency(
913 index_cumulative_size,
914 consistent,
915 );
916 (table_rcu, lsi_rcu)
917 } else {
918 (
919 0.0,
920 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
921 )
922 };
923 let mut lsi_units = std::collections::HashMap::new();
924 lsi_units.insert(request.index_name.as_ref().unwrap().clone(), lsi_cap);
925 crate::types::consumed_capacity_with_secondary_indexes(
926 &request.table_name,
927 table_cap,
928 &std::collections::HashMap::new(),
929 &lsi_units,
930 &request.return_consumed_capacity,
931 )
932 } else {
933 crate::types::consumed_capacity(
934 &request.table_name,
935 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
936 &request.return_consumed_capacity,
937 )
938 };
939
940 Ok(QueryResponse {
941 items: if is_count { None } else { Some(items) },
942 count,
943 scanned_count,
944 last_evaluated_key,
945 consumed_capacity,
946 })
947}
948
949fn build_last_evaluated_key(
950 item: &Item,
951 pk_name: &str,
952 sk_name: Option<&str>,
953) -> HashMap<String, AttributeValue> {
954 let mut key = HashMap::new();
955 if let Some(pk_val) = item.get(pk_name) {
956 key.insert(pk_name.to_string(), pk_val.clone());
957 }
958 if let Some(sk) = sk_name {
959 if let Some(sk_val) = item.get(sk) {
960 key.insert(sk.to_string(), sk_val.clone());
961 }
962 }
963 key
964}