sqlx_core_oldapi/sqlite/connection/
executor.rs

1use crate::describe::Describe;
2use crate::error::Error;
3use crate::executor::{Execute, Executor};
4use crate::sqlite::{
5    Sqlite, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo,
6};
7use either::Either;
8use futures_core::future::BoxFuture;
9use futures_core::stream::BoxStream;
10use futures_util::{TryFutureExt, TryStreamExt};
11
12impl<'c> Executor<'c> for &'c mut SqliteConnection {
13    type Database = Sqlite;
14
15    fn fetch_many<'e, 'q: 'e, E: 'q>(
16        self,
17        mut query: E,
18    ) -> BoxStream<'e, Result<Either<SqliteQueryResult, SqliteRow>, Error>>
19    where
20        'c: 'e,
21        E: Execute<'q, Self::Database>,
22    {
23        let sql = query.sql();
24        let arguments = query.take_arguments();
25        let persistent = query.persistent() && arguments.is_some();
26
27        Box::pin(
28            self.worker
29                .execute(sql, arguments, self.row_channel_size, persistent)
30                .map_ok(flume::Receiver::into_stream)
31                .try_flatten_stream(),
32        )
33    }
34
35    fn fetch_optional<'e, 'q: 'e, E: 'q>(
36        self,
37        mut query: E,
38    ) -> BoxFuture<'e, Result<Option<SqliteRow>, Error>>
39    where
40        'c: 'e,
41        E: Execute<'q, Self::Database>,
42    {
43        let sql = query.sql();
44        let arguments = query.take_arguments();
45        let persistent = query.persistent() && arguments.is_some();
46
47        Box::pin(async move {
48            let stream = self
49                .worker
50                .execute(sql, arguments, self.row_channel_size, persistent)
51                .map_ok(flume::Receiver::into_stream)
52                .try_flatten_stream();
53
54            futures_util::pin_mut!(stream);
55
56            while let Some(res) = stream.try_next().await? {
57                if let Either::Right(row) = res {
58                    return Ok(Some(row));
59                }
60            }
61
62            Ok(None)
63        })
64    }
65
66    fn prepare_with<'e, 'q: 'e>(
67        self,
68        sql: &'q str,
69        _parameters: &[SqliteTypeInfo],
70    ) -> BoxFuture<'e, Result<SqliteStatement<'q>, Error>>
71    where
72        'c: 'e,
73    {
74        Box::pin(async move {
75            let statement = self.worker.prepare(sql).await?;
76
77            Ok(SqliteStatement {
78                sql: sql.into(),
79                ..statement
80            })
81        })
82    }
83
84    #[doc(hidden)]
85    fn describe<'e, 'q: 'e>(self, sql: &'q str) -> BoxFuture<'e, Result<Describe<Sqlite>, Error>>
86    where
87        'c: 'e,
88    {
89        Box::pin(self.worker.describe(sql))
90    }
91}