dynamodb_facade/operations/query.rs
1use super::*;
2
3use aws_sdk_dynamodb::operation::query::builders::QueryFluentBuilder;
4
5/// Builder for a DynamoDB `Query` request.
6///
7/// Constructed via [`DynamoDBItemOp::query`] / [`DynamoDBItemOp::query_index`]
8/// (typed, with a concrete `T`) or [`QueryRequest::new`] /
9/// [`QueryRequest::new_index`] (stand-alone, raw output). The builder provides:
10///
11/// - **Output format** — the result can be deserialized into `T`.
12/// Call [`.raw()`][QueryRequest::raw] to receive untyped [`Item<TD>`]
13/// values instead (one-way). Calling [`.project()`][QueryRequest::project]
14/// also forces raw output.
15/// - **Filter** — call [`.filter()`][QueryRequest::filter] to add a
16/// server-side filter expression. DynamoDB accepts a single filter
17/// expression per request, so this can only be called once.
18/// - **Projection** — call [`.project()`][QueryRequest::project] to limit
19/// which attributes are returned. This can only be called once.
20///
21/// Use [`.all()`][QueryRequest::all] to collect all pages into a `Vec`, or
22/// [`.stream()`][QueryRequest::stream] for lazy page-by-page iteration.
23///
24/// # Errors
25///
26/// Returns [`Err`] if any DynamoDB page request fails or if deserialization
27/// of any returned item fails.
28///
29/// # Examples
30///
31/// ```no_run
32/// # use dynamodb_facade::test_fixtures::*;
33/// use dynamodb_facade::{DynamoDBItemOp, Condition, KeyCondition};
34///
35/// # async fn example(cclient: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
36/// # let client = cclient.clone();
37/// // Simple query
38/// let enrollments /* : Vec<Enrollment> */ =
39/// Enrollment::query(client, Enrollment::key_condition("user-1"))
40/// .all()
41/// .await?;
42///
43/// # let client = cclient.clone();
44/// // Query with a filter
45/// let advanced /* : Vec<Enrollment> */ =
46/// Enrollment::query(client, Enrollment::key_condition("user-1"))
47/// .filter(Condition::gt("progress", 0.5))
48/// .all()
49/// .await?;
50///
51/// # let client = cclient.clone();
52/// // Query a secondary index
53/// let users /* : Vec<User> */ = User::query_index::<EmailIndex>(
54/// client,
55/// KeyCondition::pk("alice@example.com".to_owned()),
56/// )
57/// .all()
58/// .await?;
59/// # Ok(())
60/// # }
61/// ```
62#[must_use = "builder does nothing until executed via .all() or .stream()"]
63pub struct QueryRequest<
64 TD: TableDefinition,
65 T = (),
66 O: OutputFormat = Raw,
67 F: FilterState = NoFilter,
68 P: ProjectionState = NoProjection,
69> {
70 builder: QueryFluentBuilder,
71 _marker: PhantomData<(TD, T, O, F, P)>,
72}
73
74// -- Stand-alone constructors (T = (), O = Raw)
75
76impl<TD: TableDefinition> QueryRequest<TD> {
77 /// Creates a stand-alone `QueryRequest` against the table's primary key schema.
78 ///
79 /// Output is raw (`T = ()`, `O = Raw`). For typed access, prefer
80 /// [`DynamoDBItemOp::query`] instead.
81 ///
82 /// # Examples
83 ///
84 /// ```no_run
85 /// # use dynamodb_facade::test_fixtures::*;
86 /// use dynamodb_facade::{QueryRequest, KeyCondition};
87 ///
88 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
89 /// let items = QueryRequest::<PlatformTable>::new(
90 /// client,
91 /// KeyCondition::pk("USER#user-1".to_owned()),
92 /// )
93 /// .all()
94 /// .await?;
95 /// # Ok(())
96 /// # }
97 /// ```
98 pub fn new(
99 client: aws_sdk_dynamodb::Client,
100 key_condition: KeyCondition<'_, TD::KeySchema, impl KeyConditionState>,
101 ) -> Self {
102 Self::_new(client, key_condition)
103 }
104
105 /// Creates a stand-alone `QueryRequest` against a secondary index.
106 ///
107 /// Output is raw (`T = ()`, `O = Raw`). For typed access, prefer
108 /// [`DynamoDBItemOp::query_index`] instead.
109 ///
110 /// # Examples
111 ///
112 /// ```no_run
113 /// # use dynamodb_facade::test_fixtures::*;
114 /// use dynamodb_facade::{QueryRequest, KeyCondition};
115 ///
116 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
117 /// let items = QueryRequest::<PlatformTable>::new_index::<EmailIndex>(
118 /// client,
119 /// KeyCondition::pk("alice@example.com".to_owned()),
120 /// )
121 /// .all()
122 /// .await?;
123 /// # Ok(())
124 /// # }
125 /// ```
126 pub fn new_index<I: IndexDefinition<TD>>(
127 client: aws_sdk_dynamodb::Client,
128 key_condition: KeyCondition<'_, I::KeySchema, impl KeyConditionState>,
129 ) -> Self {
130 Self::_new_index::<I>(client, key_condition)
131 }
132}
133
134// -- Common methods (all states) --------------------------------------------
135
136impl<TD: TableDefinition, T, O: OutputFormat, F: FilterState, P: ProjectionState>
137 QueryRequest<TD, T, O, F, P>
138{
139 pub(super) fn _new(
140 client: aws_sdk_dynamodb::Client,
141 key_condition: KeyCondition<'_, TD::KeySchema, impl KeyConditionState>,
142 ) -> Self {
143 let table_name = TD::table_name();
144 tracing::debug!(table_name, %key_condition, "Query");
145 Self {
146 builder: key_condition.apply_key_condition(client.query().table_name(table_name)),
147 _marker: PhantomData,
148 }
149 }
150
151 pub(super) fn _new_index<I: IndexDefinition<TD>>(
152 client: aws_sdk_dynamodb::Client,
153 key_condition: KeyCondition<'_, I::KeySchema, impl KeyConditionState>,
154 ) -> Self {
155 let table_name = TD::table_name();
156 let index_name = I::index_name();
157 tracing::debug!(table_name, index_name, %key_condition, "Query (index)");
158 Self {
159 builder: key_condition
160 .apply_key_condition(client.query().table_name(table_name).index_name(index_name)),
161 _marker: PhantomData,
162 }
163 }
164
165 /// Enables strongly consistent reads for this query.
166 ///
167 /// By default DynamoDB uses eventually consistent reads. Enabling consistent
168 /// reads guarantees the most up-to-date data but consumes twice the read
169 /// capacity units and is not supported on Global Secondary Indexes.
170 ///
171 /// # Examples
172 ///
173 /// ```no_run
174 /// # use dynamodb_facade::test_fixtures::*;
175 /// use dynamodb_facade::DynamoDBItemOp;
176 ///
177 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
178 /// let enrollments /* : Vec<Enrollment> */ =
179 /// Enrollment::query(client, Enrollment::key_condition("user-1"))
180 /// .consistent_read()
181 /// .all()
182 /// .await?;
183 /// # Ok(())
184 /// # }
185 /// ```
186 pub fn consistent_read(mut self) -> Self {
187 tracing::debug!("Query consistent_read");
188 self.builder = self.builder.consistent_read(true);
189 self
190 }
191
192 /// Sets the maximum number of items to evaluate per page.
193 ///
194 /// Note that DynamoDB evaluates up to `limit` items before applying any
195 /// filter expression, so the number of items returned may be less than
196 /// `limit` when a filter is active. Pagination continues automatically
197 /// when using [`.all()`][QueryRequest::all] or
198 /// [`.stream()`][QueryRequest::stream].
199 ///
200 /// # Examples
201 ///
202 /// ```no_run
203 /// # use dynamodb_facade::test_fixtures::*;
204 /// use dynamodb_facade::DynamoDBItemOp;
205 ///
206 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
207 /// // Evaluate at most 10 items per page
208 /// let enrollments /* : Vec<Enrollment> */ =
209 /// Enrollment::query(client, Enrollment::key_condition("user-1"))
210 /// .limit(10)
211 /// .all()
212 /// .await?;
213 /// # Ok(())
214 /// # }
215 /// ```
216 pub fn limit(mut self, limit: i32) -> Self {
217 tracing::debug!(limit, "Query limit");
218 self.builder = self.builder.limit(limit);
219 self
220 }
221
222 /// Change the sort order of results by sort key by setting
223 /// `scan_index_forward = false`.
224 ///
225 /// # Examples
226 ///
227 /// ```no_run
228 /// # use dynamodb_facade::test_fixtures::*;
229 /// use dynamodb_facade::DynamoDBItemOp;
230 ///
231 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
232 /// // Return enrollments in reverse sort-key order
233 /// let enrollments /* : Vec<Enrollment> */ =
234 /// Enrollment::query(client, Enrollment::key_condition("user-1"))
235 /// .reverse()
236 /// .all()
237 /// .await?;
238 /// # Ok(())
239 /// # }
240 /// ```
241 pub fn reverse(mut self) -> Self {
242 tracing::debug!("Query scan_index_forward = false");
243 self.builder = self.builder.scan_index_forward(false);
244 self
245 }
246
247 /// Consumes the builder and returns the underlying SDK [`QueryFluentBuilder`].
248 ///
249 /// Use this escape hatch when you need to set options not exposed by this
250 /// facade, or when integrating with code that expects the raw SDK builder.
251 ///
252 /// # Examples
253 ///
254 /// ```no_run
255 /// # use dynamodb_facade::test_fixtures::*;
256 /// use dynamodb_facade::DynamoDBItemOp;
257 ///
258 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
259 /// let sdk_builder =
260 /// Enrollment::query(client, Enrollment::key_condition("user-1")).into_inner();
261 /// // configure sdk_builder further, then call .send().await
262 /// # Ok(())
263 /// # }
264 /// ```
265 pub fn into_inner(self) -> QueryFluentBuilder {
266 self.builder
267 }
268}
269
270// -- Filter (NoFilter only) -------------------------------------------------
271
272impl<TD: TableDefinition, T, O: OutputFormat, P: ProjectionState>
273 QueryRequest<TD, T, O, NoFilter, P>
274{
275 /// Adds a filter expression applied after the key condition.
276 ///
277 /// DynamoDB accepts a single filter expression per request, so this method
278 /// can only be called once. The filter is evaluated server-side after items
279 /// are read but before they are returned, so it does not reduce read
280 /// capacity consumption.
281 ///
282 /// # Examples
283 ///
284 /// ```no_run
285 /// # use dynamodb_facade::test_fixtures::*;
286 /// use dynamodb_facade::{DynamoDBItemOp, Condition};
287 ///
288 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
289 /// // Query enrollments with progress above 50%
290 /// let advanced /* : Vec<Enrollment> */ =
291 /// Enrollment::query(client, Enrollment::key_condition("user-1"))
292 /// .filter(Condition::gt("progress", 0.5))
293 /// .all()
294 /// .await?;
295 /// # Ok(())
296 /// # }
297 /// ```
298 pub fn filter(mut self, filter: Condition<'_>) -> QueryRequest<TD, T, O, AlreadyHasFilter, P> {
299 tracing::debug!(%filter, "Query filter");
300 self.builder = filter.apply_filter(self.builder);
301 QueryRequest {
302 builder: self.builder,
303 _marker: PhantomData,
304 }
305 }
306}
307
308// -- Projection (NoProjection only) -----------------------------------------
309
310impl<TD: TableDefinition, T, O: OutputFormat, F: FilterState>
311 QueryRequest<TD, T, O, F, NoProjection>
312{
313 /// Applies a projection expression, limiting the attributes returned per item.
314 ///
315 /// This method can only be called once. It forces the output to raw
316 /// [`Item<TD>`] because projected results may not contain all fields
317 /// required for deserialization into `T`.
318 ///
319 /// # Examples
320 ///
321 /// ```no_run
322 /// # use dynamodb_facade::test_fixtures::*;
323 /// use dynamodb_facade::{DynamoDBItemOp, Projection};
324 ///
325 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
326 /// // Fetch only the "progress" attribute for each enrollment
327 /// let partial /* : Vec<Item<PlatformTable>> */ =
328 /// Enrollment::query(client, Enrollment::key_condition("user-1"))
329 /// .project(Projection::new(["progress"]))
330 /// .all()
331 /// .await?;
332 /// // partial: contains only "PK", "SK" and "progress"
333 /// # Ok(())
334 /// # }
335 /// ```
336 pub fn project(
337 mut self,
338 projection: Projection<'_, TD>,
339 ) -> QueryRequest<TD, T, Raw, F, AlreadyHasProjection> {
340 tracing::debug!(%projection, "Query project");
341 self.builder = projection.apply_projection(self.builder);
342 QueryRequest {
343 builder: self.builder,
344 _marker: PhantomData,
345 }
346 }
347}
348
349// -- Output format transition (preserve F, P) -------------------------------
350
351impl<TD: TableDefinition, T, F: FilterState, P: ProjectionState> QueryRequest<TD, T, Typed, F, P> {
352 /// Switches the output format from `Typed` to `Raw`.
353 ///
354 /// After calling `.raw()`, [`.all()`][QueryRequest::all] returns
355 /// `Vec<Item<TD>>` and [`.stream()`][QueryRequest::stream] yields
356 /// `Result<Vec<Item<TD>>>` (pages of raw items) instead of the typed
357 /// equivalents. This transition is one-way.
358 ///
359 /// # Examples
360 ///
361 /// ```no_run
362 /// # use dynamodb_facade::test_fixtures::*;
363 /// use dynamodb_facade::DynamoDBItemOp;
364 ///
365 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
366 /// let raw_items /* : Vec<Item<PlatformTable>> */ =
367 /// Enrollment::query(client, Enrollment::key_condition("user-1"))
368 /// .raw()
369 /// .all()
370 /// .await?;
371 /// # Ok(())
372 /// # }
373 /// ```
374 pub fn raw(self) -> QueryRequest<TD, T, Raw, F, P> {
375 QueryRequest {
376 builder: self.builder,
377 _marker: PhantomData,
378 }
379 }
380}
381
382// -- Terminal: Typed (any F, any P) -----------------------------------------
383
384impl<
385 TD: TableDefinition,
386 T: DynamoDBItem<TD> + DeserializeOwned,
387 F: FilterState,
388 P: ProjectionState,
389> QueryRequest<TD, T, Typed, F, P>
390{
391 /// Executes the query, collecting all pages and returning items deserialized as `T`.
392 ///
393 /// Automatically follows pagination tokens until all matching items have
394 /// been retrieved. For large result sets, prefer
395 /// [`.stream()`][QueryRequest::stream] to avoid loading everything into
396 /// memory at once.
397 ///
398 /// # Errors
399 ///
400 /// Returns [`Err`] if any DynamoDB page request fails or if deserialization
401 /// of any item fails.
402 ///
403 /// # Examples
404 ///
405 /// ```
406 /// # use dynamodb_facade::test_fixtures::*;
407 /// use dynamodb_facade::DynamoDBItemOp;
408 ///
409 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
410 /// let enrollments /* : Vec<Enrollment> */ =
411 /// Enrollment::query(client, Enrollment::key_condition("user-1"))
412 /// .all()
413 /// .await?;
414 /// # Ok(())
415 /// # }
416 /// ```
417 #[tracing::instrument(level = "debug", skip(self), name = "query_all")]
418 pub async fn all(self) -> Result<Vec<T>> {
419 dynamodb_execute_query(self.builder)
420 .await?
421 .into_iter()
422 .map(T::try_from_item)
423 .collect()
424 }
425
426 /// Executes the query as a lazy async stream, yielding one page at a time.
427 ///
428 /// Each element yielded by the stream is a `Vec<T>` representing one page
429 /// of results deserialized as `T`. Pages are fetched on demand as the
430 /// stream is consumed. Use this for large result sets where loading
431 /// everything into memory at once is undesirable.
432 ///
433 /// # Examples
434 ///
435 /// ```no_run
436 /// # use dynamodb_facade::test_fixtures::*;
437 /// use dynamodb_facade::DynamoDBItemOp;
438 /// use futures_util::StreamExt;
439 /// use std::pin::pin;
440 ///
441 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
442 /// let stream = Enrollment::query(client, Enrollment::key_condition("user-1"))
443 /// .stream();
444 /// // Must pin the stream
445 /// let mut stream = pin!(stream);
446 ///
447 /// while let Some(result) = stream.next().await {
448 /// let page /* : Vec<Enrollment> */ = result?;
449 /// for enrollment in page {
450 /// let _ = enrollment;
451 /// }
452 /// }
453 /// # Ok(())
454 /// # }
455 /// ```
456 pub fn stream(self) -> impl Stream<Item = Result<Vec<T>>> {
457 dynamodb_stream_query::<TD>(self.builder).map(|result| {
458 result.and_then(|items| items.into_iter().map(T::try_from_item).collect())
459 })
460 }
461}
462
463// -- Terminal: Raw (any F, any P) -------------------------------------------
464
465impl<TD: TableDefinition, T, F: FilterState, P: ProjectionState> QueryRequest<TD, T, Raw, F, P> {
466 /// Executes the query, collecting all pages and returning raw item maps.
467 ///
468 /// Automatically follows pagination tokens until all matching items have
469 /// been retrieved.
470 ///
471 /// # Errors
472 ///
473 /// Returns [`Err`] if any DynamoDB page request fails.
474 ///
475 /// # Examples
476 ///
477 /// ```no_run
478 /// # use dynamodb_facade::test_fixtures::*;
479 /// use dynamodb_facade::{DynamoDBItemOp, QueryRequest, KeyCondition};
480 ///
481 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
482 /// let raw_items = QueryRequest::<PlatformTable>::new(
483 /// client,
484 /// KeyCondition::pk("USER#user-1".to_owned()),
485 /// )
486 /// .all()
487 /// .await?;
488 /// // raw_items: Vec<Item<PlatformTable>>
489 /// # Ok(())
490 /// # }
491 /// ```
492 #[tracing::instrument(level = "debug", skip(self), name = "query_all_raw")]
493 pub async fn all(self) -> Result<Vec<Item<TD>>> {
494 dynamodb_execute_query(self.builder).await
495 }
496
497 /// Executes the query as a lazy async stream, yielding one page of raw item maps at a time.
498 ///
499 /// Each element yielded by the stream is a `Vec<Item<TD>>` representing one
500 /// page of results. Pages are fetched on demand as the stream is consumed.
501 ///
502 /// # Examples
503 ///
504 /// ```no_run
505 /// # use dynamodb_facade::test_fixtures::*;
506 /// use dynamodb_facade::{QueryRequest, KeyCondition};
507 /// use futures_util::StreamExt;
508 /// use std::pin::pin;
509 ///
510 /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
511 /// let stream = QueryRequest::<PlatformTable>::new(
512 /// client,
513 /// KeyCondition::pk("USER#user-1".to_owned()),
514 /// )
515 /// .stream();
516 /// // Must pin the stream
517 /// let mut stream = pin!(stream);
518 ///
519 /// while let Some(result) = stream.next().await {
520 /// let page /* : Vec<Item<PlatformTable>> */ = result?;
521 /// for item in page {
522 /// let _ = item;
523 /// }
524 /// }
525 /// # Ok(())
526 /// # }
527 /// ```
528 pub fn stream(self) -> impl Stream<Item = Result<Vec<Item<TD>>>> {
529 dynamodb_stream_query(self.builder)
530 }
531}