sqlx_mysql/connection/
mod.rs1use std::fmt::{self, Debug, Formatter};
2use std::future::Future;
3
4pub(crate) use sqlx_core::connection::*;
5use sqlx_core::sql_str::SqlSafeStr;
6pub(crate) use stream::{MySqlStream, Waiting};
7
8use crate::collation::Collation;
9use crate::common::StatementCache;
10use crate::error::Error;
11use crate::protocol::response::Status;
12use crate::protocol::statement::StmtClose;
13use crate::protocol::text::{Ping, Quit};
14use crate::statement::MySqlStatementMetadata;
15use crate::transaction::Transaction;
16use crate::{MySql, MySqlConnectOptions};
17
18mod auth;
19mod establish;
20mod executor;
21mod stream;
22mod tls;
23
24const MAX_PACKET_SIZE: u32 = 1024;
25
26#[allow(clippy::cast_possible_truncation)]
31const INITIAL_CHARSET: u8 = Collation::UTF8MB4_GENERAL_CI.0 as u8;
32
33pub struct MySqlConnection {
35 pub(crate) inner: Box<MySqlConnectionInner>,
36}
37
38pub(crate) struct MySqlConnectionInner {
39 pub(crate) stream: MySqlStream,
43
44 pub(crate) transaction_depth: usize,
46 status_flags: Status,
47
48 cache_statement: StatementCache<(u32, MySqlStatementMetadata)>,
50
51 log_settings: LogSettings,
52}
53
54impl MySqlConnection {
55 pub(crate) fn in_transaction(&self) -> bool {
56 self.inner
57 .status_flags
58 .intersects(Status::SERVER_STATUS_IN_TRANS)
59 }
60}
61
62impl Debug for MySqlConnection {
63 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
64 f.debug_struct("MySqlConnection").finish()
65 }
66}
67
68impl Connection for MySqlConnection {
69 type Database = MySql;
70
71 type Options = MySqlConnectOptions;
72
73 async fn close(mut self) -> Result<(), Error> {
74 self.inner.stream.send_packet(Quit).await?;
75 self.inner.stream.shutdown().await?;
76
77 Ok(())
78 }
79
80 async fn close_hard(mut self) -> Result<(), Error> {
81 self.inner.stream.shutdown().await?;
82 Ok(())
83 }
84
85 async fn ping(&mut self) -> Result<(), Error> {
86 self.inner.stream.wait_until_ready().await?;
87 self.inner.stream.send_packet(Ping).await?;
88 self.inner.stream.recv_ok().await?;
89
90 Ok(())
91 }
92
93 #[doc(hidden)]
94 fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + Send + '_ {
95 self.inner.stream.wait_until_ready()
96 }
97
98 fn cached_statements_size(&self) -> usize {
99 self.inner.cache_statement.len()
100 }
101
102 async fn clear_cached_statements(&mut self) -> Result<(), Error> {
103 while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() {
104 self.inner
105 .stream
106 .send_packet(StmtClose {
107 statement: statement_id,
108 })
109 .await?;
110 }
111
112 Ok(())
113 }
114
115 #[doc(hidden)]
116 fn should_flush(&self) -> bool {
117 !self.inner.stream.write_buffer().is_empty()
118 }
119
120 fn begin(
121 &mut self,
122 ) -> impl Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_ {
123 Transaction::begin(self, None)
124 }
125
126 fn begin_with(
127 &mut self,
128 statement: impl SqlSafeStr,
129 ) -> impl Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_
130 where
131 Self: Sized,
132 {
133 Transaction::begin(self, Some(statement.into_sql_str()))
134 }
135
136 fn shrink_buffers(&mut self) {
137 self.inner.stream.shrink_buffers();
138 }
139}