sqlx_xugu/connection/
mod.rs

1use self::stream::XuguStream;
2use crate::io::AsyncStreamExt;
3use crate::protocol::message::*;
4use crate::protocol::statement::StmtClose;
5use crate::protocol::text::{OkPacket, Ping};
6use crate::protocol::ServerContext;
7use crate::statement::XuguStatementMetadata;
8use crate::{Xugu, XuguConnectOptions, XuguDatabaseError};
9use futures_core::future::BoxFuture;
10use futures_util::FutureExt;
11use log::Level;
12use sqlx_core::common::StatementCache;
13use sqlx_core::connection::{Connection, LogSettings};
14use sqlx_core::transaction::Transaction;
15use sqlx_core::{err_protocol, Error};
16use std::borrow::Cow;
17use std::fmt::{Debug, Formatter};
18
19mod establish;
20mod executor;
21mod ssl;
22mod stream;
23
24pub struct XuguConnection {
25    pub(crate) inner: Box<XuguConnectionInner>,
26}
27
28pub(crate) struct XuguConnectionInner {
29    pub(crate) stream: XuguStream,
30
31    // transaction status
32    pub(crate) transaction_depth: usize,
33    // status_flags: Status,
34
35    // cache by query string to the statement id and metadata
36    cache_statement: StatementCache<(u32, XuguStatementMetadata)>,
37
38    // number of ReadyForQuery messages that we are currently expecting
39    pub(crate) pending_ready_for_query_count: usize,
40    pub(crate) last_num_columns: usize,
41
42    log_settings: LogSettings,
43
44    st_id_gen: u32,
45    con_obj_name: String,
46}
47
48impl XuguConnectionInner {
49    pub(crate) fn gen_st_id(&mut self) -> u32 {
50        self.st_id_gen = self.st_id_gen.wrapping_add(1);
51        self.st_id_gen
52    }
53
54    pub(super) fn addr_code(&mut self) -> usize {
55        let addr = std::ptr::addr_of!(*self) as usize;
56        addr
57    }
58}
59
60impl XuguConnection {
61    // will return when the connection is ready for another query
62    pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
63        if !self.inner.stream.write_buffer_mut().is_empty() {
64            self.inner.stream.flush().await?;
65        }
66
67        let mut num_columns = self.inner.last_num_columns;
68        while self.inner.pending_ready_for_query_count > 0 {
69            let message: ReceivedMessage = self.inner.stream.recv().await?;
70            let cnt = ServerContext::new(self.inner.stream.server_version);
71            match message.format {
72                BackendMessageFormat::ErrorResponse => {
73                    let err: ErrorResponse = message.decode(&mut self.inner.stream, cnt).await?;
74                    return Err(Error::Database(Box::new(XuguDatabaseError::from_str(
75                        &err.error,
76                    ))));
77                }
78                BackendMessageFormat::MessageResponse => {
79                    let notice: MessageResponse =
80                        message.decode(&mut self.inner.stream, cnt).await?;
81                    let (log_level, tracing_level) = (Level::Info, tracing::Level::INFO);
82                    let log_is_enabled = log::log_enabled!(
83                        target: "sqlx::xugu::notice",
84                        log_level
85                    ) || sqlx_core::private_tracing_dynamic_enabled!(
86                        target: "sqlx::xugu::notice",
87                        tracing_level
88                    );
89                    if log_is_enabled {
90                        sqlx_core::private_tracing_dynamic_event!(
91                            target: "sqlx::xugu::notice",
92                            tracing_level,
93                            message = notice.msg
94                        );
95                    }
96                }
97                BackendMessageFormat::RowDescription => {
98                    // 接收列数据
99                    let columns: RowDescription =
100                        message.decode(&mut self.inner.stream, cnt).await?;
101                    num_columns = columns.fields.len();
102                    self.inner.last_num_columns = num_columns;
103                }
104                BackendMessageFormat::DataRow => {
105                    // 接收行数据
106                    let _: DataRow = message.decode(&mut self.inner.stream, cnt).await?;
107                    for _ in 0..num_columns {
108                        let len = self.inner.stream.read_i32().await?;
109                        let _buf = self.inner.stream.read_bytes(len as usize).await?;
110                    }
111                }
112                BackendMessageFormat::ReadyForQuery => {
113                    let _: ReadyForQuery = message.decode(&mut self.inner.stream, cnt).await?;
114                    self.handle_ready_for_query().await?;
115                }
116                BackendMessageFormat::InsertResponse => {
117                    let _: InsertResponse = message.decode(&mut self.inner.stream, cnt).await?;
118                }
119                BackendMessageFormat::DeleteResponse => {
120                    let _: DeleteResponse = message.decode(&mut self.inner.stream, cnt).await?;
121                }
122                BackendMessageFormat::UpdateResponse => {
123                    let _: UpdateResponse = message.decode(&mut self.inner.stream, cnt).await?;
124                }
125                BackendMessageFormat::ParameterDescription => {
126                    let _: ParameterDescription =
127                        message.decode(&mut self.inner.stream, cnt).await?;
128                }
129            }
130        }
131
132        Ok(())
133    }
134
135    #[inline(always)]
136    async fn handle_ready_for_query(&mut self) -> Result<(), Error> {
137        self.inner.pending_ready_for_query_count = self
138            .inner
139            .pending_ready_for_query_count
140            .checked_sub(1)
141            .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?;
142
143        Ok(())
144    }
145
146    pub(crate) fn in_transaction(&self) -> bool {
147        // TODO in_transaction
148        // self.inner
149        //     .status_flags
150        //     .intersects(Status::SERVER_STATUS_IN_TRANS)
151        true
152    }
153
154    /// 发送中断信号,停止接受服务器返回数据
155    pub(crate) async fn send_halt(&mut self) -> Result<(), Error> {
156        let buf = b".".as_slice();
157        self.inner.stream.send_packet(buf).await?;
158
159        Ok(())
160    }
161}
162
163impl Debug for XuguConnection {
164    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
165        f.debug_struct("XuguConnection").finish()
166    }
167}
168
169impl Connection for XuguConnection {
170    type Database = Xugu;
171    type Options = XuguConnectOptions;
172
173    fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
174        Box::pin(async move {
175            // self.inner.stream.send_packet(Quit).await?;
176            // TODO
177            self.send_halt().await?;
178            self.inner.stream.shutdown().await?;
179            Ok(())
180        })
181    }
182
183    fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
184        Box::pin(async move {
185            self.inner.stream.shutdown().await?;
186            Ok(())
187        })
188    }
189
190    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
191        Box::pin(async move {
192            self.wait_until_ready().await?;
193            self.inner.stream.send_packet(Ping).await?;
194            let _ok: OkPacket = self.inner.stream.recv().await?;
195
196            Ok(())
197        })
198    }
199
200    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
201    where
202        Self: Sized,
203    {
204        Transaction::begin(self, None)
205    }
206
207    fn begin_with(
208        &mut self,
209        statement: impl Into<Cow<'static, str>>,
210    ) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
211    where
212        Self: Sized,
213    {
214        Transaction::begin(self, Some(statement.into()))
215    }
216
217    fn cached_statements_size(&self) -> usize {
218        self.inner.cache_statement.len()
219    }
220
221    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
222        Box::pin(async move {
223            self.wait_until_ready().await?;
224
225            while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() {
226                self.inner
227                    .stream
228                    .send_packet(StmtClose {
229                        st_id: statement_id,
230                        con_obj_name: &self.inner.con_obj_name,
231                    })
232                    .await?;
233
234                let _ok: OkPacket = self.inner.stream.recv().await?;
235            }
236
237            Ok(())
238        })
239    }
240
241    fn shrink_buffers(&mut self) {
242        self.inner.stream.shrink_buffers();
243    }
244
245    #[doc(hidden)]
246    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
247        self.wait_until_ready().boxed()
248    }
249
250    #[doc(hidden)]
251    fn should_flush(&self) -> bool {
252        !self.inner.stream.write_buffer().is_empty()
253    }
254}