rbdc_sqlite/options/
connect.rs

1use crate::query::SqliteQuery;
2use crate::type_info::Type;
3use crate::{SqliteConnectOptions, SqliteConnection, SqliteQueryResult, SqliteRow};
4use either::Either;
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_util::FutureExt;
8use futures_util::{StreamExt, TryStreamExt};
9use rbdc::db::{Connection, ExecResult, Row};
10use rbdc::error::Error;
11use rbs::Value;
12use std::fmt::Write;
13
14impl SqliteConnectOptions {
15    pub fn connect(&self) -> BoxFuture<'_, Result<SqliteConnection, Error>> {
16        Box::pin(async move {
17            let mut conn = SqliteConnection::establish(self).await?;
18
19            // send an initial sql statement comprised of options
20            let mut init = String::new();
21
22            // This is a special case for sqlcipher. When the `key` pragma
23            // is set, we have to make sure it's executed first in order.
24            if let Some(pragma_key_password) = self.pragmas.get("key") {
25                write!(init, "PRAGMA key = {}; ", pragma_key_password).ok();
26            }
27
28            for (key, value) in &self.pragmas {
29                // Since we've already written the possible `key` pragma
30                // above, we shall skip it now.
31                if key == "key" {
32                    continue;
33                }
34                write!(init, "PRAGMA {} = {}; ", key, value).ok();
35            }
36
37            conn.exec(&*init, vec![]).await?;
38
39            if !self.collations.is_empty() {
40                let mut locked = conn.lock_handle().await?;
41
42                for collation in &self.collations {
43                    collation.create(&mut locked.guard.handle)?;
44                }
45            }
46
47            Ok(conn)
48        })
49    }
50}
51
52impl Connection for SqliteConnection {
53    fn get_rows(
54        &mut self,
55        sql: &str,
56        params: Vec<Value>,
57    ) -> BoxFuture<'_, Result<Vec<Box<dyn Row>>, Error>> {
58        let sql = sql.to_owned();
59        Box::pin(async move {
60            let many = {
61                if params.len() == 0 {
62                    self.fetch_many(SqliteQuery {
63                        statement: Either::Left(sql),
64                        arguments: params,
65                        persistent: false,
66                    })
67                } else {
68                    let stmt = self.prepare_with(&sql, &[]).await?;
69                    self.fetch_many(SqliteQuery {
70                        statement: Either::Right(stmt),
71                        arguments: params,
72                        persistent: true,
73                    })
74                }
75            };
76            let f: BoxStream<Result<SqliteRow, Error>> = many
77                .try_filter_map(|step| async move {
78                    Ok(match step {
79                        Either::Left(_) => None,
80                        Either::Right(row) => Some(row),
81                    })
82                })
83                .boxed();
84            let c: BoxFuture<'_, Result<Vec<SqliteRow>, Error>> = f.try_collect().boxed();
85            let v = c.await?;
86            let mut data: Vec<Box<dyn Row>> = Vec::with_capacity(v.len());
87            for x in v {
88                data.push(Box::new(x));
89            }
90            Ok(data)
91        })
92    }
93
94    fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
95        let sql = sql.to_owned();
96        Box::pin(async move {
97            let many = {
98                if params.len() == 0 {
99                    self.fetch_many(SqliteQuery {
100                        statement: Either::Left(sql),
101                        arguments: params,
102                        persistent: false,
103                    })
104                } else {
105                    let mut type_info = Vec::with_capacity(params.len());
106                    for x in &params {
107                        type_info.push(x.type_info());
108                    }
109                    let stmt = self.prepare_with(&sql, &type_info).await?;
110                    self.fetch_many(SqliteQuery {
111                        statement: Either::Right(stmt),
112                        arguments: params,
113                        persistent: true,
114                    })
115                }
116            };
117            let v: BoxStream<Result<SqliteQueryResult, Error>> = many
118                .try_filter_map(|step| async move {
119                    Ok(match step {
120                        Either::Left(rows) => Some(rows),
121                        Either::Right(_) => None,
122                    })
123                })
124                .boxed();
125            let v: SqliteQueryResult = v.try_collect().boxed().await?;
126            return Ok(ExecResult {
127                rows_affected: v.rows_affected(),
128                last_insert_id: Value::U64(v.last_insert_rowid as u64),
129            });
130        })
131    }
132
133    fn close(&mut self) -> BoxFuture<'_, Result<(), Error>> {
134        Box::pin(async { self.do_close().await })
135    }
136
137    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
138        Box::pin(async move {
139            self.worker
140                .oneshot_cmd(|tx| crate::connection::Command::Ping { tx })
141                .await?;
142            Ok(())
143        })
144    }
145}