sqlx_postgres/connection/
mod.rs

1use std::borrow::Cow;
2use std::fmt::{self, Debug, Formatter};
3use std::sync::Arc;
4
5use crate::HashMap;
6use futures_core::future::BoxFuture;
7use futures_util::FutureExt;
8
9use crate::common::StatementCache;
10use crate::error::Error;
11use crate::ext::ustr::UStr;
12use crate::io::StatementId;
13use crate::message::{
14    BackendMessageFormat, Close, Query, ReadyForQuery, ReceivedMessage, Terminate,
15    TransactionStatus,
16};
17use crate::statement::PgStatementMetadata;
18use crate::transaction::Transaction;
19use crate::types::Oid;
20use crate::{PgConnectOptions, PgTypeInfo, Postgres};
21
22pub(crate) use sqlx_core::connection::*;
23
24pub use self::stream::PgStream;
25
26pub(crate) mod describe;
27mod establish;
28mod executor;
29mod sasl;
30mod stream;
31mod tls;
32
/// A connection to a PostgreSQL database.
///
/// See [`PgConnectOptions`] for connection URL reference.
pub struct PgConnection {
    // All connection state lives in `PgConnectionInner` behind a `Box`,
    // so this handle itself is a single heap pointer.
    pub(crate) inner: Box<PgConnectionInner>,
}
39
/// The state held by a [`PgConnection`] (stored there as `Box<PgConnectionInner>`).
pub struct PgConnectionInner {
    // underlying TCP or UDS stream,
    // wrapped in a potentially TLS stream,
    // wrapped in a buffered stream
    pub(crate) stream: PgStream,

    // process id of this backend
    // used to send cancel requests
    // (not read anywhere yet, hence the `dead_code` allowance)
    #[allow(dead_code)]
    process_id: u32,

    // secret key of this backend
    // used to send cancel requests
    // (not read anywhere yet, hence the `dead_code` allowance)
    #[allow(dead_code)]
    secret_key: u32,

    // sequence of statement IDs for use in preparing statements
    // in PostgreSQL, the statement is prepared to a user-supplied identifier
    next_statement_id: StatementId,

    // cache of prepared statements: query string -> (statement id, metadata);
    // entries are evicted in LRU order by `clear_cached_statements`
    cache_statement: StatementCache<(StatementId, Arc<PgStatementMetadata>)>,

    // cache user-defined types by id <-> info
    // `cache_type_info`: type OID -> type info
    cache_type_info: HashMap<Oid, PgTypeInfo>,
    // `cache_type_oid`: presumably type name -> type OID (verify against users);
    // this is the only type cache emptied by `clear_cached_statements`
    cache_type_oid: HashMap<UStr, Oid>,
    // presumably element type OID -> array type OID (verify against users)
    cache_elem_type_to_array: HashMap<Oid, Oid>,

    // number of ReadyForQuery messages that we are currently expecting;
    // incremented when a simple query is queued and decremented each time a
    // ReadyForQuery message is handled
    pub(crate) pending_ready_for_query_count: usize,

    // current transaction status, refreshed from every ReadyForQuery message
    transaction_status: TransactionStatus,
    // NOTE(review): presumably the nesting depth of open transactions /
    // savepoints; it is not modified in this file, so confirm against the
    // transaction manager
    pub(crate) transaction_depth: usize,

