rbdc_mysql/connection/
mod.rs1use crate::protocol::statement::StmtClose;
2use crate::protocol::text::{Ping, Quit};
3use crate::stmt::MySqlStatementMetadata;
4use either::Either;
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_util::{FutureExt, StreamExt, TryStreamExt};
8use rbdc::common::StatementCache;
9use rbdc::db::{Connection, ExecResult, Row};
10use rbdc::Error;
11use rbs::Value;
12use std::fmt::{self, Debug, Formatter};
13use std::ops::{Deref, DerefMut};
14use std::sync::Arc;
15
16mod auth;
17mod establish;
18mod executor;
19mod stream;
20mod tls;
21
22use crate::options::MySqlConnectOptions;
23use crate::query::MysqlQuery;
24use crate::query_result::MySqlQueryResult;
25use crate::row::MySqlRow;
26pub(crate) use stream::MySqlStream;
27
/// Packet size limit used by the MySQL connection code (1024).
///
/// NOTE(review): this constant is not referenced in this file; presumably it is
/// consumed by the `establish`/`stream` submodules — confirm before changing it.
const MAX_PACKET_SIZE: u32 = 1 << 10;
29
30pub struct MySqlConnection {
32 pub stream: DropBox<MySqlStream>,
36 pub cache_statement: StatementCache<(u32, MySqlStatementMetadata)>,
38 pub option: Arc<MySqlConnectOptions>,
40}
41
42impl Debug for MySqlConnection {
43 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
44 f.debug_struct("MySqlConnection").finish()
45 }
46}
47
/// Wrapper around an optional value that dereferences straight to the value.
///
/// # Panics
///
/// Dereferencing (shared or mutable) panics with `"conn closed"` when `inner`
/// is `None`.
pub struct DropBox<T> {
    /// The wrapped value; `None` makes every dereference panic.
    pub inner: Option<T>,
}

impl<T> Deref for DropBox<T> {
    type Target = T;

    /// Borrows the wrapped value, panicking with `"conn closed"` if it is absent.
    fn deref(&self) -> &Self::Target {
        let slot: Option<&T> = self.inner.as_ref();
        slot.expect("conn closed")
    }
}

impl<T> DerefMut for DropBox<T> {
    /// Mutably borrows the wrapped value, panicking with `"conn closed"` if it
    /// is absent.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let slot: Option<&mut T> = self.inner.as_mut();
        slot.expect("conn closed")
    }
}
65
66impl MySqlConnection {
67 #[inline]
68 async fn do_close(&mut self) -> Result<(), Error> {
69 self.stream.send_packet(Quit).await?;
70 self.stream.shutdown().await?;
71 Ok(())
72 }
73
74 fn do_ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
75 Box::pin(async move {
76 self.stream.wait_until_ready().await?;
77 self.stream.send_packet(Ping).await?;
78 self.stream.recv_ok().await?;
79
80 Ok(())
81 })
82 }
83
84 #[doc(hidden)]
85 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
86 self.stream.wait_until_ready().boxed()
87 }
88
89 fn cached_statements_size(&self) -> usize {
90 self.cache_statement.len()
91 }
92
93 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
94 Box::pin(async move {
95 while let Some((statement_id, _)) = self.cache_statement.remove_lru() {
96 self.stream
97 .send_packet(StmtClose {
98 statement: statement_id,
99 })
100 .await?;
101 }
102
103 Ok(())
104 })
105 }
106
107 #[doc(hidden)]
108 fn should_flush(&self) -> bool {
109 !self.stream.wbuf.is_empty()
110 }
111}
112
113impl Connection for MySqlConnection {
114 fn get_rows(
115 &mut self,
116 sql: &str,
117 params: Vec<Value>,
118 ) -> BoxFuture<'_, Result<Vec<Box<dyn Row>>, Error>> {
119 let sql = sql.to_owned();
120 Box::pin(async move {
121 let many = {
122 if params.len() == 0 {
123 self.fetch_many(MysqlQuery {
124 statement: Either::Left(sql),
125 arguments: params,
126 persistent: false,
127 })
128 } else {
129 let stmt = self.prepare_with(&sql, &[]).await?;
130 self.fetch_many(MysqlQuery {
131 statement: Either::Right(stmt),
132 arguments: params,
133 persistent: true,
134 })
135 }
136 };
137 let f: BoxStream<Result<MySqlRow, Error>> = many
138 .try_filter_map(|step| async move {
139 Ok(match step {
140 Either::Left(_) => None,
141 Either::Right(row) => Some(row),
142 })
143 })
144 .boxed();
145 let c: BoxFuture<'_, Result<Vec<MySqlRow>, Error>> = f.try_collect().boxed();
146 let v = c.await?;
147 let mut data: Vec<Box<dyn Row>> = Vec::with_capacity(v.len());
148 for x in v {
149 data.push(Box::new(x));
150 }
151 Ok(data)
152 })
153 }
154
155 fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<'_, Result<ExecResult, Error>> {
156 let sql = sql.to_owned();
157 Box::pin(async move {
158 let many = {
159 if params.len() == 0 {
160 self.fetch_many(MysqlQuery {
161 statement: Either::Left(sql),
162 arguments: params,
163 persistent: false,
164 })
165 } else {
166 let stmt = self.prepare_with(&sql, &[]).await?;
167 self.fetch_many(MysqlQuery {
168 statement: Either::Right(stmt),
169 arguments: params,
170 persistent: true,
171 })
172 }
173 };
174 let v: BoxStream<Result<MySqlQueryResult, Error>> = many
175 .try_filter_map(|step| async move {
176 Ok(match step {
177 Either::Left(rows) => Some(rows),
178 Either::Right(_) => None,
179 })
180 })
181 .boxed();
182 let v: MySqlQueryResult = v.try_collect().boxed().await?;
183 return Ok(ExecResult {
184 rows_affected: v.rows_affected,
185 last_insert_id: v.last_insert_id.into(),
186 });
187 })
188 }
189
190 fn close(&mut self) -> BoxFuture<'_, Result<(), Error>> {
191 let c = self.do_close();
192 Box::pin(async { c.await })
193 }
194
195 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
196 let c = self.do_ping();
197 Box::pin(async move { c.await })
198 }
199}