1use std::collections::HashMap;
5use std::sync::Arc;
6
7#[derive(Debug, Clone)]
12pub struct ColumnInfo {
13 pub name_to_index: HashMap<String, usize>,
15 pub oids: Vec<u32>,
17 pub formats: Vec<i16>,
19}
20
21impl ColumnInfo {
22 pub fn from_fields(fields: &[crate::protocol::FieldDescription]) -> Self {
25 let mut name_to_index = HashMap::with_capacity(fields.len());
26 let mut oids = Vec::with_capacity(fields.len());
27 let mut formats = Vec::with_capacity(fields.len());
28
29 for (i, field) in fields.iter().enumerate() {
30 name_to_index.insert(field.name.clone(), i);
31 oids.push(field.type_oid);
32 formats.push(field.format);
33 }
34
35 Self {
36 name_to_index,
37 oids,
38 formats,
39 }
40 }
41}
42
43pub struct PgRow {
45 pub columns: Vec<Option<Vec<u8>>>,
47 pub column_info: Option<Arc<ColumnInfo>>,
49}
50
51#[derive(Debug)]
53pub enum PgError {
54 Connection(String),
56 Protocol(String),
58 Auth(String),
60 Query(String),
62 QueryServer(PgServerError),
64 NoRows,
66 Io(std::io::Error),
68 Encode(String),
70 Timeout(String),
72 PoolExhausted {
74 max: usize,
76 },
77 PoolClosed,
79}
80
81#[derive(Debug, Clone, PartialEq, Eq)]
83pub struct PgServerError {
84 pub severity: String,
86 pub code: String,
88 pub message: String,
90 pub detail: Option<String>,
92 pub hint: Option<String>,
94}
95
96impl From<crate::protocol::ErrorFields> for PgServerError {
97 fn from(value: crate::protocol::ErrorFields) -> Self {
98 Self {
99 severity: value.severity,
100 code: value.code,
101 message: value.message,
102 detail: value.detail,
103 hint: value.hint,
104 }
105 }
106}
107
108impl std::fmt::Display for PgError {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 match self {
111 PgError::Connection(e) => write!(f, "Connection error: {}", e),
112 PgError::Protocol(e) => write!(f, "Protocol error: {}", e),
113 PgError::Auth(e) => write!(f, "Auth error: {}", e),
114 PgError::Query(e) => write!(f, "Query error: {}", e),
115 PgError::QueryServer(e) => write!(f, "Query error [{}]: {}", e.code, e.message),
116 PgError::NoRows => write!(f, "No rows returned"),
117 PgError::Io(e) => write!(f, "I/O error: {}", e),
118 PgError::Encode(e) => write!(f, "Encode error: {}", e),
119 PgError::Timeout(ctx) => write!(f, "Timeout: {}", ctx),
120 PgError::PoolExhausted { max } => write!(f, "Pool exhausted ({} max connections)", max),
121 PgError::PoolClosed => write!(f, "Connection pool is closed"),
122 }
123 }
124}
125
126impl std::error::Error for PgError {
127 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
128 match self {
129 PgError::Io(e) => Some(e),
130 _ => None,
131 }
132 }
133}
134
135impl From<std::io::Error> for PgError {
136 fn from(e: std::io::Error) -> Self {
137 PgError::Io(e)
138 }
139}
140
141impl From<crate::protocol::EncodeError> for PgError {
142 fn from(e: crate::protocol::EncodeError) -> Self {
143 PgError::Encode(e.to_string())
144 }
145}
146
147impl PgError {
148 pub fn server_error(&self) -> Option<&PgServerError> {
150 match self {
151 PgError::QueryServer(err) => Some(err),
152 _ => None,
153 }
154 }
155
156 pub fn sqlstate(&self) -> Option<&str> {
158 self.server_error().map(|e| e.code.as_str())
159 }
160
161 pub fn is_prepared_statement_retryable(&self) -> bool {
164 let Some(err) = self.server_error() else {
165 return false;
166 };
167
168 let code = err.code.as_str();
169 let message = err.message.to_ascii_lowercase();
170
171 if code.eq_ignore_ascii_case("26000")
173 && message.contains("prepared statement")
174 && message.contains("does not exist")
175 {
176 return true;
177 }
178
179 if code.eq_ignore_ascii_case("0A000") && message.contains("cached plan must be replanned") {
181 return true;
182 }
183
184 message.contains("cached plan must be replanned")
186 }
187
188 pub fn is_prepared_statement_already_exists(&self) -> bool {
194 let Some(err) = self.server_error() else {
195 return false;
196 };
197 if !err.code.eq_ignore_ascii_case("42P05") {
198 return false;
199 }
200 let message = err.message.to_ascii_lowercase();
201 message.contains("prepared statement") && message.contains("already exists")
202 }
203
204 pub fn is_transient_server_error(&self) -> bool {
210 match self {
212 PgError::Timeout(_) => return true,
213 PgError::Io(io) => {
214 return matches!(
215 io.kind(),
216 std::io::ErrorKind::TimedOut
217 | std::io::ErrorKind::ConnectionRefused
218 | std::io::ErrorKind::ConnectionReset
219 | std::io::ErrorKind::BrokenPipe
220 | std::io::ErrorKind::Interrupted
221 );
222 }
223 PgError::Connection(_) => return true,
224 _ => {}
225 }
226
227 if self.is_prepared_statement_retryable() {
229 return true;
230 }
231
232 let Some(code) = self.sqlstate() else {
233 return false;
234 };
235
236 matches!(
237 code,
238 "40001"
240 | "40P01"
242 | "57P03"
244 | "57P01"
246 | "57P02"
247 ) || code.starts_with("08") }
249}
250
251pub type PgResult<T> = Result<T, PgError>;
253
254#[inline]
255pub(crate) fn is_ignorable_session_message(msg: &crate::protocol::BackendMessage) -> bool {
256 matches!(
257 msg,
258 crate::protocol::BackendMessage::NoticeResponse(_)
259 | crate::protocol::BackendMessage::ParameterStatus { .. }
260 )
261}
262
263#[inline]
264pub(crate) fn unexpected_backend_message(
265 phase: &str,
266 msg: &crate::protocol::BackendMessage,
267) -> PgError {
268 PgError::Protocol(format!(
269 "Unexpected backend message during {} phase: {:?}",
270 phase, msg
271 ))
272}
273
274#[inline]
275pub(crate) fn is_ignorable_session_msg_type(msg_type: u8) -> bool {
276 matches!(msg_type, b'N' | b'S')
277}
278
279#[inline]
280pub(crate) fn unexpected_backend_msg_type(phase: &str, msg_type: u8) -> PgError {
281 let printable = if msg_type.is_ascii_graphic() {
282 msg_type as char
283 } else {
284 '?'
285 };
286 PgError::Protocol(format!(
287 "Unexpected backend message type during {} phase: byte={} char={}",
288 phase, msg_type, printable
289 ))
290}
291
292#[derive(Debug, Clone)]
294pub struct QueryResult {
295 pub columns: Vec<String>,
297 pub rows: Vec<Vec<Option<String>>>,
299}
300
301#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
306pub enum ResultFormat {
307 #[default]
309 Text,
310 Binary,
312}
313
314impl ResultFormat {
315 #[inline]
316 pub(crate) fn as_wire_code(self) -> i16 {
317 match self {
318 ResultFormat::Text => crate::protocol::PgEncoder::FORMAT_TEXT,
319 ResultFormat::Binary => crate::protocol::PgEncoder::FORMAT_BINARY,
320 }
321 }
322}