1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage_backend::StorageBackend;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Cap on the cumulative item size (as reported by `crate::types::item_size`) that a single
/// scan page may accumulate before it is truncated: 1 MiB.
const MAX_RESPONSE_SIZE: usize = 1024 * 1024;
11
12#[derive(Debug, Default, Deserialize)]
14struct ScanRequestRaw {
15 #[serde(rename = "TableName", default)]
16 table_name: Option<String>,
17 #[serde(rename = "FilterExpression", default)]
18 filter_expression: Option<String>,
19 #[serde(rename = "ProjectionExpression", default)]
20 projection_expression: Option<String>,
21 #[serde(rename = "ExpressionAttributeNames", default)]
22 expression_attribute_names: Option<HashMap<String, String>>,
23 #[serde(rename = "ExpressionAttributeValues", default)]
24 expression_attribute_values: Option<HashMap<String, AttributeValue>>,
25 #[serde(rename = "Limit", default)]
26 limit: Option<usize>,
27 #[serde(rename = "ExclusiveStartKey", default)]
28 exclusive_start_key: Option<serde_json::Value>,
29 #[serde(rename = "Select", default)]
30 select: Option<String>,
31 #[serde(rename = "ConsistentRead", default)]
32 consistent_read: Option<bool>,
33 #[serde(rename = "IndexName", default)]
34 index_name: Option<String>,
35 #[serde(rename = "Segment", default)]
39 segment: Option<i64>,
40 #[serde(rename = "TotalSegments", default)]
41 total_segments: Option<u32>,
42 #[serde(rename = "ReturnConsumedCapacity", default)]
43 return_consumed_capacity: Option<String>,
44 #[serde(rename = "AttributesToGet", default)]
45 attributes_to_get: Option<Vec<String>>,
46 #[serde(rename = "ScanFilter", default)]
47 scan_filter: Option<serde_json::Value>,
48 #[serde(rename = "ConditionalOperator", default)]
49 conditional_operator: Option<String>,
50}
51
/// A validated Scan request, as consumed by [`execute`].
///
/// Values are normally produced by this type's custom `Deserialize` impl, which decodes the
/// JSON body and applies the enum/range checks. All fields are public, so a request can also
/// be built directly (for example starting from `ScanRequest::default()`).
#[derive(Debug, Default)]
pub struct ScanRequest {
    /// Name of the table to scan.
    pub table_name: String,
    /// Filter expression text; when absent, `execute` may derive one from `scan_filter`.
    pub filter_expression: Option<String>,
    /// Projection expression text.
    pub projection_expression: Option<String>,
    /// Expression attribute name substitutions.
    pub expression_attribute_names: Option<HashMap<String, String>>,
    /// Expression attribute value substitutions.
    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
    /// Maximum number of rows to read; the `Deserialize` impl rejects `0`.
    pub limit: Option<usize>,
    /// Already-parsed start key. The `Deserialize` impl always leaves this `None`; `execute`
    /// only consults it when `exclusive_start_key_raw` is `None`.
    pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
    /// One of `ALL_ATTRIBUTES`, `ALL_PROJECTED_ATTRIBUTES`, `COUNT`, `SPECIFIC_ATTRIBUTES`
    /// (enforced by the `Deserialize` impl).
    pub select: Option<String>,
    /// Whether a consistent read was requested; treated as `false` when absent.
    pub consistent_read: Option<bool>,
    /// Name of the secondary index to scan instead of the base table.
    pub index_name: Option<String>,
    /// Zero-based segment number; `execute` requires `total_segments` whenever this is set.
    pub segment: Option<u32>,
    /// Total number of segments; `execute` requires `segment` whenever this is set.
    pub total_segments: Option<u32>,
    /// One of `INDEXES`, `TOTAL`, `NONE` (enforced by the `Deserialize` impl).
    pub return_consumed_capacity: Option<String>,
    /// Legacy attribute list; used as the projection when no projection expression is given.
    pub attributes_to_get: Option<Vec<String>>,
    /// Legacy filter as raw JSON.
    pub scan_filter: Option<serde_json::Value>,
    /// Legacy operator passed along when `scan_filter` is converted to an expression.
    pub conditional_operator: Option<String>,
    /// Start key as raw JSON; parsed by `execute` and, when present, takes precedence over
    /// `exclusive_start_key`.
    pub exclusive_start_key_raw: Option<serde_json::Value>,
}
74
75impl<'de> serde::Deserialize<'de> for ScanRequest {
76 fn deserialize<D: serde::Deserializer<'de>>(
77 deserializer: D,
78 ) -> std::result::Result<Self, D::Error> {
79 let raw = ScanRequestRaw::deserialize(deserializer)?;
80 use crate::validation::{
81 TableNameContext, format_validation_errors, table_name_constraint_errors,
82 };
83
84 let mut errors = Vec::new();
85 errors.extend(table_name_constraint_errors(
86 raw.table_name.as_deref(),
87 TableNameContext::ReadWrite,
88 ));
89 let table_name = raw.table_name.unwrap_or_default();
90
91 if let Some(ref rcc) = raw.return_consumed_capacity {
93 if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
94 errors.push(format!(
95 "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
96 Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
97 rcc
98 ));
99 }
100 }
101
102 if let Some(ref sel) = raw.select {
104 if ![
105 "ALL_ATTRIBUTES",
106 "ALL_PROJECTED_ATTRIBUTES",
107 "COUNT",
108 "SPECIFIC_ATTRIBUTES",
109 ]
110 .contains(&sel.as_str())
111 {
112 errors.push(format!(
113 "Value '{}' at 'select' failed to satisfy constraint: \
114 Member must satisfy enum value set: [SPECIFIC_ATTRIBUTES, COUNT, ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES]",
115 sel
116 ));
117 }
118 }
119
120 if let Some(limit) = raw.limit {
125 if limit == 0 {
126 errors.push(
127 "Value '0' at 'limit' failed to satisfy constraint: \
128 Member must have value greater than or equal to 1"
129 .to_string(),
130 );
131 }
132 }
133
134 if let Some(segment) = raw.segment {
137 if segment < 0 {
138 errors.push(format!(
139 "Value '{}' at 'segment' failed to satisfy constraint: \
140 Member must have value greater than or equal to 0",
141 segment
142 ));
143 }
144 }
145
146 if let Some(msg) = format_validation_errors(&errors) {
147 return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
148 }
149
150 Ok(ScanRequest {
151 table_name,
152 filter_expression: raw.filter_expression,
153 projection_expression: raw.projection_expression,
154 expression_attribute_names: raw.expression_attribute_names,
155 expression_attribute_values: raw.expression_attribute_values,
156 limit: raw.limit,
157 exclusive_start_key: None,
158 select: raw.select,
159 consistent_read: raw.consistent_read,
160 index_name: raw.index_name,
161 segment: raw.segment.map(|s| s as u32),
162 total_segments: raw.total_segments,
163 return_consumed_capacity: raw.return_consumed_capacity,
164 attributes_to_get: raw.attributes_to_get,
165 scan_filter: raw.scan_filter,
166 conditional_operator: raw.conditional_operator,
167 exclusive_start_key_raw: raw.exclusive_start_key,
168 })
169 }
170}
171
172#[derive(Debug, Default, Serialize)]
173pub struct ScanResponse {
174 #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
175 pub items: Option<Vec<Item>>,
176 #[serde(rename = "Count")]
177 pub count: usize,
178 #[serde(rename = "ScannedCount")]
179 pub scanned_count: usize,
180 #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
181 pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
182 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
183 pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
184}
185
186pub async fn execute<S: StorageBackend>(
187 storage: &S,
188 mut request: ScanRequest,
189) -> Result<ScanResponse> {
190 crate::validation::validate_table_name(&request.table_name)?;
192
193 {
195 let mut non_expr = Vec::new();
196 let mut expr = Vec::new();
197 if request.attributes_to_get.is_some() {
198 non_expr.push("AttributesToGet");
199 }
200 if request.scan_filter.is_some()
201 && request.scan_filter.as_ref().is_some_and(|v| !v.is_null())
202 {
203 non_expr.push("ScanFilter");
204 }
205 if request.conditional_operator.is_some() {
206 non_expr.push("ConditionalOperator");
207 }
208 if request.projection_expression.is_some() {
209 expr.push("ProjectionExpression");
210 }
211 if request.filter_expression.is_some() {
212 expr.push("FilterExpression");
213 }
214 let no_raw_eav: Option<serde_json::Value> = None;
215 let ctx = helpers::ExpressionParamContext {
216 non_expression_params: non_expr,
217 expression_params: expr,
218 all_expression_param_names: vec!["FilterExpression"],
219 expression_attribute_names: &request.expression_attribute_names,
220 expression_attribute_values: &request.expression_attribute_values,
221 expression_attribute_values_raw: &no_raw_eav,
222 };
223 helpers::validate_expression_params(&ctx)?;
224 }
225
226 helpers::validate_filter_conditions_raw(request.scan_filter.as_ref(), "ScanFilter")?;
228
229 helpers::validate_filter_condition_args(request.scan_filter.as_ref())?;
231
232 if let Some(ref attrs) = request.attributes_to_get {
234 helpers::validate_attributes_to_get_no_duplicates(attrs)?;
235 }
236
237 let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
239 Some(helpers::parse_exclusive_start_key(esk_val)?)
240 } else {
241 request.exclusive_start_key.clone()
242 };
243
244 if request.filter_expression.is_none() {
246 if let Some(ref sf_val) = request.scan_filter {
247 if let Ok(sf) =
248 serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(sf_val.clone())
249 {
250 if !sf.is_empty() {
251 let converted = helpers::convert_filter_conditions(
252 &sf,
253 request.conditional_operator.as_deref(),
254 )?;
255 if !converted.expression.is_empty() {
256 request.filter_expression = Some(converted.expression);
257 let expr_values = request
258 .expression_attribute_values
259 .get_or_insert_with(HashMap::new);
260 expr_values.extend(converted.attribute_values);
261 let expr_names = request
262 .expression_attribute_names
263 .get_or_insert_with(HashMap::new);
264 expr_names.extend(converted.attribute_names);
265 }
266 }
267 }
268 }
269 }
270
271 match (request.segment, request.total_segments) {
273 (Some(segment), Some(total)) => {
274 if !(1..=1_000_000).contains(&total) {
275 return Err(DynoxideError::ValidationException(
276 "1 validation error detected: Value at 'totalSegments' failed to satisfy constraint: \
277 Member must have value between 1 and 1000000".to_string(),
278 ));
279 }
280 if segment >= total {
281 return Err(DynoxideError::ValidationException(format!(
282 "The Segment parameter is zero-based and must be less than parameter TotalSegments: Segment: {} is not less than TotalSegments: {}",
283 segment, total
284 )));
285 }
286 }
287 (Some(_), None) => {
288 return Err(DynoxideError::ValidationException(
289 "The TotalSegments parameter is required but was not present in the request when Segment parameter is present".to_string(),
290 ));
291 }
292 (None, Some(_)) => {
293 return Err(DynoxideError::ValidationException(
294 "The Segment parameter is required but was not present in the request when parameter TotalSegments is present".to_string(),
295 ));
296 }
297 (None, None) => {}
298 }
299
300 if let Some(ref filter_expr_str) = request.filter_expression {
303 if filter_expr_str.is_empty() {
304 if request.scan_filter.is_none() || request.filter_expression.as_deref() == Some("") {
307 return Err(DynoxideError::ValidationException(
308 "Invalid FilterExpression: The expression can not be empty;".to_string(),
309 ));
310 }
311 } else {
312 let parsed_fe = expressions::condition::parse(filter_expr_str).map_err(|e| {
314 DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
315 })?;
316 if let Err(e) = expressions::condition::validate_name_refs(
318 &parsed_fe,
319 &request.expression_attribute_names,
320 ) {
321 return Err(DynoxideError::ValidationException(format!(
322 "Invalid FilterExpression: {e}"
323 )));
324 }
325 if let Err(e) = expressions::condition::validate_operand_semantics(
326 &parsed_fe,
327 &request.expression_attribute_names,
328 &request.expression_attribute_values,
329 ) {
330 return Err(DynoxideError::ValidationException(format!(
331 "Invalid FilterExpression: {e}"
332 )));
333 }
334 }
335 }
336 if let Some(ref proj_expr_str) = request.projection_expression {
337 if proj_expr_str.is_empty() {
338 return Err(DynoxideError::ValidationException(
339 "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
340 ));
341 }
342 }
343
344 if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
346 && request.projection_expression.is_none()
347 && request.attributes_to_get.is_none()
348 {
349 return Err(DynoxideError::ValidationException(
350 "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
351 .to_string(),
352 ));
353 }
354
355 if request.projection_expression.is_some() {
357 if let Some(select) = request.select.as_deref() {
358 if select != "SPECIFIC_ATTRIBUTES" {
359 let target = if select == "COUNT" {
361 "only the Count"
362 } else {
363 select
364 };
365 return Err(DynoxideError::ValidationException(format!(
366 "Cannot specify the ProjectionExpression when choosing to get {target}"
367 )));
368 }
369 }
370 }
371
372 if request.select.as_deref() == Some("ALL_PROJECTED_ATTRIBUTES") && request.index_name.is_none()
375 {
376 return Err(DynoxideError::ValidationException(
377 "ALL_PROJECTED_ATTRIBUTES can be used only when Querying using an IndexName"
378 .to_string(),
379 ));
380 }
381
382 let meta = helpers::require_table_for_item_op(storage, &request.table_name).await?;
383 let table_key_schema = helpers::parse_key_schema(&meta)?;
384
385 let legacy_projection = if request.projection_expression.is_none() {
388 request
389 .attributes_to_get
390 .as_ref()
391 .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
392 } else {
393 None
394 };
395
396 let lsi_keys = request
398 .index_name
399 .as_ref()
400 .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
401 let is_lsi = lsi_keys.is_some();
402
403 if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
405 return Err(DynoxideError::ValidationException(
406 "Consistent reads are not supported on global secondary indexes".to_string(),
407 ));
408 }
409
410 let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
411 if let Some(keys) = lsi_keys {
412 keys
413 } else {
414 super::gsi::parse_gsi_key_schema(&meta, index_name)?
415 }
416 } else {
417 (
418 table_key_schema.partition_key.clone(),
419 table_key_schema.sort_key.clone(),
420 )
421 };
422
423 if let Some(ref esk) = exclusive_start_key {
426 let count_msg = if request.index_name.is_some() {
427 "The provided starting key is invalid"
428 } else {
429 "The provided starting key is invalid: The provided key element does not match the schema"
430 };
431 helpers::validate_esk_count_and_index_keys(
432 esk,
433 &meta,
434 request.index_name.as_deref(),
435 count_msg,
436 )?;
437 }
438
439 if let Some(ref index_name) = request.index_name {
441 if !is_lsi {
442 if let Some(ref select) = request.select {
443 if select == "ALL_ATTRIBUTES" {
444 let gsi_defs = super::gsi::parse_gsi_defs(&meta)?;
446 if let Some(gsi) = gsi_defs.iter().find(|g| g.index_name == *index_name) {
447 if gsi.projection_type != crate::types::ProjectionType::ALL {
448 return Err(DynoxideError::ValidationException(format!(
449 "One or more parameter values were invalid: \
450 Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
451 because its projection type is not ALL",
452 index_name
453 )));
454 }
455 }
456 }
457 }
458 }
459 }
460
461 if let Some(ref esk) = exclusive_start_key {
463 helpers::validate_esk_table_keys(esk, &meta)?;
464 }
465
466 let (start_pk, start_sk) = if let Some(ref esk) = exclusive_start_key {
468 let pk = esk.get(&effective_pk).and_then(|v| v.to_key_string());
469 let sk = if let Some(ref sk_name) = effective_sk {
470 esk.get(sk_name).and_then(|v| v.to_key_string())
471 } else {
472 Some(String::new())
473 };
474 (pk, sk)
475 } else {
476 (None, None)
477 };
478
479 let (start_base_pk, start_base_sk) = if is_lsi || request.index_name.is_some() {
485 if let Some(ref esk) = exclusive_start_key {
486 let base_pk = esk
487 .get(&table_key_schema.partition_key)
488 .and_then(|v| v.to_key_string());
489 let base_sk = if let Some(sk_name) = table_key_schema.sort_key.as_ref() {
497 esk.get(sk_name).and_then(|v| v.to_key_string())
498 } else {
499 Some(String::new())
500 };
501 (base_pk, base_sk)
502 } else {
503 (None, None)
504 }
505 } else {
506 (None, None)
507 };
508
509 let scan_params = crate::storage::ScanParams {
511 limit: request.limit,
512 exclusive_start_pk: start_pk.as_deref(),
513 exclusive_start_sk: start_sk.as_deref(),
514 segment: request.segment,
515 total_segments: request.total_segments,
516 exclusive_start_base_pk: start_base_pk.as_deref(),
517 exclusive_start_base_sk: start_base_sk.as_deref(),
518 };
519 let rows = if let Some(ref index_name) = request.index_name {
520 if is_lsi {
521 storage
522 .scan_lsi_items(&request.table_name, index_name, &scan_params)
523 .await?
524 } else {
525 storage
526 .scan_gsi_items(&request.table_name, index_name, &scan_params)
527 .await?
528 }
529 } else {
530 storage
531 .scan_items(&request.table_name, &scan_params)
532 .await?
533 };
534
535 let tracker = crate::expressions::TrackedExpressionAttributes::new(
537 &request.expression_attribute_names,
538 &request.expression_attribute_values,
539 );
540
541 let filter_expr = request
543 .filter_expression
544 .as_ref()
545 .map(|expr| expressions::condition::parse(expr))
546 .transpose()
547 .map_err(DynoxideError::ValidationException)?;
548
549 if let Some(ref filter) = filter_expr {
551 let mut base_key_attrs = vec![table_key_schema.partition_key.clone()];
553 if let Some(ref sk) = table_key_schema.sort_key {
554 base_key_attrs.push(sk.clone());
555 }
556 let mut index_key_attrs = Vec::new();
557 if request.index_name.is_some() {
558 if !base_key_attrs.contains(&effective_pk) {
559 index_key_attrs.push(effective_pk.clone());
560 }
561 if let Some(ref sk) = effective_sk {
562 if !base_key_attrs.contains(sk) {
563 index_key_attrs.push(sk.clone());
564 }
565 }
566 }
567 if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
568 filter,
569 &request.expression_attribute_names,
570 &base_key_attrs,
571 &index_key_attrs,
572 ) {
573 let prefix = if is_index { "IndexKey" } else { "Key" };
574 return Err(DynoxideError::ValidationException(format!(
575 "Key attributes must be scalars; \
576 list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
577 )));
578 }
579 }
580
581 let projection = if let Some(ref proj_expr) = request.projection_expression {
583 Some(
584 expressions::projection::parse(proj_expr)
585 .map_err(DynoxideError::ValidationException)?,
586 )
587 } else {
588 legacy_projection.clone()
589 };
590
591 if let Some(ref filter) = filter_expr {
593 tracker.track_condition_expr(filter);
594 }
595 if let Some(ref proj) = projection {
596 tracker.track_projection_expr(proj);
597 }
598
599 let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
601 &request.expression_attribute_names,
602 &request.expression_attribute_values,
603 );
604
605 let is_count = request
607 .select
608 .as_deref()
609 .map(|s| s.eq_ignore_ascii_case("COUNT"))
610 .unwrap_or(false);
611
612 let mut key_attrs = vec![effective_pk.clone()];
614 if let Some(ref sk) = effective_sk {
615 key_attrs.push(sk.clone());
616 }
617 if request.index_name.is_some() {
618 if !key_attrs.contains(&table_key_schema.partition_key) {
619 key_attrs.push(table_key_schema.partition_key.clone());
620 }
621 if let Some(ref sk) = table_key_schema.sort_key {
622 if !key_attrs.contains(sk) {
623 key_attrs.push(sk.clone());
624 }
625 }
626 }
627
628 let mut items = Vec::new();
629 let mut scanned_count = 0;
630 let mut filtered_count = 0;
631 let mut cumulative_size = 0;
632 let mut last_evaluated_item: Option<Item> = None;
633 let mut truncated_by_size = false;
634
635 for (_pk, _sk, item_json) in &rows {
636 let item: Item = serde_json::from_str(item_json).map_err(|e| {
637 DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
638 })?;
639
640 scanned_count += 1;
641
642 let item_size = crate::types::item_size(&item);
645 if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
646 truncated_by_size = true;
647 break;
648 }
649 cumulative_size += item_size;
650
651 if let Some(ref filter) = filter_expr {
653 let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
654 .map_err(DynoxideError::ValidationException)?;
655 if !passes {
656 last_evaluated_item = Some(item);
657 continue;
658 }
659 }
660
661 filtered_count += 1;
662
663 let result_item = if let Some(ref proj) = projection {
666 let no_keys: &[String] = &[];
667 expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
668 .map_err(DynoxideError::ValidationException)?
669 } else {
670 item.clone()
671 };
672
673 last_evaluated_item = Some(item);
674 if !is_count {
675 items.push(result_item);
676 }
677 }
678
679 tracker.check_unused()?;
681
682 let count = if is_count {
683 filtered_count
684 } else {
685 items.len()
686 };
687
688 let has_more = truncated_by_size
690 || (request.limit.is_some() && scanned_count >= request.limit.unwrap_or(usize::MAX));
691
692 let is_gsi_scan = request.index_name.is_some() && !is_lsi;
697 let last_evaluated_key = if has_more {
698 last_evaluated_item.map(|item| {
699 let mut key = HashMap::new();
700 if let Some(pk_val) = item.get(&effective_pk) {
701 key.insert(effective_pk.clone(), pk_val.clone());
702 }
703 if let Some(ref sk_name) = effective_sk {
704 if let Some(sk_val) = item.get(sk_name) {
705 key.insert(sk_name.clone(), sk_val.clone());
706 }
707 }
708 if is_lsi {
710 if let Some(tsk) = table_key_schema.sort_key.as_deref() {
711 if !key.contains_key(tsk) {
712 if let Some(v) = item.get(tsk) {
713 key.insert(tsk.to_string(), v.clone());
714 }
715 }
716 }
717 }
718 if is_gsi_scan {
720 if !key.contains_key(&table_key_schema.partition_key) {
721 if let Some(v) = item.get(&table_key_schema.partition_key) {
722 key.insert(table_key_schema.partition_key.clone(), v.clone());
723 }
724 }
725 if let Some(ref tsk) = table_key_schema.sort_key {
726 if !key.contains_key(tsk) {
727 if let Some(v) = item.get(tsk) {
728 key.insert(tsk.clone(), v.clone());
729 }
730 }
731 }
732 }
733 key
734 })
735 } else {
736 None
737 };
738
739 let is_gsi = is_gsi_scan;
741 let consistent = request.consistent_read.unwrap_or(false);
742 let consumed_capacity = if is_gsi {
743 let mut gsi_units = std::collections::HashMap::new();
744 gsi_units.insert(
745 request.index_name.as_ref().unwrap().clone(),
746 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
747 );
748 crate::types::consumed_capacity_with_indexes(
749 &request.table_name,
750 0.0,
751 &gsi_units,
752 &request.return_consumed_capacity,
753 )
754 } else {
755 crate::types::consumed_capacity(
756 &request.table_name,
757 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
758 &request.return_consumed_capacity,
759 )
760 };
761
762 Ok(ScanResponse {
763 items: if is_count { None } else { Some(items) },
764 count,
765 scanned_count,
766 last_evaluated_key,
767 consumed_capacity,
768 })
769}