sqlx_core_oldapi/sqlite/connection/
executor.rs1use crate::describe::Describe;
2use crate::error::Error;
3use crate::executor::{Execute, Executor};
4use crate::sqlite::{
5 Sqlite, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo,
6};
7use either::Either;
8use futures_core::future::BoxFuture;
9use futures_core::stream::BoxStream;
10use futures_util::{TryFutureExt, TryStreamExt};
11
12impl<'c> Executor<'c> for &'c mut SqliteConnection {
13 type Database = Sqlite;
14
15 fn fetch_many<'e, 'q: 'e, E: 'q>(
16 self,
17 mut query: E,
18 ) -> BoxStream<'e, Result<Either<SqliteQueryResult, SqliteRow>, Error>>
19 where
20 'c: 'e,
21 E: Execute<'q, Self::Database>,
22 {
23 let sql = query.sql();
24 let arguments = query.take_arguments();
25 let persistent = query.persistent() && arguments.is_some();
26
27 Box::pin(
28 self.worker
29 .execute(sql, arguments, self.row_channel_size, persistent)
30 .map_ok(flume::Receiver::into_stream)
31 .try_flatten_stream(),
32 )
33 }
34
35 fn fetch_optional<'e, 'q: 'e, E: 'q>(
36 self,
37 mut query: E,
38 ) -> BoxFuture<'e, Result<Option<SqliteRow>, Error>>
39 where
40 'c: 'e,
41 E: Execute<'q, Self::Database>,
42 {
43 let sql = query.sql();
44 let arguments = query.take_arguments();
45 let persistent = query.persistent() && arguments.is_some();
46
47 Box::pin(async move {
48 let stream = self
49 .worker
50 .execute(sql, arguments, self.row_channel_size, persistent)
51 .map_ok(flume::Receiver::into_stream)
52 .try_flatten_stream();
53
54 futures_util::pin_mut!(stream);
55
56 while let Some(res) = stream.try_next().await? {
57 if let Either::Right(row) = res {
58 return Ok(Some(row));
59 }
60 }
61
62 Ok(None)
63 })
64 }
65
66 fn prepare_with<'e, 'q: 'e>(
67 self,
68 sql: &'q str,
69 _parameters: &[SqliteTypeInfo],
70 ) -> BoxFuture<'e, Result<SqliteStatement<'q>, Error>>
71 where
72 'c: 'e,
73 {
74 Box::pin(async move {
75 let statement = self.worker.prepare(sql).await?;
76
77 Ok(SqliteStatement {
78 sql: sql.into(),
79 ..statement
80 })
81 })
82 }
83
84 #[doc(hidden)]
85 fn describe<'e, 'q: 'e>(self, sql: &'q str) -> BoxFuture<'e, Result<Describe<Sqlite>, Error>>
86 where
87 'c: 'e,
88 {
89 Box::pin(self.worker.describe(sql))
90 }
91}