    // NOTE(review): presumably the statement-logging configuration; it is not
    // read in this file
    log_settings: LogSettings,
}
77
78impl PgConnection {
79    /// the version number of the server in `libpq` format
80    pub fn server_version_num(&self) -> Option<u32> {
81        self.inner.stream.server_version_num
82    }
83
84    // will return when the connection is ready for another query
85    pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
86        if !self.inner.stream.write_buffer_mut().is_empty() {
87            self.inner.stream.flush().await?;
88        }
89
90        while self.inner.pending_ready_for_query_count > 0 {
91            let message = self.inner.stream.recv().await?;
92
93            if let BackendMessageFormat::ReadyForQuery = message.format {
94                self.handle_ready_for_query(message)?;
95            }
96        }
97
98        Ok(())
99    }
100
101    async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
102        let r: ReadyForQuery = self.inner.stream.recv_expect().await?;
103
104        self.inner.pending_ready_for_query_count -= 1;
105        self.inner.transaction_status = r.transaction_status;
106
107        Ok(())
108    }
109
110    #[inline(always)]
111    fn handle_ready_for_query(&mut self, message: ReceivedMessage) -> Result<(), Error> {
112        self.inner.pending_ready_for_query_count = self
113            .inner
114            .pending_ready_for_query_count
115            .checked_sub(1)
116            .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?;
117
118        self.inner.transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
119
120        Ok(())
121    }
122
123    /// Queue a simple query (not prepared) to execute the next time this connection is used.
124    ///
125    /// Used for rolling back transactions and releasing advisory locks.
126    #[inline(always)]
127    pub(crate) fn queue_simple_query(&mut self, query: &str) -> Result<(), Error> {
128        self.inner.stream.write_msg(Query(query))?;
129        self.inner.pending_ready_for_query_count += 1;
130
131        Ok(())
132    }
133
134    pub(crate) fn in_transaction(&self) -> bool {
135        match self.inner.transaction_status {
136            TransactionStatus::Transaction => true,
137            TransactionStatus::Error | TransactionStatus::Idle => false,
138        }
139    }
140}
141
142impl Debug for PgConnection {
143    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
144        f.debug_struct("PgConnection").finish()
145    }
146}
147
148impl Connection for PgConnection {
149    type Database = Postgres;
150
151    type Options = PgConnectOptions;
152
153    fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
154        // The normal, graceful termination procedure is that the frontend sends a Terminate
155        // message and immediately closes the connection.
156
157        // On receipt of this message, the backend closes the
158        // connection and terminates.
159
160        Box::pin(async move {
161            self.inner.stream.send(Terminate).await?;
162            self.inner.stream.shutdown().await?;
163
164            Ok(())
165        })
166    }
167
168    fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
169        Box::pin(async move {
170            self.inner.stream.shutdown().await?;
171
172            Ok(())
173        })
174    }
175
176    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
177        // Users were complaining about this showing up in query statistics on the server.
178        // By sending a comment we avoid an error if the connection was in the middle of a rowset
179        // self.execute("/* SQLx ping */").map_ok(|_| ()).boxed()
180
181        Box::pin(async move {
182            // The simplest call-and-response that's possible.
183            self.write_sync();
184            self.wait_until_ready().await
185        })
186    }
187
188    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
189    where
190        Self: Sized,
191    {
192        Transaction::begin(self, None)
193    }
194
195    fn begin_with(
196        &mut self,
197        statement: impl Into<Cow<'static, str>>,
198    ) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
199    where
200        Self: Sized,
201    {
202        Transaction::begin(self, Some(statement.into()))
203    }
204
205    fn cached_statements_size(&self) -> usize {
206        self.inner.cache_statement.len()
207    }
208
209    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
210        Box::pin(async move {
211            self.inner.cache_type_oid.clear();
212
213            let mut cleared = 0_usize;
214
215            self.wait_until_ready().await?;
216
217            while let Some((id, _)) = self.inner.cache_statement.remove_lru() {
218                self.inner.stream.write_msg(Close::Statement(id))?;
219                cleared += 1;
220            }
221
222            if cleared > 0 {
223                self.write_sync();
224                self.inner.stream.flush().await?;
225
226                self.wait_for_close_complete(cleared).await?;
227                self.recv_ready_for_query().await?;
228            }
229
230            Ok(())
231        })
232    }
233
234    fn shrink_buffers(&mut self) {
235        self.inner.stream.shrink_buffers();
236    }
237
238    #[doc(hidden)]
239    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
240        self.wait_until_ready().boxed()
241    }
242
243    #[doc(hidden)]
244    fn should_flush(&self) -> bool {
245        !self.inner.stream.write_buffer().is_empty()
246    }
247}
248
249// Implement `AsMut<Self>` so that `PgConnection` can be wrapped in
250// a `PgAdvisoryLockGuard`.
251//
252// See: https://github.com/launchbadge/sqlx/issues/2520
253impl AsMut<PgConnection> for PgConnection {
254    fn as_mut(&mut self) -> &mut PgConnection {
255        self
256    }
257}