1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage_backend::StorageBackend;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Byte budget for the cumulative size (as measured by `crate::types::item_size`)
/// of the items read into a single Scan page; once the next item would exceed it,
/// the page is cut short and a `LastEvaluatedKey` is returned.
/// 1024 * 1024 = 1_048_576 bytes (1 MiB) — presumably mirroring DynamoDB's
/// 1 MB per-request read limit.
const MAX_RESPONSE_SIZE: usize = 1024 * 1024;
11
12#[derive(Debug, Default, Deserialize)]
14struct ScanRequestRaw {
15 #[serde(rename = "TableName", default)]
16 table_name: Option<String>,
17 #[serde(rename = "FilterExpression", default)]
18 filter_expression: Option<String>,
19 #[serde(rename = "ProjectionExpression", default)]
20 projection_expression: Option<String>,
21 #[serde(rename = "ExpressionAttributeNames", default)]
22 expression_attribute_names: Option<HashMap<String, String>>,
23 #[serde(rename = "ExpressionAttributeValues", default)]
24 expression_attribute_values: Option<HashMap<String, AttributeValue>>,
25 #[serde(rename = "Limit", default)]
26 limit: Option<usize>,
27 #[serde(rename = "ExclusiveStartKey", default)]
28 exclusive_start_key: Option<serde_json::Value>,
29 #[serde(rename = "Select", default)]
30 select: Option<String>,
31 #[serde(rename = "ConsistentRead", default)]
32 consistent_read: Option<bool>,
33 #[serde(rename = "IndexName", default)]
34 index_name: Option<String>,
35 #[serde(rename = "Segment", default)]
39 segment: Option<i64>,
40 #[serde(rename = "TotalSegments", default)]
41 total_segments: Option<u32>,
42 #[serde(rename = "ReturnConsumedCapacity", default)]
43 return_consumed_capacity: Option<String>,
44 #[serde(rename = "AttributesToGet", default)]
45 attributes_to_get: Option<Vec<String>>,
46 #[serde(rename = "ScanFilter", default)]
47 scan_filter: Option<serde_json::Value>,
48 #[serde(rename = "ConditionalOperator", default)]
49 conditional_operator: Option<String>,
50}
51
52#[derive(Debug, Default)]
53pub struct ScanRequest {
54 pub table_name: String,
55 pub filter_expression: Option<String>,
56 pub projection_expression: Option<String>,
57 pub expression_attribute_names: Option<HashMap<String, String>>,
58 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
59 pub limit: Option<usize>,
60 pub exclusive_start_key: Option<HashMap<String, AttributeValue>>,
61 pub select: Option<String>,
62 pub consistent_read: Option<bool>,
63 pub index_name: Option<String>,
64 pub segment: Option<u32>,
65 pub total_segments: Option<u32>,
66 pub return_consumed_capacity: Option<String>,
67 pub attributes_to_get: Option<Vec<String>>,
68 pub scan_filter: Option<serde_json::Value>,
69 pub conditional_operator: Option<String>,
70 pub exclusive_start_key_raw: Option<serde_json::Value>,
73}
74
75impl<'de> serde::Deserialize<'de> for ScanRequest {
76 fn deserialize<D: serde::Deserializer<'de>>(
77 deserializer: D,
78 ) -> std::result::Result<Self, D::Error> {
79 let raw = ScanRequestRaw::deserialize(deserializer)?;
80 use crate::validation::{
81 TableNameContext, format_validation_errors, table_name_constraint_errors,
82 };
83
84 let mut errors = Vec::new();
85 errors.extend(table_name_constraint_errors(
86 raw.table_name.as_deref(),
87 TableNameContext::ReadWrite,
88 ));
89 let table_name = raw.table_name.unwrap_or_default();
90
91 if let Some(ref rcc) = raw.return_consumed_capacity {
93 if !["INDEXES", "TOTAL", "NONE"].contains(&rcc.as_str()) {
94 errors.push(format!(
95 "Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: \
96 Member must satisfy enum value set: [INDEXES, TOTAL, NONE]",
97 rcc
98 ));
99 }
100 }
101
102 if let Some(ref sel) = raw.select {
104 if ![
105 "ALL_ATTRIBUTES",
106 "ALL_PROJECTED_ATTRIBUTES",
107 "COUNT",
108 "SPECIFIC_ATTRIBUTES",
109 ]
110 .contains(&sel.as_str())
111 {
112 errors.push(format!(
113 "Value '{}' at 'select' failed to satisfy constraint: \
114 Member must satisfy enum value set: [SPECIFIC_ATTRIBUTES, COUNT, ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES]",
115 sel
116 ));
117 }
118 }
119
120 if let Some(limit) = raw.limit {
125 if limit == 0 {
126 errors.push(
127 "Value '0' at 'limit' failed to satisfy constraint: \
128 Member must have value greater than or equal to 1"
129 .to_string(),
130 );
131 }
132 }
133
134 if let Some(segment) = raw.segment {
137 if segment < 0 {
138 errors.push(format!(
139 "Value '{}' at 'segment' failed to satisfy constraint: \
140 Member must have value greater than or equal to 0",
141 segment
142 ));
143 }
144 }
145
146 if let Some(msg) = format_validation_errors(&errors) {
147 return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
148 }
149
150 Ok(ScanRequest {
151 table_name,
152 filter_expression: raw.filter_expression,
153 projection_expression: raw.projection_expression,
154 expression_attribute_names: raw.expression_attribute_names,
155 expression_attribute_values: raw.expression_attribute_values,
156 limit: raw.limit,
157 exclusive_start_key: None,
158 select: raw.select,
159 consistent_read: raw.consistent_read,
160 index_name: raw.index_name,
161 segment: raw.segment.map(|s| s as u32),
162 total_segments: raw.total_segments,
163 return_consumed_capacity: raw.return_consumed_capacity,
164 attributes_to_get: raw.attributes_to_get,
165 scan_filter: raw.scan_filter,
166 conditional_operator: raw.conditional_operator,
167 exclusive_start_key_raw: raw.exclusive_start_key,
168 })
169 }
170}
171
172#[derive(Debug, Default, Serialize)]
173pub struct ScanResponse {
174 #[serde(rename = "Items", skip_serializing_if = "Option::is_none")]
175 pub items: Option<Vec<Item>>,
176 #[serde(rename = "Count")]
177 pub count: usize,
178 #[serde(rename = "ScannedCount")]
179 pub scanned_count: usize,
180 #[serde(rename = "LastEvaluatedKey", skip_serializing_if = "Option::is_none")]
181 pub last_evaluated_key: Option<HashMap<String, AttributeValue>>,
182 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
183 pub consumed_capacity: Option<crate::types::ConsumedCapacity>,
184}
185
186pub async fn execute<S: StorageBackend>(
187 storage: &S,
188 mut request: ScanRequest,
189) -> Result<ScanResponse> {
190 crate::validation::validate_table_name(&request.table_name)?;
192
193 {
195 let mut non_expr = Vec::new();
196 let mut expr = Vec::new();
197 if request.attributes_to_get.is_some() {
198 non_expr.push("AttributesToGet");
199 }
200 if request.scan_filter.is_some()
201 && request.scan_filter.as_ref().is_some_and(|v| !v.is_null())
202 {
203 non_expr.push("ScanFilter");
204 }
205 if request.conditional_operator.is_some() {
206 non_expr.push("ConditionalOperator");
207 }
208 if request.projection_expression.is_some() {
209 expr.push("ProjectionExpression");
210 }
211 if request.filter_expression.is_some() {
212 expr.push("FilterExpression");
213 }
214 let no_raw_eav: Option<serde_json::Value> = None;
215 let ctx = helpers::ExpressionParamContext {
216 non_expression_params: non_expr,
217 expression_params: expr,
218 all_expression_param_names: vec!["FilterExpression"],
219 expression_attribute_names: &request.expression_attribute_names,
220 expression_attribute_values: &request.expression_attribute_values,
221 expression_attribute_values_raw: &no_raw_eav,
222 };
223 helpers::validate_expression_params(&ctx)?;
224 }
225
226 helpers::validate_filter_conditions_raw(request.scan_filter.as_ref(), "ScanFilter")?;
228
229 helpers::validate_filter_condition_args(request.scan_filter.as_ref())?;
231
232 if let Some(ref attrs) = request.attributes_to_get {
234 helpers::validate_attributes_to_get_no_duplicates(attrs)?;
235 }
236
237 let exclusive_start_key = if let Some(ref esk_val) = request.exclusive_start_key_raw {
239 Some(helpers::parse_exclusive_start_key(esk_val)?)
240 } else {
241 request.exclusive_start_key.clone()
242 };
243
244 if request.filter_expression.is_none() {
246 if let Some(ref sf_val) = request.scan_filter {
247 if let Ok(sf) =
248 serde_json::from_value::<HashMap<String, helpers::FilterCondition>>(sf_val.clone())
249 {
250 if !sf.is_empty() {
251 let converted = helpers::convert_filter_conditions(
252 &sf,
253 request.conditional_operator.as_deref(),
254 )?;
255 if !converted.expression.is_empty() {
256 request.filter_expression = Some(converted.expression);
257 let expr_values = request
258 .expression_attribute_values
259 .get_or_insert_with(HashMap::new);
260 expr_values.extend(converted.attribute_values);
261 let expr_names = request
262 .expression_attribute_names
263 .get_or_insert_with(HashMap::new);
264 expr_names.extend(converted.attribute_names);
265 }
266 }
267 }
268 }
269 }
270
271 match (request.segment, request.total_segments) {
273 (Some(segment), Some(total)) => {
274 if !(1..=1_000_000).contains(&total) {
275 return Err(DynoxideError::ValidationException(
276 "1 validation error detected: Value at 'totalSegments' failed to satisfy constraint: \
277 Member must have value between 1 and 1000000".to_string(),
278 ));
279 }
280 if segment >= total {
281 return Err(DynoxideError::ValidationException(format!(
282 "The Segment parameter is zero-based and must be less than parameter TotalSegments: Segment: {} is not less than TotalSegments: {}",
283 segment, total
284 )));
285 }
286 }
287 (Some(_), None) => {
288 return Err(DynoxideError::ValidationException(
289 "The TotalSegments parameter is required but was not present in the request when Segment parameter is present".to_string(),
290 ));
291 }
292 (None, Some(_)) => {
293 return Err(DynoxideError::ValidationException(
294 "The Segment parameter is required but was not present in the request when parameter TotalSegments is present".to_string(),
295 ));
296 }
297 (None, None) => {}
298 }
299
300 if let Some(ref filter_expr_str) = request.filter_expression {
303 if filter_expr_str.is_empty() {
304 if request.scan_filter.is_none() || request.filter_expression.as_deref() == Some("") {
307 return Err(DynoxideError::ValidationException(
308 "Invalid FilterExpression: The expression can not be empty;".to_string(),
309 ));
310 }
311 } else {
312 let parsed_fe = expressions::condition::parse(filter_expr_str).map_err(|e| {
314 DynoxideError::ValidationException(format!("Invalid FilterExpression: {e}"))
315 })?;
316 if let Err(e) = expressions::condition::validate_name_refs(
318 &parsed_fe,
319 &request.expression_attribute_names,
320 ) {
321 return Err(DynoxideError::ValidationException(format!(
322 "Invalid FilterExpression: {e}"
323 )));
324 }
325 if let Err(e) = expressions::condition::validate_operand_semantics(
326 &parsed_fe,
327 &request.expression_attribute_names,
328 &request.expression_attribute_values,
329 ) {
330 return Err(DynoxideError::ValidationException(format!(
331 "Invalid FilterExpression: {e}"
332 )));
333 }
334 }
335 }
336 if let Some(ref proj_expr_str) = request.projection_expression {
337 if proj_expr_str.is_empty() {
338 return Err(DynoxideError::ValidationException(
339 "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
340 ));
341 }
342 }
343
344 if request.select.as_deref() == Some("SPECIFIC_ATTRIBUTES")
346 && request.projection_expression.is_none()
347 && request.attributes_to_get.is_none()
348 {
349 return Err(DynoxideError::ValidationException(
350 "SPECIFIC_ATTRIBUTES requires either ProjectionExpression or AttributesToGet"
351 .to_string(),
352 ));
353 }
354
355 let meta = helpers::require_table_for_item_op(storage, &request.table_name).await?;
356 let table_key_schema = helpers::parse_key_schema(&meta)?;
357
358 let legacy_projection = if request.projection_expression.is_none() {
361 request
362 .attributes_to_get
363 .as_ref()
364 .map(|attrs| helpers::attributes_to_get_to_projection(attrs))
365 } else {
366 None
367 };
368
369 let lsi_keys = request
371 .index_name
372 .as_ref()
373 .and_then(|idx| super::lsi::parse_lsi_key_schema(&meta, idx).ok());
374 let is_lsi = lsi_keys.is_some();
375
376 if request.consistent_read.unwrap_or(false) && request.index_name.is_some() && !is_lsi {
378 return Err(DynoxideError::ValidationException(
379 "Consistent reads are not supported on global secondary indexes".to_string(),
380 ));
381 }
382
383 let (effective_pk, effective_sk) = if let Some(ref index_name) = request.index_name {
384 if let Some(keys) = lsi_keys {
385 keys
386 } else {
387 super::gsi::parse_gsi_key_schema(&meta, index_name)?
388 }
389 } else {
390 (
391 table_key_schema.partition_key.clone(),
392 table_key_schema.sort_key.clone(),
393 )
394 };
395
396 if let Some(ref esk) = exclusive_start_key {
399 let count_msg = if request.index_name.is_some() {
400 "The provided starting key is invalid"
401 } else {
402 "The provided starting key is invalid: The provided key element does not match the schema"
403 };
404 helpers::validate_esk_count_and_index_keys(
405 esk,
406 &meta,
407 request.index_name.as_deref(),
408 count_msg,
409 )?;
410 }
411
412 if let Some(ref index_name) = request.index_name {
414 if !is_lsi {
415 if let Some(ref select) = request.select {
416 if select == "ALL_ATTRIBUTES" {
417 let gsi_defs = super::gsi::parse_gsi_defs(&meta)?;
419 if let Some(gsi) = gsi_defs.iter().find(|g| g.index_name == *index_name) {
420 if gsi.projection_type != crate::types::ProjectionType::ALL {
421 return Err(DynoxideError::ValidationException(format!(
422 "One or more parameter values were invalid: \
423 Select type ALL_ATTRIBUTES is not supported for global secondary index {} \
424 because its projection type is not ALL",
425 index_name
426 )));
427 }
428 }
429 }
430 }
431 }
432 }
433
434 if let Some(ref esk) = exclusive_start_key {
436 helpers::validate_esk_table_keys(esk, &meta)?;
437 }
438
439 let (start_pk, start_sk) = if let Some(ref esk) = exclusive_start_key {
441 let pk = esk.get(&effective_pk).and_then(|v| v.to_key_string());
442 let sk = if let Some(ref sk_name) = effective_sk {
443 esk.get(sk_name).and_then(|v| v.to_key_string())
444 } else {
445 Some(String::new())
446 };
447 (pk, sk)
448 } else {
449 (None, None)
450 };
451
452 let (start_base_pk, start_base_sk) = if is_lsi || request.index_name.is_some() {
458 if let Some(ref esk) = exclusive_start_key {
459 let base_pk = esk
460 .get(&table_key_schema.partition_key)
461 .and_then(|v| v.to_key_string());
462 let base_sk = if let Some(sk_name) = table_key_schema.sort_key.as_ref() {
470 esk.get(sk_name).and_then(|v| v.to_key_string())
471 } else {
472 Some(String::new())
473 };
474 (base_pk, base_sk)
475 } else {
476 (None, None)
477 }
478 } else {
479 (None, None)
480 };
481
482 let scan_params = crate::storage::ScanParams {
484 limit: request.limit,
485 exclusive_start_pk: start_pk.as_deref(),
486 exclusive_start_sk: start_sk.as_deref(),
487 segment: request.segment,
488 total_segments: request.total_segments,
489 exclusive_start_base_pk: start_base_pk.as_deref(),
490 exclusive_start_base_sk: start_base_sk.as_deref(),
491 };
492 let rows = if let Some(ref index_name) = request.index_name {
493 if is_lsi {
494 storage
495 .scan_lsi_items(&request.table_name, index_name, &scan_params)
496 .await?
497 } else {
498 storage
499 .scan_gsi_items(&request.table_name, index_name, &scan_params)
500 .await?
501 }
502 } else {
503 storage
504 .scan_items(&request.table_name, &scan_params)
505 .await?
506 };
507
508 let tracker = crate::expressions::TrackedExpressionAttributes::new(
510 &request.expression_attribute_names,
511 &request.expression_attribute_values,
512 );
513
514 let filter_expr = request
516 .filter_expression
517 .as_ref()
518 .map(|expr| expressions::condition::parse(expr))
519 .transpose()
520 .map_err(DynoxideError::ValidationException)?;
521
522 if let Some(ref filter) = filter_expr {
524 let mut base_key_attrs = vec![table_key_schema.partition_key.clone()];
526 if let Some(ref sk) = table_key_schema.sort_key {
527 base_key_attrs.push(sk.clone());
528 }
529 let mut index_key_attrs = Vec::new();
530 if request.index_name.is_some() {
531 if !base_key_attrs.contains(&effective_pk) {
532 index_key_attrs.push(effective_pk.clone());
533 }
534 if let Some(ref sk) = effective_sk {
535 if !base_key_attrs.contains(sk) {
536 index_key_attrs.push(sk.clone());
537 }
538 }
539 }
540 if let Some((attr, is_index)) = expressions::condition::check_non_scalar_key_access(
541 filter,
542 &request.expression_attribute_names,
543 &base_key_attrs,
544 &index_key_attrs,
545 ) {
546 let prefix = if is_index { "IndexKey" } else { "Key" };
547 return Err(DynoxideError::ValidationException(format!(
548 "Key attributes must be scalars; \
549 list random access '[]' and map lookup '.' are not allowed: {prefix}: {attr}"
550 )));
551 }
552 }
553
554 let projection = if let Some(ref proj_expr) = request.projection_expression {
556 Some(
557 expressions::projection::parse(proj_expr)
558 .map_err(DynoxideError::ValidationException)?,
559 )
560 } else {
561 legacy_projection.clone()
562 };
563
564 if let Some(ref filter) = filter_expr {
566 tracker.track_condition_expr(filter);
567 }
568 if let Some(ref proj) = projection {
569 tracker.track_projection_expr(proj);
570 }
571
572 let loop_tracker = crate::expressions::TrackedExpressionAttributes::without_tracking(
574 &request.expression_attribute_names,
575 &request.expression_attribute_values,
576 );
577
578 let is_count = request
580 .select
581 .as_deref()
582 .map(|s| s.eq_ignore_ascii_case("COUNT"))
583 .unwrap_or(false);
584
585 let mut key_attrs = vec![effective_pk.clone()];
587 if let Some(ref sk) = effective_sk {
588 key_attrs.push(sk.clone());
589 }
590 if request.index_name.is_some() {
591 if !key_attrs.contains(&table_key_schema.partition_key) {
592 key_attrs.push(table_key_schema.partition_key.clone());
593 }
594 if let Some(ref sk) = table_key_schema.sort_key {
595 if !key_attrs.contains(sk) {
596 key_attrs.push(sk.clone());
597 }
598 }
599 }
600
601 let mut items = Vec::new();
602 let mut scanned_count = 0;
603 let mut filtered_count = 0;
604 let mut cumulative_size = 0;
605 let mut last_evaluated_item: Option<Item> = None;
606 let mut truncated_by_size = false;
607
608 for (_pk, _sk, item_json) in &rows {
609 let item: Item = serde_json::from_str(item_json).map_err(|e| {
610 DynoxideError::InternalServerError(format!("Bad item JSON in storage: {e}"))
611 })?;
612
613 scanned_count += 1;
614
615 let item_size = crate::types::item_size(&item);
618 if cumulative_size + item_size > MAX_RESPONSE_SIZE && scanned_count > 1 {
619 truncated_by_size = true;
620 break;
621 }
622 cumulative_size += item_size;
623
624 if let Some(ref filter) = filter_expr {
626 let passes = expressions::condition::evaluate(filter, &item, &loop_tracker)
627 .map_err(DynoxideError::ValidationException)?;
628 if !passes {
629 last_evaluated_item = Some(item);
630 continue;
631 }
632 }
633
634 filtered_count += 1;
635
636 let result_item = if let Some(ref proj) = projection {
639 let no_keys: &[String] = &[];
640 expressions::projection::apply(&item, proj, &loop_tracker, no_keys)
641 .map_err(DynoxideError::ValidationException)?
642 } else {
643 item.clone()
644 };
645
646 last_evaluated_item = Some(item);
647 if !is_count {
648 items.push(result_item);
649 }
650 }
651
652 tracker.check_unused()?;
654
655 let count = if is_count {
656 filtered_count
657 } else {
658 items.len()
659 };
660
661 let has_more = truncated_by_size
663 || (request.limit.is_some() && scanned_count >= request.limit.unwrap_or(usize::MAX));
664
665 let is_gsi_scan = request.index_name.is_some() && !is_lsi;
670 let last_evaluated_key = if has_more {
671 last_evaluated_item.map(|item| {
672 let mut key = HashMap::new();
673 if let Some(pk_val) = item.get(&effective_pk) {
674 key.insert(effective_pk.clone(), pk_val.clone());
675 }
676 if let Some(ref sk_name) = effective_sk {
677 if let Some(sk_val) = item.get(sk_name) {
678 key.insert(sk_name.clone(), sk_val.clone());
679 }
680 }
681 if is_lsi {
683 if let Some(tsk) = table_key_schema.sort_key.as_deref() {
684 if !key.contains_key(tsk) {
685 if let Some(v) = item.get(tsk) {
686 key.insert(tsk.to_string(), v.clone());
687 }
688 }
689 }
690 }
691 if is_gsi_scan {
693 if !key.contains_key(&table_key_schema.partition_key) {
694 if let Some(v) = item.get(&table_key_schema.partition_key) {
695 key.insert(table_key_schema.partition_key.clone(), v.clone());
696 }
697 }
698 if let Some(ref tsk) = table_key_schema.sort_key {
699 if !key.contains_key(tsk) {
700 if let Some(v) = item.get(tsk) {
701 key.insert(tsk.clone(), v.clone());
702 }
703 }
704 }
705 }
706 key
707 })
708 } else {
709 None
710 };
711
712 let is_gsi = is_gsi_scan;
714 let consistent = request.consistent_read.unwrap_or(false);
715 let consumed_capacity = if is_gsi {
716 let mut gsi_units = std::collections::HashMap::new();
717 gsi_units.insert(
718 request.index_name.as_ref().unwrap().clone(),
719 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
720 );
721 crate::types::consumed_capacity_with_indexes(
722 &request.table_name,
723 0.0,
724 &gsi_units,
725 &request.return_consumed_capacity,
726 )
727 } else {
728 crate::types::consumed_capacity(
729 &request.table_name,
730 crate::types::read_capacity_units_with_consistency(cumulative_size, consistent),
731 &request.return_consumed_capacity,
732 )
733 };
734
735 Ok(ScanResponse {
736 items: if is_count { None } else { Some(items) },
737 count,
738 scanned_count,
739 last_evaluated_key,
740 consumed_capacity,
741 })
742}