sqlx_core_oldapi/mysql/connection/
mod.rs

1use crate::common::StatementCache;
2use crate::connection::{Connection, LogSettings};
3use crate::error::Error;
4use crate::mysql::protocol::statement::StmtClose;
5use crate::mysql::protocol::text::{Ping, Quit};
6use crate::mysql::statement::MySqlStatementMetadata;
7use crate::mysql::{MySql, MySqlConnectOptions};
8use crate::transaction::Transaction;
9use futures_core::future::BoxFuture;
10use futures_util::FutureExt;
11use std::fmt::{self, Debug, Formatter};
12
13mod auth;
14mod establish;
15mod executor;
16mod stream;
17mod tls;
18
19pub(crate) use stream::{MySqlStream, Waiting};
20
/// Packet-size limit for this connection module (value: 1024).
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// consumed by a submodule such as `establish` and measured in bytes — confirm.
const MAX_PACKET_SIZE: u32 = 1 << 10;
22
23/// A connection to a MySQL database.
24pub struct MySqlConnection {
25    // underlying TCP stream,
26    // wrapped in a potentially TLS stream,
27    // wrapped in a buffered stream
28    pub(crate) stream: MySqlStream,
29
30    // transaction status
31    pub(crate) transaction_depth: usize,
32
33    // cache by query string to the statement id and metadata
34    cache_statement: StatementCache<(u32, MySqlStatementMetadata)>,
35
36    log_settings: LogSettings,
37}
38
39impl Debug for MySqlConnection {
40    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
41        f.debug_struct("MySqlConnection").finish()
42    }
43}
44
45impl Connection for MySqlConnection {
46    type Database = MySql;
47
48    type Options = MySqlConnectOptions;
49
50    fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
51        Box::pin(async move {
52            self.stream.send_packet(Quit).await?;
53            self.stream.shutdown().await?;
54
55            Ok(())
56        })
57    }
58
59    fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
60        Box::pin(async move {
61            self.stream.shutdown().await?;
62            Ok(())
63        })
64    }
65
66    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
67        Box::pin(async move {
68            self.stream.wait_until_ready().await?;
69            self.stream.send_packet(Ping).await?;
70            self.stream.recv_ok().await?;
71
72            Ok(())
73        })
74    }
75
76    #[doc(hidden)]
77    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
78        self.stream.wait_until_ready().boxed()
79    }
80
81    fn cached_statements_size(&self) -> usize {
82        self.cache_statement.len()
83    }
84
85    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
86        Box::pin(async move {
87            while let Some((statement_id, _)) = self.cache_statement.remove_lru() {
88                self.stream
89                    .send_packet(StmtClose {
90                        statement: statement_id,
91                    })
92                    .await?;
93            }
94
95            Ok(())
96        })
97    }
98
99    #[doc(hidden)]
100    fn should_flush(&self) -> bool {
101        !self.stream.wbuf.is_empty()
102    }
103
104    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
105    where
106        Self: Sized,
107    {
108        Transaction::begin(self)
109    }
110}