Skip to main content

qail_pg/driver/
types.rs

//! Core types: ColumnInfo, PgRow, PgError, PgResult, QueryResult, ResultFormat,
//! and wire-protocol message utilities.
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
/// Column metadata describing the shape of a query result.
///
/// Holds a name → position lookup together with per-column type OIDs and
/// wire-format codes, which is what a [`PgRow`] needs in order to locate
/// and decode its values.
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// Zero-based column position, keyed by column name.
    pub name_to_index: HashMap<String, usize>,
    /// PostgreSQL type OID of each column, in column order.
    pub oids: Vec<u32>,
    /// Wire format code of each column, in column order
    /// (`0` = text, `1` = binary).
    pub formats: Vec<i16>,
}
20
21impl ColumnInfo {
22    /// Build column metadata from the `RowDescription` field list
23    /// returned by the backend after a query.
24    pub fn from_fields(fields: &[crate::protocol::FieldDescription]) -> Self {
25        let mut name_to_index = HashMap::with_capacity(fields.len());
26        let mut oids = Vec::with_capacity(fields.len());
27        let mut formats = Vec::with_capacity(fields.len());
28
29        for (i, field) in fields.iter().enumerate() {
30            name_to_index.insert(field.name.clone(), i);
31            oids.push(field.type_oid);
32            formats.push(field.format);
33        }
34
35        Self {
36            name_to_index,
37            oids,
38            formats,
39        }
40    }
41}
42
43/// PostgreSQL row with column data and metadata.
44pub struct PgRow {
45    /// Raw column values — `None` represents SQL `NULL`.
46    pub columns: Vec<Option<Vec<u8>>>,
47    /// Shared column metadata for decoding values by name or type.
48    pub column_info: Option<Arc<ColumnInfo>>,
49}
50
51/// Error type for PostgreSQL driver operations.
52#[derive(Debug)]
53pub enum PgError {
54    /// TCP / TLS connection failure with the PostgreSQL server.
55    Connection(String),
56    /// Wire-protocol framing or decoding error.
57    Protocol(String),
58    /// Authentication failure (bad password, unsupported mechanism, etc.).
59    Auth(String),
60    /// Query execution error returned by the backend (e.g. constraint violation).
61    Query(String),
62    /// Structured server error with SQLSTATE and optional detail/hint fields.
63    QueryServer(PgServerError),
64    /// The query returned zero rows when at least one was expected.
65    NoRows,
66    /// I/O error (preserves inner error for chaining)
67    Io(std::io::Error),
68    /// Encoding error (parameter limit, etc.)
69    Encode(String),
70    /// Operation timed out (connection, acquire, query)
71    Timeout(String),
72    /// Pool exhausted — all connections are in use
73    PoolExhausted {
74        /// Maximum pool size that was reached.
75        max: usize,
76    },
77    /// Pool is closed and no longer accepting requests
78    PoolClosed,
79}
80
/// The structured fields of an error reported by the PostgreSQL server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PgServerError {
    /// Severity reported by the server, such as `ERROR`, `FATAL`, or `WARNING`.
    pub severity: String,
    /// Five-character SQLSTATE code, such as `23505`.
    pub code: String,
    /// Primary human-readable error message.
    pub message: String,
    /// Longer description of the problem, when the server supplies one.
    pub detail: Option<String>,
    /// Server-supplied hint, when present.
    pub hint: Option<String>,
}
95
96impl From<crate::protocol::ErrorFields> for PgServerError {
97    fn from(value: crate::protocol::ErrorFields) -> Self {
98        Self {
99            severity: value.severity,
100            code: value.code,
101            message: value.message,
102            detail: value.detail,
103            hint: value.hint,
104        }
105    }
106}
107
108impl std::fmt::Display for PgError {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        match self {
111            PgError::Connection(e) => write!(f, "Connection error: {}", e),
112            PgError::Protocol(e) => write!(f, "Protocol error: {}", e),
113            PgError::Auth(e) => write!(f, "Auth error: {}", e),
114            PgError::Query(e) => write!(f, "Query error: {}", e),
115            PgError::QueryServer(e) => write!(f, "Query error [{}]: {}", e.code, e.message),
116            PgError::NoRows => write!(f, "No rows returned"),
117            PgError::Io(e) => write!(f, "I/O error: {}", e),
118            PgError::Encode(e) => write!(f, "Encode error: {}", e),
119            PgError::Timeout(ctx) => write!(f, "Timeout: {}", ctx),
120            PgError::PoolExhausted { max } => write!(f, "Pool exhausted ({} max connections)", max),
121            PgError::PoolClosed => write!(f, "Connection pool is closed"),
122        }
123    }
124}
125
126impl std::error::Error for PgError {
127    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
128        match self {
129            PgError::Io(e) => Some(e),
130            _ => None,
131        }
132    }
133}
134
135impl From<std::io::Error> for PgError {
136    fn from(e: std::io::Error) -> Self {
137        PgError::Io(e)
138    }
139}
140
141impl From<crate::protocol::EncodeError> for PgError {
142    fn from(e: crate::protocol::EncodeError) -> Self {
143        PgError::Encode(e.to_string())
144    }
145}
146
147impl PgError {
148    /// Return structured server error fields when available.
149    pub fn server_error(&self) -> Option<&PgServerError> {
150        match self {
151            PgError::QueryServer(err) => Some(err),
152            _ => None,
153        }
154    }
155
156    /// Return SQLSTATE code when available.
157    pub fn sqlstate(&self) -> Option<&str> {
158        self.server_error().map(|e| e.code.as_str())
159    }
160
161    /// True when a cached prepared statement can be self-healed by clearing
162    /// local statement state and retrying once.
163    pub fn is_prepared_statement_retryable(&self) -> bool {
164        let Some(err) = self.server_error() else {
165            return false;
166        };
167
168        let code = err.code.as_str();
169        let message = err.message.to_ascii_lowercase();
170
171        // invalid_sql_statement_name
172        if code.eq_ignore_ascii_case("26000")
173            && message.contains("prepared statement")
174            && message.contains("does not exist")
175        {
176            return true;
177        }
178
179        // feature_not_supported + message heuristic used by PostgreSQL replans.
180        if code.eq_ignore_ascii_case("0A000") && message.contains("cached plan must be replanned") {
181            return true;
182        }
183
184        // Defensive message-only fallback for proxy/failover rewrites.
185        message.contains("cached plan must be replanned")
186    }
187
188    /// True when server reports the prepared statement name already exists.
189    ///
190    /// This typically means local cache eviction drifted from server state
191    /// (e.g. local entry dropped while backend statement still exists).
192    /// Callers can retry once without Parse after preserving local mapping.
193    pub fn is_prepared_statement_already_exists(&self) -> bool {
194        let Some(err) = self.server_error() else {
195            return false;
196        };
197        if !err.code.eq_ignore_ascii_case("42P05") {
198            return false;
199        }
200        let message = err.message.to_ascii_lowercase();
201        message.contains("prepared statement") && message.contains("already exists")
202    }
203
204    /// True when the error is a transient server condition that may succeed
205    /// on retry. Covers serialization failures, deadlocks, standby
206    /// unavailability, connection exceptions, and prepared-statement staleness.
207    ///
208    /// Callers should pair this with a bounded retry loop and backoff.
209    pub fn is_transient_server_error(&self) -> bool {
210        // Non-server errors that are inherently transient.
211        match self {
212            PgError::Timeout(_) => return true,
213            PgError::Io(io) => {
214                return matches!(
215                    io.kind(),
216                    std::io::ErrorKind::TimedOut
217                        | std::io::ErrorKind::ConnectionRefused
218                        | std::io::ErrorKind::ConnectionReset
219                        | std::io::ErrorKind::BrokenPipe
220                        | std::io::ErrorKind::Interrupted
221                );
222            }
223            PgError::Connection(_) => return true,
224            _ => {}
225        }
226
227        // Prepared-statement staleness is a subset of transient errors.
228        if self.is_prepared_statement_retryable() {
229            return true;
230        }
231
232        let Some(code) = self.sqlstate() else {
233            return false;
234        };
235
236        matches!(
237            code,
238            // serialization_failure — MVCC conflict, safe to retry
239            "40001"
240            // deadlock_detected — PG auto-aborts one participant
241            | "40P01"
242            // cannot_connect_now — hot-standby recovery in progress
243            | "57P03"
244            // admin_shutdown / crash_shutdown — server restarting
245            | "57P01"
246            | "57P02"
247        ) || code.starts_with("08") // connection_exception class
248    }
249}
250
251/// Result type for PostgreSQL operations.
252pub type PgResult<T> = Result<T, PgError>;
253
254#[inline]
255pub(crate) fn is_ignorable_session_message(msg: &crate::protocol::BackendMessage) -> bool {
256    matches!(
257        msg,
258        crate::protocol::BackendMessage::NoticeResponse(_)
259            | crate::protocol::BackendMessage::ParameterStatus { .. }
260    )
261}
262
263#[inline]
264pub(crate) fn unexpected_backend_message(
265    phase: &str,
266    msg: &crate::protocol::BackendMessage,
267) -> PgError {
268    PgError::Protocol(format!(
269        "Unexpected backend message during {} phase: {:?}",
270        phase, msg
271    ))
272}
273
/// Byte-tag counterpart of the message-level ignorable check.
///
/// Returns `true` for the tag bytes `N` and `S`, which the PostgreSQL wire
/// protocol uses for NoticeResponse and ParameterStatus respectively.
#[inline]
pub(crate) fn is_ignorable_session_msg_type(msg_type: u8) -> bool {
    msg_type == b'N' || msg_type == b'S'
}
278
279#[inline]
280pub(crate) fn unexpected_backend_msg_type(phase: &str, msg_type: u8) -> PgError {
281    let printable = if msg_type.is_ascii_graphic() {
282        msg_type as char
283    } else {
284        '?'
285    };
286    PgError::Protocol(format!(
287        "Unexpected backend message type during {} phase: byte={} char={}",
288        phase, msg_type, printable
289    ))
290}
291
/// Rows produced by a row-returning query (SELECT/GET), already decoded
/// to text.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Names of the result columns, taken from RowDescription.
    pub columns: Vec<String>,
    /// One inner vector per row, one entry per column; `None` marks SQL `NULL`.
    pub rows: Vec<Vec<Option<String>>>,
}
300
/// Wire format in which PostgreSQL returns result columns.
///
/// - `Text` (0): column values arrive in their textual representation.
/// - `Binary` (1): column values arrive in their binary representation.
///
/// The default is `Text`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ResultFormat {
    /// Textual values (format code `0`).
    #[default]
    Text,
    /// Binary values (format code `1`).
    Binary,
}
313
314impl ResultFormat {
315    #[inline]
316    pub(crate) fn as_wire_code(self) -> i16 {
317        match self {
318            ResultFormat::Text => crate::protocol::PgEncoder::FORMAT_TEXT,
319            ResultFormat::Binary => crate::protocol::PgEncoder::FORMAT_BINARY,
320        }
321    }
322}