sqlx_postgres/connection/
mod.rs1use std::collections::BTreeMap;
2use std::fmt::{self, Debug, Formatter};
3use std::future::Future;
4use std::sync::Arc;
5
6use crate::HashMap;
7
8use crate::common::StatementCache;
9use crate::error::Error;
10use crate::ext::ustr::UStr;
11use crate::io::StatementId;
12use crate::message::{
13 BackendMessageFormat, Close, Query, ReadyForQuery, ReceivedMessage, Terminate,
14 TransactionStatus,
15};
16use crate::statement::PgStatementMetadata;
17use crate::transaction::Transaction;
18use crate::types::Oid;
19use crate::{PgConnectOptions, PgTypeInfo, Postgres};
20
21pub(crate) use sqlx_core::connection::*;
22use sqlx_core::sql_str::SqlSafeStr;
23
24pub use self::stream::PgStream;
25
26pub(crate) mod describe;
27mod establish;
28mod executor;
29mod sasl;
30mod stream;
31mod tls;
32
33pub struct PgConnection {
37 pub(crate) inner: Box<PgConnectionInner>,
38}
39
40pub struct PgConnectionInner {
41 pub(crate) stream: PgStream,
45
46 #[allow(dead_code)]
49 process_id: u32,
50
51 #[allow(dead_code)]
54 secret_key: u32,
55
56 next_statement_id: StatementId,
59
60 cache_statement: StatementCache<(StatementId, Arc<PgStatementMetadata>)>,
62
63 cache_type_info: HashMap<Oid, PgTypeInfo>,
65 cache_type_oid: HashMap<UStr, Oid>,
66 cache_elem_type_to_array: HashMap<Oid, Oid>,
67 cache_table_to_column_names: HashMap<Oid, TableColumns>,
68
69 pub(crate) pending_ready_for_query_count: usize,
71
72 transaction_status: TransactionStatus,
74 pub(crate) transaction_depth: usize,
75
76 log_settings: LogSettings,
77}
78
/// Cached column names for a single table.
pub(crate) struct TableColumns {
    /// Name of the table.
    table_name: Arc<str>,
    /// Column names keyed by an `i16`, iterated in ascending key order.
    // NOTE(review): the key is presumably the column's attribute number — confirm.
    columns: BTreeMap<i16, Arc<str>>,
}
84
85impl PgConnection {
86 pub fn server_version_num(&self) -> Option<u32> {
88 self.inner.stream.server_version_num
89 }
90
91 pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
93 if !self.inner.stream.write_buffer_mut().is_empty() {
94 self.inner.stream.flush().await?;
95 }
96
97 while self.inner.pending_ready_for_query_count > 0 {
98 let message = self.inner.stream.recv().await?;
99
100 if let BackendMessageFormat::ReadyForQuery = message.format {
101 self.handle_ready_for_query(message)?;
102 }
103 }
104
105 Ok(())
106 }
107
108 async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
109 let r: ReadyForQuery = self.inner.stream.recv_expect().await?;
110
111 self.inner.pending_ready_for_query_count -= 1;
112 self.inner.transaction_status = r.transaction_status;
113
114 Ok(())
115 }
116
117 #[inline(always)]
118 fn handle_ready_for_query(&mut self, message: ReceivedMessage) -> Result<(), Error> {
119 self.inner.pending_ready_for_query_count = self
120 .inner
121 .pending_ready_for_query_count
122 .checked_sub(1)
123 .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?;
124
125 self.inner.transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
126
127 Ok(())
128 }
129
130 #[inline(always)]
134 pub(crate) fn queue_simple_query(&mut self, query: &str) -> Result<(), Error> {
135 self.inner.stream.write_msg(Query(query))?;
136 self.inner.pending_ready_for_query_count += 1;
137
138 Ok(())
139 }
140
141 pub(crate) fn in_transaction(&self) -> bool {
142 match self.inner.transaction_status {
143 TransactionStatus::Transaction => true,
144 TransactionStatus::Error | TransactionStatus::Idle => false,
145 }
146 }
147}
148
149impl Debug for PgConnection {
150 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
151 f.debug_struct("PgConnection").finish()
152 }
153}
154
155impl Connection for PgConnection {
156 type Database = Postgres;
157
158 type Options = PgConnectOptions;
159
160 async fn close(mut self) -> Result<(), Error> {
161 self.inner.stream.send(Terminate).await?;
167 self.inner.stream.shutdown().await?;
168
169 Ok(())
170 }
171
172 async fn close_hard(mut self) -> Result<(), Error> {
173 self.inner.stream.shutdown().await?;
174
175 Ok(())
176 }
177
178 async fn ping(&mut self) -> Result<(), Error> {
179 self.write_sync();
185 self.wait_until_ready().await
186 }
187
188 fn begin(
189 &mut self,
190 ) -> impl Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_ {
191 Transaction::begin(self, None)
192 }
193
194 fn begin_with(
195 &mut self,
196 statement: impl SqlSafeStr,
197 ) -> impl Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_
198 where
199 Self: Sized,
200 {
201 Transaction::begin(self, Some(statement.into_sql_str()))
202 }
203
204 fn cached_statements_size(&self) -> usize {
205 self.inner.cache_statement.len()
206 }
207
208 async fn clear_cached_statements(&mut self) -> Result<(), Error> {
209 self.inner.cache_type_oid.clear();
210
211 let mut cleared = 0_usize;
212
213 self.wait_until_ready().await?;
214
215 while let Some((id, _)) = self.inner.cache_statement.remove_lru() {
216 self.inner.stream.write_msg(Close::Statement(id))?;
217 cleared += 1;
218 }
219
220 if cleared > 0 {
221 self.write_sync();
222 self.inner.stream.flush().await?;
223
224 self.wait_for_close_complete(cleared).await?;
225 self.recv_ready_for_query().await?;
226 }
227
228 Ok(())
229 }
230
231 fn shrink_buffers(&mut self) {
232 self.inner.stream.shrink_buffers();
233 }
234
235 #[doc(hidden)]
236 fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + Send + '_ {
237 self.wait_until_ready()
238 }
239
240 #[doc(hidden)]
241 fn should_flush(&self) -> bool {
242 !self.inner.stream.write_buffer().is_empty()
243 }
244}
245
246impl AsMut<PgConnection> for PgConnection {
251 fn as_mut(&mut self) -> &mut PgConnection {
252 self
253 }
254}