rbdc_sqlite/connection/
executor.rs

1use crate::query::SqliteQuery;
2use crate::{SqliteConnection, SqliteQueryResult, SqliteRow, SqliteStatement, SqliteTypeInfo};
3use either::Either;
4use futures_core::future::BoxFuture;
5use futures_core::stream::BoxStream;
6use futures_util::pin_mut;
7use futures_util::{TryFutureExt, TryStreamExt};
8use rbdc::error::Error;
9use rbdc::try_stream;
10
11impl SqliteConnection {
12    pub fn fetch_many(
13        &mut self,
14        query: SqliteQuery,
15    ) -> BoxStream<'_, Result<Either<SqliteQueryResult, SqliteRow>, Error>> {
16        let sql = query.sql().to_string();
17        let persistent = query.persistent() && !query.arguments.is_empty();
18        Box::pin(try_stream! {
19            let arguments = query.take_arguments()?;
20            let s=self.worker
21                .execute(sql, arguments, self.row_channel_size, persistent)
22                .map_ok(flume::Receiver::into_stream)
23                .try_flatten_stream();
24            pin_mut!(s);
25            while let Some(v) = s.try_next().await? {
26                r#yield!(v);
27            }
28            Ok(())
29        })
30    }
31
32    pub fn fetch_optional(
33        &mut self,
34        query: SqliteQuery,
35    ) -> BoxFuture<'_, Result<Option<SqliteRow>, Error>> {
36        let sql = query.sql().to_owned();
37        let persistent = query.persistent() && !query.arguments.is_empty();
38        Box::pin(async move {
39            let arguments = query.take_arguments()?;
40            let stream = self
41                .worker
42                .execute(sql, arguments, self.row_channel_size, persistent)
43                .map_ok(flume::Receiver::into_stream)
44                .try_flatten_stream();
45            pin_mut!(stream);
46            while let Some(res) = stream.try_next().await? {
47                if let Either::Right(row) = res {
48                    return Ok(Some(row));
49                }
50            }
51            Ok(None)
52        })
53    }
54
55    pub fn prepare_with<'a>(
56        &'a mut self,
57        sql: &'a str,
58        _parameters: &[SqliteTypeInfo],
59    ) -> BoxFuture<'a, Result<SqliteStatement, Error>> {
60        Box::pin(async move {
61            let statement = self.worker.prepare(sql).await?;
62
63            Ok(SqliteStatement {
64                sql: sql.into(),
65                ..statement
66            })
67        })
68    }
69}