rbdc_sqlite/options/
connect.rs1use crate::query::SqliteQuery;
2use crate::type_info::Type;
3use crate::{SqliteArguments, SqliteConnectOptions, SqliteConnection, SqliteQueryResult};
4use either::Either;
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_util::FutureExt;
8use futures_util::{StreamExt, TryStreamExt};
9use rbdc::db::{Connection, ExecResult, Row};
10use rbdc::error::Error;
11use rbdc::try_stream;
12use rbs::Value;
13use std::fmt::Write;
14
15impl SqliteConnectOptions {
16 pub fn connect(&self) -> BoxFuture<'_, Result<SqliteConnection, Error>> {
17 Box::pin(async move {
18 let mut conn = SqliteConnection::establish(self).await?;
19
20 let mut init = String::new();
22
23 if let Some(pragma_key_password) = self.pragmas.get("key") {
26 write!(init, "PRAGMA key = {}; ", pragma_key_password).ok();
27 }
28
29 for (key, value) in &self.pragmas {
30 if key == "key" {
33 continue;
34 }
35 write!(init, "PRAGMA {} = {}; ", key, value).ok();
36 }
37
38 conn.exec(&*init, vec![]).await?;
39
40 if !self.collations.is_empty() {
41 let mut locked = conn.lock_handle().await?;
42
43 for collation in &self.collations {
44 collation.create(&mut locked.guard.handle)?;
45 }
46 }
47
48 Ok(conn)
49 })
50 }
51}
52
53impl Connection for SqliteConnection {
54 fn exec_rows(
55 &mut self,
56 sql: &str,
57 params: Vec<Value>,
58 ) -> BoxFuture<'_, Result<BoxStream<Result<Box<dyn Row>, Error>>, Error>> {
59 let sql = sql.to_owned();
60 let row_channel_size = self.row_channel_size;
61 let has_args = !params.is_empty();
62
63 Box::pin(async move {
64 let rx = if has_args {
65 let arguments = SqliteArguments::from_args(params)?;
66 self.worker
67 .execute(sql, Some(arguments.into_static()), row_channel_size, true)
68 .await
69 .map_err(|_| Error::from("WorkerCrashed"))?
70 } else {
71 self.worker
72 .execute(sql, None, row_channel_size, false)
73 .await
74 .map_err(|_| Error::from("WorkerCrashed"))?
75 };
76
77 let stream = try_stream! {
78 use futures_util::StreamExt;
79 let mut s = rx.into_stream();
80 while let Some(item) = s.next().await {
81 match item? {
82 Either::Left(_) => {}
83 Either::Right(row) => {
84 r#yield!(Box::new(row) as Box<dyn Row>);
85 }
86 }
87 }
88 Ok(())
89 }
90 .boxed();
91
92 Ok(stream as BoxStream<Result<Box<dyn Row>, Error>>)
93 })
94 }
95
96 fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
97 let sql = sql.to_owned();
98 Box::pin(async move {
99 let many = {
100 if params.len() == 0 {
101 self.fetch_many(SqliteQuery {
102 statement: Either::Left(sql),
103 arguments: params,
104 persistent: false,
105 })
106 } else {
107 let mut type_info = Vec::with_capacity(params.len());
108 for x in ¶ms {
109 type_info.push(x.type_info());
110 }
111 let stmt = self.prepare_with(&sql, &type_info).await?;
112 self.fetch_many(SqliteQuery {
113 statement: Either::Right(stmt),
114 arguments: params,
115 persistent: true,
116 })
117 }
118 };
119 let v: BoxStream<Result<SqliteQueryResult, Error>> = many
120 .try_filter_map(|step| async move {
121 Ok(match step {
122 Either::Left(rows) => Some(rows),
123 Either::Right(_) => None,
124 })
125 })
126 .boxed();
127 let v: SqliteQueryResult = v.try_collect().boxed().await?;
128 return Ok(ExecResult {
129 rows_affected: v.rows_affected(),
130 last_insert_id: Value::U64(v.last_insert_rowid as u64),
131 });
132 })
133 }
134
135 fn close(&mut self) -> BoxFuture<'_, Result<(), Error>> {
136 Box::pin(async { self.do_close().await })
137 }
138
139 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
140 Box::pin(async move {
141 self.worker
142 .oneshot_cmd(|tx| crate::connection::Command::Ping { tx })
143 .await?;
144 Ok(())
145 })
146 }
147}