Skip to main content

dynoxide/actions/
query.rs

1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage::Storage;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Cap, in bytes, on the payload a single Query/Scan page may return
/// (1 MiB: 1024 * 1024 = 1_048_576 bytes).
const MAX_RESPONSE_SIZE: usize = 1024 * 1024;
11
12/// Internal deserialization struct for detecting missing fields.
13#[derive(Debug, Default, Deserialize)]
14struct QueryRequestRaw {
15    #[serde(rename = "TableName", default)]
16    table_name: Option<String>,
17    #[serde(rename = "KeyConditionExpression", default)]
18    key_condition_expression: Option<String>,
19    #[serde(rename = "FilterExpression", default)]
20    filter_expression: Option<String>,
21    #[serde(rename = "ProjectionExpression", default)]
22    projection_expression: Option<String>,
23    #[serde(rename = "ExpressionAttributeNames", default)]
24    expression_attribute_names: Option<HashMap<String, String>>,
25    #[serde(rename = "ExpressionAttributeValues", default)]
26    expression_attribute_values: Option<HashMap<String, AttributeValue>>,
27    #[serde(rename = "ScanIndexForward", default = "default_true")]
28    scan_index_forward: bool,
29    #[serde(rename = "Limit", default)]
30    limit: Option<usize>,
31    #[serde(rename = "ExclusiveStartKey", default)]
32    exclusive_start_key: Option<serde_json::Value>,
33    #[serde(rename = "Select", default)]
34    select: Option<String>,
35    #[serde(rename = "ConsistentRead", default)]
36    consistent_read: Option<bool>,
37    #[serde(rename = "IndexName", default)]
38    index_name: Option<String>,
39    #[serde(rename = "ReturnConsumedCapacity", default)]
40    return_consumed_capacity: Option<String>,
41    #[serde(rename = "KeyConditions", default)]
42    key_conditions: Option<serde_json::Value>,
43    #[serde(rename = "AttributesToGet", default)]
44    attributes_to_get: Option<Vec<String>>,
45    #[serde(rename = "QueryFilter", default)]
46    query_filter: Option<serde_json::Value>,
47    #[serde(rename = "ConditionalOperator", default)]
48    conditional_operator: Option<String>,
49}
50
/// Serde default for the `ScanIndexForward` request member.
///
/// When a client omits the member, the query runs forward (ascending
/// sort-key order) rather than using `bool::default()`.
fn default_true() -> bool {
    true
}
54
55#[derive(Debug, Default)]
56pub struct QueryRequest {
57    pub table_name: String,
58    pub key_condition_expression: Option<String>,
59    pub filter_expression: Option<String>,
60    pub projection_expression: Option<String>,
61    pub expression_attribute_names: Option<HashMap<String, String>>,
62    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
63    pub scan_index_forward: bool,
64    pub limit: Option<usize>,
65    pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
66    pub select: Option<String>,
67    pub consistent_read: Option<bool>,
68    pub index_name: Option<String>,
69    pub return_consumed_capacity: Option<String>,
70    pub key_conditions: Option<serde_json::Value>,
71    pub attributes_to_get: Option<Vec<String>>,
72    pub query_filter: Option<serde_json::Value>,
73    pub conditional_operator: Option<String>,
74    /// Raw JSON for ExclusiveStartKey when deserialized from HTTP request.
75    /// Parsed lazily in `execute()` after other validations run.
76    /// When constructed directly (e.g. from MCP), this is `None` and
77    /// `exclusive_start_key` is used instead.
78    pub exclusive_start_key_raw: Option<serde_json::Value>,
79}
80
81impl<'de> serde::Deserialize<'de> for QueryRequest {
82    fn deserialize<D: serde::Deserializer<'de>>(
83        deserializer: D,
84    ) -> std::result::Result<Self, D::Error> {
85        let raw = QueryRequestRaw::deserialize(deserializer)?;
86        use crate::validation::{
87            TableNameContext, format_validation_errors, table_name_constraint_errors,
88        };
89
90        let mut errors = Vec::new();
91        errors.extend(table_name_constraint_errors(
92            raw.table_name.as_deref(),
93            TableNameContext::ReadWrite,
94        ));
95        let table_name = raw.table_name.unwrap_or_default();
96
97        // ReturnConsumedCapacity enum
98        if let Some(ref rcc) = raw.return_consumed_capacity {
99            if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
100                errors.push(format!(
101                    "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
102                     Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
103                    rcc
104                ));
105            }
106        }
107
108        // Select enum
109        if let Some(ref sel) = raw.select {
110            if ![
111                "ALL_ATTRIBUTES",
112                "ALL_PROJECTED_ATTRIBUTES",
113                "COUNT",
114                "SPECIFIC_ATTRIBUTES",
115            ]
116            .contains(&sel.as_str())
117            {
118                errors.push(format!(
119                    "Value '{}' at 'select' failed to satisfy constraint: \
120                     Member must satisfy enum value set: [SPECIFIC_ATTRIBUTES, COUNT, ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES]",
121                    sel
122                ));
123            }
124        }
125
126        // Limit must be >= 1.
127        // AWS DynamoDB's Query message diverges from Scan: Query omits the rejected
128        // value entirely and capitalises 'Limit', whereas Scan keeps the value and
129        // lowercases 'limit'. Do not collapse these into a shared helper.
130        if let Some(limit) = raw.limit {
131            if limit == 0 {
132                errors.push(
133                    "Value at 'Limit' failed to satisfy constraint: \
134                     Member must have value greater than or equal to 1"
135                        .to_string(),
136                );
137            }
138        }
139
140        if let Some(msg) = format_validation_errors(&errors) {
141            return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
142        }
143
144        Ok(QueryRequest {
145            table_name,
146            key_condition_expression: raw.key_condition_expression,
147            filter_expression: raw.filter_expression,
148            projection_expression: raw.projection_expression,
149            expression_attribute_names: raw.expression_attribute_names,
150            expression_attribute_values: raw.expression_attribute_values,
151            scan_index_forward: raw.scan_index_forward,
152            limit: raw.limit,
153            exclusive_start_key: None,
154            select: raw.select,
155            consistent_read: raw.consistent_read,
156            index_name: raw.index_name,
157            return_consumed_capacity: raw.return_consumed_capacity,
158            key_conditions: raw.key_conditions,
159            attributes_to_get: raw.attributes_to_get,
160            query_filter: raw.query_filter,
161            conditional_operator: raw.conditional_operator,
162            exclusive_start_key_raw: raw.exclusive_start_key,
163        })
164    }
165}
166
167#[derive(Debug, Default, Serialize)]
168pub struct QueryResponse {
169    #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
170    pub items: Option<Vec<Item>>,
171    #[serde(rename = "Count")]
172    pub count: usize,
173    #[serde(rename = "ScannedCount")]
174    pub scanned_count: usize,
175    #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
176    pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
177    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
178    pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
179}
180
181pub fn execute(storage: &Storage, mut request: QueryRequest) -> Result<QueryResponse> {
182    // Validate table name format before checking existence (DynamoDB validates input first)
183    crate::validation::validate_table_name(&request.table_name)?;
184
185    // ---- Expression vs non-expression mixing validation ----
186    // DynamoDB checks this before anything else (except table name format and ESK values).
187    {
188        let mut non_expr = Vec::new();
189        let mut expr = Vec::new();
190        if request.attributes_to_get.is_some() {
191            non_expr.push("AttributesToGet");
192        }
193        if request.query_filter.is_some()
194            && request.query_filter.as_ref().is_some_and(|v| !v.is_null())
195        {
196            non_expr.push("QueryFilter");
197        }
198        if request.conditional_operator.is_some() {
199            non_expr.push("ConditionalOperator");
200        }
201        if request.key_conditions.is_some()
202            && request
203                .key_conditions
204                .as_ref()
205                .is_some_and(|v| !v.is_null())
206        {
207            non_expr.push("KeyConditions");
208        }
209        if request.projection_expression.is_some() {
210            expr.push("ProjectionExpression");
211        }
212        if request.filter_expression.is_some() {
213            expr.push("FilterExpression");
214        }
215        if request.key_condition_expression.is_some() {
216            expr.push("KeyConditionExpression");
217        }
218        let no_raw_eav: Option<serde_json::Value> = None;
219        let ctx = helpers::ExpressionParamContext {
220            non_expression_params: non_expr,
221            expression_params: expr,
222            all_expression_param_names: vec!["FilterExpression", "KeyConditionExpression"],
223            expression_attribute_names: &request.expression_attribute_names,
224            expression_attribute_values: &request.expression_attribute_values,
225            expression_attribute_values_raw: &no_raw_eav,
226        };
227        helpers::validate_expression_params(&ctx)?;
228    }
229
230    // ---- Validate filter attribute values (before argument counts, matching DynamoDB) ----
231    helpers::validate_filter_conditions_raw(request.query_filter.as_ref(), "QueryFilter")?;
232    helpers::validate_filter_conditions_raw(request.key_conditions.as_ref(), "KeyConditions")?;
233
234    // ---- Validate filter argument counts and type compatibility ----
235    helpers::validate_filter_condition_args(request.query_filter.as_ref())?;
236    helpers::validate_filter_condition_args(request.key_conditions.as_ref())?;
237
238    // ---- Validate duplicate AttributesToGet ----
239    if let Some(ref attrs) = request.attributes_to_get {
240        helpers::validate_attributes_to_get_no_duplicates(attrs)?;
241    }
242
243    // ---- Parse ExclusiveStartKey from JSON value ----
244    let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
245        Some(helpers::parse_exclusive_start_key(esk_val)?)
246    } else {
247        request.exclusive_start_key.clone()
248    };
249
250    // ---- Validate expression syntax BEFORE table existence ----
251    // DynamoDB validates KeyConditionExpression, FilterExpression, and
252    // ProjectionExpression syntax before checking if the table exists.
253    if let Some(ref kce) = request.key_condition_expression {
254        if kce.is_empty() {
255            return Err(DynoxideError::ValidationException(
256                "Invalid KeyConditionExpression: The expression can not be empty;".to_string(),
257            ));
258        }
259    }
260    if let Some(ref fe) = request.filter_expression {
261        if fe.is_empty() {
262            if request.query_filter.is_none() || request.filter_expression.as_deref() == Some("") {
263                return Err(DynoxideError::ValidationException(
264                    "Invalid FilterExpression: The expression can not be empty;".to_string(),
265                ));
266            }
267        } else {
268            let parsed_fe = expressions::condition::parse(fe).map_err(|e| {
269                DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
270            })?;
271            // Validate that all #name references are defined in ExpressionAttributeNames
272            // (before table existence check, matching DynamoDB's validation ordering)
273            if let Err(e) = expressions::condition::validate_name_refs(
274                &parsed_fe,
275                &request.expression_attribute_names,
276            ) {
277                return Err(DynoxideError::ValidationException(format!(
278                    "Invalid FilterExpression: {e}"
279                )));
280            }
281        }
282    }
283    if let Some(ref pe) = request.projection_expression {
284        if pe.is_empty() {
285            return Err(DynoxideError::ValidationException(
286                "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
287            ));
288        }
289    }
290
291    // SPECIFIC_ATTRIBUTES requires ProjectionExpression or AttributesToGet
292    if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
293        && request.projection_expression.is_none()
294        && request.attributes_to_get.is_none()
295    {
296        return Err(DynoxideError::ValidationException(
297            "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
298                .to_string(),
299        ));
300    }
301    // For KeyConditionExpression, validate syntax early too
302    if let Some(ref kce) = request.key_condition_expression {
303        if !kce.is_empty() {
304            // Create a temporary tracker for early syntax validation
305            let temp_tracker = crate::expressions::TrackedExpressionAttributes::new(
306                &request.expression_attribute_names,
307                &request.expression_attribute_values,
308            );
309            if let Err(e) = expressions::key_condition::parse(kce, &temp_tracker) {
310                return Err(DynoxideError::ValidationException(e));
311            }
312        }
313    }
314
315    let meta = helpers::require_table_for_item_op(storage, &request.table_name)?;
316    let table_key_schema = helpers::parse_key_schema(&meta)?;
317
318    // Determine effective partition key name early so we can pass it to
319    // the legacy KeyConditions converter (ensures correct ordering when
320    // both hash and range keys use EQ).
321    let effective_pk_for_kc = if let Some(ref index_name) = request.index_name {
322        if let Some((pk, _)) = request
323            .index_name
324            .as_ref()
325            .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok())
326        {
327            pk
328        } else if let Ok((pk, _)) = super::gsi::parse_gsi_key_schema(&meta, index_name) {
329            pk
330        } else {
331            table_key_schema.partition_key.clone()
332        }
333    } else {
334        table_key_schema.partition_key.clone()
335    };
336
337    // Convert legacy KeyConditions to KeyConditionExpression if no expression is set
338    if request.key_condition_expression.is_none() {
339        if let Some(ref kc_val) = request.key_conditions {
340            if let Ok(kc) =
341                serde_json::from_value::<HashMap<String, helpers::KeyCondition>>(kc_val.clone())
342            {
343                if !kc.is_empty() {
344                    let converted =
345                        helpers::convert_key_conditions(&kc, Some(&effective_pk_for_kc))?;
346                    request.key_condition_expression = Some(converted.expression);
347                    let expr_values = request
348                        .expression_attribute_values
349                        .get_or_insert_with(HashMap::new);
350                    expr_values.extend(converted.attribute_values);
351                    let expr_names = request
352                        .expression_attribute_names
353                        .get_or_insert_with(HashMap::new);
354                    expr_names.extend(converted.attribute_names);
355                }
356            }
357        }
358    }
359
360    // Convert legacy QueryFilter to FilterExpression if no expression is set
361    if request.filter_expression.is_none() {
362        if let Some(ref qf_val) = request.query_filter {
363            if let Ok(qf) =
364                serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(qf_val.clone())
365            {
366                if !qf.is_empty() {
367                    let converted = helpers::convert_filter_conditions(
368                        &qf,
369                        request.conditional_operator.as_deref(),
370                    )?;
371                    if !converted.expression.is_empty() {
372                        request.filter_expression = Some(converted.expression);
373                        let expr_values = request
374                            .expression_attribute_values
375                            .get_or_insert_with(HashMap::new);
376                        expr_values.extend(converted.attribute_values);
377                        let expr_names = request
378                            .expression_attribute_names
379                            .get_or_insert_with(HashMap::new);
380                        expr_names.extend(converted.attribute_names);
381                    }
382                }
383            }
384        }
385    }
386
387    // Convert legacy AttributesToGet to projection
388    let legacy_projection = if request.projection_expression.is_none() {
389        request
390            .attributes_to_get
391            .as_ref()
392            .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
393    } else {
394        None
395    };
396
397    // Ensure KeyConditionExpression is present (required)
398    let key_condition_expression = request.key_condition_expression.as_deref().ok_or_else(|| {
399        DynoxideError::ValidationException(
400            "Either the KeyConditions or KeyConditionExpression parameter must be specified in the request."
401                .to_string(),
402        )
403    })?;
404    let key_condition_expression = key_condition_expression.to_string();
405
406    // Determine effective key schema (GSI, LSI, or base table)
407    let lsi_keys = request
408        .index_name
409        .as_ref()
410        .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
411    let is_lsi = lsi_keys.is_some();
412
413    // ConsistentRead is not supported on GSIs (LSIs are fine)
414    if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
415        return Err(DynoxideError::ValidationException(
416            "Consistent reads are not supported on global secondary indexes".to_string(),
417        ));
418    }
419
420    // Parse full index definition to get projection type
421    let index_projection_type = if let Some(ref index_name) = request.index_name {
422        if is_lsi {
423            super::lsi::parse_lsi_defs(&meta)?
424                .into_iter()
425                .find(|l| l.index_name == *index_name)
426                .map(|l| l.projection_type)
427        } else {
428            super::gsi::parse_gsi_defs(&meta)?
429                .into_iter()
430                .find(|g| g.index_name == *index_name)
431                .map(|g| g.projection_type)
432        }
433    } else {
434        None
435    };
436
437    let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
438        if let Some(keys) = lsi_keys {
439            keys
440        } else {
441            super::gsi::parse_gsi_key_schema(&meta, index_name)?
442        }
443    } else {
444        (
445            table_key_schema.partition_key.clone(),
446            table_key_schema.sort_key.clone(),
447        )
448    };
449
450    // ---- Validate ExclusiveStartKey structure against key schema ----
451    if let Some(ref esk) = exclusive_start_key {
452        // Stage 1+2: count check + index key type check
453        helpers::validate_esk_count_and_index_keys(
454            esk,
455            &meta,
456            request.index_name.as_deref(),
457            "The provided starting key is invalid",
458        )?;
459        // Stage 3: table key type check
460        helpers::validate_esk_table_keys(esk, &meta)?;
461    }
462
463    // Create tracker for unused expression attribute names/values
464    let tracker = crate::expressions::TrackedExpressionAttributes::new(
465        &request.expression_attribute_names,
466        &request.expression_attribute_values,
467    );
468
469    // Parse KeyConditionExpression
470    let key_cond = expressions::key_condition::parse(&key_condition_expression, &tracker)
471        .map_err(DynoxideError::ValidationException)?;
472
473    // Validate pk_name matches the effective partition key
474    if key_cond.pk_name != effective_pk {
475        return Err(DynoxideError::ValidationException(format!(
476            "Query condition missed key schema element: {}",
477            effective_pk
478        )));
479    }
480
481    // Resolve values
482    let resolved = expressions::key_condition::resolve_values(&key_cond, &tracker)
483        .map_err(DynoxideError::ValidationException)?;
484
485    // Get pk string
486    let pk_str = resolved.pk_value.to_key_string().ok_or_else(|| {
487        DynoxideError::ValidationException(
488            "Cannot convert partition key value to string".to_string(),
489        )
490    })?;
491
492    // Build sk SQL conditions
493    let mut sk_sql_parts = Vec::new();
494    let mut sk_param_values = Vec::new();
495
496    if let Some(ref sk_cond) = resolved.sk_condition {
497        // Validate sk name matches effective sort key
498        if let Some(ref eff_sk) = effective_sk {
499            if sk_cond.sk_name() != eff_sk {
500                return Err(DynoxideError::ValidationException(format!(
501                    "Query condition missed key schema element: {eff_sk}"
502                )));
503            }
504        } else {
505            return Err(DynoxideError::ValidationException(
506                "Query filter contains a sort key condition but the table has no sort key"
507                    .to_string(),
508            ));
509        }
510
511        let conditions = sk_cond.to_sql_conditions();
512        for (i, (op, val)) in conditions.iter().enumerate() {
513            let param_idx = i + 2; // pk is ?1, sk params start at ?2
514            if op == "LIKE" {
515                sk_sql_parts.push(format!("AND sk LIKE ?{param_idx} ESCAPE '\\'"));
516            } else {
517                sk_sql_parts.push(format!("AND sk {op} ?{param_idx}"));
518            }
519            sk_param_values.push(val.clone());
520        }
521    }
522
523    // ---- Validate QueryFilter/FilterExpression don't reference primary key attrs ----
524    // Collect effective key attribute names
525    let mut effective_key_attrs = vec![effective_pk.clone()];
526    if let Some(ref sk) = effective_sk {
527        effective_key_attrs.push(sk.clone());
528    }
529
530    // Check legacy QueryFilter
531    if let Some(ref qf_val) = request.query_filter {
532        if let Some(obj) = qf_val.as_object() {
533            for attr_name in obj.keys() {
534                if effective_key_attrs.contains(attr_name) {
535                    return Err(DynoxideError::ValidationException(format!(
536                        "QueryFilter can only contain non-primary key attributes: \
537                         Primary key attribute: {attr_name}"
538                    )));
539                }
540            }
541        }
542    }
543
544    // Check FilterExpression for key attribute references (only for user-supplied expressions,
545    // not those converted from QueryFilter - QueryFilter is checked separately above)
546    if request.query_filter.is_none() {
547        if let Some(ref fe) = request.filter_expression {
548            if let Ok(parsed_fe) = expressions::condition::parse(fe) {
549                let top_attrs = expressions::condition::extract_top_level_attributes(
550                    &parsed_fe,
551                    &request.expression_attribute_names,
552                );
553                for attr in &top_attrs {
554                    if effective_key_attrs.contains(attr) {
555                        return Err(DynoxideError::ValidationException(format!(
556                            "Filter Expression can only contain non-primary key attributes: \
557                             Primary key attribute: {attr}"
558                        )));
559                    }
560                }
561                // Check for non-scalar key access in FilterExpression
562                // Build index key attribute lists
563                let mut index_key_attrs = Vec::new();
564                if request.index_name.is_some() {
565                    // Index keys that are not also table keys
566                    if !effective_key_attrs
567                        .iter()
568                        .any(|k| k == &table_key_schema.partition_key)
569                    {
570                        // This shouldn't normally happen for query, but just in case
571                    }
572                    // Check all effective key attrs for non-scalar access
573                    for k in &effective_key_attrs {
574                        if ![table_key_schema.partition_key.clone()]
575                            .iter()
576                            .chain(table_key_schema.sort_key.iter())
577                            .any(|tk| tk == k)
578                        {
579                            index_key_attrs.push(k.clone());
580                        }
581                    }
582                }
583                let base_key_attrs: Vec<String> = {
584                    let mut v = vec![table_key_schema.partition_key.clone()];
585                    if let Some(ref sk) = table_key_schema.sort_key {
586                        v.push(sk.clone());
587                    }
588                    v
589                };
590                if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
591                    &parsed_fe,
592                    &request.expression_attribute_names,
593                    &base_key_attrs,
594                    &index_key_attrs,
595                ) {
596                    let prefix = if is_index { "IndexKey" } else { "Key" };
597                    return Err(DynoxideError::ValidationException(format!(
598                        "Key attributes must be scalars; \
599                         list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
600                    )));
601                }
602            }
603        }
604    }
605
606    let is_index_query = request.index_name.is_some();
607
608    // Build ExclusiveStartKey sk value.
609    // For hash-only GSIs (no sort key), use empty string so the composite
610    // cursor (gsi_sk, table_pk, table_sk) can drive pagination.
611    let start_sk = if let Some(ref esk) = exclusive_start_key {
612        if let Some(ref sk_name) = effective_sk {
613            esk.get(sk_name).and_then(|v| v.to_key_string())
614        } else if is_index_query {
615            // Hash-only index: gsi_sk / lsi_sk is always ''
616            Some(String::new())
617        } else {
618            None
619        }
620    } else {
621        None
622    };
623
624    // For LSI and GSI queries, extract the base table keys from ExclusiveStartKey
625    // to enable composite cursor pagination.
626    let (start_base_pk, start_base_sk) = if is_index_query {
627        if let Some(ref esk) = exclusive_start_key {
628            let base_pk = esk
629                .get(&table_key_schema.partition_key)
630                .and_then(|v| v.to_key_string());
631            let base_sk = table_key_schema
632                .sort_key
633                .as_ref()
634                .and_then(|sk_name| esk.get(sk_name))
635                .and_then(|v| v.to_key_string());
636            (base_pk, base_sk)
637        } else {
638            (None, None)
639        }
640    } else {
641        (None, None)
642    };
643
644    // Validate Select=ALL_ATTRIBUTES against index projection type.
645    // For GSI with non-ALL projection, DynamoDB rejects ALL_ATTRIBUTES.
646    let is_select_all_attributes = request
647        .select
648        .as_deref()
649        .map(|s| s.eq_ignore_ascii_case("ALL_ATTRIBUTES"))
650        .unwrap_or(false);
651    let fetch_from_base_table = if is_select_all_attributes {
652        if let Some(ref proj_type) = index_projection_type {
653            if *proj_type != crate::types::ProjectionType::ALL {
654                if !is_lsi {
655                    return Err(DynoxideError::ValidationException(format!(
656                        "One or more parameter values were invalid: \
657                         Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
658                         because its projection type is not ALL",
659                        request.index_name.as_deref().unwrap_or("")
660                    )));
661                }
662                // LSI with non-ALL projection: fetch full items from base table
663                true
664            } else {
665                false
666            }
667        } else {
668            false
669        }
670    } else {
671        false
672    };
673
674    // Combine sk conditions into a single SQL fragment
675    let sk_condition_sql = if sk_sql_parts.is_empty() {
676        None
677    } else {
678        Some(sk_sql_parts.join(" "))
679    };
680
681    let fetch_limit = request.limit;
682    let sk_params_refs: Vec<&str> = sk_param_values.iter().map(|s| s.as_str()).collect();
683
684    // Query either GSI table or base table
685    let query_params = crate::storage::QueryParams {
686        sk_condition: sk_condition_sql.as_deref(),
687        sk_params: &sk_params_refs,
688        forward: request.scan_index_forward,
689        limit: fetch_limit,
690        exclusive_start_sk: start_sk.as_deref(),
691        exclusive_start_base_pk: start_base_pk.as_deref(),
692        exclusive_start_base_sk: start_base_sk.as_deref(),
693    };
694    let rows = if let Some(ref index_name) = request.index_name {
695        if is_lsi {
696            storage.query_lsi_items(&request.table_name, index_name, &pk_str, &query_params)?
697        } else {
698            storage.query_gsi_items(&request.table_name, index_name, &pk_str, &query_params)?
699        }
700    } else {
701        storage.query_items(&request.table_name, &pk_str, &query_params)?
702    };
703
704    // Parse filter expression if present
705    let filter_expr = request
706        .filter_expression
707        .as_ref()
708        .map(|expr| expressions::condition::parse(expr))
709        .transpose()
710        .map_err(DynoxideError::ValidationException)?;
711
712    // Parse projection expression if present; fall back to legacy AttributesToGet
713    let projection = if let Some(ref proj_expr) = request.projection_expression {
714        Some(
715            expressions::projection::parse(proj_expr)
716                .map_err(DynoxideError::ValidationException)?,
717        )
718    } else {
719        legacy_projection.clone()
720    };
721
722    // Pre-register expression references so unused check works even with zero items
723    if let Some(ref filter) = filter_expr {
724        tracker.track_condition_expr(filter);
725    }
726    if let Some(ref proj) = projection {
727        tracker.track_projection_expr(proj);
728    }
729
730    // Untracked variant for the per-item hot loop — tracking already done above
731    let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
732        &request.expression_attribute_names,
733        &request.expression_attribute_values,
734    );
735
736    // Determine if SELECT COUNT
737    let is_count = request
738        .select
739        .as_deref()
740        .map(|s| s.eq_ignore_ascii_case("COUNT"))
741        .unwrap_or(false);
742
743    // Key attribute names for projection (use effective keys for GSI)
744    let mut key_attrs = vec![effective_pk.clone()];
745    if let Some(ref sk) = effective_sk {
746        key_attrs.push(sk.clone());
747    }
748    // Also include base table keys when querying a GSI
749    if request.index_name.is_some() {
750        if !key_attrs.contains(&table_key_schema.partition_key) {
751            key_attrs.push(table_key_schema.partition_key.clone());
752        }
753        if let Some(ref sk) = table_key_schema.sort_key {
754            if !key_attrs.contains(sk) {
755                key_attrs.push(sk.clone());
756            }
757        }
758    }
759
760    let mut items = Vec::new();
761    let mut scanned_count = 0;
762    let mut filtered_count = 0;
763    let mut cumulative_size = 0;
764    let mut last_evaluated_item: Option<Item> = None;
765    let mut truncated_by_size = false;
766
767    // Track sizes separately for ALL_ATTRIBUTES LSI queries where both
768    // index reads and base table reads contribute to ConsumedCapacity.
769    let mut base_table_cumulative_size = 0usize;
770    let mut index_cumulative_size = 0usize;
771
772    for (_pk, _sk, item_json) in &rows {
773        let index_item: Item = serde_json::from_str(item_json).map_err(|e| {
774            DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
775        })?;
776
777        // If Select=ALL_ATTRIBUTES on LSI with non-ALL projection, fetch full
778        // item from the base table for the response while using the index item
779        // for cursor tracking.
780        index_cumulative_size += crate::types::item_size(&index_item);
781        let item = if fetch_from_base_table {
782            let base_pk = index_item
783                .get(&table_key_schema.partition_key)
784                .and_then(|v| v.to_key_string())
785                .unwrap_or_default();
786            let base_sk = table_key_schema
787                .sort_key
788                .as_ref()
789                .and_then(|sk_name| index_item.get(sk_name))
790                .and_then(|v| v.to_key_string())
791                .unwrap_or_default();
792            if let Some(full_json) = storage.get_item(&request.table_name, &base_pk, &base_sk)? {
793                let full_item: Item = serde_json::from_str(&full_json).map_err(|e| {
794                    DynoxideError::InternalServerError(format!("Bad item JSON: {e}"))
795                })?;
796                base_table_cumulative_size += crate::types::item_size(&full_item);
797                full_item
798            } else {
799                index_item.clone()
800            }
801        } else {
802            index_item.clone()
803        };
804
805        scanned_count += 1;
806
807        // Check 1MB limit BEFORE filtering — DynamoDB counts all evaluated data
808        // towards the 1MB response size limit, not just items that pass the filter.
809        let item_size = crate::types::item_size(&item);
810        if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
811            truncated_by_size = true;
812            break;
813        }
814        cumulative_size += item_size;
815
816        // Apply filter
817        if let Some(ref filter) = filter_expr {
818            let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
819                .map_err(DynoxideError::ValidationException)?;
820            if !passes {
821                last_evaluated_item = Some(index_item);
822                continue;
823            }
824        }
825
826        filtered_count += 1;
827
828        // Apply projection -- do NOT auto-include key attributes when the
829        // user explicitly specified ProjectionExpression or AttributesToGet.
830        let result_item = if let Some(ref proj) = projection {
831            let no_keys: &[String] = &[];
832            expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
833                .map_err(DynoxideError::ValidationException)?
834        } else {
835            item
836        };
837
838        last_evaluated_item = Some(index_item);
839        if !is_count {
840            items.push(result_item);
841        }
842    }
843
844    // Check for unused expression attribute names/values
845    tracker.check_unused()?;
846
847    let count = if is_count {
848        filtered_count
849    } else {
850        items.len()
851    };
852
853    // Determine LastEvaluatedKey
854    // We return LEK if: we hit the Limit, or we hit the 1MB limit
855    let has_more = truncated_by_size
856        || (fetch_limit.is_some() && scanned_count >= fetch_limit.unwrap_or(usize::MAX));
857
858    // For index queries, include the base table primary key in LastEvaluatedKey
859    // alongside the effective (index) keys so the cursor can uniquely identify
860    // the position. For LSIs, include the table sort key. For GSIs, include
861    // both the table partition key and sort key.
862    let is_gsi_query = request.index_name.is_some() && !is_lsi;
863    let last_evaluated_key = if has_more {
864        last_evaluated_item.map(|item| {
865            let mut key = build_last_evaluated_key(&item, &effective_pk, effective_sk.as_deref());
866            // For LSI queries, add the table sort key if different from the index sort key
867            if is_lsi {
868                if let Some(tsk) = table_key_schema.sort_key.as_deref() {
869                    if !key.contains_key(tsk) {
870                        if let Some(v) = item.get(tsk) {
871                            key.insert(tsk.to_string(), v.clone());
872                        }
873                    }
874                }
875            }
876            // For GSI queries, add the base table primary key (pk and sk)
877            if is_gsi_query {
878                if !key.contains_key(&table_key_schema.partition_key) {
879                    if let Some(v) = item.get(&table_key_schema.partition_key) {
880                        key.insert(table_key_schema.partition_key.clone(), v.clone());
881                    }
882                }
883                if let Some(ref tsk) = table_key_schema.sort_key {
884                    if !key.contains_key(tsk) {
885                        if let Some(v) = item.get(tsk) {
886                            key.insert(tsk.clone(), v.clone());
887                        }
888                    }
889                }
890            }
891            key
892        })
893    } else {
894        None
895    };
896
897    // Attribute read capacity to the index if querying one
898    let is_gsi = is_gsi_query;
899    let consistent = request.consistent_read.unwrap_or(false);
900    let consumed_capacity = if is_gsi {
901        let mut gsi_units = std::collections::HashMap::new();
902        gsi_units.insert(
903            request.index_name.as_ref().unwrap().clone(),
904            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
905        );
906        crate::types::consumed_capacity_with_indexes(
907            &request.table_name,
908            0.0,
909            &gsi_units,
910            &request.return_consumed_capacity,
911        )
912    } else if is_lsi {
913        // When fetching from the base table (ALL_ATTRIBUTES on non-ALL LSI),
914        // split capacity between the index read and the table read.
915        let (table_cap, lsi_cap) = if fetch_from_base_table {
916            let table_rcu = crate::types::read_capacity_units_with_consistency(
917                base_table_cumulative_size,
918                consistent,
919            );
920            let lsi_rcu = crate::types::read_capacity_units_with_consistency(
921                index_cumulative_size,
922                consistent,
923            );
924            (table_rcu, lsi_rcu)
925        } else {
926            (
927                0.0,
928                crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
929            )
930        };
931        let mut lsi_units = std::collections::HashMap::new();
932        lsi_units.insert(request.index_name.as_ref().unwrap().clone(), lsi_cap);
933        crate::types::consumed_capacity_with_secondary_indexes(
934            &request.table_name,
935            table_cap,
936            &std::collections::HashMap::new(),
937            &lsi_units,
938            &request.return_consumed_capacity,
939        )
940    } else {
941        crate::types::consumed_capacity(
942            &request.table_name,
943            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
944            &request.return_consumed_capacity,
945        )
946    };
947
948    Ok(QueryResponse {
949        items: if is_count { None } else { Some(items) },
950        count,
951        scanned_count,
952        last_evaluated_key,
953        consumed_capacity,
954    })
955}
956
957fn build_last_evaluated_key(
958    item: &Item,
959    pk_name: &str,
960    sk_name: Option<&str>,
961) -> HashMap<String, AttributeValue> {
962    let mut key = HashMap::new();
963    if let Some(pk_val) = item.get(pk_name) {
964        key.insert(pk_name.to_string(), pk_val.clone());
965    }
966    if let Some(sk) = sk_name {
967        if let Some(sk_val) = item.get(sk) {
968            key.insert(sk.to_string(), sk_val.clone());
969        }
970    }
971    key
972}