sqlx_postgres/connection/
mod.rs1use std::collections::BTreeMap;
2use std::fmt::{self, Debug, Formatter};
3use std::future::Future;
4use std::sync::Arc;
5
6use crate::HashMap;
7
8use crate::common::StatementCache;
9use crate::error::Error;
10use crate::ext::ustr::UStr;
11use crate::io::StatementId;
12use crate::message::{
13 BackendMessageFormat, Close, Query, ReadyForQuery, ReceivedMessage, Terminate,
14 TransactionStatus,
15};
16use crate::statement::PgStatementMetadata;
17use crate::transaction::Transaction;
18use crate::types::Oid;
19use crate::{PgConnectOptions, PgTypeInfo, Postgres};
20
21pub(crate) use sqlx_core::connection::*;
22use sqlx_core::sql_str::SqlSafeStr;
23
24pub use self::stream::PgStream;
25
26#[cfg(feature = "offline")]
27mod describe;
28mod establish;
29mod executor;
30mod resolve;
31mod sasl;
32mod stream;
33mod tls;
34
35pub struct PgConnection {
39 pub(crate) inner: Box<PgConnectionInner>,
40}
41
42pub struct PgConnectionInner {
43 pub(crate) stream: PgStream,
47
48 #[allow(dead_code)]
51 process_id: u32,
52
53 #[allow(dead_code)]
56 secret_key: u32,
57
58 next_statement_id: StatementId,
61
62 cache_statement: StatementCache<(StatementId, Arc<PgStatementMetadata>)>,
64
65 cache_type_info: HashMap<Oid, PgTypeInfo>,
67 cache_type_oid: HashMap<UStr, Oid>,
68 cache_elem_type_to_array: HashMap<Oid, Oid>,
69 cache_table_to_column_names: HashMap<Oid, TableColumns>,
70
71 pub(crate) pending_ready_for_query_count: usize,
73
74 transaction_status: TransactionStatus,
76 pub(crate) transaction_depth: usize,
77
78 log_settings: LogSettings,
79}
80
/// Name of a table together with the names of its columns.
pub(crate) struct TableColumns {
    /// Shared name of the table.
    table_name: Arc<str>,
    /// Column names keyed by an `i16`, iterated in ascending key order.
    // NOTE(review): the key is presumably the column's attribute number -- confirm.
    columns: BTreeMap<i16, Arc<str>>,
}
86
87impl PgConnection {
88 pub fn server_version_num(&self) -> Option<u32> {
90 self.inner.stream.server_version_num
91 }
92
93 pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
95 if !self.inner.stream.write_buffer_mut().is_empty() {
96 self.inner.stream.flush().await?;
97 }
98
99 while self.inner.pending_ready_for_query_count > 0 {
100 let message = self.inner.stream.recv().await?;
101
102 if let BackendMessageFormat::ReadyForQuery = message.format {
103 self.handle_ready_for_query(message)?;
104 }
105 }
106
107 Ok(())
108 }
109
110 async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
111 let r: ReadyForQuery = self.inner.stream.recv_expect().await?;
112
113 self.inner.pending_ready_for_query_count -= 1;
114 self.inner.transaction_status = r.transaction_status;
115
116 Ok(())
117 }
118
119 #[inline(always)]
120 fn handle_ready_for_query(&mut self, message: ReceivedMessage) -> Result<(), Error> {
121 self.inner.pending_ready_for_query_count = self
122 .inner
123 .pending_ready_for_query_count
124 .checked_sub(1)
125 .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?;
126
127 self.inner.transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
128
129 Ok(())
130 }
131
132 #[inline(always)]
136 pub(crate) fn queue_simple_query(&mut self, query: &str) -> Result<(), Error> {
137 self.inner.stream.write_msg(Query(query))?;
138 self.inner.pending_ready_for_query_count += 1;
139
140 Ok(())
141 }
142
143 pub(crate) fn in_transaction(&self) -> bool {
144 match self.inner.transaction_status {
145 TransactionStatus::Transaction => true,
146 TransactionStatus::Error | TransactionStatus::Idle => false,
147 }
148 }
149}
150
151impl Debug for PgConnection {
152 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
153 f.debug_struct("PgConnection").finish()
154 }
155}
156
157impl Connection for PgConnection {
158 type Database = Postgres;
159
160 type Options = PgConnectOptions;
161
162 async fn close(mut self) -> Result<(), Error> {
163 self.inner.stream.send(Terminate).await?;
169 self.inner.stream.shutdown().await?;
170
171 Ok(())
172 }
173
174 async fn close_hard(mut self) -> Result<(), Error> {
175 self.inner.stream.shutdown().await?;
176
177 Ok(())
178 }
179
180 async fn ping(&mut self) -> Result<(), Error> {
181 self.write_sync();
187 self.wait_until_ready().await
188 }
189
190 fn begin(
191 &mut self,
192 ) -> impl Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_ {
193 Transaction::begin(self, None)
194 }
195
196 fn begin_with(
197 &mut self,
198 statement: impl SqlSafeStr,
199 ) -> impl Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_
200 where
201 Self: Sized,
202 {
203 Transaction::begin(self, Some(statement.into_sql_str()))
204 }
205
206 fn cached_statements_size(&self) -> usize {
207 self.inner.cache_statement.len()
208 }
209
210 async fn clear_cached_statements(&mut self) -> Result<(), Error> {
211 self.inner.cache_type_oid.clear();
212
213 let mut cleared = 0_usize;
214
215 self.wait_until_ready().await?;
216
217 while let Some((id, _)) = self.inner.cache_statement.remove_lru() {
218 self.inner.stream.write_msg(Close::Statement(id))?;
219 cleared += 1;
220 }
221
222 if cleared > 0 {
223 self.write_sync();
224 self.inner.stream.flush().await?;
225
226 self.wait_for_close_complete(cleared).await?;
227 self.recv_ready_for_query().await?;
228 }
229
230 Ok(())
231 }
232
233 fn shrink_buffers(&mut self) {
234 self.inner.stream.shrink_buffers();
235 }
236
237 #[doc(hidden)]
238 fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + Send + '_ {
239 self.wait_until_ready()
240 }
241
242 #[doc(hidden)]
243 fn should_flush(&self) -> bool {
244 !self.inner.stream.write_buffer().is_empty()
245 }
246}
247
248impl AsMut<PgConnection> for PgConnection {
253 fn as_mut(&mut self) -> &mut PgConnection {
254 self
255 }
256}