Skip to main content

dynoxide/actions/
query.rs

1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage_backend::StorageBackend;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Upper bound on the size of a Query/Scan response: 1 MiB (1,048,576 bytes).
const MAX_RESPONSE_SIZE: usize = 1024 * 1024;
11
12/// Internal deserialization struct for detecting missing fields.
13#[derive(Debug, Default, Deserialize)]
14struct QueryRequestRaw {
15    #[serde(rename = "TableName", default)]
16    table_name: Option<String>,
17    #[serde(rename = "KeyConditionExpression", default)]
18    key_condition_expression: Option<String>,
19    #[serde(rename = "FilterExpression", default)]
20    filter_expression: Option<String>,
21    #[serde(rename = "ProjectionExpression", default)]
22    projection_expression: Option<String>,
23    #[serde(rename = "ExpressionAttributeNames", default)]
24    expression_attribute_names: Option<HashMap<String, String>>,
25    #[serde(rename = "ExpressionAttributeValues", default)]
26    expression_attribute_values: Option<HashMap<String, AttributeValue>>,
27    #[serde(rename = "ScanIndexForward", default = "default_true")]
28    scan_index_forward: bool,
29    #[serde(rename = "Limit", default)]
30    limit: Option<usize>,
31    #[serde(rename = "ExclusiveStartKey", default)]
32    exclusive_start_key: Option<serde_json::Value>,
33    #[serde(rename = "Select", default)]
34    select: Option<String>,
35    #[serde(rename = "ConsistentRead", default)]
36    consistent_read: Option<bool>,
37    #[serde(rename = "IndexName", default)]
38    index_name: Option<String>,
39    #[serde(rename = "ReturnConsumedCapacity", default)]
40    return_consumed_capacity: Option<String>,
41    #[serde(rename = "KeyConditions", default)]
42    key_conditions: Option<serde_json::Value>,
43    #[serde(rename = "AttributesToGet", default)]
44    attributes_to_get: Option<Vec<String>>,
45    #[serde(rename = "QueryFilter", default)]
46    query_filter: Option<serde_json::Value>,
47    #[serde(rename = "ConditionalOperator", default)]
48    conditional_operator: Option<String>,
49}
50
/// Serde default provider that always yields `true`.
///
/// Referenced by `#[serde(default = "default_true")]` so that
/// `ScanIndexForward` is treated as `true` when the request omits it.
fn default_true() -> bool {
    true
}
54
55#[derive(Debug, Default)]
56pub struct QueryRequest {
57    pub table_name: String,
58    pub key_condition_expression: Option<String>,
59    pub filter_expression: Option<String>,
60    pub projection_expression: Option<String>,
61    pub expression_attribute_names: Option<HashMap<String, String>>,
62    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
63    pub scan_index_forward: bool,
64    pub limit: Option<usize>,
65    pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
66    pub select: Option<String>,
67    pub consistent_read: Option<bool>,
68    pub index_name: Option<String>,
69    pub return_consumed_capacity: Option<String>,
70    pub key_conditions: Option<serde_json::Value>,
71    pub attributes_to_get: Option<Vec<String>>,
72    pub query_filter: Option<serde_json::Value>,
73    pub conditional_operator: Option<String>,
74    /// Raw JSON for ExclusiveStartKey when deserialized from HTTP request.
75    /// Parsed lazily in `execute()` after other validations run.
76    /// When constructed directly (e.g. from MCP), this is `None` and
77    /// `exclusive_start_key` is used instead.
78    pub exclusive_start_key_raw: Option<serde_json::Value>,
79}
80
81impl<'de> serde::Deserialize<'de> for QueryRequest {
82    fn deserialize<D: serde::Deserializer<'de>>(
83        deserializer: D,
84    ) -> std::result::Result<Self, D::Error> {
85        let raw = QueryRequestRaw::deserialize(deserializer)?;
86        use crate::validation::{
87            TableNameContext, format_validation_errors, table_name_constraint_errors,
88        };
89
90        let mut errors = Vec::new();
91        errors.extend(table_name_constraint_errors(
92            raw.table_name.as_deref(),
93            TableNameContext::ReadWrite,
94        ));
95        let table_name = raw.table_name.unwrap_or_default();
96
97        // ReturnConsumedCapacity enum
98        if let Some(ref rcc) = raw.return_consumed_capacity {
99            if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
100                errors.push(format!(
101                    "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
102                     Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
103                    rcc
104                ));
105            }
106        }
107
108        // Select enum
109        if let Some(ref sel) = raw.select {
110            if ![
111                "ALL_ATTRIBUTES",
112                "ALL_PROJECTED_ATTRIBUTES",
113                "COUNT",
114                "SPECIFIC_ATTRIBUTES",
115            ]
116            .contains(&sel.as_str())
117            {
118                errors.push(format!(
119                    "Value '{}' at 'select' failed to satisfy constraint: \
120                     Member must satisfy enum value set: [SPECIFIC_ATTRIBUTES, COUNT, ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES]",
121                    sel
122                ));
123            }
124        }
125
126        // Limit must be >= 1.
127        // AWS DynamoDB's Query message diverges from Scan: Query omits the rejected
128        // value entirely and capitalises 'Limit', whereas Scan keeps the value and
129        // lowercases 'limit'. Do not collapse these into a shared helper.
130        if let Some(limit) = raw.limit {
131            if limit == 0 {
132                errors.push(
133                    "Value at 'Limit' failed to satisfy constraint: \
134                     Member must have value greater than or equal to 1"
135                        .to_string(),
136                );
137            }
138        }
139
140        if let Some(msg) = format_validation_errors(&errors) {
141            return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
142        }
143
144        Ok(QueryRequest {
145            table_name,
146            key_condition_expression: raw.key_condition_expression,
147            filter_expression: raw.filter_expression,
148            projection_expression: raw.projection_expression,
149            expression_attribute_names: raw.expression_attribute_names,
150            expression_attribute_values: raw.expression_attribute_values,
151            scan_index_forward: raw.scan_index_forward,
152            limit: raw.limit,
153            exclusive_start_key: None,
154            select: raw.select,
155            consistent_read: raw.consistent_read,
156            index_name: raw.index_name,
157            return_consumed_capacity: raw.return_consumed_capacity,
158            key_conditions: raw.key_conditions,
159            attributes_to_get: raw.attributes_to_get,
160            query_filter: raw.query_filter,
161            conditional_operator: raw.conditional_operator,
162            exclusive_start_key_raw: raw.exclusive_start_key,
163        })
164    }
165}
166
167#[derive(Debug, Default, Serialize)]
168pub struct QueryResponse {
169    #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
170    pub items: Option<Vec<Item>>,
171    #[serde(rename = "Count")]
172    pub count: usize,
173    #[serde(rename = "ScannedCount")]
174    pub scanned_count: usize,
175    #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
176    pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
177    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
178    pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
179}
180
181pub async fn execute<S: StorageBackend>(
182    storage: &S,
183    mut request: QueryRequest,
184) -> Result<QueryResponse> {
185    // Validate table name format before checking existence (DynamoDB validates input first)
186    crate::validation::validate_table_name(&request.table_name)?;
187
188    // ---- Expression vs non-expression mixing validation ----
189    // DynamoDB checks this before anything else (except table name format and ESK values).
190    {
191        let mut non_expr = Vec::new();
192        let mut expr = Vec::new();
193        if request.attributes_to_get.is_some() {
194            non_expr.push("AttributesToGet");
195        }
196        if request.query_filter.is_some()
197            && request.query_filter.as_ref().is_some_and(|v| !v.is_null())
198        {
199            non_expr.push("QueryFilter");
200        }
201        if request.conditional_operator.is_some() {
202            non_expr.push("ConditionalOperator");
203        }
204        if request.key_conditions.is_some()
205            && request
206                .key_conditions
207                .as_ref()
208                .is_some_and(|v| !v.is_null())
209        {
210            non_expr.push("KeyConditions");
211        }
212        if request.projection_expression.is_some() {
213            expr.push("ProjectionExpression");
214        }
215        if request.filter_expression.is_some() {
216            expr.push("FilterExpression");
217        }
218        if request.key_condition_expression.is_some() {
219            expr.push("KeyConditionExpression");
220        }
221        let no_raw_eav: Option<serde_json::Value> = None;
222        let ctx = helpers::ExpressionParamContext {
223            non_expression_params: non_expr,
224            expression_params: expr,
225            all_expression_param_names: vec!["FilterExpression", "KeyConditionExpression"],
226            expression_attribute_names: &request.expression_attribute_names,
227            expression_attribute_values: &request.expression_attribute_values,
228            expression_attribute_values_raw: &no_raw_eav,
229        };
230        helpers::validate_expression_params(&ctx)?;
231    }
232
233    // ---- Validate filter attribute values (before argument counts, matching DynamoDB) ----
234    helpers::validate_filter_conditions_raw(request.query_filter.as_ref(), "QueryFilter")?;
235    helpers::validate_filter_conditions_raw(request.key_conditions.as_ref(), "KeyConditions")?;
236
237    // ---- Validate filter argument counts and type compatibility ----
238    helpers::validate_filter_condition_args(request.query_filter.as_ref())?;
239    helpers::validate_filter_condition_args(request.key_conditions.as_ref())?;
240
241    // ---- Validate duplicate AttributesToGet ----
242    if let Some(ref attrs) = request.attributes_to_get {
243        helpers::validate_attributes_to_get_no_duplicates(attrs)?;
244    }
245
246    // ---- Parse ExclusiveStartKey from JSON value ----
247    let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
248        Some(helpers::parse_exclusive_start_key(esk_val)?)
249    } else {
250        request.exclusive_start_key.clone()
251    };
252
253    // ---- Validate expression syntax BEFORE table existence ----
254    // DynamoDB validates KeyConditionExpression, FilterExpression, and
255    // ProjectionExpression syntax before checking if the table exists.
256    if let Some(ref kce) = request.key_condition_expression {
257        if kce.is_empty() {
258            return Err(DynoxideError::ValidationException(
259                "Invalid KeyConditionExpression: The expression can not be empty;".to_string(),
260            ));
261        }
262    }
263    if let Some(ref fe) = request.filter_expression {
264        if fe.is_empty() {
265            if request.query_filter.is_none() || request.filter_expression.as_deref() == Some("") {
266                return Err(DynoxideError::ValidationException(
267                    "Invalid FilterExpression: The expression can not be empty;".to_string(),
268                ));
269            }
270        } else {
271            let parsed_fe = expressions::condition::parse(fe).map_err(|e| {
272                DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
273            })?;
274            // Validate that all #name references are defined in ExpressionAttributeNames
275            // (before table existence check, matching DynamoDB's validation ordering)
276            if let Err(e) = expressions::condition::validate_name_refs(
277                &parsed_fe,
278                &request.expression_attribute_names,
279            ) {
280                return Err(DynoxideError::ValidationException(format!(
281                    "Invalid FilterExpression: {e}"
282                )));
283            }
284            if let Err(e) = expressions::condition::validate_operand_semantics(
285                &parsed_fe,
286                &request.expression_attribute_names,
287                &request.expression_attribute_values,
288            ) {
289                return Err(DynoxideError::ValidationException(format!(
290                    "Invalid FilterExpression: {e}"
291                )));
292            }
293        }
294    }
295    if let Some(ref pe) = request.projection_expression {
296        if pe.is_empty() {
297            return Err(DynoxideError::ValidationException(
298                "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
299            ));
300        }
301    }
302
303    // SPECIFIC_ATTRIBUTES requires ProjectionExpression or AttributesToGet
304    if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
305        && request.projection_expression.is_none()
306        && request.attributes_to_get.is_none()
307    {
308        return Err(DynoxideError::ValidationException(
309            "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
310                .to_string(),
311        ));
312    }
313    // For KeyConditionExpression, validate syntax early too
314    if let Some(ref kce) = request.key_condition_expression {
315        if !kce.is_empty() {
316            // Create a temporary tracker for early syntax validation
317            let temp_tracker = crate::expressions::TrackedExpressionAttributes::new(
318                &request.expression_attribute_names,
319                &request.expression_attribute_values,
320            );
321            if let Err(e) = expressions::key_condition::parse(kce, &temp_tracker) {
322                return Err(DynoxideError::ValidationException(e));
323            }
324        }
325    }
326
327    let meta = helpers::require_table_for_item_op(storage, &request.table_name).await?;
328    let table_key_schema = helpers::parse_key_schema(&meta)?;
329
330    // Determine effective partition key name early so we can pass it to
331    // the legacy KeyConditions converter (ensures correct ordering when
332    // both hash and range keys use EQ).
333    let effective_pk_for_kc = if let Some(ref index_name) = request.index_name {
334        if let Some((pk, _)) = request
335            .index_name
336            .as_ref()
337            .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok())
338        {
339            pk
340        } else if let Ok((pk, _)) = super::gsi::parse_gsi_key_schema(&meta, index_name) {
341            pk
342        } else {
343            table_key_schema.partition_key.clone()
344        }
345    } else {
346        table_key_schema.partition_key.clone()
347    };
348
349    // Convert legacy KeyConditions to KeyConditionExpression if no expression is set
350    if request.key_condition_expression.is_none() {
351        if let Some(ref kc_val) = request.key_conditions {
352            if let Ok(kc) =
353                serde_json::from_value::<HashMap<String, helpers::KeyCondition>>(kc_val.clone())
354            {
355                if !kc.is_empty() {
356                    let converted =
357                        helpers::convert_key_conditions(&kc, Some(&effective_pk_for_kc))?;
358                    request.key_condition_expression = Some(converted.expression);
359                    let expr_values = request
360                        .expression_attribute_values
361                        .get_or_insert_with(HashMap::new);
362                    expr_values.extend(converted.attribute_values);
363                    let expr_names = request
364                        .expression_attribute_names
365                        .get_or_insert_with(HashMap::new);
366                    expr_names.extend(converted.attribute_names);
367                }
368            }
369        }
370    }
371
372    // Convert legacy QueryFilter to FilterExpression if no expression is set
373    if request.filter_expression.is_none() {
374        if let Some(ref qf_val) = request.query_filter {
375            if let Ok(qf) =
376                serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(qf_val.clone())
377            {
378                if !qf.is_empty() {
379                    let converted = helpers::convert_filter_conditions(
380                        &qf,
381                        request.conditional_operator.as_deref(),
382                    )?;
383                    if !converted.expression.is_empty() {
384                        request.filter_expression = Some(converted.expression);
385                        let expr_values = request
386                            .expression_attribute_values
387                            .get_or_insert_with(HashMap::new);
388                        expr_values.extend(converted.attribute_values);
389                        let expr_names = request
390                            .expression_attribute_names
391                            .get_or_insert_with(HashMap::new);
392                        expr_names.extend(converted.attribute_names);
393                    }
394                }
395            }
396        }
397    }
398
399    // Convert legacy AttributesToGet to projection
400    let legacy_projection = if request.projection_expression.is_none() {
401        request
402            .attributes_to_get
403            .as_ref()
404            .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
405    } else {
406        None
407    };
408
409    // Ensure KeyConditionExpression is present (required)
410    let key_condition_expression = request.key_condition_expression.as_deref().ok_or_else(|| {
411        DynoxideError::ValidationException(
412            "Either the KeyConditions or KeyConditionExpression parameter must be specified in the request."
413                .to_string(),
414        )
415    })?;
416    let key_condition_expression = key_condition_expression.to_string();
417
418    // Determine effective key schema (GSI, LSI, or base table)
419    let lsi_keys = request
420        .index_name
421        .as_ref()
422        .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
423    let is_lsi = lsi_keys.is_some();
424
425    // ConsistentRead is not supported on GSIs (LSIs are fine)
426    if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
427        return Err(DynoxideError::ValidationException(
428            "Consistent reads are not supported on global secondary indexes".to_string(),
429        ));
430    }
431
432    // Parse full index definition to get projection type
433    let index_projection_type = if let Some(ref index_name) = request.index_name {
434        if is_lsi {
435            super::lsi::parse_lsi_defs(&meta)?
436                .into_iter()
437                .find(|l| l.index_name == *index_name)
438                .map(|l| l.projection_type)
439        } else {
440            super::gsi::parse_gsi_defs(&meta)?
441                .into_iter()
442                .find(|g| g.index_name == *index_name)
443                .map(|g| g.projection_type)
444        }
445    } else {
446        None
447    };
448
449    let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
450        if let Some(keys) = lsi_keys {
451            keys
452        } else {
453            super::gsi::parse_gsi_key_schema(&meta, index_name)?
454        }
455    } else {
456        (
457            table_key_schema.partition_key.clone(),
458            table_key_schema.sort_key.clone(),
459        )
460    };
461
462    // ---- Validate ExclusiveStartKey structure against key schema ----
463    if let Some(ref esk) = exclusive_start_key {
464        // Stage 1+2: count check + index key type check
465        helpers::validate_esk_count_and_index_keys(
466            esk,
467            &meta,
468            request.index_name.as_deref(),
469            "The provided starting key is invalid",
470        )?;
471        // Stage 3: table key type check
472        helpers::validate_esk_table_keys(esk, &meta)?;
473    }
474
475    // Create tracker for unused expression attribute names/values
476    let tracker = crate::expressions::TrackedExpressionAttributes::new(
477        &request.expression_attribute_names,
478        &request.expression_attribute_values,
479    );
480
481    // Parse KeyConditionExpression
482    let key_cond = expressions::key_condition::parse(&key_condition_expression, &tracker)
483        .map_err(DynoxideError::ValidationException)?;
484
485    // Validate pk_name matches the effective partition key
486    if key_cond.pk_name != effective_pk {
487        return Err(DynoxideError::ValidationException(format!(
488            "Query condition missed key schema element: {}",
489            effective_pk
490        )));
491    }
492
493    // Resolve values
494    let resolved = expressions::key_condition::resolve_values(&key_cond, &tracker)
495        .map_err(DynoxideError::ValidationException)?;
496
497    // Get pk string
498    let pk_str = resolved.pk_value.to_key_string().ok_or_else(|| {
499        DynoxideError::ValidationException(
500            "Cannot convert partition key value to string".to_string(),
501        )
502    })?;
503
504    // Build sk SQL conditions
505    let mut sk_sql_parts = Vec::new();
506    let mut sk_param_values = Vec::new();
507
508    if let Some(ref sk_cond) = resolved.sk_condition {
509        // Validate sk name matches effective sort key
510        if let Some(ref eff_sk) = effective_sk {
511            if sk_cond.sk_name() != eff_sk {
512                return Err(DynoxideError::ValidationException(format!(
513                    "Query condition missed key schema element: {eff_sk}"
514                )));
515            }
516        } else {
517            return Err(DynoxideError::ValidationException(
518                "Query filter contains a sort key condition but the table has no sort key"
519                    .to_string(),
520            ));
521        }
522
523        let conditions = sk_cond.to_sql_conditions();
524        for (i, (op, val)) in conditions.iter().enumerate() {
525            let param_idx = i + 2; // pk is ?1, sk params start at ?2
526            if op == "LIKE" {
527                sk_sql_parts.push(format!("AND sk LIKE ?{param_idx} ESCAPE '\\'"));
528            } else {
529                sk_sql_parts.push(format!("AND sk {op} ?{param_idx}"));
530            }
531            sk_param_values.push(val.clone());
532        }
533    }
534
535    // ---- Validate QueryFilter/FilterExpression don't reference primary key attrs ----
536    // Collect effective key attribute names
537    let mut effective_key_attrs = vec![effective_pk.clone()];
538    if let Some(ref sk) = effective_sk {
539        effective_key_attrs.push(sk.clone());
540    }
541
542    // Check legacy QueryFilter
543    if let Some(ref qf_val) = request.query_filter {
544        if let Some(obj) = qf_val.as_object() {
545            for attr_name in obj.keys() {
546                if effective_key_attrs.contains(attr_name) {
547                    return Err(DynoxideError::ValidationException(format!(
548                        "QueryFilter can only contain non-primary key attributes: \
549                         Primary key attribute: {attr_name}"
550                    )));
551                }
552            }
553        }
554    }
555
556    // Check FilterExpression for key attribute references (only for user-supplied expressions,
557    // not those converted from QueryFilter - QueryFilter is checked separately above)
558    if request.query_filter.is_none() {
559        if let Some(ref fe) = request.filter_expression {
560            if let Ok(parsed_fe) = expressions::condition::parse(fe) {
561                let top_attrs = expressions::condition::extract_top_level_attributes(
562                    &parsed_fe,
563                    &request.expression_attribute_names,
564                );
565                for attr in &top_attrs {
566                    if effective_key_attrs.contains(attr) {
567                        return Err(DynoxideError::ValidationException(format!(
568                            "Filter Expression can only contain non-primary key attributes: \
569                             Primary key attribute: {attr}"
570                        )));
571                    }
572                }
573                // Check for non-scalar key access in FilterExpression
574                // Build index key attribute lists
575                let mut index_key_attrs = Vec::new();
576                if request.index_name.is_some() {
577                    // Index keys that are not also table keys
578                    if !effective_key_attrs
579                        .iter()
580                        .any(|k| k == &table_key_schema.partition_key)
581                    {
582                        // This shouldn't normally happen for query, but just in case
583                    }
584                    // Check all effective key attrs for non-scalar access
585                    for k in &effective_key_attrs {
586                        if ![table_key_schema.partition_key.clone()]
587                            .iter()
588                            .chain(table_key_schema.sort_key.iter())
589                            .any(|tk| tk == k)
590                        {
591                            index_key_attrs.push(k.clone());
592                        }
593                    }
594                }
595                let base_key_attrs: Vec<String> = {
596                    let mut v = vec![table_key_schema.partition_key.clone()];
597                    if let Some(ref sk) = table_key_schema.sort_key {
598                        v.push(sk.clone());
599                    }
600                    v
601                };
602                if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
603                    &parsed_fe,
604                    &request.expression_attribute_names,
605                    &base_key_attrs,
606                    &index_key_attrs,
607                ) {
608                    let prefix = if is_index { "IndexKey" } else { "Key" };
609                    return Err(DynoxideError::ValidationException(format!(
610                        "Key attributes must be scalars; \
611                         list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
612                    )));
613                }
614            }
615        }
616    }
617
618    let is_index_query = request.index_name.is_some();
619
620    // Build ExclusiveStartKey sk value.
621    // For hash-only GSIs (no sort key), use empty string so the composite
622    // cursor (gsi_sk, table_pk, table_sk) can drive pagination.
623    let start_sk = if let Some(ref esk) = exclusive_start_key {
624        if let Some(ref sk_name) = effective_sk {
625            esk.get(sk_name).and_then(|v| v.to_key_string())
626        } else if is_index_query {
627            // Hash-only index: gsi_sk / lsi_sk is always ''
628            Some(String::new())
629        } else {
630            None
631        }
632    } else {
633        None
634    };
635
636    // For LSI and GSI queries, extract the base table keys from ExclusiveStartKey
637    // to enable composite cursor pagination.
638    let (start_base_pk, start_base_sk) = if is_index_query {
639        if let Some(ref esk) = exclusive_start_key {
640            let base_pk = esk
641                .get(&table_key_schema.partition_key)
642                .and_then(|v| v.to_key_string());
643            // When the base table is hash-only there is no base sort key, but the
644            // GSI/LSI row stores the empty-string default in its table_sk column.
645            // Default base_sk to "" (mirroring start_sk) so the composite cursor
646            // keeps its full width and disambiguates tied index keys by table_pk.
647            // Leaving it None collapses query_gsi_items to the 1-column gsi_sk
648            // cursor, which cannot advance past rows sharing the same index key
649            // and silently drops them. See scan.rs for the Scan-path counterpart.
650            let base_sk = if let Some(sk_name) = table_key_schema.sort_key.as_ref() {
651                esk.get(sk_name).and_then(|v| v.to_key_string())
652            } else {
653                Some(String::new())
654            };
655            (base_pk, base_sk)
656        } else {
657            (None, None)
658        }
659    } else {
660        (None, None)
661    };
662
663    // Validate Select=ALL_ATTRIBUTES against index projection type.
664    // For GSI with non-ALL projection, DynamoDB rejects ALL_ATTRIBUTES.
665    let is_select_all_attributes = request
666        .select
667        .as_deref()
668        .map(|s| s.eq_ignore_ascii_case("ALL_ATTRIBUTES"))
669        .unwrap_or(false);
670    let fetch_from_base_table = if is_select_all_attributes {
671        if let Some(ref proj_type) = index_projection_type {
672            if *proj_type != crate::types::ProjectionType::ALL {
673                if !is_lsi {
674                    return Err(DynoxideError::ValidationException(format!(
675                        "One or more parameter values were invalid: \
676                         Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
677                         because its projection type is not ALL",
678                        request.index_name.as_deref().unwrap_or("")
679                    )));
680                }
681                // LSI with non-ALL projection: fetch full items from base table
682                true
683            } else {
684                false
685            }
686        } else {
687            false
688        }
689    } else {
690        false
691    };
692
693    // Combine sk conditions into a single SQL fragment
694    let sk_condition_sql = if sk_sql_parts.is_empty() {
695        None
696    } else {
697        Some(sk_sql_parts.join(" "))
698    };
699
700    let fetch_limit = request.limit;
701    let sk_params_refs: Vec<&str> = sk_param_values.iter().map(|s| s.as_str()).collect();
702
703    // Query either GSI table or base table
704    let query_params = crate::storage::QueryParams {
705        sk_condition: sk_condition_sql.as_deref(),
706        sk_params: &sk_params_refs,
707        forward: request.scan_index_forward,
708        limit: fetch_limit,
709        exclusive_start_sk: start_sk.as_deref(),
710        exclusive_start_base_pk: start_base_pk.as_deref(),
711        exclusive_start_base_sk: start_base_sk.as_deref(),
712    };
713    let rows = if let Some(ref index_name) = request.index_name {
714        if is_lsi {
715            storage
716                .query_lsi_items(&request.table_name, index_name, &pk_str, &query_params)
717                .await?
718        } else {
719            storage
720                .query_gsi_items(&request.table_name, index_name, &pk_str, &query_params)
721                .await?
722        }
723    } else {
724        storage
725            .query_items(&request.table_name, &pk_str, &query_params)
726            .await?
727    };
728
729    // Parse filter expression if present
730    let filter_expr = request
731        .filter_expression
732        .as_ref()
733        .map(|expr| expressions::condition::parse(expr))
734        .transpose()
735        .map_err(DynoxideError::ValidationException)?;
736
737    // Parse projection expression if present; fall back to legacy AttributesToGet
738    let projection = if let Some(ref proj_expr) = request.projection_expression {
739        Some(
740            expressions::projection::parse(proj_expr)
741                .map_err(DynoxideError::ValidationException)?,
742        )
743    } else {
744        legacy_projection.clone()
745    };
746
747    // Pre-register expression references so unused check works even with zero items
748    if let Some(ref filter) = filter_expr {
749        tracker.track_condition_expr(filter);
750    }
751    if let Some(ref proj) = projection {
752        tracker.track_projection_expr(proj);
753    }
754
755    // Untracked variant for the per-item hot loop — tracking already done above
756    let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
757        &request.expression_attribute_names,
758        &request.expression_attribute_values,
759    );
760
761    // Determine if SELECT COUNT
762    let is_count = request
763        .select
764        .as_deref()
765        .map(|s| s.eq_ignore_ascii_case("COUNT"))
766        .unwrap_or(false);
767
768    // Key attribute names for projection (use effective keys for GSI)
769    let mut key_attrs = vec![effective_pk.clone()];
770    if let Some(ref sk) = effective_sk {
771        key_attrs.push(sk.clone());
772    }
773    // Also include base table keys when querying a GSI
774    if request.index_name.is_some() {
775        if !key_attrs.contains(&table_key_schema.partition_key) {
776            key_attrs.push(table_key_schema.partition_key.clone());
777        }
778        if let Some(ref sk) = table_key_schema.sort_key {
779            if !key_attrs.contains(sk) {
780                key_attrs.push(sk.clone());
781            }
782        }
783    }
784
785    let mut items = Vec::new();
786    let mut scanned_count = 0;
787    let mut filtered_count = 0;
788    let mut cumulative_size = 0;
789    let mut last_evaluated_item: Option<Item> = None;
790    let mut truncated_by_size = false;
791
792    // Track sizes separately for ALL_ATTRIBUTES LSI queries where both
793    // index reads and base table reads contribute to ConsumedCapacity.
794    let mut base_table_cumulative_size = 0usize;
795    let mut index_cumulative_size = 0usize;
796
797    for (_pk, _sk, item_json) in &rows {
798        let index_item: Item = serde_json::from_str(item_json).map_err(|e| {
799            DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
800        })?;
801
802        // If Select=ALL_ATTRIBUTES on LSI with non-ALL projection, fetch full
803        // item from the base table for the response while using the index item
804        // for cursor tracking.
805        index_cumulative_size += crate::types::item_size(&index_item);
806        let item = if fetch_from_base_table {
807            let base_pk = index_item
808                .get(&table_key_schema.partition_key)
809                .and_then(|v| v.to_key_string())
810                .unwrap_or_default();
811            let base_sk = table_key_schema
812                .sort_key
813                .as_ref()
814                .and_then(|sk_name| index_item.get(sk_name))
815                .and_then(|v| v.to_key_string())
816                .unwrap_or_default();
817            if let Some(full_json) = storage
818                .get_item(&request.table_name, &base_pk, &base_sk)
819                .await?
820            {
821                let full_item: Item = serde_json::from_str(&full_json).map_err(|e| {
822                    DynoxideError::InternalServerError(format!("Bad item JSON: {e}"))
823                })?;
824                base_table_cumulative_size += crate::types::item_size(&full_item);
825                full_item
826            } else {
827                index_item.clone()
828            }
829        } else {
830            index_item.clone()
831        };
832
833        scanned_count += 1;
834
835        // Check 1MB limit BEFORE filtering — DynamoDB counts all evaluated data
836        // towards the 1MB response size limit, not just items that pass the filter.
837        let item_size = crate::types::item_size(&item);
838        if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
839            truncated_by_size = true;
840            break;
841        }
842        cumulative_size += item_size;
843
844        // Apply filter
845        if let Some(ref filter) = filter_expr {
846            let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
847                .map_err(DynoxideError::ValidationException)?;
848            if !passes {
849                last_evaluated_item = Some(index_item);
850                continue;
851            }
852        }
853
854        filtered_count += 1;
855
856        // Apply projection -- do NOT auto-include key attributes when the
857        // user explicitly specified ProjectionExpression or AttributesToGet.
858        let result_item = if let Some(ref proj) = projection {
859            let no_keys: &[String] = &[];
860            expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
861                .map_err(DynoxideError::ValidationException)?
862        } else {
863            item
864        };
865
866        last_evaluated_item = Some(index_item);
867        if !is_count {
868            items.push(result_item);
869        }
870    }
871
872    // Check for unused expression attribute names/values
873    tracker.check_unused()?;
874
875    let count = if is_count {
876        filtered_count
877    } else {
878        items.len()
879    };
880
881    // Determine LastEvaluatedKey
882    // We return LEK if: we hit the Limit, or we hit the 1MB limit
883    let has_more = truncated_by_size
884        || (fetch_limit.is_some() && scanned_count >= fetch_limit.unwrap_or(usize::MAX));
885
886    // For index queries, include the base table primary key in LastEvaluatedKey
887    // alongside the effective (index) keys so the cursor can uniquely identify
888    // the position. For LSIs, include the table sort key. For GSIs, include
889    // both the table partition key and sort key.
890    let is_gsi_query = request.index_name.is_some() && !is_lsi;
891    let last_evaluated_key = if has_more {
892        last_evaluated_item.map(|item| {
893            let mut key = build_last_evaluated_key(&item, &effective_pk, effective_sk.as_deref());
894            // For LSI queries, add the table sort key if different from the index sort key
895            if is_lsi {
896                if let Some(tsk) = table_key_schema.sort_key.as_deref() {
897                    if !key.contains_key(tsk) {
898                        if let Some(v) = item.get(tsk) {
899                            key.insert(tsk.to_string(), v.clone());
900                        }
901                    }
902                }
903            }
904            // For GSI queries, add the base table primary key (pk and sk)
905            if is_gsi_query {
906                if !key.contains_key(&table_key_schema.partition_key) {
907                    if let Some(v) = item.get(&table_key_schema.partition_key) {
908                        key.insert(table_key_schema.partition_key.clone(), v.clone());
909                    }
910                }
911                if let Some(ref tsk) = table_key_schema.sort_key {
912                    if !key.contains_key(tsk) {
913                        if let Some(v) = item.get(tsk) {
914                            key.insert(tsk.clone(), v.clone());
915                        }
916                    }
917                }
918            }
919            key
920        })
921    } else {
922        None
923    };
924
925    // Attribute read capacity to the index if querying one
926    let is_gsi = is_gsi_query;
927    let consistent = request.consistent_read.unwrap_or(false);
928    let consumed_capacity = if is_gsi {
929        let mut gsi_units = std::collections::HashMap::new();
930        gsi_units.insert(
931            request.index_name.as_ref().unwrap().clone(),
932            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
933        );
934        crate::types::consumed_capacity_with_indexes(
935            &request.table_name,
936            0.0,
937            &gsi_units,
938            &request.return_consumed_capacity,
939        )
940    } else if is_lsi {
941        // When fetching from the base table (ALL_ATTRIBUTES on non-ALL LSI),
942        // split capacity between the index read and the table read.
943        let (table_cap, lsi_cap) = if fetch_from_base_table {
944            let table_rcu = crate::types::read_capacity_units_with_consistency(
945                base_table_cumulative_size,
946                consistent,
947            );
948            let lsi_rcu = crate::types::read_capacity_units_with_consistency(
949                index_cumulative_size,
950                consistent,
951            );
952            (table_rcu, lsi_rcu)
953        } else {
954            (
955                0.0,
956                crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
957            )
958        };
959        let mut lsi_units = std::collections::HashMap::new();
960        lsi_units.insert(request.index_name.as_ref().unwrap().clone(), lsi_cap);
961        crate::types::consumed_capacity_with_secondary_indexes(
962            &request.table_name,
963            table_cap,
964            &std::collections::HashMap::new(),
965            &lsi_units,
966            &request.return_consumed_capacity,
967        )
968    } else {
969        crate::types::consumed_capacity(
970            &request.table_name,
971            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
972            &request.return_consumed_capacity,
973        )
974    };
975
976    Ok(QueryResponse {
977        items: if is_count { None } else { Some(items) },
978        count,
979        scanned_count,
980        last_evaluated_key,
981        consumed_capacity,
982    })
983}
984
985fn build_last_evaluated_key(
986    item: &Item,
987    pk_name: &str,
988    sk_name: Option<&str>,
989) -> HashMap<String, AttributeValue> {
990    let mut key = HashMap::new();
991    if let Some(pk_val) = item.get(pk_name) {
992        key.insert(pk_name.to_string(), pk_val.clone());
993    }
994    if let Some(sk) = sk_name {
995        if let Some(sk_val) = item.get(sk) {
996            key.insert(sk.to_string(), sk_val.clone());
997        }
998    }
999    key
1000}