rbdc_sqlite/options/
connect.rs1use crate::query::SqliteQuery;
2use crate::type_info::Type;
3use crate::{SqliteConnectOptions, SqliteConnection, SqliteQueryResult, SqliteRow};
4use either::Either;
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_util::FutureExt;
8use futures_util::{StreamExt, TryStreamExt};
9use rbdc::db::{Connection, ExecResult, Row};
10use rbdc::error::Error;
11use rbs::Value;
12use std::fmt::Write;
13
14impl SqliteConnectOptions {
15 pub fn connect(&self) -> BoxFuture<'_, Result<SqliteConnection, Error>> {
16 Box::pin(async move {
17 let mut conn = SqliteConnection::establish(self).await?;
18
19 let mut init = String::new();
21
22 if let Some(pragma_key_password) = self.pragmas.get("key") {
25 write!(init, "PRAGMA key = {}; ", pragma_key_password).ok();
26 }
27
28 for (key, value) in &self.pragmas {
29 if key == "key" {
32 continue;
33 }
34 write!(init, "PRAGMA {} = {}; ", key, value).ok();
35 }
36
37 conn.exec(&*init, vec![]).await?;
38
39 if !self.collations.is_empty() {
40 let mut locked = conn.lock_handle().await?;
41
42 for collation in &self.collations {
43 collation.create(&mut locked.guard.handle)?;
44 }
45 }
46
47 Ok(conn)
48 })
49 }
50}
51
52impl Connection for SqliteConnection {
53 fn get_rows(
54 &mut self,
55 sql: &str,
56 params: Vec<Value>,
57 ) -> BoxFuture<'_, Result<Vec<Box<dyn Row>>, Error>> {
58 let sql = sql.to_owned();
59 Box::pin(async move {
60 let many = {
61 if params.len() == 0 {
62 self.fetch_many(SqliteQuery {
63 statement: Either::Left(sql),
64 arguments: params,
65 persistent: false,
66 })
67 } else {
68 let stmt = self.prepare_with(&sql, &[]).await?;
69 self.fetch_many(SqliteQuery {
70 statement: Either::Right(stmt),
71 arguments: params,
72 persistent: true,
73 })
74 }
75 };
76 let f: BoxStream<Result<SqliteRow, Error>> = many
77 .try_filter_map(|step| async move {
78 Ok(match step {
79 Either::Left(_) => None,
80 Either::Right(row) => Some(row),
81 })
82 })
83 .boxed();
84 let c: BoxFuture<'_, Result<Vec<SqliteRow>, Error>> = f.try_collect().boxed();
85 let v = c.await?;
86 let mut data: Vec<Box<dyn Row>> = Vec::with_capacity(v.len());
87 for x in v {
88 data.push(Box::new(x));
89 }
90 Ok(data)
91 })
92 }
93
94 fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
95 let sql = sql.to_owned();
96 Box::pin(async move {
97 let many = {
98 if params.len() == 0 {
99 self.fetch_many(SqliteQuery {
100 statement: Either::Left(sql),
101 arguments: params,
102 persistent: false,
103 })
104 } else {
105 let mut type_info = Vec::with_capacity(params.len());
106 for x in ¶ms {
107 type_info.push(x.type_info());
108 }
109 let stmt = self.prepare_with(&sql, &type_info).await?;
110 self.fetch_many(SqliteQuery {
111 statement: Either::Right(stmt),
112 arguments: params,
113 persistent: true,
114 })
115 }
116 };
117 let v: BoxStream<Result<SqliteQueryResult, Error>> = many
118 .try_filter_map(|step| async move {
119 Ok(match step {
120 Either::Left(rows) => Some(rows),
121 Either::Right(_) => None,
122 })
123 })
124 .boxed();
125 let v: SqliteQueryResult = v.try_collect().boxed().await?;
126 return Ok(ExecResult {
127 rows_affected: v.rows_affected(),
128 last_insert_id: Value::U64(v.last_insert_rowid as u64),
129 });
130 })
131 }
132
133 fn close(&mut self) -> BoxFuture<'_, Result<(), Error>> {
134 Box::pin(async { self.do_close().await })
135 }
136
137 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
138 Box::pin(async move {
139 self.worker
140 .oneshot_cmd(|tx| crate::connection::Command::Ping { tx })
141 .await?;
142 Ok(())
143 })
144 }
145}