sqlx_xugu/connection/
mod.rs1use self::stream::XuguStream;
2pub(crate) use crate::connection::id::StatementId;
3use crate::io::AsyncStreamExt;
4use crate::protocol::message::*;
5use crate::protocol::statement::StmtClose;
6use crate::protocol::text::{OkPacket, Ping};
7use crate::protocol::ServerContext;
8use crate::statement::XuguStatementMetadata;
9use crate::{Xugu, XuguConnectOptions, XuguDatabaseError};
10use futures_core::future::BoxFuture;
11use futures_util::FutureExt;
12use log::Level;
13use sqlx_core::common::StatementCache;
14use sqlx_core::connection::{Connection, LogSettings};
15use sqlx_core::transaction::Transaction;
16use sqlx_core::{err_protocol, Error};
17use std::borrow::Cow;
18use std::fmt::{Debug, Formatter};
19
20mod establish;
21mod executor;
22mod id;
23mod ssl;
24mod stream;
25
26pub struct XuguConnection {
27 pub(crate) inner: Box<XuguConnectionInner>,
28}
29
30pub(crate) struct XuguConnectionInner {
31 pub(crate) stream: XuguStream,
32
33 pub(crate) transaction_depth: usize,
35 next_statement_id: StatementId,
39
40 cache_statement: StatementCache<(StatementId, XuguStatementMetadata)>,
42
43 pub(crate) pending_ready_for_query_count: usize,
45 pub(crate) last_num_columns: usize,
46
47 log_settings: LogSettings,
48}
49
50impl XuguConnectionInner {
51 pub(crate) fn gen_st_id(&mut self) -> StatementId {
52 let id = self.next_statement_id;
53 self.next_statement_id = id.next();
54 id
55 }
56}
57
58impl XuguConnection {
59 pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
61 if !self.inner.stream.write_buffer_mut().is_empty() {
62 self.inner.stream.flush().await?;
63 }
64
65 let mut num_columns = self.inner.last_num_columns;
66 while self.inner.pending_ready_for_query_count > 0 {
67 let message: ReceivedMessage = self.inner.stream.recv().await?;
68 let cnt = ServerContext::new(self.inner.stream.server_version);
69 match message.format {
70 BackendMessageFormat::ErrorResponse => {
71 let err: ErrorResponse = message.decode(&mut self.inner.stream, cnt).await?;
72 return Err(Error::Database(Box::new(XuguDatabaseError::from_str(
73 &err.error,
74 ))));
75 }
76 BackendMessageFormat::MessageResponse => {
77 let notice: MessageResponse =
78 message.decode(&mut self.inner.stream, cnt).await?;
79 let (log_level, tracing_level) = (Level::Info, tracing::Level::INFO);
80 let log_is_enabled = log::log_enabled!(
81 target: "sqlx::xugu::notice",
82 log_level
83 ) || sqlx_core::private_tracing_dynamic_enabled!(
84 target: "sqlx::xugu::notice",
85 tracing_level
86 );
87 if log_is_enabled {
88 sqlx_core::private_tracing_dynamic_event!(
89 target: "sqlx::xugu::notice",
90 tracing_level,
91 message = notice.msg
92 );
93 }
94 }
95 BackendMessageFormat::RowDescription => {
96 let columns: RowDescription =
98 message.decode(&mut self.inner.stream, cnt).await?;
99 num_columns = columns.fields.len();
100 self.inner.last_num_columns = num_columns;
101 }
102 BackendMessageFormat::DataRow => {
103 let _: DataRow = message.decode(&mut self.inner.stream, cnt).await?;
105 for _ in 0..num_columns {
106 let len = self.inner.stream.read_i32().await?;
107 let _buf = self.inner.stream.read_bytes(len as usize).await?;
108 }
109 }
110 BackendMessageFormat::ReadyForQuery => {
111 let _: ReadyForQuery = message.decode(&mut self.inner.stream, cnt).await?;
112 self.handle_ready_for_query().await?;
113 }
114 BackendMessageFormat::InsertResponse => {
115 let _: InsertResponse = message.decode(&mut self.inner.stream, cnt).await?;
116 }
117 BackendMessageFormat::DeleteResponse => {
118 let _: DeleteResponse = message.decode(&mut self.inner.stream, cnt).await?;
119 }
120 BackendMessageFormat::UpdateResponse => {
121 let _: UpdateResponse = message.decode(&mut self.inner.stream, cnt).await?;
122 }
123 BackendMessageFormat::ParameterDescription => {
124 let _: ParameterDescription =
125 message.decode(&mut self.inner.stream, cnt).await?;
126 }
127 }
128 }
129
130 Ok(())
131 }
132
133 #[inline(always)]
134 async fn handle_ready_for_query(&mut self) -> Result<(), Error> {
135 self.inner.pending_ready_for_query_count = self
136 .inner
137 .pending_ready_for_query_count
138 .checked_sub(1)
139 .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?;
140
141 Ok(())
142 }
143
144 pub(crate) fn in_transaction(&self) -> bool {
145 true
150 }
151
152 pub(crate) async fn send_halt(&mut self) -> Result<(), Error> {
154 let buf = b".".as_slice();
155 self.inner.stream.send_packet(buf).await?;
156
157 Ok(())
158 }
159}
160
161impl Debug for XuguConnection {
162 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
163 f.debug_struct("XuguConnection").finish()
164 }
165}
166
167impl Connection for XuguConnection {
168 type Database = Xugu;
169 type Options = XuguConnectOptions;
170
171 fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
172 Box::pin(async move {
173 self.send_halt().await?;
176 self.inner.stream.shutdown().await?;
177 Ok(())
178 })
179 }
180
181 fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
182 Box::pin(async move {
183 self.inner.stream.shutdown().await?;
184 Ok(())
185 })
186 }
187
188 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
189 Box::pin(async move {
190 self.wait_until_ready().await?;
191 self.inner.stream.send_packet(Ping).await?;
192 let _ok: OkPacket = self.inner.stream.recv().await?;
193
194 Ok(())
195 })
196 }
197
198 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
199 where
200 Self: Sized,
201 {
202 Transaction::begin(self, None)
203 }
204
205 fn begin_with(
206 &mut self,
207 statement: impl Into<Cow<'static, str>>,
208 ) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
209 where
210 Self: Sized,
211 {
212 Transaction::begin(self, Some(statement.into()))
213 }
214
215 fn cached_statements_size(&self) -> usize {
216 self.inner.cache_statement.len()
217 }
218
219 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
220 Box::pin(async move {
221 self.wait_until_ready().await?;
222
223 while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() {
224 self.inner
225 .stream
226 .send_packet(StmtClose(statement_id))
227 .await?;
228
229 let _ok: OkPacket = self.inner.stream.recv().await?;
230 }
231
232 Ok(())
233 })
234 }
235
236 fn shrink_buffers(&mut self) {
237 self.inner.stream.shrink_buffers();
238 }
239
240 #[doc(hidden)]
241 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
242 self.wait_until_ready().boxed()
243 }
244
245 #[doc(hidden)]
246 fn should_flush(&self) -> bool {
247 !self.inner.stream.write_buffer().is_empty()
248 }
249}