sqlx_core_oldapi/
query.rs

1use std::marker::PhantomData;
2
3use either::Either;
4use futures_core::stream::BoxStream;
5use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt};
6
7use crate::arguments::{Arguments, IntoArguments};
8use crate::database::{Database, HasArguments, HasStatement, HasStatementCache};
9use crate::encode::Encode;
10use crate::error::Error;
11use crate::executor::{Execute, Executor};
12use crate::statement::Statement;
13use crate::types::Type;
14
15/// Raw SQL query with bind parameters. Returned by [`query`][crate::query::query].
16#[must_use = "query must be executed to affect database"]
17pub struct Query<'q, DB: Database, A> {
18    pub(crate) statement: Either<&'q str, &'q <DB as HasStatement<'q>>::Statement>,
19    pub(crate) arguments: Option<A>,
20    pub(crate) database: PhantomData<DB>,
21    pub(crate) persistent: bool,
22}
23
24/// SQL query that will map its results to owned Rust types.
25///
26/// Returned by [`Query::try_map`], `query!()`, etc. Has most of the same methods as [`Query`] but
27/// the return types are changed to reflect the mapping. However, there is no equivalent of
28/// [`Query::execute`] as it doesn't make sense to map the result type and then ignore it.
29///
30/// [`Query::bind`] is also omitted; stylistically we recommend placing your `.bind()` calls
31/// before `.try_map()`. This is also to prevent adding superfluous binds to the result of
32/// `query!()` et al.
33#[must_use = "query must be executed to affect database"]
34pub struct Map<'q, DB: Database, F, A> {
35    inner: Query<'q, DB, A>,
36    mapper: F,
37}
38
39impl<'q, DB, A> Execute<'q, DB> for Query<'q, DB, A>
40where
41    DB: Database,
42    A: Send + IntoArguments<'q, DB>,
43{
44    #[inline]
45    fn sql(&self) -> &'q str {
46        match self.statement {
47            Either::Right(ref statement) => statement.sql(),
48            Either::Left(sql) => sql,
49        }
50    }
51
52    fn statement(&self) -> Option<&<DB as HasStatement<'q>>::Statement> {
53        match self.statement {
54            Either::Right(ref statement) => Some(&statement),
55            Either::Left(_) => None,
56        }
57    }
58
59    #[inline]
60    fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
61        self.arguments.take().map(IntoArguments::into_arguments)
62    }
63
64    #[inline]
65    fn persistent(&self) -> bool {
66        self.persistent
67    }
68}
69
70impl<'q, DB: Database> Query<'q, DB, <DB as HasArguments<'q>>::Arguments> {
71    /// Bind a value for use with this SQL query.
72    ///
73    /// If the number of times this is called does not match the number of bind parameters that
74    /// appear in the query (`?` for most SQL flavors, `$1 .. $N` for Postgres) then an error
75    /// will be returned when this query is executed.
76    ///
77    /// There is no validation that the value is of the type expected by the query. Most SQL
78    /// flavors will perform type coercion (Postgres will return a database error).
79    pub fn bind<T: 'q + Send + Encode<'q, DB> + Type<DB>>(mut self, value: T) -> Self {
80        self.arguments
81            .get_or_insert_with(Default::default)
82            .add(value);
83        self
84    }
85}
86
87impl<'q, DB, A> Query<'q, DB, A>
88where
89    DB: Database + HasStatementCache,
90{
91    /// If `true`, the statement will get prepared once and cached to the
92    /// connection's statement cache.
93    ///
94    /// If queried once with the flag set to `true`, all subsequent queries
95    /// matching the one with the flag will use the cached statement until the
96    /// cache is cleared.
97    ///
98    /// Default: `true`.
99    pub fn persistent(mut self, value: bool) -> Self {
100        self.persistent = value;
101        self
102    }
103}
104
105impl<'q, DB, A: Send> Query<'q, DB, A>
106where
107    DB: Database,
108    A: 'q + IntoArguments<'q, DB>,
109{
110    /// Map each row in the result to another type.
111    ///
112    /// See [`try_map`](Query::try_map) for a fallible version of this method.
113    ///
114    /// The [`query_as`](super::query_as::query_as) method will construct a mapped query using
115    /// a [`FromRow`](super::from_row::FromRow) implementation.
116    #[inline]
117    pub fn map<F, O>(
118        self,
119        mut f: F,
120    ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<O, Error> + Send, A>
121    where
122        F: FnMut(DB::Row) -> O + Send,
123        O: Unpin,
124    {
125        self.try_map(move |row| Ok(f(row)))
126    }
127
128    /// Map each row in the result to another type.
129    ///
130    /// The [`query_as`](super::query_as::query_as) method will construct a mapped query using
131    /// a [`FromRow`](super::from_row::FromRow) implementation.
132    #[inline]
133    pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, A>
134    where
135        F: FnMut(DB::Row) -> Result<O, Error> + Send,
136        O: Unpin,
137    {
138        Map {
139            inner: self,
140            mapper: f,
141        }
142    }
143
144    /// Execute the query and return the total number of rows affected.
145    #[inline]
146    pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::QueryResult, Error>
147    where
148        'q: 'e,
149        A: 'e,
150        E: Executor<'c, Database = DB>,
151    {
152        executor.execute(self).await
153    }
154
155    /// Execute multiple queries and return the rows affected from each query, in a stream.
156    #[inline]
157    pub async fn execute_many<'e, 'c: 'e, E>(
158        self,
159        executor: E,
160    ) -> BoxStream<'e, Result<DB::QueryResult, Error>>
161    where
162        'q: 'e,
163        A: 'e,
164        E: Executor<'c, Database = DB>,
165    {
166        executor.execute_many(self)
167    }
168
169    /// Execute the query and return the generated results as a stream.
170    #[inline]
171    pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<DB::Row, Error>>
172    where
173        'q: 'e,
174        A: 'e,
175        E: Executor<'c, Database = DB>,
176    {
177        executor.fetch(self)
178    }
179
180    /// Execute multiple queries and return the generated results as a stream
181    /// from each query, in a stream.
182    #[inline]
183    pub fn fetch_many<'e, 'c: 'e, E>(
184        self,
185        executor: E,
186    ) -> BoxStream<'e, Result<Either<DB::QueryResult, DB::Row>, Error>>
187    where
188        'q: 'e,
189        A: 'e,
190        E: Executor<'c, Database = DB>,
191    {
192        executor.fetch_many(self)
193    }
194
195    /// Execute the query and return all the generated results, collected into a [`Vec`].
196    #[inline]
197    pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<DB::Row>, Error>
198    where
199        'q: 'e,
200        A: 'e,
201        E: Executor<'c, Database = DB>,
202    {
203        executor.fetch_all(self).await
204    }
205
206    /// Execute the query and returns exactly one row.
207    #[inline]
208    pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<DB::Row, Error>
209    where
210        'q: 'e,
211        A: 'e,
212        E: Executor<'c, Database = DB>,
213    {
214        executor.fetch_one(self).await
215    }
216
217    /// Execute the query and returns at most one row.
218    #[inline]
219    pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<DB::Row>, Error>
220    where
221        'q: 'e,
222        A: 'e,
223        E: Executor<'c, Database = DB>,
224    {
225        executor.fetch_optional(self).await
226    }
227}
228
229impl<'q, DB, F: Send, A: Send> Execute<'q, DB> for Map<'q, DB, F, A>
230where
231    DB: Database,
232    A: IntoArguments<'q, DB>,
233{
234    #[inline]
235    fn sql(&self) -> &'q str {
236        self.inner.sql()
237    }
238
239    #[inline]
240    fn statement(&self) -> Option<&<DB as HasStatement<'q>>::Statement> {
241        self.inner.statement()
242    }
243
244    #[inline]
245    fn take_arguments(&mut self) -> Option<<DB as HasArguments<'q>>::Arguments> {
246        self.inner.take_arguments()
247    }
248
249    #[inline]
250    fn persistent(&self) -> bool {
251        self.inner.arguments.is_some()
252    }
253}
254
255impl<'q, DB, F, O, A> Map<'q, DB, F, A>
256where
257    DB: Database,
258    F: FnMut(DB::Row) -> Result<O, Error> + Send,
259    O: Send + Unpin,
260    A: 'q + Send + IntoArguments<'q, DB>,
261{
262    /// Map each row in the result to another type.
263    ///
264    /// See [`try_map`](Map::try_map) for a fallible version of this method.
265    ///
266    /// The [`query_as`](super::query_as::query_as) method will construct a mapped query using
267    /// a [`FromRow`](super::from_row::FromRow) implementation.
268    #[inline]
269    pub fn map<G, P>(
270        self,
271        mut g: G,
272    ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<P, Error> + Send, A>
273    where
274        G: FnMut(O) -> P + Send,
275        P: Unpin,
276    {
277        self.try_map(move |data| Ok(g(data)))
278    }
279
280    /// Map each row in the result to another type.
281    ///
282    /// The [`query_as`](super::query_as::query_as) method will construct a mapped query using
283    /// a [`FromRow`](super::from_row::FromRow) implementation.
284    #[inline]
285    pub fn try_map<G, P>(
286        self,
287        mut g: G,
288    ) -> Map<'q, DB, impl FnMut(DB::Row) -> Result<P, Error> + Send, A>
289    where
290        G: FnMut(O) -> Result<P, Error> + Send,
291        P: Unpin,
292    {
293        let mut f = self.mapper;
294        Map {
295            inner: self.inner,
296            mapper: move |row| f(row).and_then(|o| g(o)),
297        }
298    }
299
300    /// Execute the query and return the generated results as a stream.
301    pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
302    where
303        'q: 'e,
304        E: 'e + Executor<'c, Database = DB>,
305        DB: 'e,
306        F: 'e,
307        O: 'e,
308    {
309        self.fetch_many(executor)
310            .try_filter_map(|step| async move {
311                Ok(match step {
312                    Either::Left(_) => None,
313                    Either::Right(o) => Some(o),
314                })
315            })
316            .boxed()
317    }
318
319    /// Execute multiple queries and return the generated results as a stream
320    /// from each query, in a stream.
321    pub fn fetch_many<'e, 'c: 'e, E>(
322        mut self,
323        executor: E,
324    ) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
325    where
326        'q: 'e,
327        E: 'e + Executor<'c, Database = DB>,
328        DB: 'e,
329        F: 'e,
330        O: 'e,
331    {
332        Box::pin(try_stream! {
333            let mut s = executor.fetch_many(self.inner);
334
335            while let Some(v) = s.try_next().await? {
336                r#yield!(match v {
337                    Either::Left(v) => Either::Left(v),
338                    Either::Right(row) => {
339                        Either::Right((self.mapper)(row)?)
340                    }
341                });
342            }
343
344            Ok(())
345        })
346    }
347
348    /// Execute the query and return all the generated results, collected into a [`Vec`].
349    pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<O>, Error>
350    where
351        'q: 'e,
352        E: 'e + Executor<'c, Database = DB>,
353        DB: 'e,
354        F: 'e,
355        O: 'e,
356    {
357        self.fetch(executor).try_collect().await
358    }
359
360    /// Execute the query and returns exactly one row.
361    pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<O, Error>
362    where
363        'q: 'e,
364        E: 'e + Executor<'c, Database = DB>,
365        DB: 'e,
366        F: 'e,
367        O: 'e,
368    {
369        self.fetch_optional(executor)
370            .and_then(|row| match row {
371                Some(row) => future::ok(row),
372                None => future::err(Error::RowNotFound),
373            })
374            .await
375    }
376
377    /// Execute the query and returns at most one row.
378    pub async fn fetch_optional<'e, 'c: 'e, E>(mut self, executor: E) -> Result<Option<O>, Error>
379    where
380        'q: 'e,
381        E: 'e + Executor<'c, Database = DB>,
382        DB: 'e,
383        F: 'e,
384        O: 'e,
385    {
386        let row = executor.fetch_optional(self.inner).await?;
387
388        if let Some(row) = row {
389            (self.mapper)(row).map(Some)
390        } else {
391            Ok(None)
392        }
393    }
394}
395
396// Make a SQL query from a statement.
397pub(crate) fn query_statement<'q, DB>(
398    statement: &'q <DB as HasStatement<'q>>::Statement,
399) -> Query<'q, DB, <DB as HasArguments<'q>>::Arguments>
400where
401    DB: Database,
402{
403    Query {
404        database: PhantomData,
405        arguments: Some(Default::default()), // Even if there are no arguments, we signify that we should use a prepared statement
406        statement: Either::Right(statement),
407        persistent: true,
408    }
409}
410
411// Make a SQL query from a statement, with the given arguments.
412pub(crate) fn query_statement_with<'q, DB, A>(
413    statement: &'q <DB as HasStatement<'q>>::Statement,
414    arguments: A,
415) -> Query<'q, DB, A>
416where
417    DB: Database,
418    A: IntoArguments<'q, DB>,
419{
420    Query {
421        database: PhantomData,
422        arguments: Some(arguments),
423        statement: Either::Right(statement),
424        persistent: true,
425    }
426}
427
428/// Make a SQL query.
429pub fn query<DB>(sql: &str) -> Query<'_, DB, <DB as HasArguments<'_>>::Arguments>
430where
431    DB: Database,
432{
433    Query {
434        database: PhantomData,
435        arguments: None,
436        statement: Either::Left(sql),
437        persistent: true,
438    }
439}
440
441/// Make a SQL query, with the given arguments.
442pub fn query_with<'q, DB, A>(sql: &'q str, arguments: A) -> Query<'q, DB, A>
443where
444    DB: Database,
445    A: IntoArguments<'q, DB>,
446{
447    Query {
448        database: PhantomData,
449        arguments: Some(arguments),
450        statement: Either::Left(sql),
451        persistent: true,
452    }
453}