// sqlx_core_oldapi/mysql/connection/mod.rs
use crate::common::StatementCache;
2use crate::connection::{Connection, LogSettings};
3use crate::error::Error;
4use crate::mysql::protocol::statement::StmtClose;
5use crate::mysql::protocol::text::{Ping, Quit};
6use crate::mysql::statement::MySqlStatementMetadata;
7use crate::mysql::{MySql, MySqlConnectOptions};
8use crate::transaction::Transaction;
9use futures_core::future::BoxFuture;
10use futures_util::FutureExt;
11use std::fmt::{self, Debug, Formatter};
12
13mod auth;
14mod establish;
15mod executor;
16mod stream;
17mod tls;
18
19pub(crate) use stream::{MySqlStream, Waiting};
20
/// Packet-size limit shared with this module's submodules.
///
/// NOTE(review): nothing in this file reads the constant; presumably it is the
/// maximum packet size advertised to the server during connection
/// establishment — confirm against the `establish` submodule.
const MAX_PACKET_SIZE: u32 = 1024;
22
23pub struct MySqlConnection {
25 pub(crate) stream: MySqlStream,
29
30 pub(crate) transaction_depth: usize,
32
33 cache_statement: StatementCache<(u32, MySqlStatementMetadata)>,
35
36 log_settings: LogSettings,
37}
38
39impl Debug for MySqlConnection {
40 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
41 f.debug_struct("MySqlConnection").finish()
42 }
43}
44
45impl Connection for MySqlConnection {
46 type Database = MySql;
47
48 type Options = MySqlConnectOptions;
49
50 fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
51 Box::pin(async move {
52 self.stream.send_packet(Quit).await?;
53 self.stream.shutdown().await?;
54
55 Ok(())
56 })
57 }
58
59 fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
60 Box::pin(async move {
61 self.stream.shutdown().await?;
62 Ok(())
63 })
64 }
65
66 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
67 Box::pin(async move {
68 self.stream.wait_until_ready().await?;
69 self.stream.send_packet(Ping).await?;
70 self.stream.recv_ok().await?;
71
72 Ok(())
73 })
74 }
75
76 #[doc(hidden)]
77 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
78 self.stream.wait_until_ready().boxed()
79 }
80
81 fn cached_statements_size(&self) -> usize {
82 self.cache_statement.len()
83 }
84
85 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
86 Box::pin(async move {
87 while let Some((statement_id, _)) = self.cache_statement.remove_lru() {
88 self.stream
89 .send_packet(StmtClose {
90 statement: statement_id,
91 })
92 .await?;
93 }
94
95 Ok(())
96 })
97 }
98
99 #[doc(hidden)]
100 fn should_flush(&self) -> bool {
101 !self.stream.wbuf.is_empty()
102 }
103
104 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
105 where
106 Self: Sized,
107 {
108 Transaction::begin(self)
109 }
110}