sqlx_core/
query.rs

1use std::marker::PhantomData;
2
3use either::Either;
4use futures_core::stream::BoxStream;
5use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt};
6
7use crate::arguments::{Arguments, IntoArguments};
8use crate::database::{Database, HasArguments, HasStatement, HasStatementCache};
9use crate::encode::Encode;
10use crate::error::Error;
11use crate::executor::{Execute, Executor};
12use crate::statement::Statement;
13use crate::types::Type;
14
15/// Raw SQL query with bind parameters. Returned by [`query`][crate::query::query].
16#[must_use = "query must be executed to affect database"]
17pub struct Query<'q, DB: Database, A> {
18    pub(crate) statement: Either<&'q str, &'q <DB as HasStatement<'q>>::Statement>,
19    pub(crate) arguments: Option<A>,
20    pub(crate) database: PhantomData<DB>,
21    pub(crate) persistent: bool,
22}
23
24/// SQL query that will map its results to owned Rust types.
25///
26/// Returned by [`Query::try_map`], `query!()`, etc. Has most of the same methods as [`Query`] but
27/// the return types are changed to reflect the mapping. However, there is no equivalent of
28/// [`Query::execute`] as it doesn't make sense to map the result type and then ignore it.
29///
30/// [`Query::bind`] is also omitted; stylistically we recommend placing your `.bind()` calls
31/// before `.try_map()`. This is also to prevent adding superfluous binds to the result of
32/// `query!()` et al.
33#[must_use = "query must be executed to affect database"]
34pub struct Map<'q, DB: Database, F, A> {
35    inner: Query<'q, DB, A>,
36    mapper: F,
37}
38
39impl<'q, DB, A> Execute<'q, DB> for Query<'q, DB, A>
40where
41    DB: Database,
42    A: Send + IntoArguments<'q, DB>,
43{
44    #[inline]
45    fn sql(&self) -> &'q str {
46        match self.statement {
47            Either::Right(ref statement) => statement.sql(),
48            Either::Left(sql) => sql,
49        }
50    }
51
52    fn statement(&self) -> Option<&<DB as HasStatement<'q>>::Statement> {
53        match self.statement {
54            Either::Right(ref statement) => Some(&statement),
55            Either::Left(_) => None,
56        }
57    }
58
59    #[inline]
60    fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
61        self.arguments.take().map(IntoArguments::into_arguments)
62    }
63
64    #[inline]
65    fn persistent(&self) -> bool {
66        self.persistent
67    }
68}
69
70impl<'q, DB: Database> Query<'q, DB, <DB as HasArguments<'q>>::Arguments> {
71    /// Bind a value for use with this SQL query.
72    ///
73    /// If the number of times this is called does not match the number of bind parameters that
74    /// appear in the query (`?` for most SQL flavors, `$1 .. $N` for Postgres) then an error
75    /// will be returned when this query is executed.
76    ///
77    /// There is no validation that the value is of the type expected by the query. Most SQL
78    /// flavors will perform type coercion (Postgres will return a database error).
79    pub fn bind<T: 'q + Send + Encode<'q, DB> + Type<DB>>(mut self, value: T) -> Self {
80        if let Some(arguments) = &mut self.arguments {
81            arguments.add(value);
82        }
83
84        self
85    }
86}
87
88impl<'q, DB, A> Query<'q, DB, A>
89where
90    DB: Database + HasStatementCache,
91{
92    /// If `true`, the statement will get prepared once and cached to the
93    /// connection's statement cache.
94    ///
95    /// If queried once with the flag set to `true`, all subsequent queries
96    /// matching the one with the flag will use the cached statement until the
97    /// cache is cleared.
98    ///
99    /// Default: `true`.
100    pub fn persistent(mut self, value: bool) -> Self {
101        self.persistent = value;
102        self
103    }
104}
105
106impl<'q, DB, A: Send> Query<'q, DB, A>
107where
108    DB: Database,
109    A: 'q + IntoArguments<'q, DB>,
110{
111    /// Map each row in the result to another type.
112    ///
113    /// See [`try_map`](Query::try_map) for a fallible version of this method.
114    ///
115    /// The [`query_as`](super::query_as::query_as) method will construct a mapped query using
116    /// a [`FromRow`](super::from_row::FromRow) implementation.
117    #[inline]
118    pub fn map<F, O>(
119        self,
120        mut f: F,
121    ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, Error> + Send, A>
122    where
123        F: FnMut(DB::Row) -> O + Send,
124        O: Unpin,
125    {
126        self.try_map(move |row| Ok(f(row)))
127    }
128
129    /// Map each row in the result to another type.
130    ///
131    /// The [`query_as`](super::query_as::query_as) method will construct a mapped query using
132    /// a [`FromRow`](super::from_row::FromRow) implementation.
133    #[inline]
134    pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, A>
135    where
136        F: FnMut(DB::Row) -> Result<O, Error> + Send,
137        O: Unpin,
138    {
139        Map {
140            inner: self,
141            mapper: f,
142        }
143    }
144
145    /// Execute the query and return the total number of rows affected.
146    #[inline]
147    pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::QueryResult, Error>
148    where
149        'q: 'e,
150        A: 'e,
151        E: Executor<'c, Database = DB>,
152    {
153        executor.execute(self).await
154    }
155
156    /// Execute multiple queries and return the rows affected from each query, in a stream.
157    #[inline]
158    pub async fn execute_many<'e, 'c: 'e, E>(
159        self,
160        executor: E,
161    ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
162    where
163        'q: 'e,
164        A: 'e,
165        E: Executor<'c, Database = DB>,
166    {
167        executor.execute_many(self)
168    }
169
170    /// Execute the query and return the generated results as a stream.
171    #[inline]
172    pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
173    where
174        'q: 'e,
175        A: 'e,
176        E: Executor<'c, Database = DB>,
177    {
178        executor.fetch(self)
179    }
180
181    /// Execute multiple queries and return the generated results as a stream
182    /// from each query, in a stream.
183    #[inline]
184    pub fn fetch_many<'e, 'c: 'e, E>(
185        self,
186        executor: E,
187    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
188    where
189        'q: 'e,
190        A: 'e,
191        E: Executor<'c, Database = DB>,
192    {
193        executor.fetch_many(self)
194    }
195
196    /// Execute the query and return all the generated results, collected into a [`Vec`].
197    #[inline]
198    pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
199    where
200        'q: 'e,
201        A: 'e,
202        E: Executor<'c, Database = DB>,
203    {
204        executor.fetch_all(self).await
205    }
206
207    /// Execute the query and returns exactly one row.
208    #[inline]
209    pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::Row, Error>
210    where
211        'q: 'e,
212        A: 'e,
213        E: Executor<'c, Database = DB>,
214    {
215        executor.fetch_one(self).await
216    }
217
218    /// Execute the query and returns at most one row.
219    #[inline]
220    pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<DB::Row>, Error>
221    where
222        'q: 'e,
223        A: 'e,
224        E: Executor<'c, Database = DB>,
225    {
226        executor.fetch_optional(self).await
227    }
228}
229
230impl<'q, DB, F: Send, A: Send> Execute<'q, DB> for Map<'q, DB, F, A>
231where
232    DB: Database,
233    A: IntoArguments<'q, DB>,
234{
235    #[inline]
236    fn sql(&self) -> &'q str {
237        self.inner.sql()
238    }
239
240    #[inline]
241    fn statement(&self) -> Option<&<DB as HasStatement<'q>>::Statement> {
242        self.inner.statement()
243    }
244
245    #[inline]
246    fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
247        self.inner.take_arguments()
248    }
249
250    #[inline]
251    fn persistent(&self) -> bool {
252        self.inner.arguments.is_some()
253    }
254}
255
256impl<'q, DB, F, O, A> Map<'q, DB, F, A>
257where
258    DB: Database,
259    F: FnMut(DB::Row) -> Result<O, Error> + Send,
260    O: Send + Unpin,
261    A: 'q + Send + IntoArguments<'q, DB>,
262{
263    /// Map each row in the result to another type.
264    ///
265    /// See [`try_map`](Map::try_map) for a fallible version of this method.
266    ///
267    /// The [`query_as`](super::query_as::query_as) method will construct a mapped query using
268    /// a [`FromRow`](super::from_row::FromRow) implementation.
269    #[inline]
270    pub fn map<G, P>(
271        self,
272        mut g: G,
273    ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<P, Error> + Send, A>
274    where
275        G: FnMut(O) -> P + Send,
276        P: Unpin,
277    {
278        self.try_map(move |data| Ok(g(data)))
279    }
280
281    /// Map each row in the result to another type.
282    ///
283    /// The [`query_as`](super::query_as::query_as) method will construct a mapped query using
284    /// a [`FromRow`](super::from_row::FromRow) implementation.
285    #[inline]
286    pub fn try_map<G, P>(
287        self,
288        mut g: G,
289    ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<P, Error> + Send, A>
290    where
291        G: FnMut(O) -> Result<P, Error> + Send,
292        P: Unpin,
293    {
294        let mut f = self.mapper;
295        Map {
296            inner: self.inner,
297            mapper: move |row| f(row).and_then(|o| g(o)),
298        }
299    }
300
301    /// Execute the query and return the generated results as a stream.
302    pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
303    where
304        'q: 'e,
305        E: 'e + Executor<'c, Database = DB>,
306        DB: 'e,
307        F: 'e,
308        O: 'e,
309    {
310        self.fetch_many(executor)
311            .try_filter_map(|step| async move {
312                Ok(match step {
313                    Either::Left(_) => None,
314                    Either::Right(o) => Some(o),
315                })
316            })
317            .boxed()
318    }
319
320    /// Execute multiple queries and return the generated results as a stream
321    /// from each query, in a stream.
322    pub fn fetch_many<'e, 'c: 'e, E>(
323        mut self,
324        executor: E,
325    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
326    where
327        'q: 'e,
328        E: 'e + Executor<'c, Database = DB>,
329        DB: 'e,
330        F: 'e,
331        O: 'e,
332    {
333        Box::pin(try_stream! {
334            let mut s = executor.fetch_many(self.inner);
335
336            while let Some(v) = s.try_next().await? {
337                r#yield!(match v {
338                    Either::Left(v) => Either::Left(v),
339                    Either::Right(row) => {
340                        Either::Right((self.mapper)(row)?)
341                    }
342                });
343            }
344
345            Ok(())
346        })
347    }
348
349    /// Execute the query and return all the generated results, collected into a [`Vec`].
350    pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<O>, Error>
351    where
352        'q: 'e,
353        E: 'e + Executor<'c, Database = DB>,
354        DB: 'e,
355        F: 'e,
356        O: 'e,
357    {
358        self.fetch(executor).try_collect().await
359    }
360
361    /// Execute the query and returns exactly one row.
362    pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<O, Error>
363    where
364        'q: 'e,
365        E: 'e + Executor<'c, Database = DB>,
366        DB: 'e,
367        F: 'e,
368        O: 'e,
369    {
370        self.fetch_optional(executor)
371            .and_then(|row| match row {
372                Some(row) => future::ok(row),
373                None => future::err(Error::RowNotFound),
374            })
375            .await
376    }
377
378    /// Execute the query and returns at most one row.
379    pub async fn fetch_optional<'e, 'c: 'e, E>(mut self, executor: E) -> Result<Option<O>, Error>
380    where
381        'q: 'e,
382        E: 'e + Executor<'c, Database = DB>,
383        DB: 'e,
384        F: 'e,
385        O: 'e,
386    {
387        let row = executor.fetch_optional(self.inner).await?;
388
389        if let Some(row) = row {
390            (self.mapper)(row).map(Some)
391        } else {
392            Ok(None)
393        }
394    }
395}
396
397// Make a SQL query from a statement.
398pub(crate) fn query_statement<'q, DB>(
399    statement: &'q <DB as HasStatement<'q>>::Statement,
400) -> Query<'q, DB, <DB as HasArguments<'_>>::Arguments>
401where
402    DB: Database,
403{
404    Query {
405        database: PhantomData,
406        arguments: Some(Default::default()),
407        statement: Either::Right(statement),
408        persistent: true,
409    }
410}
411
412// Make a SQL query from a statement, with the given arguments.
413pub(crate) fn query_statement_with<'q, DB, A>(
414    statement: &'q <DB as HasStatement<'q>>::Statement,
415    arguments: A,
416) -> Query<'q, DB, A>
417where
418    DB: Database,
419    A: IntoArguments<'q, DB>,
420{
421    Query {
422        database: PhantomData,
423        arguments: Some(arguments),
424        statement: Either::Right(statement),
425        persistent: true,
426    }
427}
428
429/// Make a SQL query.
430pub fn query<DB>(sql: &str) -> Query<'_, DB, <DB as HasArguments<'_>>::Arguments>
431where
432    DB: Database,
433{
434    Query {
435        database: PhantomData,
436        arguments: Some(Default::default()),
437        statement: Either::Left(sql),
438        persistent: true,
439    }
440}
441
442/// Make a SQL query, with the given arguments.
443pub fn query_with<'q, DB, A>(sql: &'q str, arguments: A) -> Query<'q, DB, A>
444where
445    DB: Database,
446    A: IntoArguments<'q, DB>,
447{
448    Query {
449        database: PhantomData,
450        arguments: Some(arguments),
451        statement: Either::Left(sql),
452        persistent: true,
453    }
454}