rbdc_sqlite/connection/
executor.rs1use crate::query::SqliteQuery;
2use crate::{SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo};
3use either::Either;
4use futures_core::future::BoxFuture;
5use futures_core::stream::BoxStream;
6use futures_util::pin_mut;
7use futures_util::{TryFutureExt, TryStreamExt};
8use rbdc::error::Error;
9use rbdc::try_stream;
10
11impl SqliteConnection {
12 pub fn fetch_many(
13 &mut self,
14 query: SqliteQuery,
15 ) -> BoxStream<'_, Result<Either<SqliteQueryResult, SqliteRow>, Error>> {
16 let sql = query.sql().to_string();
17 let persistent = query.persistent() && !query.arguments.is_empty();
18 Box::pin(try_stream! {
19 let arguments = query.take_arguments()?;
20 let s=self.worker
21 .execute(sql, arguments, self.row_channel_size, persistent)
22 .map_ok(flume::Receiver::into_stream)
23 .try_flatten_stream();
24 pin_mut!(s);
25 while let Some(v) = s.try_next().await? {
26 r#yield!(v);
27 }
28 Ok(())
29 })
30 }
31
32 pub fn fetch_optional(
33 &mut self,
34 query: SqliteQuery,
35 ) -> BoxFuture<'_, Result<Option<SqliteRow>, Error>> {
36 let sql = query.sql().to_owned();
37 let persistent = query.persistent() && !query.arguments.is_empty();
38 Box::pin(async move {
39 let arguments = query.take_arguments()?;
40 let stream = self
41 .worker
42 .execute(sql, arguments, self.row_channel_size, persistent)
43 .map_ok(flume::Receiver::into_stream)
44 .try_flatten_stream();
45 pin_mut!(stream);
46 while let Some(res) = stream.try_next().await? {
47 if let Either::Right(row) = res {
48 return Ok(Some(row));
49 }
50 }
51 Ok(None)
52 })
53 }
54
55 pub fn prepare_with<'a>(
56 &'a mut self,
57 sql: &'a str,
58 _parameters: &[SqliteTypeInfo],
59 ) -> BoxFuture<'a, Result<SqliteStatement, Error>> {
60 Box::pin(async move {
61 let statement = self.worker.prepare(sql).await?;
62
63 Ok(SqliteStatement {
64 sql: sql.into(),
65 ..statement
66 })
67 })
68 }
69}