Skip to main content

sqlx_postgres/connection/
mod.rs

1use std::collections::BTreeMap;
2use std::fmt::{self, Debug, Formatter};
3use std::future::Future;
4use std::sync::Arc;
5
6use crate::HashMap;
7
8use crate::common::StatementCache;
9use crate::error::Error;
10use crate::ext::ustr::UStr;
11use crate::io::StatementId;
12use crate::message::{
13    BackendMessageFormat, Close, Query, ReadyForQuery, ReceivedMessage, Terminate,
14    TransactionStatus,
15};
16use crate::statement::PgStatementMetadata;
17use crate::transaction::Transaction;
18use crate::types::Oid;
19use crate::{PgConnectOptions, PgTypeInfo, Postgres};
20
21pub(crate) use sqlx_core::connection::*;
22use sqlx_core::sql_str::SqlSafeStr;
23
24pub use self::stream::PgStream;
25
26#[cfg(feature = "offline")]
27mod describe;
28mod establish;
29mod executor;
30mod resolve;
31mod sasl;
32mod stream;
33mod tls;
34
/// A connection to a PostgreSQL database.
///
/// See [`PgConnectOptions`] for connection URL reference.
pub struct PgConnection {
    // All per-connection state (stream, caches, transaction bookkeeping) is kept
    // in a separately boxed `PgConnectionInner`, so this handle is a single pointer.
    pub(crate) inner: Box<PgConnectionInner>,
}
41
/// The state behind a [`PgConnection`], held in its boxed `inner` field.
pub struct PgConnectionInner {
    // underlying TCP or UDS stream,
    // wrapped in a potentially TLS stream,
    // wrapped in a buffered stream
    pub(crate) stream: PgStream,

    // process id of this backend
    // used to send cancel requests
    #[allow(dead_code)]
    process_id: u32,

    // secret key of this backend
    // used to send cancel requests
    #[allow(dead_code)]
    secret_key: u32,

    // sequence of statement IDs for use in preparing statements
    // in PostgreSQL, the statement is prepared to a user-supplied identifier
    next_statement_id: StatementId,

    // cache statement by query string to the id and columns
    cache_statement: StatementCache<(StatementId, Arc<PgStatementMetadata>)>,

    // cache user-defined types by id <-> info
    cache_type_info: HashMap<Oid, PgTypeInfo>,
    cache_type_oid: HashMap<UStr, Oid>,
    // NOTE(review): presumably element type OID -> OID of the matching array type;
    // confirm against the code that populates it (not in this file).
    cache_elem_type_to_array: HashMap<Oid, Oid>,
    // NOTE(review): presumably keyed by the table's OID; confirm against the
    // code that populates it (not in this file).
    cache_table_to_column_names: HashMap<Oid, TableColumns>,

    // number of ReadyForQuery messages that we are currently expecting;
    // `queue_simple_query` increments it and each handled ReadyForQuery decrements it
    pub(crate) pending_ready_for_query_count: usize,

    // current transaction status,
    // overwritten from every ReadyForQuery message that is processed
    transaction_status: TransactionStatus,
    // NOTE(review): presumably the nesting depth of open transactions/savepoints;
    // it is only declared here and maintained elsewhere in the crate - confirm.
    pub(crate) transaction_depth: usize,

