sqlx_build_trust_postgres/connection/
mod.rs1use std::fmt::{self, Debug, Formatter};
2use std::sync::Arc;
3
4use crate::HashMap;
5use futures_core::future::BoxFuture;
6use futures_util::FutureExt;
7
8use crate::common::StatementCache;
9use crate::error::Error;
10use crate::ext::ustr::UStr;
11use crate::io::Decode;
12use crate::message::{
13 Close, Message, MessageFormat, Query, ReadyForQuery, Terminate, TransactionStatus,
14};
15use crate::statement::PgStatementMetadata;
16use crate::transaction::Transaction;
17use crate::types::Oid;
18use crate::{PgConnectOptions, PgTypeInfo, Postgres};
19
20pub(crate) use sqlx_core::connection::*;
21
22pub use self::stream::PgStream;
23
24pub(crate) mod describe;
25mod establish;
26mod executor;
27mod sasl;
28mod stream;
29mod tls;
30
31pub struct PgConnection {
33 pub(crate) stream: PgStream,
37
38 #[allow(dead_code)]
41 process_id: u32,
42
43 #[allow(dead_code)]
46 secret_key: u32,
47
48 next_statement_id: Oid,
51
52 cache_statement: StatementCache<(Oid, Arc<PgStatementMetadata>)>,
54
55 cache_type_info: HashMap<Oid, PgTypeInfo>,
57 cache_type_oid: HashMap<UStr, Oid>,
58
59 pub(crate) pending_ready_for_query_count: usize,
61
62 transaction_status: TransactionStatus,
64 pub(crate) transaction_depth: usize,
65
66 log_settings: LogSettings,
67}
68
69impl PgConnection {
70 pub fn server_version_num(&self) -> Option<u32> {
72 self.stream.server_version_num
73 }
74
75 pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
77 if !self.stream.write_buffer_mut().is_empty() {
78 self.stream.flush().await?;
79 }
80
81 while self.pending_ready_for_query_count > 0 {
82 let message = self.stream.recv().await?;
83
84 if let MessageFormat::ReadyForQuery = message.format {
85 self.handle_ready_for_query(message)?;
86 }
87 }
88
89 Ok(())
90 }
91
92 async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
93 let r: ReadyForQuery = self
94 .stream
95 .recv_expect(MessageFormat::ReadyForQuery)
96 .await?;
97
98 self.pending_ready_for_query_count -= 1;
99 self.transaction_status = r.transaction_status;
100
101 Ok(())
102 }
103
104 fn handle_ready_for_query(&mut self, message: Message) -> Result<(), Error> {
105 self.pending_ready_for_query_count -= 1;
106 self.transaction_status = ReadyForQuery::decode(message.contents)?.transaction_status;
107
108 Ok(())
109 }
110
111 pub(crate) fn queue_simple_query(&mut self, query: &str) {
115 self.pending_ready_for_query_count += 1;
116 self.stream.write(Query(query));
117 }
118}
119
120impl Debug for PgConnection {
121 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
122 f.debug_struct("PgConnection").finish()
123 }
124}
125
126impl Connection for PgConnection {
127 type Database = Postgres;
128
129 type Options = PgConnectOptions;
130
131 fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
132 Box::pin(async move {
139 self.stream.send(Terminate).await?;
140 self.stream.shutdown().await?;
141
142 Ok(())
143 })
144 }
145
146 fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
147 Box::pin(async move {
148 self.stream.shutdown().await?;
149
150 Ok(())
151 })
152 }
153
154 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
155 Box::pin(async move {
160 self.write_sync();
162 self.wait_until_ready().await
163 })
164 }
165
166 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
167 where
168 Self: Sized,
169 {
170 Transaction::begin(self)
171 }
172
173 fn cached_statements_size(&self) -> usize {
174 self.cache_statement.len()
175 }
176
177 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
178 Box::pin(async move {
179 self.cache_type_oid.clear();
180
181 let mut cleared = 0_usize;
182
183 self.wait_until_ready().await?;
184
185 while let Some((id, _)) = self.cache_statement.remove_lru() {
186 self.stream.write(Close::Statement(id));
187 cleared += 1;
188 }
189
190 if cleared > 0 {
191 self.write_sync();
192 self.stream.flush().await?;
193
194 self.wait_for_close_complete(cleared).await?;
195 self.recv_ready_for_query().await?;
196 }
197
198 Ok(())
199 })
200 }
201
202 fn shrink_buffers(&mut self) {
203 self.stream.shrink_buffers();
204 }
205
206 #[doc(hidden)]
207 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
208 self.wait_until_ready().boxed()
209 }
210
211 #[doc(hidden)]
212 fn should_flush(&self) -> bool {
213 !self.stream.write_buffer().is_empty()
214 }
215}
216
217impl AsMut<PgConnection> for PgConnection {
222 fn as_mut(&mut self) -> &mut PgConnection {
223 self
224 }
225}