cdbc_pg/connection/
mod.rs1use std::fmt::{self, Debug, Formatter};
2use std::sync::Arc;
3
4use cdbc::HashMap;
5use cdbc::utils::statement_cache::StatementCache;
6use cdbc::connection::{Connection};
7use cdbc::error::Error;
8use cdbc::executor::Executor;
9use cdbc::utils::ustr::UStr;
10use cdbc::io::Decode;
11use crate::message::{
12 Close, Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus,
13};
14use crate::statement::PgStatementMetadata;
15use crate::{PgConnectOptions, PgTypeInfo, Postgres};
16use cdbc::transaction::Transaction;
17
18pub use self::stream::PgStream;
19
20pub(crate) mod describe;
21mod establish;
22mod executor;
23mod sasl;
24mod stream;
25mod tls;
26
27pub struct PgConnection {
29 pub(crate) stream: PgStream,
33
34 #[allow(dead_code)]
37 process_id: u32,
38
39 #[allow(dead_code)]
42 secret_key: u32,
43
44 next_statement_id: u32,
47
48 cache_statement: StatementCache<(u32, Arc<PgStatementMetadata>)>,
50
51 cache_type_info: HashMap<u32, PgTypeInfo>,
53 cache_type_oid: HashMap<UStr, u32>,
54
55 pub(crate) pending_ready_for_query_count: usize,
57
58 transaction_status: TransactionStatus,
60 pub(crate) transaction_depth: usize,
61}
62
63impl PgConnection {
64 pub fn wait_until_ready(&mut self) -> Result<(), Error> {
66 if !self.stream.wbuf.is_empty() {
67 self.stream.flush()?;
68 }
69
70 while self.pending_ready_for_query_count > 0 {
71 let message = self.stream.recv()?;
72
73 if let MessageFormat::ReadyForQuery = message.format {
74 self.handle_ready_for_query(message)?;
75 }
76 }
77
78 Ok(())
79 }
80
81 fn recv_ready_for_query(&mut self) -> Result<(), Error> {
82 let r: ReadyForQuery = self
83 .stream
84 .recv_expect(MessageFormat::ReadyForQuery)
85 ?;
86
87 self.pending_ready_for_query_count -= 1;
88 self.transaction_status = r.transaction_status;
89
90 Ok(())
91 }
92
93 fn handle_ready_for_query(&mut self, message: Message) -> Result<(), Error> {
94 self.pending_ready_for_query_count -= 1;
95 self.transaction_status = ReadyForQuery::decode(message.contents)?.transaction_status;
96
97 Ok(())
98 }
99}
100
101impl Debug for PgConnection {
102 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
103 f.debug_struct("PgConnection").finish()
104 }
105}
106
107impl Connection for PgConnection {
108
109 type Options = PgConnectOptions;
110
111 fn close(mut self) -> Result<(), Error> {
112 self.stream.send(Terminate)?;
119 self.stream.shutdown()?;
120
121 Ok(())
122 }
123
124 fn ping(&mut self) -> Result<(), Error> {
125 self.execute("/* SQLx ping */")?;
127 Ok(())
128 }
129
130 fn begin(&mut self) -> Result<Transaction<'_, Self::Database>, Error>
131 where
132 Self: Sized,
133 {
134 Transaction::begin(self)
135 }
136
137 fn cached_statements_size(&self) -> usize {
138 self.cache_statement.len()
139 }
140
141 fn clear_cached_statements(&mut self) -> Result<(), Error> {
142 let mut cleared = 0_usize;
143
144 self.wait_until_ready()?;
145
146 while let Some((id, _)) = self.cache_statement.remove_lru() {
147 self.stream.write(Close::Statement(id));
148 cleared += 1;
149 }
150
151 if cleared > 0 {
152 self.write_sync();
153 self.stream.flush()?;
154
155 self.wait_for_close_complete(cleared)?;
156 self.recv_ready_for_query()?;
157 }
158
159 Ok(())
160 }
161
162 #[doc(hidden)]
163 fn flush(&mut self) -> Result<(), Error> {
164 self.wait_until_ready()
165 }
166
167 #[doc(hidden)]
168 fn should_flush(&self) -> bool {
169 !self.stream.wbuf.is_empty()
170 }
171}
172
173pub trait PgConnectionInfo {
174 fn server_version_num(&self) -> Option<u32>;
176}
177
178impl PgConnectionInfo for PgConnection {
179 fn server_version_num(&self) -> Option<u32> {
180 self.stream.server_version_num
181 }
182}
183
184impl PgConnectionInfo for cdbc::pool::PoolConnection<Postgres> {
185 fn server_version_num(&self) -> Option<u32> {
186 self.stream.server_version_num
187 }
188}