Skip to main content

rbdc_sqlite/options/
connect.rs

1use crate::query::SqliteQuery;
2use crate::type_info::Type;
3use crate::{SqliteArguments, SqliteConnectOptions, SqliteConnection, SqliteQueryResult};
4use either::Either;
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_util::FutureExt;
8use futures_util::{StreamExt, TryStreamExt};
9use rbdc::db::{Connection, ExecResult, Row};
10use rbdc::error::Error;
11use rbdc::try_stream;
12use rbs::Value;
13use std::fmt::Write;
14
15impl SqliteConnectOptions {
16    pub fn connect(&self) -> BoxFuture<'_, Result<SqliteConnection, Error>> {
17        Box::pin(async move {
18            let mut conn = SqliteConnection::establish(self).await?;
19
20            // send an initial sql statement comprised of options
21            let mut init = String::new();
22
23            // This is a special case for sqlcipher. When the `key` pragma
24            // is set, we have to make sure it's executed first in order.
25            if let Some(pragma_key_password) = self.pragmas.get("key") {
26                write!(init, "PRAGMA key = {}; ", pragma_key_password).ok();
27            }
28
29            for (key, value) in &self.pragmas {
30                // Since we've already written the possible `key` pragma
31                // above, we shall skip it now.
32                if key == "key" {
33                    continue;
34                }
35                write!(init, "PRAGMA {} = {}; ", key, value).ok();
36            }
37
38            conn.exec(&*init, vec![]).await?;
39
40            if !self.collations.is_empty() {
41                let mut locked = conn.lock_handle().await?;
42
43                for collation in &self.collations {
44                    collation.create(&mut locked.guard.handle)?;
45                }
46            }
47
48            Ok(conn)
49        })
50    }
51}
52
53impl Connection for SqliteConnection {
54    fn exec_rows(
55        &mut self,
56        sql: &str,
57        params: Vec<Value>,
58    ) -> BoxFuture<'_, Result<BoxStream<Result<Box<dyn Row>, Error>>, Error>> {
59        let sql = sql.to_owned();
60        let row_channel_size = self.row_channel_size;
61        let has_args = !params.is_empty();
62
63        Box::pin(async move {
64            let rx = if has_args {
65                let arguments = SqliteArguments::from_args(params)?;
66                self.worker
67                    .execute(sql, Some(arguments.into_static()), row_channel_size, true)
68                    .await
69                    .map_err(|_| Error::from("WorkerCrashed"))?
70            } else {
71                self.worker
72                    .execute(sql, None, row_channel_size, false)
73                    .await
74                    .map_err(|_| Error::from("WorkerCrashed"))?
75            };
76
77            let stream = try_stream! {
78                use futures_util::StreamExt;
79                let mut s = rx.into_stream();
80                while let Some(item) = s.next().await {
81                    match item? {
82                        Either::Left(_) => {}
83                        Either::Right(row) => {
84                            r#yield!(Box::new(row) as Box<dyn Row>);
85                        }
86                    }
87                }
88                Ok(())
89            }
90            .boxed();
91
92            Ok(stream as BoxStream<Result<Box<dyn Row>, Error>>)
93        })
94    }
95
96    fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
97        let sql = sql.to_owned();
98        Box::pin(async move {
99            let many = {
100                if params.len() == 0 {
101                    self.fetch_many(SqliteQuery {
102                        statement: Either::Left(sql),
103                        arguments: params,
104                        persistent: false,
105                    })
106                } else {
107                    let mut type_info = Vec::with_capacity(params.len());
108                    for x in &params {
109                        type_info.push(x.type_info());
110                    }
111                    let stmt = self.prepare_with(&sql, &type_info).await?;
112                    self.fetch_many(SqliteQuery {
113                        statement: Either::Right(stmt),
114                        arguments: params,
115                        persistent: true,
116                    })
117                }
118            };
119            let v: BoxStream<Result<SqliteQueryResult, Error>> = many
120                .try_filter_map(|step| async move {
121                    Ok(match step {
122                        Either::Left(rows) => Some(rows),
123                        Either::Right(_) => None,
124                    })
125                })
126                .boxed();
127            let v: SqliteQueryResult = v.try_collect().boxed().await?;
128            return Ok(ExecResult {
129                rows_affected: v.rows_affected(),
130                last_insert_id: Value::U64(v.last_insert_rowid as u64),
131            });
132        })
133    }
134
135    fn close(&mut self) -> BoxFuture<'_, Result<(), Error>> {
136        Box::pin(async { self.do_close().await })
137    }
138
139    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
140        Box::pin(async move {
141            self.worker
142                .oneshot_cmd(|tx| crate::connection::Command::Ping { tx })
143                .await?;
144            Ok(())
145        })
146    }
147}