Skip to main content

dynoxide/actions/
scan.rs

1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage_backend::StorageBackend;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9/// 1MB response size limit for Query/Scan.
10const MAX_RESPONSE_SIZE: usize = 1_048_576;
11
12/// Internal deserialization struct for detecting missing fields.
13#[derive(Debug, Default, Deserialize)]
14struct ScanRequestRaw {
15    #[serde(rename = "TableName", default)]
16    table_name: Option<String>,
17    #[serde(rename = "FilterExpression", default)]
18    filter_expression: Option<String>,
19    #[serde(rename = "ProjectionExpression", default)]
20    projection_expression: Option<String>,
21    #[serde(rename = "ExpressionAttributeNames", default)]
22    expression_attribute_names: Option<HashMap<String, String>>,
23    #[serde(rename = "ExpressionAttributeValues", default)]
24    expression_attribute_values: Option<HashMap<String, AttributeValue>>,
25    #[serde(rename = "Limit", default)]
26    limit: Option<usize>,
27    #[serde(rename = "ExclusiveStartKey", default)]
28    exclusive_start_key: Option<serde_json::Value>,
29    #[serde(rename = "Select", default)]
30    select: Option<String>,
31    #[serde(rename = "ConsistentRead", default)]
32    consistent_read: Option<bool>,
33    #[serde(rename = "IndexName", default)]
34    index_name: Option<String>,
35    // Signed so a negative Segment deserialises and can be rejected with the
36    // DynamoDB ValidationException rather than a serde error. Cast to u32 after
37    // the >= 0 check below.
38    #[serde(rename = "Segment", default)]
39    segment: Option<i64>,
40    #[serde(rename = "TotalSegments", default)]
41    total_segments: Option<u32>,
42    #[serde(rename = "ReturnConsumedCapacity", default)]
43    return_consumed_capacity: Option<String>,
44    #[serde(rename = "AttributesToGet", default)]
45    attributes_to_get: Option<Vec<String>>,
46    #[serde(rename = "ScanFilter", default)]
47    scan_filter: Option<serde_json::Value>,
48    #[serde(rename = "ConditionalOperator", default)]
49    conditional_operator: Option<String>,
50}
51
52#[derive(Debug, Default)]
53pub struct ScanRequest {
54    pub table_name: String,
55    pub filter_expression: Option<String>,
56    pub projection_expression: Option<String>,
57    pub expression_attribute_names: Option<HashMap<String, String>>,
58    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
59    pub limit: Option<usize>,
60    pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
61    pub select: Option<String>,
62    pub consistent_read: Option<bool>,
63    pub index_name: Option<String>,
64    pub segment: Option<u32>,
65    pub total_segments: Option<u32>,
66    pub return_consumed_capacity: Option<String>,
67    pub attributes_to_get: Option<Vec<String>>,
68    pub scan_filter: Option<serde_json::Value>,
69    pub conditional_operator: Option<String>,
70    /// Raw JSON for ExclusiveStartKey when deserialized from HTTP request.
71    /// Parsed lazily in `execute()` after other validations run.
72    pub exclusive_start_key_raw: Option<serde_json::Value>,
73}
74
75impl<'de> serde::Deserialize<'de> for ScanRequest {
76    fn deserialize<D: serde::Deserializer<'de>>(
77        deserializer: D,
78    ) -> std::result::Result<Self, D::Error> {
79        let raw = ScanRequestRaw::deserialize(deserializer)?;
80        use crate::validation::{
81            TableNameContext, format_validation_errors, table_name_constraint_errors,
82        };
83
84        let mut errors = Vec::new();
85        errors.extend(table_name_constraint_errors(
86            raw.table_name.as_deref(),
87            TableNameContext::ReadWrite,
88        ));
89        let table_name = raw.table_name.unwrap_or_default();
90
91        // ReturnConsumedCapacity enum
92        if let Some(ref rcc) = raw.return_consumed_capacity {
93            if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
94                errors.push(format!(
95                    "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
96                     Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
97                    rcc
98                ));
99            }
100        }
101
102        // Select enum
103        if let Some(ref sel) = raw.select {
104            if ![
105                "ALL_ATTRIBUTES",
106                "ALL_PROJECTED_ATTRIBUTES",
107                "COUNT",
108                "SPECIFIC_ATTRIBUTES",
109            ]
110            .contains(&sel.as_str())
111            {
112                errors.push(format!(
113                    "Value '{}' at 'select' failed to satisfy constraint: \
114                     Member must satisfy enum value set: [SPECIFIC_ATTRIBUTES, COUNT, ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES]",
115                    sel
116                ));
117            }
118        }
119
120        // Limit must be >= 1.
121        // AWS DynamoDB's Scan message diverges from Query: Scan keeps the rejected
122        // value and lowercases 'limit', whereas Query omits the value and uses
123        // capital 'Limit'. Do not collapse these into a shared helper.
124        if let Some(limit) = raw.limit {
125            if limit == 0 {
126                errors.push(
127                    "Value '0' at 'limit' failed to satisfy constraint: \
128                     Member must have value greater than or equal to 1"
129                        .to_string(),
130                );
131            }
132        }
133
134        // Segment must be >= 0. (Segment vs TotalSegments and the TotalSegments
135        // range are checked in execute().)
136        if let Some(segment) = raw.segment {
137            if segment < 0 {
138                errors.push(format!(
139                    "Value '{}' at 'segment' failed to satisfy constraint: \
140                     Member must have value greater than or equal to 0",
141                    segment
142                ));
143            }
144        }
145
146        if let Some(msg) = format_validation_errors(&errors) {
147            return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
148        }
149
150        Ok(ScanRequest {
151            table_name,
152            filter_expression: raw.filter_expression,
153            projection_expression: raw.projection_expression,
154            expression_attribute_names: raw.expression_attribute_names,
155            expression_attribute_values: raw.expression_attribute_values,
156            limit: raw.limit,
157            exclusive_start_key: None,
158            select: raw.select,
159            consistent_read: raw.consistent_read,
160            index_name: raw.index_name,
161            segment: raw.segment.map(|s| s as u32),
162            total_segments: raw.total_segments,
163            return_consumed_capacity: raw.return_consumed_capacity,
164            attributes_to_get: raw.attributes_to_get,
165            scan_filter: raw.scan_filter,
166            conditional_operator: raw.conditional_operator,
167            exclusive_start_key_raw: raw.exclusive_start_key,
168        })
169    }
170}
171
172#[derive(Debug, Default, Serialize)]
173pub struct ScanResponse {
174    #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
175    pub items: Option<Vec<Item>>,
176    #[serde(rename = "Count")]
177    pub count: usize,
178    #[serde(rename = "ScannedCount")]
179    pub scanned_count: usize,
180    #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
181    pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
182    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
183    pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
184}
185
186pub async fn execute<S: StorageBackend>(
187    storage: &S,
188    mut request: ScanRequest,
189) -> Result<ScanResponse> {
190    // Validate table name format before checking existence (DynamoDB validates input first)
191    crate::validation::validate_table_name(&request.table_name)?;
192
193    // ---- Expression vs non-expression mixing validation ----
194    {
195        let mut non_expr = Vec::new();
196        let mut expr = Vec::new();
197        if request.attributes_to_get.is_some() {
198            non_expr.push("AttributesToGet");
199        }
200        if request.scan_filter.is_some()
201            && request.scan_filter.as_ref().is_some_and(|v| !v.is_null())
202        {
203            non_expr.push("ScanFilter");
204        }
205        if request.conditional_operator.is_some() {
206            non_expr.push("ConditionalOperator");
207        }
208        if request.projection_expression.is_some() {
209            expr.push("ProjectionExpression");
210        }
211        if request.filter_expression.is_some() {
212            expr.push("FilterExpression");
213        }
214        let no_raw_eav: Option<serde_json::Value> = None;
215        let ctx = helpers::ExpressionParamContext {
216            non_expression_params: non_expr,
217            expression_params: expr,
218            all_expression_param_names: vec!["FilterExpression"],
219            expression_attribute_names: &request.expression_attribute_names,
220            expression_attribute_values: &request.expression_attribute_values,
221            expression_attribute_values_raw: &no_raw_eav,
222        };
223        helpers::validate_expression_params(&ctx)?;
224    }
225
226    // ---- Validate ScanFilter attribute values (before argument counts, matching DynamoDB) ----
227    helpers::validate_filter_conditions_raw(request.scan_filter.as_ref(), "ScanFilter")?;
228
229    // ---- Validate filter argument counts and type compatibility ----
230    helpers::validate_filter_condition_args(request.scan_filter.as_ref())?;
231
232    // ---- Validate duplicate AttributesToGet ----
233    if let Some(ref attrs) = request.attributes_to_get {
234        helpers::validate_attributes_to_get_no_duplicates(attrs)?;
235    }
236
237    // ---- Parse ExclusiveStartKey from JSON value ----
238    let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
239        Some(helpers::parse_exclusive_start_key(esk_val)?)
240    } else {
241        request.exclusive_start_key.clone()
242    };
243
244    // Convert legacy ScanFilter to FilterExpression if no expression is set
245    if request.filter_expression.is_none() {
246        if let Some(ref sf_val) = request.scan_filter {
247            if let Ok(sf) =
248                serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(sf_val.clone())
249            {
250                if !sf.is_empty() {
251                    let converted = helpers::convert_filter_conditions(
252                        &sf,
253                        request.conditional_operator.as_deref(),
254                    )?;
255                    if !converted.expression.is_empty() {
256                        request.filter_expression = Some(converted.expression);
257                        let expr_values = request
258                            .expression_attribute_values
259                            .get_or_insert_with(HashMap::new);
260                        expr_values.extend(converted.attribute_values);
261                        let expr_names = request
262                            .expression_attribute_names
263                            .get_or_insert_with(HashMap::new);
264                        expr_names.extend(converted.attribute_names);
265                    }
266                }
267            }
268        }
269    }
270
271    // Validate parallel scan parameters (before table existence check)
272    match (request.segment, request.total_segments) {
273        (Some(segment), Some(total)) => {
274            if !(1..=1_000_000).contains(&total) {
275                return Err(DynoxideError::ValidationException(
276                    "1 validation error detected: Value at 'totalSegments' failed to satisfy constraint: \
277                     Member must have value between 1 and 1000000".to_string(),
278                ));
279            }
280            if segment >= total {
281                return Err(DynoxideError::ValidationException(format!(
282                    "The Segment parameter is zero-based and must be less than parameter TotalSegments: Segment: {} is not less than TotalSegments: {}",
283                    segment, total
284                )));
285            }
286        }
287        (Some(_), None) => {
288            return Err(DynoxideError::ValidationException(
289                "The TotalSegments parameter is required but was not present in the request when Segment parameter is present".to_string(),
290            ));
291        }
292        (None, Some(_)) => {
293            return Err(DynoxideError::ValidationException(
294                "The Segment parameter is required but was not present in the request when parameter TotalSegments is present".to_string(),
295            ));
296        }
297        (None, None) => {}
298    }
299
300    // ---- Validate FilterExpression and ProjectionExpression BEFORE table existence ----
301    // DynamoDB validates expression syntax before checking if the table exists.
302    if let Some(ref filter_expr_str) = request.filter_expression {
303        if filter_expr_str.is_empty() {
304            // Only report empty if the user explicitly set FilterExpression
305            // (not if it was converted from ScanFilter, which never produces empty)
306            if request.scan_filter.is_none() || request.filter_expression.as_deref() == Some("") {
307                return Err(DynoxideError::ValidationException(
308                    "Invalid FilterExpression: The expression can not be empty;".to_string(),
309                ));
310            }
311        } else {
312            // Try parsing the expression to catch syntax errors early
313            let parsed_fe = expressions::condition::parse(filter_expr_str).map_err(|e| {
314                DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
315            })?;
316            // Validate that all #name references are defined in ExpressionAttributeNames
317            if let Err(e) = expressions::condition::validate_name_refs(
318                &parsed_fe,
319                &request.expression_attribute_names,
320            ) {
321                return Err(DynoxideError::ValidationException(format!(
322                    "Invalid FilterExpression: {e}"
323                )));
324            }
325            if let Err(e) = expressions::condition::validate_operand_semantics(
326                &parsed_fe,
327                &request.expression_attribute_names,
328                &request.expression_attribute_values,
329            ) {
330                return Err(DynoxideError::ValidationException(format!(
331                    "Invalid FilterExpression: {e}"
332                )));
333            }
334        }
335    }
336    if let Some(ref proj_expr_str) = request.projection_expression {
337        if proj_expr_str.is_empty() {
338            return Err(DynoxideError::ValidationException(
339                "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
340            ));
341        }
342    }
343
344    // SPECIFIC_ATTRIBUTES requires ProjectionExpression or AttributesToGet
345    if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
346        && request.projection_expression.is_none()
347        && request.attributes_to_get.is_none()
348    {
349        return Err(DynoxideError::ValidationException(
350            "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
351                .to_string(),
352        ));
353    }
354
355    let meta = helpers::require_table_for_item_op(storage, &request.table_name).await?;
356    let table_key_schema = helpers::parse_key_schema(&meta)?;
357
358    // Convert legacy AttributesToGet to ProjectionExpression if no expression-based
359    // projection is provided.
360    let legacy_projection = if request.projection_expression.is_none() {
361        request
362            .attributes_to_get
363            .as_ref()
364            .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
365    } else {
366        None
367    };
368
369    // Determine effective key schema (GSI, LSI, or base table)
370    let lsi_keys = request
371        .index_name
372        .as_ref()
373        .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
374    let is_lsi = lsi_keys.is_some();
375
376    // ConsistentRead is not supported on GSIs (LSIs are fine)
377    if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
378        return Err(DynoxideError::ValidationException(
379            "Consistent reads are not supported on global secondary indexes".to_string(),
380        ));
381    }
382
383    let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
384        if let Some(keys) = lsi_keys {
385            keys
386        } else {
387            super::gsi::parse_gsi_key_schema(&meta, index_name)?
388        }
389    } else {
390        (
391            table_key_schema.partition_key.clone(),
392            table_key_schema.sort_key.clone(),
393        )
394    };
395
396    // ---- Validate ExclusiveStartKey structure against key schema ----
397    // Stage 1+2: count check + index key type check
398    if let Some(ref esk) = exclusive_start_key {
399        let count_msg = if request.index_name.is_some() {
400            "The provided starting key is invalid"
401        } else {
402            "The provided starting key is invalid: The provided key element does not match the schema"
403        };
404        helpers::validate_esk_count_and_index_keys(
405            esk,
406            &meta,
407            request.index_name.as_deref(),
408            count_msg,
409        )?;
410    }
411
412    // Check ALL_ATTRIBUTES on global index (between index key check and table key check)
413    if let Some(ref index_name) = request.index_name {
414        if !is_lsi {
415            if let Some(ref select) = request.select {
416                if select == "ALL_ATTRIBUTES" {
417                    // Check if index projection is ALL
418                    let gsi_defs = super::gsi::parse_gsi_defs(&meta)?;
419                    if let Some(gsi) = gsi_defs.iter().find(|g| g.index_name == *index_name) {
420                        if gsi.projection_type != crate::types::ProjectionType::ALL {
421                            return Err(DynoxideError::ValidationException(format!(
422                                "One or more parameter values were invalid: \
423                                 Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
424                                 because its projection type is not ALL",
425                                index_name
426                            )));
427                        }
428                    }
429                }
430            }
431        }
432    }
433
434    // Stage 3: table key type check
435    if let Some(ref esk) = exclusive_start_key {
436        helpers::validate_esk_table_keys(esk, &meta)?;
437    }
438
439    // Extract ExclusiveStartKey pk/sk using effective key names
440    let (start_pk, start_sk) = if let Some(ref esk) = exclusive_start_key {
441        let pk = esk.get(&effective_pk).and_then(|v| v.to_key_string());
442        let sk = if let Some(ref sk_name) = effective_sk {
443            esk.get(sk_name).and_then(|v| v.to_key_string())
444        } else {
445            Some(String::new())
446        };
447        (pk, sk)
448    } else {
449        (None, None)
450    };
451
452    // For index scans (LSI and GSI), extract the base table key from
453    // ExclusiveStartKey for composite cursor pagination. The GSI/LSI
454    // tables have a composite primary key that includes the base table
455    // keys, so the cursor must include them to avoid skipping rows that
456    // share the same index key.
457    let (start_base_pk, start_base_sk) = if is_lsi || request.index_name.is_some() {
458        if let Some(ref esk) = exclusive_start_key {
459            let base_pk = esk
460                .get(&table_key_schema.partition_key)
461                .and_then(|v| v.to_key_string());
462            // When the base table is hash-only there is no base sort key, but
463            // the GSI/LSI row stores the empty-string default in its table_sk
464            // column. Default base_sk to "" (mirroring start_sk above) so the
465            // composite cursor keeps its full width and disambiguates tied
466            // index keys by table_pk. Leaving it None collapses scan_gsi_items
467            // to the 2-column (gsi_pk, gsi_sk) cursor, which cannot advance past
468            // rows that share the same index key and silently drops them.
469            let base_sk = if let Some(sk_name) = table_key_schema.sort_key.as_ref() {
470                esk.get(sk_name).and_then(|v| v.to_key_string())
471            } else {
472                Some(String::new())
473            };
474            (base_pk, base_sk)
475        } else {
476            (None, None)
477        }
478    } else {
479        (None, None)
480    };
481
482    // Scan either GSI table or base table
483    let scan_params = crate::storage::ScanParams {
484        limit: request.limit,
485        exclusive_start_pk: start_pk.as_deref(),
486        exclusive_start_sk: start_sk.as_deref(),
487        segment: request.segment,
488        total_segments: request.total_segments,
489        exclusive_start_base_pk: start_base_pk.as_deref(),
490        exclusive_start_base_sk: start_base_sk.as_deref(),
491    };
492    let rows = if let Some(ref index_name) = request.index_name {
493        if is_lsi {
494            storage
495                .scan_lsi_items(&request.table_name, index_name, &scan_params)
496                .await?
497        } else {
498            storage
499                .scan_gsi_items(&request.table_name, index_name, &scan_params)
500                .await?
501        }
502    } else {
503        storage
504            .scan_items(&request.table_name, &scan_params)
505            .await?
506    };
507
508    // Create tracker for unused expression attribute names/values
509    let tracker = crate::expressions::TrackedExpressionAttributes::new(
510        &request.expression_attribute_names,
511        &request.expression_attribute_values,
512    );
513
514    // Parse filter expression if present
515    let filter_expr = request
516        .filter_expression
517        .as_ref()
518        .map(|expr| expressions::condition::parse(expr))
519        .transpose()
520        .map_err(DynoxideError::ValidationException)?;
521
522    // Check for non-scalar key access in FilterExpression
523    if let Some(ref filter) = filter_expr {
524        // Build key attribute lists for non-scalar check
525        let mut base_key_attrs = vec![table_key_schema.partition_key.clone()];
526        if let Some(ref sk) = table_key_schema.sort_key {
527            base_key_attrs.push(sk.clone());
528        }
529        let mut index_key_attrs = Vec::new();
530        if request.index_name.is_some() {
531            if !base_key_attrs.contains(&effective_pk) {
532                index_key_attrs.push(effective_pk.clone());
533            }
534            if let Some(ref sk) = effective_sk {
535                if !base_key_attrs.contains(sk) {
536                    index_key_attrs.push(sk.clone());
537                }
538            }
539        }
540        if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
541            filter,
542            &request.expression_attribute_names,
543            &base_key_attrs,
544            &index_key_attrs,
545        ) {
546            let prefix = if is_index { "IndexKey" } else { "Key" };
547            return Err(DynoxideError::ValidationException(format!(
548                "Key attributes must be scalars; \
549                 list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
550            )));
551        }
552    }
553
554    // Parse projection expression if present; fall back to legacy AttributesToGet
555    let projection = if let Some(ref proj_expr) = request.projection_expression {
556        Some(
557            expressions::projection::parse(proj_expr)
558                .map_err(DynoxideError::ValidationException)?,
559        )
560    } else {
561        legacy_projection.clone()
562    };
563
564    // Pre-register expression references so unused check works even with zero items
565    if let Some(ref filter) = filter_expr {
566        tracker.track_condition_expr(filter);
567    }
568    if let Some(ref proj) = projection {
569        tracker.track_projection_expr(proj);
570    }
571
572    // Untracked variant for the per-item hot loop — tracking already done above
573    let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
574        &request.expression_attribute_names,
575        &request.expression_attribute_values,
576    );
577
578    // Determine if SELECT COUNT
579    let is_count = request
580        .select
581        .as_deref()
582        .map(|s| s.eq_ignore_ascii_case("COUNT"))
583        .unwrap_or(false);
584
585    // Key attribute names for projection (use effective keys for GSI)
586    let mut key_attrs = vec![effective_pk.clone()];
587    if let Some(ref sk) = effective_sk {
588        key_attrs.push(sk.clone());
589    }
590    if request.index_name.is_some() {
591        if !key_attrs.contains(&table_key_schema.partition_key) {
592            key_attrs.push(table_key_schema.partition_key.clone());
593        }
594        if let Some(ref sk) = table_key_schema.sort_key {
595            if !key_attrs.contains(sk) {
596                key_attrs.push(sk.clone());
597            }
598        }
599    }
600
601    let mut items = Vec::new();
602    let mut scanned_count = 0;
603    let mut filtered_count = 0;
604    let mut cumulative_size = 0;
605    let mut last_evaluated_item: Option<Item> = None;
606    let mut truncated_by_size = false;
607
608    for (_pk, _sk, item_json) in &rows {
609        let item: Item = serde_json::from_str(item_json).map_err(|e| {
610            DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
611        })?;
612
613        scanned_count += 1;
614
615        // Check 1MB limit BEFORE filtering — DynamoDB counts all evaluated data
616        // towards the 1MB response size limit, not just items that pass the filter.
617        let item_size = crate::types::item_size(&item);
618        if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
619            truncated_by_size = true;
620            break;
621        }
622        cumulative_size += item_size;
623
624        // Apply filter
625        if let Some(ref filter) = filter_expr {
626            let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
627                .map_err(DynoxideError::ValidationException)?;
628            if !passes {
629                last_evaluated_item = Some(item);
630                continue;
631            }
632        }
633
634        filtered_count += 1;
635
636        // Apply projection -- do NOT auto-include key attributes when the
637        // user explicitly specified ProjectionExpression or AttributesToGet.
638        let result_item = if let Some(ref proj) = projection {
639            let no_keys: &[String] = &[];
640            expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
641                .map_err(DynoxideError::ValidationException)?
642        } else {
643            item.clone()
644        };
645
646        last_evaluated_item = Some(item);
647        if !is_count {
648            items.push(result_item);
649        }
650    }
651
652    // Check for unused expression attribute names/values
653    tracker.check_unused()?;
654
655    let count = if is_count {
656        filtered_count
657    } else {
658        items.len()
659    };
660
661    // Determine LastEvaluatedKey
662    let has_more = truncated_by_size
663        || (request.limit.is_some() && scanned_count >= request.limit.unwrap_or(usize::MAX));
664
665    // For index scans, include the base table primary key in LastEvaluatedKey
666    // alongside the effective (index) keys so the cursor can uniquely identify
667    // the position. For LSIs, include the table sort key. For GSIs, include
668    // both the table partition key and sort key.
669    let is_gsi_scan = request.index_name.is_some() && !is_lsi;
670    let last_evaluated_key = if has_more {
671        last_evaluated_item.map(|item| {
672            let mut key = HashMap::new();
673            if let Some(pk_val) = item.get(&effective_pk) {
674                key.insert(effective_pk.clone(), pk_val.clone());
675            }
676            if let Some(ref sk_name) = effective_sk {
677                if let Some(sk_val) = item.get(sk_name) {
678                    key.insert(sk_name.clone(), sk_val.clone());
679                }
680            }
681            // For LSI scans, add the table sort key if different from the index sort key
682            if is_lsi {
683                if let Some(tsk) = table_key_schema.sort_key.as_deref() {
684                    if !key.contains_key(tsk) {
685                        if let Some(v) = item.get(tsk) {
686                            key.insert(tsk.to_string(), v.clone());
687                        }
688                    }
689                }
690            }
691            // For GSI scans, add the base table primary key (pk and sk)
692            if is_gsi_scan {
693                if !key.contains_key(&table_key_schema.partition_key) {
694                    if let Some(v) = item.get(&table_key_schema.partition_key) {
695                        key.insert(table_key_schema.partition_key.clone(), v.clone());
696                    }
697                }
698                if let Some(ref tsk) = table_key_schema.sort_key {
699                    if !key.contains_key(tsk) {
700                        if let Some(v) = item.get(tsk) {
701                            key.insert(tsk.clone(), v.clone());
702                        }
703                    }
704                }
705            }
706            key
707        })
708    } else {
709        None
710    };
711
712    // Attribute read capacity to the GSI if scanning one
713    let is_gsi = is_gsi_scan;
714    let consistent = request.consistent_read.unwrap_or(false);
715    let consumed_capacity = if is_gsi {
716        let mut gsi_units = std::collections::HashMap::new();
717        gsi_units.insert(
718            request.index_name.as_ref().unwrap().clone(),
719            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
720        );
721        crate::types::consumed_capacity_with_indexes(
722            &request.table_name,
723            0.0,
724            &gsi_units,
725            &request.return_consumed_capacity,
726        )
727    } else {
728        crate::types::consumed_capacity(
729            &request.table_name,
730            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
731            &request.return_consumed_capacity,
732        )
733    };
734
735    Ok(ScanResponse {
736        items: if is_count { None } else { Some(items) },
737        count,
738        scanned_count,
739        last_evaluated_key,
740        consumed_capacity,
741    })
742}