sqlx_core_guts/postgres/connection/
mod.rs

1use std::fmt::{self, Debug, Formatter};
2use std::sync::Arc;
3
4use crate::HashMap;
5use futures_core::future::BoxFuture;
6use futures_util::FutureExt;
7
8use crate::common::StatementCache;
9use crate::connection::{Connection, LogSettings};
10use crate::error::Error;
11use crate::ext::ustr::UStr;
12use crate::io::Decode;
13use crate::postgres::message::{
14    Close, Message, MessageFormat, Query, ReadyForQuery, Terminate, TransactionStatus,
15};
16use crate::postgres::statement::PgStatementMetadata;
17use crate::postgres::types::Oid;
18use crate::postgres::{PgConnectOptions, PgTypeInfo, Postgres};
19use crate::transaction::Transaction;
20
21pub use self::stream::PgStream;
22
23pub(crate) mod describe;
24mod establish;
25mod executor;
26mod sasl;
27mod stream;
28mod tls;
29
/// A connection to a PostgreSQL database.
///
/// Besides the underlying stream, this holds the per-connection protocol state
/// (outstanding `ReadyForQuery` count, transaction status) and the prepared
/// statement / type caches.
pub struct PgConnection {
    // The underlying TCP or UDS stream,
    // wrapped in a potentially TLS stream,
    // wrapped in a buffered stream.
    pub(crate) stream: PgStream,

    // Process id of this backend.
    // Used to send cancel requests; currently only stored, hence `allow(dead_code)`.
    #[allow(dead_code)]
    process_id: u32,

    // Secret key of this backend.
    // Used to send cancel requests; currently only stored, hence `allow(dead_code)`.
    #[allow(dead_code)]
    secret_key: u32,

    // Sequence of statement IDs for use in preparing statements.
    // In PostgreSQL, a statement is prepared to a user-supplied identifier.
    next_statement_id: Oid,

    // Cache of prepared statements, keyed by query string, mapping to the
    // statement id and its (shared) metadata.
    cache_statement: StatementCache<(Oid, Arc<PgStatementMetadata>)>,

    // Caches for user-defined types: id -> info, and name -> id.
    cache_type_info: HashMap<Oid, PgTypeInfo>,
    cache_type_oid: HashMap<UStr, Oid>,

    // Number of ReadyForQuery messages that we are currently expecting.
    // Incremented when a request is queued and decremented each time a
    // ReadyForQuery is received; `wait_until_ready` drains it to zero.
    pub(crate) pending_ready_for_query_count: usize,

    // Current transaction status, as reported by the most recent ReadyForQuery.
    transaction_status: TransactionStatus,
    // NOTE(review): presumably the nesting depth of open transactions/savepoints;
    // it is maintained outside this file — confirm against the transaction code.
    pub(crate) transaction_depth: usize,

    // Logging configuration for this connection (not read in this file).
    log_settings: LogSettings,
}
67
68impl PgConnection {
69    /// the version number of the server in `libpq` format
70    pub fn server_version_num(&self) -> Option<u32> {
71        self.stream.server_version_num
72    }
73
74    // will return when the connection is ready for another query
75    pub(in crate::postgres) async fn wait_until_ready(&mut self) -> Result<(), Error> {
76        if !self.stream.wbuf.is_empty() {
77            self.stream.flush().await?;
78        }
79
80        while self.pending_ready_for_query_count > 0 {
81            let message = self.stream.recv().await?;
82
83            if let MessageFormat::ReadyForQuery = message.format {
84                self.handle_ready_for_query(message)?;
85            }
86        }
87
88        Ok(())
89    }
90
91    async fn recv_ready_for_query(&mut self) -> Result<(), Error> {
92        let r: ReadyForQuery = self
93            .stream
94            .recv_expect(MessageFormat::ReadyForQuery)
95            .await?;
96
97        self.pending_ready_for_query_count -= 1;
98        self.transaction_status = r.transaction_status;
99
100        Ok(())
101    }
102
103    fn handle_ready_for_query(&mut self, message: Message) -> Result<(), Error> {
104        self.pending_ready_for_query_count -= 1;
105        self.transaction_status = ReadyForQuery::decode(message.contents)?.transaction_status;
106
107        Ok(())
108    }
109
110    /// Queue a simple query (not prepared) to execute the next time this connection is used.
111    ///
112    /// Used for rolling back transactions and releasing advisory locks.
113    pub(crate) fn queue_simple_query(&mut self, query: &str) {
114        self.pending_ready_for_query_count += 1;
115        self.stream.write(Query(query));
116    }
117}
118
119impl Debug for PgConnection {
120    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
121        f.debug_struct("PgConnection").finish()
122    }
123}
124
125impl Connection for PgConnection {
126    type Database = Postgres;
127
128    type Options = PgConnectOptions;
129
130    fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
131        // The normal, graceful termination procedure is that the frontend sends a Terminate
132        // message and immediately closes the connection.
133
134        // On receipt of this message, the backend closes the
135        // connection and terminates.
136
137        Box::pin(async move {
138            self.stream.send(Terminate).await?;
139            self.stream.shutdown().await?;
140
141            Ok(())
142        })
143    }
144
145    fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
146        Box::pin(async move {
147            self.stream.shutdown().await?;
148
149            Ok(())
150        })
151    }
152
153    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
154        // Users were complaining about this showing up in query statistics on the server.
155        // By sending a comment we avoid an error if the connection was in the middle of a rowset
156        // self.execute("/* SQLx ping */").map_ok(|_| ()).boxed()
157
158        Box::pin(async move {
159            // The simplest call-and-response that's possible.
160            self.write_sync();
161            self.wait_until_ready().await
162        })
163    }
164
165    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
166    where
167        Self: Sized,
168    {
169        Transaction::begin(self)
170    }
171
172    fn cached_statements_size(&self) -> usize {
173        self.cache_statement.len()
174    }
175
176    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
177        Box::pin(async move {
178            let mut cleared = 0_usize;
179
180            self.wait_until_ready().await?;
181
182            while let Some((id, _)) = self.cache_statement.remove_lru() {
183                self.stream.write(Close::Statement(id));
184                cleared += 1;
185            }
186
187            if cleared > 0 {
188                self.write_sync();
189                self.stream.flush().await?;
190
191                self.wait_for_close_complete(cleared).await?;
192                self.recv_ready_for_query().await?;
193            }
194
195            Ok(())
196        })
197    }
198
199    #[doc(hidden)]
200    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
201        self.wait_until_ready().boxed()
202    }
203
204    #[doc(hidden)]
205    fn should_flush(&self) -> bool {
206        !self.stream.wbuf.is_empty()
207    }
208}