Skip to main content

sqlx_xugu/connection/
mod.rs

1use self::stream::XuguStream;
2pub(crate) use crate::connection::id::StatementId;
3use crate::io::AsyncStreamExt;
4use crate::protocol::message::*;
5use crate::protocol::statement::StmtClose;
6use crate::protocol::text::{OkPacket, Ping};
7use crate::protocol::ServerContext;
8use crate::statement::XuguStatementMetadata;
9use crate::{Xugu, XuguConnectOptions, XuguDatabaseError};
10use futures_core::future::BoxFuture;
11use futures_util::FutureExt;
12use log::Level;
13use sqlx_core::common::StatementCache;
14use sqlx_core::connection::{Connection, LogSettings};
15use sqlx_core::transaction::Transaction;
16use sqlx_core::{err_protocol, Error};
17use std::borrow::Cow;
18use std::fmt::{Debug, Formatter};
19
20mod establish;
21mod executor;
22mod id;
23mod ssl;
24mod stream;
25
26pub struct XuguConnection {
27    pub(crate) inner: Box<XuguConnectionInner>,
28}
29
30pub(crate) struct XuguConnectionInner {
31    pub(crate) stream: XuguStream,
32
33    // transaction status
34    pub(crate) transaction_depth: usize,
35    // status_flags: Status,
36
37    // sequence of statement IDs for use in preparing statements
38    next_statement_id: StatementId,
39
40    // cache by query string to the statement id and metadata
41    cache_statement: StatementCache<(StatementId, XuguStatementMetadata)>,
42
43    // number of ReadyForQuery messages that we are currently expecting
44    pub(crate) pending_ready_for_query_count: usize,
45    pub(crate) last_num_columns: usize,
46
47    log_settings: LogSettings,
48}
49
50impl XuguConnectionInner {
51    pub(crate) fn gen_st_id(&mut self) -> StatementId {
52        let id = self.next_statement_id;
53        self.next_statement_id = id.next();
54        id
55    }
56}
57
58impl XuguConnection {
59    // will return when the connection is ready for another query
60    pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
61        if !self.inner.stream.write_buffer_mut().is_empty() {
62            self.inner.stream.flush().await?;
63        }
64
65        let mut num_columns = self.inner.last_num_columns;
66        while self.inner.pending_ready_for_query_count > 0 {
67            let message: ReceivedMessage = self.inner.stream.recv().await?;
68            let cnt = ServerContext::new(self.inner.stream.server_version);
69            match message.format {
70                BackendMessageFormat::ErrorResponse => {
71                    let err: ErrorResponse = message.decode(&mut self.inner.stream, cnt).await?;
72                    return Err(Error::Database(Box::new(XuguDatabaseError::from_str(
73                        &err.error,
74                    ))));
75                }
76                BackendMessageFormat::MessageResponse => {
77                    let notice: MessageResponse =
78                        message.decode(&mut self.inner.stream, cnt).await?;
79                    let (log_level, tracing_level) = (Level::Info, tracing::Level::INFO);
80                    let log_is_enabled = log::log_enabled!(
81                        target: "sqlx::xugu::notice",
82                        log_level
83                    ) || sqlx_core::private_tracing_dynamic_enabled!(
84                        target: "sqlx::xugu::notice",
85                        tracing_level
86                    );
87                    if log_is_enabled {
88                        sqlx_core::private_tracing_dynamic_event!(
89                            target: "sqlx::xugu::notice",
90                            tracing_level,
91                            message = notice.msg
92                        );
93                    }
94                }
95                BackendMessageFormat::RowDescription => {
96                    // 接收列数据
97                    let columns: RowDescription =
98                        message.decode(&mut self.inner.stream, cnt).await?;
99                    num_columns = columns.fields.len();
100                    self.inner.last_num_columns = num_columns;
101                }
102                BackendMessageFormat::DataRow => {
103                    // 接收行数据
104                    let _: DataRow = message.decode(&mut self.inner.stream, cnt).await?;
105                    for _ in 0..num_columns {
106                        let len = self.inner.stream.read_i32().await?;
107                        let _buf = self.inner.stream.read_bytes(len as usize).await?;
108                    }
109                }
110                BackendMessageFormat::ReadyForQuery => {
111                    let _: ReadyForQuery = message.decode(&mut self.inner.stream, cnt).await?;
112                    self.handle_ready_for_query().await?;
113                }
114                BackendMessageFormat::InsertResponse => {
115                    let _: InsertResponse = message.decode(&mut self.inner.stream, cnt).await?;
116                }
117                BackendMessageFormat::DeleteResponse => {
118                    let _: DeleteResponse = message.decode(&mut self.inner.stream, cnt).await?;
119                }
120                BackendMessageFormat::UpdateResponse => {
121                    let _: UpdateResponse = message.decode(&mut self.inner.stream, cnt).await?;
122                }
123                BackendMessageFormat::ParameterDescription => {
124                    let _: ParameterDescription =
125                        message.decode(&mut self.inner.stream, cnt).await?;
126                }
127            }
128        }
129
130        Ok(())
131    }
132
133    #[inline(always)]
134    async fn handle_ready_for_query(&mut self) -> Result<(), Error> {
135        self.inner.pending_ready_for_query_count = self
136            .inner
137            .pending_ready_for_query_count
138            .checked_sub(1)
139            .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?;
140
141        Ok(())
142    }
143
144    pub(crate) fn in_transaction(&self) -> bool {
145        // TODO in_transaction
146        // self.inner
147        //     .status_flags
148        //     .intersects(Status::SERVER_STATUS_IN_TRANS)
149        true
150    }
151
152    /// 发送中断信号,停止接受服务器返回数据
153    pub(crate) async fn send_halt(&mut self) -> Result<(), Error> {
154        let buf = b".".as_slice();
155        self.inner.stream.send_packet(buf).await?;
156
157        Ok(())
158    }
159}
160
161impl Debug for XuguConnection {
162    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
163        f.debug_struct("XuguConnection").finish()
164    }
165}
166
167impl Connection for XuguConnection {
168    type Database = Xugu;
169    type Options = XuguConnectOptions;
170
171    fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
172        Box::pin(async move {
173            // self.inner.stream.send_packet(Quit).await?;
174            // TODO
175            self.send_halt().await?;
176            self.inner.stream.shutdown().await?;
177            Ok(())
178        })
179    }
180
181    fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
182        Box::pin(async move {
183            self.inner.stream.shutdown().await?;
184            Ok(())
185        })
186    }
187
188    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
189        Box::pin(async move {
190            self.wait_until_ready().await?;
191            self.inner.stream.send_packet(Ping).await?;
192            let _ok: OkPacket = self.inner.stream.recv().await?;
193
194            Ok(())
195        })
196    }
197
198    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
199    where
200        Self: Sized,
201    {
202        Transaction::begin(self, None)
203    }
204
205    fn begin_with(
206        &mut self,
207        statement: impl Into<Cow<'static, str>>,
208    ) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
209    where
210        Self: Sized,
211    {
212        Transaction::begin(self, Some(statement.into()))
213    }
214
215    fn cached_statements_size(&self) -> usize {
216        self.inner.cache_statement.len()
217    }
218
219    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
220        Box::pin(async move {
221            self.wait_until_ready().await?;
222
223            while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() {
224                self.inner
225                    .stream
226                    .send_packet(StmtClose(statement_id))
227                    .await?;
228
229                let _ok: OkPacket = self.inner.stream.recv().await?;
230            }
231
232            Ok(())
233        })
234    }
235
236    fn shrink_buffers(&mut self) {
237        self.inner.stream.shrink_buffers();
238    }
239
240    #[doc(hidden)]
241    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
242        self.wait_until_ready().boxed()
243    }
244
245    #[doc(hidden)]
246    fn should_flush(&self) -> bool {
247        !self.inner.stream.write_buffer().is_empty()
248    }
249}