qail_pg/driver/connection/
types.rs1use super::super::notification::Notification;
16use super::super::stream::PgStream;
17use super::super::{AuthSettings, EnterpriseAuthMechanism};
18use bytes::BytesMut;
19use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroUsize;
21use std::sync::Arc;
22use std::sync::atomic::AtomicU64;
23use tokio::net::TcpStream;
24
25pub(super) const STMT_CACHE_CAPACITY: NonZeroUsize = NonZeroUsize::new(100).unwrap();
27
/// Fixed-capacity least-recently-used (LRU) cache from `u64` keys to `String`
/// values.
///
/// `entries` owns the values; `order` records recency, with the
/// least-recently-used key at the front and the most-recently-used key at the
/// back. Every mutation keeps the two in sync: each key stored in `entries`
/// appears exactly once in `order`, and vice versa.
///
/// NOTE(review): judging by the type name, keys are presumably statement hashes
/// and values server-side statement names — confirm against the callers.
#[derive(Debug)]
pub(crate) struct StatementCache {
    /// Maximum number of entries kept before `put` evicts the LRU entry.
    capacity: NonZeroUsize,
    /// Key → cached value.
    entries: HashMap<u64, String>,
    /// Keys ordered from least-recently used (front) to most-recently used (back).
    order: VecDeque<u64>,
}

impl StatementCache {
    /// Creates an empty cache limited to `capacity` entries, with both internal
    /// containers pre-sized for that many entries.
    pub(crate) fn new(capacity: NonZeroUsize) -> Self {
        Self {
            capacity,
            entries: HashMap::with_capacity(capacity.get()),
            order: VecDeque::with_capacity(capacity.get()),
        }
    }

    /// Returns the number of entries currently cached.
    pub(crate) fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns the maximum number of entries the cache retains.
    pub(crate) fn cap(&self) -> NonZeroUsize {
        self.capacity
    }

    /// Returns `true` if `key` is cached. Does not change the recency order.
    pub(crate) fn contains(&self, key: &u64) -> bool {
        self.entries.contains_key(key)
    }

    /// Returns a clone of the value stored under `key` and marks the key as the
    /// most recently used. Returns `None` (leaving the order untouched) on a miss.
    pub(crate) fn get(&mut self, key: &u64) -> Option<String> {
        let value = self.entries.get(key).cloned()?;
        self.touch(*key);
        Some(value)
    }

    /// Stores `value` under `key` and marks the key as the most recently used.
    ///
    /// An existing entry is overwritten in place. Inserting a new key into a
    /// full cache first evicts the least-recently-used entry.
    pub(crate) fn put(&mut self, key: u64, value: String) {
        if let Some(slot) = self.entries.get_mut(&key) {
            *slot = value;
            self.touch(key);
            return;
        }

        if self.entries.len() >= self.capacity.get() {
            // The evicted entry is intentionally dropped; callers that need it
            // call `pop_lru` themselves before inserting.
            let _ = self.pop_lru();
        }

        self.entries.insert(key, value);
        self.order.push_back(key);
    }

    /// Removes and returns the least-recently-used entry, or `None` when the
    /// cache is empty.
    pub(crate) fn pop_lru(&mut self) -> Option<(u64, String)> {
        // Defensive loop: keys in `order` without a matching entry are skipped
        // rather than reported.
        while let Some(key) = self.order.pop_front() {
            if let Some(value) = self.entries.remove(&key) {
                return Some((key, value));
            }
        }
        None
    }

    /// Removes `key` from the cache, returning its value if it was present.
    pub(crate) fn remove(&mut self, key: &u64) -> Option<String> {
        let removed = self.entries.remove(key)?;
        self.order.retain(|k| k != key);
        Some(removed)
    }

    /// Drops every entry and resets the recency order.
    pub(crate) fn clear(&mut self) {
        self.entries.clear();
        self.order.clear();
    }

    /// Moves `key` to the most-recently-used position.
    fn touch(&mut self, key: u64) {
        // Fast path: hitting the key that is already most recent (the same
        // statement run back to back) needs no reordering, which avoids an
        // O(n) scan of `order`. Safe because a key occurs at most once.
        if self.order.back() == Some(&key) {
            return;
        }
        self.order.retain(|k| *k != key);
        self.order.push_back(key);
    }
}
108
109pub(crate) const BUFFER_CAPACITY: usize = 65536;
111
112pub(super) const SSL_REQUEST: [u8; 8] = [0, 0, 0, 8, 4, 210, 22, 47];
114
115pub(super) const GSSENC_REQUEST: [u8; 8] = [0, 0, 0, 8, 4, 210, 22, 48];
118
119#[derive(Debug)]
121pub(super) enum GssEncNegotiationResult {
122 Accepted(TcpStream),
126 Rejected,
128 ServerError,
131}
132
133pub(crate) const CANCEL_REQUEST_CODE: i32 = 80877102;
135
136pub(super) static GSS_SESSION_COUNTER: AtomicU64 = AtomicU64::new(1);
138
139pub(crate) const DEFAULT_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
142pub(super) const CONNECT_TRANSPORT_PLAIN: &str = "plain";
143pub(super) const CONNECT_TRANSPORT_TLS: &str = "tls";
144pub(super) const CONNECT_TRANSPORT_MTLS: &str = "mtls";
145pub(super) const CONNECT_TRANSPORT_GSSENC: &str = "gssenc";
146pub(super) const CONNECT_BACKEND_TOKIO: &str = "tokio";
147#[cfg(all(target_os = "linux", feature = "io_uring"))]
148pub(super) const CONNECT_BACKEND_IO_URING: &str = "io_uring";
149
/// PEM-encoded TLS material for authenticating with a client certificate.
///
/// `Debug` is implemented by hand rather than derived so that the private key
/// bytes never end up in logs or panic messages.
#[derive(Clone)]
pub struct TlsConfig {
    /// Client certificate (PEM bytes).
    pub client_cert_pem: Vec<u8>,
    /// Client private key (PEM bytes). Redacted in `Debug` output.
    pub client_key_pem: Vec<u8>,
    /// Optional CA certificate (PEM bytes).
    pub ca_cert_pem: Option<Vec<u8>>,
}

