Skip to main content

dynoxide/actions/
scan.rs

1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage_backend::StorageBackend;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9/// 1MB response size limit for Query/Scan.
10const MAX_RESPONSE_SIZE: usize = 1_048_576;
11
12/// Internal deserialization struct for detecting missing fields.
13#[derive(Debug, Default, Deserialize)]
14struct ScanRequestRaw {
15    #[serde(rename = "TableName", default)]
16    table_name: Option<String>,
17    #[serde(rename = "FilterExpression", default)]
18    filter_expression: Option<String>,
19    #[serde(rename = "ProjectionExpression", default)]
20    projection_expression: Option<String>,
21    #[serde(rename = "ExpressionAttributeNames", default)]
22    expression_attribute_names: Option<HashMap<String, String>>,
23    #[serde(rename = "ExpressionAttributeValues", default)]
24    expression_attribute_values: Option<HashMap<String, AttributeValue>>,
25    #[serde(rename = "Limit", default)]
26    limit: Option<usize>,
27    #[serde(rename = "ExclusiveStartKey", default)]
28    exclusive_start_key: Option<serde_json::Value>,
29    #[serde(rename = "Select", default)]
30    select: Option<String>,
31    #[serde(rename = "ConsistentRead", default)]
32    consistent_read: Option<bool>,
33    #[serde(rename = "IndexName", default)]
34    index_name: Option<String>,
35    // Signed so a negative Segment deserialises and can be rejected with the
36    // DynamoDB ValidationException rather than a serde error. Cast to u32 after
37    // the >= 0 check below.
38    #[serde(rename = "Segment", default)]
39    segment: Option<i64>,
40    #[serde(rename = "TotalSegments", default)]
41    total_segments: Option<u32>,
42    #[serde(rename = "ReturnConsumedCapacity", default)]
43    return_consumed_capacity: Option<String>,
44    #[serde(rename = "AttributesToGet", default)]
45    attributes_to_get: Option<Vec<String>>,
46    #[serde(rename = "ScanFilter", default)]
47    scan_filter: Option<serde_json::Value>,
48    #[serde(rename = "ConditionalOperator", default)]
49    conditional_operator: Option<String>,
50}
51
52#[derive(Debug, Default)]
53pub struct ScanRequest {
54    pub table_name: String,
55    pub filter_expression: Option<String>,
56    pub projection_expression: Option<String>,
57    pub expression_attribute_names: Option<HashMap<String, String>>,
58    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
59    pub limit: Option<usize>,
60    pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
61    pub select: Option<String>,
62    pub consistent_read: Option<bool>,
63    pub index_name: Option<String>,
64    pub segment: Option<u32>,
65    pub total_segments: Option<u32>,
66    pub return_consumed_capacity: Option<String>,
67    pub attributes_to_get: Option<Vec<String>>,
68    pub scan_filter: Option<serde_json::Value>,
69    pub conditional_operator: Option<String>,
70    /// Raw JSON for ExclusiveStartKey when deserialized from HTTP request.
71    /// Parsed lazily in `execute()` after other validations run.
72    pub exclusive_start_key_raw: Option<serde_json::Value>,
73}
74
75impl<'de> serde::Deserialize<'de> for ScanRequest {
76    fn deserialize<D: serde::Deserializer<'de>>(
77        deserializer: D,
78    ) -> std::result::Result<Self, D::Error> {
79        let raw = ScanRequestRaw::deserialize(deserializer)?;
80        use crate::validation::{
81            TableNameContext, format_validation_errors, table_name_constraint_errors,
82        };
83
84        let mut errors = Vec::new();
85        errors.extend(table_name_constraint_errors(
86            raw.table_name.as_deref(),
87            TableNameContext::ReadWrite,
88        ));
89        let table_name = raw.table_name.unwrap_or_default();
90
91        // ReturnConsumedCapacity enum
92        if let Some(ref rcc) = raw.return_consumed_capacity {
93            if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
94                errors.push(format!(
95                    "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
96                     Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
97                    rcc
98                ));
99            }
100        }
101
102        // Select enum
103        if let Some(ref sel) = raw.select {
104            if ![
105                "ALL_ATTRIBUTES",
106                "ALL_PROJECTED_ATTRIBUTES",
107                "COUNT",
108                "SPECIFIC_ATTRIBUTES",
109            ]
110            .contains(&sel.as_str())
111            {
112                errors.push(format!(
113                    "Value '{}' at 'select' failed to satisfy constraint: \
114                     Member must satisfy enum value set: [SPECIFIC_ATTRIBUTES, COUNT, ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES]",
115                    sel
116                ));
117            }
118        }
119
120        // Limit must be >= 1.
121        // AWS DynamoDB's Scan message diverges from Query: Scan keeps the rejected
122        // value and lowercases 'limit', whereas Query omits the value and uses
123        // capital 'Limit'. Do not collapse these into a shared helper.
124        if let Some(limit) = raw.limit {
125            if limit == 0 {
126                errors.push(
127                    "Value '0' at 'limit' failed to satisfy constraint: \
128                     Member must have value greater than or equal to 1"
129                        .to_string(),
130                );
131            }
132        }
133
134        // Segment must be >= 0. (Segment vs TotalSegments and the TotalSegments
135        // range are checked in execute().)
136        if let Some(segment) = raw.segment {
137            if segment < 0 {
138                errors.push(format!(
139                    "Value '{}' at 'segment' failed to satisfy constraint: \
140                     Member must have value greater than or equal to 0",
141                    segment
142                ));
143            }
144        }
145
146        if let Some(msg) = format_validation_errors(&errors) {
147            return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
148        }
149
150        Ok(ScanRequest {
151            table_name,
152            filter_expression: raw.filter_expression,
153            projection_expression: raw.projection_expression,
154            expression_attribute_names: raw.expression_attribute_names,
155            expression_attribute_values: raw.expression_attribute_values,
156            limit: raw.limit,
157            exclusive_start_key: None,
158            select: raw.select,
159            consistent_read: raw.consistent_read,
160            index_name: raw.index_name,
161            segment: raw.segment.map(|s| s as u32),
162            total_segments: raw.total_segments,
163            return_consumed_capacity: raw.return_consumed_capacity,
164            attributes_to_get: raw.attributes_to_get,
165            scan_filter: raw.scan_filter,
166            conditional_operator: raw.conditional_operator,
167            exclusive_start_key_raw: raw.exclusive_start_key,
168        })
169    }
170}
171
172#[derive(Debug, Default, Serialize)]
173pub struct ScanResponse {
174    #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
175    pub items: Option<Vec<Item>>,
176    #[serde(rename = "Count")]
177    pub count: usize,
178    #[serde(rename = "ScannedCount")]
179    pub scanned_count: usize,
180    #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
181    pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
182    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
183    pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
184}
185
186pub async fn execute<S: StorageBackend>(
187    storage: &S,
188    mut request: ScanRequest,
189) -> Result<ScanResponse> {
190    // Validate table name format before checking existence (DynamoDB validates input first)
191    crate::validation::validate_table_name(&request.table_name)?;
192
193    // ---- Expression vs non-expression mixing validation ----
194    {
195        let mut non_expr = Vec::new();
196        let mut expr = Vec::new();
197        if request.attributes_to_get.is_some() {
198            non_expr.push("AttributesToGet");
199        }
200        if request.scan_filter.is_some()
201            && request.scan_filter.as_ref().is_some_and(|v| !v.is_null())
202        {
203            non_expr.push("ScanFilter");
204        }
205        if request.conditional_operator.is_some() {
206            non_expr.push("ConditionalOperator");
207        }
208        if request.projection_expression.is_some() {
209            expr.push("ProjectionExpression");
210        }
211        if request.filter_expression.is_some() {
212            expr.push("FilterExpression");
213        }
214        let no_raw_eav: Option<serde_json::Value> = None;
215        let ctx = helpers::ExpressionParamContext {
216            non_expression_params: non_expr,
217            expression_params: expr,
218            all_expression_param_names: vec!["FilterExpression"],
219            expression_attribute_names: &request.expression_attribute_names,
220            expression_attribute_values: &request.expression_attribute_values,
221            expression_attribute_values_raw: &no_raw_eav,
222        };
223        helpers::validate_expression_params(&ctx)?;
224    }
225
226    // ---- Validate ScanFilter attribute values (before argument counts, matching DynamoDB) ----
227    helpers::validate_filter_conditions_raw(request.scan_filter.as_ref(), "ScanFilter")?;
228
229    // ---- Validate filter argument counts and type compatibility ----
230    helpers::validate_filter_condition_args(request.scan_filter.as_ref())?;
231
232    // ---- Validate duplicate AttributesToGet ----
233    if let Some(ref attrs) = request.attributes_to_get {
234        helpers::validate_attributes_to_get_no_duplicates(attrs)?;
235    }
236
237    // ---- Parse ExclusiveStartKey from JSON value ----
238    let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
239        Some(helpers::parse_exclusive_start_key(esk_val)?)
240    } else {
241        request.exclusive_start_key.clone()
242    };
243
244    // Convert legacy ScanFilter to FilterExpression if no expression is set
245    if request.filter_expression.is_none() {
246        if let Some(ref sf_val) = request.scan_filter {
247            if let Ok(sf) =
248                serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(sf_val.clone())
249            {
250                if !sf.is_empty() {
251                    let converted = helpers::convert_filter_conditions(
252                        &sf,
253                        request.conditional_operator.as_deref(),
254                    )?;
255                    if !converted.expression.is_empty() {
256                        request.filter_expression = Some(converted.expression);
257                        let expr_values = request
258                            .expression_attribute_values
259                            .get_or_insert_with(HashMap::new);
260                        expr_values.extend(converted.attribute_values);
261                        let expr_names = request
262                            .expression_attribute_names
263                            .get_or_insert_with(HashMap::new);
264                        expr_names.extend(converted.attribute_names);
265                    }
266                }
267            }
268        }
269    }
270
271    // Validate parallel scan parameters (before table existence check)
272    match (request.segment, request.total_segments) {
273        (Some(segment), Some(total)) => {
274            if !(1..=1_000_000).contains(&total) {
275                return Err(DynoxideError::ValidationException(
276                    "1 validation error detected: Value at 'totalSegments' failed to satisfy constraint: \
277                     Member must have value between 1 and 1000000".to_string(),
278                ));
279            }
280            if segment >= total {
281                return Err(DynoxideError::ValidationException(format!(
282                    "The Segment parameter is zero-based and must be less than parameter TotalSegments: Segment: {} is not less than TotalSegments: {}",
283                    segment, total
284                )));
285            }
286        }
287        (Some(_), None) => {
288            return Err(DynoxideError::ValidationException(
289                "The TotalSegments parameter is required but was not present in the request when Segment parameter is present".to_string(),
290            ));
291        }
292        (None, Some(_)) => {
293            return Err(DynoxideError::ValidationException(
294                "The Segment parameter is required but was not present in the request when parameter TotalSegments is present".to_string(),
295            ));
296        }
297        (None, None) => {}
298    }
299
300    // ---- Validate FilterExpression and ProjectionExpression BEFORE table existence ----
301    // DynamoDB validates expression syntax before checking if the table exists.
302    if let Some(ref filter_expr_str) = request.filter_expression {
303        if filter_expr_str.is_empty() {
304            // Only report empty if the user explicitly set FilterExpression
305            // (not if it was converted from ScanFilter, which never produces empty)
306            if request.scan_filter.is_none() || request.filter_expression.as_deref() == Some("") {
307                return Err(DynoxideError::ValidationException(
308                    "Invalid FilterExpression: The expression can not be empty;".to_string(),
309                ));
310            }
311        } else {
312            // Try parsing the expression to catch syntax errors early
313            let parsed_fe = expressions::condition::parse(filter_expr_str).map_err(|e| {
314                DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
315            })?;
316            // Validate that all #name references are defined in ExpressionAttributeNames
317            if let Err(e) = expressions::condition::validate_name_refs(
318                &parsed_fe,
319                &request.expression_attribute_names,
320            ) {
321                return Err(DynoxideError::ValidationException(format!(
322                    "Invalid FilterExpression: {e}"
323                )));
324            }
325            if let Err(e) = expressions::condition::validate_operand_semantics(
326                &parsed_fe,
327                &request.expression_attribute_names,
328                &request.expression_attribute_values,
329            ) {
330                return Err(DynoxideError::ValidationException(format!(
331                    "Invalid FilterExpression: {e}"
332                )));
333            }
334        }
335    }
336    if let Some(ref proj_expr_str) = request.projection_expression {
337        if proj_expr_str.is_empty() {
338            return Err(DynoxideError::ValidationException(
339                "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
340            ));
341        }
342    }
343
344    // SPECIFIC_ATTRIBUTES requires ProjectionExpression or AttributesToGet
345    if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
346        && request.projection_expression.is_none()
347        && request.attributes_to_get.is_none()
348    {
349        return Err(DynoxideError::ValidationException(
350            "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
351                .to_string(),
352        ));
353    }
354
355    // A ProjectionExpression is only valid with Select = SPECIFIC_ATTRIBUTES.
356    if request.projection_expression.is_some() {
357        if let Some(select) = request.select.as_deref() {
358            if select != "SPECIFIC_ATTRIBUTES" {
359                // AWS phrases COUNT as "only the Count"; other values use the literal.
360                let target = if select == "COUNT" {
361                    "only the Count"
362                } else {
363                    select
364                };
365                return Err(DynoxideError::ValidationException(format!(
366                    "Cannot specify the ProjectionExpression when choosing to get {target}"
367                )));
368            }
369        }
370    }
371
372    // ALL_PROJECTED_ATTRIBUTES projects an index, so it requires an IndexName.
373    // AWS keeps the word "Querying" in this message even for Scan; preserve it verbatim.
374    if request.select.as_deref() == Some("ALL_PROJECTED_ATTRIBUTES") && request.index_name.is_none()
375    {
376        return Err(DynoxideError::ValidationException(
377            "ALL_PROJECTED_ATTRIBUTES can be used only when Querying using an IndexName"
378                .to_string(),
379        ));
380    }
381
382    let meta = helpers::require_table_for_item_op(storage, &request.table_name).await?;
383    let table_key_schema = helpers::parse_key_schema(&meta)?;
384
385    // Convert legacy AttributesToGet to ProjectionExpression if no expression-based
386    // projection is provided.
387    let legacy_projection = if request.projection_expression.is_none() {
388        request
389            .attributes_to_get
390            .as_ref()
391            .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
392    } else {
393        None
394    };
395
396    // Determine effective key schema (GSI, LSI, or base table)
397    let lsi_keys = request
398        .index_name
399        .as_ref()
400        .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
401    let is_lsi = lsi_keys.is_some();
402
403    // ConsistentRead is not supported on GSIs (LSIs are fine)
404    if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
405        return Err(DynoxideError::ValidationException(
406            "Consistent reads are not supported on global secondary indexes".to_string(),
407        ));
408    }
409
410    let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
411        if let Some(keys) = lsi_keys {
412            keys
413        } else {
414            super::gsi::parse_gsi_key_schema(&meta, index_name)?
415        }
416    } else {
417        (
418            table_key_schema.partition_key.clone(),
419            table_key_schema.sort_key.clone(),
420        )
421    };
422
423    // ---- Validate ExclusiveStartKey structure against key schema ----
424    // Stage 1+2: count check + index key type check
425    if let Some(ref esk) = exclusive_start_key {
426        let count_msg = if request.index_name.is_some() {
427            "The provided starting key is invalid"
428        } else {
429            "The provided starting key is invalid: The provided key element does not match the schema"
430        };
431        helpers::validate_esk_count_and_index_keys(
432            esk,
433            &meta,
434            request.index_name.as_deref(),
435            count_msg,
436        )?;
437    }
438
439    // Check ALL_ATTRIBUTES on global index (between index key check and table key check)
440    if let Some(ref index_name) = request.index_name {
441        if !is_lsi {
442            if let Some(ref select) = request.select {
443                if select == "ALL_ATTRIBUTES" {
444                    // Check if index projection is ALL
445                    let gsi_defs = super::gsi::parse_gsi_defs(&meta)?;
446                    if let Some(gsi) = gsi_defs.iter().find(|g| g.index_name == *index_name) {
447                        if gsi.projection_type != crate::types::ProjectionType::ALL {
448                            return Err(DynoxideError::ValidationException(format!(
449                                "One or more parameter values were invalid: \
450                                 Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
451                                 because its projection type is not ALL",
452                                index_name
453                            )));
454                        }
455                    }
456                }
457            }
458        }
459    }
460
461    // Stage 3: table key type check
462    if let Some(ref esk) = exclusive_start_key {
463        helpers::validate_esk_table_keys(esk, &meta)?;
464    }
465
466    // Extract ExclusiveStartKey pk/sk using effective key names
467    let (start_pk, start_sk) = if let Some(ref esk) = exclusive_start_key {
468        let pk = esk.get(&effective_pk).and_then(|v| v.to_key_string());
469        let sk = if let Some(ref sk_name) = effective_sk {
470            esk.get(sk_name).and_then(|v| v.to_key_string())
471        } else {
472            Some(String::new())
473        };
474        (pk, sk)
475    } else {
476        (None, None)
477    };
478
479    // For index scans (LSI and GSI), extract the base table key from
480    // ExclusiveStartKey for composite cursor pagination. The GSI/LSI
481    // tables have a composite primary key that includes the base table
482    // keys, so the cursor must include them to avoid skipping rows that
483    // share the same index key.
484    let (start_base_pk, start_base_sk) = if is_lsi || request.index_name.is_some() {
485        if let Some(ref esk) = exclusive_start_key {
486            let base_pk = esk
487                .get(&table_key_schema.partition_key)
488                .and_then(|v| v.to_key_string());
489            // When the base table is hash-only there is no base sort key, but
490            // the GSI/LSI row stores the empty-string default in its table_sk
491            // column. Default base_sk to "" (mirroring start_sk above) so the
492            // composite cursor keeps its full width and disambiguates tied
493            // index keys by table_pk. Leaving it None collapses scan_gsi_items
494            // to the 2-column (gsi_pk, gsi_sk) cursor, which cannot advance past
495            // rows that share the same index key and silently drops them.
496            let base_sk = if let Some(sk_name) = table_key_schema.sort_key.as_ref() {
497                esk.get(sk_name).and_then(|v| v.to_key_string())
498            } else {
499                Some(String::new())
500            };
501            (base_pk, base_sk)
502        } else {
503            (None, None)
504        }
505    } else {
506        (None, None)
507    };
508
509    // Scan either GSI table or base table
510    let scan_params = crate::storage::ScanParams {
511        limit: request.limit,
512        exclusive_start_pk: start_pk.as_deref(),
513        exclusive_start_sk: start_sk.as_deref(),
514        segment: request.segment,
515        total_segments: request.total_segments,
516        exclusive_start_base_pk: start_base_pk.as_deref(),
517        exclusive_start_base_sk: start_base_sk.as_deref(),
518    };
519    let rows = if let Some(ref index_name) = request.index_name {
520        if is_lsi {
521            storage
522                .scan_lsi_items(&request.table_name, index_name, &scan_params)
523                .await?
524        } else {
525            storage
526                .scan_gsi_items(&request.table_name, index_name, &scan_params)
527                .await?
528        }
529    } else {
530        storage
531            .scan_items(&request.table_name, &scan_params)
532            .await?
533    };
534
535    // Create tracker for unused expression attribute names/values
536    let tracker = crate::expressions::TrackedExpressionAttributes::new(
537        &request.expression_attribute_names,
538        &request.expression_attribute_values,
539    );
540
541    // Parse filter expression if present
542    let filter_expr = request
543        .filter_expression
544        .as_ref()
545        .map(|expr| expressions::condition::parse(expr))
546        .transpose()
547        .map_err(DynoxideError::ValidationException)?;
548
549    // Check for non-scalar key access in FilterExpression
550    if let Some(ref filter) = filter_expr {
551        // Build key attribute lists for non-scalar check
552        let mut base_key_attrs = vec![table_key_schema.partition_key.clone()];
553        if let Some(ref sk) = table_key_schema.sort_key {
554            base_key_attrs.push(sk.clone());
555        }
556        let mut index_key_attrs = Vec::new();
557        if request.index_name.is_some() {
558            if !base_key_attrs.contains(&effective_pk) {
559                index_key_attrs.push(effective_pk.clone());
560            }
561            if let Some(ref sk) = effective_sk {
562                if !base_key_attrs.contains(sk) {
563                    index_key_attrs.push(sk.clone());
564                }
565            }
566        }
567        if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
568            filter,
569            &request.expression_attribute_names,
570            &base_key_attrs,
571            &index_key_attrs,
572        ) {
573            let prefix = if is_index { "IndexKey" } else { "Key" };
574            return Err(DynoxideError::ValidationException(format!(
575                "Key attributes must be scalars; \
576                 list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
577            )));
578        }
579    }
580
581    // Parse projection expression if present; fall back to legacy AttributesToGet
582    let projection = if let Some(ref proj_expr) = request.projection_expression {
583        Some(
584            expressions::projection::parse(proj_expr)
585                .map_err(DynoxideError::ValidationException)?,
586        )
587    } else {
588        legacy_projection.clone()
589    };
590
591    // Pre-register expression references so unused check works even with zero items
592    if let Some(ref filter) = filter_expr {
593        tracker.track_condition_expr(filter);
594    }
595    if let Some(ref proj) = projection {
596        tracker.track_projection_expr(proj);
597    }
598
599    // Untracked variant for the per-item hot loop — tracking already done above
600    let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
601        &request.expression_attribute_names,
602        &request.expression_attribute_values,
603    );
604
605    // Determine if SELECT COUNT
606    let is_count = request
607        .select
608        .as_deref()
609        .map(|s| s.eq_ignore_ascii_case("COUNT"))
610        .unwrap_or(false);
611
612    // Key attribute names for projection (use effective keys for GSI)
613    let mut key_attrs = vec![effective_pk.clone()];
614    if let Some(ref sk) = effective_sk {
615        key_attrs.push(sk.clone());
616    }
617    if request.index_name.is_some() {
618        if !key_attrs.contains(&table_key_schema.partition_key) {
619            key_attrs.push(table_key_schema.partition_key.clone());
620        }
621        if let Some(ref sk) = table_key_schema.sort_key {
622            if !key_attrs.contains(sk) {
623                key_attrs.push(sk.clone());
624            }
625        }
626    }
627
628    let mut items = Vec::new();
629    let mut scanned_count = 0;
630    let mut filtered_count = 0;
631    let mut cumulative_size = 0;
632    let mut last_evaluated_item: Option<Item> = None;
633    let mut truncated_by_size = false;
634
635    for (_pk, _sk, item_json) in &rows {
636        let item: Item = serde_json::from_str(item_json).map_err(|e| {
637            DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
638        })?;
639
640        scanned_count += 1;
641
642        // Check 1MB limit BEFORE filtering — DynamoDB counts all evaluated data
643        // towards the 1MB response size limit, not just items that pass the filter.
644        let item_size = crate::types::item_size(&item);
645        if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
646            truncated_by_size = true;
647            break;
648        }
649        cumulative_size += item_size;
650
651        // Apply filter
652        if let Some(ref filter) = filter_expr {
653            let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
654                .map_err(DynoxideError::ValidationException)?;
655            if !passes {
656                last_evaluated_item = Some(item);
657                continue;
658            }
659        }
660
661        filtered_count += 1;
662
663        // Apply projection -- do NOT auto-include key attributes when the
664        // user explicitly specified ProjectionExpression or AttributesToGet.
665        let result_item = if let Some(ref proj) = projection {
666            let no_keys: &[String] = &[];
667            expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
668                .map_err(DynoxideError::ValidationException)?
669        } else {
670            item.clone()
671        };
672
673        last_evaluated_item = Some(item);
674        if !is_count {
675            items.push(result_item);
676        }
677    }
678
679    // Check for unused expression attribute names/values
680    tracker.check_unused()?;
681
682    let count = if is_count {
683        filtered_count
684    } else {
685        items.len()
686    };
687
688    // Determine LastEvaluatedKey
689    let has_more = truncated_by_size
690        || (request.limit.is_some() && scanned_count >= request.limit.unwrap_or(usize::MAX));
691
692    // For index scans, include the base table primary key in LastEvaluatedKey
693    // alongside the effective (index) keys so the cursor can uniquely identify
694    // the position. For LSIs, include the table sort key. For GSIs, include
695    // both the table partition key and sort key.
696    let is_gsi_scan = request.index_name.is_some() && !is_lsi;
697    let last_evaluated_key = if has_more {
698        last_evaluated_item.map(|item| {
699            let mut key = HashMap::new();
700            if let Some(pk_val) = item.get(&effective_pk) {
701                key.insert(effective_pk.clone(), pk_val.clone());
702            }
703            if let Some(ref sk_name) = effective_sk {
704                if let Some(sk_val) = item.get(sk_name) {
705                    key.insert(sk_name.clone(), sk_val.clone());
706                }
707            }
708            // For LSI scans, add the table sort key if different from the index sort key
709            if is_lsi {
710                if let Some(tsk) = table_key_schema.sort_key.as_deref() {
711                    if !key.contains_key(tsk) {
712                        if let Some(v) = item.get(tsk) {
713                            key.insert(tsk.to_string(), v.clone());
714                        }
715                    }
716                }
717            }
718            // For GSI scans, add the base table primary key (pk and sk)
719            if is_gsi_scan {
720                if !key.contains_key(&table_key_schema.partition_key) {
721                    if let Some(v) = item.get(&table_key_schema.partition_key) {
722                        key.insert(table_key_schema.partition_key.clone(), v.clone());
723                    }
724                }
725                if let Some(ref tsk) = table_key_schema.sort_key {
726                    if !key.contains_key(tsk) {
727                        if let Some(v) = item.get(tsk) {
728                            key.insert(tsk.clone(), v.clone());
729                        }
730                    }
731                }
732            }
733            key
734        })
735    } else {
736        None
737    };
738
739    // Attribute read capacity to the GSI if scanning one
740    let is_gsi = is_gsi_scan;
741    let consistent = request.consistent_read.unwrap_or(false);
742    let consumed_capacity = if is_gsi {
743        let mut gsi_units = std::collections::HashMap::new();
744        gsi_units.insert(
745            request.index_name.as_ref().unwrap().clone(),
746            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
747        );
748        crate::types::consumed_capacity_with_indexes(
749            &request.table_name,
750            0.0,
751            &gsi_units,
752            &request.return_consumed_capacity,
753        )
754    } else {
755        crate::types::consumed_capacity(
756            &request.table_name,
757            crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
758            &request.return_consumed_capacity,
759        )
760    };
761
762    Ok(ScanResponse {
763        items: if is_count { None } else { Some(items) },
764        count,
765        scanned_count,
766        last_evaluated_key,
767        consumed_capacity,
768    })
769}