sqlx_sqlite/connection/
executor.rs1use crate::{
2 Sqlite, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo,
3};
4use futures_core::future::BoxFuture;
5use futures_core::stream::BoxStream;
6use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
7use sqlx_core::describe::Describe;
8use sqlx_core::error::Error;
9use sqlx_core::executor::{Execute, Executor};
10use sqlx_core::Either;
11use std::{future, pin::pin};
12
13impl<'c> Executor<'c> for &'c mut SqliteConnection {
14 type Database = Sqlite;
15
16 fn fetch_many<'e, 'q, E>(
17 self,
18 mut query: E,
19 ) -> BoxStream<'e, Result<Either<SqliteQueryResult, SqliteRow>, Error>>
20 where
21 'c: 'e,
22 E: Execute<'q, Self::Database>,
23 'q: 'e,
24 E: 'q,
25 {
26 let sql = query.sql();
27 let arguments = match query.take_arguments().map_err(Error::Encode) {
28 Ok(arguments) => arguments,
29 Err(error) => return stream::once(future::ready(Err(error))).boxed(),
30 };
31 let persistent = query.persistent() && arguments.is_some();
32
33 Box::pin(
34 self.worker
35 .execute(sql, arguments, self.row_channel_size, persistent, None)
36 .map_ok(flume::Receiver::into_stream)
37 .try_flatten_stream(),
38 )
39 }
40
41 fn fetch_optional<'e, 'q, E>(
42 self,
43 mut query: E,
44 ) -> BoxFuture<'e, Result<Option<SqliteRow>, Error>>
45 where
46 'c: 'e,
47 E: Execute<'q, Self::Database>,
48 'q: 'e,
49 E: 'q,
50 {
51 let sql = query.sql();
52 let arguments = match query.take_arguments().map_err(Error::Encode) {
53 Ok(arguments) => arguments,
54 Err(error) => return future::ready(Err(error)).boxed(),
55 };
56 let persistent = query.persistent() && arguments.is_some();
57
58 Box::pin(async move {
59 let mut stream = pin!(self
60 .worker
61 .execute(sql, arguments, self.row_channel_size, persistent, Some(1))
62 .map_ok(flume::Receiver::into_stream)
63 .try_flatten_stream());
64
65 while let Some(res) = stream.try_next().await? {
66 if let Either::Right(row) = res {
67 return Ok(Some(row));
68 }
69 }
70
71 Ok(None)
72 })
73 }
74
75 fn prepare_with<'e, 'q: 'e>(
76 self,
77 sql: &'q str,
78 _parameters: &[SqliteTypeInfo],
79 ) -> BoxFuture<'e, Result<SqliteStatement<'q>, Error>>
80 where
81 'c: 'e,
82 {
83 Box::pin(async move {
84 let statement = self.worker.prepare(sql).await?;
85
86 Ok(SqliteStatement {
87 sql: sql.into(),
88 ..statement
89 })
90 })
91 }
92
93 #[doc(hidden)]
94 fn describe<'e, 'q: 'e>(self, sql: &'q str) -> BoxFuture<'e, Result<Describe<Sqlite>, Error>>
95 where
96 'c: 'e,
97 {
98 Box::pin(self.worker.describe(sql))
99 }
100}