Skip to main content

dynoxide/actions/
query.rs

1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage_backend::StorageBackend;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Query/Scan response size ceiling: 1 MiB, i.e. 1,048,576 bytes.
const MAX_RESPONSE_SIZE: usize = 1024 * 1024;
11
12/// Internal deserialization struct for detecting missing fields.
13#[derive(Debug, Default, Deserialize)]
14struct QueryRequestRaw {
15    #[serde(rename = "TableName", default)]
16    table_name: Option<String>,
17    #[serde(rename = "KeyConditionExpression", default)]
18    key_condition_expression: Option<String>,
19    #[serde(rename = "FilterExpression", default)]
20    filter_expression: Option<String>,
21    #[serde(rename = "ProjectionExpression", default)]
22    projection_expression: Option<String>,
23    #[serde(rename = "ExpressionAttributeNames", default)]
24    expression_attribute_names: Option<HashMap<String, String>>,
25    #[serde(rename = "ExpressionAttributeValues", default)]
26    expression_attribute_values: Option<HashMap<String, AttributeValue>>,
27    #[serde(rename = "ScanIndexForward", default = "default_true")]
28    scan_index_forward: bool,
29    #[serde(rename = "Limit", default)]
30    limit: Option<usize>,
31    #[serde(rename = "ExclusiveStartKey", default)]
32    exclusive_start_key: Option<serde_json::Value>,
33    #[serde(rename = "Select", default)]
34    select: Option<String>,
35    #[serde(rename = "ConsistentRead", default)]
36    consistent_read: Option<bool>,
37    #[serde(rename = "IndexName", default)]
38    index_name: Option<String>,
39    #[serde(rename = "ReturnConsumedCapacity", default)]
40    return_consumed_capacity: Option<String>,
41    #[serde(rename = "KeyConditions", default)]
42    key_conditions: Option<serde_json::Value>,
43    #[serde(rename = "AttributesToGet", default)]
44    attributes_to_get: Option<Vec<String>>,
45    #[serde(rename = "QueryFilter", default)]
46    query_filter: Option<serde_json::Value>,
47    #[serde(rename = "ConditionalOperator", default)]
48    conditional_operator: Option<String>,
49}
50
/// Serde default for `ScanIndexForward`: when the flag is omitted, the query
/// traverses in forward order.
const fn default_true() -> bool {
    true
}
54
55#[derive(Debug, Default)]
56pub struct QueryRequest {
57    pub table_name: String,
58    pub key_condition_expression: Option<String>,
59    pub filter_expression: Option<String>,
60    pub projection_expression: Option<String>,
61    pub expression_attribute_names: Option<HashMap<String, String>>,
62    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
63    pub scan_index_forward: bool,
64    pub limit: Option<usize>,
65    pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
66    pub select: Option<String>,
67    pub consistent_read: Option<bool>,
68    pub index_name: Option<String>,
69    pub return_consumed_capacity: Option<String>,
70    pub key_conditions: Option<serde_json::Value>,
71    pub attributes_to_get: Option<Vec<String>>,
72    pub query_filter: Option<serde_json::Value>,
73    pub conditional_operator: Option<String>,
74    /// Raw JSON for ExclusiveStartKey when deserialized from HTTP request.
75    /// Parsed lazily in `execute()` after other validations run.
76    /// When constructed directly (e.g. from MCP), this is `None` and
77    /// `exclusive_start_key` is used instead.
78    pub exclusive_start_key_raw: Option<serde_json::Value>,
79}
80
81impl<'de> serde::Deserialize<'de> for QueryRequest {
82    fn deserialize<D: serde::Deserializer<'de>>(
83        deserializer: D,
84    ) -> std::result::Result<Self, D::Error> {
85        let raw = QueryRequestRaw::deserialize(deserializer)?;
86        use crate::validation::{
87            TableNameContext, format_validation_errors, table_name_constraint_errors,
88        };
89
90        let mut errors = Vec::new();
91        errors.extend(table_name_constraint_errors(
92            raw.table_name.as_deref(),
93            TableNameContext::ReadWrite,
94        ));
95        let table_name = raw.table_name.unwrap_or_default();
96
97        // ReturnConsumedCapacity enum
98        if let Some(ref rcc) = raw.return_consumed_capacity {
99            if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
100                errors.push(format!(
101                    "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
102                     Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
103                    rcc
104                ));
105            }
106        }
107
108        // Select enum
109        if let Some(ref sel) = raw.select {
110            if ![
111                "ALL_ATTRIBUTES",
112                "ALL_PROJECTED_ATTRIBUTES",
113                "COUNT",
114                "SPECIFIC_ATTRIBUTES",
115            ]
116            .contains(&sel.as_str())
117            {
118                errors.push(format!(
119                    "Value '{}' at 'select' failed to satisfy constraint: \
120                     Member must satisfy enum value set: [SPECIFIC_ATTRIBUTES, COUNT, ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES]",
121                    sel
122                ));
123            }
124        }
125
126        // Limit must be >= 1.
127        // AWS DynamoDB's Query message diverges from Scan: Query omits the rejected
128        // value entirely and capitalises 'Limit', whereas Scan keeps the value and
129        // lowercases 'limit'. Do not collapse these into a shared helper.
130        if let Some(limit) = raw.limit {
131            if limit == 0 {
132                errors.push(
133                    "Value at 'Limit' failed to satisfy constraint: \
134                     Member must have value greater than or equal to 1"
135                        .to_string(),
136                );
137            }
138        }
139
140        if let Some(msg) = format_validation_errors(&errors) {
141            return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
142        }
143
144        Ok(QueryRequest {
145            table_name,
146            key_condition_expression: raw.key_condition_expression,
147            filter_expression: raw.filter_expression,
148            projection_expression: raw.projection_expression,
149            expression_attribute_names: raw.expression_attribute_names,
150            expression_attribute_values: raw.expression_attribute_values,
151            scan_index_forward: raw.scan_index_forward,
152            limit: raw.limit,
153            exclusive_start_key: None,
154            select: raw.select,
155            consistent_read: raw.consistent_read,
156            index_name: raw.index_name,
157            return_consumed_capacity: raw.return_consumed_capacity,
158            key_conditions: raw.key_conditions,
159            attributes_to_get: raw.attributes_to_get,
160            query_filter: raw.query_filter,
161            conditional_operator: raw.conditional_operator,
162            exclusive_start_key_raw: raw.exclusive_start_key,
163        })
164    }
165}
166
167#[derive(Debug, Default, Serialize)]
168pub struct QueryResponse {
169    #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
170    pub items: Option<Vec<Item>>,
171    #[serde(rename = "Count")]
172    pub count: usize,
173    #[serde(rename = "ScannedCount")]
174    pub scanned_count: usize,
175    #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
176    pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
177    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
178    pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
179}
180
181pub async fn execute<S: StorageBackend>(
182    storage: &S,
183    mut request: QueryRequest,
184) -> Result<QueryResponse> {
185    // Validate table name format before checking existence (DynamoDB validates input first)
186    crate::validation::validate_table_name(&request.table_name)?;
187
188    // ---- Expression vs non-expression mixing validation ----
189    // DynamoDB checks this before anything else (except table name format and ESK values).
190    {
191        let mut non_expr = Vec::new();
192        let mut expr = Vec::new();
193        if request.attributes_to_get.is_some() {
194            non_expr.push("AttributesToGet");
195        }
196        if request.query_filter.is_some()
197            && request.query_filter.as_ref().is_some_and(|v| !v.is_null())
198        {
199            non_expr.push("QueryFilter");
200        }
201        if request.conditional_operator.is_some() {
202            non_expr.push("ConditionalOperator");
203        }
204        if request.key_conditions.is_some()
205            && request
206                .key_conditions
207                .as_ref()
208                .is_some_and(|v| !v.is_null())
209        {
210            non_expr.push("KeyConditions");
211        }
212        if request.projection_expression.is_some() {
213            expr.push("ProjectionExpression");
214        }
215        if request.filter_expression.is_some() {
216            expr.push("FilterExpression");
217        }
218        if request.key_condition_expression.is_some() {
219            expr.push("KeyConditionExpression");
220        }
221        let no_raw_eav: Option<serde_json::Value> = None;
222        let ctx = helpers::ExpressionParamContext {
223            non_expression_params: non_expr,
224            expression_params: expr,
225            all_expression_param_names: vec!["FilterExpression", "KeyConditionExpression"],
226            expression_attribute_names: &request.expression_attribute_names,
227            expression_attribute_values: &request.expression_attribute_values,
228            expression_attribute_values_raw: &no_raw_eav,
229        };
230        helpers::validate_expression_params(&ctx)?;
231    }
232
233    // ---- Validate filter attribute values (before argument counts, matching DynamoDB) ----
234    helpers::validate_filter_conditions_raw(request.query_filter.as_ref(), "QueryFilter")?;
235    helpers::validate_filter_conditions_raw(request.key_conditions.as_ref(), "KeyConditions")?;
236
237    // ---- Validate filter argument counts and type compatibility ----
238    helpers::validate_filter_condition_args(request.query_filter.as_ref())?;
239    helpers::validate_filter_condition_args(request.key_conditions.as_ref())?;
240
241    // ---- Validate duplicate AttributesToGet ----
242    if let Some(ref attrs) = request.attributes_to_get {
243        helpers::validate_attributes_to_get_no_duplicates(attrs)?;
244    }
245
246    // ---- Parse ExclusiveStartKey from JSON value ----
247    let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
248        Some(helpers::parse_exclusive_start_key(esk_val)?)
249    } else {
250        request.exclusive_start_key.clone()
251    };
252
253    // ---- Validate expression syntax BEFORE table existence ----
254    // DynamoDB validates KeyConditionExpression, FilterExpression, and
255    // ProjectionExpression syntax before checking if the table exists.
256    if let Some(ref kce) = request.key_condition_expression {
257        if kce.is_empty() {
258            return Err(DynoxideError::ValidationException(
259                "Invalid KeyConditionExpression: The expression can not be empty;".to_string(),
260            ));
261        }
262    }
263    if let Some(ref fe) = request.filter_expression {
264        if fe.is_empty() {
265            if request.query_filter.is_none() || request.filter_expression.as_deref() == Some("") {
266                return Err(DynoxideError::ValidationException(
267                    "Invalid FilterExpression: The expression can not be empty;".to_string(),
268                ));
269            }
270        } else {
271            let parsed_fe = expressions::condition::parse(fe).map_err(|e| {
272                DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
273            })?;
274            // Validate that all #name references are defined in ExpressionAttributeNames
275            // (before table existence check, matching DynamoDB's validation ordering)
276            if let Err(e) = expressions::condition::validate_name_refs(
277                &parsed_fe,
278                &request.expression_attribute_names,
279            ) {
280                return Err(DynoxideError::ValidationException(format!(
281                    "Invalid FilterExpression: {e}"
282                )));
283            }
284            if let Err(e) = expressions::condition::validate_operand_semantics(
285                &parsed_fe,
286                &request.expression_attribute_names,
287                &request.expression_attribute_values,
288            ) {
289                return Err(DynoxideError::ValidationException(format!(
290                    "Invalid FilterExpression: {e}"
291                )));
292            }
293        }
294    }
295    if let Some(ref pe) = request.projection_expression {
296        if pe.is_empty() {
297            return Err(DynoxideError::ValidationException(
298                "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
299            ));
300        }
301    }
302
303    // SPECIFIC_ATTRIBUTES requires ProjectionExpression or AttributesToGet
304    if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
305        && request.projection_expression.is_none()
306        && request.attributes_to_get.is_none()
307    {
308        return Err(DynoxideError::ValidationException(
309            "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
310                .to_string(),
311        ));
312    }
313
314    // A ProjectionExpression is only valid with Select = SPECIFIC_ATTRIBUTES.
315    if request.projection_expression.is_some() {
316        if let Some(select) = request.select.as_deref() {
317            if select != "SPECIFIC_ATTRIBUTES" {
318                // AWS phrases COUNT as "only the Count"; other values use the literal.
319                let target = if select == "COUNT" {
320                    "only the Count"
321                } else {
322                    select
323                };
324                return Err(DynoxideError::ValidationException(format!(
325                    "Cannot specify the ProjectionExpression when choosing to get {target}"
326                )));
327            }
328        }
329    }
330
331    // ALL_PROJECTED_ATTRIBUTES projects an index, so it requires an IndexName.
332    if request.select.as_deref() == Some("ALL_PROJECTED_ATTRIBUTES") && request.index_name.is_none()
333    {
334        return Err(DynoxideError::ValidationException(
335            "ALL_PROJECTED_ATTRIBUTES can be used only when Querying using an IndexName"
336                .to_string(),
337        ));
338    }
339
340    // For KeyConditionExpression, validate syntax early too
341    if let Some(ref kce) = request.key_condition_expression {
342        if !kce.is_empty() {
343            // Create a temporary tracker for early syntax validation
344            let temp_tracker = crate::expressions::TrackedExpressionAttributes::new(
345                &request.expression_attribute_names,
346                &request.expression_attribute_values,
347            );
348            if let Err(e) = expressions::key_condition::parse(kce, &temp_tracker) {
349                return Err(DynoxideError::ValidationException(e));
350            }
351        }
352    }
353
354    let meta = helpers::require_table_for_item_op(storage, &request.table_name).await?;
355    let table_key_schema = helpers::parse_key_schema(&meta)?;
356
357    // Determine effective partition key name early so we can pass it to
358    // the legacy KeyConditions converter (ensures correct ordering when
359    // both hash and range keys use EQ).
360    let effective_pk_for_kc = if let Some(ref index_name) = request.index_name {
361        if let Some((pk, _)) = request
362            .index_name
363            .as_ref()
364            .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok())
365        {
366            pk
367        } else if let Ok((pk, _)) = super::gsi::parse_gsi_key_schema(&meta, index_name) {
368            pk
369        } else {
370            table_key_schema.partition_key.clone()
371        }
372    } else {
373        table_key_schema.partition_key.clone()
374    };
375
376    // Convert legacy KeyConditions to KeyConditionExpression if no expression is set
377    if request.key_condition_expression.is_none() {
378        if let Some(ref kc_val) = request.key_conditions {
379            if let Ok(kc) =
380                serde_json::from_value::<HashMap<String, helpers::KeyCondition>>(kc_val.clone())
381            {
382                if !kc.is_empty() {
383                    let converted =
384                        helpers::convert_key_conditions(&kc, Some(&effective_pk_for_kc))?;
385                    request.key_condition_expression = Some(converted.expression);
386                    let expr_values = request
387                        .expression_attribute_values
388                        .get_or_insert_with(HashMap::new);
389                    expr_values.extend(converted.attribute_values);
390                    let expr_names = request
391                        .expression_attribute_names
392                        .get_or_insert_with(HashMap::new);
393                    expr_names.extend(converted.attribute_names);
394                }
395            }
396        }
397    }
398
399    // Convert legacy QueryFilter to FilterExpression if no expression is set
400    if request.filter_expression.is_none() {
401        if let Some(ref qf_val) = request.query_filter {
402            if let Ok(qf) =
403                serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(qf_val.clone())
404            {
405                if !qf.is_empty() {
406                    let converted = helpers::convert_filter_conditions(
407                        &qf,
408                        request.conditional_operator.as_deref(),
409                    )?;
410                    if !converted.expression.is_empty() {
411                        request.filter_expression = Some(converted.expression);
412                        let expr_values = request
413                            .expression_attribute_values
414                            .get_or_insert_with(HashMap::new);
415                        expr_values.extend(converted.attribute_values);
416                        let expr_names = request
417                            .expression_attribute_names
418                            .get_or_insert_with(HashMap::new);
419                        expr_names.extend(converted.attribute_names);
420                    }
421                }
422            }
423        }
424    }
425
426    // Convert legacy AttributesToGet to projection
427    let legacy_projection = if request.projection_expression.is_none() {
428        request
429            .attributes_to_get
430            .as_ref()
431            .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
432    } else {
433        None
434    };
435
436    // Ensure KeyConditionExpression is present (required)
437    let key_condition_expression = request.key_condition_expression.as_deref().ok_or_else(|| {
438        DynoxideError::ValidationException(
439            "Either the KeyConditions or KeyConditionExpression parameter must be specified in the request."
440                .to_string(),
441        )
442    })?;
443    let key_condition_expression = key_condition_expression.to_string();
444
445    // Determine effective key schema (GSI, LSI, or base table)
446    let lsi_keys = request
447        .index_name
448        .as_ref()
449        .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
450    let is_lsi = lsi_keys.is_some();
451
452    // ConsistentRead is not supported on GSIs (LSIs are fine)
453    if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
454        return Err(DynoxideError::ValidationException(
455            "Consistent reads are not supported on global secondary indexes".to_string(),
456        ));
457    }
458
459    // Parse full index definition to get projection type
460    let index_projection_type = if let Some(ref index_name) = request.index_name {
461        if is_lsi {
462            super::lsi::parse_lsi_defs(&meta)?
463                .into_iter()
464                .find(|l| l.index_name == *index_name)
465                .map(|l| l.projection_type)
466        } else {
467            super::gsi::parse_gsi_defs(&meta)?
468                .into_iter()
469                .find(|g| g.index_name == *index_name)
470                .map(|g| g.projection_type)
471        }
472    } else {
473        None
474    };
475
476    let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
477        if let Some(keys) = lsi_keys {
478            keys
479        } else {
480            super::gsi::parse_gsi_key_schema(&meta, index_name)?
481        }
482    } else {
483        (
484            table_key_schema.partition_key.clone(),
485            table_key_schema.sort_key.clone(),
486        )
487    };
488
489    // ---- Validate ExclusiveStartKey structure against key schema ----
490    if let Some(ref esk) = exclusive_start_key {
491        // Stage 1+2: count check + index key type check
492        helpers::validate_esk_count_and_index_keys(
493            esk,
494            &meta,
495            request.index_name.as_deref(),
496            "The provided starting key is invalid",
497        )?;
498        // Stage 3: table key type check
499        helpers::validate_esk_table_keys(esk, &meta)?;
500    }
501
502    // Create tracker for unused expression attribute names/values
503    let tracker = crate::expressions::TrackedExpressionAttributes::new(
504        &request.expression_attribute_names,
505        &request.expression_attribute_values,
506    );
507
508    // Parse KeyConditionExpression
509    let key_cond = expressions::key_condition::parse(&key_condition_expression, &tracker)
510        .map_err(DynoxideError::ValidationException)?;
511
512    // Validate pk_name matches the effective partition key
513    if key_cond.pk_name != effective_pk {
514        return Err(DynoxideError::ValidationException(format!(
515            "Query condition missed key schema element: {}",
516            effective_pk
517        )));
518    }
519
520    // Resolve values
521    let resolved = expressions::key_condition::resolve_values(&key_cond, &tracker)
522        .map_err(DynoxideError::ValidationException)?;
523
524    // Get pk string
525    let pk_str = resolved.pk_value.to_key_string().ok_or_else(|| {
526        DynoxideError::ValidationException(
527            "Cannot convert partition key value to string".to_string(),
528        )
529    })?;
530
531    // Build sk SQL conditions
532    let mut sk_sql_parts = Vec::new();
533    let mut sk_param_values = Vec::new();
534
535    if let Some(ref sk_cond) = resolved.sk_condition {
536        // Validate sk name matches effective sort key
537        if let Some(ref eff_sk) = effective_sk {
538            if sk_cond.sk_name() != eff_sk {
539                return Err(DynoxideError::ValidationException(format!(
540                    "Query condition missed key schema element: {eff_sk}"
541                )));
542            }
543        } else {
544            return Err(DynoxideError::ValidationException(
545                "Query filter contains a sort key condition but the table has no sort key"
546                    .to_string(),
547            ));
548        }
549
550        let conditions = sk_cond.to_sql_conditions();
551        for (i, (op, val)) in conditions.iter().enumerate() {
552            let param_idx = i + 2; // pk is ?1, sk params start at ?2
553            if op == "LIKE" {
554                sk_sql_parts.push(format!("AND sk LIKE ?{param_idx} ESCAPE '\\'"));
555            } else {
556                sk_sql_parts.push(format!("AND sk {op} ?{param_idx}"));
557            }
558            sk_param_values.push(val.clone());
559        }
560    }
561
562    // ---- Validate QueryFilter/FilterExpression don't reference primary key attrs ----
563    // Collect effective key attribute names
564    let mut effective_key_attrs = vec![effective_pk.clone()];
565    if let Some(ref sk) = effective_sk {
566        effective_key_attrs.push(sk.clone());
567    }
568
569    // Check legacy QueryFilter
570    if let Some(ref qf_val) = request.query_filter {
571        if let Some(obj) = qf_val.as_object() {
572            for attr_name in obj.keys() {
573                if effective_key_attrs.contains(attr_name) {
574                    return Err(DynoxideError::ValidationException(format!(
575                        "QueryFilter can only contain non-primary key attributes: \
576                         Primary key attribute: {attr_name}"
577                    )));
578                }
579            }
580        }
581    }
582
583    // Check FilterExpression for key attribute references (only for user-supplied expressions,
584    // not those converted from QueryFilter - QueryFilter is checked separately above)
585    if request.query_filter.is_none() {
586        if let Some(ref fe) = request.filter_expression {
587            if let Ok(parsed_fe) = expressions::condition::parse(fe) {
588                let top_attrs = expressions::condition::extract_top_level_attributes(
589                    &parsed_fe,
590                    &request.expression_attribute_names,
591                );
592                for attr in &top_attrs {
593                    if effective_key_attrs.contains(attr) {
594                        return Err(DynoxideError::ValidationException(format!(
595                            "Filter Expression can only contain non-primary key attributes: \
596                             Primary key attribute: {attr}"
597                        )));
598                    }
599                }
600                // Check for non-scalar key access in FilterExpression
601                // Build index key attribute lists
602                let mut index_key_attrs = Vec::new();
603                if request.index_name.is_some() {
604                    // Index keys that are not also table keys
605                    if !effective_key_attrs
606                        .iter()
607                        .any(|k| k == &table_key_schema.partition_key)
608                    {
609                        // This shouldn't normally happen for query, but just in case
610                    }
611                    // Check all effective key attrs for non-scalar access
612                    for k in &effective_key_attrs {
613                        if ![table_key_schema.partition_key.clone()]
614                            .iter()
615                            .chain(table_key_schema.sort_key.iter())
616                            .any(|tk| tk == k)
617                        {
618                            index_key_attrs.push(k.clone());
619                        }
620                    }
621                }
622                let base_key_attrs: Vec<String> = {
623                    let mut v = vec![table_key_schema.partition_key.clone()];
624                    if let Some(ref sk) = table_key_schema.sort_key {
625                        v.push(sk.clone());
626                    }
627                    v
628                };
629                if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
630                    &parsed_fe,
631                    &request.expression_attribute_names,
632                    &base_key_attrs,
633                    &index_key_attrs,
634                ) {
635                    let prefix = if is_index { "IndexKey" } else { "Key" };
636                    return Err(DynoxideError::ValidationException(format!(
637                        "Key attributes must be scalars; \
638                         list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
639                    )));
640                }
641            }
642        }
643    }
644
645    let is_index_query = request.index_name.is_some();
646
647    // Build ExclusiveStartKey sk value.
648    // For hash-only GSIs (no sort key), use empty string so the composite
649    // cursor (gsi_sk, table_pk, table_sk) can drive pagination.
650    let start_sk = if let Some(ref esk) = exclusive_start_key {
651        if let Some(ref sk_name) = effective_sk {
652            esk.get(sk_name).and_then(|v| v.to_key_string())
653        } else if is_index_query {
654            // Hash-only index: gsi_sk / lsi_sk is always ''
655            Some(String::new())
656        } else {
657            None
658        }
659    } else {
660        None
661    };
662
663    // For LSI and GSI queries, extract the base table keys from ExclusiveStartKey
664    // to enable composite cursor pagination.
665    let (start_base_pk, start_base_sk) = if is_index_query {
666        if let Some(ref esk) = exclusive_start_key {
667            let base_pk = esk
668                .get(&table_key_schema.partition_key)
669                .and_then(|v| v.to_key_string());
670            // When the base table is hash-only there is no base sort key, but the
671            // GSI/LSI row stores the empty-string default in its table_sk column.
672            // Default base_sk to "" (mirroring start_sk) so the composite cursor
673            // keeps its full width and disambiguates tied index keys by table_pk.
674            // Leaving it None collapses query_gsi_items to the 1-column gsi_sk
675            // cursor, which cannot advance past rows sharing the same index key
676            // and silently drops them. See scan.rs for the Scan-path counterpart.
677            let base_sk = if let Some(sk_name) = table_key_schema.sort_key.as_ref() {
678                esk.get(sk_name).and_then(|v| v.to_key_string())
679            } else {
680                Some(String::new())
681            };
682            (base_pk, base_sk)
683        } else {
684            (None, None)
685        }
686    } else {
687        (None, None)
688    };
689
690    // Validate Select=ALL_ATTRIBUTES against index projection type.
691    // For GSI with non-ALL projection, DynamoDB rejects ALL_ATTRIBUTES.
692    let is_select_all_attributes = request
693        .select
694        .as_deref()
695        .map(|s| s.eq_ignore_ascii_case("ALL_ATTRIBUTES"))
696        .unwrap_or(false);
697    let fetch_from_base_table = if is_select_all_attributes {
698        if let Some(ref proj_type) = index_projection_type {
699            if *proj_type != crate::types::ProjectionType::ALL {
700                if !is_lsi {
701                    return Err(DynoxideError::ValidationException(format!(
702                        "One or more parameter values were invalid: \
703                         Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
704                         because its projection type is not ALL",
705                        request.index_name.as_deref().unwrap_or("")
706                    )));
707                }
708                // LSI with non-ALL projection: fetch full items from base table
709                true
710            } else {
711                false
712            }
713        } else {
714            false
715        }
716    } else {
717        false
718    };
719
720    // Combine sk conditions into a single SQL fragment
721    let sk_condition_sql = if sk_sql_parts.is_empty() {
722        None
723    } else {
724        Some(sk_sql_parts.join(" "))
725    };
726
727    let fetch_limit = request.limit;
728    let sk_params_refs: Vec<&str> = sk_param_values.iter().map(|s| s.as_str()).collect();
729
730    // Query either GSI table or base table
731    let query_params = crate::storage::QueryParams {
732        sk_condition: sk_condition_sql.as_deref(),
733        sk_params: &sk_params_refs,
734        forward: request.scan_index_forward,
735        limit: fetch_limit,
736        exclusive_start_sk: start_sk.as_deref(),
737        exclusive_start_base_pk: start_base_pk.as_deref(),
738        exclusive_start_base_sk: start_base_sk.as_deref(),
739    };
740    let rows = if let Some(ref index_name) = request.index_name {
741        if is_lsi {
742            storage
743                .query_lsi_items(&request.table_name, index_name, &pk_str, &query_params)
744                .await?
745        } else {
746            storage
747                .query_gsi_items(&request.table_name, index_name, &pk_str, &query_params)
748                .await?
749        }
750    } else {
751        storage
752            .query_items(&request.table_name, &pk_str, &query_params)
753            .await?
754    };
755
756    // Parse filter expression if present
757    let filter_expr = request
758        .filter_expression
759        .as_ref()
760        .map(|expr| expressions::condition::parse(expr))
761        .transpose()
762        .map_err(DynoxideError::ValidationException)?;
763
764    // Parse projection expression if present; fall back to legacy AttributesToGet
765    let projection = if let Some(ref proj_expr) = request.projection_expression {
766        Some(
767            expressions::projection::parse(proj_expr)
768                .map_err(DynoxideError::ValidationException)?,
769        )
770    } else {
771        legacy_projection.clone()
772    };
773
774    // Pre-register expression references so unused check works even with zero items
775    if let Some(ref filter) = filter_expr {
776        tracker.track_condition_expr(filter);
777    }
778    if let Some(ref proj) = projection {
779        tracker.track_projection_expr(proj);
780    }
781
782    // Untracked variant for the per-item hot loop — tracking already done above
783    let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
784        &request.expression_attribute_names,
785        &request.expression_attribute_values,
786    );
787
788    // Determine if SELECT COUNT
789    let is_count = request
790        .select
791        .as_deref()
792        .map(|s| s.eq_ignore_ascii_case("COUNT"))
793        .unwrap_or(false);
794
795    // Key attribute names for projection (use effective keys for GSI)
796    let mut key_attrs = vec![effective_pk.clone()];
797    if let Some(ref sk) = effective_sk {
798        key_attrs.push(sk.clone());
799    }
800    // Also include base table keys when querying a GSI
801    if request.index_name.is_some() {
802        if !key_attrs.contains(&table_key_schema.partition_key) {
803            key_attrs.push(table_key_schema.partition_key.clone());
804        }
805        if let Some(ref sk) = table_key_schema.sort_key {
806            if !key_attrs.contains(sk) {
807                key_attrs.push(sk.clone());
808            }
809        }
810    }
811
812    let mut items = Vec::new();
813    let mut scanned_count = 0;
814    let mut filtered_count = 0;
815    let mut cumulative_size = 0;
816    let mut last_evaluated_item: Option<Item> = None;
817    let mut truncated_by_size = false;
818
819    // Track sizes separately for ALL_ATTRIBUTES LSI queries where both
820    // index reads and base table reads contribute to ConsumedCapacity.
821    let mut base_table_cumulative_size = 0usize;
822    let mut index_cumulative_size = 0usize;
823
824    for (_pk, _sk, item_json) in &rows {
825        let index_item: Item = serde_json::from_str(item_json).map_err(|e| {
826            DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
827        })?;
828
829        // If Select=ALL_ATTRIBUTES on LSI with non-ALL projection, fetch full
830        // item from the base table for the response while using the index item
831        // for cursor tracking.
832        index_cumulative_size += crate::types::item_size(&index_item);
833        let item = if fetch_from_base_table {
834            let base_pk = index_item
835                .get(&table_key_schema.partition_key)
836                .and_then(|v| v.to_key_string())
837                .unwrap_or_default();
838            let base_sk = table_key_schema
839                .sort_key
840                .as_ref()
841                .and_then(|sk_name| index_item.get(sk_name))
842                .and_then(|v| v.to_key_string())
843                .unwrap_or_default();
844            if let Some(full_json) = storage
845                .get_item(&request.table_name, &base_pk, &base_sk)
846                .await?
847            {
848                let full_item: Item = serde_json::from_str(&full_json).map_err(|e| {
849                    DynoxideError::InternalServerError(format!("Bad item JSON: {e}"))
850                })?;
851                base_table_cumulative_size += crate::types::item_size(&full_item);
852                full_item
853            } else {
854                index_item.clone()
855            }
856        } else {
857            index_item.clone()
858        };
859
860        scanned_count += 1;
861
862        // Check 1MB limit BEFORE filtering — DynamoDB counts all evaluated data
863        // towards the 1MB response size limit, not just items that pass the filter.
864        let item_size = crate::types::item_size(&item);
865        if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
866            truncated_by_size = true;
867            break;
868        }
869        cumulative_size += item_size;
870
871        // Apply filter
872        if let Some(ref filter) = filter_expr {
873            let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
874                .map_err(DynoxideError::ValidationException)?;
875            if !passes {
876                last_evaluated_item = Some(index_item);
877                continue;
878            }
879        }
880
881        filtered_count += 1;
882
883        // Apply projection -- do NOT auto-include key attributes when the
884        // user explicitly specified ProjectionExpression or AttributesToGet.
885        let result_item = if let Some(ref proj) = projection {
886            let no_keys: &[String] = &[];
887            expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
888                .map_err(DynoxideError::ValidationException)?
889        } else {
890            item
891        };
892
893        last_evaluated_item = Some(index_item);
894        if !is_count {
895            items.push(result_item);
896        }
897    }
898
899    // Check for unused expression attribute names/values
900    tracker.check_unused()?;
901
902    let count = if is_count {
903        filtered_count
904    } else {
905        items.len()
906    };
907
908    // Determine LastEvaluatedKey
909    // We return LEK if: we hit the Limit, or we hit the 1MB limit
910    let has_more = truncated_by_size
911        || (fetch_limit.is_some() && scanned_count >= fetch_limit.unwrap_or(usize::MAX));
912
913    // For index queries, include the base table primary key in LastEvaluatedKey
914    // alongside the effective (index) keys so the cursor can uniquely identify
915    // the position. For LSIs, include the table sort key. For GSIs, include
916    // both the table partition key and sort key.
917    let is_gsi_query = request.index_name.is_some() && !is_lsi;
918    let last_evaluated_key = if has_more {
919        last_evaluated_item.map(|item| {
920            let mut key = build_last_evaluated_key(&item, &effective_pk, effective_sk.as_deref());
921            // For LSI queries, add the table sort key if different from the index sort key
922            if is_lsi {
923                if let Some(tsk) = table_key_schema.sort_key.as_deref() {
924                    if !key.contains_key(tsk) {
925                        if let Some(v) = item.get(tsk) {
926                            key.insert(tsk.to_string(), v.clone());
927                        }
928                    }
929                }
930            }
931            // For GSI queries, add the base table primary key (pk and sk)
932            if is_gsi_query {
933                if !key.contains_key(&table_key_schema.partition_key) {
934                    if let Some(v) = item.get(&table_key_schema.partition_key) {
935                        key.insert(table_key_schema.partition_key.clone(), v.clone());
936                    }
937                }
938                if let Some(ref tsk) = table_key_schema.sort_key {
939                    if !key.contains_key(tsk) {
940                        if let Some(v) = item.get(tsk) {
941                            key.insert(tsk.clone(), v.clone());
942                        }
943                    }
944                }
945            }
946            key
947        })
948    } else {
949        None
950    };
951
952    // Attribute read capacity to the index if querying one
953    let is_gsi = is_gsi_query;
954    let consistent = request.consistent_read.unwrap_or(false);
955    let consumed_capacity = if is_gsi {
956        let mut gsi_units = std::collections::HashMap::new();
957        gsi_units.insert(
958            request.index_name.as_ref().unwrap().clone(),
959            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
960        );
961        crate::types::consumed_capacity_with_indexes(
962            &request.table_name,
963            0.0,
964            &gsi_units,
965            &request.return_consumed_capacity,
966        )
967    } else if is_lsi {
968        // When fetching from the base table (ALL_ATTRIBUTES on non-ALL LSI),
969        // split capacity between the index read and the table read.
970        let (table_cap, lsi_cap) = if fetch_from_base_table {
971            let table_rcu = crate::types::read_capacity_units_with_consistency(
972                base_table_cumulative_size,
973                consistent,
974            );
975            let lsi_rcu = crate::types::read_capacity_units_with_consistency(
976                index_cumulative_size,
977                consistent,
978            );
979            (table_rcu, lsi_rcu)
980        } else {
981            (
982                0.0,
983                crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
984            )
985        };
986        let mut lsi_units = std::collections::HashMap::new();
987        lsi_units.insert(request.index_name.as_ref().unwrap().clone(), lsi_cap);
988        crate::types::consumed_capacity_with_secondary_indexes(
989            &request.table_name,
990            table_cap,
991            &std::collections::HashMap::new(),
992            &lsi_units,
993            &request.return_consumed_capacity,
994        )
995    } else {
996        crate::types::consumed_capacity(
997            &request.table_name,
998            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
999            &request.return_consumed_capacity,
1000        )
1001    };
1002
1003    Ok(QueryResponse {
1004        items: if is_count { None } else { Some(items) },
1005        count,
1006        scanned_count,
1007        last_evaluated_key,
1008        consumed_capacity,
1009    })
1010}
1011
1012fn build_last_evaluated_key(
1013    item: &Item,
1014    pk_name: &str,
1015    sk_name: Option<&str>,
1016) -> HashMap<String, AttributeValue> {
1017    let mut key = HashMap::new();
1018    if let Some(pk_val) = item.get(pk_name) {
1019        key.insert(pk_name.to_string(), pk_val.clone());
1020    }
1021    if let Some(sk) = sk_name {
1022        if let Some(sk_val) = item.get(sk) {
1023            key.insert(sk.to_string(), sk_val.clone());
1024        }
1025    }
1026    key
1027}