    // NOTE(review): `LogSettings` is not imported by name in this file; presumably it
    // arrives through the `sqlx_core::connection::*` glob re-export - confirm.
    log_settings: LogSettings,
}
80
/// A table's name together with the names of its columns.
///
/// Used as the value type of `PgConnectionInner::cache_table_to_column_names`.
pub(crate) struct TableColumns {
    // Shared, immutable name of the table.
    table_name: Arc<str>,
    /// Attribute number -> name.
    // A `BTreeMap` keeps the entries ordered by attribute number.
    columns: BTreeMap<i16, Arc<str>>,
}
86
87impl PgConnection {
88    /// the version number of the server in `libpq` format
89    pub fn server_version_num(&self) -> Option<u32> {
90        self.inner.stream.server_version_num
91    }
92
93    // will return when the connection is ready for another query
94    pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
95        if !self.inner.stream.write_buffer_mut().is_empty() {
96            self.inner.stream.flush().await?;
97        }
98
99        while self.inner.pending_ready_for_query_count > 0 {
100            let message = self.inner.stream.recv().await?;
101
102            if let BackendMessageFormat::ReadyForQuery = message.format {
103                self.handle_ready_for_query(message)?;
104            }
105        }
106
107        Ok(())
108    }
109
110    async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
111        let r: ReadyForQuery = self.inner.stream.recv_expect().await?;
112
113        self.inner.pending_ready_for_query_count -= 1;
114        self.inner.transaction_status = r.transaction_status;
115
116        Ok(())
117    }
118
119    #[inline(always)]
120    fn handle_ready_for_query(&mut self, message: ReceivedMessage) -> Result<(), Error> {
121        self.inner.pending_ready_for_query_count = self
122            .inner
123            .pending_ready_for_query_count
124            .checked_sub(1)
125            .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?;
126
127        self.inner.transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
128
129        Ok(())
130    }
131
132    /// Queue a simple query (not prepared) to execute the next time this connection is used.
133    ///
134    /// Used for rolling back transactions and releasing advisory locks.
135    #[inline(always)]
136    pub(crate) fn queue_simple_query(&mut self, query: &str) -> Result<(), Error> {
137        self.inner.stream.write_msg(Query(query))?;
138        self.inner.pending_ready_for_query_count += 1;
139
140        Ok(())
141    }
142
143    pub(crate) fn in_transaction(&self) -> bool {
144        match self.inner.transaction_status {
145            TransactionStatus::Transaction => true,
146            TransactionStatus::Error | TransactionStatus::Idle => false,
147        }
148    }
149}
150
151impl Debug for PgConnection {
152    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
153        f.debug_struct("PgConnection").finish()
154    }
155}
156
157impl Connection for PgConnection {
158    type Database = Postgres;
159
160    type Options = PgConnectOptions;
161
162    async fn close(mut self) -> Result<(), Error> {
163        // The normal, graceful termination procedure is that the frontend sends a Terminate
164        // message and immediately closes the connection.
165
166        // On receipt of this message, the backend closes the
167        // connection and terminates.
168        self.inner.stream.send(Terminate).await?;
169        self.inner.stream.shutdown().await?;
170
171        Ok(())
172    }
173
174    async fn close_hard(mut self) -> Result<(), Error> {
175        self.inner.stream.shutdown().await?;
176
177        Ok(())
178    }
179
180    async fn ping(&mut self) -> Result<(), Error> {
181        // Users were complaining about this showing up in query statistics on the server.
182        // By sending a comment we avoid an error if the connection was in the middle of a rowset
183        // self.execute("/* SQLx ping */").map_ok(|_| ()).boxed()
184
185        // The simplest call-and-response that's possible.
186        self.write_sync();
187        self.wait_until_ready().await
188    }
189
190    fn begin(
191        &mut self,
192    ) -> impl Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_ {
193        Transaction::begin(self, None)
194    }
195
196    fn begin_with(
197        &mut self,
198        statement: impl SqlSafeStr,
199    ) -> impl Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_
200    where
201        Self: Sized,
202    {
203        Transaction::begin(self, Some(statement.into_sql_str()))
204    }
205
206    fn cached_statements_size(&self) -> usize {
207        self.inner.cache_statement.len()
208    }
209
210    async fn clear_cached_statements(&mut self) -> Result<(), Error> {
211        self.inner.cache_type_oid.clear();
212
213        let mut cleared = 0_usize;
214
215        self.wait_until_ready().await?;
216
217        while let Some((id, _)) = self.inner.cache_statement.remove_lru() {
218            self.inner.stream.write_msg(Close::Statement(id))?;
219            cleared += 1;
220        }
221
222        if cleared > 0 {
223            self.write_sync();
224            self.inner.stream.flush().await?;
225
226            self.wait_for_close_complete(cleared).await?;
227            self.recv_ready_for_query().await?;
228        }
229
230        Ok(())
231    }
232
233    fn shrink_buffers(&mut self) {
234        self.inner.stream.shrink_buffers();
235    }
236
237    #[doc(hidden)]
238    fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + Send + '_ {
239        self.wait_until_ready()
240    }
241
242    #[doc(hidden)]
243    fn should_flush(&self) -> bool {
244        !self.inner.stream.write_buffer().is_empty()
245    }
246}
247
248// Implement `AsMut<Self>` so that `PgConnection` can be wrapped in
249// a `PgAdvisoryLockGuard`.
250//
251// See: https://github.com/launchbadge/sqlx/issues/2520
252impl AsMut<PgConnection> for PgConnection {
253    fn as_mut(&mut self) -> &mut PgConnection {
254        self
255    }
256}