1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage::Storage;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Cap, in bytes, on the cumulative size of the items a single `execute` call
/// evaluates before it stops and reports a `LastEvaluatedKey` (1 MiB).
const MAX_RESPONSE_SIZE: usize = 1024 * 1024;
11
12#[derive(Debug, Default, Deserialize)]
14struct ScanRequestRaw {
15 #[serde(rename = "TableName", default)]
16 table_name: Option<String>,
17 #[serde(rename = "FilterExpression", default)]
18 filter_expression: Option<String>,
19 #[serde(rename = "ProjectionExpression", default)]
20 projection_expression: Option<String>,
21 #[serde(rename = "ExpressionAttributeNames", default)]
22 expression_attribute_names: Option<HashMap<String, String>>,
23 #[serde(rename = "ExpressionAttributeValues", default)]
24 expression_attribute_values: Option<HashMap<String, AttributeValue>>,
25 #[serde(rename = "Limit", default)]
26 limit: Option<usize>,
27 #[serde(rename = "ExclusiveStartKey", default)]
28 exclusive_start_key: Option<serde_json::Value>,
29 #[serde(rename = "Select", default)]
30 select: Option<String>,
31 #[serde(rename = "ConsistentRead", default)]
32 consistent_read: Option<bool>,
33 #[serde(rename = "IndexName", default)]
34 index_name: Option<String>,
35 #[serde(rename = "Segment", default)]
36 segment: Option<u32>,
37 #[serde(rename = "TotalSegments", default)]
38 total_segments: Option<u32>,
39 #[serde(rename = "ReturnConsumedCapacity", default)]
40 return_consumed_capacity: Option<String>,
41 #[serde(rename = "AttributesToGet", default)]
42 attributes_to_get: Option<Vec<String>>,
43 #[serde(rename = "ScanFilter", default)]
44 scan_filter: Option<serde_json::Value>,
45 #[serde(rename = "ConditionalOperator", default)]
46 conditional_operator: Option<String>,
47}
48
49#[derive(Debug, Default)]
50pub struct ScanRequest {
51 pub table_name: String,
52 pub filter_expression: Option<String>,
53 pub projection_expression: Option<String>,
54 pub expression_attribute_names: Option<HashMap<String, String>>,
55 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
56 pub limit: Option<usize>,
57 pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
58 pub select: Option<String>,
59 pub consistent_read: Option<bool>,
60 pub index_name: Option<String>,
61 pub segment: Option<u32>,
62 pub total_segments: Option<u32>,
63 pub return_consumed_capacity: Option<String>,
64 pub attributes_to_get: Option<Vec<String>>,
65 pub scan_filter: Option<serde_json::Value>,
66 pub conditional_operator: Option<String>,
67 pub exclusive_start_key_raw: Option<serde_json::Value>,
70}
71
72impl<'de> serde::Deserialize<'de> for ScanRequest {
73 fn deserialize<D: serde::Deserializer<'de>>(
74 deserializer: D,
75 ) -> std::result::Result<Self, D::Error> {
76 let raw = ScanRequestRaw::deserialize(deserializer)?;
77 use crate::validation::{format_validation_errors, table_name_constraint_errors};
78
79 let mut errors = Vec::new();
80 errors.extend(table_name_constraint_errors(raw.table_name.as_deref()));
81 let table_name = raw.table_name.unwrap_or_default();
82
83 if let Some(ref rcc) = raw.return_consumed_capacity {
85 if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
86 errors.push(format!(
87 "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
88 Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
89 rcc
90 ));
91 }
92 }
93
94 if let Some(ref sel) = raw.select {
96 if ![
97 "ALL_ATTRIBUTES",
98 "ALL_PROJECTED_ATTRIBUTES",
99 "COUNT",
100 "SPECIFIC_ATTRIBUTES",
101 ]
102 .contains(&sel.as_str())
103 {
104 errors.push(format!(
105 "Value '{}' at 'select' failed to satisfy constraint: \
106 Member must satisfy enum value set: [ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES, COUNT, SPECIFIC_ATTRIBUTES]",
107 sel
108 ));
109 }
110 }
111
112 if let Some(limit) = raw.limit {
114 if limit == 0 {
115 errors.push(
116 "Value '0' at 'Limit' failed to satisfy constraint: \
117 Member must have value greater than or equal to 1"
118 .to_string(),
119 );
120 }
121 }
122
123 if let Some(msg) = format_validation_errors(&errors) {
124 return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
125 }
126
127 Ok(ScanRequest {
128 table_name,
129 filter_expression: raw.filter_expression,
130 projection_expression: raw.projection_expression,
131 expression_attribute_names: raw.expression_attribute_names,
132 expression_attribute_values: raw.expression_attribute_values,
133 limit: raw.limit,
134 exclusive_start_key: None,
135 select: raw.select,
136 consistent_read: raw.consistent_read,
137 index_name: raw.index_name,
138 segment: raw.segment,
139 total_segments: raw.total_segments,
140 return_consumed_capacity: raw.return_consumed_capacity,
141 attributes_to_get: raw.attributes_to_get,
142 scan_filter: raw.scan_filter,
143 conditional_operator: raw.conditional_operator,
144 exclusive_start_key_raw: raw.exclusive_start_key,
145 })
146 }
147}
148
149#[derive(Debug, Default, Serialize)]
150pub struct ScanResponse {
151 #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
152 pub items: Option<Vec<Item>>,
153 #[serde(rename = "Count")]
154 pub count: usize,
155 #[serde(rename = "ScannedCount")]
156 pub scanned_count: usize,
157 #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
158 pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
159 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
160 pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
161}
162
163pub fn execute(storage: &Storage, mut request: ScanRequest) -> Result<ScanResponse> {
164 crate::validation::validate_table_name(&request.table_name)?;
166
167 {
169 let mut non_expr = Vec::new();
170 let mut expr = Vec::new();
171 if request.attributes_to_get.is_some() {
172 non_expr.push("AttributesToGet");
173 }
174 if request.scan_filter.is_some()
175 && request.scan_filter.as_ref().is_some_and(|v| !v.is_null())
176 {
177 non_expr.push("ScanFilter");
178 }
179 if request.conditional_operator.is_some() {
180 non_expr.push("ConditionalOperator");
181 }
182 if request.projection_expression.is_some() {
183 expr.push("ProjectionExpression");
184 }
185 if request.filter_expression.is_some() {
186 expr.push("FilterExpression");
187 }
188 let no_raw_eav: Option<serde_json::Value> = None;
189 let ctx = helpers::ExpressionParamContext {
190 non_expression_params: non_expr,
191 expression_params: expr,
192 all_expression_param_names: vec!["FilterExpression"],
193 expression_attribute_names: &request.expression_attribute_names,
194 expression_attribute_values: &request.expression_attribute_values,
195 expression_attribute_values_raw: &no_raw_eav,
196 };
197 helpers::validate_expression_params(&ctx)?;
198 }
199
200 helpers::validate_filter_conditions_raw(request.scan_filter.as_ref(), "ScanFilter")?;
202
203 helpers::validate_filter_condition_args(request.scan_filter.as_ref())?;
205
206 if let Some(ref attrs) = request.attributes_to_get {
208 helpers::validate_attributes_to_get_no_duplicates(attrs)?;
209 }
210
211 let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
213 Some(helpers::parse_exclusive_start_key(esk_val)?)
214 } else {
215 request.exclusive_start_key.clone()
216 };
217
218 if request.filter_expression.is_none() {
220 if let Some(ref sf_val) = request.scan_filter {
221 if let Ok(sf) =
222 serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(sf_val.clone())
223 {
224 if !sf.is_empty() {
225 let converted = helpers::convert_filter_conditions(
226 &sf,
227 request.conditional_operator.as_deref(),
228 )?;
229 if !converted.expression.is_empty() {
230 request.filter_expression = Some(converted.expression);
231 let expr_values = request
232 .expression_attribute_values
233 .get_or_insert_with(HashMap::new);
234 expr_values.extend(converted.attribute_values);
235 let expr_names = request
236 .expression_attribute_names
237 .get_or_insert_with(HashMap::new);
238 expr_names.extend(converted.attribute_names);
239 }
240 }
241 }
242 }
243 }
244
245 match (request.segment, request.total_segments) {
247 (Some(segment), Some(total)) => {
248 if !(1..=1_000_000).contains(&total) {
249 return Err(DynoxideError::ValidationException(
250 "1 validation error detected: Value at 'totalSegments' failed to satisfy constraint: \
251 Member must have value between 1 and 1000000".to_string(),
252 ));
253 }
254 if segment >= total {
255 return Err(DynoxideError::ValidationException(format!(
256 "The Segment parameter is zero-based and must be less than parameter TotalSegments: Segment: {} is not less than TotalSegments: {}",
257 segment, total
258 )));
259 }
260 }
261 (Some(_), None) => {
262 return Err(DynoxideError::ValidationException(
263 "The TotalSegments parameter is required but was not present in the request when Segment parameter is present".to_string(),
264 ));
265 }
266 (None, Some(_)) => {
267 return Err(DynoxideError::ValidationException(
268 "The Segment parameter is required but was not present in the request when parameter TotalSegments is present".to_string(),
269 ));
270 }
271 (None, None) => {}
272 }
273
274 if let Some(ref filter_expr_str) = request.filter_expression {
277 if filter_expr_str.is_empty() {
278 if request.scan_filter.is_none() || request.filter_expression.as_deref() == Some("") {
281 return Err(DynoxideError::ValidationException(
282 "Invalid FilterExpression: The expression can not be empty;".to_string(),
283 ));
284 }
285 } else {
286 let parsed_fe = expressions::condition::parse(filter_expr_str).map_err(|e| {
288 DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
289 })?;
290 if let Err(e) = expressions::condition::validate_name_refs(
292 &parsed_fe,
293 &request.expression_attribute_names,
294 ) {
295 return Err(DynoxideError::ValidationException(format!(
296 "Invalid FilterExpression: {e}"
297 )));
298 }
299 }
300 }
301 if let Some(ref proj_expr_str) = request.projection_expression {
302 if proj_expr_str.is_empty() {
303 return Err(DynoxideError::ValidationException(
304 "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
305 ));
306 }
307 }
308
309 if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
311 && request.projection_expression.is_none()
312 && request.attributes_to_get.is_none()
313 {
314 return Err(DynoxideError::ValidationException(
315 "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
316 .to_string(),
317 ));
318 }
319
320 let meta = helpers::require_table_for_item_op(storage, &request.table_name)?;
321 let table_key_schema = helpers::parse_key_schema(&meta)?;
322
323 let legacy_projection = if request.projection_expression.is_none() {
326 request
327 .attributes_to_get
328 .as_ref()
329 .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
330 } else {
331 None
332 };
333
334 let lsi_keys = request
336 .index_name
337 .as_ref()
338 .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
339 let is_lsi = lsi_keys.is_some();
340
341 if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
343 return Err(DynoxideError::ValidationException(
344 "Consistent reads are not supported on global secondary indexes".to_string(),
345 ));
346 }
347
348 let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
349 if let Some(keys) = lsi_keys {
350 keys
351 } else {
352 super::gsi::parse_gsi_key_schema(&meta, index_name)?
353 }
354 } else {
355 (
356 table_key_schema.partition_key.clone(),
357 table_key_schema.sort_key.clone(),
358 )
359 };
360
361 if let Some(ref esk) = exclusive_start_key {
364 let count_msg = if request.index_name.is_some() {
365 "The provided starting key is invalid"
366 } else {
367 "The provided starting key is invalid: The provided key element does not match the schema"
368 };
369 helpers::validate_esk_count_and_index_keys(
370 esk,
371 &meta,
372 request.index_name.as_deref(),
373 count_msg,
374 )?;
375 }
376
377 if let Some(ref index_name) = request.index_name {
379 if !is_lsi {
380 if let Some(ref select) = request.select {
381 if select == "ALL_ATTRIBUTES" {
382 let gsi_defs = super::gsi::parse_gsi_defs(&meta)?;
384 if let Some(gsi) = gsi_defs.iter().find(|g| g.index_name == *index_name) {
385 if gsi.projection_type != crate::types::ProjectionType::ALL {
386 return Err(DynoxideError::ValidationException(format!(
387 "One or more parameter values were invalid: \
388 Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
389 because its projection type is not ALL",
390 index_name
391 )));
392 }
393 }
394 }
395 }
396 }
397 }
398
399 if let Some(ref esk) = exclusive_start_key {
401 helpers::validate_esk_table_keys(esk, &meta)?;
402 }
403
404 let (start_pk, start_sk) = if let Some(ref esk) = exclusive_start_key {
406 let pk = esk.get(&effective_pk).and_then(|v| v.to_key_string());
407 let sk = if let Some(ref sk_name) = effective_sk {
408 esk.get(sk_name).and_then(|v| v.to_key_string())
409 } else {
410 Some(String::new())
411 };
412 (pk, sk)
413 } else {
414 (None, None)
415 };
416
417 let (start_base_pk, start_base_sk) = if is_lsi || request.index_name.is_some() {
423 if let Some(ref esk) = exclusive_start_key {
424 let base_pk = esk
425 .get(&table_key_schema.partition_key)
426 .and_then(|v| v.to_key_string());
427 let base_sk = table_key_schema
428 .sort_key
429 .as_ref()
430 .and_then(|sk_name| esk.get(sk_name))
431 .and_then(|v| v.to_key_string());
432 (base_pk, base_sk)
433 } else {
434 (None, None)
435 }
436 } else {
437 (None, None)
438 };
439
440 let scan_params = crate::storage::ScanParams {
442 limit: request.limit,
443 exclusive_start_pk: start_pk.as_deref(),
444 exclusive_start_sk: start_sk.as_deref(),
445 segment: request.segment,
446 total_segments: request.total_segments,
447 exclusive_start_base_pk: start_base_pk.as_deref(),
448 exclusive_start_base_sk: start_base_sk.as_deref(),
449 };
450 let rows = if let Some(ref index_name) = request.index_name {
451 if is_lsi {
452 storage.scan_lsi_items(&request.table_name, index_name, &scan_params)?
453 } else {
454 storage.scan_gsi_items(&request.table_name, index_name, &scan_params)?
455 }
456 } else {
457 storage.scan_items(&request.table_name, &scan_params)?
458 };
459
460 let tracker = crate::expressions::TrackedExpressionAttributes::new(
462 &request.expression_attribute_names,
463 &request.expression_attribute_values,
464 );
465
466 let filter_expr = request
468 .filter_expression
469 .as_ref()
470 .map(|expr| expressions::condition::parse(expr))
471 .transpose()
472 .map_err(DynoxideError::ValidationException)?;
473
474 if let Some(ref filter) = filter_expr {
476 let mut base_key_attrs = vec![table_key_schema.partition_key.clone()];
478 if let Some(ref sk) = table_key_schema.sort_key {
479 base_key_attrs.push(sk.clone());
480 }
481 let mut index_key_attrs = Vec::new();
482 if request.index_name.is_some() {
483 if !base_key_attrs.contains(&effective_pk) {
484 index_key_attrs.push(effective_pk.clone());
485 }
486 if let Some(ref sk) = effective_sk {
487 if !base_key_attrs.contains(sk) {
488 index_key_attrs.push(sk.clone());
489 }
490 }
491 }
492 if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
493 filter,
494 &request.expression_attribute_names,
495 &base_key_attrs,
496 &index_key_attrs,
497 ) {
498 let prefix = if is_index { "IndexKey" } else { "Key" };
499 return Err(DynoxideError::ValidationException(format!(
500 "Key attributes must be scalars; \
501 list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
502 )));
503 }
504 }
505
506 let projection = if let Some(ref proj_expr) = request.projection_expression {
508 Some(
509 expressions::projection::parse(proj_expr)
510 .map_err(DynoxideError::ValidationException)?,
511 )
512 } else {
513 legacy_projection.clone()
514 };
515
516 if let Some(ref filter) = filter_expr {
518 tracker.track_condition_expr(filter);
519 }
520 if let Some(ref proj) = projection {
521 tracker.track_projection_expr(proj);
522 }
523
524 let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
526 &request.expression_attribute_names,
527 &request.expression_attribute_values,
528 );
529
530 let is_count = request
532 .select
533 .as_deref()
534 .map(|s| s.eq_ignore_ascii_case("COUNT"))
535 .unwrap_or(false);
536
537 let mut key_attrs = vec![effective_pk.clone()];
539 if let Some(ref sk) = effective_sk {
540 key_attrs.push(sk.clone());
541 }
542 if request.index_name.is_some() {
543 if !key_attrs.contains(&table_key_schema.partition_key) {
544 key_attrs.push(table_key_schema.partition_key.clone());
545 }
546 if let Some(ref sk) = table_key_schema.sort_key {
547 if !key_attrs.contains(sk) {
548 key_attrs.push(sk.clone());
549 }
550 }
551 }
552
553 let mut items = Vec::new();
554 let mut scanned_count = 0;
555 let mut filtered_count = 0;
556 let mut cumulative_size = 0;
557 let mut last_evaluated_item: Option<Item> = None;
558 let mut truncated_by_size = false;
559
560 for (_pk, _sk, item_json) in &rows {
561 let item: Item = serde_json::from_str(item_json).map_err(|e| {
562 DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
563 })?;
564
565 scanned_count += 1;
566
567 let item_size = crate::types::item_size(&item);
570 if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
571 truncated_by_size = true;
572 break;
573 }
574 cumulative_size += item_size;
575
576 if let Some(ref filter) = filter_expr {
578 let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
579 .map_err(DynoxideError::ValidationException)?;
580 if !passes {
581 last_evaluated_item = Some(item);
582 continue;
583 }
584 }
585
586 filtered_count += 1;
587
588 let result_item = if let Some(ref proj) = projection {
591 let no_keys: &[String] = &[];
592 expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
593 .map_err(DynoxideError::ValidationException)?
594 } else {
595 item.clone()
596 };
597
598 last_evaluated_item = Some(item);
599 if !is_count {
600 items.push(result_item);
601 }
602 }
603
604 tracker.check_unused()?;
606
607 let count = if is_count {
608 filtered_count
609 } else {
610 items.len()
611 };
612
613 let has_more = truncated_by_size
615 || (request.limit.is_some() && scanned_count >= request.limit.unwrap_or(usize::MAX));
616
617 let is_gsi_scan = request.index_name.is_some() && !is_lsi;
622 let last_evaluated_key = if has_more {
623 last_evaluated_item.map(|item| {
624 let mut key = HashMap::new();
625 if let Some(pk_val) = item.get(&effective_pk) {
626 key.insert(effective_pk.clone(), pk_val.clone());
627 }
628 if let Some(ref sk_name) = effective_sk {
629 if let Some(sk_val) = item.get(sk_name) {
630 key.insert(sk_name.clone(), sk_val.clone());
631 }
632 }
633 if is_lsi {
635 if let Some(tsk) = table_key_schema.sort_key.as_deref() {
636 if !key.contains_key(tsk) {
637 if let Some(v) = item.get(tsk) {
638 key.insert(tsk.to_string(), v.clone());
639 }
640 }
641 }
642 }
643 if is_gsi_scan {
645 if !key.contains_key(&table_key_schema.partition_key) {
646 if let Some(v) = item.get(&table_key_schema.partition_key) {
647 key.insert(table_key_schema.partition_key.clone(), v.clone());
648 }
649 }
650 if let Some(ref tsk) = table_key_schema.sort_key {
651 if !key.contains_key(tsk) {
652 if let Some(v) = item.get(tsk) {
653 key.insert(tsk.clone(), v.clone());
654 }
655 }
656 }
657 }
658 key
659 })
660 } else {
661 None
662 };
663
664 let is_gsi = is_gsi_scan;
666 let consistent = request.consistent_read.unwrap_or(false);
667 let consumed_capacity = if is_gsi {
668 let mut gsi_units = std::collections::HashMap::new();
669 gsi_units.insert(
670 request.index_name.as_ref().unwrap().clone(),
671 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
672 );
673 crate::types::consumed_capacity_with_indexes(
674 &request.table_name,
675 0.0,
676 &gsi_units,
677 &request.return_consumed_capacity,
678 )
679 } else {
680 crate::types::consumed_capacity(
681 &request.table_name,
682 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
683 &request.return_consumed_capacity,
684 )
685 };
686
687 Ok(ScanResponse {
688 items: if is_count { None } else { Some(items) },
689 count,
690 scanned_count,
691 last_evaluated_key,
692 consumed_capacity,
693 })
694}