sqlx_xugu/connection/
mod.rs1use self::stream::XuguStream;
2use crate::io::AsyncStreamExt;
3use crate::protocol::message::*;
4use crate::protocol::statement::StmtClose;
5use crate::protocol::text::{OkPacket, Ping};
6use crate::protocol::ServerContext;
7use crate::statement::XuguStatementMetadata;
8use crate::{Xugu, XuguConnectOptions, XuguDatabaseError};
9use futures_core::future::BoxFuture;
10use futures_util::FutureExt;
11use log::Level;
12use sqlx_core::common::StatementCache;
13use sqlx_core::connection::{Connection, LogSettings};
14use sqlx_core::transaction::Transaction;
15use sqlx_core::{err_protocol, Error};
16use std::borrow::Cow;
17use std::fmt::{Debug, Formatter};
18
19mod establish;
20mod executor;
21mod ssl;
22mod stream;
23
24pub struct XuguConnection {
25 pub(crate) inner: Box<XuguConnectionInner>,
26}
27
28pub(crate) struct XuguConnectionInner {
29 pub(crate) stream: XuguStream,
30
31 pub(crate) transaction_depth: usize,
33 cache_statement: StatementCache<(u32, XuguStatementMetadata)>,
37
38 pub(crate) pending_ready_for_query_count: usize,
40 pub(crate) last_num_columns: usize,
41
42 log_settings: LogSettings,
43
44 st_id_gen: u32,
45 con_obj_name: String,
46}
47
48impl XuguConnectionInner {
49 pub(crate) fn gen_st_id(&mut self) -> u32 {
50 self.st_id_gen = self.st_id_gen.wrapping_add(1);
51 self.st_id_gen
52 }
53
54 pub(super) fn addr_code(&mut self) -> usize {
55 let addr = std::ptr::addr_of!(*self) as usize;
56 addr
57 }
58}
59
60impl XuguConnection {
61 pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
63 if !self.inner.stream.write_buffer_mut().is_empty() {
64 self.inner.stream.flush().await?;
65 }
66
67 let mut num_columns = self.inner.last_num_columns;
68 while self.inner.pending_ready_for_query_count > 0 {
69 let message: ReceivedMessage = self.inner.stream.recv().await?;
70 let cnt = ServerContext::new(self.inner.stream.server_version);
71 match message.format {
72 BackendMessageFormat::ErrorResponse => {
73 let err: ErrorResponse = message.decode(&mut self.inner.stream, cnt).await?;
74 return Err(Error::Database(Box::new(XuguDatabaseError::from_str(
75 &err.error,
76 ))));
77 }
78 BackendMessageFormat::MessageResponse => {
79 let notice: MessageResponse =
80 message.decode(&mut self.inner.stream, cnt).await?;
81 let (log_level, tracing_level) = (Level::Info, tracing::Level::INFO);
82 let log_is_enabled = log::log_enabled!(
83 target: "sqlx::xugu::notice",
84 log_level
85 ) || sqlx_core::private_tracing_dynamic_enabled!(
86 target: "sqlx::xugu::notice",
87 tracing_level
88 );
89 if log_is_enabled {
90 sqlx_core::private_tracing_dynamic_event!(
91 target: "sqlx::xugu::notice",
92 tracing_level,
93 message = notice.msg
94 );
95 }
96 }
97 BackendMessageFormat::RowDescription => {
98 let columns: RowDescription =
100 message.decode(&mut self.inner.stream, cnt).await?;
101 num_columns = columns.fields.len();
102 self.inner.last_num_columns = num_columns;
103 }
104 BackendMessageFormat::DataRow => {
105 let _: DataRow = message.decode(&mut self.inner.stream, cnt).await?;
107 for _ in 0..num_columns {
108 let len = self.inner.stream.read_i32().await?;
109 let _buf = self.inner.stream.read_bytes(len as usize).await?;
110 }
111 }
112 BackendMessageFormat::ReadyForQuery => {
113 let _: ReadyForQuery = message.decode(&mut self.inner.stream, cnt).await?;
114 self.handle_ready_for_query().await?;
115 }
116 BackendMessageFormat::InsertResponse => {
117 let _: InsertResponse = message.decode(&mut self.inner.stream, cnt).await?;
118 }
119 BackendMessageFormat::DeleteResponse => {
120 let _: DeleteResponse = message.decode(&mut self.inner.stream, cnt).await?;
121 }
122 BackendMessageFormat::UpdateResponse => {
123 let _: UpdateResponse = message.decode(&mut self.inner.stream, cnt).await?;
124 }
125 BackendMessageFormat::ParameterDescription => {
126 let _: ParameterDescription =
127 message.decode(&mut self.inner.stream, cnt).await?;
128 }
129 }
130 }
131
132 Ok(())
133 }
134
135 #[inline(always)]
136 async fn handle_ready_for_query(&mut self) -> Result<(), Error> {
137 self.inner.pending_ready_for_query_count = self
138 .inner
139 .pending_ready_for_query_count
140 .checked_sub(1)
141 .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?;
142
143 Ok(())
144 }
145
146 pub(crate) fn in_transaction(&self) -> bool {
147 true
152 }
153
154 pub(crate) async fn send_halt(&mut self) -> Result<(), Error> {
156 let buf = b".".as_slice();
157 self.inner.stream.send_packet(buf).await?;
158
159 Ok(())
160 }
161}
162
163impl Debug for XuguConnection {
164 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
165 f.debug_struct("XuguConnection").finish()
166 }
167}
168
169impl Connection for XuguConnection {
170 type Database = Xugu;
171 type Options = XuguConnectOptions;
172
173 fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
174 Box::pin(async move {
175 self.send_halt().await?;
178 self.inner.stream.shutdown().await?;
179 Ok(())
180 })
181 }
182
183 fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
184 Box::pin(async move {
185 self.inner.stream.shutdown().await?;
186 Ok(())
187 })
188 }
189
190 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
191 Box::pin(async move {
192 self.wait_until_ready().await?;
193 self.inner.stream.send_packet(Ping).await?;
194 let _ok: OkPacket = self.inner.stream.recv().await?;
195
196 Ok(())
197 })
198 }
199
200 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
201 where
202 Self: Sized,
203 {
204 Transaction::begin(self, None)
205 }
206
207 fn begin_with(
208 &mut self,
209 statement: impl Into<Cow<'static, str>>,
210 ) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
211 where
212 Self: Sized,
213 {
214 Transaction::begin(self, Some(statement.into()))
215 }
216
217 fn cached_statements_size(&self) -> usize {
218 self.inner.cache_statement.len()
219 }
220
221 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
222 Box::pin(async move {
223 self.wait_until_ready().await?;
224
225 while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() {
226 self.inner
227 .stream
228 .send_packet(StmtClose {
229 st_id: statement_id,
230 con_obj_name: &self.inner.con_obj_name,
231 })
232 .await?;
233
234 let _ok: OkPacket = self.inner.stream.recv().await?;
235 }
236
237 Ok(())
238 })
239 }
240
241 fn shrink_buffers(&mut self) {
242 self.inner.stream.shrink_buffers();
243 }
244
245 #[doc(hidden)]
246 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
247 self.wait_until_ready().boxed()
248 }
249
250 #[doc(hidden)]
251 fn should_flush(&self) -> bool {
252 !self.inner.stream.write_buffer().is_empty()
253 }
254}