Skip to main content

dynamodb_facade/operations/
query.rs

1use super::*;
2
3use aws_sdk_dynamodb::operation::query::builders::QueryFluentBuilder;
4
5/// Builder for a DynamoDB `Query` request.
6///
7/// Constructed via [`DynamoDBItemOp::query`] / [`DynamoDBItemOp::query_index`]
8/// (typed, with a concrete `T`) or [`QueryRequest::new`] /
9/// [`QueryRequest::new_index`] (stand-alone, raw output). The builder provides:
10///
11/// - **Output format** — the result can be deserialized into `T`.
12///   Call [`.raw()`][QueryRequest::raw] to receive untyped [`Item<TD>`]
13///   values instead (one-way). Calling [`.project()`][QueryRequest::project]
14///   also forces raw output.
15/// - **Filter** — call [`.filter()`][QueryRequest::filter] to add a
16///   server-side filter expression. DynamoDB accepts a single filter
17///   expression per request, so this can only be called once.
18/// - **Projection** — call [`.project()`][QueryRequest::project] to limit
19///   which attributes are returned. This can only be called once.
20///
21/// Use [`.all()`][QueryRequest::all] to collect all pages into a `Vec`, or
22/// [`.stream()`][QueryRequest::stream] for lazy page-by-page iteration.
23///
24/// # Errors
25///
26/// Returns [`Err`] if any DynamoDB page request fails or if deserialization
27/// of any returned item fails.
28///
29/// # Examples
30///
31/// ```no_run
32/// # use dynamodb_facade::test_fixtures::*;
33/// use dynamodb_facade::{DynamoDBItemOp, Condition, KeyCondition};
34///
35/// # async fn example(cclient: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
36/// # let client = cclient.clone();
37/// // Simple query
38/// let enrollments /* : Vec<Enrollment> */ =
39///     Enrollment::query(client, Enrollment::key_condition("user-1"))
40///         .all()
41///         .await?;
42///
43/// # let client = cclient.clone();
44/// // Query with a filter
45/// let advanced /* : Vec<Enrollment> */ =
46///     Enrollment::query(client, Enrollment::key_condition("user-1"))
47///         .filter(Condition::gt("progress", 0.5))
48///         .all()
49///         .await?;
50///
51/// # let client = cclient.clone();
52/// // Query a secondary index
53/// let users /* : Vec<User> */ = User::query_index::<EmailIndex>(
54///     client,
55///     KeyCondition::pk("alice@example.com".to_owned()),
56/// )
57/// .all()
58/// .await?;
59/// # Ok(())
60/// # }
61/// ```
62#[must_use = "builder does nothing until executed via .all() or .stream()"]
63pub struct QueryRequest<
64    TD: TableDefinition,
65    T = (),
66    O: OutputFormat = Raw,
67    F: FilterState = NoFilter,
68    P: ProjectionState = NoProjection,
69> {
70    builder: QueryFluentBuilder,
71    _marker: PhantomData<(TD, T, O, F, P)>,
72}
73
74// -- Stand-alone constructors (T = (), O = Raw)
75
76impl<TD: TableDefinition> QueryRequest<TD> {
77    /// Creates a stand-alone `QueryRequest` against the table's primary key schema.
78    ///
79    /// Output is raw (`T = ()`, `O = Raw`). For typed access, prefer
80    /// [`DynamoDBItemOp::query`] instead.
81    ///
82    /// # Examples
83    ///
84    /// ```no_run
85    /// # use dynamodb_facade::test_fixtures::*;
86    /// use dynamodb_facade::{QueryRequest, KeyCondition};
87    ///
88    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
89    /// let items = QueryRequest::<PlatformTable>::new(
90    ///     client,
91    ///     KeyCondition::pk("USER#user-1".to_owned()),
92    /// )
93    /// .all()
94    /// .await?;
95    /// # Ok(())
96    /// # }
97    /// ```
98    pub fn new(
99        client: aws_sdk_dynamodb::Client,
100        key_condition: KeyCondition<'_, TD::KeySchema, impl KeyConditionState>,
101    ) -> Self {
102        Self::_new(client, key_condition)
103    }
104
105    /// Creates a stand-alone `QueryRequest` against a secondary index.
106    ///
107    /// Output is raw (`T = ()`, `O = Raw`). For typed access, prefer
108    /// [`DynamoDBItemOp::query_index`] instead.
109    ///
110    /// # Examples
111    ///
112    /// ```no_run
113    /// # use dynamodb_facade::test_fixtures::*;
114    /// use dynamodb_facade::{QueryRequest, KeyCondition};
115    ///
116    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
117    /// let items = QueryRequest::<PlatformTable>::new_index::<EmailIndex>(
118    ///     client,
119    ///     KeyCondition::pk("alice@example.com".to_owned()),
120    /// )
121    /// .all()
122    /// .await?;
123    /// # Ok(())
124    /// # }
125    /// ```
126    pub fn new_index<I: IndexDefinition<TD>>(
127        client: aws_sdk_dynamodb::Client,
128        key_condition: KeyCondition<'_, I::KeySchema, impl KeyConditionState>,
129    ) -> Self {
130        Self::_new_index::<I>(client, key_condition)
131    }
132}
133
134// -- Common methods (all states) --------------------------------------------
135
136impl<TD: TableDefinition, T, O: OutputFormat, F: FilterState, P: ProjectionState>
137    QueryRequest<TD, T, O, F, P>
138{
139    pub(super) fn _new(
140        client: aws_sdk_dynamodb::Client,
141        key_condition: KeyCondition<'_, TD::KeySchema, impl KeyConditionState>,
142    ) -> Self {
143        let table_name = TD::table_name();
144        tracing::debug!(table_name, %key_condition, "Query");
145        Self {
146            builder: key_condition.apply_key_condition(client.query().table_name(table_name)),
147            _marker: PhantomData,
148        }
149    }
150
151    pub(super) fn _new_index<I: IndexDefinition<TD>>(
152        client: aws_sdk_dynamodb::Client,
153        key_condition: KeyCondition<'_, I::KeySchema, impl KeyConditionState>,
154    ) -> Self {
155        let table_name = TD::table_name();
156        let index_name = I::index_name();
157        tracing::debug!(table_name, index_name, %key_condition, "Query (index)");
158        Self {
159            builder: key_condition
160                .apply_key_condition(client.query().table_name(table_name).index_name(index_name)),
161            _marker: PhantomData,
162        }
163    }
164
165    /// Enables strongly consistent reads for this query.
166    ///
167    /// By default DynamoDB uses eventually consistent reads. Enabling consistent
168    /// reads guarantees the most up-to-date data but consumes twice the read
169    /// capacity units and is not supported on Global Secondary Indexes.
170    ///
171    /// # Examples
172    ///
173    /// ```no_run
174    /// # use dynamodb_facade::test_fixtures::*;
175    /// use dynamodb_facade::DynamoDBItemOp;
176    ///
177    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
178    /// let enrollments /* : Vec<Enrollment> */ =
179    ///     Enrollment::query(client, Enrollment::key_condition("user-1"))
180    ///         .consistent_read()
181    ///         .all()
182    ///         .await?;
183    /// # Ok(())
184    /// # }
185    /// ```
186    pub fn consistent_read(mut self) -> Self {
187        tracing::debug!("Query consistent_read");
188        self.builder = self.builder.consistent_read(true);
189        self
190    }
191
192    /// Sets the maximum number of items to evaluate per page.
193    ///
194    /// Note that DynamoDB evaluates up to `limit` items before applying any
195    /// filter expression, so the number of items returned may be less than
196    /// `limit` when a filter is active. Pagination continues automatically
197    /// when using [`.all()`][QueryRequest::all] or
198    /// [`.stream()`][QueryRequest::stream].
199    ///
200    /// # Examples
201    ///
202    /// ```no_run
203    /// # use dynamodb_facade::test_fixtures::*;
204    /// use dynamodb_facade::DynamoDBItemOp;
205    ///
206    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
207    /// // Evaluate at most 10 items per page
208    /// let enrollments /* : Vec<Enrollment> */ =
209    ///     Enrollment::query(client, Enrollment::key_condition("user-1"))
210    ///         .limit(10)
211    ///         .all()
212    ///         .await?;
213    /// # Ok(())
214    /// # }
215    /// ```
216    pub fn limit(mut self, limit: i32) -> Self {
217        tracing::debug!(limit, "Query limit");
218        self.builder = self.builder.limit(limit);
219        self
220    }
221
222    /// Change the sort order of results by sort key by setting
223    /// `scan_index_forward = false`.
224    ///
225    /// # Examples
226    ///
227    /// ```no_run
228    /// # use dynamodb_facade::test_fixtures::*;
229    /// use dynamodb_facade::DynamoDBItemOp;
230    ///
231    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
232    /// // Return enrollments in reverse sort-key order
233    /// let enrollments /* : Vec<Enrollment> */ =
234    ///     Enrollment::query(client, Enrollment::key_condition("user-1"))
235    ///         .reverse()
236    ///         .all()
237    ///         .await?;
238    /// # Ok(())
239    /// # }
240    /// ```
241    pub fn reverse(mut self) -> Self {
242        tracing::debug!("Query scan_index_forward = false");
243        self.builder = self.builder.scan_index_forward(false);
244        self
245    }
246
247    /// Consumes the builder and returns the underlying SDK [`QueryFluentBuilder`].
248    ///
249    /// Use this escape hatch when you need to set options not exposed by this
250    /// facade, or when integrating with code that expects the raw SDK builder.
251    ///
252    /// # Examples
253    ///
254    /// ```no_run
255    /// # use dynamodb_facade::test_fixtures::*;
256    /// use dynamodb_facade::DynamoDBItemOp;
257    ///
258    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
259    /// let sdk_builder =
260    ///     Enrollment::query(client, Enrollment::key_condition("user-1")).into_inner();
261    /// // configure sdk_builder further, then call .send().await
262    /// # Ok(())
263    /// # }
264    /// ```
265    pub fn into_inner(self) -> QueryFluentBuilder {
266        self.builder
267    }
268}
269
270// -- Filter (NoFilter only) -------------------------------------------------
271
272impl<TD: TableDefinition, T, O: OutputFormat, P: ProjectionState>
273    QueryRequest<TD, T, O, NoFilter, P>
274{
275    /// Adds a filter expression applied after the key condition.
276    ///
277    /// DynamoDB accepts a single filter expression per request, so this method
278    /// can only be called once. The filter is evaluated server-side after items
279    /// are read but before they are returned, so it does not reduce read
280    /// capacity consumption.
281    ///
282    /// # Examples
283    ///
284    /// ```no_run
285    /// # use dynamodb_facade::test_fixtures::*;
286    /// use dynamodb_facade::{DynamoDBItemOp, Condition};
287    ///
288    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
289    /// // Query enrollments with progress above 50%
290    /// let advanced /* : Vec<Enrollment> */ =
291    ///     Enrollment::query(client, Enrollment::key_condition("user-1"))
292    ///         .filter(Condition::gt("progress", 0.5))
293    ///         .all()
294    ///         .await?;
295    /// # Ok(())
296    /// # }
297    /// ```
298    pub fn filter(mut self, filter: Condition<'_>) -> QueryRequest<TD, T, O, AlreadyHasFilter, P> {
299        tracing::debug!(%filter, "Query filter");
300        self.builder = filter.apply_filter(self.builder);
301        QueryRequest {
302            builder: self.builder,
303            _marker: PhantomData,
304        }
305    }
306}
307
308// -- Projection (NoProjection only) -----------------------------------------
309
310impl<TD: TableDefinition, T, O: OutputFormat, F: FilterState>
311    QueryRequest<TD, T, O, F, NoProjection>
312{
313    /// Applies a projection expression, limiting the attributes returned per item.
314    ///
315    /// This method can only be called once. It forces the output to raw
316    /// [`Item<TD>`] because projected results may not contain all fields
317    /// required for deserialization into `T`.
318    ///
319    /// # Examples
320    ///
321    /// ```no_run
322    /// # use dynamodb_facade::test_fixtures::*;
323    /// use dynamodb_facade::{DynamoDBItemOp, Projection};
324    ///
325    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
326    /// // Fetch only the "progress" attribute for each enrollment
327    /// let partial /* : Vec<Item<PlatformTable>> */ =
328    ///     Enrollment::query(client, Enrollment::key_condition("user-1"))
329    ///         .project(Projection::new(["progress"]))
330    ///         .all()
331    ///         .await?;
332    /// // partial: contains only "PK", "SK" and "progress"
333    /// # Ok(())
334    /// # }
335    /// ```
336    pub fn project(
337        mut self,
338        projection: Projection<'_, TD>,
339    ) -> QueryRequest<TD, T, Raw, F, AlreadyHasProjection> {
340        tracing::debug!(%projection, "Query project");
341        self.builder = projection.apply_projection(self.builder);
342        QueryRequest {
343            builder: self.builder,
344            _marker: PhantomData,
345        }
346    }
347}
348
349// -- Output format transition (preserve F, P) -------------------------------
350
351impl<TD: TableDefinition, T, F: FilterState, P: ProjectionState> QueryRequest<TD, T, Typed, F, P> {
352    /// Switches the output format from `Typed` to `Raw`.
353    ///
354    /// After calling `.raw()`, [`.all()`][QueryRequest::all] returns
355    /// `Vec<Item<TD>>` and [`.stream()`][QueryRequest::stream] yields
356    /// `Result<Vec<Item<TD>>>` (pages of raw items) instead of the typed
357    /// equivalents. This transition is one-way.
358    ///
359    /// # Examples
360    ///
361    /// ```no_run
362    /// # use dynamodb_facade::test_fixtures::*;
363    /// use dynamodb_facade::DynamoDBItemOp;
364    ///
365    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
366    /// let raw_items /* : Vec<Item<PlatformTable>> */ =
367    ///     Enrollment::query(client, Enrollment::key_condition("user-1"))
368    ///         .raw()
369    ///         .all()
370    ///         .await?;
371    /// # Ok(())
372    /// # }
373    /// ```
374    pub fn raw(self) -> QueryRequest<TD, T, Raw, F, P> {
375        QueryRequest {
376            builder: self.builder,
377            _marker: PhantomData,
378        }
379    }
380}
381
382// -- Terminal: Typed (any F, any P) -----------------------------------------
383
384impl<
385    TD: TableDefinition,
386    T: DynamoDBItem<TD> + DeserializeOwned,
387    F: FilterState,
388    P: ProjectionState,
389> QueryRequest<TD, T, Typed, F, P>
390{
391    /// Executes the query, collecting all pages and returning items deserialized as `T`.
392    ///
393    /// Automatically follows pagination tokens until all matching items have
394    /// been retrieved. For large result sets, prefer
395    /// [`.stream()`][QueryRequest::stream] to avoid loading everything into
396    /// memory at once.
397    ///
398    /// # Errors
399    ///
400    /// Returns [`Err`] if any DynamoDB page request fails or if deserialization
401    /// of any item fails.
402    ///
403    /// # Examples
404    ///
405    /// ```
406    /// # use dynamodb_facade::test_fixtures::*;
407    /// use dynamodb_facade::DynamoDBItemOp;
408    ///
409    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
410    /// let enrollments /* : Vec<Enrollment> */ =
411    ///     Enrollment::query(client, Enrollment::key_condition("user-1"))
412    ///         .all()
413    ///         .await?;
414    /// # Ok(())
415    /// # }
416    /// ```
417    #[tracing::instrument(level = "debug", skip(self), name = "query_all")]
418    pub async fn all(self) -> Result<Vec<T>> {
419        dynamodb_execute_query(self.builder)
420            .await?
421            .into_iter()
422            .map(T::try_from_item)
423            .collect()
424    }
425
426    /// Executes the query as a lazy async stream, yielding one page at a time.
427    ///
428    /// Each element yielded by the stream is a `Vec<T>` representing one page
429    /// of results deserialized as `T`. Pages are fetched on demand as the
430    /// stream is consumed. Use this for large result sets where loading
431    /// everything into memory at once is undesirable.
432    ///
433    /// # Examples
434    ///
435    /// ```no_run
436    /// # use dynamodb_facade::test_fixtures::*;
437    /// use dynamodb_facade::DynamoDBItemOp;
438    /// use futures_util::StreamExt;
439    /// use std::pin::pin;
440    ///
441    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
442    /// let stream = Enrollment::query(client, Enrollment::key_condition("user-1"))
443    ///     .stream();
444    /// // Must pin the stream
445    /// let mut stream = pin!(stream);
446    ///
447    /// while let Some(result) = stream.next().await {
448    ///     let page /* : Vec<Enrollment> */ = result?;
449    ///     for enrollment in page {
450    ///         let _ = enrollment;
451    ///     }
452    /// }
453    /// # Ok(())
454    /// # }
455    /// ```
456    pub fn stream(self) -> impl Stream<Item = Result<Vec<T>>> {
457        dynamodb_stream_query::<TD>(self.builder).map(|result| {
458            result.and_then(|items| items.into_iter().map(T::try_from_item).collect())
459        })
460    }
461}
462
463// -- Terminal: Raw (any F, any P) -------------------------------------------
464
465impl<TD: TableDefinition, T, F: FilterState, P: ProjectionState> QueryRequest<TD, T, Raw, F, P> {
466    /// Executes the query, collecting all pages and returning raw item maps.
467    ///
468    /// Automatically follows pagination tokens until all matching items have
469    /// been retrieved.
470    ///
471    /// # Errors
472    ///
473    /// Returns [`Err`] if any DynamoDB page request fails.
474    ///
475    /// # Examples
476    ///
477    /// ```no_run
478    /// # use dynamodb_facade::test_fixtures::*;
479    /// use dynamodb_facade::{DynamoDBItemOp, QueryRequest, KeyCondition};
480    ///
481    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
482    /// let raw_items = QueryRequest::<PlatformTable>::new(
483    ///     client,
484    ///     KeyCondition::pk("USER#user-1".to_owned()),
485    /// )
486    /// .all()
487    /// .await?;
488    /// // raw_items: Vec<Item<PlatformTable>>
489    /// # Ok(())
490    /// # }
491    /// ```
492    #[tracing::instrument(level = "debug", skip(self), name = "query_all_raw")]
493    pub async fn all(self) -> Result<Vec<Item<TD>>> {
494        dynamodb_execute_query(self.builder).await
495    }
496
497    /// Executes the query as a lazy async stream, yielding one page of raw item maps at a time.
498    ///
499    /// Each element yielded by the stream is a `Vec<Item<TD>>` representing one
500    /// page of results. Pages are fetched on demand as the stream is consumed.
501    ///
502    /// # Examples
503    ///
504    /// ```no_run
505    /// # use dynamodb_facade::test_fixtures::*;
506    /// use dynamodb_facade::{QueryRequest, KeyCondition};
507    /// use futures_util::StreamExt;
508    /// use std::pin::pin;
509    ///
510    /// # async fn example(client: aws_sdk_dynamodb::Client) -> dynamodb_facade::Result<()> {
511    /// let stream = QueryRequest::<PlatformTable>::new(
512    ///     client,
513    ///     KeyCondition::pk("USER#user-1".to_owned()),
514    /// )
515    /// .stream();
516    /// // Must pin the stream
517    /// let mut stream = pin!(stream);
518    ///
519    /// while let Some(result) = stream.next().await {
520    ///     let page /* : Vec<Item<PlatformTable>> */ = result?;
521    ///     for item in page {
522    ///         let _ = item;
523    ///     }
524    /// }
525    /// # Ok(())
526    /// # }
527    /// ```
528    pub fn stream(self) -> impl Stream<Item = Result<Vec<Item<TD>>>> {
529        dynamodb_stream_query(self.builder)
530    }
531}