sqlx_postgres/connection/
mod.rs1use std::borrow::Cow;
2use std::fmt::{self, Debug, Formatter};
3use std::sync::Arc;
4
5use crate::HashMap;
6use futures_core::future::BoxFuture;
7use futures_util::FutureExt;
8
9use crate::common::StatementCache;
10use crate::error::Error;
11use crate::ext::ustr::UStr;
12use crate::io::StatementId;
13use crate::message::{
14 BackendMessageFormat, Close, Query, ReadyForQuery, ReceivedMessage, Terminate,
15 TransactionStatus,
16};
17use crate::statement::PgStatementMetadata;
18use crate::transaction::Transaction;
19use crate::types::Oid;
20use crate::{PgConnectOptions, PgTypeInfo, Postgres};
21
22pub(crate) use sqlx_core::connection::*;
23
24pub use self::stream::PgStream;
25
26pub(crate) mod describe;
27mod establish;
28mod executor;
29mod sasl;
30mod stream;
31mod tls;
32
33pub struct PgConnection {
37 pub(crate) inner: Box<PgConnectionInner>,
38}
39
40pub struct PgConnectionInner {
41 pub(crate) stream: PgStream,
45
46 #[allow(dead_code)]
49 process_id: u32,
50
51 #[allow(dead_code)]
54 secret_key: u32,
55
56 next_statement_id: StatementId,
59
60 cache_statement: StatementCache<(StatementId, Arc<PgStatementMetadata>)>,
62
63 cache_type_info: HashMap<Oid, PgTypeInfo>,
65 cache_type_oid: HashMap<UStr, Oid>,
66 cache_elem_type_to_array: HashMap<Oid, Oid>,
67
68 pub(crate) pending_ready_for_query_count: usize,
70
71 transaction_status: TransactionStatus,
73 pub(crate) transaction_depth: usize,
74
75 log_settings: LogSettings,
76}
77
78impl PgConnection {
79 pub fn server_version_num(&self) -> Option<u32> {
81 self.inner.stream.server_version_num
82 }
83
84 pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
86 if !self.inner.stream.write_buffer_mut().is_empty() {
87 self.inner.stream.flush().await?;
88 }
89
90 while self.inner.pending_ready_for_query_count > 0 {
91 let message = self.inner.stream.recv().await?;
92
93 if let BackendMessageFormat::ReadyForQuery = message.format {
94 self.handle_ready_for_query(message)?;
95 }
96 }
97
98 Ok(())
99 }
100
101 async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
102 let r: ReadyForQuery = self.inner.stream.recv_expect().await?;
103
104 self.inner.pending_ready_for_query_count -= 1;
105 self.inner.transaction_status = r.transaction_status;
106
107 Ok(())
108 }
109
110 #[inline(always)]
111 fn handle_ready_for_query(&mut self, message: ReceivedMessage) -> Result<(), Error> {
112 self.inner.pending_ready_for_query_count = self
113 .inner
114 .pending_ready_for_query_count
115 .checked_sub(1)
116 .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?;
117
118 self.inner.transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
119
120 Ok(())
121 }
122
123 #[inline(always)]
127 pub(crate) fn queue_simple_query(&mut self, query: &str) -> Result<(), Error> {
128 self.inner.stream.write_msg(Query(query))?;
129 self.inner.pending_ready_for_query_count += 1;
130
131 Ok(())
132 }
133
134 pub(crate) fn in_transaction(&self) -> bool {
135 match self.inner.transaction_status {
136 TransactionStatus::Transaction => true,
137 TransactionStatus::Error | TransactionStatus::Idle => false,
138 }
139 }
140}
141
142impl Debug for PgConnection {
143 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
144 f.debug_struct("PgConnection").finish()
145 }
146}
147
148impl Connection for PgConnection {
149 type Database = Postgres;
150
151 type Options = PgConnectOptions;
152
153 fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
154 Box::pin(async move {
161 self.inner.stream.send(Terminate).await?;
162 self.inner.stream.shutdown().await?;
163
164 Ok(())
165 })
166 }
167
168 fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
169 Box::pin(async move {
170 self.inner.stream.shutdown().await?;
171
172 Ok(())
173 })
174 }
175
176 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
177 Box::pin(async move {
182 self.write_sync();
184 self.wait_until_ready().await
185 })
186 }
187
188 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
189 where
190 Self: Sized,
191 {
192 Transaction::begin(self, None)
193 }
194
195 fn begin_with(
196 &mut self,
197 statement: impl Into<Cow<'static, str>>,
198 ) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
199 where
200 Self: Sized,
201 {
202 Transaction::begin(self, Some(statement.into()))
203 }
204
205 fn cached_statements_size(&self) -> usize {
206 self.inner.cache_statement.len()
207 }
208
209 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
210 Box::pin(async move {
211 self.inner.cache_type_oid.clear();
212
213 let mut cleared = 0_usize;
214
215 self.wait_until_ready().await?;
216
217 while let Some((id, _)) = self.inner.cache_statement.remove_lru() {
218 self.inner.stream.write_msg(Close::Statement(id))?;
219 cleared += 1;
220 }
221
222 if cleared > 0 {
223 self.write_sync();
224 self.inner.stream.flush().await?;
225
226 self.wait_for_close_complete(cleared).await?;
227 self.recv_ready_for_query().await?;
228 }
229
230 Ok(())
231 })
232 }
233
234 fn shrink_buffers(&mut self) {
235 self.inner.stream.shrink_buffers();
236 }
237
238 #[doc(hidden)]
239 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
240 self.wait_until_ready().boxed()
241 }
242
243 #[doc(hidden)]
244 fn should_flush(&self) -> bool {
245 !self.inner.stream.write_buffer().is_empty()
246 }
247}
248
249impl AsMut<PgConnection> for PgConnection {
254 fn as_mut(&mut self) -> &mut PgConnection {
255 self
256 }
257}