sqlx_build_trust_postgres/connection/
mod.rs

1use std::fmt::{self, Debug, Formatter};
2use std::sync::Arc;
3
4use crate::HashMap;
5use futures_core::future::BoxFuture;
6use futures_util::FutureExt;
7
8use crate::common::StatementCache;
9use crate::error::Error;
10use crate::ext::ustr::UStr;
11use crate::io::Decode;
12use crate::message::{
13    Close, Message, MessageFormat, Query, ReadyForQuery, Terminate, TransactionStatus,
14};
15use crate::statement::PgStatementMetadata;
16use crate::transaction::Transaction;
17use crate::types::Oid;
18use crate::{PgConnectOptions, PgTypeInfo, Postgres};
19
20pub(crate) use sqlx_core::connection::*;
21
22pub use self::stream::PgStream;
23
24pub(crate) mod describe;
25mod establish;
26mod executor;
27mod sasl;
28mod stream;
29mod tls;
30
/// A connection to a PostgreSQL database.
///
/// Alongside the underlying stream, the connection keeps the prepared-statement
/// cache, the user-defined type caches, the number of `ReadyForQuery` messages
/// still outstanding, and the transaction state last reported by the server.
pub struct PgConnection {
    // underlying TCP or UDS stream,
    // wrapped in a potentially TLS stream,
    // wrapped in a buffered stream
    pub(crate) stream: PgStream,

    // process id of this backend
    // used to send cancel requests
    // (not read anywhere in the visible part of this module, hence the lint allowance)
    #[allow(dead_code)]
    process_id: u32,

    // secret key of this backend
    // used to send cancel requests
    // (not read anywhere in the visible part of this module, hence the lint allowance)
    #[allow(dead_code)]
    secret_key: u32,

    // sequence of statement IDs for use in preparing statements
    // in PostgreSQL, the statement is prepared to a user-supplied identifier
    next_statement_id: Oid,

    // cache statement by query string to the id and columns
    // entries are evicted (and closed on the server) by `clear_cached_statements`
    cache_statement: StatementCache<(Oid, Arc<PgStatementMetadata>)>,

    // cache user-defined types by id <-> info
    // NOTE(review): `clear_cached_statements` clears only `cache_type_oid`,
    // not `cache_type_info` — confirm that this asymmetry is intended
    cache_type_info: HashMap<Oid, PgTypeInfo>,
    cache_type_oid: HashMap<UStr, Oid>,

    // number of ReadyForQuery messages that we are currently expecting
    // incremented when a query is queued (see `queue_simple_query`) and decremented
    // for every ReadyForQuery received; `wait_until_ready` drains until it reaches zero
    pub(crate) pending_ready_for_query_count: usize,

    // current transaction status
    // overwritten with the status carried by each ReadyForQuery message received
    transaction_status: TransactionStatus,
    // presumably the nesting depth of open transactions; it is maintained outside
    // this file — verify against the `transaction` module
    pub(crate) transaction_depth: usize,

    // logging configuration; not read in the visible part of this module
    log_settings: LogSettings,
}
68
69impl PgConnection {
70    /// the version number of the server in `libpq` format
71    pub fn server_version_num(&self) -> Option<u32> {
72        self.stream.server_version_num
73    }
74
75    // will return when the connection is ready for another query
76    pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
77        if !self.stream.write_buffer_mut().is_empty() {
78            self.stream.flush().await?;
79        }
80
81        while self.pending_ready_for_query_count > 0 {
82            let message = self.stream.recv().await?;
83
84            if let MessageFormat::ReadyForQuery = message.format {
85                self.handle_ready_for_query(message)?;
86            }
87        }
88
89        Ok(())
90    }
91
92    async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
93        let r: ReadyForQuery = self
94            .stream
95            .recv_expect(MessageFormat::ReadyForQuery)
96            .await?;
97
98        self.pending_ready_for_query_count -= 1;
99        self.transaction_status = r.transaction_status;
100
101        Ok(())
102    }
103
104    fn handle_ready_for_query(&mut self, message: Message) -> Result<(), Error> {
105        self.pending_ready_for_query_count -= 1;
106        self.transaction_status = ReadyForQuery::decode(message.contents)?.transaction_status;
107
108        Ok(())
109    }
110
111    /// Queue a simple query (not prepared) to execute the next time this connection is used.
112    ///
113    /// Used for rolling back transactions and releasing advisory locks.
114    pub(crate) fn queue_simple_query(&mut self, query: &str) {
115        self.pending_ready_for_query_count += 1;
116        self.stream.write(Query(query));
117    }
118}
119
120impl Debug for PgConnection {
121    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
122        f.debug_struct("PgConnection").finish()
123    }
124}
125
126impl Connection for PgConnection {
127    type Database = Postgres;
128
129    type Options = PgConnectOptions;
130
131    fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
132        // The normal, graceful termination procedure is that the frontend sends a Terminate
133        // message and immediately closes the connection.
134
135        // On receipt of this message, the backend closes the
136        // connection and terminates.
137
138        Box::pin(async move {
139            self.stream.send(Terminate).await?;
140            self.stream.shutdown().await?;
141
142            Ok(())
143        })
144    }
145
146    fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
147        Box::pin(async move {
148            self.stream.shutdown().await?;
149
150            Ok(())
151        })
152    }
153
154    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
155        // Users were complaining about this showing up in query statistics on the server.
156        // By sending a comment we avoid an error if the connection was in the middle of a rowset
157        // self.execute("/* SQLx ping */").map_ok(|_| ()).boxed()
158
159        Box::pin(async move {
160            // The simplest call-and-response that's possible.
161            self.write_sync();
162            self.wait_until_ready().await
163        })
164    }
165
166    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
167    where
168        Self: Sized,
169    {
170        Transaction::begin(self)
171    }
172
173    fn cached_statements_size(&self) -> usize {
174        self.cache_statement.len()
175    }
176
177    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
178        Box::pin(async move {
179            self.cache_type_oid.clear();
180
181            let mut cleared = 0_usize;
182
183            self.wait_until_ready().await?;
184
185            while let Some((id, _)) = self.cache_statement.remove_lru() {
186                self.stream.write(Close::Statement(id));
187                cleared += 1;
188            }
189
190            if cleared > 0 {
191                self.write_sync();
192                self.stream.flush().await?;
193
194                self.wait_for_close_complete(cleared).await?;
195                self.recv_ready_for_query().await?;
196            }
197
198            Ok(())
199        })
200    }
201
202    fn shrink_buffers(&mut self) {
203        self.stream.shrink_buffers();
204    }
205
206    #[doc(hidden)]
207    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
208        self.wait_until_ready().boxed()
209    }
210
211    #[doc(hidden)]
212    fn should_flush(&self) -> bool {
213        !self.stream.write_buffer().is_empty()
214    }
215}
216
217// Implement `AsMut<Self>` so that `PgConnection` can be wrapped in
218// a `PgAdvisoryLockGuard`.
219//
220// See: https://github.com/launchbadge/sqlx/issues/2520
221impl AsMut<PgConnection> for PgConnection {
222    fn as_mut(&mut self) -> &mut PgConnection {
223        self
224    }
225}