sqlx_sqlite/connection/
executor.rs

1use crate::{
2    Sqlite, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo,
3};
4use futures_core::future::BoxFuture;
5use futures_core::stream::BoxStream;
6use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
7use sqlx_core::describe::Describe;
8use sqlx_core::error::Error;
9use sqlx_core::executor::{Execute, Executor};
10use sqlx_core::Either;
11use std::{future, pin::pin};
12
13impl<'c> Executor<'c> for &'c mut SqliteConnection {
14    type Database = Sqlite;
15
16    fn fetch_many<'e, 'q, E>(
17        self,
18        mut query: E,
19    ) -> BoxStream<'e, Result<Either<SqliteQueryResult, SqliteRow>, Error>>
20    where
21        'c: 'e,
22        E: Execute<'q, Self::Database>,
23        'q: 'e,
24        E: 'q,
25    {
26        let sql = query.sql();
27        let arguments = match query.take_arguments().map_err(Error::Encode) {
28            Ok(arguments) => arguments,
29            Err(error) => return stream::once(future::ready(Err(error))).boxed(),
30        };
31        let persistent = query.persistent() && arguments.is_some();
32
33        Box::pin(
34            self.worker
35                .execute(sql, arguments, self.row_channel_size, persistent, None)
36                .map_ok(flume::Receiver::into_stream)
37                .try_flatten_stream(),
38        )
39    }
40
41    fn fetch_optional<'e, 'q, E>(
42        self,
43        mut query: E,
44    ) -> BoxFuture<'e, Result<Option<SqliteRow>, Error>>
45    where
46        'c: 'e,
47        E: Execute<'q, Self::Database>,
48        'q: 'e,
49        E: 'q,
50    {
51        let sql = query.sql();
52        let arguments = match query.take_arguments().map_err(Error::Encode) {
53            Ok(arguments) => arguments,
54            Err(error) => return future::ready(Err(error)).boxed(),
55        };
56        let persistent = query.persistent() && arguments.is_some();
57
58        Box::pin(async move {
59            let mut stream = pin!(self
60                .worker
61                .execute(sql, arguments, self.row_channel_size, persistent, Some(1))
62                .map_ok(flume::Receiver::into_stream)
63                .try_flatten_stream());
64
65            while let Some(res) = stream.try_next().await? {
66                if let Either::Right(row) = res {
67                    return Ok(Some(row));
68                }
69            }
70
71            Ok(None)
72        })
73    }
74
75    fn prepare_with<'e, 'q: 'e>(
76        self,
77        sql: &'q str,
78        _parameters: &[SqliteTypeInfo],
79    ) -> BoxFuture<'e, Result<SqliteStatement<'q>, Error>>
80    where
81        'c: 'e,
82    {
83        Box::pin(async move {
84            let statement = self.worker.prepare(sql).await?;
85
86            Ok(SqliteStatement {
87                sql: sql.into(),
88                ..statement
89            })
90        })
91    }
92
93    #[doc(hidden)]
94    fn describe<'e, 'q: 'e>(self, sql: &'q str) -> BoxFuture<'e, Result<Describe<Sqlite>, Error>>
95    where
96        'c: 'e,
97    {
98        Box::pin(self.worker.describe(sql))
99    }
100}