sqlx_postgres/connection/
mod.rs

1use std::collections::BTreeMap;
2use std::fmt::{self, Debug, Formatter};
3use std::future::Future;
4use std::sync::Arc;
5
6use crate::HashMap;
7
8use crate::common::StatementCache;
9use crate::error::Error;
10use crate::ext::ustr::UStr;
11use crate::io::StatementId;
12use crate::message::{
13    BackendMessageFormat, Close, Query, ReadyForQuery, ReceivedMessage, Terminate,
14    TransactionStatus,
15};
16use crate::statement::PgStatementMetadata;
17use crate::transaction::Transaction;
18use crate::types::Oid;
19use crate::{PgConnectOptions, PgTypeInfo, Postgres};
20
21pub(crate) use sqlx_core::connection::*;
22use sqlx_core::sql_str::SqlSafeStr;
23
24pub use self::stream::PgStream;
25
26pub(crate) mod describe;
27mod establish;
28mod executor;
29mod sasl;
30mod stream;
31mod tls;
32
33/// A connection to a PostgreSQL database.
34///
35/// See [`PgConnectOptions`] for connection URL reference.
36pub struct PgConnection {
37    pub(crate) inner: Box<PgConnectionInner>,
38}
39
40pub struct PgConnectionInner {
41    // underlying TCP or UDS stream,
42    // wrapped in a potentially TLS stream,
43    // wrapped in a buffered stream
44    pub(crate) stream: PgStream,
45
46    // process id of this backend
47    // used to send cancel requests
48    #[allow(dead_code)]
49    process_id: u32,
50
51    // secret key of this backend
52    // used to send cancel requests
53    #[allow(dead_code)]
54    secret_key: u32,
55
56    // sequence of statement IDs for use in preparing statements
57    // in PostgreSQL, the statement is prepared to a user-supplied identifier
58    next_statement_id: StatementId,
59
60    // cache statement by query string to the id and columns
61    cache_statement: StatementCache<(StatementId, Arc<PgStatementMetadata>)>,
62
63    // cache user-defined types by id <-> info
64    cache_type_info: HashMap<Oid, PgTypeInfo>,
65    cache_type_oid: HashMap<UStr, Oid>,
66    cache_elem_type_to_array: HashMap<Oid, Oid>,
67    cache_table_to_column_names: HashMap<Oid, TableColumns>,
68
69    // number of ReadyForQuery messages that we are currently expecting
70    pub(crate) pending_ready_for_query_count: usize,
71
72    // current transaction status
73    transaction_status: TransactionStatus,
74    pub(crate) transaction_depth: usize,
75
76    log_settings: LogSettings,
77}
78
79pub(crate) struct TableColumns {
80    table_name: Arc<str>,
81    /// Attribute number -> name.
82    columns: BTreeMap<i16, Arc<str>>,
83}
84
85impl PgConnection {
86    /// the version number of the server in `libpq` format
87    pub fn server_version_num(&self) -> Option<u32> {
88        self.inner.stream.server_version_num
89    }
90
91    // will return when the connection is ready for another query
92    pub(crate) async fn wait_until_ready(&mut self) -> Result<(), Error> {
93        if !self.inner.stream.write_buffer_mut().is_empty() {
94            self.inner.stream.flush().await?;
95        }
96
97        while self.inner.pending_ready_for_query_count > 0 {
98            let message = self.inner.stream.recv().await?;
99
100            if let BackendMessageFormat::ReadyForQuery = message.format {
101                self.handle_ready_for_query(message)?;
102            }
103        }
104
105        Ok(())
106    }
107
108    async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
109        let r: ReadyForQuery = self.inner.stream.recv_expect().await?;
110
111        self.inner.pending_ready_for_query_count -= 1;
112        self.inner.transaction_status = r.transaction_status;
113
114        Ok(())
115    }
116
117    #[inline(always)]
118    fn handle_ready_for_query(&mut self, message: ReceivedMessage) -> Result<(), Error> {
119        self.inner.pending_ready_for_query_count = self
120            .inner
121            .pending_ready_for_query_count
122            .checked_sub(1)
123            .ok_or_else(|| err_protocol!("received more ReadyForQuery messages than expected"))?;
124
125        self.inner.transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
126
127        Ok(())
128    }
129
130    /// Queue a simple query (not prepared) to execute the next time this connection is used.
131    ///
132    /// Used for rolling back transactions and releasing advisory locks.
133    #[inline(always)]
134    pub(crate) fn queue_simple_query(&mut self, query: &str) -> Result<(), Error> {
135        self.inner.stream.write_msg(Query(query))?;
136        self.inner.pending_ready_for_query_count += 1;
137
138        Ok(())
139    }
140
141    pub(crate) fn in_transaction(&self) -> bool {
142        match self.inner.transaction_status {
143            TransactionStatus::Transaction => true,
144            TransactionStatus::Error | TransactionStatus::Idle => false,
145        }
146    }
147}
148
149impl Debug for PgConnection {
150    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
151        f.debug_struct("PgConnection").finish()
152    }
153}
154
155impl Connection for PgConnection {
156    type Database = Postgres;
157
158    type Options = PgConnectOptions;
159
160    async fn close(mut self) -> Result<(), Error> {
161        // The normal, graceful termination procedure is that the frontend sends a Terminate
162        // message and immediately closes the connection.
163
164        // On receipt of this message, the backend closes the
165        // connection and terminates.
166        self.inner.stream.send(Terminate).await?;
167        self.inner.stream.shutdown().await?;
168
169        Ok(())
170    }
171
172    async fn close_hard(mut self) -> Result<(), Error> {
173        self.inner.stream.shutdown().await?;
174
175        Ok(())
176    }
177
178    async fn ping(&mut self) -> Result<(), Error> {
179        // Users were complaining about this showing up in query statistics on the server.
180        // By sending a comment we avoid an error if the connection was in the middle of a rowset
181        // self.execute("/* SQLx ping */").map_ok(|_| ()).boxed()
182
183        // The simplest call-and-response that's possible.
184        self.write_sync();
185        self.wait_until_ready().await
186    }
187
188    fn begin(
189        &mut self,
190    ) -> impl Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_ {
191        Transaction::begin(self, None)
192    }
193
194    fn begin_with(
195        &mut self,
196        statement: impl SqlSafeStr,
197    ) -> impl Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_
198    where
199        Self: Sized,
200    {
201        Transaction::begin(self, Some(statement.into_sql_str()))
202    }
203
204    fn cached_statements_size(&self) -> usize {
205        self.inner.cache_statement.len()
206    }
207
208    async fn clear_cached_statements(&mut self) -> Result<(), Error> {
209        self.inner.cache_type_oid.clear();
210
211        let mut cleared = 0_usize;
212
213        self.wait_until_ready().await?;
214
215        while let Some((id, _)) = self.inner.cache_statement.remove_lru() {
216            self.inner.stream.write_msg(Close::Statement(id))?;
217            cleared += 1;
218        }
219
220        if cleared > 0 {
221            self.write_sync();
222            self.inner.stream.flush().await?;
223
224            self.wait_for_close_complete(cleared).await?;
225            self.recv_ready_for_query().await?;
226        }
227
228        Ok(())
229    }
230
231    fn shrink_buffers(&mut self) {
232        self.inner.stream.shrink_buffers();
233    }
234
235    #[doc(hidden)]
236    fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + Send + '_ {
237        self.wait_until_ready()
238    }
239
240    #[doc(hidden)]
241    fn should_flush(&self) -> bool {
242        !self.inner.stream.write_buffer().is_empty()
243    }
244}
245
246// Implement `AsMut<Self>` so that `PgConnection` can be wrapped in
247// a `PgAdvisoryLockGuard`.
248//
249// See: https://github.com/launchbadge/sqlx/issues/2520
250impl AsMut<PgConnection> for PgConnection {
251    fn as_mut(&mut self) -> &mut PgConnection {
252        self
253    }
254}