1use bytes::Bytes;
5use std::collections::HashMap;
6use std::sync::Arc;
7
8#[derive(Debug, Clone)]
13pub struct ColumnInfo {
14 pub name_to_index: HashMap<String, usize>,
16 pub oids: Vec<u32>,
18 pub formats: Vec<i16>,
20}
21
22impl ColumnInfo {
23 pub fn from_fields(fields: &[crate::protocol::FieldDescription]) -> Self {
26 let mut name_to_index = HashMap::with_capacity(fields.len());
27 let mut oids = Vec::with_capacity(fields.len());
28 let mut formats = Vec::with_capacity(fields.len());
29
30 for (i, field) in fields.iter().enumerate() {
31 name_to_index.insert(field.name.clone(), i);
32 oids.push(field.type_oid);
33 formats.push(field.format);
34 }
35
36 Self {
37 name_to_index,
38 oids,
39 formats,
40 }
41 }
42}
43
44pub struct PgRow {
46 pub columns: Vec<Option<Vec<u8>>>,
48 pub column_info: Option<Arc<ColumnInfo>>,
50}
51
52#[derive(Debug, Clone, Default)]
57pub struct PgBytesRow {
58 pub(crate) payload: Bytes,
59 pub(crate) spans: Vec<Option<(usize, usize)>>,
60 pub column_info: Option<Arc<ColumnInfo>>,
62}
63
64#[derive(Debug)]
66pub enum PgError {
67 Connection(String),
69 Protocol(String),
71 Auth(String),
73 Query(String),
75 QueryServer(PgServerError),
77 NoRows,
79 Io(std::io::Error),
81 Encode(String),
83 Timeout(String),
85 PoolExhausted {
87 max: usize,
89 },
90 PoolClosed,
92}
93
94#[derive(Debug, Clone, PartialEq, Eq)]
96pub struct PgServerError {
97 pub severity: String,
99 pub code: String,
101 pub message: String,
103 pub detail: Option<String>,
105 pub hint: Option<String>,
107}
108
109impl From<crate::protocol::ErrorFields> for PgServerError {
110 fn from(value: crate::protocol::ErrorFields) -> Self {
111 Self {
112 severity: value.severity,
113 code: value.code,
114 message: value.message,
115 detail: value.detail,
116 hint: value.hint,
117 }
118 }
119}
120
121impl std::fmt::Display for PgError {
122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123 match self {
124 PgError::Connection(e) => write!(f, "Connection error: {}", e),
125 PgError::Protocol(e) => write!(f, "Protocol error: {}", e),
126 PgError::Auth(e) => write!(f, "Auth error: {}", e),
127 PgError::Query(e) => write!(f, "Query error: {}", e),
128 PgError::QueryServer(e) => write!(f, "Query error [{}]: {}", e.code, e.message),
129 PgError::NoRows => write!(f, "No rows returned"),
130 PgError::Io(e) => write!(f, "I/O error: {}", e),
131 PgError::Encode(e) => write!(f, "Encode error: {}", e),
132 PgError::Timeout(ctx) => write!(f, "Timeout: {}", ctx),
133 PgError::PoolExhausted { max } => write!(f, "Pool exhausted ({} max connections)", max),
134 PgError::PoolClosed => write!(f, "Connection pool is closed"),
135 }
136 }
137}
138
139impl std::error::Error for PgError {
140 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
141 match self {
142 PgError::Io(e) => Some(e),
143 _ => None,
144 }
145 }
146}
147
148impl From<std::io::Error> for PgError {
149 fn from(e: std::io::Error) -> Self {
150 PgError::Io(e)
151 }
152}
153
154impl From<crate::protocol::EncodeError> for PgError {
155 fn from(e: crate::protocol::EncodeError) -> Self {
156 PgError::Encode(e.to_string())
157 }
158}
159
160impl PgError {
161 pub fn server_error(&self) -> Option<&PgServerError> {
163 match self {
164 PgError::QueryServer(err) => Some(err),
165 _ => None,
166 }
167 }
168
169 pub fn sqlstate(&self) -> Option<&str> {
171 self.server_error().map(|e| e.code.as_str())
172 }
173
174 pub fn is_prepared_statement_retryable(&self) -> bool {
177 let Some(err) = self.server_error() else {
178 return false;
179 };
180
181 let code = err.code.as_str();
182 let message = err.message.to_ascii_lowercase();
183
184 if code.eq_ignore_ascii_case("26000")
186 && message.contains("prepared statement")
187 && message.contains("does not exist")
188 {
189 return true;
190 }
191
192 if code.eq_ignore_ascii_case("0A000") && message.contains("cached plan must be replanned") {
194 return true;
195 }
196
197 message.contains("cached plan must be replanned")
199 }
200
201 pub fn is_prepared_statement_already_exists(&self) -> bool {
207 let Some(err) = self.server_error() else {
208 return false;
209 };
210 if !err.code.eq_ignore_ascii_case("42P05") {
211 return false;
212 }
213 let message = err.message.to_ascii_lowercase();
214 message.contains("prepared statement") && message.contains("already exists")
215 }
216
217 pub fn is_transient_server_error(&self) -> bool {
223 match self {
225 PgError::Timeout(_) => return true,
226 PgError::Io(io) => {
227 return matches!(
228 io.kind(),
229 std::io::ErrorKind::TimedOut
230 | std::io::ErrorKind::ConnectionRefused
231 | std::io::ErrorKind::ConnectionReset
232 | std::io::ErrorKind::BrokenPipe
233 | std::io::ErrorKind::Interrupted
234 );
235 }
236 PgError::Connection(_) => return true,
237 _ => {}
238 }
239
240 if self.is_prepared_statement_retryable() {
242 return true;
243 }
244
245 let Some(code) = self.sqlstate() else {
246 return false;
247 };
248
249 matches!(
250 code,
251 "40001"
253 | "40P01"
255 | "57P03"
257 | "57P01"
259 | "57P02"
260 ) || code.starts_with("08") }
262}
263
264pub type PgResult<T> = Result<T, PgError>;
266
267#[inline]
268pub(crate) fn is_ignorable_session_message(msg: &crate::protocol::BackendMessage) -> bool {
269 matches!(
270 msg,
271 crate::protocol::BackendMessage::NoticeResponse(_)
272 | crate::protocol::BackendMessage::ParameterStatus { .. }
273 )
274}
275
276#[inline]
277pub(crate) fn unexpected_backend_message(
278 phase: &str,
279 msg: &crate::protocol::BackendMessage,
280) -> PgError {
281 PgError::Protocol(format!(
282 "Unexpected backend message during {} phase: {:?}",
283 phase, msg
284 ))
285}
286
287#[inline]
288pub(crate) fn is_ignorable_session_msg_type(msg_type: u8) -> bool {
289 matches!(msg_type, b'N' | b'S')
290}
291
292#[inline]
293pub(crate) fn unexpected_backend_msg_type(phase: &str, msg_type: u8) -> PgError {
294 let printable = if msg_type.is_ascii_graphic() {
295 msg_type as char
296 } else {
297 '?'
298 };
299 PgError::Protocol(format!(
300 "Unexpected backend message type during {} phase: byte={} char={}",
301 phase, msg_type, printable
302 ))
303}
304
305#[derive(Debug, Clone)]
307pub struct QueryResult {
308 pub columns: Vec<String>,
310 pub rows: Vec<Vec<Option<String>>>,
312}
313
314#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
319pub enum ResultFormat {
320 #[default]
322 Text,
323 Binary,
325}
326
327impl ResultFormat {
328 #[inline]
329 pub(crate) fn as_wire_code(self) -> i16 {
330 match self {
331 ResultFormat::Text => crate::protocol::PgEncoder::FORMAT_TEXT,
332 ResultFormat::Binary => crate::protocol::PgEncoder::FORMAT_BINARY,
333 }
334 }
335}