rbdc_mysql/connection/
mod.rs1use crate::protocol::statement::StmtClose;
2use crate::protocol::text::{Ping, Quit};
3use crate::stmt::MySqlStatementMetadata;
4use either::Either;
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_util::{FutureExt, StreamExt, TryStreamExt};
8use rbdc::common::StatementCache;
9use rbdc::db::{Connection, ExecResult, Row};
10use rbdc::Error;
11use rbs::Value;
12use std::fmt::{self, Debug, Formatter};
13use std::ops::{Deref, DerefMut};
14use std::sync::Arc;
15
16
17mod auth;
18mod establish;
19mod executor;
20mod stream;
21mod tls;
22
23use crate::query::MysqlQuery;
24use crate::query_result::MySqlQueryResult;
25use crate::row::MySqlRow;
26pub(crate) use stream::MySqlStream;
27use crate::options::MySqlConnectOptions;
28
29const MAX_PACKET_SIZE: u32 = 1024;
30
31pub struct MySqlConnection {
33 pub stream: DropBox<MySqlStream>,
37 pub cache_statement: StatementCache<(u32, MySqlStatementMetadata)>,
39 pub option: Arc<MySqlConnectOptions>,
41}
42
43impl Debug for MySqlConnection {
44 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
45 f.debug_struct("MySqlConnection").finish()
46 }
47}
48
49pub struct DropBox<T> {
50 pub inner: Option<T>,
51}
52
53impl<T> Deref for DropBox<T> {
54 type Target = T;
55
56 fn deref(&self) -> &Self::Target {
57 self.inner.as_ref().expect("conn closed")
58 }
59}
60
61impl<T> DerefMut for DropBox<T> {
62 fn deref_mut(&mut self) -> &mut Self::Target {
63 self.inner.as_mut().expect("conn closed")
64 }
65}
66
67impl MySqlConnection {
68 #[inline]
69 async fn do_close(&mut self) -> Result<(), Error> {
70 self.stream.send_packet(Quit).await?;
71 self.stream.shutdown().await?;
72 Ok(())
73 }
74
75 fn do_ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
76 Box::pin(async move {
77 self.stream.wait_until_ready().await?;
78 self.stream.send_packet(Ping).await?;
79 self.stream.recv_ok().await?;
80
81 Ok(())
82 })
83 }
84
85 #[doc(hidden)]
86 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
87 self.stream.wait_until_ready().boxed()
88 }
89
90 fn cached_statements_size(&self) -> usize {
91 self.cache_statement.len()
92 }
93
94 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
95 Box::pin(async move {
96 while let Some((statement_id, _)) = self.cache_statement.remove_lru() {
97 self.stream
98 .send_packet(StmtClose {
99 statement: statement_id,
100 })
101 .await?;
102 }
103
104 Ok(())
105 })
106 }
107
108 #[doc(hidden)]
109 fn should_flush(&self) -> bool {
110 !self.stream.wbuf.is_empty()
111 }
112}
113
114impl Connection for MySqlConnection {
115 fn get_rows(
116 &mut self,
117 sql: &str,
118 params: Vec<Value>,
119 ) -> BoxFuture<Result<Vec<Box<dyn Row>>, Error>> {
120 let sql = sql.to_owned();
121 Box::pin(async move {
122 let many = {
123 if params.len() == 0 {
124 self.fetch_many(MysqlQuery {
125 statement: Either::Left(sql),
126 arguments: params,
127 persistent: false,
128 })
129 } else {
130 let stmt = self.prepare_with(&sql, &[]).await?;
131 self.fetch_many(MysqlQuery {
132 statement: Either::Right(stmt),
133 arguments: params,
134 persistent: true,
135 })
136 }
137 };
138 let f: BoxStream<Result<MySqlRow, Error>> = many
139 .try_filter_map(|step| async move {
140 Ok(match step {
141 Either::Left(_) => None,
142 Either::Right(row) => Some(row),
143 })
144 })
145 .boxed();
146 let c: BoxFuture<Result<Vec<MySqlRow>, Error>> = f.try_collect().boxed();
147 let v = c.await?;
148 let mut data: Vec<Box<dyn Row>> = Vec::with_capacity(v.len());
149 for x in v {
150 data.push(Box::new(x));
151 }
152 Ok(data)
153 })
154 }
155
156 fn exec(&mut self, sql: &str, params: Vec<Value>) -> BoxFuture<Result<ExecResult, Error>> {
157 let sql = sql.to_owned();
158 Box::pin(async move {
159 let many = {
160 if params.len() == 0 {
161 self.fetch_many(MysqlQuery {
162 statement: Either::Left(sql),
163 arguments: params,
164 persistent: false,
165 })
166 } else {
167 let stmt = self.prepare_with(&sql, &[]).await?;
168 self.fetch_many(MysqlQuery {
169 statement: Either::Right(stmt),
170 arguments: params,
171 persistent: true,
172 })
173 }
174 };
175 let v: BoxStream<Result<MySqlQueryResult, Error>> = many
176 .try_filter_map(|step| async move {
177 Ok(match step {
178 Either::Left(rows) => Some(rows),
179 Either::Right(_) => None,
180 })
181 })
182 .boxed();
183 let v: MySqlQueryResult = v.try_collect().boxed().await?;
184 return Ok(ExecResult {
185 rows_affected: v.rows_affected,
186 last_insert_id: v.last_insert_id.into(),
187 });
188 })
189 }
190
191 fn close(&mut self) -> BoxFuture<Result<(), Error>> {
192 let c = self.do_close();
193 Box::pin(async { c.await })
194 }
195
196 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
197 let c = self.do_ping();
198 Box::pin(async move { c.await })
199 }
200}