dynamodb_facade/operations/pagination.rs
1use std::pin::pin;
2
3use super::*;
4
5use async_stream::try_stream;
6use aws_sdk_dynamodb::operation::{
7 query::builders::QueryFluentBuilder, scan::builders::ScanFluentBuilder,
8};
9pub(super) use futures_core::Stream;
10pub(super) use futures_util::StreamExt;
11
12/// Executes a DynamoDB `Scan` request, collecting all pages into a `Vec`.
13///
14/// Automatically follows `LastEvaluatedKey` pagination tokens until all items
15/// have been retrieved. This is the low-level function used by
16/// [`ScanRequest::all`][crate::ScanRequest::all]. Prefer using
17/// [`DynamoDBItemOp::scan`] or [`ScanRequest`] directly
18/// unless you are working with a raw [`ScanFluentBuilder`].
19///
20/// # Errors
21///
22/// Returns [`Err`] if any DynamoDB page request fails.
23///
24/// # Examples
25///
26/// ```no_run
27/// # use dynamodb_facade::test_fixtures::*;
28/// use dynamodb_facade::dynamodb_execute_scan;
29///
30/// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
31/// let builder = client.scan().table_name("platform");
32/// let items = dynamodb_execute_scan::<PlatformTable>(builder).await?;
33/// # Ok(())
34/// # }
35/// ```
36#[tracing::instrument(level = "debug", skip(builder))]
37pub async fn dynamodb_execute_scan<TD: TableDefinition>(
38 builder: ScanFluentBuilder,
39) -> Result<Vec<Item<TD>>> {
40 let mut stream = pin!(dynamodb_stream_scan::<TD>(builder));
41 let mut items = Vec::new();
42 while let Some(item) = stream.next().await {
43 items.extend(item?);
44 }
45 Ok(items)
46}
47
48/// Creates a lazy async [`Stream`] of scan results with automatic pagination.
49///
50/// Each element yielded by the stream is a `Vec<Item<TD>>` representing one
51/// page of results. Pages are fetched on demand as the stream is consumed.
52/// Each page's `LastEvaluatedKey` is used as the `ExclusiveStartKey` for the
53/// next request. This is the low-level function used by
54/// [`ScanRequest::stream`][crate::ScanRequest::stream]. Prefer using
55/// [`DynamoDBItemOp::scan`] or [`ScanRequest`] directly
56/// unless you are working with a raw [`ScanFluentBuilder`].
57///
58/// # Examples
59///
60/// ```no_run
61/// # use dynamodb_facade::test_fixtures::*;
62/// use dynamodb_facade::dynamodb_stream_scan;
63/// use futures_util::StreamExt;
64/// use std::pin::pin;
65///
66/// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
67/// let builder = client.scan().table_name("platform");
68/// let stream = dynamodb_stream_scan::<PlatformTable>(builder);
69/// // Must pin the stream
70/// let mut stream = pin!(stream);
71///
72/// while let Some(result) = stream.next().await {
73/// let page /* : Vec<Item<PlatformTable>> */ = result?;
74/// for item in page {
75/// let _ = item;
76/// }
77/// }
78/// # Ok(())
79/// # }
80/// ```
81pub fn dynamodb_stream_scan<TD: TableDefinition>(
82 builder: ScanFluentBuilder,
83) -> impl Stream<Item = Result<Vec<Item<TD>>>> {
84 try_stream! {
85 let res = builder.clone().send().await?;
86 yield res.items.unwrap_or_default()
87 .into_iter()
88 .map(Item::from_dynamodb_response)
89 .collect();
90 let mut lek = res.last_evaluated_key;
91 while lek.is_some() {
92 let res = builder.clone().set_exclusive_start_key(lek).send().await?;
93 lek = res.last_evaluated_key;
94 yield res.items.unwrap_or_default()
95 .into_iter()
96 .map(Item::from_dynamodb_response)
97 .collect();
98 }
99 }
100}
101
102/// Executes a DynamoDB `Query` request, collecting all pages into a `Vec`.
103///
104/// Automatically follows `LastEvaluatedKey` pagination tokens until all
105/// matching items have been retrieved. This is the low-level function used by
106/// [`QueryRequest::all`][crate::QueryRequest::all]. Prefer using
107/// [`DynamoDBItemOp::query`] or [`QueryRequest`] directly
108/// unless you are working with a raw [`QueryFluentBuilder`].
109///
110/// # Errors
111///
112/// Returns [`Err`] if any DynamoDB page request fails.
113///
114/// # Examples
115///
116/// ```no_run
117/// # use dynamodb_facade::test_fixtures::*;
118/// use dynamodb_facade::dynamodb_execute_query;
119///
120/// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
121/// let builder = client
122/// .query()
123/// .table_name("platform")
124/// .key_condition_expression("PK = :pk")
125/// .expression_attribute_values(
126/// ":pk",
127/// aws_sdk_dynamodb::types::AttributeValue::S("USER#user-1".into()),
128/// );
129/// let items = dynamodb_execute_query::<PlatformTable>(builder).await?;
130/// # Ok(())
131/// # }
132/// ```
133#[tracing::instrument(level = "debug", skip(builder))]
134pub async fn dynamodb_execute_query<TD: TableDefinition>(
135 builder: QueryFluentBuilder,
136) -> Result<Vec<Item<TD>>> {
137 let mut stream = pin!(dynamodb_stream_query::<TD>(builder));
138 let mut items = Vec::new();
139 while let Some(item) = stream.next().await {
140 items.extend(item?);
141 }
142 Ok(items)
143}
144
145/// Creates a lazy async [`Stream`] of query results with automatic pagination.
146///
147/// Each element yielded by the stream is a `Vec<Item<TD>>` representing one
148/// page of results. Pages are fetched on demand as the stream is consumed.
149/// Each page's `LastEvaluatedKey` is used as the `ExclusiveStartKey` for the
150/// next request. This is the low-level function used by
151/// [`QueryRequest::stream`][crate::QueryRequest::stream]. Prefer using
152/// [`DynamoDBItemOp::query`] or [`QueryRequest`] directly
153/// unless you are working with a raw [`QueryFluentBuilder`].
154///
155/// # Examples
156///
157/// ```no_run
158/// # use dynamodb_facade::test_fixtures::*;
159/// use dynamodb_facade::dynamodb_stream_query;
160/// use futures_util::StreamExt;
161/// use std::pin::pin;
162///
163/// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
164/// let builder = client
165/// .query()
166/// .table_name("platform")
167/// .key_condition_expression("PK = :pk")
168/// .expression_attribute_values(
169/// ":pk",
170/// aws_sdk_dynamodb::types::AttributeValue::S("USER#user-1".into()),
171/// );
172/// let stream = dynamodb_stream_query::<PlatformTable>(builder);
173/// // Must pin the stream
174/// let mut stream = pin!(stream);
175///
176/// while let Some(result) = stream.next().await {
177/// let page /* : Vec<Item<PlatformTable>> */ = result?;
178/// for item in page {
179/// let _ = item;
180/// }
181/// }
182/// # Ok(())
183/// # }
184/// ```
185pub fn dynamodb_stream_query<TD: TableDefinition>(
186 builder: QueryFluentBuilder,
187) -> impl Stream<Item = Result<Vec<Item<TD>>>> {
188 try_stream! {
189 let res = builder.clone().send().await?;
190 yield res.items.unwrap_or_default()
191 .into_iter()
192 .map(Item::from_dynamodb_response)
193 .collect();
194 let mut lek = res.last_evaluated_key;
195 while lek.is_some() {
196 let res = builder.clone().set_exclusive_start_key(lek).send().await?;
197 lek = res.last_evaluated_key;
198 yield res.items.unwrap_or_default()
199 .into_iter()
200 .map(Item::from_dynamodb_response)
201 .collect();
202 }
203 }
204}