Skip to main content

dynoxide/actions/
scan.rs

1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage::Storage;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9/// 1MB response size limit for Query/Scan.
10const MAX_RESPONSE_SIZE: usize = 1_048_576;
11
12/// Internal deserialization struct for detecting missing fields.
13#[derive(Debug, Default, Deserialize)]
14struct ScanRequestRaw {
15    #[serde(rename = "TableName", default)]
16    table_name: Option<String>,
17    #[serde(rename = "FilterExpression", default)]
18    filter_expression: Option<String>,
19    #[serde(rename = "ProjectionExpression", default)]
20    projection_expression: Option<String>,
21    #[serde(rename = "ExpressionAttributeNames", default)]
22    expression_attribute_names: Option<HashMap<String, String>>,
23    #[serde(rename = "ExpressionAttributeValues", default)]
24    expression_attribute_values: Option<HashMap<String, AttributeValue>>,
25    #[serde(rename = "Limit", default)]
26    limit: Option<usize>,
27    #[serde(rename = "ExclusiveStartKey", default)]
28    exclusive_start_key: Option<serde_json::Value>,
29    #[serde(rename = "Select", default)]
30    select: Option<String>,
31    #[serde(rename = "ConsistentRead", default)]
32    consistent_read: Option<bool>,
33    #[serde(rename = "IndexName", default)]
34    index_name: Option<String>,
35    #[serde(rename = "Segment", default)]
36    segment: Option<u32>,
37    #[serde(rename = "TotalSegments", default)]
38    total_segments: Option<u32>,
39    #[serde(rename = "ReturnConsumedCapacity", default)]
40    return_consumed_capacity: Option<String>,
41    #[serde(rename = "AttributesToGet", default)]
42    attributes_to_get: Option<Vec<String>>,
43    #[serde(rename = "ScanFilter", default)]
44    scan_filter: Option<serde_json::Value>,
45    #[serde(rename = "ConditionalOperator", default)]
46    conditional_operator: Option<String>,
47}
48
49#[derive(Debug, Default)]
50pub struct ScanRequest {
51    pub table_name: String,
52    pub filter_expression: Option<String>,
53    pub projection_expression: Option<String>,
54    pub expression_attribute_names: Option<HashMap<String, String>>,
55    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
56    pub limit: Option<usize>,
57    pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
58    pub select: Option<String>,
59    pub consistent_read: Option<bool>,
60    pub index_name: Option<String>,
61    pub segment: Option<u32>,
62    pub total_segments: Option<u32>,
63    pub return_consumed_capacity: Option<String>,
64    pub attributes_to_get: Option<Vec<String>>,
65    pub scan_filter: Option<serde_json::Value>,
66    pub conditional_operator: Option<String>,
67    /// Raw JSON for ExclusiveStartKey when deserialized from HTTP request.
68    /// Parsed lazily in `execute()` after other validations run.
69    pub exclusive_start_key_raw: Option<serde_json::Value>,
70}
71
72impl<'de> serde::Deserialize<'de> for ScanRequest {
73    fn deserialize<D: serde::Deserializer<'de>>(
74        deserializer: D,
75    ) -> std::result::Result<Self, D::Error> {
76        let raw = ScanRequestRaw::deserialize(deserializer)?;
77        use crate::validation::{
78            TableNameContext, format_validation_errors, table_name_constraint_errors,
79        };
80
81        let mut errors = Vec::new();
82        errors.extend(table_name_constraint_errors(
83            raw.table_name.as_deref(),
84            TableNameContext::ReadWrite,
85        ));
86        let table_name = raw.table_name.unwrap_or_default();
87
88        // ReturnConsumedCapacity enum
89        if let Some(ref rcc) = raw.return_consumed_capacity {
90            if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
91                errors.push(format!(
92                    "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
93                     Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
94                    rcc
95                ));
96            }
97        }
98
99        // Select enum
100        if let Some(ref sel) = raw.select {
101            if ![
102                "ALL_ATTRIBUTES",
103                "ALL_PROJECTED_ATTRIBUTES",
104                "COUNT",
105                "SPECIFIC_ATTRIBUTES",
106            ]
107            .contains(&sel.as_str())
108            {
109                errors.push(format!(
110                    "Value '{}' at 'select' failed to satisfy constraint: \
111                     Member must satisfy enum value set: [SPECIFIC_ATTRIBUTES, COUNT, ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES]",
112                    sel
113                ));
114            }
115        }
116
117        // Limit must be >= 1.
118        // AWS DynamoDB's Scan message diverges from Query: Scan keeps the rejected
119        // value and lowercases 'limit', whereas Query omits the value and uses
120        // capital 'Limit'. Do not collapse these into a shared helper.
121        if let Some(limit) = raw.limit {
122            if limit == 0 {
123                errors.push(
124                    "Value '0' at 'limit' failed to satisfy constraint: \
125                     Member must have value greater than or equal to 1"
126                        .to_string(),
127                );
128            }
129        }
130
131        if let Some(msg) = format_validation_errors(&errors) {
132            return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
133        }
134
135        Ok(ScanRequest {
136            table_name,
137            filter_expression: raw.filter_expression,
138            projection_expression: raw.projection_expression,
139            expression_attribute_names: raw.expression_attribute_names,
140            expression_attribute_values: raw.expression_attribute_values,
141            limit: raw.limit,
142            exclusive_start_key: None,
143            select: raw.select,
144            consistent_read: raw.consistent_read,
145            index_name: raw.index_name,
146            segment: raw.segment,
147            total_segments: raw.total_segments,
148            return_consumed_capacity: raw.return_consumed_capacity,
149            attributes_to_get: raw.attributes_to_get,
150            scan_filter: raw.scan_filter,
151            conditional_operator: raw.conditional_operator,
152            exclusive_start_key_raw: raw.exclusive_start_key,
153        })
154    }
155}
156
157#[derive(Debug, Default, Serialize)]
158pub struct ScanResponse {
159    #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
160    pub items: Option<Vec<Item>>,
161    #[serde(rename = "Count")]
162    pub count: usize,
163    #[serde(rename = "ScannedCount")]
164    pub scanned_count: usize,
165    #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
166    pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
167    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
168    pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
169}
170
171pub fn execute(storage: &Storage, mut request: ScanRequest) -> Result<ScanResponse> {
172    // Validate table name format before checking existence (DynamoDB validates input first)
173    crate::validation::validate_table_name(&request.table_name)?;
174
175    // ---- Expression vs non-expression mixing validation ----
176    {
177        let mut non_expr = Vec::new();
178        let mut expr = Vec::new();
179        if request.attributes_to_get.is_some() {
180            non_expr.push("AttributesToGet");
181        }
182        if request.scan_filter.is_some()
183            && request.scan_filter.as_ref().is_some_and(|v| !v.is_null())
184        {
185            non_expr.push("ScanFilter");
186        }
187        if request.conditional_operator.is_some() {
188            non_expr.push("ConditionalOperator");
189        }
190        if request.projection_expression.is_some() {
191            expr.push("ProjectionExpression");
192        }
193        if request.filter_expression.is_some() {
194            expr.push("FilterExpression");
195        }
196        let no_raw_eav: Option<serde_json::Value> = None;
197        let ctx = helpers::ExpressionParamContext {
198            non_expression_params: non_expr,
199            expression_params: expr,
200            all_expression_param_names: vec!["FilterExpression"],
201            expression_attribute_names: &request.expression_attribute_names,
202            expression_attribute_values: &request.expression_attribute_values,
203            expression_attribute_values_raw: &no_raw_eav,
204        };
205        helpers::validate_expression_params(&ctx)?;
206    }
207
208    // ---- Validate ScanFilter attribute values (before argument counts, matching DynamoDB) ----
209    helpers::validate_filter_conditions_raw(request.scan_filter.as_ref(), "ScanFilter")?;
210
211    // ---- Validate filter argument counts and type compatibility ----
212    helpers::validate_filter_condition_args(request.scan_filter.as_ref())?;
213
214    // ---- Validate duplicate AttributesToGet ----
215    if let Some(ref attrs) = request.attributes_to_get {
216        helpers::validate_attributes_to_get_no_duplicates(attrs)?;
217    }
218
219    // ---- Parse ExclusiveStartKey from JSON value ----
220    let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
221        Some(helpers::parse_exclusive_start_key(esk_val)?)
222    } else {
223        request.exclusive_start_key.clone()
224    };
225
226    // Convert legacy ScanFilter to FilterExpression if no expression is set
227    if request.filter_expression.is_none() {
228        if let Some(ref sf_val) = request.scan_filter {
229            if let Ok(sf) =
230                serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(sf_val.clone())
231            {
232                if !sf.is_empty() {
233                    let converted = helpers::convert_filter_conditions(
234                        &sf,
235                        request.conditional_operator.as_deref(),
236                    )?;
237                    if !converted.expression.is_empty() {
238                        request.filter_expression = Some(converted.expression);
239                        let expr_values = request
240                            .expression_attribute_values
241                            .get_or_insert_with(HashMap::new);
242                        expr_values.extend(converted.attribute_values);
243                        let expr_names = request
244                            .expression_attribute_names
245                            .get_or_insert_with(HashMap::new);
246                        expr_names.extend(converted.attribute_names);
247                    }
248                }
249            }
250        }
251    }
252
253    // Validate parallel scan parameters (before table existence check)
254    match (request.segment, request.total_segments) {
255        (Some(segment), Some(total)) => {
256            if !(1..=1_000_000).contains(&total) {
257                return Err(DynoxideError::ValidationException(
258                    "1 validation error detected: Value at 'totalSegments' failed to satisfy constraint: \
259                     Member must have value between 1 and 1000000".to_string(),
260                ));
261            }
262            if segment >= total {
263                return Err(DynoxideError::ValidationException(format!(
264                    "The Segment parameter is zero-based and must be less than parameter TotalSegments: Segment: {} is not less than TotalSegments: {}",
265                    segment, total
266                )));
267            }
268        }
269        (Some(_), None) => {
270            return Err(DynoxideError::ValidationException(
271                "The TotalSegments parameter is required but was not present in the request when Segment parameter is present".to_string(),
272            ));
273        }
274        (None, Some(_)) => {
275            return Err(DynoxideError::ValidationException(
276                "The Segment parameter is required but was not present in the request when parameter TotalSegments is present".to_string(),
277            ));
278        }
279        (None, None) => {}
280    }
281
282    // ---- Validate FilterExpression and ProjectionExpression BEFORE table existence ----
283    // DynamoDB validates expression syntax before checking if the table exists.
284    if let Some(ref filter_expr_str) = request.filter_expression {
285        if filter_expr_str.is_empty() {
286            // Only report empty if the user explicitly set FilterExpression
287            // (not if it was converted from ScanFilter, which never produces empty)
288            if request.scan_filter.is_none() || request.filter_expression.as_deref() == Some("") {
289                return Err(DynoxideError::ValidationException(
290                    "Invalid FilterExpression: The expression can not be empty;".to_string(),
291                ));
292            }
293        } else {
294            // Try parsing the expression to catch syntax errors early
295            let parsed_fe = expressions::condition::parse(filter_expr_str).map_err(|e| {
296                DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
297            })?;
298            // Validate that all #name references are defined in ExpressionAttributeNames
299            if let Err(e) = expressions::condition::validate_name_refs(
300                &parsed_fe,
301                &request.expression_attribute_names,
302            ) {
303                return Err(DynoxideError::ValidationException(format!(
304                    "Invalid FilterExpression: {e}"
305                )));
306            }
307        }
308    }
309    if let Some(ref proj_expr_str) = request.projection_expression {
310        if proj_expr_str.is_empty() {
311            return Err(DynoxideError::ValidationException(
312                "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
313            ));
314        }
315    }
316
317    // SPECIFIC_ATTRIBUTES requires ProjectionExpression or AttributesToGet
318    if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
319        && request.projection_expression.is_none()
320        && request.attributes_to_get.is_none()
321    {
322        return Err(DynoxideError::ValidationException(
323            "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
324                .to_string(),
325        ));
326    }
327
328    let meta = helpers::require_table_for_item_op(storage, &request.table_name)?;
329    let table_key_schema = helpers::parse_key_schema(&meta)?;
330
331    // Convert legacy AttributesToGet to ProjectionExpression if no expression-based
332    // projection is provided.
333    let legacy_projection = if request.projection_expression.is_none() {
334        request
335            .attributes_to_get
336            .as_ref()
337            .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
338    } else {
339        None
340    };
341
342    // Determine effective key schema (GSI, LSI, or base table)
343    let lsi_keys = request
344        .index_name
345        .as_ref()
346        .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
347    let is_lsi = lsi_keys.is_some();
348
349    // ConsistentRead is not supported on GSIs (LSIs are fine)
350    if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
351        return Err(DynoxideError::ValidationException(
352            "Consistent reads are not supported on global secondary indexes".to_string(),
353        ));
354    }
355
356    let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
357        if let Some(keys) = lsi_keys {
358            keys
359        } else {
360            super::gsi::parse_gsi_key_schema(&meta, index_name)?
361        }
362    } else {
363        (
364            table_key_schema.partition_key.clone(),
365            table_key_schema.sort_key.clone(),
366        )
367    };
368
369    // ---- Validate ExclusiveStartKey structure against key schema ----
370    // Stage 1+2: count check + index key type check
371    if let Some(ref esk) = exclusive_start_key {
372        let count_msg = if request.index_name.is_some() {
373            "The provided starting key is invalid"
374        } else {
375            "The provided starting key is invalid: The provided key element does not match the schema"
376        };
377        helpers::validate_esk_count_and_index_keys(
378            esk,
379            &meta,
380            request.index_name.as_deref(),
381            count_msg,
382        )?;
383    }
384
385    // Check ALL_ATTRIBUTES on global index (between index key check and table key check)
386    if let Some(ref index_name) = request.index_name {
387        if !is_lsi {
388            if let Some(ref select) = request.select {
389                if select == "ALL_ATTRIBUTES" {
390                    // Check if index projection is ALL
391                    let gsi_defs = super::gsi::parse_gsi_defs(&meta)?;
392                    if let Some(gsi) = gsi_defs.iter().find(|g| g.index_name == *index_name) {
393                        if gsi.projection_type != crate::types::ProjectionType::ALL {
394                            return Err(DynoxideError::ValidationException(format!(
395                                "One or more parameter values were invalid: \
396                                 Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
397                                 because its projection type is not ALL",
398                                index_name
399                            )));
400                        }
401                    }
402                }
403            }
404        }
405    }
406
407    // Stage 3: table key type check
408    if let Some(ref esk) = exclusive_start_key {
409        helpers::validate_esk_table_keys(esk, &meta)?;
410    }
411
412    // Extract ExclusiveStartKey pk/sk using effective key names
413    let (start_pk, start_sk) = if let Some(ref esk) = exclusive_start_key {
414        let pk = esk.get(&effective_pk).and_then(|v| v.to_key_string());
415        let sk = if let Some(ref sk_name) = effective_sk {
416            esk.get(sk_name).and_then(|v| v.to_key_string())
417        } else {
418            Some(String::new())
419        };
420        (pk, sk)
421    } else {
422        (None, None)
423    };
424
425    // For index scans (LSI and GSI), extract the base table key from
426    // ExclusiveStartKey for composite cursor pagination. The GSI/LSI
427    // tables have a composite primary key that includes the base table
428    // keys, so the cursor must include them to avoid skipping rows that
429    // share the same index key.
430    let (start_base_pk, start_base_sk) = if is_lsi || request.index_name.is_some() {
431        if let Some(ref esk) = exclusive_start_key {
432            let base_pk = esk
433                .get(&table_key_schema.partition_key)
434                .and_then(|v| v.to_key_string());
435            let base_sk = table_key_schema
436                .sort_key
437                .as_ref()
438                .and_then(|sk_name| esk.get(sk_name))
439                .and_then(|v| v.to_key_string());
440            (base_pk, base_sk)
441        } else {
442            (None, None)
443        }
444    } else {
445        (None, None)
446    };
447
448    // Scan either GSI table or base table
449    let scan_params = crate::storage::ScanParams {
450        limit: request.limit,
451        exclusive_start_pk: start_pk.as_deref(),
452        exclusive_start_sk: start_sk.as_deref(),
453        segment: request.segment,
454        total_segments: request.total_segments,
455        exclusive_start_base_pk: start_base_pk.as_deref(),
456        exclusive_start_base_sk: start_base_sk.as_deref(),
457    };
458    let rows = if let Some(ref index_name) = request.index_name {
459        if is_lsi {
460            storage.scan_lsi_items(&request.table_name, index_name, &scan_params)?
461        } else {
462            storage.scan_gsi_items(&request.table_name, index_name, &scan_params)?
463        }
464    } else {
465        storage.scan_items(&request.table_name, &scan_params)?
466    };
467
468    // Create tracker for unused expression attribute names/values
469    let tracker = crate::expressions::TrackedExpressionAttributes::new(
470        &request.expression_attribute_names,
471        &request.expression_attribute_values,
472    );
473
474    // Parse filter expression if present
475    let filter_expr = request
476        .filter_expression
477        .as_ref()
478        .map(|expr| expressions::condition::parse(expr))
479        .transpose()
480        .map_err(DynoxideError::ValidationException)?;
481
482    // Check for non-scalar key access in FilterExpression
483    if let Some(ref filter) = filter_expr {
484        // Build key attribute lists for non-scalar check
485        let mut base_key_attrs = vec![table_key_schema.partition_key.clone()];
486        if let Some(ref sk) = table_key_schema.sort_key {
487            base_key_attrs.push(sk.clone());
488        }
489        let mut index_key_attrs = Vec::new();
490        if request.index_name.is_some() {
491            if !base_key_attrs.contains(&effective_pk) {
492                index_key_attrs.push(effective_pk.clone());
493            }
494            if let Some(ref sk) = effective_sk {
495                if !base_key_attrs.contains(sk) {
496                    index_key_attrs.push(sk.clone());
497                }
498            }
499        }
500        if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
501            filter,
502            &request.expression_attribute_names,
503            &base_key_attrs,
504            &index_key_attrs,
505        ) {
506            let prefix = if is_index { "IndexKey" } else { "Key" };
507            return Err(DynoxideError::ValidationException(format!(
508                "Key attributes must be scalars; \
509                 list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
510            )));
511        }
512    }
513
514    // Parse projection expression if present; fall back to legacy AttributesToGet
515    let projection = if let Some(ref proj_expr) = request.projection_expression {
516        Some(
517            expressions::projection::parse(proj_expr)
518                .map_err(DynoxideError::ValidationException)?,
519        )
520    } else {
521        legacy_projection.clone()
522    };
523
524    // Pre-register expression references so unused check works even with zero items
525    if let Some(ref filter) = filter_expr {
526        tracker.track_condition_expr(filter);
527    }
528    if let Some(ref proj) = projection {
529        tracker.track_projection_expr(proj);
530    }
531
532    // Untracked variant for the per-item hot loop — tracking already done above
533    let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
534        &request.expression_attribute_names,
535        &request.expression_attribute_values,
536    );
537
538    // Determine if SELECT COUNT
539    let is_count = request
540        .select
541        .as_deref()
542        .map(|s| s.eq_ignore_ascii_case("COUNT"))
543        .unwrap_or(false);
544
545    // Key attribute names for projection (use effective keys for GSI)
546    let mut key_attrs = vec![effective_pk.clone()];
547    if let Some(ref sk) = effective_sk {
548        key_attrs.push(sk.clone());
549    }
550    if request.index_name.is_some() {
551        if !key_attrs.contains(&table_key_schema.partition_key) {
552            key_attrs.push(table_key_schema.partition_key.clone());
553        }
554        if let Some(ref sk) = table_key_schema.sort_key {
555            if !key_attrs.contains(sk) {
556                key_attrs.push(sk.clone());
557            }
558        }
559    }
560
561    let mut items = Vec::new();
562    let mut scanned_count = 0;
563    let mut filtered_count = 0;
564    let mut cumulative_size = 0;
565    let mut last_evaluated_item: Option<Item> = None;
566    let mut truncated_by_size = false;
567
568    for (_pk, _sk, item_json) in &rows {
569        let item: Item = serde_json::from_str(item_json).map_err(|e| {
570            DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
571        })?;
572
573        scanned_count += 1;
574
575        // Check 1MB limit BEFORE filtering — DynamoDB counts all evaluated data
576        // towards the 1MB response size limit, not just items that pass the filter.
577        let item_size = crate::types::item_size(&item);
578        if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
579            truncated_by_size = true;
580            break;
581        }
582        cumulative_size += item_size;
583
584        // Apply filter
585        if let Some(ref filter) = filter_expr {
586            let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
587                .map_err(DynoxideError::ValidationException)?;
588            if !passes {
589                last_evaluated_item = Some(item);
590                continue;
591            }
592        }
593
594        filtered_count += 1;
595
596        // Apply projection -- do NOT auto-include key attributes when the
597        // user explicitly specified ProjectionExpression or AttributesToGet.
598        let result_item = if let Some(ref proj) = projection {
599            let no_keys: &[String] = &[];
600            expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
601                .map_err(DynoxideError::ValidationException)?
602        } else {
603            item.clone()
604        };
605
606        last_evaluated_item = Some(item);
607        if !is_count {
608            items.push(result_item);
609        }
610    }
611
612    // Check for unused expression attribute names/values
613    tracker.check_unused()?;
614
615    let count = if is_count {
616        filtered_count
617    } else {
618        items.len()
619    };
620
621    // Determine LastEvaluatedKey
622    let has_more = truncated_by_size
623        || (request.limit.is_some() && scanned_count >= request.limit.unwrap_or(usize::MAX));
624
625    // For index scans, include the base table primary key in LastEvaluatedKey
626    // alongside the effective (index) keys so the cursor can uniquely identify
627    // the position. For LSIs, include the table sort key. For GSIs, include
628    // both the table partition key and sort key.
629    let is_gsi_scan = request.index_name.is_some() && !is_lsi;
630    let last_evaluated_key = if has_more {
631        last_evaluated_item.map(|item| {
632            let mut key = HashMap::new();
633            if let Some(pk_val) = item.get(&effective_pk) {
634                key.insert(effective_pk.clone(), pk_val.clone());
635            }
636            if let Some(ref sk_name) = effective_sk {
637                if let Some(sk_val) = item.get(sk_name) {
638                    key.insert(sk_name.clone(), sk_val.clone());
639                }
640            }
641            // For LSI scans, add the table sort key if different from the index sort key
642            if is_lsi {
643                if let Some(tsk) = table_key_schema.sort_key.as_deref() {
644                    if !key.contains_key(tsk) {
645                        if let Some(v) = item.get(tsk) {
646                            key.insert(tsk.to_string(), v.clone());
647                        }
648                    }
649                }
650            }
651            // For GSI scans, add the base table primary key (pk and sk)
652            if is_gsi_scan {
653                if !key.contains_key(&table_key_schema.partition_key) {
654                    if let Some(v) = item.get(&table_key_schema.partition_key) {
655                        key.insert(table_key_schema.partition_key.clone(), v.clone());
656                    }
657                }
658                if let Some(ref tsk) = table_key_schema.sort_key {
659                    if !key.contains_key(tsk) {
660                        if let Some(v) = item.get(tsk) {
661                            key.insert(tsk.clone(), v.clone());
662                        }
663                    }
664                }
665            }
666            key
667        })
668    } else {
669        None
670    };
671
672    // Attribute read capacity to the GSI if scanning one
673    let is_gsi = is_gsi_scan;
674    let consistent = request.consistent_read.unwrap_or(false);
675    let consumed_capacity = if is_gsi {
676        let mut gsi_units = std::collections::HashMap::new();
677        gsi_units.insert(
678            request.index_name.as_ref().unwrap().clone(),
679            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
680        );
681        crate::types::consumed_capacity_with_indexes(
682            &request.table_name,
683            0.0,
684            &gsi_units,
685            &request.return_consumed_capacity,
686        )
687    } else {
688        crate::types::consumed_capacity(
689            &request.table_name,
690            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
691            &request.return_consumed_capacity,
692        )
693    };
694
695    Ok(ScanResponse {
696        items: if is_count { None } else { Some(items) },
697        count,
698        scanned_count,
699        last_evaluated_key,
700        consumed_capacity,
701    })
702}