rbdc_mysql/connection/
mod.rs

1use crate::protocol::statement::StmtClose;
2use crate::protocol::text::{Ping, Quit};
3use crate::stmt::MySqlStatementMetadata;
4use either::Either;
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_util::{FutureExt, StreamExt, TryStreamExt};
8use rbdc::common::StatementCache;
9use rbdc::db::{Connection, ExecResult, Row};
10use rbdc::Error;
11use rbs::Value;
12use std::fmt::{self, Debug, Formatter};
13use std::ops::{Deref, DerefMut};
14use std::sync::Arc;
15
16
17mod auth;
18mod establish;
19mod executor;
20mod stream;
21mod tls;
22
23use crate::query::MysqlQuery;
24use crate::query_result::MySqlQueryResult;
25use crate::row::MySqlRow;
26pub(crate) use stream::MySqlStream;
27use crate::options::MySqlConnectOptions;
28
29const MAX_PACKET_SIZE: u32 = 1024;
30
31/// A connection to a MySQL database.
32pub struct MySqlConnection {
33    // underlying TCP stream,
34    // wrapped in a potentially TLS stream,
35    // wrapped in a buffered stream
36    pub stream: DropBox<MySqlStream>,
37    // cache by query string to the statement id and metadata
38    pub cache_statement: StatementCache<(u32, MySqlStatementMetadata)>,
39    // mysql options
40    pub option: Arc<MySqlConnectOptions>,
41}
42
43impl Debug for MySqlConnection {
44    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
45        f.debug_struct("MySqlConnection").finish()
46    }
47}
48
49pub struct DropBox<T> {
50    pub inner: Option<T>,
51}
52
53impl<T> Deref for DropBox<T> {
54    type Target = T;
55
56    fn deref(&self) -> &Self::Target {
57        self.inner.as_ref().expect("conn closed")
58    }
59}
60
61impl<T> DerefMut for DropBox<T> {
62    fn deref_mut(&mut self) -> &mut Self::Target {
63        self.inner.as_mut().expect("conn closed")
64    }
65}
66
67impl MySqlConnection {
68    #[inline]
69    async fn do_close(&mut self) -> Result<(), Error> {
70        self.stream.send_packet(Quit).await?;
71        self.stream.shutdown().await?;
72        Ok(())
73    }
74
75    fn do_ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
76        Box::pin(async move {
77            self.stream.wait_until_ready().await?;
78            self.stream.send_packet(Ping).await?;
79            self.stream.recv_ok().await?;
80
81            Ok(())
82        })
83    }
84
85    #[doc(hidden)]
86    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
87        self.stream.wait_until_ready().boxed()
88    }
89
90    fn cached_statements_size(&self) -> usize {
91        self.cache_statement.len()
92    }
93
94    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
95        Box::pin(async move {
96            while let Some((statement_id, _)) = self.cache_statement.remove_lru() {
97                self.stream
98                    .send_packet(StmtClose {
99                        statement: statement_id,
100                    })
101                    .await?;
102            }
103
104            Ok(())
105        })
106    }
107
108    #[doc(hidden)]
109    fn should_flush(&self) -> bool {
110        !self.stream.wbuf.is_empty()
111    }
112}
113
114impl Connection for MySqlConnection {
115    fn get_rows(
116        &mut self,
117        sql: &str,
118        params: Vec<Value>,
119    ) -> BoxFuture<Result<Vec<Box<dyn Row>>, Error>> {
120        let sql = sql.to_owned();
121        Box::pin(async move {
122            let many = {
123                if params.len() == 0 {
124                    self.fetch_many(MysqlQuery {
125                        statement: Either::Left(sql),
126                        arguments: params,
127                        persistent: false,
128                    })
129                } else {
130                    let stmt = self.prepare_with(&sql, &[]).await?;
131                    self.fetch_many(MysqlQuery {
132                        statement: Either::Right(stmt),
133                        arguments: params,
134                        persistent: true,
135                    })
136                }
137            };
138            let f: BoxStream<Result<MySqlRow, Error>> = many
139                .try_filter_map(|step| async move {
140                    Ok(match step {
141                        Either::Left(_) => None,
142                        Either::Right(row) => Some(row),
143                    })
144                })
145                .boxed();
146            let c: BoxFuture<Result<Vec<MySqlRow>, Error>> = f.try_collect().boxed();
147            let v = c.await?;
148            let mut data: Vec<Box<dyn Row>> = Vec::with_capacity(v.len());
149            for x in v {
150                data.push(Box::new(x));
151            }
152            Ok(data)
153        })
154    }
155
156    fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<Result<ExecResult, Error>> {
157        let sql = sql.to_owned();
158        Box::pin(async move {
159            let many = {
160                if params.len() == 0 {
161                    self.fetch_many(MysqlQuery {
162                        statement: Either::Left(sql),
163                        arguments: params,
164                        persistent: false,
165                    })
166                } else {
167                    let stmt = self.prepare_with(&sql, &[]).await?;
168                    self.fetch_many(MysqlQuery {
169                        statement: Either::Right(stmt),
170                        arguments: params,
171                        persistent: true,
172                    })
173                }
174            };
175            let v: BoxStream<Result<MySqlQueryResult, Error>> = many
176                .try_filter_map(|step| async move {
177                    Ok(match step {
178                        Either::Left(rows) => Some(rows),
179                        Either::Right(_) => None,
180                    })
181                })
182                .boxed();
183            let v: MySqlQueryResult = v.try_collect().boxed().await?;
184            return Ok(ExecResult {
185                rows_affected: v.rows_affected,
186                last_insert_id: v.last_insert_id.into(),
187            });
188        })
189    }
190
191    fn close(&mut self) -> BoxFuture<Result<(), Error>> {
192        let c = self.do_close();
193        Box::pin(async { c.await })
194    }
195
196    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
197        let c = self.do_ping();
198        Box::pin(async move { c.await })
199    }
200}