mysql_connector/connection/
mod.rs

1mod auth;
2pub(super) mod bitflags;
3mod command;
4mod data;
5mod init;
6mod io;
7mod options;
8pub(super) mod packets;
9mod parse_buf;
10mod prepared_statement;
11mod query;
12mod result_set;
13mod serialization;
14pub mod timeout;
15pub mod types;
16
17const MAX_PAYLOAD_LEN: usize = 16_777_215;
18const DEFAULT_MAX_ALLOWED_PACKET: usize = 4 * 1024 * 1024;
19
20const UTF8_GENERAL_CI: u16 = 33;
21const UTF8MB4_GENERAL_CI: u16 = 45;
22
23lazy_static::lazy_static! {
24    static ref BUFFER_POOL: SyncPool<Vec<u8>, 64> = SyncPool::new(VecPoolCtx {
25        size_cap: DEFAULT_MAX_ALLOWED_PACKET,
26        init_size: 1024,
27    });
28}
29
30use {
31    crate::pool::{AsyncPoolContent, AsyncPoolContentError, SyncPool, VecPoolCtx},
32    std::{fmt, sync::Arc},
33    tokio::io::{AsyncRead, AsyncWrite},
34};
35
36pub(crate) use {
37    command::Command,
38    parse_buf::ParseBuf,
39    serialization::{Deserialize, Serialize},
40};
41
42pub use {
43    data::ConnectionData,
44    options::{ConnectionOptions, ConnectionOptionsTrait},
45    prepared_statement::PreparedStatement,
46    result_set::ResultSet,
47    timeout::{Timeout, TimeoutFuture},
48};
49
50pub struct Connection {
51    stream: Box<dyn StreamRequirements>,
52    seq_id: u8,
53    data: ConnectionData,
54    options: Arc<dyn ConnectionOptionsTrait>,
55    pending_result: bool,
56}
57
58impl Connection {
59    pub fn data(&self) -> &ConnectionData {
60        &self.data
61    }
62
63    pub fn options(&self) -> Arc<dyn ConnectionOptionsTrait> {
64        self.options.clone()
65    }
66}
67
68impl fmt::Debug for Connection {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        f.debug_struct("Connection")
71            .field("seq_id", &self.seq_id)
72            .field("data", &self.data)
73            .field("options", &self.options)
74            .finish()
75    }
76}
77
78pub trait StreamRequirements:
79    AsyncRead + AsyncWrite + Unpin + fmt::Debug + Send + Sync + 'static
80{
81}
82impl<T: AsyncRead + AsyncWrite + Unpin + fmt::Debug + Send + Sync + 'static> StreamRequirements
83    for T
84{
85}
86
87#[allow(async_fn_in_trait)]
88pub trait Stream: Sized + StreamRequirements {
89    /// Set this to `true` if the connection is a socket or a shared-memory connection.
90    const SECURE: bool;
91    type Options: Default + fmt::Debug + Send + Sync;
92
93    async fn connect(data: &Self::Options) -> Result<Self, std::io::Error>;
94}
95
96impl AsyncPoolContentError for Connection {
97    type Error = crate::Error;
98}
99
100impl<T: Stream> AsyncPoolContent<T> for Connection {
101    type Ctx = Arc<ConnectionOptions<T>>;
102
103    fn new<'a>(
104        ctx: &'a Self::Ctx,
105    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Self, Self::Error>> + 'a>> {
106        Box::pin(Self::connect(Arc::clone(ctx)))
107    }
108}
109
110#[cfg(feature = "tcpstream")]
111#[cfg_attr(doc, doc(cfg(feature = "tcpstream")))]
112#[derive(Debug)]
113pub struct TcpStreamOptions {
114    pub host: String,
115    pub port: u16,
116    pub nodelay: bool,
117}
118
119#[cfg(feature = "tcpstream")]
120#[cfg_attr(doc, doc(cfg(feature = "tcpstream")))]
121impl Default for TcpStreamOptions {
122    fn default() -> Self {
123        Self {
124            host: String::from("localhost"),
125            port: 3306,
126            nodelay: true,
127        }
128    }
129}
130
131#[cfg(feature = "tcpstream")]
132#[cfg_attr(doc, doc(cfg(feature = "tcpstream")))]
133impl Stream for tokio::net::TcpStream {
134    const SECURE: bool = false;
135    type Options = TcpStreamOptions;
136
137    async fn connect(data: &Self::Options) -> Result<Self, std::io::Error> {
138        let this = Self::connect((data.host.as_str(), data.port)).await?;
139        this.set_nodelay(data.nodelay)?;
140        Ok(this)
141    }
142}