1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage::Storage;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9const MAX_RESPONSE_SIZE: usize = 1_048_576;
11
12#[derive(Debug, Default, Deserialize)]
14struct QueryRequestRaw {
15 #[serde(rename = "TableName", default)]
16 table_name: Option<String>,
17 #[serde(rename = "KeyConditionExpression", default)]
18 key_condition_expression: Option<String>,
19 #[serde(rename = "FilterExpression", default)]
20 filter_expression: Option<String>,
21 #[serde(rename = "ProjectionExpression", default)]
22 projection_expression: Option<String>,
23 #[serde(rename = "ExpressionAttributeNames", default)]
24 expression_attribute_names: Option<HashMap<String, String>>,
25 #[serde(rename = "ExpressionAttributeValues", default)]
26 expression_attribute_values: Option<HashMap<String, AttributeValue>>,
27 #[serde(rename = "ScanIndexForward", default = "default_true")]
28 scan_index_forward: bool,
29 #[serde(rename = "Limit", default)]
30 limit: Option<usize>,
31 #[serde(rename = "ExclusiveStartKey", default)]
32 exclusive_start_key: Option<serde_json::Value>,
33 #[serde(rename = "Select", default)]
34 select: Option<String>,
35 #[serde(rename = "ConsistentRead", default)]
36 consistent_read: Option<bool>,
37 #[serde(rename = "IndexName", default)]
38 index_name: Option<String>,
39 #[serde(rename = "ReturnConsumedCapacity", default)]
40 return_consumed_capacity: Option<String>,
41 #[serde(rename = "KeyConditions", default)]
42 key_conditions: Option<serde_json::Value>,
43 #[serde(rename = "AttributesToGet", default)]
44 attributes_to_get: Option<Vec<String>>,
45 #[serde(rename = "QueryFilter", default)]
46 query_filter: Option<serde_json::Value>,
47 #[serde(rename = "ConditionalOperator", default)]
48 conditional_operator: Option<String>,
49}
50
51fn default_true() -> bool {
52 true
53}
54
55#[derive(Debug, Default)]
56pub struct QueryRequest {
57 pub table_name: String,
58 pub key_condition_expression: Option<String>,
59 pub filter_expression: Option<String>,
60 pub projection_expression: Option<String>,
61 pub expression_attribute_names: Option<HashMap<String, String>>,
62 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
63 pub scan_index_forward: bool,
64 pub limit: Option<usize>,
65 pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
66 pub select: Option<String>,
67 pub consistent_read: Option<bool>,
68 pub index_name: Option<String>,
69 pub return_consumed_capacity: Option<String>,
70 pub key_conditions: Option<serde_json::Value>,
71 pub attributes_to_get: Option<Vec<String>>,
72 pub query_filter: Option<serde_json::Value>,
73 pub conditional_operator: Option<String>,
74 pub exclusive_start_key_raw: Option<serde_json::Value>,
79}
80
81impl<'de> serde::Deserialize<'de> for QueryRequest {
82 fn deserialize<D: serde::Deserializer<'de>>(
83 deserializer: D,
84 ) -> std::result::Result<Self, D::Error> {
85 let raw = QueryRequestRaw::deserialize(deserializer)?;
86 use crate::validation::{
87 TableNameContext, format_validation_errors, table_name_constraint_errors,
88 };
89
90 let mut errors = Vec::new();
91 errors.extend(table_name_constraint_errors(
92 raw.table_name.as_deref(),
93 TableNameContext::ReadWrite,
94 ));
95 let table_name = raw.table_name.unwrap_or_default();
96
97 if let Some(ref rcc) = raw.return_consumed_capacity {
99 if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
100 errors.push(format!(
101 "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
102 Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
103 rcc
104 ));
105 }
106 }
107
108 if let Some(ref sel) = raw.select {
110 if ![
111 "ALL_ATTRIBUTES",
112 "ALL_PROJECTED_ATTRIBUTES",
113 "COUNT",
114 "SPECIFIC_ATTRIBUTES",
115 ]
116 .contains(&sel.as_str())
117 {
118 errors.push(format!(
119 "Value '{}' at 'select' failed to satisfy constraint: \
120 Member must satisfy enum value set: [SPECIFIC_ATTRIBUTES, COUNT, ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES]",
121 sel
122 ));
123 }
124 }
125
126 if let Some(limit) = raw.limit {
131 if limit == 0 {
132 errors.push(
133 "Value at 'Limit' failed to satisfy constraint: \
134 Member must have value greater than or equal to 1"
135 .to_string(),
136 );
137 }
138 }
139
140 if let Some(msg) = format_validation_errors(&errors) {
141 return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
142 }
143
144 Ok(QueryRequest {
145 table_name,
146 key_condition_expression: raw.key_condition_expression,
147 filter_expression: raw.filter_expression,
148 projection_expression: raw.projection_expression,
149 expression_attribute_names: raw.expression_attribute_names,
150 expression_attribute_values: raw.expression_attribute_values,
151 scan_index_forward: raw.scan_index_forward,
152 limit: raw.limit,
153 exclusive_start_key: None,
154 select: raw.select,
155 consistent_read: raw.consistent_read,
156 index_name: raw.index_name,
157 return_consumed_capacity: raw.return_consumed_capacity,
158 key_conditions: raw.key_conditions,
159 attributes_to_get: raw.attributes_to_get,
160 query_filter: raw.query_filter,
161 conditional_operator: raw.conditional_operator,
162 exclusive_start_key_raw: raw.exclusive_start_key,
163 })
164 }
165}
166
167#[derive(Debug, Default, Serialize)]
168pub struct QueryResponse {
169 #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
170 pub items: Option<Vec<Item>>,
171 #[serde(rename = "Count")]
172 pub count: usize,
173 #[serde(rename = "ScannedCount")]
174 pub scanned_count: usize,
175 #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
176 pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
177 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
178 pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
179}
180
181pub fn execute(storage: &Storage, mut request: QueryRequest) -> Result<QueryResponse> {
182 crate::validation::validate_table_name(&request.table_name)?;
184
185 {
188 let mut non_expr = Vec::new();
189 let mut expr = Vec::new();
190 if request.attributes_to_get.is_some() {
191 non_expr.push("AttributesToGet");
192 }
193 if request.query_filter.is_some()
194 && request.query_filter.as_ref().is_some_and(|v| !v.is_null())
195 {
196 non_expr.push("QueryFilter");
197 }
198 if request.conditional_operator.is_some() {
199 non_expr.push("ConditionalOperator");
200 }
201 if request.key_conditions.is_some()
202 && request
203 .key_conditions
204 .as_ref()
205 .is_some_and(|v| !v.is_null())
206 {
207 non_expr.push("KeyConditions");
208 }
209 if request.projection_expression.is_some() {
210 expr.push("ProjectionExpression");
211 }
212 if request.filter_expression.is_some() {
213 expr.push("FilterExpression");
214 }
215 if request.key_condition_expression.is_some() {
216 expr.push("KeyConditionExpression");
217 }
218 let no_raw_eav: Option<serde_json::Value> = None;
219 let ctx = helpers::ExpressionParamContext {
220 non_expression_params: non_expr,
221 expression_params: expr,
222 all_expression_param_names: vec!["FilterExpression", "KeyConditionExpression"],
223 expression_attribute_names: &request.expression_attribute_names,
224 expression_attribute_values: &request.expression_attribute_values,
225 expression_attribute_values_raw: &no_raw_eav,
226 };
227 helpers::validate_expression_params(&ctx)?;
228 }
229
230 helpers::validate_filter_conditions_raw(request.query_filter.as_ref(), "QueryFilter")?;
232 helpers::validate_filter_conditions_raw(request.key_conditions.as_ref(), "KeyConditions")?;
233
234 helpers::validate_filter_condition_args(request.query_filter.as_ref())?;
236 helpers::validate_filter_condition_args(request.key_conditions.as_ref())?;
237
238 if let Some(ref attrs) = request.attributes_to_get {
240 helpers::validate_attributes_to_get_no_duplicates(attrs)?;
241 }
242
243 let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
245 Some(helpers::parse_exclusive_start_key(esk_val)?)
246 } else {
247 request.exclusive_start_key.clone()
248 };
249
250 if let Some(ref kce) = request.key_condition_expression {
254 if kce.is_empty() {
255 return Err(DynoxideError::ValidationException(
256 "Invalid KeyConditionExpression: The expression can not be empty;".to_string(),
257 ));
258 }
259 }
260 if let Some(ref fe) = request.filter_expression {
261 if fe.is_empty() {
262 if request.query_filter.is_none() || request.filter_expression.as_deref() == Some("") {
263 return Err(DynoxideError::ValidationException(
264 "Invalid FilterExpression: The expression can not be empty;".to_string(),
265 ));
266 }
267 } else {
268 let parsed_fe = expressions::condition::parse(fe).map_err(|e| {
269 DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
270 })?;
271 if let Err(e) = expressions::condition::validate_name_refs(
274 &parsed_fe,
275 &request.expression_attribute_names,
276 ) {
277 return Err(DynoxideError::ValidationException(format!(
278 "Invalid FilterExpression: {e}"
279 )));
280 }
281 }
282 }
283 if let Some(ref pe) = request.projection_expression {
284 if pe.is_empty() {
285 return Err(DynoxideError::ValidationException(
286 "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
287 ));
288 }
289 }
290
291 if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
293 && request.projection_expression.is_none()
294 && request.attributes_to_get.is_none()
295 {
296 return Err(DynoxideError::ValidationException(
297 "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
298 .to_string(),
299 ));
300 }
301 if let Some(ref kce) = request.key_condition_expression {
303 if !kce.is_empty() {
304 let temp_tracker = crate::expressions::TrackedExpressionAttributes::new(
306 &request.expression_attribute_names,
307 &request.expression_attribute_values,
308 );
309 if let Err(e) = expressions::key_condition::parse(kce, &temp_tracker) {
310 return Err(DynoxideError::ValidationException(e));
311 }
312 }
313 }
314
315 let meta = helpers::require_table_for_item_op(storage, &request.table_name)?;
316 let table_key_schema = helpers::parse_key_schema(&meta)?;
317
318 let effective_pk_for_kc = if let Some(ref index_name) = request.index_name {
322 if let Some((pk, _)) = request
323 .index_name
324 .as_ref()
325 .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok())
326 {
327 pk
328 } else if let Ok((pk, _)) = super::gsi::parse_gsi_key_schema(&meta, index_name) {
329 pk
330 } else {
331 table_key_schema.partition_key.clone()
332 }
333 } else {
334 table_key_schema.partition_key.clone()
335 };
336
337 if request.key_condition_expression.is_none() {
339 if let Some(ref kc_val) = request.key_conditions {
340 if let Ok(kc) =
341 serde_json::from_value::<HashMap<String, helpers::KeyCondition>>(kc_val.clone())
342 {
343 if !kc.is_empty() {
344 let converted =
345 helpers::convert_key_conditions(&kc, Some(&effective_pk_for_kc))?;
346 request.key_condition_expression = Some(converted.expression);
347 let expr_values = request
348 .expression_attribute_values
349 .get_or_insert_with(HashMap::new);
350 expr_values.extend(converted.attribute_values);
351 let expr_names = request
352 .expression_attribute_names
353 .get_or_insert_with(HashMap::new);
354 expr_names.extend(converted.attribute_names);
355 }
356 }
357 }
358 }
359
360 if request.filter_expression.is_none() {
362 if let Some(ref qf_val) = request.query_filter {
363 if let Ok(qf) =
364 serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(qf_val.clone())
365 {
366 if !qf.is_empty() {
367 let converted = helpers::convert_filter_conditions(
368 &qf,
369 request.conditional_operator.as_deref(),
370 )?;
371 if !converted.expression.is_empty() {
372 request.filter_expression = Some(converted.expression);
373 let expr_values = request
374 .expression_attribute_values
375 .get_or_insert_with(HashMap::new);
376 expr_values.extend(converted.attribute_values);
377 let expr_names = request
378 .expression_attribute_names
379 .get_or_insert_with(HashMap::new);
380 expr_names.extend(converted.attribute_names);
381 }
382 }
383 }
384 }
385 }
386
387 let legacy_projection = if request.projection_expression.is_none() {
389 request
390 .attributes_to_get
391 .as_ref()
392 .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
393 } else {
394 None
395 };
396
397 let key_condition_expression = request.key_condition_expression.as_deref().ok_or_else(|| {
399 DynoxideError::ValidationException(
400 "Either the KeyConditions or KeyConditionExpression parameter must be specified in the request."
401 .to_string(),
402 )
403 })?;
404 let key_condition_expression = key_condition_expression.to_string();
405
406 let lsi_keys = request
408 .index_name
409 .as_ref()
410 .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
411 let is_lsi = lsi_keys.is_some();
412
413 if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
415 return Err(DynoxideError::ValidationException(
416 "Consistent reads are not supported on global secondary indexes".to_string(),
417 ));
418 }
419
420 let index_projection_type = if let Some(ref index_name) = request.index_name {
422 if is_lsi {
423 super::lsi::parse_lsi_defs(&meta)?
424 .into_iter()
425 .find(|l| l.index_name == *index_name)
426 .map(|l| l.projection_type)
427 } else {
428 super::gsi::parse_gsi_defs(&meta)?
429 .into_iter()
430 .find(|g| g.index_name == *index_name)
431 .map(|g| g.projection_type)
432 }
433 } else {
434 None
435 };
436
437 let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
438 if let Some(keys) = lsi_keys {
439 keys
440 } else {
441 super::gsi::parse_gsi_key_schema(&meta, index_name)?
442 }
443 } else {
444 (
445 table_key_schema.partition_key.clone(),
446 table_key_schema.sort_key.clone(),
447 )
448 };
449
450 if let Some(ref esk) = exclusive_start_key {
452 helpers::validate_esk_count_and_index_keys(
454 esk,
455 &meta,
456 request.index_name.as_deref(),
457 "The provided starting key is invalid",
458 )?;
459 helpers::validate_esk_table_keys(esk, &meta)?;
461 }
462
463 let tracker = crate::expressions::TrackedExpressionAttributes::new(
465 &request.expression_attribute_names,
466 &request.expression_attribute_values,
467 );
468
469 let key_cond = expressions::key_condition::parse(&key_condition_expression, &tracker)
471 .map_err(DynoxideError::ValidationException)?;
472
473 if key_cond.pk_name != effective_pk {
475 return Err(DynoxideError::ValidationException(format!(
476 "Query condition missed key schema element: {}",
477 effective_pk
478 )));
479 }
480
481 let resolved = expressions::key_condition::resolve_values(&key_cond, &tracker)
483 .map_err(DynoxideError::ValidationException)?;
484
485 let pk_str = resolved.pk_value.to_key_string().ok_or_else(|| {
487 DynoxideError::ValidationException(
488 "Cannot convert partition key value to string".to_string(),
489 )
490 })?;
491
492 let mut sk_sql_parts = Vec::new();
494 let mut sk_param_values = Vec::new();
495
496 if let Some(ref sk_cond) = resolved.sk_condition {
497 if let Some(ref eff_sk) = effective_sk {
499 if sk_cond.sk_name() != eff_sk {
500 return Err(DynoxideError::ValidationException(format!(
501 "Query condition missed key schema element: {eff_sk}"
502 )));
503 }
504 } else {
505 return Err(DynoxideError::ValidationException(
506 "Query filter contains a sort key condition but the table has no sort key"
507 .to_string(),
508 ));
509 }
510
511 let conditions = sk_cond.to_sql_conditions();
512 for (i, (op, val)) in conditions.iter().enumerate() {
513 let param_idx = i + 2; if op == "LIKE" {
515 sk_sql_parts.push(format!("AND sk LIKE ?{param_idx} ESCAPE '\\'"));
516 } else {
517 sk_sql_parts.push(format!("AND sk {op} ?{param_idx}"));
518 }
519 sk_param_values.push(val.clone());
520 }
521 }
522
523 let mut effective_key_attrs = vec![effective_pk.clone()];
526 if let Some(ref sk) = effective_sk {
527 effective_key_attrs.push(sk.clone());
528 }
529
530 if let Some(ref qf_val) = request.query_filter {
532 if let Some(obj) = qf_val.as_object() {
533 for attr_name in obj.keys() {
534 if effective_key_attrs.contains(attr_name) {
535 return Err(DynoxideError::ValidationException(format!(
536 "QueryFilter can only contain non-primary key attributes: \
537 Primary key attribute: {attr_name}"
538 )));
539 }
540 }
541 }
542 }
543
544 if request.query_filter.is_none() {
547 if let Some(ref fe) = request.filter_expression {
548 if let Ok(parsed_fe) = expressions::condition::parse(fe) {
549 let top_attrs = expressions::condition::extract_top_level_attributes(
550 &parsed_fe,
551 &request.expression_attribute_names,
552 );
553 for attr in &top_attrs {
554 if effective_key_attrs.contains(attr) {
555 return Err(DynoxideError::ValidationException(format!(
556 "Filter Expression can only contain non-primary key attributes: \
557 Primary key attribute: {attr}"
558 )));
559 }
560 }
561 let mut index_key_attrs = Vec::new();
564 if request.index_name.is_some() {
565 if !effective_key_attrs
567 .iter()
568 .any(|k| k == &table_key_schema.partition_key)
569 {
570 }
572 for k in &effective_key_attrs {
574 if ![table_key_schema.partition_key.clone()]
575 .iter()
576 .chain(table_key_schema.sort_key.iter())
577 .any(|tk| tk == k)
578 {
579 index_key_attrs.push(k.clone());
580 }
581 }
582 }
583 let base_key_attrs: Vec<String> = {
584 let mut v = vec![table_key_schema.partition_key.clone()];
585 if let Some(ref sk) = table_key_schema.sort_key {
586 v.push(sk.clone());
587 }
588 v
589 };
590 if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
591 &parsed_fe,
592 &request.expression_attribute_names,
593 &base_key_attrs,
594 &index_key_attrs,
595 ) {
596 let prefix = if is_index { "IndexKey" } else { "Key" };
597 return Err(DynoxideError::ValidationException(format!(
598 "Key attributes must be scalars; \
599 list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
600 )));
601 }
602 }
603 }
604 }
605
606 let is_index_query = request.index_name.is_some();
607
608 let start_sk = if let Some(ref esk) = exclusive_start_key {
612 if let Some(ref sk_name) = effective_sk {
613 esk.get(sk_name).and_then(|v| v.to_key_string())
614 } else if is_index_query {
615 Some(String::new())
617 } else {
618 None
619 }
620 } else {
621 None
622 };
623
624 let (start_base_pk, start_base_sk) = if is_index_query {
627 if let Some(ref esk) = exclusive_start_key {
628 let base_pk = esk
629 .get(&table_key_schema.partition_key)
630 .and_then(|v| v.to_key_string());
631 let base_sk = table_key_schema
632 .sort_key
633 .as_ref()
634 .and_then(|sk_name| esk.get(sk_name))
635 .and_then(|v| v.to_key_string());
636 (base_pk, base_sk)
637 } else {
638 (None, None)
639 }
640 } else {
641 (None, None)
642 };
643
644 let is_select_all_attributes = request
647 .select
648 .as_deref()
649 .map(|s| s.eq_ignore_ascii_case("ALL_ATTRIBUTES"))
650 .unwrap_or(false);
651 let fetch_from_base_table = if is_select_all_attributes {
652 if let Some(ref proj_type) = index_projection_type {
653 if *proj_type != crate::types::ProjectionType::ALL {
654 if !is_lsi {
655 return Err(DynoxideError::ValidationException(format!(
656 "One or more parameter values were invalid: \
657 Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
658 because its projection type is not ALL",
659 request.index_name.as_deref().unwrap_or("")
660 )));
661 }
662 true
664 } else {
665 false
666 }
667 } else {
668 false
669 }
670 } else {
671 false
672 };
673
674 let sk_condition_sql = if sk_sql_parts.is_empty() {
676 None
677 } else {
678 Some(sk_sql_parts.join(" "))
679 };
680
681 let fetch_limit = request.limit;
682 let sk_params_refs: Vec<&str> = sk_param_values.iter().map(|s| s.as_str()).collect();
683
684 let query_params = crate::storage::QueryParams {
686 sk_condition: sk_condition_sql.as_deref(),
687 sk_params: &sk_params_refs,
688 forward: request.scan_index_forward,
689 limit: fetch_limit,
690 exclusive_start_sk: start_sk.as_deref(),
691 exclusive_start_base_pk: start_base_pk.as_deref(),
692 exclusive_start_base_sk: start_base_sk.as_deref(),
693 };
694 let rows = if let Some(ref index_name) = request.index_name {
695 if is_lsi {
696 storage.query_lsi_items(&request.table_name, index_name, &pk_str, &query_params)?
697 } else {
698 storage.query_gsi_items(&request.table_name, index_name, &pk_str, &query_params)?
699 }
700 } else {
701 storage.query_items(&request.table_name, &pk_str, &query_params)?
702 };
703
704 let filter_expr = request
706 .filter_expression
707 .as_ref()
708 .map(|expr| expressions::condition::parse(expr))
709 .transpose()
710 .map_err(DynoxideError::ValidationException)?;
711
712 let projection = if let Some(ref proj_expr) = request.projection_expression {
714 Some(
715 expressions::projection::parse(proj_expr)
716 .map_err(DynoxideError::ValidationException)?,
717 )
718 } else {
719 legacy_projection.clone()
720 };
721
722 if let Some(ref filter) = filter_expr {
724 tracker.track_condition_expr(filter);
725 }
726 if let Some(ref proj) = projection {
727 tracker.track_projection_expr(proj);
728 }
729
730 let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
732 &request.expression_attribute_names,
733 &request.expression_attribute_values,
734 );
735
736 let is_count = request
738 .select
739 .as_deref()
740 .map(|s| s.eq_ignore_ascii_case("COUNT"))
741 .unwrap_or(false);
742
743 let mut key_attrs = vec![effective_pk.clone()];
745 if let Some(ref sk) = effective_sk {
746 key_attrs.push(sk.clone());
747 }
748 if request.index_name.is_some() {
750 if !key_attrs.contains(&table_key_schema.partition_key) {
751 key_attrs.push(table_key_schema.partition_key.clone());
752 }
753 if let Some(ref sk) = table_key_schema.sort_key {
754 if !key_attrs.contains(sk) {
755 key_attrs.push(sk.clone());
756 }
757 }
758 }
759
760 let mut items = Vec::new();
761 let mut scanned_count = 0;
762 let mut filtered_count = 0;
763 let mut cumulative_size = 0;
764 let mut last_evaluated_item: Option<Item> = None;
765 let mut truncated_by_size = false;
766
767 let mut base_table_cumulative_size = 0usize;
770 let mut index_cumulative_size = 0usize;
771
772 for (_pk, _sk, item_json) in &rows {
773 let index_item: Item = serde_json::from_str(item_json).map_err(|e| {
774 DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
775 })?;
776
777 index_cumulative_size += crate::types::item_size(&index_item);
781 let item = if fetch_from_base_table {
782 let base_pk = index_item
783 .get(&table_key_schema.partition_key)
784 .and_then(|v| v.to_key_string())
785 .unwrap_or_default();
786 let base_sk = table_key_schema
787 .sort_key
788 .as_ref()
789 .and_then(|sk_name| index_item.get(sk_name))
790 .and_then(|v| v.to_key_string())
791 .unwrap_or_default();
792 if let Some(full_json) = storage.get_item(&request.table_name, &base_pk, &base_sk)? {
793 let full_item: Item = serde_json::from_str(&full_json).map_err(|e| {
794 DynoxideError::InternalServerError(format!("Bad item JSON: {e}"))
795 })?;
796 base_table_cumulative_size += crate::types::item_size(&full_item);
797 full_item
798 } else {
799 index_item.clone()
800 }
801 } else {
802 index_item.clone()
803 };
804
805 scanned_count += 1;
806
807 let item_size = crate::types::item_size(&item);
810 if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
811 truncated_by_size = true;
812 break;
813 }
814 cumulative_size += item_size;
815
816 if let Some(ref filter) = filter_expr {
818 let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
819 .map_err(DynoxideError::ValidationException)?;
820 if !passes {
821 last_evaluated_item = Some(index_item);
822 continue;
823 }
824 }
825
826 filtered_count += 1;
827
828 let result_item = if let Some(ref proj) = projection {
831 let no_keys: &[String] = &[];
832 expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
833 .map_err(DynoxideError::ValidationException)?
834 } else {
835 item
836 };
837
838 last_evaluated_item = Some(index_item);
839 if !is_count {
840 items.push(result_item);
841 }
842 }
843
844 tracker.check_unused()?;
846
847 let count = if is_count {
848 filtered_count
849 } else {
850 items.len()
851 };
852
853 let has_more = truncated_by_size
856 || (fetch_limit.is_some() && scanned_count >= fetch_limit.unwrap_or(usize::MAX));
857
858 let is_gsi_query = request.index_name.is_some() && !is_lsi;
863 let last_evaluated_key = if has_more {
864 last_evaluated_item.map(|item| {
865 let mut key = build_last_evaluated_key(&item, &effective_pk, effective_sk.as_deref());
866 if is_lsi {
868 if let Some(tsk) = table_key_schema.sort_key.as_deref() {
869 if !key.contains_key(tsk) {
870 if let Some(v) = item.get(tsk) {
871 key.insert(tsk.to_string(), v.clone());
872 }
873 }
874 }
875 }
876 if is_gsi_query {
878 if !key.contains_key(&table_key_schema.partition_key) {
879 if let Some(v) = item.get(&table_key_schema.partition_key) {
880 key.insert(table_key_schema.partition_key.clone(), v.clone());
881 }
882 }
883 if let Some(ref tsk) = table_key_schema.sort_key {
884 if !key.contains_key(tsk) {
885 if let Some(v) = item.get(tsk) {
886 key.insert(tsk.clone(), v.clone());
887 }
888 }
889 }
890 }
891 key
892 })
893 } else {
894 None
895 };
896
897 let is_gsi = is_gsi_query;
899 let consistent = request.consistent_read.unwrap_or(false);
900 let consumed_capacity = if is_gsi {
901 let mut gsi_units = std::collections::HashMap::new();
902 gsi_units.insert(
903 request.index_name.as_ref().unwrap().clone(),
904 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
905 );
906 crate::types::consumed_capacity_with_indexes(
907 &request.table_name,
908 0.0,
909 &gsi_units,
910 &request.return_consumed_capacity,
911 )
912 } else if is_lsi {
913 let (table_cap, lsi_cap) = if fetch_from_base_table {
916 let table_rcu = crate::types::read_capacity_units_with_consistency(
917 base_table_cumulative_size,
918 consistent,
919 );
920 let lsi_rcu = crate::types::read_capacity_units_with_consistency(
921 index_cumulative_size,
922 consistent,
923 );
924 (table_rcu, lsi_rcu)
925 } else {
926 (
927 0.0,
928 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
929 )
930 };
931 let mut lsi_units = std::collections::HashMap::new();
932 lsi_units.insert(request.index_name.as_ref().unwrap().clone(), lsi_cap);
933 crate::types::consumed_capacity_with_secondary_indexes(
934 &request.table_name,
935 table_cap,
936 &std::collections::HashMap::new(),
937 &lsi_units,
938 &request.return_consumed_capacity,
939 )
940 } else {
941 crate::types::consumed_capacity(
942 &request.table_name,
943 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
944 &request.return_consumed_capacity,
945 )
946 };
947
948 Ok(QueryResponse {
949 items: if is_count { None } else { Some(items) },
950 count,
951 scanned_count,
952 last_evaluated_key,
953 consumed_capacity,
954 })
955}
956
957fn build_last_evaluated_key(
958 item: &Item,
959 pk_name: &str,
960 sk_name: Option<&str>,
961) -> HashMap<String, AttributeValue> {
962 let mut key = HashMap::new();
963 if let Some(pk_val) = item.get(pk_name) {
964 key.insert(pk_name.to_string(), pk_val.clone());
965 }
966 if let Some(sk) = sk_name {
967 if let Some(sk_val) = item.get(sk) {
968 key.insert(sk.to_string(), sk_val.clone());
969 }
970 }
971 key
972}