Skip to main content

qail_pg/driver/
types.rs

1//! Core types: ColumnInfo, PgRow, PgError, PgResult, QueryResult, ResultFormat,
2//! and wire-protocol message utilities.
3
4use bytes::Bytes;
5use std::collections::HashMap;
6use std::sync::Arc;
7
/// Column metadata describing the result set of a query.
///
/// Pairs a name-to-position lookup table with the type OID and wire-format
/// code of every column, so that [`PgRow`] values can be decoded correctly.
#[derive(Clone, Debug)]
pub struct ColumnInfo {
    /// Zero-based column position, keyed by column name.
    pub name_to_index: HashMap<String, usize>,
    /// PostgreSQL type OID of each column.
    pub oids: Vec<u32>,
    /// Wire format code of each column (0 = text, 1 = binary).
    pub formats: Vec<i16>,
}
21
22impl ColumnInfo {
23    /// Build column metadata from the `RowDescription` field list
24    /// returned by the backend after a query.
25    pub fn from_fields(fields: &[crate::protocol::FieldDescription]) -> Self {
26        let mut name_to_index = HashMap::with_capacity(fields.len());
27        let mut oids = Vec::with_capacity(fields.len());
28        let mut formats = Vec::with_capacity(fields.len());
29
30        for (i, field) in fields.iter().enumerate() {
31            name_to_index.insert(field.name.clone(), i);
32            oids.push(field.type_oid);
33            formats.push(field.format);
34        }
35
36        Self {
37            name_to_index,
38            oids,
39            formats,
40        }
41    }
42}
43
44/// PostgreSQL row with column data and metadata.
45pub struct PgRow {
46    /// Raw column values — `None` represents SQL `NULL`.
47    pub columns: Vec<Option<Vec<u8>>>,
48    /// Shared column metadata for decoding values by name or type.
49    pub column_info: Option<Arc<ColumnInfo>>,
50}
51
52/// PostgreSQL row backed by a single shared payload buffer.
53///
54/// This avoids per-cell byte copies by storing one `Bytes` payload plus
55/// column offsets into that payload.
56#[derive(Debug, Clone, Default)]
57pub struct PgBytesRow {
58    pub(crate) payload: Bytes,
59    pub(crate) spans: Vec<Option<(usize, usize)>>,
60    /// Shared column metadata for decoding values by name or type.
61    pub column_info: Option<Arc<ColumnInfo>>,
62}
63
64/// Error type for PostgreSQL driver operations.
65#[derive(Debug)]
66pub enum PgError {
67    /// TCP / TLS connection failure with the PostgreSQL server.
68    Connection(String),
69    /// Wire-protocol framing or decoding error.
70    Protocol(String),
71    /// Authentication failure (bad password, unsupported mechanism, etc.).
72    Auth(String),
73    /// Query execution error returned by the backend (e.g. constraint violation).
74    Query(String),
75    /// Structured server error with SQLSTATE and optional detail/hint fields.
76    QueryServer(PgServerError),
77    /// The query returned zero rows when at least one was expected.
78    NoRows,
79    /// I/O error (preserves inner error for chaining)
80    Io(std::io::Error),
81    /// Encoding error (parameter limit, etc.)
82    Encode(String),
83    /// Operation timed out (connection, acquire, query)
84    Timeout(String),
85    /// Pool exhausted — all connections are in use
86    PoolExhausted {
87        /// Maximum pool size that was reached.
88        max: usize,
89    },
90    /// Pool is closed and no longer accepting requests
91    PoolClosed,
92}
93
/// Fields of an error reported by the PostgreSQL server, kept in structured form.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PgServerError {
    /// Severity level such as `ERROR`, `FATAL` or `WARNING`.
    pub severity: String,
    /// SQLSTATE error code, e.g. `23505`.
    pub code: String,
    /// Primary human-readable message.
    pub message: String,
    /// Longer description, when the server supplied one.
    pub detail: Option<String>,
    /// Hint from the server, when it supplied one.
    pub hint: Option<String>,
}
108
109impl From<crate::protocol::ErrorFields> for PgServerError {
110    fn from(value: crate::protocol::ErrorFields) -> Self {
111        Self {
112            severity: value.severity,
113            code: value.code,
114            message: value.message,
115            detail: value.detail,
116            hint: value.hint,
117        }
118    }
119}
120
121impl std::fmt::Display for PgError {
122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123        match self {
124            PgError::Connection(e) => write!(f, "Connection error: {}", e),
125            PgError::Protocol(e) => write!(f, "Protocol error: {}", e),
126            PgError::Auth(e) => write!(f, "Auth error: {}", e),
127            PgError::Query(e) => write!(f, "Query error: {}", e),
128            PgError::QueryServer(e) => write!(f, "Query error [{}]: {}", e.code, e.message),
129            PgError::NoRows => write!(f, "No rows returned"),
130            PgError::Io(e) => write!(f, "I/O error: {}", e),
131            PgError::Encode(e) => write!(f, "Encode error: {}", e),
132            PgError::Timeout(ctx) => write!(f, "Timeout: {}", ctx),
133            PgError::PoolExhausted { max } => write!(f, "Pool exhausted ({} max connections)", max),
134            PgError::PoolClosed => write!(f, "Connection pool is closed"),
135        }
136    }
137}
138
139impl std::error::Error for PgError {
140    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
141        match self {
142            PgError::Io(e) => Some(e),
143            _ => None,
144        }
145    }
146}
147
148impl From<std::io::Error> for PgError {
149    fn from(e: std::io::Error) -> Self {
150        PgError::Io(e)
151    }
152}
153
154impl From<crate::protocol::EncodeError> for PgError {
155    fn from(e: crate::protocol::EncodeError) -> Self {
156        PgError::Encode(e.to_string())
157    }
158}
159
160impl PgError {
161    /// Return structured server error fields when available.
162    pub fn server_error(&self) -> Option<&PgServerError> {
163        match self {
164            PgError::QueryServer(err) => Some(err),
165            _ => None,
166        }
167    }
168
169    /// Return SQLSTATE code when available.
170    pub fn sqlstate(&self) -> Option<&str> {
171        self.server_error().map(|e| e.code.as_str())
172    }
173
174    /// True when a cached prepared statement can be self-healed by clearing
175    /// local statement state and retrying once.
176    pub fn is_prepared_statement_retryable(&self) -> bool {
177        let Some(err) = self.server_error() else {
178            return false;
179        };
180
181        let code = err.code.as_str();
182        let message = err.message.to_ascii_lowercase();
183
184        // invalid_sql_statement_name
185        if code.eq_ignore_ascii_case("26000")
186            && message.contains("prepared statement")
187            && message.contains("does not exist")
188        {
189            return true;
190        }
191
192        // feature_not_supported + message heuristic used by PostgreSQL replans.
193        if code.eq_ignore_ascii_case("0A000") && message.contains("cached plan must be replanned") {
194            return true;
195        }
196
197        // Defensive message-only fallback for proxy/failover rewrites.
198        message.contains("cached plan must be replanned")
199    }
200
201    /// True when server reports the prepared statement name already exists.
202    ///
203    /// This typically means local cache eviction drifted from server state
204    /// (e.g. local entry dropped while backend statement still exists).
205    /// Callers can retry once without Parse after preserving local mapping.
206    pub fn is_prepared_statement_already_exists(&self) -> bool {
207        let Some(err) = self.server_error() else {
208            return false;
209        };
210        if !err.code.eq_ignore_ascii_case("42P05") {
211            return false;
212        }
213        let message = err.message.to_ascii_lowercase();
214        message.contains("prepared statement") && message.contains("already exists")
215    }
216
217    /// True when the error is a transient server condition that may succeed
218    /// on retry. Covers serialization failures, deadlocks, standby
219    /// unavailability, connection exceptions, and prepared-statement staleness.
220    ///
221    /// Callers should pair this with a bounded retry loop and backoff.
222    pub fn is_transient_server_error(&self) -> bool {
223        // Non-server errors that are inherently transient.
224        match self {
225            PgError::Timeout(_) => return true,
226            PgError::Io(io) => {
227                return matches!(
228                    io.kind(),
229                    std::io::ErrorKind::TimedOut
230                        | std::io::ErrorKind::ConnectionRefused
231                        | std::io::ErrorKind::ConnectionReset
232                        | std::io::ErrorKind::BrokenPipe
233                        | std::io::ErrorKind::Interrupted
234                );
235            }
236            PgError::Connection(_) => return true,
237            _ => {}
238        }
239
240        // Prepared-statement staleness is a subset of transient errors.
241        if self.is_prepared_statement_retryable() {
242            return true;
243        }
244
245        let Some(code) = self.sqlstate() else {
246            return false;
247        };
248
249        matches!(
250            code,
251            // serialization_failure — MVCC conflict, safe to retry
252            "40001"
253            // deadlock_detected — PG auto-aborts one participant
254            | "40P01"
255            // cannot_connect_now — hot-standby recovery in progress
256            | "57P03"
257            // admin_shutdown / crash_shutdown — server restarting
258            | "57P01"
259            | "57P02"
260        ) || code.starts_with("08") // connection_exception class
261    }
262}
263
264/// Result type for PostgreSQL operations.
265pub type PgResult<T> = Result<T, PgError>;
266
267#[inline]
268pub(crate) fn is_ignorable_session_message(msg: &crate::protocol::BackendMessage) -> bool {
269    matches!(
270        msg,
271        crate::protocol::BackendMessage::NoticeResponse(_)
272            | crate::protocol::BackendMessage::ParameterStatus { .. }
273    )
274}
275
276#[inline]
277pub(crate) fn unexpected_backend_message(
278    phase: &str,
279    msg: &crate::protocol::BackendMessage,
280) -> PgError {
281    PgError::Protocol(format!(
282        "Unexpected backend message during {} phase: {:?}",
283        phase, msg
284    ))
285}
286
/// Byte-level counterpart of the ignorable-session-message check: returns
/// `true` when the message type tag is `b'N'` or `b'S'`.
#[inline]
pub(crate) fn is_ignorable_session_msg_type(msg_type: u8) -> bool {
    msg_type == b'N' || msg_type == b'S'
}
291
292#[inline]
293pub(crate) fn unexpected_backend_msg_type(phase: &str, msg_type: u8) -> PgError {
294    let printable = if msg_type.is_ascii_graphic() {
295        msg_type as char
296    } else {
297        '?'
298    };
299    PgError::Protocol(format!(
300        "Unexpected backend message type during {} phase: byte={} char={}",
301        phase, msg_type, printable
302    ))
303}
304
/// Tabular result of a row-returning query (SELECT/GET).
#[derive(Clone, Debug)]
pub struct QueryResult {
    /// Column names taken from RowDescription.
    pub columns: Vec<String>,
    /// Result rows as text-decoded cells; `None` stands for SQL `NULL`.
    pub rows: Vec<Vec<Option<String>>>,
}
313
/// Wire format in which PostgreSQL sends result-column values.
///
/// - `Text` (0): column values arrive in textual form.
/// - `Binary` (1): column values arrive in binary form.
///
/// `Text` is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ResultFormat {
    /// Text format (`0`).
    #[default]
    Text,
    /// Binary format (`1`).
    Binary,
}
326
327impl ResultFormat {
328    #[inline]
329    pub(crate) fn as_wire_code(self) -> i16 {
330        match self {
331            ResultFormat::Text => crate::protocol::PgEncoder::FORMAT_TEXT,
332            ResultFormat::Binary => crate::protocol::PgEncoder::FORMAT_BINARY,
333        }
334    }
335}