Skip to main content

dynoxide/actions/
query.rs

1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage::Storage;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Upper bound, in bytes, on the size of a Query/Scan response (1 MiB).
const MAX_RESPONSE_SIZE: usize = 1024 * 1024;
11
12/// Internal deserialization struct for detecting missing fields.
13#[derive(Debug, Default, Deserialize)]
14struct QueryRequestRaw {
15    #[serde(rename = "TableName", default)]
16    table_name: Option<String>,
17    #[serde(rename = "KeyConditionExpression", default)]
18    key_condition_expression: Option<String>,
19    #[serde(rename = "FilterExpression", default)]
20    filter_expression: Option<String>,
21    #[serde(rename = "ProjectionExpression", default)]
22    projection_expression: Option<String>,
23    #[serde(rename = "ExpressionAttributeNames", default)]
24    expression_attribute_names: Option<HashMap<String, String>>,
25    #[serde(rename = "ExpressionAttributeValues", default)]
26    expression_attribute_values: Option<HashMap<String, AttributeValue>>,
27    #[serde(rename = "ScanIndexForward", default = "default_true")]
28    scan_index_forward: bool,
29    #[serde(rename = "Limit", default)]
30    limit: Option<usize>,
31    #[serde(rename = "ExclusiveStartKey", default)]
32    exclusive_start_key: Option<serde_json::Value>,
33    #[serde(rename = "Select", default)]
34    select: Option<String>,
35    #[serde(rename = "ConsistentRead", default)]
36    consistent_read: Option<bool>,
37    #[serde(rename = "IndexName", default)]
38    index_name: Option<String>,
39    #[serde(rename = "ReturnConsumedCapacity", default)]
40    return_consumed_capacity: Option<String>,
41    #[serde(rename = "KeyConditions", default)]
42    key_conditions: Option<serde_json::Value>,
43    #[serde(rename = "AttributesToGet", default)]
44    attributes_to_get: Option<Vec<String>>,
45    #[serde(rename = "QueryFilter", default)]
46    query_filter: Option<serde_json::Value>,
47    #[serde(rename = "ConditionalOperator", default)]
48    conditional_operator: Option<String>,
49}
50
/// Serde default provider for boolean fields that must be `true` when the
/// field is omitted from the input (referenced via `default = "default_true"`).
fn default_true() -> bool {
    true
}
54
/// Validated parameters of a Query operation.
///
/// Instances are produced either by the custom `Deserialize` impl (which
/// validates the table name, `Select`, `ReturnConsumedCapacity` and `Limit`)
/// or by constructing the struct directly.
///
/// NOTE(review): the derived `Default` sets `scan_index_forward` to `false`,
/// whereas deserialization defaults it to `true` when `ScanIndexForward` is
/// omitted. Callers building a request with `..Default::default()` should set
/// it explicitly — confirm this difference is intended.
#[derive(Debug, Default)]
pub struct QueryRequest {
    /// `TableName`; an absent value deserializes to the empty string.
    pub table_name: String,
    /// `KeyConditionExpression`, if supplied.
    pub key_condition_expression: Option<String>,
    /// `FilterExpression`, if supplied.
    pub filter_expression: Option<String>,
    /// `ProjectionExpression`, if supplied.
    pub projection_expression: Option<String>,
    /// `ExpressionAttributeNames` placeholder-to-name map, if supplied.
    pub expression_attribute_names: Option<HashMap<String, String>>,
    /// `ExpressionAttributeValues` placeholder-to-value map, if supplied.
    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
    /// `ScanIndexForward`; `true` when omitted from a deserialized request.
    pub scan_index_forward: bool,
    /// `Limit`; deserialization rejects `0`.
    pub limit: Option<usize>,
    /// Already-typed `ExclusiveStartKey`. Always `None` after deserialization;
    /// only consulted when `exclusive_start_key_raw` is `None`.
    pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
    /// `Select`; deserialization accepts only `ALL_ATTRIBUTES`,
    /// `ALL_PROJECTED_ATTRIBUTES`, `COUNT` or `SPECIFIC_ATTRIBUTES`.
    pub select: Option<String>,
    /// `ConsistentRead`, if supplied.
    pub consistent_read: Option<bool>,
    /// `IndexName`, if supplied.
    pub index_name: Option<String>,
    /// `ReturnConsumedCapacity`; deserialization accepts only `INDEXES`,
    /// `TOTAL` or `NONE`.
    pub return_consumed_capacity: Option<String>,
    /// Legacy `KeyConditions`, kept as raw JSON.
    pub key_conditions: Option<serde_json::Value>,
    /// Legacy `AttributesToGet`, if supplied.
    pub attributes_to_get: Option<Vec<String>>,
    /// Legacy `QueryFilter`, kept as raw JSON.
    pub query_filter: Option<serde_json::Value>,
    /// Legacy `ConditionalOperator`, if supplied.
    pub conditional_operator: Option<String>,
    /// Raw JSON for ExclusiveStartKey when deserialized from HTTP request.
    /// Parsed lazily in `execute()` after other validations run.
    /// When constructed directly (e.g. from MCP), this is `None` and
    /// `exclusive_start_key` is used instead.
    pub exclusive_start_key_raw: Option<serde_json::Value>,
}
80
81impl<'de> serde::Deserialize<'de> for QueryRequest {
82    fn deserialize<D: serde::Deserializer<'de>>(
83        deserializer: D,
84    ) -> std::result::Result<Self, D::Error> {
85        let raw = QueryRequestRaw::deserialize(deserializer)?;
86        use crate::validation::{format_validation_errors, table_name_constraint_errors};
87
88        let mut errors = Vec::new();
89        errors.extend(table_name_constraint_errors(raw.table_name.as_deref()));
90        let table_name = raw.table_name.unwrap_or_default();
91
92        // ReturnConsumedCapacity enum
93        if let Some(ref rcc) = raw.return_consumed_capacity {
94            if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
95                errors.push(format!(
96                    "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
97                     Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
98                    rcc
99                ));
100            }
101        }
102
103        // Select enum
104        if let Some(ref sel) = raw.select {
105            if ![
106                "ALL_ATTRIBUTES",
107                "ALL_PROJECTED_ATTRIBUTES",
108                "COUNT",
109                "SPECIFIC_ATTRIBUTES",
110            ]
111            .contains(&sel.as_str())
112            {
113                errors.push(format!(
114                    "Value '{}' at 'select' failed to satisfy constraint: \
115                     Member must satisfy enum value set: [ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES, COUNT, SPECIFIC_ATTRIBUTES]",
116                    sel
117                ));
118            }
119        }
120
121        // Limit must be >= 1
122        if let Some(limit) = raw.limit {
123            if limit == 0 {
124                errors.push(
125                    "Value '0' at 'Limit' failed to satisfy constraint: \
126                     Member must have value greater than or equal to 1"
127                        .to_string(),
128                );
129            }
130        }
131
132        if let Some(msg) = format_validation_errors(&errors) {
133            return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
134        }
135
136        Ok(QueryRequest {
137            table_name,
138            key_condition_expression: raw.key_condition_expression,
139            filter_expression: raw.filter_expression,
140            projection_expression: raw.projection_expression,
141            expression_attribute_names: raw.expression_attribute_names,
142            expression_attribute_values: raw.expression_attribute_values,
143            scan_index_forward: raw.scan_index_forward,
144            limit: raw.limit,
145            exclusive_start_key: None,
146            select: raw.select,
147            consistent_read: raw.consistent_read,
148            index_name: raw.index_name,
149            return_consumed_capacity: raw.return_consumed_capacity,
150            key_conditions: raw.key_conditions,
151            attributes_to_get: raw.attributes_to_get,
152            query_filter: raw.query_filter,
153            conditional_operator: raw.conditional_operator,
154            exclusive_start_key_raw: raw.exclusive_start_key,
155        })
156    }
157}
158
159#[derive(Debug, Default, Serialize)]
160pub struct QueryResponse {
161    #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
162    pub items: Option<Vec<Item>>,
163    #[serde(rename = "Count")]
164    pub count: usize,
165    #[serde(rename = "ScannedCount")]
166    pub scanned_count: usize,
167    #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
168    pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
169    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
170    pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
171}
172
173pub fn execute(storage: &Storage, mut request: QueryRequest) -> Result<QueryResponse> {
174    // Validate table name format before checking existence (DynamoDB validates input first)
175    crate::validation::validate_table_name(&request.table_name)?;
176
177    // ---- Expression vs non-expression mixing validation ----
178    // DynamoDB checks this before anything else (except table name format and ESK values).
179    {
180        let mut non_expr = Vec::new();
181        let mut expr = Vec::new();
182        if request.attributes_to_get.is_some() {
183            non_expr.push("AttributesToGet");
184        }
185        if request.query_filter.is_some()
186            && request.query_filter.as_ref().is_some_and(|v| !v.is_null())
187        {
188            non_expr.push("QueryFilter");
189        }
190        if request.conditional_operator.is_some() {
191            non_expr.push("ConditionalOperator");
192        }
193        if request.key_conditions.is_some()
194            && request
195                .key_conditions
196                .as_ref()
197                .is_some_and(|v| !v.is_null())
198        {
199            non_expr.push("KeyConditions");
200        }
201        if request.projection_expression.is_some() {
202            expr.push("ProjectionExpression");
203        }
204        if request.filter_expression.is_some() {
205            expr.push("FilterExpression");
206        }
207        if request.key_condition_expression.is_some() {
208            expr.push("KeyConditionExpression");
209        }
210        let no_raw_eav: Option<serde_json::Value> = None;
211        let ctx = helpers::ExpressionParamContext {
212            non_expression_params: non_expr,
213            expression_params: expr,
214            all_expression_param_names: vec!["FilterExpression", "KeyConditionExpression"],
215            expression_attribute_names: &request.expression_attribute_names,
216            expression_attribute_values: &request.expression_attribute_values,
217            expression_attribute_values_raw: &no_raw_eav,
218        };
219        helpers::validate_expression_params(&ctx)?;
220    }
221
222    // ---- Validate filter attribute values (before argument counts, matching DynamoDB) ----
223    helpers::validate_filter_conditions_raw(request.query_filter.as_ref(), "QueryFilter")?;
224    helpers::validate_filter_conditions_raw(request.key_conditions.as_ref(), "KeyConditions")?;
225
226    // ---- Validate filter argument counts and type compatibility ----
227    helpers::validate_filter_condition_args(request.query_filter.as_ref())?;
228    helpers::validate_filter_condition_args(request.key_conditions.as_ref())?;
229
230    // ---- Validate duplicate AttributesToGet ----
231    if let Some(ref attrs) = request.attributes_to_get {
232        helpers::validate_attributes_to_get_no_duplicates(attrs)?;
233    }
234
235    // ---- Parse ExclusiveStartKey from JSON value ----
236    let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
237        Some(helpers::parse_exclusive_start_key(esk_val)?)
238    } else {
239        request.exclusive_start_key.clone()
240    };
241
242    // ---- Validate expression syntax BEFORE table existence ----
243    // DynamoDB validates KeyConditionExpression, FilterExpression, and
244    // ProjectionExpression syntax before checking if the table exists.
245    if let Some(ref kce) = request.key_condition_expression {
246        if kce.is_empty() {
247            return Err(DynoxideError::ValidationException(
248                "Invalid KeyConditionExpression: The expression can not be empty;".to_string(),
249            ));
250        }
251    }
252    if let Some(ref fe) = request.filter_expression {
253        if fe.is_empty() {
254            if request.query_filter.is_none() || request.filter_expression.as_deref() == Some("") {
255                return Err(DynoxideError::ValidationException(
256                    "Invalid FilterExpression: The expression can not be empty;".to_string(),
257                ));
258            }
259        } else {
260            let parsed_fe = expressions::condition::parse(fe).map_err(|e| {
261                DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
262            })?;
263            // Validate that all #name references are defined in ExpressionAttributeNames
264            // (before table existence check, matching DynamoDB's validation ordering)
265            if let Err(e) = expressions::condition::validate_name_refs(
266                &parsed_fe,
267                &request.expression_attribute_names,
268            ) {
269                return Err(DynoxideError::ValidationException(format!(
270                    "Invalid FilterExpression: {e}"
271                )));
272            }
273        }
274    }
275    if let Some(ref pe) = request.projection_expression {
276        if pe.is_empty() {
277            return Err(DynoxideError::ValidationException(
278                "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
279            ));
280        }
281    }
282
283    // SPECIFIC_ATTRIBUTES requires ProjectionExpression or AttributesToGet
284    if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
285        && request.projection_expression.is_none()
286        && request.attributes_to_get.is_none()
287    {
288        return Err(DynoxideError::ValidationException(
289            "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
290                .to_string(),
291        ));
292    }
293    // For KeyConditionExpression, validate syntax early too
294    if let Some(ref kce) = request.key_condition_expression {
295        if !kce.is_empty() {
296            // Create a temporary tracker for early syntax validation
297            let temp_tracker = crate::expressions::TrackedExpressionAttributes::new(
298                &request.expression_attribute_names,
299                &request.expression_attribute_values,
300            );
301            if let Err(e) = expressions::key_condition::parse(kce, &temp_tracker) {
302                return Err(DynoxideError::ValidationException(e));
303            }
304        }
305    }
306
307    let meta = helpers::require_table_for_item_op(storage, &request.table_name)?;
308    let table_key_schema = helpers::parse_key_schema(&meta)?;
309
310    // Determine effective partition key name early so we can pass it to
311    // the legacy KeyConditions converter (ensures correct ordering when
312    // both hash and range keys use EQ).
313    let effective_pk_for_kc = if let Some(ref index_name) = request.index_name {
314        if let Some((pk, _)) = request
315            .index_name
316            .as_ref()
317            .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok())
318        {
319            pk
320        } else if let Ok((pk, _)) = super::gsi::parse_gsi_key_schema(&meta, index_name) {
321            pk
322        } else {
323            table_key_schema.partition_key.clone()
324        }
325    } else {
326        table_key_schema.partition_key.clone()
327    };
328
329    // Convert legacy KeyConditions to KeyConditionExpression if no expression is set
330    if request.key_condition_expression.is_none() {
331        if let Some(ref kc_val) = request.key_conditions {
332            if let Ok(kc) =
333                serde_json::from_value::<HashMap<String, helpers::KeyCondition>>(kc_val.clone())
334            {
335                if !kc.is_empty() {
336                    let converted =
337                        helpers::convert_key_conditions(&kc, Some(&effective_pk_for_kc))?;
338                    request.key_condition_expression = Some(converted.expression);
339                    let expr_values = request
340                        .expression_attribute_values
341                        .get_or_insert_with(HashMap::new);
342                    expr_values.extend(converted.attribute_values);
343                    let expr_names = request
344                        .expression_attribute_names
345                        .get_or_insert_with(HashMap::new);
346                    expr_names.extend(converted.attribute_names);
347                }
348            }
349        }
350    }
351
352    // Convert legacy QueryFilter to FilterExpression if no expression is set
353    if request.filter_expression.is_none() {
354        if let Some(ref qf_val) = request.query_filter {
355            if let Ok(qf) =
356                serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(qf_val.clone())
357            {
358                if !qf.is_empty() {
359                    let converted = helpers::convert_filter_conditions(
360                        &qf,
361                        request.conditional_operator.as_deref(),
362                    )?;
363                    if !converted.expression.is_empty() {
364                        request.filter_expression = Some(converted.expression);
365                        let expr_values = request
366                            .expression_attribute_values
367                            .get_or_insert_with(HashMap::new);
368                        expr_values.extend(converted.attribute_values);
369                        let expr_names = request
370                            .expression_attribute_names
371                            .get_or_insert_with(HashMap::new);
372                        expr_names.extend(converted.attribute_names);
373                    }
374                }
375            }
376        }
377    }
378
379    // Convert legacy AttributesToGet to projection
380    let legacy_projection = if request.projection_expression.is_none() {
381        request
382            .attributes_to_get
383            .as_ref()
384            .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
385    } else {
386        None
387    };
388
389    // Ensure KeyConditionExpression is present (required)
390    let key_condition_expression = request.key_condition_expression.as_deref().ok_or_else(|| {
391        DynoxideError::ValidationException(
392            "Either the KeyConditions or KeyConditionExpression parameter must be specified in the request."
393                .to_string(),
394        )
395    })?;
396    let key_condition_expression = key_condition_expression.to_string();
397
398    // Determine effective key schema (GSI, LSI, or base table)
399    let lsi_keys = request
400        .index_name
401        .as_ref()
402        .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
403    let is_lsi = lsi_keys.is_some();
404
405    // ConsistentRead is not supported on GSIs (LSIs are fine)
406    if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
407        return Err(DynoxideError::ValidationException(
408            "Consistent reads are not supported on global secondary indexes".to_string(),
409        ));
410    }
411
412    // Parse full index definition to get projection type
413    let index_projection_type = if let Some(ref index_name) = request.index_name {
414        if is_lsi {
415            super::lsi::parse_lsi_defs(&meta)?
416                .into_iter()
417                .find(|l| l.index_name == *index_name)
418                .map(|l| l.projection_type)
419        } else {
420            super::gsi::parse_gsi_defs(&meta)?
421                .into_iter()
422                .find(|g| g.index_name == *index_name)
423                .map(|g| g.projection_type)
424        }
425    } else {
426        None
427    };
428
429    let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
430        if let Some(keys) = lsi_keys {
431            keys
432        } else {
433            super::gsi::parse_gsi_key_schema(&meta, index_name)?
434        }
435    } else {
436        (
437            table_key_schema.partition_key.clone(),
438            table_key_schema.sort_key.clone(),
439        )
440    };
441
442    // ---- Validate ExclusiveStartKey structure against key schema ----
443    if let Some(ref esk) = exclusive_start_key {
444        // Stage 1+2: count check + index key type check
445        helpers::validate_esk_count_and_index_keys(
446            esk,
447            &meta,
448            request.index_name.as_deref(),
449            "The provided starting key is invalid",
450        )?;
451        // Stage 3: table key type check
452        helpers::validate_esk_table_keys(esk, &meta)?;
453    }
454
455    // Create tracker for unused expression attribute names/values
456    let tracker = crate::expressions::TrackedExpressionAttributes::new(
457        &request.expression_attribute_names,
458        &request.expression_attribute_values,
459    );
460
461    // Parse KeyConditionExpression
462    let key_cond = expressions::key_condition::parse(&key_condition_expression, &tracker)
463        .map_err(DynoxideError::ValidationException)?;
464
465    // Validate pk_name matches the effective partition key
466    if key_cond.pk_name != effective_pk {
467        return Err(DynoxideError::ValidationException(format!(
468            "Query condition missed key schema element: {}",
469            effective_pk
470        )));
471    }
472
473    // Resolve values
474    let resolved = expressions::key_condition::resolve_values(&key_cond, &tracker)
475        .map_err(DynoxideError::ValidationException)?;
476
477    // Get pk string
478    let pk_str = resolved.pk_value.to_key_string().ok_or_else(|| {
479        DynoxideError::ValidationException(
480            "Cannot convert partition key value to string".to_string(),
481        )
482    })?;
483
484    // Build sk SQL conditions
485    let mut sk_sql_parts = Vec::new();
486    let mut sk_param_values = Vec::new();
487
488    if let Some(ref sk_cond) = resolved.sk_condition {
489        // Validate sk name matches effective sort key
490        if let Some(ref eff_sk) = effective_sk {
491            if sk_cond.sk_name() != eff_sk {
492                return Err(DynoxideError::ValidationException(format!(
493                    "Query condition missed key schema element: {eff_sk}"
494                )));
495            }
496        } else {
497            return Err(DynoxideError::ValidationException(
498                "Query filter contains a sort key condition but the table has no sort key"
499                    .to_string(),
500            ));
501        }
502
503        let conditions = sk_cond.to_sql_conditions();
504        for (i, (op, val)) in conditions.iter().enumerate() {
505            let param_idx = i + 2; // pk is ?1, sk params start at ?2
506            if op == "LIKE" {
507                sk_sql_parts.push(format!("AND sk LIKE ?{param_idx} ESCAPE '\\'"));
508            } else {
509                sk_sql_parts.push(format!("AND sk {op} ?{param_idx}"));
510            }
511            sk_param_values.push(val.clone());
512        }
513    }
514
515    // ---- Validate QueryFilter/FilterExpression don't reference primary key attrs ----
516    // Collect effective key attribute names
517    let mut effective_key_attrs = vec![effective_pk.clone()];
518    if let Some(ref sk) = effective_sk {
519        effective_key_attrs.push(sk.clone());
520    }
521
522    // Check legacy QueryFilter
523    if let Some(ref qf_val) = request.query_filter {
524        if let Some(obj) = qf_val.as_object() {
525            for attr_name in obj.keys() {
526                if effective_key_attrs.contains(attr_name) {
527                    return Err(DynoxideError::ValidationException(format!(
528                        "QueryFilter can only contain non-primary key attributes: \
529                         Primary key attribute: {attr_name}"
530                    )));
531                }
532            }
533        }
534    }
535
536    // Check FilterExpression for key attribute references (only for user-supplied expressions,
537    // not those converted from QueryFilter - QueryFilter is checked separately above)
538    if request.query_filter.is_none() {
539        if let Some(ref fe) = request.filter_expression {
540            if let Ok(parsed_fe) = expressions::condition::parse(fe) {
541                let top_attrs = expressions::condition::extract_top_level_attributes(
542                    &parsed_fe,
543                    &request.expression_attribute_names,
544                );
545                for attr in &top_attrs {
546                    if effective_key_attrs.contains(attr) {
547                        return Err(DynoxideError::ValidationException(format!(
548                            "Filter Expression can only contain non-primary key attributes: \
549                             Primary key attribute: {attr}"
550                        )));
551                    }
552                }
553                // Check for non-scalar key access in FilterExpression
554                // Build index key attribute lists
555                let mut index_key_attrs = Vec::new();
556                if request.index_name.is_some() {
557                    // Index keys that are not also table keys
558                    if !effective_key_attrs
559                        .iter()
560                        .any(|k| k == &table_key_schema.partition_key)
561                    {
562                        // This shouldn't normally happen for query, but just in case
563                    }
564                    // Check all effective key attrs for non-scalar access
565                    for k in &effective_key_attrs {
566                        if ![table_key_schema.partition_key.clone()]
567                            .iter()
568                            .chain(table_key_schema.sort_key.iter())
569                            .any(|tk| tk == k)
570                        {
571                            index_key_attrs.push(k.clone());
572                        }
573                    }
574                }
575                let base_key_attrs: Vec<String> = {
576                    let mut v = vec![table_key_schema.partition_key.clone()];
577                    if let Some(ref sk) = table_key_schema.sort_key {
578                        v.push(sk.clone());
579                    }
580                    v
581                };
582                if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
583                    &parsed_fe,
584                    &request.expression_attribute_names,
585                    &base_key_attrs,
586                    &index_key_attrs,
587                ) {
588                    let prefix = if is_index { "IndexKey" } else { "Key" };
589                    return Err(DynoxideError::ValidationException(format!(
590                        "Key attributes must be scalars; \
591                         list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
592                    )));
593                }
594            }
595        }
596    }
597
598    let is_index_query = request.index_name.is_some();
599
600    // Build ExclusiveStartKey sk value.
601    // For hash-only GSIs (no sort key), use empty string so the composite
602    // cursor (gsi_sk, table_pk, table_sk) can drive pagination.
603    let start_sk = if let Some(ref esk) = exclusive_start_key {
604        if let Some(ref sk_name) = effective_sk {
605            esk.get(sk_name).and_then(|v| v.to_key_string())
606        } else if is_index_query {
607            // Hash-only index: gsi_sk / lsi_sk is always ''
608            Some(String::new())
609        } else {
610            None
611        }
612    } else {
613        None
614    };
615
616    // For LSI and GSI queries, extract the base table keys from ExclusiveStartKey
617    // to enable composite cursor pagination.
618    let (start_base_pk, start_base_sk) = if is_index_query {
619        if let Some(ref esk) = exclusive_start_key {
620            let base_pk = esk
621                .get(&table_key_schema.partition_key)
622                .and_then(|v| v.to_key_string());
623            let base_sk = table_key_schema
624                .sort_key
625                .as_ref()
626                .and_then(|sk_name| esk.get(sk_name))
627                .and_then(|v| v.to_key_string());
628            (base_pk, base_sk)
629        } else {
630            (None, None)
631        }
632    } else {
633        (None, None)
634    };
635
636    // Validate Select=ALL_ATTRIBUTES against index projection type.
637    // For GSI with non-ALL projection, DynamoDB rejects ALL_ATTRIBUTES.
638    let is_select_all_attributes = request
639        .select
640        .as_deref()
641        .map(|s| s.eq_ignore_ascii_case("ALL_ATTRIBUTES"))
642        .unwrap_or(false);
643    let fetch_from_base_table = if is_select_all_attributes {
644        if let Some(ref proj_type) = index_projection_type {
645            if *proj_type != crate::types::ProjectionType::ALL {
646                if !is_lsi {
647                    return Err(DynoxideError::ValidationException(format!(
648                        "One or more parameter values were invalid: \
649                         Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
650                         because its projection type is not ALL",
651                        request.index_name.as_deref().unwrap_or("")
652                    )));
653                }
654                // LSI with non-ALL projection: fetch full items from base table
655                true
656            } else {
657                false
658            }
659        } else {
660            false
661        }
662    } else {
663        false
664    };
665
666    // Combine sk conditions into a single SQL fragment
667    let sk_condition_sql = if sk_sql_parts.is_empty() {
668        None
669    } else {
670        Some(sk_sql_parts.join(" "))
671    };
672
673    let fetch_limit = request.limit;
674    let sk_params_refs: Vec<&str> = sk_param_values.iter().map(|s| s.as_str()).collect();
675
676    // Query either GSI table or base table
677    let query_params = crate::storage::QueryParams {
678        sk_condition: sk_condition_sql.as_deref(),
679        sk_params: &sk_params_refs,
680        forward: request.scan_index_forward,
681        limit: fetch_limit,
682        exclusive_start_sk: start_sk.as_deref(),
683        exclusive_start_base_pk: start_base_pk.as_deref(),
684        exclusive_start_base_sk: start_base_sk.as_deref(),
685    };
686    let rows = if let Some(ref index_name) = request.index_name {
687        if is_lsi {
688            storage.query_lsi_items(&request.table_name, index_name, &pk_str, &query_params)?
689        } else {
690            storage.query_gsi_items(&request.table_name, index_name, &pk_str, &query_params)?
691        }
692    } else {
693        storage.query_items(&request.table_name, &pk_str, &query_params)?
694    };
695
696    // Parse filter expression if present
697    let filter_expr = request
698        .filter_expression
699        .as_ref()
700        .map(|expr| expressions::condition::parse(expr))
701        .transpose()
702        .map_err(DynoxideError::ValidationException)?;
703
704    // Parse projection expression if present; fall back to legacy AttributesToGet
705    let projection = if let Some(ref proj_expr) = request.projection_expression {
706        Some(
707            expressions::projection::parse(proj_expr)
708                .map_err(DynoxideError::ValidationException)?,
709        )
710    } else {
711        legacy_projection.clone()
712    };
713
714    // Pre-register expression references so unused check works even with zero items
715    if let Some(ref filter) = filter_expr {
716        tracker.track_condition_expr(filter);
717    }
718    if let Some(ref proj) = projection {
719        tracker.track_projection_expr(proj);
720    }
721
722    // Untracked variant for the per-item hot loop — tracking already done above
723    let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
724        &request.expression_attribute_names,
725        &request.expression_attribute_values,
726    );
727
728    // Determine if SELECT COUNT
729    let is_count = request
730        .select
731        .as_deref()
732        .map(|s| s.eq_ignore_ascii_case("COUNT"))
733        .unwrap_or(false);
734
735    // Key attribute names for projection (use effective keys for GSI)
736    let mut key_attrs = vec![effective_pk.clone()];
737    if let Some(ref sk) = effective_sk {
738        key_attrs.push(sk.clone());
739    }
740    // Also include base table keys when querying a GSI
741    if request.index_name.is_some() {
742        if !key_attrs.contains(&table_key_schema.partition_key) {
743            key_attrs.push(table_key_schema.partition_key.clone());
744        }
745        if let Some(ref sk) = table_key_schema.sort_key {
746            if !key_attrs.contains(sk) {
747                key_attrs.push(sk.clone());
748            }
749        }
750    }
751
752    let mut items = Vec::new();
753    let mut scanned_count = 0;
754    let mut filtered_count = 0;
755    let mut cumulative_size = 0;
756    let mut last_evaluated_item: Option<Item> = None;
757    let mut truncated_by_size = false;
758
759    // Track sizes separately for ALL_ATTRIBUTES LSI queries where both
760    // index reads and base table reads contribute to ConsumedCapacity.
761    let mut base_table_cumulative_size = 0usize;
762    let mut index_cumulative_size = 0usize;
763
764    for (_pk, _sk, item_json) in &rows {
765        let index_item: Item = serde_json::from_str(item_json).map_err(|e| {
766            DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
767        })?;
768
769        // If Select=ALL_ATTRIBUTES on LSI with non-ALL projection, fetch full
770        // item from the base table for the response while using the index item
771        // for cursor tracking.
772        index_cumulative_size += crate::types::item_size(&index_item);
773        let item = if fetch_from_base_table {
774            let base_pk = index_item
775                .get(&table_key_schema.partition_key)
776                .and_then(|v| v.to_key_string())
777                .unwrap_or_default();
778            let base_sk = table_key_schema
779                .sort_key
780                .as_ref()
781                .and_then(|sk_name| index_item.get(sk_name))
782                .and_then(|v| v.to_key_string())
783                .unwrap_or_default();
784            if let Some(full_json) = storage.get_item(&request.table_name, &base_pk, &base_sk)? {
785                let full_item: Item = serde_json::from_str(&full_json).map_err(|e| {
786                    DynoxideError::InternalServerError(format!("Bad item JSON: {e}"))
787                })?;
788                base_table_cumulative_size += crate::types::item_size(&full_item);
789                full_item
790            } else {
791                index_item.clone()
792            }
793        } else {
794            index_item.clone()
795        };
796
797        scanned_count += 1;
798
799        // Check 1MB limit BEFORE filtering — DynamoDB counts all evaluated data
800        // towards the 1MB response size limit, not just items that pass the filter.
801        let item_size = crate::types::item_size(&item);
802        if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
803            truncated_by_size = true;
804            break;
805        }
806        cumulative_size += item_size;
807
808        // Apply filter
809        if let Some(ref filter) = filter_expr {
810            let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
811                .map_err(DynoxideError::ValidationException)?;
812            if !passes {
813                last_evaluated_item = Some(index_item);
814                continue;
815            }
816        }
817
818        filtered_count += 1;
819
820        // Apply projection -- do NOT auto-include key attributes when the
821        // user explicitly specified ProjectionExpression or AttributesToGet.
822        let result_item = if let Some(ref proj) = projection {
823            let no_keys: &[String] = &[];
824            expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
825                .map_err(DynoxideError::ValidationException)?
826        } else {
827            item
828        };
829
830        last_evaluated_item = Some(index_item);
831        if !is_count {
832            items.push(result_item);
833        }
834    }
835
836    // Check for unused expression attribute names/values
837    tracker.check_unused()?;
838
839    let count = if is_count {
840        filtered_count
841    } else {
842        items.len()
843    };
844
845    // Determine LastEvaluatedKey
846    // We return LEK if: we hit the Limit, or we hit the 1MB limit
847    let has_more = truncated_by_size
848        || (fetch_limit.is_some() && scanned_count >= fetch_limit.unwrap_or(usize::MAX));
849
850    // For index queries, include the base table primary key in LastEvaluatedKey
851    // alongside the effective (index) keys so the cursor can uniquely identify
852    // the position. For LSIs, include the table sort key. For GSIs, include
853    // both the table partition key and sort key.
854    let is_gsi_query = request.index_name.is_some() && !is_lsi;
855    let last_evaluated_key = if has_more {
856        last_evaluated_item.map(|item| {
857            let mut key = build_last_evaluated_key(&item, &effective_pk, effective_sk.as_deref());
858            // For LSI queries, add the table sort key if different from the index sort key
859            if is_lsi {
860                if let Some(tsk) = table_key_schema.sort_key.as_deref() {
861                    if !key.contains_key(tsk) {
862                        if let Some(v) = item.get(tsk) {
863                            key.insert(tsk.to_string(), v.clone());
864                        }
865                    }
866                }
867            }
868            // For GSI queries, add the base table primary key (pk and sk)
869            if is_gsi_query {
870                if !key.contains_key(&table_key_schema.partition_key) {
871                    if let Some(v) = item.get(&table_key_schema.partition_key) {
872                        key.insert(table_key_schema.partition_key.clone(), v.clone());
873                    }
874                }
875                if let Some(ref tsk) = table_key_schema.sort_key {
876                    if !key.contains_key(tsk) {
877                        if let Some(v) = item.get(tsk) {
878                            key.insert(tsk.clone(), v.clone());
879                        }
880                    }
881                }
882            }
883            key
884        })
885    } else {
886        None
887    };
888
889    // Attribute read capacity to the index if querying one
890    let is_gsi = is_gsi_query;
891    let consistent = request.consistent_read.unwrap_or(false);
892    let consumed_capacity = if is_gsi {
893        let mut gsi_units = std::collections::HashMap::new();
894        gsi_units.insert(
895            request.index_name.as_ref().unwrap().clone(),
896            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
897        );
898        crate::types::consumed_capacity_with_indexes(
899            &request.table_name,
900            0.0,
901            &gsi_units,
902            &request.return_consumed_capacity,
903        )
904    } else if is_lsi {
905        // When fetching from the base table (ALL_ATTRIBUTES on non-ALL LSI),
906        // split capacity between the index read and the table read.
907        let (table_cap, lsi_cap) = if fetch_from_base_table {
908            let table_rcu = crate::types::read_capacity_units_with_consistency(
909                base_table_cumulative_size,
910                consistent,
911            );
912            let lsi_rcu = crate::types::read_capacity_units_with_consistency(
913                index_cumulative_size,
914                consistent,
915            );
916            (table_rcu, lsi_rcu)
917        } else {
918            (
919                0.0,
920                crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
921            )
922        };
923        let mut lsi_units = std::collections::HashMap::new();
924        lsi_units.insert(request.index_name.as_ref().unwrap().clone(), lsi_cap);
925        crate::types::consumed_capacity_with_secondary_indexes(
926            &request.table_name,
927            table_cap,
928            &std::collections::HashMap::new(),
929            &lsi_units,
930            &request.return_consumed_capacity,
931        )
932    } else {
933        crate::types::consumed_capacity(
934            &request.table_name,
935            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
936            &request.return_consumed_capacity,
937        )
938    };
939
940    Ok(QueryResponse {
941        items: if is_count { None } else { Some(items) },
942        count,
943        scanned_count,
944        last_evaluated_key,
945        consumed_capacity,
946    })
947}
948
949fn build_last_evaluated_key(
950    item: &Item,
951    pk_name: &str,
952    sk_name: Option<&str>,
953) -> HashMap<String, AttributeValue> {
954    let mut key = HashMap::new();
955    if let Some(pk_val) = item.get(pk_name) {
956        key.insert(pk_name.to_string(), pk_val.clone());
957    }
958    if let Some(sk) = sk_name {
959        if let Some(sk_val) = item.get(sk) {
960            key.insert(sk.to_string(), sk_val.clone());
961        }
962    }
963    key
964}