Skip to main content

dynoxide/actions/
scan.rs

1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage::Storage;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9/// 1MB response size limit for Query/Scan.
10const MAX_RESPONSE_SIZE: usize = 1_048_576;
11
12/// Internal deserialization struct for detecting missing fields.
13#[derive(Debug, Default, Deserialize)]
14struct ScanRequestRaw {
15    #[serde(rename = "TableName", default)]
16    table_name: Option<String>,
17    #[serde(rename = "FilterExpression", default)]
18    filter_expression: Option<String>,
19    #[serde(rename = "ProjectionExpression", default)]
20    projection_expression: Option<String>,
21    #[serde(rename = "ExpressionAttributeNames", default)]
22    expression_attribute_names: Option<HashMap<String, String>>,
23    #[serde(rename = "ExpressionAttributeValues", default)]
24    expression_attribute_values: Option<HashMap<String, AttributeValue>>,
25    #[serde(rename = "Limit", default)]
26    limit: Option<usize>,
27    #[serde(rename = "ExclusiveStartKey", default)]
28    exclusive_start_key: Option<serde_json::Value>,
29    #[serde(rename = "Select", default)]
30    select: Option<String>,
31    #[serde(rename = "ConsistentRead", default)]
32    consistent_read: Option<bool>,
33    #[serde(rename = "IndexName", default)]
34    index_name: Option<String>,
35    #[serde(rename = "Segment", default)]
36    segment: Option<u32>,
37    #[serde(rename = "TotalSegments", default)]
38    total_segments: Option<u32>,
39    #[serde(rename = "ReturnConsumedCapacity", default)]
40    return_consumed_capacity: Option<String>,
41    #[serde(rename = "AttributesToGet", default)]
42    attributes_to_get: Option<Vec<String>>,
43    #[serde(rename = "ScanFilter", default)]
44    scan_filter: Option<serde_json::Value>,
45    #[serde(rename = "ConditionalOperator", default)]
46    conditional_operator: Option<String>,
47}
48
49#[derive(Debug, Default)]
50pub struct ScanRequest {
51    pub table_name: String,
52    pub filter_expression: Option<String>,
53    pub projection_expression: Option<String>,
54    pub expression_attribute_names: Option<HashMap<String, String>>,
55    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
56    pub limit: Option<usize>,
57    pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
58    pub select: Option<String>,
59    pub consistent_read: Option<bool>,
60    pub index_name: Option<String>,
61    pub segment: Option<u32>,
62    pub total_segments: Option<u32>,
63    pub return_consumed_capacity: Option<String>,
64    pub attributes_to_get: Option<Vec<String>>,
65    pub scan_filter: Option<serde_json::Value>,
66    pub conditional_operator: Option<String>,
67    /// Raw JSON for ExclusiveStartKey when deserialized from HTTP request.
68    /// Parsed lazily in `execute()` after other validations run.
69    pub exclusive_start_key_raw: Option<serde_json::Value>,
70}
71
72impl<'de> serde::Deserialize<'de> for ScanRequest {
73    fn deserialize<D: serde::Deserializer<'de>>(
74        deserializer: D,
75    ) -> std::result::Result<Self, D::Error> {
76        let raw = ScanRequestRaw::deserialize(deserializer)?;
77        use crate::validation::{format_validation_errors, table_name_constraint_errors};
78
79        let mut errors = Vec::new();
80        errors.extend(table_name_constraint_errors(raw.table_name.as_deref()));
81        let table_name = raw.table_name.unwrap_or_default();
82
83        // ReturnConsumedCapacity enum
84        if let Some(ref rcc) = raw.return_consumed_capacity {
85            if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
86                errors.push(format!(
87                    "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
88                     Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
89                    rcc
90                ));
91            }
92        }
93
94        // Select enum
95        if let Some(ref sel) = raw.select {
96            if ![
97                "ALL_ATTRIBUTES",
98                "ALL_PROJECTED_ATTRIBUTES",
99                "COUNT",
100                "SPECIFIC_ATTRIBUTES",
101            ]
102            .contains(&sel.as_str())
103            {
104                errors.push(format!(
105                    "Value '{}' at 'select' failed to satisfy constraint: \
106                     Member must satisfy enum value set: [ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES, COUNT, SPECIFIC_ATTRIBUTES]",
107                    sel
108                ));
109            }
110        }
111
112        // Limit must be >= 1
113        if let Some(limit) = raw.limit {
114            if limit == 0 {
115                errors.push(
116                    "Value '0' at 'Limit' failed to satisfy constraint: \
117                     Member must have value greater than or equal to 1"
118                        .to_string(),
119                );
120            }
121        }
122
123        if let Some(msg) = format_validation_errors(&errors) {
124            return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
125        }
126
127        Ok(ScanRequest {
128            table_name,
129            filter_expression: raw.filter_expression,
130            projection_expression: raw.projection_expression,
131            expression_attribute_names: raw.expression_attribute_names,
132            expression_attribute_values: raw.expression_attribute_values,
133            limit: raw.limit,
134            exclusive_start_key: None,
135            select: raw.select,
136            consistent_read: raw.consistent_read,
137            index_name: raw.index_name,
138            segment: raw.segment,
139            total_segments: raw.total_segments,
140            return_consumed_capacity: raw.return_consumed_capacity,
141            attributes_to_get: raw.attributes_to_get,
142            scan_filter: raw.scan_filter,
143            conditional_operator: raw.conditional_operator,
144            exclusive_start_key_raw: raw.exclusive_start_key,
145        })
146    }
147}
148
149#[derive(Debug, Default, Serialize)]
150pub struct ScanResponse {
151    #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
152    pub items: Option<Vec<Item>>,
153    #[serde(rename = "Count")]
154    pub count: usize,
155    #[serde(rename = "ScannedCount")]
156    pub scanned_count: usize,
157    #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
158    pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
159    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
160    pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
161}
162
163pub fn execute(storage: &Storage, mut request: ScanRequest) -> Result<ScanResponse> {
164    // Validate table name format before checking existence (DynamoDB validates input first)
165    crate::validation::validate_table_name(&request.table_name)?;
166
167    // ---- Expression vs non-expression mixing validation ----
168    {
169        let mut non_expr = Vec::new();
170        let mut expr = Vec::new();
171        if request.attributes_to_get.is_some() {
172            non_expr.push("AttributesToGet");
173        }
174        if request.scan_filter.is_some()
175            && request.scan_filter.as_ref().is_some_and(|v| !v.is_null())
176        {
177            non_expr.push("ScanFilter");
178        }
179        if request.conditional_operator.is_some() {
180            non_expr.push("ConditionalOperator");
181        }
182        if request.projection_expression.is_some() {
183            expr.push("ProjectionExpression");
184        }
185        if request.filter_expression.is_some() {
186            expr.push("FilterExpression");
187        }
188        let no_raw_eav: Option<serde_json::Value> = None;
189        let ctx = helpers::ExpressionParamContext {
190            non_expression_params: non_expr,
191            expression_params: expr,
192            all_expression_param_names: vec!["FilterExpression"],
193            expression_attribute_names: &request.expression_attribute_names,
194            expression_attribute_values: &request.expression_attribute_values,
195            expression_attribute_values_raw: &no_raw_eav,
196        };
197        helpers::validate_expression_params(&ctx)?;
198    }
199
200    // ---- Validate ScanFilter attribute values (before argument counts, matching DynamoDB) ----
201    helpers::validate_filter_conditions_raw(request.scan_filter.as_ref(), "ScanFilter")?;
202
203    // ---- Validate filter argument counts and type compatibility ----
204    helpers::validate_filter_condition_args(request.scan_filter.as_ref())?;
205
206    // ---- Validate duplicate AttributesToGet ----
207    if let Some(ref attrs) = request.attributes_to_get {
208        helpers::validate_attributes_to_get_no_duplicates(attrs)?;
209    }
210
211    // ---- Parse ExclusiveStartKey from JSON value ----
212    let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
213        Some(helpers::parse_exclusive_start_key(esk_val)?)
214    } else {
215        request.exclusive_start_key.clone()
216    };
217
218    // Convert legacy ScanFilter to FilterExpression if no expression is set
219    if request.filter_expression.is_none() {
220        if let Some(ref sf_val) = request.scan_filter {
221            if let Ok(sf) =
222                serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(sf_val.clone())
223            {
224                if !sf.is_empty() {
225                    let converted = helpers::convert_filter_conditions(
226                        &sf,
227                        request.conditional_operator.as_deref(),
228                    )?;
229                    if !converted.expression.is_empty() {
230                        request.filter_expression = Some(converted.expression);
231                        let expr_values = request
232                            .expression_attribute_values
233                            .get_or_insert_with(HashMap::new);
234                        expr_values.extend(converted.attribute_values);
235                        let expr_names = request
236                            .expression_attribute_names
237                            .get_or_insert_with(HashMap::new);
238                        expr_names.extend(converted.attribute_names);
239                    }
240                }
241            }
242        }
243    }
244
245    // Validate parallel scan parameters (before table existence check)
246    match (request.segment, request.total_segments) {
247        (Some(segment), Some(total)) => {
248            if !(1..=1_000_000).contains(&total) {
249                return Err(DynoxideError::ValidationException(
250                    "1 validation error detected: Value at 'totalSegments' failed to satisfy constraint: \
251                     Member must have value between 1 and 1000000".to_string(),
252                ));
253            }
254            if segment >= total {
255                return Err(DynoxideError::ValidationException(format!(
256                    "The Segment parameter is zero-based and must be less than parameter TotalSegments: Segment: {} is not less than TotalSegments: {}",
257                    segment, total
258                )));
259            }
260        }
261        (Some(_), None) => {
262            return Err(DynoxideError::ValidationException(
263                "The TotalSegments parameter is required but was not present in the request when Segment parameter is present".to_string(),
264            ));
265        }
266        (None, Some(_)) => {
267            return Err(DynoxideError::ValidationException(
268                "The Segment parameter is required but was not present in the request when parameter TotalSegments is present".to_string(),
269            ));
270        }
271        (None, None) => {}
272    }
273
274    // ---- Validate FilterExpression and ProjectionExpression BEFORE table existence ----
275    // DynamoDB validates expression syntax before checking if the table exists.
276    if let Some(ref filter_expr_str) = request.filter_expression {
277        if filter_expr_str.is_empty() {
278            // Only report empty if the user explicitly set FilterExpression
279            // (not if it was converted from ScanFilter, which never produces empty)
280            if request.scan_filter.is_none() || request.filter_expression.as_deref() == Some("") {
281                return Err(DynoxideError::ValidationException(
282                    "Invalid FilterExpression: The expression can not be empty;".to_string(),
283                ));
284            }
285        } else {
286            // Try parsing the expression to catch syntax errors early
287            let parsed_fe = expressions::condition::parse(filter_expr_str).map_err(|e| {
288                DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
289            })?;
290            // Validate that all #name references are defined in ExpressionAttributeNames
291            if let Err(e) = expressions::condition::validate_name_refs(
292                &parsed_fe,
293                &request.expression_attribute_names,
294            ) {
295                return Err(DynoxideError::ValidationException(format!(
296                    "Invalid FilterExpression: {e}"
297                )));
298            }
299        }
300    }
301    if let Some(ref proj_expr_str) = request.projection_expression {
302        if proj_expr_str.is_empty() {
303            return Err(DynoxideError::ValidationException(
304                "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
305            ));
306        }
307    }
308
309    // SPECIFIC_ATTRIBUTES requires ProjectionExpression or AttributesToGet
310    if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
311        && request.projection_expression.is_none()
312        && request.attributes_to_get.is_none()
313    {
314        return Err(DynoxideError::ValidationException(
315            "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
316                .to_string(),
317        ));
318    }
319
320    let meta = helpers::require_table_for_item_op(storage, &request.table_name)?;
321    let table_key_schema = helpers::parse_key_schema(&meta)?;
322
323    // Convert legacy AttributesToGet to ProjectionExpression if no expression-based
324    // projection is provided.
325    let legacy_projection = if request.projection_expression.is_none() {
326        request
327            .attributes_to_get
328            .as_ref()
329            .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
330    } else {
331        None
332    };
333
334    // Determine effective key schema (GSI, LSI, or base table)
335    let lsi_keys = request
336        .index_name
337        .as_ref()
338        .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
339    let is_lsi = lsi_keys.is_some();
340
341    // ConsistentRead is not supported on GSIs (LSIs are fine)
342    if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
343        return Err(DynoxideError::ValidationException(
344            "Consistent reads are not supported on global secondary indexes".to_string(),
345        ));
346    }
347
348    let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
349        if let Some(keys) = lsi_keys {
350            keys
351        } else {
352            super::gsi::parse_gsi_key_schema(&meta, index_name)?
353        }
354    } else {
355        (
356            table_key_schema.partition_key.clone(),
357            table_key_schema.sort_key.clone(),
358        )
359    };
360
361    // ---- Validate ExclusiveStartKey structure against key schema ----
362    // Stage 1+2: count check + index key type check
363    if let Some(ref esk) = exclusive_start_key {
364        let count_msg = if request.index_name.is_some() {
365            "The provided starting key is invalid"
366        } else {
367            "The provided starting key is invalid: The provided key element does not match the schema"
368        };
369        helpers::validate_esk_count_and_index_keys(
370            esk,
371            &meta,
372            request.index_name.as_deref(),
373            count_msg,
374        )?;
375    }
376
377    // Check ALL_ATTRIBUTES on global index (between index key check and table key check)
378    if let Some(ref index_name) = request.index_name {
379        if !is_lsi {
380            if let Some(ref select) = request.select {
381                if select == "ALL_ATTRIBUTES" {
382                    // Check if index projection is ALL
383                    let gsi_defs = super::gsi::parse_gsi_defs(&meta)?;
384                    if let Some(gsi) = gsi_defs.iter().find(|g| g.index_name == *index_name) {
385                        if gsi.projection_type != crate::types::ProjectionType::ALL {
386                            return Err(DynoxideError::ValidationException(format!(
387                                "One or more parameter values were invalid: \
388                                 Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
389                                 because its projection type is not ALL",
390                                index_name
391                            )));
392                        }
393                    }
394                }
395            }
396        }
397    }
398
399    // Stage 3: table key type check
400    if let Some(ref esk) = exclusive_start_key {
401        helpers::validate_esk_table_keys(esk, &meta)?;
402    }
403
404    // Extract ExclusiveStartKey pk/sk using effective key names
405    let (start_pk, start_sk) = if let Some(ref esk) = exclusive_start_key {
406        let pk = esk.get(&effective_pk).and_then(|v| v.to_key_string());
407        let sk = if let Some(ref sk_name) = effective_sk {
408            esk.get(sk_name).and_then(|v| v.to_key_string())
409        } else {
410            Some(String::new())
411        };
412        (pk, sk)
413    } else {
414        (None, None)
415    };
416
417    // For index scans (LSI and GSI), extract the base table key from
418    // ExclusiveStartKey for composite cursor pagination. The GSI/LSI
419    // tables have a composite primary key that includes the base table
420    // keys, so the cursor must include them to avoid skipping rows that
421    // share the same index key.
422    let (start_base_pk, start_base_sk) = if is_lsi || request.index_name.is_some() {
423        if let Some(ref esk) = exclusive_start_key {
424            let base_pk = esk
425                .get(&table_key_schema.partition_key)
426                .and_then(|v| v.to_key_string());
427            let base_sk = table_key_schema
428                .sort_key
429                .as_ref()
430                .and_then(|sk_name| esk.get(sk_name))
431                .and_then(|v| v.to_key_string());
432            (base_pk, base_sk)
433        } else {
434            (None, None)
435        }
436    } else {
437        (None, None)
438    };
439
440    // Scan either GSI table or base table
441    let scan_params = crate::storage::ScanParams {
442        limit: request.limit,
443        exclusive_start_pk: start_pk.as_deref(),
444        exclusive_start_sk: start_sk.as_deref(),
445        segment: request.segment,
446        total_segments: request.total_segments,
447        exclusive_start_base_pk: start_base_pk.as_deref(),
448        exclusive_start_base_sk: start_base_sk.as_deref(),
449    };
450    let rows = if let Some(ref index_name) = request.index_name {
451        if is_lsi {
452            storage.scan_lsi_items(&request.table_name, index_name, &scan_params)?
453        } else {
454            storage.scan_gsi_items(&request.table_name, index_name, &scan_params)?
455        }
456    } else {
457        storage.scan_items(&request.table_name, &scan_params)?
458    };
459
460    // Create tracker for unused expression attribute names/values
461    let tracker = crate::expressions::TrackedExpressionAttributes::new(
462        &request.expression_attribute_names,
463        &request.expression_attribute_values,
464    );
465
466    // Parse filter expression if present
467    let filter_expr = request
468        .filter_expression
469        .as_ref()
470        .map(|expr| expressions::condition::parse(expr))
471        .transpose()
472        .map_err(DynoxideError::ValidationException)?;
473
474    // Check for non-scalar key access in FilterExpression
475    if let Some(ref filter) = filter_expr {
476        // Build key attribute lists for non-scalar check
477        let mut base_key_attrs = vec![table_key_schema.partition_key.clone()];
478        if let Some(ref sk) = table_key_schema.sort_key {
479            base_key_attrs.push(sk.clone());
480        }
481        let mut index_key_attrs = Vec::new();
482        if request.index_name.is_some() {
483            if !base_key_attrs.contains(&effective_pk) {
484                index_key_attrs.push(effective_pk.clone());
485            }
486            if let Some(ref sk) = effective_sk {
487                if !base_key_attrs.contains(sk) {
488                    index_key_attrs.push(sk.clone());
489                }
490            }
491        }
492        if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
493            filter,
494            &request.expression_attribute_names,
495            &base_key_attrs,
496            &index_key_attrs,
497        ) {
498            let prefix = if is_index { "IndexKey" } else { "Key" };
499            return Err(DynoxideError::ValidationException(format!(
500                "Key attributes must be scalars; \
501                 list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
502            )));
503        }
504    }
505
506    // Parse projection expression if present; fall back to legacy AttributesToGet
507    let projection = if let Some(ref proj_expr) = request.projection_expression {
508        Some(
509            expressions::projection::parse(proj_expr)
510                .map_err(DynoxideError::ValidationException)?,
511        )
512    } else {
513        legacy_projection.clone()
514    };
515
516    // Pre-register expression references so unused check works even with zero items
517    if let Some(ref filter) = filter_expr {
518        tracker.track_condition_expr(filter);
519    }
520    if let Some(ref proj) = projection {
521        tracker.track_projection_expr(proj);
522    }
523
524    // Untracked variant for the per-item hot loop — tracking already done above
525    let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
526        &request.expression_attribute_names,
527        &request.expression_attribute_values,
528    );
529
530    // Determine if SELECT COUNT
531    let is_count = request
532        .select
533        .as_deref()
534        .map(|s| s.eq_ignore_ascii_case("COUNT"))
535        .unwrap_or(false);
536
537    // Key attribute names for projection (use effective keys for GSI)
538    let mut key_attrs = vec![effective_pk.clone()];
539    if let Some(ref sk) = effective_sk {
540        key_attrs.push(sk.clone());
541    }
542    if request.index_name.is_some() {
543        if !key_attrs.contains(&table_key_schema.partition_key) {
544            key_attrs.push(table_key_schema.partition_key.clone());
545        }
546        if let Some(ref sk) = table_key_schema.sort_key {
547            if !key_attrs.contains(sk) {
548                key_attrs.push(sk.clone());
549            }
550        }
551    }
552
553    let mut items = Vec::new();
554    let mut scanned_count = 0;
555    let mut filtered_count = 0;
556    let mut cumulative_size = 0;
557    let mut last_evaluated_item: Option<Item> = None;
558    let mut truncated_by_size = false;
559
560    for (_pk, _sk, item_json) in &rows {
561        let item: Item = serde_json::from_str(item_json).map_err(|e| {
562            DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
563        })?;
564
565        scanned_count += 1;
566
567        // Check 1MB limit BEFORE filtering — DynamoDB counts all evaluated data
568        // towards the 1MB response size limit, not just items that pass the filter.
569        let item_size = crate::types::item_size(&item);
570        if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
571            truncated_by_size = true;
572            break;
573        }
574        cumulative_size += item_size;
575
576        // Apply filter
577        if let Some(ref filter) = filter_expr {
578            let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
579                .map_err(DynoxideError::ValidationException)?;
580            if !passes {
581                last_evaluated_item = Some(item);
582                continue;
583            }
584        }
585
586        filtered_count += 1;
587
588        // Apply projection -- do NOT auto-include key attributes when the
589        // user explicitly specified ProjectionExpression or AttributesToGet.
590        let result_item = if let Some(ref proj) = projection {
591            let no_keys: &[String] = &[];
592            expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
593                .map_err(DynoxideError::ValidationException)?
594        } else {
595            item.clone()
596        };
597
598        last_evaluated_item = Some(item);
599        if !is_count {
600            items.push(result_item);
601        }
602    }
603
604    // Check for unused expression attribute names/values
605    tracker.check_unused()?;
606
607    let count = if is_count {
608        filtered_count
609    } else {
610        items.len()
611    };
612
613    // Determine LastEvaluatedKey
614    let has_more = truncated_by_size
615        || (request.limit.is_some() && scanned_count >= request.limit.unwrap_or(usize::MAX));
616
617    // For index scans, include the base table primary key in LastEvaluatedKey
618    // alongside the effective (index) keys so the cursor can uniquely identify
619    // the position. For LSIs, include the table sort key. For GSIs, include
620    // both the table partition key and sort key.
621    let is_gsi_scan = request.index_name.is_some() && !is_lsi;
622    let last_evaluated_key = if has_more {
623        last_evaluated_item.map(|item| {
624            let mut key = HashMap::new();
625            if let Some(pk_val) = item.get(&effective_pk) {
626                key.insert(effective_pk.clone(), pk_val.clone());
627            }
628            if let Some(ref sk_name) = effective_sk {
629                if let Some(sk_val) = item.get(sk_name) {
630                    key.insert(sk_name.clone(), sk_val.clone());
631                }
632            }
633            // For LSI scans, add the table sort key if different from the index sort key
634            if is_lsi {
635                if let Some(tsk) = table_key_schema.sort_key.as_deref() {
636                    if !key.contains_key(tsk) {
637                        if let Some(v) = item.get(tsk) {
638                            key.insert(tsk.to_string(), v.clone());
639                        }
640                    }
641                }
642            }
643            // For GSI scans, add the base table primary key (pk and sk)
644            if is_gsi_scan {
645                if !key.contains_key(&table_key_schema.partition_key) {
646                    if let Some(v) = item.get(&table_key_schema.partition_key) {
647                        key.insert(table_key_schema.partition_key.clone(), v.clone());
648                    }
649                }
650                if let Some(ref tsk) = table_key_schema.sort_key {
651                    if !key.contains_key(tsk) {
652                        if let Some(v) = item.get(tsk) {
653                            key.insert(tsk.clone(), v.clone());
654                        }
655                    }
656                }
657            }
658            key
659        })
660    } else {
661        None
662    };
663
664    // Attribute read capacity to the GSI if scanning one
665    let is_gsi = is_gsi_scan;
666    let consistent = request.consistent_read.unwrap_or(false);
667    let consumed_capacity = if is_gsi {
668        let mut gsi_units = std::collections::HashMap::new();
669        gsi_units.insert(
670            request.index_name.as_ref().unwrap().clone(),
671            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
672        );
673        crate::types::consumed_capacity_with_indexes(
674            &request.table_name,
675            0.0,
676            &gsi_units,
677            &request.return_consumed_capacity,
678        )
679    } else {
680        crate::types::consumed_capacity(
681            &request.table_name,
682            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
683            &request.return_consumed_capacity,
684        )
685    };
686
687    Ok(ScanResponse {
688        items: if is_count { None } else { Some(items) },
689        count,
690        scanned_count,
691        last_evaluated_key,
692        consumed_capacity,
693    })
694}