cdbc_pg/connection/
mod.rs

1use std::fmt::{self, Debug, Formatter};
2use std::sync::Arc;
3
4use cdbc::HashMap;
5use cdbc::utils::statement_cache::StatementCache;
6use cdbc::connection::{Connection};
7use cdbc::error::Error;
8use cdbc::executor::Executor;
9use cdbc::utils::ustr::UStr;
10use cdbc::io::Decode;
11use crate::message::{
12    Close, Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus,
13};
14use crate::statement::PgStatementMetadata;
15use crate::{PgConnectOptions, PgTypeInfo, Postgres};
16use cdbc::transaction::Transaction;
17
18pub use self::stream::PgStream;
19
20pub(crate) mod describe;
21mod establish;
22mod executor;
23mod sasl;
24mod stream;
25mod tls;
26
27/// A connection to a PostgreSQL database.
28pub struct PgConnection {
29    // underlying TCP or UDS stream,
30    // wrapped in a potentially TLS stream,
31    // wrapped in a buffered stream
32    pub(crate) stream: PgStream,
33
34    // process id of this backend
35    // used to send cancel requests
36    #[allow(dead_code)]
37    process_id: u32,
38
39    // secret key of this backend
40    // used to send cancel requests
41    #[allow(dead_code)]
42    secret_key: u32,
43
44    // sequence of statement IDs for use in preparing statements
45    // in PostgreSQL, the statement is prepared to a user-supplied identifier
46    next_statement_id: u32,
47
48    // cache statement by query string to the id and columns
49    cache_statement: StatementCache<(u32, Arc<PgStatementMetadata>)>,
50
51    // cache user-defined types by id <-> info
52    cache_type_info: HashMap<u32, PgTypeInfo>,
53    cache_type_oid: HashMap<UStr, u32>,
54
55    // number of ReadyForQuery messages that we are currently expecting
56    pub(crate) pending_ready_for_query_count: usize,
57
58    // current transaction status
59    transaction_status: TransactionStatus,
60    pub(crate) transaction_depth: usize,
61}
62
63impl PgConnection {
64    // will return when the connection is ready for another query
65    pub fn wait_until_ready(&mut self) -> Result<(), Error> {
66        if !self.stream.wbuf.is_empty() {
67            self.stream.flush()?;
68        }
69
70        while self.pending_ready_for_query_count > 0 {
71            let message = self.stream.recv()?;
72
73            if let MessageFormat::ReadyForQuery = message.format {
74                self.handle_ready_for_query(message)?;
75            }
76        }
77
78        Ok(())
79    }
80
81    fn recv_ready_for_query(&mut self) -> Result<(), Error> {
82        let r: ReadyForQuery = self
83            .stream
84            .recv_expect(MessageFormat::ReadyForQuery)
85            ?;
86
87        self.pending_ready_for_query_count -= 1;
88        self.transaction_status = r.transaction_status;
89
90        Ok(())
91    }
92
93    fn handle_ready_for_query(&mut self, message: Message) -> Result<(), Error> {
94        self.pending_ready_for_query_count -= 1;
95        self.transaction_status = ReadyForQuery::decode(message.contents)?.transaction_status;
96
97        Ok(())
98    }
99}
100
101impl Debug for PgConnection {
102    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
103        f.debug_struct("PgConnection").finish()
104    }
105}
106
107impl Connection for PgConnection {
108
109    type Options = PgConnectOptions;
110
111    fn close(mut self) -> Result<(), Error> {
112        // The normal, graceful termination procedure is that the frontend sends a Terminate
113        // message and immediately closes the connection.
114
115        // On receipt of this message, the backend closes the
116        // connection and terminates.
117
118            self.stream.send(Terminate)?;
119            self.stream.shutdown()?;
120
121            Ok(())
122    }
123
124    fn ping(&mut self) ->  Result<(), Error> {
125        // By sending a comment we avoid an error if the connection was in the middle of a rowset
126        self.execute("/* SQLx ping */")?;
127        Ok(())
128    }
129
130    fn begin(&mut self) ->  Result<Transaction<'_, Self::Database>, Error>
131    where
132        Self: Sized,
133    {
134        Transaction::begin(self)
135    }
136
137    fn cached_statements_size(&self) -> usize {
138        self.cache_statement.len()
139    }
140
141    fn clear_cached_statements(&mut self) ->  Result<(), Error> {
142            let mut cleared = 0_usize;
143
144            self.wait_until_ready()?;
145
146            while let Some((id, _)) = self.cache_statement.remove_lru() {
147                self.stream.write(Close::Statement(id));
148                cleared += 1;
149            }
150
151            if cleared > 0 {
152                self.write_sync();
153                self.stream.flush()?;
154
155                self.wait_for_close_complete(cleared)?;
156                self.recv_ready_for_query()?;
157            }
158
159            Ok(())
160    }
161
162    #[doc(hidden)]
163    fn flush(&mut self) ->  Result<(), Error> {
164        self.wait_until_ready()
165    }
166
167    #[doc(hidden)]
168    fn should_flush(&self) -> bool {
169        !self.stream.wbuf.is_empty()
170    }
171}
172
173pub trait PgConnectionInfo {
174    /// the version number of the server in `libpq` format
175    fn server_version_num(&self) -> Option<u32>;
176}
177
178impl PgConnectionInfo for PgConnection {
179    fn server_version_num(&self) -> Option<u32> {
180        self.stream.server_version_num
181    }
182}
183
184impl PgConnectionInfo for cdbc::pool::PoolConnection<Postgres> {
185    fn server_version_num(&self) -> Option<u32> {
186        self.stream.server_version_num
187    }
188}