sqlx_core_guts/postgres/connection/
mod.rs1use std::fmt::{self, Debug, Formatter};
2use std::sync::Arc;
3
4use crate::HashMap;
5use futures_core::future::BoxFuture;
6use futures_util::FutureExt;
7
8use crate::common::StatementCache;
9use crate::connection::{Connection, LogSettings};
10use crate::error::Error;
11use crate::ext::ustr::UStr;
12use crate::io::Decode;
13use crate::postgres::message::{
14 Close, Message, MessageFormat, Query, ReadyForQuery, Terminate, TransactionStatus,
15};
16use crate::postgres::statement::PgStatementMetadata;
17use crate::postgres::types::Oid;
18use crate::postgres::{PgConnectOptions, PgTypeInfo, Postgres};
19use crate::transaction::Transaction;
20
21pub use self::stream::PgStream;
22
23pub(crate) mod describe;
24mod establish;
25mod executor;
26mod sasl;
27mod stream;
28mod tls;
29
30pub struct PgConnection {
32 pub(crate) stream: PgStream,
36
37 #[allow(dead_code)]
40 process_id: u32,
41
42 #[allow(dead_code)]
45 secret_key: u32,
46
47 next_statement_id: Oid,
50
51 cache_statement: StatementCache<(Oid, Arc<PgStatementMetadata>)>,
53
54 cache_type_info: HashMap<Oid, PgTypeInfo>,
56 cache_type_oid: HashMap<UStr, Oid>,
57
58 pub(crate) pending_ready_for_query_count: usize,
60
61 transaction_status: TransactionStatus,
63 pub(crate) transaction_depth: usize,
64
65 log_settings: LogSettings,
66}
67
68impl PgConnection {
69 pub fn server_version_num(&self) -> Option<u32> {
71 self.stream.server_version_num
72 }
73
74 pub(in crate::postgres) async fn wait_until_ready(&mut self) -> Result<(), Error> {
76 if !self.stream.wbuf.is_empty() {
77 self.stream.flush().await?;
78 }
79
80 while self.pending_ready_for_query_count > 0 {
81 let message = self.stream.recv().await?;
82
83 if let MessageFormat::ReadyForQuery = message.format {
84 self.handle_ready_for_query(message)?;
85 }
86 }
87
88 Ok(())
89 }
90
91 async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
92 let r: ReadyForQuery = self
93 .stream
94 .recv_expect(MessageFormat::ReadyForQuery)
95 .await?;
96
97 self.pending_ready_for_query_count -= 1;
98 self.transaction_status = r.transaction_status;
99
100 Ok(())
101 }
102
103 fn handle_ready_for_query(&mut self, message: Message) -> Result<(), Error> {
104 self.pending_ready_for_query_count -= 1;
105 self.transaction_status = ReadyForQuery::decode(message.contents)?.transaction_status;
106
107 Ok(())
108 }
109
110 pub(crate) fn queue_simple_query(&mut self, query: &str) {
114 self.pending_ready_for_query_count += 1;
115 self.stream.write(Query(query));
116 }
117}
118
119impl Debug for PgConnection {
120 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
121 f.debug_struct("PgConnection").finish()
122 }
123}
124
125impl Connection for PgConnection {
126 type Database = Postgres;
127
128 type Options = PgConnectOptions;
129
130 fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
131 Box::pin(async move {
138 self.stream.send(Terminate).await?;
139 self.stream.shutdown().await?;
140
141 Ok(())
142 })
143 }
144
145 fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
146 Box::pin(async move {
147 self.stream.shutdown().await?;
148
149 Ok(())
150 })
151 }
152
153 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
154 Box::pin(async move {
159 self.write_sync();
161 self.wait_until_ready().await
162 })
163 }
164
165 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
166 where
167 Self: Sized,
168 {
169 Transaction::begin(self)
170 }
171
172 fn cached_statements_size(&self) -> usize {
173 self.cache_statement.len()
174 }
175
176 fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
177 Box::pin(async move {
178 let mut cleared = 0_usize;
179
180 self.wait_until_ready().await?;
181
182 while let Some((id, _)) = self.cache_statement.remove_lru() {
183 self.stream.write(Close::Statement(id));
184 cleared += 1;
185 }
186
187 if cleared > 0 {
188 self.write_sync();
189 self.stream.flush().await?;
190
191 self.wait_for_close_complete(cleared).await?;
192 self.recv_ready_for_query().await?;
193 }
194
195 Ok(())
196 })
197 }
198
199 #[doc(hidden)]
200 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
201 self.wait_until_ready().boxed()
202 }
203
204 #[doc(hidden)]
205 fn should_flush(&self) -> bool {
206 !self.stream.wbuf.is_empty()
207 }
208}