impl std::fmt::Debug for TlsConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TlsConfig")
            .field("client_cert_pem", &self.client_cert_pem)
            // Only the length is shown; the key material itself is secret.
            .field(
                "client_key_pem",
                &format_args!("<redacted; {} bytes>", self.client_key_pem.len()),
            )
            .field("ca_cert_pem", &self.ca_cert_pem)
            .finish()
    }
}

impl TlsConfig {
    /// Builds a configuration by reading the certificate, key and (optionally)
    /// CA certificate files into memory.
    ///
    /// The file contents are stored verbatim; nothing is parsed or validated
    /// here.
    ///
    /// # Errors
    ///
    /// Returns the first `std::io::Error` hit while reading, in the order
    /// certificate, key, CA certificate.
    pub fn from_files(
        cert_path: impl AsRef<std::path::Path>,
        key_path: impl AsRef<std::path::Path>,
        ca_path: Option<impl AsRef<std::path::Path>>,
    ) -> std::io::Result<Self> {
        Ok(Self {
            client_cert_pem: std::fs::read(cert_path)?,
            client_key_pem: std::fs::read(key_path)?,
            ca_cert_pem: ca_path.map(std::fs::read).transpose()?,
        })
    }
}
175
176pub(super) struct ConnectParams<'a> {
181 pub(super) host: &'a str,
182 pub(super) port: u16,
183 pub(super) user: &'a str,
184 pub(super) database: &'a str,
185 pub(super) password: Option<&'a str>,
186 pub(super) auth_settings: AuthSettings,
187 pub(super) gss_token_provider: Option<super::super::GssTokenProvider>,
188 pub(super) gss_token_provider_ex: Option<super::super::GssTokenProviderEx>,
189 pub(super) startup_params: Vec<(String, String)>,
190}
191
192#[inline]
193pub(super) fn has_logical_replication_startup_mode(startup_params: &[(String, String)]) -> bool {
194 startup_params
195 .iter()
196 .any(|(k, v)| k.eq_ignore_ascii_case("replication") && v.eq_ignore_ascii_case("database"))
197}
198
199#[derive(Debug, Clone, Copy, PartialEq, Eq)]
200pub(super) enum StartupAuthFlow {
201 CleartextPassword,
202 Md5Password,
203 Scram { server_final_seen: bool },
204 EnterpriseGss { mechanism: EnterpriseAuthMechanism },
205}
206
207impl StartupAuthFlow {
208 pub(super) fn label(self) -> &'static str {
209 match self {
210 Self::CleartextPassword => "cleartext-password",
211 Self::Md5Password => "md5-password",
212 Self::Scram { .. } => "scram",
213 Self::EnterpriseGss { mechanism } => match mechanism {
214 EnterpriseAuthMechanism::KerberosV5 => "kerberos-v5",
215 EnterpriseAuthMechanism::GssApi => "gssapi",
216 EnterpriseAuthMechanism::Sspi => "sspi",
217 },
218 }
219 }
220}
221
222pub struct PgConnection {
224 pub(crate) stream: PgStream,
225 pub(crate) buffer: BytesMut,
226 pub(crate) write_buf: BytesMut,
227 pub(crate) sql_buf: BytesMut,
228 pub(crate) params_buf: Vec<Option<Vec<u8>>>,
229 pub(crate) prepared_statements: HashMap<String, String>,
230 pub(crate) stmt_cache: StatementCache,
231 pub(crate) column_info_cache: HashMap<u64, Arc<super::super::ColumnInfo>>,
235 pub(crate) process_id: i32,
236 pub(crate) secret_key: i32,
237 pub(crate) notifications: VecDeque<Notification>,
240 pub(crate) replication_stream_active: bool,
242 pub(crate) replication_mode_enabled: bool,
244 pub(crate) last_replication_wal_end: Option<u64>,
246 pub(crate) io_desynced: bool,
249 pub(crate) pending_statement_closes: Vec<String>,
252 pub(crate) draining_statement_closes: bool,
254}