1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage::Storage;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Cap on the cumulative `crate::types::item_size` of the items read for one scan page
/// (1_048_576 = 2^20). `execute` stops reading once the next item would push the running total
/// past this value.
// NOTE(review): presumably bytes (i.e. 1 MiB) — confirm against `crate::types::item_size`.
const MAX_RESPONSE_SIZE: usize = 1_048_576;
11
12#[derive(Debug, Default, Deserialize)]
14struct ScanRequestRaw {
15 #[serde(rename = "TableName", default)]
16 table_name: Option<String>,
17 #[serde(rename = "FilterExpression", default)]
18 filter_expression: Option<String>,
19 #[serde(rename = "ProjectionExpression", default)]
20 projection_expression: Option<String>,
21 #[serde(rename = "ExpressionAttributeNames", default)]
22 expression_attribute_names: Option<HashMap<String, String>>,
23 #[serde(rename = "ExpressionAttributeValues", default)]
24 expression_attribute_values: Option<HashMap<String, AttributeValue>>,
25 #[serde(rename = "Limit", default)]
26 limit: Option<usize>,
27 #[serde(rename = "ExclusiveStartKey", default)]
28 exclusive_start_key: Option<serde_json::Value>,
29 #[serde(rename = "Select", default)]
30 select: Option<String>,
31 #[serde(rename = "ConsistentRead", default)]
32 consistent_read: Option<bool>,
33 #[serde(rename = "IndexName", default)]
34 index_name: Option<String>,
35 #[serde(rename = "Segment", default)]
36 segment: Option<u32>,
37 #[serde(rename = "TotalSegments", default)]
38 total_segments: Option<u32>,
39 #[serde(rename = "ReturnConsumedCapacity", default)]
40 return_consumed_capacity: Option<String>,
41 #[serde(rename = "AttributesToGet", default)]
42 attributes_to_get: Option<Vec<String>>,
43 #[serde(rename = "ScanFilter", default)]
44 scan_filter: Option<serde_json::Value>,
45 #[serde(rename = "ConditionalOperator", default)]
46 conditional_operator: Option<String>,
47}
48
/// A Scan request after the deserialization-time constraint checks have passed.
///
/// Built either by the custom `Deserialize` impl below or directly by callers (all fields are
/// public and the type implements `Default`). The remaining validation happens in `execute`.
#[derive(Debug, Default)]
pub struct ScanRequest {
    /// Name of the table to scan. The `Deserialize` impl stores an empty string when
    /// `TableName` is absent.
    pub table_name: String,
    /// Filter applied to every scanned item. When this is `None` and `scan_filter` is set,
    /// `execute` fills it in from the converted legacy filter.
    pub filter_expression: Option<String>,
    /// Projection applied to every item that passes the filter.
    pub projection_expression: Option<String>,
    /// Name placeholders referenced by the expressions. `execute` also adds the names
    /// generated while converting `scan_filter`.
    pub expression_attribute_names: Option<HashMap<String, String>>,
    /// Value placeholders referenced by the expressions. `execute` also adds the values
    /// generated while converting `scan_filter`.
    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
    /// Row limit forwarded to the storage scan. A value of 0 is rejected during
    /// deserialization.
    pub limit: Option<usize>,
    /// Already-typed start key. The `Deserialize` impl always leaves this `None`; `execute`
    /// consults it only when `exclusive_start_key_raw` is `None`.
    pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
    /// One of `ALL_ATTRIBUTES`, `ALL_PROJECTED_ATTRIBUTES`, `COUNT`, `SPECIFIC_ATTRIBUTES`
    /// (enforced during deserialization only).
    pub select: Option<String>,
    /// Requests a consistent read. `execute` rejects it when `index_name` does not resolve to
    /// a local secondary index.
    pub consistent_read: Option<bool>,
    /// Secondary index to scan instead of the base table.
    pub index_name: Option<String>,
    /// Segment number for a segmented scan; must be supplied together with `total_segments`
    /// and be strictly smaller than it.
    pub segment: Option<u32>,
    /// Total number of segments; must be supplied together with `segment` and lie in
    /// `1..=1_000_000`.
    pub total_segments: Option<u32>,
    /// One of `INDEXES`, `TOTAL`, `NONE` (enforced during deserialization only).
    pub return_consumed_capacity: Option<String>,
    /// Legacy projection list; converted to a projection when `projection_expression` is
    /// `None`.
    pub attributes_to_get: Option<Vec<String>>,
    /// Legacy filter in raw JSON form; converted to a filter expression when
    /// `filter_expression` is `None`.
    pub scan_filter: Option<serde_json::Value>,
    /// Legacy operator handed to the `scan_filter` conversion.
    pub conditional_operator: Option<String>,
    /// `ExclusiveStartKey` exactly as received. `execute` parses it with
    /// `helpers::parse_exclusive_start_key` and prefers it over `exclusive_start_key`.
    pub exclusive_start_key_raw: Option<serde_json::Value>,
}
71
72impl<'de> serde::Deserialize<'de> for ScanRequest {
73 fn deserialize<D: serde::Deserializer<'de>>(
74 deserializer: D,
75 ) -> std::result::Result<Self, D::Error> {
76 let raw = ScanRequestRaw::deserialize(deserializer)?;
77 use crate::validation::{
78 TableNameContext, format_validation_errors, table_name_constraint_errors,
79 };
80
81 let mut errors = Vec::new();
82 errors.extend(table_name_constraint_errors(
83 raw.table_name.as_deref(),
84 TableNameContext::ReadWrite,
85 ));
86 let table_name = raw.table_name.unwrap_or_default();
87
88 if let Some(ref rcc) = raw.return_consumed_capacity {
90 if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
91 errors.push(format!(
92 "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
93 Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
94 rcc
95 ));
96 }
97 }
98
99 if let Some(ref sel) = raw.select {
101 if ![
102 "ALL_ATTRIBUTES",
103 "ALL_PROJECTED_ATTRIBUTES",
104 "COUNT",
105 "SPECIFIC_ATTRIBUTES",
106 ]
107 .contains(&sel.as_str())
108 {
109 errors.push(format!(
110 "Value '{}' at 'select' failed to satisfy constraint: \
111 Member must satisfy enum value set: [SPECIFIC_ATTRIBUTES, COUNT, ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES]",
112 sel
113 ));
114 }
115 }
116
117 if let Some(limit) = raw.limit {
122 if limit == 0 {
123 errors.push(
124 "Value '0' at 'limit' failed to satisfy constraint: \
125 Member must have value greater than or equal to 1"
126 .to_string(),
127 );
128 }
129 }
130
131 if let Some(msg) = format_validation_errors(&errors) {
132 return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
133 }
134
135 Ok(ScanRequest {
136 table_name,
137 filter_expression: raw.filter_expression,
138 projection_expression: raw.projection_expression,
139 expression_attribute_names: raw.expression_attribute_names,
140 expression_attribute_values: raw.expression_attribute_values,
141 limit: raw.limit,
142 exclusive_start_key: None,
143 select: raw.select,
144 consistent_read: raw.consistent_read,
145 index_name: raw.index_name,
146 segment: raw.segment,
147 total_segments: raw.total_segments,
148 return_consumed_capacity: raw.return_consumed_capacity,
149 attributes_to_get: raw.attributes_to_get,
150 scan_filter: raw.scan_filter,
151 conditional_operator: raw.conditional_operator,
152 exclusive_start_key_raw: raw.exclusive_start_key,
153 })
154 }
155}
156
157#[derive(Debug, Default, Serialize)]
158pub struct ScanResponse {
159 #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
160 pub items: Option<Vec<Item>>,
161 #[serde(rename = "Count")]
162 pub count: usize,
163 #[serde(rename = "ScannedCount")]
164 pub scanned_count: usize,
165 #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
166 pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
167 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
168 pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
169}
170
171pub fn execute(storage: &Storage, mut request: ScanRequest) -> Result<ScanResponse> {
172 crate::validation::validate_table_name(&request.table_name)?;
174
175 {
177 let mut non_expr = Vec::new();
178 let mut expr = Vec::new();
179 if request.attributes_to_get.is_some() {
180 non_expr.push("AttributesToGet");
181 }
182 if request.scan_filter.is_some()
183 && request.scan_filter.as_ref().is_some_and(|v| !v.is_null())
184 {
185 non_expr.push("ScanFilter");
186 }
187 if request.conditional_operator.is_some() {
188 non_expr.push("ConditionalOperator");
189 }
190 if request.projection_expression.is_some() {
191 expr.push("ProjectionExpression");
192 }
193 if request.filter_expression.is_some() {
194 expr.push("FilterExpression");
195 }
196 let no_raw_eav: Option<serde_json::Value> = None;
197 let ctx = helpers::ExpressionParamContext {
198 non_expression_params: non_expr,
199 expression_params: expr,
200 all_expression_param_names: vec!["FilterExpression"],
201 expression_attribute_names: &request.expression_attribute_names,
202 expression_attribute_values: &request.expression_attribute_values,
203 expression_attribute_values_raw: &no_raw_eav,
204 };
205 helpers::validate_expression_params(&ctx)?;
206 }
207
208 helpers::validate_filter_conditions_raw(request.scan_filter.as_ref(), "ScanFilter")?;
210
211 helpers::validate_filter_condition_args(request.scan_filter.as_ref())?;
213
214 if let Some(ref attrs) = request.attributes_to_get {
216 helpers::validate_attributes_to_get_no_duplicates(attrs)?;
217 }
218
219 let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
221 Some(helpers::parse_exclusive_start_key(esk_val)?)
222 } else {
223 request.exclusive_start_key.clone()
224 };
225
226 if request.filter_expression.is_none() {
228 if let Some(ref sf_val) = request.scan_filter {
229 if let Ok(sf) =
230 serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(sf_val.clone())
231 {
232 if !sf.is_empty() {
233 let converted = helpers::convert_filter_conditions(
234 &sf,
235 request.conditional_operator.as_deref(),
236 )?;
237 if !converted.expression.is_empty() {
238 request.filter_expression = Some(converted.expression);
239 let expr_values = request
240 .expression_attribute_values
241 .get_or_insert_with(HashMap::new);
242 expr_values.extend(converted.attribute_values);
243 let expr_names = request
244 .expression_attribute_names
245 .get_or_insert_with(HashMap::new);
246 expr_names.extend(converted.attribute_names);
247 }
248 }
249 }
250 }
251 }
252
253 match (request.segment, request.total_segments) {
255 (Some(segment), Some(total)) => {
256 if !(1..=1_000_000).contains(&total) {
257 return Err(DynoxideError::ValidationException(
258 "1 validation error detected: Value at 'totalSegments' failed to satisfy constraint: \
259 Member must have value between 1 and 1000000".to_string(),
260 ));
261 }
262 if segment >= total {
263 return Err(DynoxideError::ValidationException(format!(
264 "The Segment parameter is zero-based and must be less than parameter TotalSegments: Segment: {} is not less than TotalSegments: {}",
265 segment, total
266 )));
267 }
268 }
269 (Some(_), None) => {
270 return Err(DynoxideError::ValidationException(
271 "The TotalSegments parameter is required but was not present in the request when Segment parameter is present".to_string(),
272 ));
273 }
274 (None, Some(_)) => {
275 return Err(DynoxideError::ValidationException(
276 "The Segment parameter is required but was not present in the request when parameter TotalSegments is present".to_string(),
277 ));
278 }
279 (None, None) => {}
280 }
281
282 if let Some(ref filter_expr_str) = request.filter_expression {
285 if filter_expr_str.is_empty() {
286 if request.scan_filter.is_none() || request.filter_expression.as_deref() == Some("") {
289 return Err(DynoxideError::ValidationException(
290 "Invalid FilterExpression: The expression can not be empty;".to_string(),
291 ));
292 }
293 } else {
294 let parsed_fe = expressions::condition::parse(filter_expr_str).map_err(|e| {
296 DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
297 })?;
298 if let Err(e) = expressions::condition::validate_name_refs(
300 &parsed_fe,
301 &request.expression_attribute_names,
302 ) {
303 return Err(DynoxideError::ValidationException(format!(
304 "Invalid FilterExpression: {e}"
305 )));
306 }
307 }
308 }
309 if let Some(ref proj_expr_str) = request.projection_expression {
310 if proj_expr_str.is_empty() {
311 return Err(DynoxideError::ValidationException(
312 "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
313 ));
314 }
315 }
316
317 if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
319 && request.projection_expression.is_none()
320 && request.attributes_to_get.is_none()
321 {
322 return Err(DynoxideError::ValidationException(
323 "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
324 .to_string(),
325 ));
326 }
327
328 let meta = helpers::require_table_for_item_op(storage, &request.table_name)?;
329 let table_key_schema = helpers::parse_key_schema(&meta)?;
330
331 let legacy_projection = if request.projection_expression.is_none() {
334 request
335 .attributes_to_get
336 .as_ref()
337 .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
338 } else {
339 None
340 };
341
342 let lsi_keys = request
344 .index_name
345 .as_ref()
346 .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
347 let is_lsi = lsi_keys.is_some();
348
349 if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
351 return Err(DynoxideError::ValidationException(
352 "Consistent reads are not supported on global secondary indexes".to_string(),
353 ));
354 }
355
356 let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
357 if let Some(keys) = lsi_keys {
358 keys
359 } else {
360 super::gsi::parse_gsi_key_schema(&meta, index_name)?
361 }
362 } else {
363 (
364 table_key_schema.partition_key.clone(),
365 table_key_schema.sort_key.clone(),
366 )
367 };
368
369 if let Some(ref esk) = exclusive_start_key {
372 let count_msg = if request.index_name.is_some() {
373 "The provided starting key is invalid"
374 } else {
375 "The provided starting key is invalid: The provided key element does not match the schema"
376 };
377 helpers::validate_esk_count_and_index_keys(
378 esk,
379 &meta,
380 request.index_name.as_deref(),
381 count_msg,
382 )?;
383 }
384
385 if let Some(ref index_name) = request.index_name {
387 if !is_lsi {
388 if let Some(ref select) = request.select {
389 if select == "ALL_ATTRIBUTES" {
390 let gsi_defs = super::gsi::parse_gsi_defs(&meta)?;
392 if let Some(gsi) = gsi_defs.iter().find(|g| g.index_name == *index_name) {
393 if gsi.projection_type != crate::types::ProjectionType::ALL {
394 return Err(DynoxideError::ValidationException(format!(
395 "One or more parameter values were invalid: \
396 Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
397 because its projection type is not ALL",
398 index_name
399 )));
400 }
401 }
402 }
403 }
404 }
405 }
406
407 if let Some(ref esk) = exclusive_start_key {
409 helpers::validate_esk_table_keys(esk, &meta)?;
410 }
411
412 let (start_pk, start_sk) = if let Some(ref esk) = exclusive_start_key {
414 let pk = esk.get(&effective_pk).and_then(|v| v.to_key_string());
415 let sk = if let Some(ref sk_name) = effective_sk {
416 esk.get(sk_name).and_then(|v| v.to_key_string())
417 } else {
418 Some(String::new())
419 };
420 (pk, sk)
421 } else {
422 (None, None)
423 };
424
425 let (start_base_pk, start_base_sk) = if is_lsi || request.index_name.is_some() {
431 if let Some(ref esk) = exclusive_start_key {
432 let base_pk = esk
433 .get(&table_key_schema.partition_key)
434 .and_then(|v| v.to_key_string());
435 let base_sk = table_key_schema
436 .sort_key
437 .as_ref()
438 .and_then(|sk_name| esk.get(sk_name))
439 .and_then(|v| v.to_key_string());
440 (base_pk, base_sk)
441 } else {
442 (None, None)
443 }
444 } else {
445 (None, None)
446 };
447
448 let scan_params = crate::storage::ScanParams {
450 limit: request.limit,
451 exclusive_start_pk: start_pk.as_deref(),
452 exclusive_start_sk: start_sk.as_deref(),
453 segment: request.segment,
454 total_segments: request.total_segments,
455 exclusive_start_base_pk: start_base_pk.as_deref(),
456 exclusive_start_base_sk: start_base_sk.as_deref(),
457 };
458 let rows = if let Some(ref index_name) = request.index_name {
459 if is_lsi {
460 storage.scan_lsi_items(&request.table_name, index_name, &scan_params)?
461 } else {
462 storage.scan_gsi_items(&request.table_name, index_name, &scan_params)?
463 }
464 } else {
465 storage.scan_items(&request.table_name, &scan_params)?
466 };
467
468 let tracker = crate::expressions::TrackedExpressionAttributes::new(
470 &request.expression_attribute_names,
471 &request.expression_attribute_values,
472 );
473
474 let filter_expr = request
476 .filter_expression
477 .as_ref()
478 .map(|expr| expressions::condition::parse(expr))
479 .transpose()
480 .map_err(DynoxideError::ValidationException)?;
481
482 if let Some(ref filter) = filter_expr {
484 let mut base_key_attrs = vec![table_key_schema.partition_key.clone()];
486 if let Some(ref sk) = table_key_schema.sort_key {
487 base_key_attrs.push(sk.clone());
488 }
489 let mut index_key_attrs = Vec::new();
490 if request.index_name.is_some() {
491 if !base_key_attrs.contains(&effective_pk) {
492 index_key_attrs.push(effective_pk.clone());
493 }
494 if let Some(ref sk) = effective_sk {
495 if !base_key_attrs.contains(sk) {
496 index_key_attrs.push(sk.clone());
497 }
498 }
499 }
500 if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
501 filter,
502 &request.expression_attribute_names,
503 &base_key_attrs,
504 &index_key_attrs,
505 ) {
506 let prefix = if is_index { "IndexKey" } else { "Key" };
507 return Err(DynoxideError::ValidationException(format!(
508 "Key attributes must be scalars; \
509 list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
510 )));
511 }
512 }
513
514 let projection = if let Some(ref proj_expr) = request.projection_expression {
516 Some(
517 expressions::projection::parse(proj_expr)
518 .map_err(DynoxideError::ValidationException)?,
519 )
520 } else {
521 legacy_projection.clone()
522 };
523
524 if let Some(ref filter) = filter_expr {
526 tracker.track_condition_expr(filter);
527 }
528 if let Some(ref proj) = projection {
529 tracker.track_projection_expr(proj);
530 }
531
532 let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
534 &request.expression_attribute_names,
535 &request.expression_attribute_values,
536 );
537
538 let is_count = request
540 .select
541 .as_deref()
542 .map(|s| s.eq_ignore_ascii_case("COUNT"))
543 .unwrap_or(false);
544
545 let mut key_attrs = vec![effective_pk.clone()];
547 if let Some(ref sk) = effective_sk {
548 key_attrs.push(sk.clone());
549 }
550 if request.index_name.is_some() {
551 if !key_attrs.contains(&table_key_schema.partition_key) {
552 key_attrs.push(table_key_schema.partition_key.clone());
553 }
554 if let Some(ref sk) = table_key_schema.sort_key {
555 if !key_attrs.contains(sk) {
556 key_attrs.push(sk.clone());
557 }
558 }
559 }
560
561 let mut items = Vec::new();
562 let mut scanned_count = 0;
563 let mut filtered_count = 0;
564 let mut cumulative_size = 0;
565 let mut last_evaluated_item: Option<Item> = None;
566 let mut truncated_by_size = false;
567
568 for (_pk, _sk, item_json) in &rows {
569 let item: Item = serde_json::from_str(item_json).map_err(|e| {
570 DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
571 })?;
572
573 scanned_count += 1;
574
575 let item_size = crate::types::item_size(&item);
578 if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
579 truncated_by_size = true;
580 break;
581 }
582 cumulative_size += item_size;
583
584 if let Some(ref filter) = filter_expr {
586 let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
587 .map_err(DynoxideError::ValidationException)?;
588 if !passes {
589 last_evaluated_item = Some(item);
590 continue;
591 }
592 }
593
594 filtered_count += 1;
595
596 let result_item = if let Some(ref proj) = projection {
599 let no_keys: &[String] = &[];
600 expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
601 .map_err(DynoxideError::ValidationException)?
602 } else {
603 item.clone()
604 };
605
606 last_evaluated_item = Some(item);
607 if !is_count {
608 items.push(result_item);
609 }
610 }
611
612 tracker.check_unused()?;
614
615 let count = if is_count {
616 filtered_count
617 } else {
618 items.len()
619 };
620
621 let has_more = truncated_by_size
623 || (request.limit.is_some() && scanned_count >= request.limit.unwrap_or(usize::MAX));
624
625 let is_gsi_scan = request.index_name.is_some() && !is_lsi;
630 let last_evaluated_key = if has_more {
631 last_evaluated_item.map(|item| {
632 let mut key = HashMap::new();
633 if let Some(pk_val) = item.get(&effective_pk) {
634 key.insert(effective_pk.clone(), pk_val.clone());
635 }
636 if let Some(ref sk_name) = effective_sk {
637 if let Some(sk_val) = item.get(sk_name) {
638 key.insert(sk_name.clone(), sk_val.clone());
639 }
640 }
641 if is_lsi {
643 if let Some(tsk) = table_key_schema.sort_key.as_deref() {
644 if !key.contains_key(tsk) {
645 if let Some(v) = item.get(tsk) {
646 key.insert(tsk.to_string(), v.clone());
647 }
648 }
649 }
650 }
651 if is_gsi_scan {
653 if !key.contains_key(&table_key_schema.partition_key) {
654 if let Some(v) = item.get(&table_key_schema.partition_key) {
655 key.insert(table_key_schema.partition_key.clone(), v.clone());
656 }
657 }
658 if let Some(ref tsk) = table_key_schema.sort_key {
659 if !key.contains_key(tsk) {
660 if let Some(v) = item.get(tsk) {
661 key.insert(tsk.clone(), v.clone());
662 }
663 }
664 }
665 }
666 key
667 })
668 } else {
669 None
670 };
671
672 let is_gsi = is_gsi_scan;
674 let consistent = request.consistent_read.unwrap_or(false);
675 let consumed_capacity = if is_gsi {
676 let mut gsi_units = std::collections::HashMap::new();
677 gsi_units.insert(
678 request.index_name.as_ref().unwrap().clone(),
679 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
680 );
681 crate::types::consumed_capacity_with_indexes(
682 &request.table_name,
683 0.0,
684 &gsi_units,
685 &request.return_consumed_capacity,
686 )
687 } else {
688 crate::types::consumed_capacity(
689 &request.table_name,
690 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
691 &request.return_consumed_capacity,
692 )
693 };
694
695 Ok(ScanResponse {
696 items: if is_count { None } else { Some(items) },
697 count,
698 scanned_count,
699 last_evaluated_key,
700 consumed_capacity,
701 })
702}