1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
use std::marker::PhantomData;

use async_stream::try_stream;
use futures_core::Stream;
use futures_util::future::ready;
use futures_util::TryFutureExt;

use crate::arguments::Arguments;
use crate::cursor::{Cursor, HasCursor};
use crate::database::Database;
use crate::encode::Encode;
use crate::executor::{Execute, Executor, RefExecutor};
use crate::row::HasRow;
use crate::types::Type;

/// Raw SQL query with bind parameters. Returned by [`query`][crate::query::query].
#[must_use = "query must be executed to affect database"]
pub struct Query<'q, DB>
where
    DB: Database,
{
    pub(crate) query: &'q str,
    pub(crate) arguments: DB::Arguments,
    database: PhantomData<DB>,
}

/// SQL query that will map its results to owned Rust types.
///
/// Returned by [Query::try_map], `query!()`, etc. Has most of the same methods as [Query] but
/// the return types are changed to reflect the mapping. However, there is no equivalent of
/// [Query::execute] as it doesn't make sense to map the result type and then ignore it.
///
/// [Query::bind] is also omitted; stylistically we recommend placing your `.bind()` calls
/// before `.try_map()` anyway.
#[must_use = "query must be executed to affect database"]
pub struct Map<'q, DB, F>
where
    DB: Database,
{
    query: Query<'q, DB>,
    mapper: F,
}

// necessary because we can't have a blanket impl for `Query<'q, DB>`
// the compiler thinks that `ImmutableArguments<DB>` could be `DB::Arguments` even though
// that would be an infinitely recursive type
impl<'q, DB> Execute<'q, DB> for Query<'q, DB>
where
    DB: Database,
{
    fn into_parts(self) -> (&'q str, Option<DB::Arguments>) {
        (self.query, Some(self.arguments))
    }

    #[doc(hidden)]
    fn query_string(&self) -> &'q str {
        self.query
    }
}

impl<'q, DB> Query<'q, DB>
where
    DB: Database,
{
    /// Bind a value for use with this SQL query.
    ///
    /// If the number of times this is called does not match the number of bind parameters that
    /// appear in the query (`?` for most SQL flavors, `$1 .. $N` for Postgres) then an error
    /// will be returned when this query is executed.
    ///
    /// There is no validation that the value is of the type expected by the query. Most SQL
    /// flavors will perform type coercion (Postgres will return a database error).s
    pub fn bind<T>(mut self, value: T) -> Self
    where
        T: Type<DB>,
        T: Encode<DB>,
    {
        self.arguments.add(value);
        self
    }

    #[doc(hidden)]
    pub fn bind_all(self, arguments: DB::Arguments) -> Query<'q, DB> {
        Query {
            query: self.query,
            arguments,
            database: PhantomData,
        }
    }
}

impl<'q, DB> Query<'q, DB>
where
    DB: Database,
{
    /// Map each row in the result to another type.
    ///
    /// The returned type has most of the same methods but does not have
    /// [`.execute()`][Query::execute] or [`.bind()][Query::bind].
    ///
    /// See also: [query_as][crate::query_as::query_as].
    pub fn map<F, O>(self, mapper: F) -> Map<'q, DB, impl TryMapRow<DB, Output = O>>
    where
        O: Unpin,
        F: MapRow<DB, Output = O>,
    {
        self.try_map(MapRowAdapter(mapper))
    }

    /// Map each row in the result to another type.
    ///
    /// See also: [query_as][crate::query_as::query_as].
    pub fn try_map<F>(self, mapper: F) -> Map<'q, DB, F>
    where
        F: TryMapRow<DB>,
    {
        Map {
            query: self,
            mapper,
        }
    }
}

impl<'q, DB> Query<'q, DB>
where
    DB: Database,
    Self: Execute<'q, DB>,
{
    pub async fn execute<E>(self, mut executor: E) -> crate::Result<u64>
    where
        E: Executor<Database = DB>,
    {
        executor.execute(self).await
    }

    pub fn fetch<'e, E>(self, executor: E) -> <DB as HasCursor<'e, 'q>>::Cursor
    where
        E: RefExecutor<'e, Database = DB>,
    {
        executor.fetch_by_ref(self)
    }
}

impl<'q, DB, F> Map<'q, DB, F>
where
    DB: Database,
    Query<'q, DB>: Execute<'q, DB>,
    F: TryMapRow<DB>,
{
    /// Execute the query and get a [Stream] of the results, returning our mapped type.
    pub fn fetch<'e: 'q, E>(
        mut self,
        executor: E,
    ) -> impl Stream<Item = crate::Result<F::Output>> + Unpin + 'e
    where
        'q: 'e,
        E: RefExecutor<'e, Database = DB> + 'e,
        F: 'e,
        F::Output: 'e,
    {
        Box::pin(try_stream! {
            let mut cursor = executor.fetch_by_ref(self.query);
            while let Some(next) = cursor.next().await? {
                let mapped = self.mapper.try_map_row(next)?;
                yield mapped;
            }
        })
    }

    /// Get the first row in the result
    pub async fn fetch_optional<'e, E>(self, executor: E) -> crate::Result<Option<F::Output>>
    where
        E: RefExecutor<'e, Database = DB>,
        'q: 'e,
    {
        // could be implemented in terms of `fetch()` but this avoids overhead from `try_stream!`
        let mut cursor = executor.fetch_by_ref(self.query);
        let mut mapper = self.mapper;
        let val = cursor.next().await?;
        val.map(|row| mapper.try_map_row(row)).transpose()
    }

    pub async fn fetch_one<'e, E>(self, executor: E) -> crate::Result<F::Output>
    where
        E: RefExecutor<'e, Database = DB>,
        'q: 'e,
    {
        self.fetch_optional(executor)
            .and_then(|row| match row {
                Some(row) => ready(Ok(row)),
                None => ready(Err(crate::Error::RowNotFound)),
            })
            .await
    }

    pub async fn fetch_all<'e, E>(mut self, executor: E) -> crate::Result<Vec<F::Output>>
    where
        E: RefExecutor<'e, Database = DB>,
        'q: 'e,
    {
        let mut cursor = executor.fetch_by_ref(self.query);
        let mut out = vec![];

        while let Some(row) = cursor.next().await? {
            out.push(self.mapper.try_map_row(row)?);
        }

        Ok(out)
    }
}

// A (hopefully) temporary workaround for an internal compiler error (ICE) involving higher-ranked
// trait bounds (HRTBs), associated types and closures.
//
// See https://github.com/rust-lang/rust/issues/62529

pub trait TryMapRow<DB: Database> {
    type Output: Unpin;

    fn try_map_row(&mut self, row: <DB as HasRow>::Row) -> crate::Result<Self::Output>;
}

pub trait MapRow<DB: Database> {
    type Output: Unpin;

    fn map_row(&mut self, row: <DB as HasRow>::Row) -> Self::Output;
}

// An adapter that implements [MapRow] in terms of [TryMapRow]
// Just ends up Ok wrapping it

struct MapRowAdapter<F>(F);

impl<DB: Database, O, F> TryMapRow<DB> for MapRowAdapter<F>
where
    O: Unpin,
    F: MapRow<DB, Output = O>,
{
    type Output = O;

    fn try_map_row(&mut self, row: <DB as HasRow>::Row) -> crate::Result<Self::Output> {
        Ok(self.0.map_row(row))
    }
}

/// Construct a raw SQL query that can be chained to bind parameters and executed.
pub fn query<DB>(sql: &str) -> Query<DB>
where
    DB: Database,
{
    Query {
        database: PhantomData,
        arguments: Default::default(),
        query: sql,
    }
}