rbdc_mysql/connection/
mod.rs

1use crate::protocol::statement::StmtClose;
2use crate::protocol::text::{Ping, Quit};
3use crate::stmt::MySqlStatementMetadata;
4use either::Either;
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_util::{FutureExt, StreamExt, TryStreamExt};
8use rbdc::common::StatementCache;
9use rbdc::db::{Connection, ExecResult, Row};
10use rbdc::Error;
11use rbs::Value;
12use std::fmt::{self, Debug, Formatter};
13use std::ops::{Deref, DerefMut};
14use std::sync::Arc;
15
16mod auth;
17mod establish;
18mod executor;
19mod stream;
20mod tls;
21
22use crate::options::MySqlConnectOptions;
23use crate::query::MysqlQuery;
24use crate::query_result::MySqlQueryResult;
25use crate::row::MySqlRow;
26pub(crate) use stream::MySqlStream;
27
28const MAX_PACKET_SIZE: u32 = 1024;
29
30/// A connection to a MySQL database.
31pub struct MySqlConnection {
32    // underlying TCP stream,
33    // wrapped in a potentially TLS stream,
34    // wrapped in a buffered stream
35    pub stream: DropBox<MySqlStream>,
36    // cache by query string to the statement id and metadata
37    pub cache_statement: StatementCache<(u32, MySqlStatementMetadata)>,
38    // mysql options
39    pub option: Arc<MySqlConnectOptions>,
40}
41
42impl Debug for MySqlConnection {
43    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
44        f.debug_struct("MySqlConnection").finish()
45    }
46}
47
48pub struct DropBox<T> {
49    pub inner: Option<T>,
50}
51
52impl<T> Deref for DropBox<T> {
53    type Target = T;
54
55    fn deref(&self) -> &Self::Target {
56        self.inner.as_ref().expect("conn closed")
57    }
58}
59
60impl<T> DerefMut for DropBox<T> {
61    fn deref_mut(&mut self) -> &mut Self::Target {
62        self.inner.as_mut().expect("conn closed")
63    }
64}
65
66impl MySqlConnection {
67    #[inline]
68    async fn do_close(&mut self) -> Result<(), Error> {
69        self.stream.send_packet(Quit).await?;
70        self.stream.shutdown().await?;
71        Ok(())
72    }
73
74    fn do_ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
75        Box::pin(async move {
76            self.stream.wait_until_ready().await?;
77            self.stream.send_packet(Ping).await?;
78            self.stream.recv_ok().await?;
79
80            Ok(())
81        })
82    }
83
84    #[doc(hidden)]
85    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
86        self.stream.wait_until_ready().boxed()
87    }
88
89    fn cached_statements_size(&self) -> usize {
90        self.cache_statement.len()
91    }
92
93    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
94        Box::pin(async move {
95            while let Some((statement_id, _)) = self.cache_statement.remove_lru() {
96                self.stream
97                    .send_packet(StmtClose {
98                        statement: statement_id,
99                    })
100                    .await?;
101            }
102
103            Ok(())
104        })
105    }
106
107    #[doc(hidden)]
108    fn should_flush(&self) -> bool {
109        !self.stream.wbuf.is_empty()
110    }
111}
112
113impl Connection for MySqlConnection {
114    fn get_rows(
115        &mut self,
116        sql: &str,
117        params: Vec<Value>,
118    ) -> BoxFuture<'_, Result<Vec<Box<dyn Row>>, Error>> {
119        let sql = sql.to_owned();
120        Box::pin(async move {
121            let many = {
122                if params.len() == 0 {
123                    self.fetch_many(MysqlQuery {
124                        statement: Either::Left(sql),
125                        arguments: params,
126                        persistent: false,
127                    })
128                } else {
129                    let stmt = self.prepare_with(&sql, &[]).await?;
130                    self.fetch_many(MysqlQuery {
131                        statement: Either::Right(stmt),
132                        arguments: params,
133                        persistent: true,
134                    })
135                }
136            };
137            let f: BoxStream<Result<MySqlRow, Error>> = many
138                .try_filter_map(|step| async move {
139                    Ok(match step {
140                        Either::Left(_) => None,
141                        Either::Right(row) => Some(row),
142                    })
143                })
144                .boxed();
145            let c: BoxFuture<'_, Result<Vec<MySqlRow>, Error>> = f.try_collect().boxed();
146            let v = c.await?;
147            let mut data: Vec<Box<dyn Row>> = Vec::with_capacity(v.len());
148            for x in v {
149                data.push(Box::new(x));
150            }
151            Ok(data)
152        })
153    }
154
155    fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
156        let sql = sql.to_owned();
157        Box::pin(async move {
158            let many = {
159                if params.len() == 0 {
160                    self.fetch_many(MysqlQuery {
161                        statement: Either::Left(sql),
162                        arguments: params,
163                        persistent: false,
164                    })
165                } else {
166                    let stmt = self.prepare_with(&sql, &[]).await?;
167                    self.fetch_many(MysqlQuery {
168                        statement: Either::Right(stmt),
169                        arguments: params,
170                        persistent: true,
171                    })
172                }
173            };
174            let v: BoxStream<Result<MySqlQueryResult, Error>> = many
175                .try_filter_map(|step| async move {
176                    Ok(match step {
177                        Either::Left(rows) => Some(rows),
178                        Either::Right(_) => None,
179                    })
180                })
181                .boxed();
182            let v: MySqlQueryResult = v.try_collect().boxed().await?;
183            return Ok(ExecResult {
184                rows_affected: v.rows_affected,
185                last_insert_id: v.last_insert_id.into(),
186            });
187        })
188    }
189
190    fn close(&mut self) -> BoxFuture<'_, Result<(), Error>> {
191        let c = self.do_close();
192        Box::pin(async { c.await })
193    }
194
195    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
196        let c = self.do_ping();
197        Box::pin(async move { c.await })
198    }
199}