Skip to main content

hyperdb_api_core/protocol/message/
backend.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Backend (server-to-client) messages.
5//!
6//! This module parses messages sent from the Hyper server to the client.
7//! Each message variant is discriminated by a single-byte tag; see the
8//! `*_TAG` constants and the [`Message`] enum for the full catalog.
9//!
10//! # Authentication Flow
11//!
12//! The server initiates authentication with an `Authentication*` message.
13//! Supported methods:
14//!
15//! | Method | Tag subtype | Flow |
16//! |---|---|---|
17//! | Trust | `R(0)` | Server sends `AuthenticationOk` immediately |
18//! | Cleartext | `R(3)` | Server requests password; client sends `PasswordMessage` |
19//! | MD5 | `R(5)` + 4-byte salt | Client hashes `md5(md5(password + user) + salt)` |
20//! | SCRAM-SHA-256 | `R(10)` | Multi-step challenge-response via `SASLInitialResponse` / `SASLResponse` |
21//!
22//! After successful authentication the server sends `AuthenticationOk`,
23//! followed by `ParameterStatus` messages, `BackendKeyData`, and finally
24//! `ReadyForQuery`.
25//!
26//! # Attribution
27//!
28//! Portions of this module were adapted from
29//! [`postgres-protocol`](https://github.com/sfackler/rust-postgres)'s
30//! `message/backend.rs` (Copyright (c) 2016 Steven Fackler, MIT or
31//! Apache-2.0). Adapted material includes the message-tag constants
32//! (`PARSE_COMPLETE_TAG = b'1'`, `BIND_COMPLETE_TAG = b'2'`, ...,
33//! `AUTHENTICATION_TAG = b'R'`, etc.), the `Header` struct shape, and
34//! message-framing logic. Hyper-specific changes added on top include the
35//! HyperBinary COPY format and Hyper-specific message variants. See the
36//! `NOTICE` file at the repo root for the full upstream copyright and
37//! reproduced license text.
38
39use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
40use bytes::{Bytes, BytesMut};
41use memchr::memchr;
42use std::io::{self, Read};
43use std::ops::Range;
44use std::str;
45
46use crate::types::Oid;
47
48/// Message tag for `ParseComplete` ('1').
49pub const PARSE_COMPLETE_TAG: u8 = b'1';
50/// Message tag for `BindComplete` ('2').
51pub const BIND_COMPLETE_TAG: u8 = b'2';
52/// Message tag for `CloseComplete` ('3').
53pub const CLOSE_COMPLETE_TAG: u8 = b'3';
54/// Message tag for `NotificationResponse` ('A').
55pub const NOTIFICATION_RESPONSE_TAG: u8 = b'A';
56/// Message tag for `CopyDone` ('c').
57pub const COPY_DONE_TAG: u8 = b'c';
58/// Message tag for `CommandComplete` ('C').
59pub const COMMAND_COMPLETE_TAG: u8 = b'C';
60/// Message tag for `CopyData` ('d').
61pub const COPY_DATA_TAG: u8 = b'd';
62/// Message tag for `DataRow` ('D').
63pub const DATA_ROW_TAG: u8 = b'D';
64/// Message tag for `ErrorResponse` ('E').
65pub const ERROR_RESPONSE_TAG: u8 = b'E';
66/// Message tag for `CopyInResponse` ('G').
67pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
68/// Message tag for `CopyOutResponse` ('H').
69pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';
70/// Message tag for `EmptyQueryResponse` ('I').
71pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
72/// Message tag for `BackendKeyData` ('K').
73pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
74/// Message tag for `NoData` ('n').
75pub const NO_DATA_TAG: u8 = b'n';
76/// Message tag for `NoticeResponse` ('N').
77pub const NOTICE_RESPONSE_TAG: u8 = b'N';
78/// Message tag for Authentication ('R').
79pub const AUTHENTICATION_TAG: u8 = b'R';
80/// Message tag for `PortalSuspended` ('s').
81pub const PORTAL_SUSPENDED_TAG: u8 = b's';
82/// Message tag for `ParameterStatus` ('S').
83pub const PARAMETER_STATUS_TAG: u8 = b'S';
84/// Message tag for `ParameterDescription` ('t').
85pub const PARAMETER_DESCRIPTION_TAG: u8 = b't';
86/// Message tag for `RowDescription` ('T').
87pub const ROW_DESCRIPTION_TAG: u8 = b'T';
88/// Message tag for `ReadyForQuery` ('Z').
89pub const READY_FOR_QUERY_TAG: u8 = b'Z';
90/// Message header information.
91///
92/// `PostgreSQL` wire protocol messages start with a 5-byte header:
93/// - 1 byte: message type tag
94/// - 4 bytes: message length (`BigEndian`, including the length field itself)
95#[derive(Debug, Copy, Clone)]
96pub struct Header {
97    /// Message type tag (e.g., 'R' for Authentication, 'T' for `RowDescription`).
98    tag: u8,
99    /// Total message length in bytes, including the 4-byte length field itself.
100    len: i32,
101}
102
103impl Header {
104    /// Parses a message header from a buffer.
105    ///
106    /// Returns `Ok(None)` if the buffer is too short (< 5 bytes).
107    /// Returns `Err` if the parsed length is invalid (< 4).
108    ///
109    /// # Arguments
110    ///
111    /// * `buf` - Buffer containing at least the first 5 bytes of a message
112    ///
113    /// # Errors
114    ///
115    /// Returns [`io::ErrorKind::InvalidData`] if the parsed length field
116    /// is less than 4 (which is the minimum valid length — the 4-byte
117    /// length field itself).
118    #[inline]
119    pub fn parse(buf: &[u8]) -> io::Result<Option<Header>> {
120        if buf.len() < 5 {
121            return Ok(None);
122        }
123
124        let tag = buf[0];
125        let len = BigEndian::read_i32(&buf[1..]);
126
127        if len < 4 {
128            return Err(io::Error::new(
129                io::ErrorKind::InvalidData,
130                "invalid message length: header length < 4",
131            ));
132        }
133
134        Ok(Some(Header { tag, len }))
135    }
136
137    /// Returns the message tag.
138    #[inline]
139    #[must_use]
140    pub fn tag(self) -> u8 {
141        self.tag
142    }
143
144    /// Returns the message length (including itself).
145    #[inline]
146    #[must_use]
147    pub fn len(self) -> i32 {
148        self.len
149    }
150
151    /// Returns true if the message length is 0.
152    ///
153    /// Note: A valid message always has a length of at least 4 (for the length field itself),
154    /// so this should always return false for valid messages.
155    #[inline]
156    #[must_use]
157    pub fn is_empty(self) -> bool {
158        self.len == 0
159    }
160}
161
162/// An enum representing backend messages from Hyper server.
163#[non_exhaustive]
164#[derive(Debug)]
165pub enum Message {
166    /// Authentication request completed successfully.
167    AuthenticationOk,
168    /// Authentication via cleartext password.
169    AuthenticationCleartextPassword,
170    /// Authentication via MD5 password.
171    AuthenticationMd5Password(AuthenticationMd5PasswordBody),
172    /// Authentication via SASL.
173    AuthenticationSasl(AuthenticationSaslBody),
174    /// SASL authentication continue.
175    AuthenticationSaslContinue(AuthenticationSaslContinueBody),
176    /// SASL authentication final.
177    AuthenticationSaslFinal(AuthenticationSaslFinalBody),
178    /// Backend key data for cancel requests.
179    BackendKeyData(BackendKeyDataBody),
180    /// Bind operation complete.
181    BindComplete,
182    /// Close operation complete.
183    CloseComplete,
184    /// Command completed.
185    CommandComplete(CommandCompleteBody),
186    /// COPY data.
187    CopyData(CopyDataBody),
188    /// COPY operation done.
189    CopyDone,
190    /// COPY IN response (client should send data).
191    CopyInResponse(CopyInResponseBody),
192    /// COPY OUT response (server will send data).
193    CopyOutResponse(CopyOutResponseBody),
194    /// A row of data.
195    DataRow(DataRowBody),
196    /// Empty query response.
197    EmptyQueryResponse,
198    /// Error response.
199    ErrorResponse(ErrorResponseBody),
200    /// No data (for empty result sets).
201    NoData,
202    /// Notice response (warning).
203    NoticeResponse(NoticeResponseBody),
204    /// Notification from LISTEN/NOTIFY.
205    NotificationResponse(NotificationResponseBody),
206    /// Parameter description for prepared statement.
207    ParameterDescription(ParameterDescriptionBody),
208    /// Parameter status update.
209    ParameterStatus(ParameterStatusBody),
210    /// Parse operation complete.
211    ParseComplete,
212    /// Portal suspended (for partial results).
213    PortalSuspended,
214    /// Ready for query.
215    ReadyForQuery(ReadyForQueryBody),
216    /// Row description (column metadata).
217    RowDescription(RowDescriptionBody),
218}
219
220impl Message {
221    /// Parses a backend message from a buffer.
222    ///
223    /// Reads the message header (5 bytes: tag + length) and parses the complete
224    /// message body. Returns `Ok(None)` if the buffer doesn't contain a complete
225    /// message yet. The buffer is advanced when a complete message is parsed.
226    ///
227    /// # Arguments
228    ///
229    /// * `buf` - Buffer containing message data (may be partial)
230    ///
231    /// # Errors
232    ///
233    /// - Returns [`io::ErrorKind::InvalidInput`] if the declared length
234    ///   field is less than 4 (below the minimum valid message size).
235    /// - Returns an I/O error from the inner `Buffer` reads when a body
236    ///   field is truncated, a C-string is not NUL-terminated, or a
237    ///   string field contains invalid UTF-8.
238    /// - Returns [`io::ErrorKind::InvalidInput`] for unexpected
239    ///   authentication sub-tags or unknown top-level message tags.
240    ///
241    /// # Panics
242    ///
243    /// Does not panic in practice. The `read_u32` call on `&buf[1..5]`
244    /// is proven infallible by the preceding `buf.len() < 5` short-circuit.
245    #[inline]
246    pub fn parse(buf: &mut BytesMut) -> io::Result<Option<Message>> {
247        if buf.len() < 5 {
248            let to_read = 5 - buf.len();
249            buf.reserve(to_read);
250            return Ok(None);
251        }
252
253        let tag = buf[0];
254        let len = (&buf[1..5]).read_u32::<BigEndian>().unwrap();
255
256        if len < 4 {
257            return Err(io::Error::new(
258                io::ErrorKind::InvalidInput,
259                "invalid message length: parsing u32",
260            ));
261        }
262
263        // Defensively use checked arithmetic. `len` is u32 from the wire — a
264        // hostile or buggy server could send u32::MAX, which on 32-bit platforms
265        // wraps usize and on 64-bit produces an oversized allocation.
266        let Some(total_len) = (len as usize).checked_add(1) else {
267            return Err(io::Error::new(
268                io::ErrorKind::InvalidInput,
269                format!("invalid message length: {len} + 1 overflows usize"),
270            ));
271        };
272        if buf.len() < total_len {
273            let to_read = total_len - buf.len();
274            buf.reserve(to_read);
275            return Ok(None);
276        }
277
278        let mut buf = Buffer {
279            bytes: buf.split_to(total_len).freeze(),
280            idx: 5,
281        };
282
283        let message = match tag {
284            PARSE_COMPLETE_TAG => Message::ParseComplete,
285            BIND_COMPLETE_TAG => Message::BindComplete,
286            CLOSE_COMPLETE_TAG => Message::CloseComplete,
287            NOTIFICATION_RESPONSE_TAG => {
288                let process_id = buf.read_i32::<BigEndian>()?;
289                let channel = buf.read_cstr()?;
290                let message = buf.read_cstr()?;
291                Message::NotificationResponse(NotificationResponseBody {
292                    process_id,
293                    channel,
294                    message,
295                })
296            }
297            COPY_DONE_TAG => Message::CopyDone,
298            COMMAND_COMPLETE_TAG => {
299                let tag = buf.read_cstr()?;
300                Message::CommandComplete(CommandCompleteBody { tag })
301            }
302            COPY_DATA_TAG => {
303                let storage = buf.read_all();
304                Message::CopyData(CopyDataBody { storage })
305            }
306            DATA_ROW_TAG => {
307                let len = buf.read_u16::<BigEndian>()?;
308                let storage = buf.read_all();
309                Message::DataRow(DataRowBody { storage, len })
310            }
311            ERROR_RESPONSE_TAG => {
312                let storage = buf.read_all();
313                Message::ErrorResponse(ErrorResponseBody { storage })
314            }
315            COPY_IN_RESPONSE_TAG => {
316                let format = buf.read_u8()?;
317                let len = buf.read_u16::<BigEndian>()?;
318                let storage = buf.read_all();
319                Message::CopyInResponse(CopyInResponseBody {
320                    format,
321                    len,
322                    storage,
323                })
324            }
325            COPY_OUT_RESPONSE_TAG => {
326                let format = buf.read_u8()?;
327                let len = buf.read_u16::<BigEndian>()?;
328                let storage = buf.read_all();
329                Message::CopyOutResponse(CopyOutResponseBody {
330                    format,
331                    len,
332                    storage,
333                })
334            }
335            EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
336            BACKEND_KEY_DATA_TAG => {
337                let process_id = buf.read_i32::<BigEndian>()?;
338                let secret_key = buf.read_i32::<BigEndian>()?;
339                Message::BackendKeyData(BackendKeyDataBody {
340                    process_id,
341                    secret_key,
342                })
343            }
344            NO_DATA_TAG => Message::NoData,
345            NOTICE_RESPONSE_TAG => {
346                let storage = buf.read_all();
347                Message::NoticeResponse(NoticeResponseBody { storage })
348            }
349            AUTHENTICATION_TAG => match buf.read_i32::<BigEndian>()? {
350                0 => Message::AuthenticationOk,
351                3 => Message::AuthenticationCleartextPassword,
352                5 => {
353                    let mut salt = [0; 4];
354                    buf.read_exact(&mut salt)?;
355                    Message::AuthenticationMd5Password(AuthenticationMd5PasswordBody { salt })
356                }
357                10 => {
358                    let storage = buf.read_all();
359                    Message::AuthenticationSasl(AuthenticationSaslBody(storage))
360                }
361                11 => {
362                    let storage = buf.read_all();
363                    Message::AuthenticationSaslContinue(AuthenticationSaslContinueBody(storage))
364                }
365                12 => {
366                    let storage = buf.read_all();
367                    Message::AuthenticationSaslFinal(AuthenticationSaslFinalBody(storage))
368                }
369                tag => {
370                    return Err(io::Error::new(
371                        io::ErrorKind::InvalidInput,
372                        format!("unknown authentication tag `{tag}`"),
373                    ));
374                }
375            },
376            PORTAL_SUSPENDED_TAG => Message::PortalSuspended,
377            PARAMETER_STATUS_TAG => {
378                let name = buf.read_cstr()?;
379                let value = buf.read_cstr()?;
380                Message::ParameterStatus(ParameterStatusBody { name, value })
381            }
382            PARAMETER_DESCRIPTION_TAG => {
383                let len = buf.read_u16::<BigEndian>()?;
384                let storage = buf.read_all();
385                Message::ParameterDescription(ParameterDescriptionBody { storage, len })
386            }
387            ROW_DESCRIPTION_TAG => {
388                let len = buf.read_u16::<BigEndian>()?;
389                let storage = buf.read_all();
390                Message::RowDescription(RowDescriptionBody { storage, len })
391            }
392            READY_FOR_QUERY_TAG => {
393                let status = buf.read_u8()?;
394                Message::ReadyForQuery(ReadyForQueryBody { status })
395            }
396            tag => {
397                return Err(io::Error::new(
398                    io::ErrorKind::InvalidInput,
399                    format!("unknown message tag `{tag}`"),
400                ));
401            }
402        };
403
404        if !buf.is_empty() {
405            return Err(io::Error::new(
406                io::ErrorKind::InvalidInput,
407                "invalid message length: expected buffer to be empty",
408            ));
409        }
410
411        Ok(Some(message))
412    }
413}
414
415/// Internal buffer for parsing message bodies.
416///
417/// Maintains a position index to track parsing progress through the message bytes.
418struct Buffer {
419    /// The complete message bytes.
420    bytes: Bytes,
421    /// Current parsing position within the bytes.
422    idx: usize,
423}
424
425impl Buffer {
426    /// Returns a slice of the remaining unparsed bytes.
427    #[inline]
428    fn slice(&self) -> &[u8] {
429        &self.bytes[self.idx..]
430    }
431
432    /// Returns true if all bytes have been consumed.
433    #[inline]
434    fn is_empty(&self) -> bool {
435        self.slice().is_empty()
436    }
437
438    /// Reads a null-terminated C string from the buffer.
439    ///
440    /// Advances the position past the null terminator.
441    /// Returns the bytes without the null terminator.
442    #[inline]
443    fn read_cstr(&mut self) -> io::Result<Bytes> {
444        match memchr(0, self.slice()) {
445            Some(pos) => {
446                let start = self.idx;
447                let end = start + pos;
448                let cstr = self.bytes.slice(start..end);
449                self.idx = end + 1;
450                Ok(cstr)
451            }
452            None => Err(io::Error::new(
453                io::ErrorKind::UnexpectedEof,
454                "unexpected EOF",
455            )),
456        }
457    }
458
459    /// Reads all remaining bytes from the buffer.
460    ///
461    /// Advances the position to the end of the buffer.
462    #[inline]
463    fn read_all(&mut self) -> Bytes {
464        let buf = self.bytes.slice(self.idx..);
465        self.idx = self.bytes.len();
466        buf
467    }
468}
469
470impl Read for Buffer {
471    #[inline]
472    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
473        let len = {
474            let slice = self.slice();
475            let len = std::cmp::min(slice.len(), buf.len());
476            buf[..len].copy_from_slice(&slice[..len]);
477            len
478        };
479        self.idx += len;
480        Ok(len)
481    }
482}
483
484// Message body types
485
486/// MD5 password authentication body.
487///
488/// Contains the 4-byte salt used for MD5 password hashing.
489/// The client must hash the password with this salt and send it back.
490#[derive(Debug)]
491pub struct AuthenticationMd5PasswordBody {
492    /// 4-byte random salt for MD5 password hashing.
493    salt: [u8; 4],
494}
495
496impl AuthenticationMd5PasswordBody {
497    /// Returns the salt.
498    #[inline]
499    #[must_use]
500    pub fn salt(&self) -> [u8; 4] {
501        self.salt
502    }
503}
504
505/// SASL authentication body.
506///
507/// Contains a list of supported SASL authentication mechanisms,
508/// each as a null-terminated string. The list ends with a double null.
509#[derive(Debug)]
510pub struct AuthenticationSaslBody(Bytes);
511
512impl AuthenticationSaslBody {
513    /// Returns the raw data.
514    #[inline]
515    pub fn data(&self) -> &[u8] {
516        &self.0
517    }
518
519    /// Returns an iterator over the available SASL mechanisms.
520    #[inline]
521    pub fn mechanisms(&self) -> SaslMechanisms<'_> {
522        SaslMechanisms { buf: &self.0 }
523    }
524}
525
526/// Iterator over SASL mechanism names in an authentication message.
527///
528/// Parses null-terminated strings from the SASL authentication body.
529#[derive(Debug)]
530pub struct SaslMechanisms<'a> {
531    /// Buffer containing null-terminated mechanism names.
532    buf: &'a [u8],
533}
534
535impl<'a> Iterator for SaslMechanisms<'a> {
536    type Item = &'a str;
537
538    fn next(&mut self) -> Option<Self::Item> {
539        if self.buf.is_empty() || self.buf[0] == 0 {
540            return None;
541        }
542
543        match memchr(0, self.buf) {
544            Some(pos) => {
545                let mechanism = str::from_utf8(&self.buf[..pos]).ok()?;
546                self.buf = &self.buf[pos + 1..];
547                Some(mechanism)
548            }
549            None => None,
550        }
551    }
552}
553
554/// SASL continue authentication body.
555///
556/// Contains server challenge data for the next step in SASL authentication.
557/// The format depends on the specific SASL mechanism being used.
558#[derive(Debug)]
559pub struct AuthenticationSaslContinueBody(Bytes);
560
561impl AuthenticationSaslContinueBody {
562    /// Returns the raw data.
563    #[inline]
564    pub fn data(&self) -> &[u8] {
565        &self.0
566    }
567}
568
569/// SASL final authentication body.
570///
571/// Contains the final server response in SASL authentication.
572/// Typically includes a server signature for verification.
573#[derive(Debug)]
574pub struct AuthenticationSaslFinalBody(Bytes);
575
576impl AuthenticationSaslFinalBody {
577    /// Returns the raw data.
578    #[inline]
579    pub fn data(&self) -> &[u8] {
580        &self.0
581    }
582}
583
584/// Backend key data body.
585///
586/// Contains information needed to cancel a running query.
587/// The client can use these values to send a cancel request.
588#[derive(Debug)]
589pub struct BackendKeyDataBody {
590    /// Process ID of the backend server process.
591    process_id: i32,
592    /// Secret key for authenticating cancel requests.
593    secret_key: i32,
594}
595
596impl BackendKeyDataBody {
597    /// Returns the process ID.
598    #[inline]
599    #[must_use]
600    pub fn process_id(&self) -> i32 {
601        self.process_id
602    }
603
604    /// Returns the secret key.
605    #[inline]
606    #[must_use]
607    pub fn secret_key(&self) -> i32 {
608        self.secret_key
609    }
610}
611
612/// Command complete body.
613///
614/// Sent after a command (SELECT, INSERT, UPDATE, DELETE, etc.) completes.
615/// The tag indicates what command was executed and may include row counts.
616#[derive(Debug)]
617pub struct CommandCompleteBody {
618    /// Command tag (e.g., "SELECT 42", "INSERT 0 1", "UPDATE 5").
619    tag: Bytes,
620}
621
622impl CommandCompleteBody {
623    /// Returns the command tag.
624    ///
625    /// # Errors
626    ///
627    /// Returns [`io::ErrorKind::InvalidInput`] if the tag bytes are not
628    /// valid UTF-8. The server sends ASCII command tags like `SELECT 42`,
629    /// so this path is only reachable on protocol corruption.
630    #[inline]
631    pub fn tag(&self) -> io::Result<&str> {
632        str::from_utf8(&self.tag).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
633    }
634}
635
636/// COPY data body.
637///
638/// Contains a chunk of COPY data during COPY IN or COPY OUT operations.
639/// The format depends on the COPY format (text or binary).
640#[derive(Debug)]
641pub struct CopyDataBody {
642    /// Raw COPY data bytes.
643    storage: Bytes,
644}
645
646impl CopyDataBody {
647    /// Returns the data.
648    #[inline]
649    pub fn data(&self) -> &[u8] {
650        &self.storage
651    }
652
653    /// Consumes self and returns the bytes.
654    #[inline]
655    pub fn into_bytes(self) -> Bytes {
656        self.storage
657    }
658}
659
660/// COPY IN response body.
661///
662/// Sent by the server to indicate it's ready to receive COPY data.
663/// The client should send `CopyData` messages until `CopyDone`.
664#[derive(Debug)]
665pub struct CopyInResponseBody {
666    /// Overall format: 0 = text, 1 = binary (`HyperBinary`).
667    format: u8,
668    /// Number of columns in the COPY operation.
669    len: u16,
670    /// Kept for memory ownership of parsed data.
671    #[expect(
672        dead_code,
673        reason = "owns the backing buffer referenced by `format`/`len`; must live as long as the message"
674    )]
675    storage: Bytes,
676}
677
678impl CopyInResponseBody {
679    /// Returns the overall format (0 = text, 1 = binary).
680    #[inline]
681    pub fn format(&self) -> u8 {
682        self.format
683    }
684
685    /// Returns the number of columns.
686    #[inline]
687    pub fn column_count(&self) -> u16 {
688        self.len
689    }
690}
691
692/// COPY OUT response body.
693///
694/// Sent by the server to indicate it will send COPY data.
695/// The client should expect `CopyData` messages until `CopyDone`.
696#[derive(Debug)]
697pub struct CopyOutResponseBody {
698    /// Overall format: 0 = text, 1 = binary (`HyperBinary`).
699    format: u8,
700    /// Number of columns in the COPY operation.
701    len: u16,
702    /// Kept for memory ownership of parsed data.
703    #[expect(
704        dead_code,
705        reason = "owns the backing buffer referenced by `format`/`len`; must live as long as the message"
706    )]
707    storage: Bytes,
708}
709
710impl CopyOutResponseBody {
711    /// Returns the overall format (0 = text, 1 = binary).
712    #[inline]
713    pub fn format(&self) -> u8 {
714        self.format
715    }
716
717    /// Returns the number of columns.
718    #[inline]
719    pub fn column_count(&self) -> u16 {
720        self.len
721    }
722}
723
724/// Data row body.
725///
726/// Contains a single row of data from a query result.
727/// Each column is prefixed with a 4-byte length (BigEndian):
728/// - Positive length: non-NULL value of that length
729/// - -1: NULL value
730#[derive(Debug, Clone)]
731pub struct DataRowBody {
732    /// Raw row data bytes.
733    storage: Bytes,
734    /// Number of columns in this row.
735    len: u16,
736}
737
738impl DataRowBody {
739    /// Returns an iterator over the column ranges.
740    #[inline]
741    pub fn ranges(&self) -> DataRowRanges<'_> {
742        DataRowRanges {
743            buf: &self.storage,
744            len: self.storage.len(),
745            remaining: self.len,
746        }
747    }
748
749    /// Returns the raw buffer.
750    #[inline]
751    pub fn buffer(&self) -> &[u8] {
752        &self.storage
753    }
754
755    /// Returns the number of columns.
756    #[inline]
757    pub fn column_count(&self) -> u16 {
758        self.len
759    }
760
761    /// Pre-computes offsets for **all** columns regardless of count.
762    ///
763    /// Computes offsets for all columns in this row.
764    ///
765    /// This provides O(1) access for any column index using dynamic allocation.
766    /// Each entry is:
767    /// - `Some((start, end))` for non-NULL values (byte range within the buffer)
768    /// - `None` for NULL values or when parsing fails due to truncated data
769    ///
770    /// # Performance Tradeoffs
771    ///
772    /// This method allocates a `Vec` for all columns upfront. For tables with many
773    /// columns where only a few are accessed, this may use more memory than needed.
774    ///
775    /// **When to use this method:**
776    /// - Accessing multiple columns from the same row
777    /// - Random access patterns where column indices vary
778    /// - Bulk processing where setup cost is amortized
779    ///
780    /// **Alternative for sparse access:**
781    /// If you only need one or two columns from a wide table, consider using
782    /// `get_raw(index)` which computes offset on-demand (O(n) per call, but no
783    /// allocation).
784    ///
785    /// # Memory Usage
786    ///
787    /// Allocates `column_count * size_of::<Option<(usize, usize)>>()` bytes.
788    /// For a table with 100 columns, this is approximately 2.4 KB per row.
789    ///
790    /// # Panics
791    ///
792    /// Does not panic in practice. The `usize::try_from(len)` call is
793    /// guarded by a `len >= 0` check, so the conversion from `i32` to
794    /// `usize` is always infallible.
795    #[inline]
796    pub fn compute_all_offsets(&self) -> Vec<Option<(usize, usize)>> {
797        let mut offsets = Vec::with_capacity(self.len as usize);
798
799        let mut pos = 0usize;
800        let buf = &self.storage[..];
801
802        for _ in 0..self.len as usize {
803            if pos + 4 > buf.len() {
804                break;
805            }
806            let len = i32::from_be_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]);
807            pos += 4;
808
809            if len >= 0 {
810                let len = usize::try_from(len).expect("len >= 0 checked above");
811                let start = pos;
812                let end = start.saturating_add(len);
813                if end > buf.len() {
814                    break;
815                }
816                offsets.push(Some((start, end)));
817                pos = end;
818            } else {
819                offsets.push(None);
820            }
821        }
822
823        offsets
824    }
825
826    /// Gets raw bytes for a column by index, computing offset on demand.
827    /// Returns None if NULL or out of bounds.
828    /// This avoids allocating a Vec of ranges.
829    ///
830    /// # Panics
831    ///
832    /// Does not panic in practice. The `usize::try_from(len)` calls are
833    /// guarded by a `len >= 0` check in each branch, making the
834    /// `i32 -> usize` conversion infallible.
835    #[inline]
836    pub fn get_column_bytes(&self, idx: usize) -> Option<&[u8]> {
837        if idx >= self.len as usize {
838            return None;
839        }
840
841        let mut buf = &self.storage[..];
842
843        // Skip to the requested column
844        for _ in 0..idx {
845            if buf.len() < 4 {
846                return None;
847            }
848            let len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
849            buf = &buf[4..];
850            if len >= 0 {
851                let len = usize::try_from(len).expect("len >= 0 checked above");
852                if buf.len() < len {
853                    return None;
854                }
855                buf = &buf[len..];
856            }
857        }
858
859        // Read the target column
860        if buf.len() < 4 {
861            return None;
862        }
863        let len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
864        if len < 0 {
865            None // NULL
866        } else {
867            let len = usize::try_from(len).expect("len >= 0 checked above");
868            let data_start = 4;
869            if buf.len() < data_start + len {
870                return None;
871            }
872            Some(&buf[data_start..data_start + len])
873        }
874    }
875
876    /// Checks if a column is NULL.
877    ///
878    /// # Panics
879    ///
880    /// Does not panic in practice. Each `usize::try_from(len)` is guarded
881    /// by a `len >= 0` check, so the conversion from `i32` cannot fail.
882    #[inline]
883    pub fn is_column_null(&self, idx: usize) -> bool {
884        if idx >= self.len as usize {
885            return true;
886        }
887
888        let mut buf = &self.storage[..];
889
890        // Skip to the requested column
891        for _ in 0..idx {
892            if buf.len() < 4 {
893                return true;
894            }
895            let len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
896            buf = &buf[4..];
897            if len >= 0 {
898                let len = usize::try_from(len).expect("len >= 0 checked above");
899                if buf.len() < len {
900                    return true;
901                }
902                buf = &buf[len..];
903            }
904        }
905
906        // Check the target column
907        if buf.len() < 4 {
908            return true;
909        }
910        let len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
911        len < 0
912    }
913}
914
915/// Iterator over data row column ranges.
916///
917/// Yields byte ranges for each column in a `DataRow`.
918/// Returns `None` for NULL columns, `Some(range)` for non-NULL columns.
919#[derive(Debug)]
920pub struct DataRowRanges<'a> {
921    /// Buffer containing the row data.
922    buf: &'a [u8],
923    /// Original buffer length (for computing absolute offsets).
924    len: usize,
925    /// Number of columns remaining to parse.
926    remaining: u16,
927}
928
929impl Iterator for DataRowRanges<'_> {
930    type Item = io::Result<Option<Range<usize>>>;
931
932    fn next(&mut self) -> Option<Self::Item> {
933        if self.remaining == 0 {
934            return if self.buf.is_empty() {
935                None
936            } else {
937                Some(Err(io::Error::new(
938                    io::ErrorKind::InvalidInput,
939                    "invalid message length: extra data in data row",
940                )))
941            };
942        }
943
944        self.remaining -= 1;
945
946        if self.buf.len() < 4 {
947            return Some(Err(io::Error::new(
948                io::ErrorKind::UnexpectedEof,
949                "unexpected EOF reading column length",
950            )));
951        }
952
953        let len = i32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]);
954        self.buf = &self.buf[4..];
955
956        if len < 0 {
957            Some(Ok(None)) // NULL value
958        } else {
959            let len = usize::try_from(len).expect("len >= 0 in else branch");
960            if self.buf.len() < len {
961                return Some(Err(io::Error::new(
962                    io::ErrorKind::UnexpectedEof,
963                    "unexpected EOF",
964                )));
965            }
966            let base = self.len - self.buf.len();
967            self.buf = &self.buf[len..];
968            Some(Ok(Some(base..base + len)))
969        }
970    }
971}
972
973/// Error response body.
974///
975/// Contains error information in the form of key-value pairs.
976/// Each field has a type code (single byte) followed by a null-terminated value.
977/// The message ends with a null byte (type code 0).
978#[derive(Debug)]
979pub struct ErrorResponseBody {
980    /// Raw error message bytes.
981    storage: Bytes,
982}
983
984impl ErrorResponseBody {
985    /// Returns an iterator over error fields.
986    #[inline]
987    pub fn fields(&self) -> ErrorFields<'_> {
988        ErrorFields { buf: &self.storage }
989    }
990}
991
992/// Iterator over error fields in an `ErrorResponse` or `NoticeResponse`.
993///
994/// Each field consists of a type code byte followed by a null-terminated value string.
995#[derive(Debug)]
996pub struct ErrorFields<'a> {
997    /// Buffer containing the error/notice fields.
998    buf: &'a [u8],
999}
1000
1001impl<'a> Iterator for ErrorFields<'a> {
1002    type Item = io::Result<ErrorField<'a>>;
1003
1004    fn next(&mut self) -> Option<Self::Item> {
1005        if self.buf.is_empty() {
1006            return None;
1007        }
1008
1009        let type_ = self.buf[0];
1010        self.buf = &self.buf[1..];
1011
1012        if type_ == 0 {
1013            return None;
1014        }
1015
1016        match memchr(0, self.buf) {
1017            Some(pos) => {
1018                let value = &self.buf[..pos];
1019                self.buf = &self.buf[pos + 1..];
1020                Some(Ok(ErrorField { type_, value }))
1021            }
1022            None => Some(Err(io::Error::new(
1023                io::ErrorKind::UnexpectedEof,
1024                "unexpected EOF in error field",
1025            ))),
1026        }
1027    }
1028}
1029
1030/// A single error or notice field.
1031///
1032/// Common field type codes:
1033/// - 'S': Severity
1034/// - 'C': Code (SQLSTATE)
1035/// - 'M': Message
1036/// - 'D': Detail
1037/// - 'H': Hint
1038/// - 'P': Position
1039/// - 'p': Internal position
1040/// - 'q': Internal query
1041/// - 'W': Where
1042/// - 's': Schema name
1043/// - 't': Table name
1044/// - 'c': Column name
1045/// - 'd': Data type name
1046/// - 'n': Constraint name
1047#[derive(Debug)]
1048pub struct ErrorField<'a> {
1049    /// Field type code (single byte).
1050    type_: u8,
1051    /// Field value (without null terminator).
1052    value: &'a [u8],
1053}
1054
1055impl ErrorField<'_> {
1056    /// Returns the field type code.
1057    #[inline]
1058    #[must_use]
1059    pub fn type_(&self) -> u8 {
1060        self.type_
1061    }
1062
1063    /// Returns the field value as bytes.
1064    #[inline]
1065    #[must_use]
1066    pub fn value_bytes(&self) -> &[u8] {
1067        self.value
1068    }
1069
1070    /// Returns the field value as a string.
1071    ///
1072    /// # Errors
1073    ///
1074    /// Returns [`io::ErrorKind::InvalidInput`] if the field value is not
1075    /// valid UTF-8. Server-sent fields are expected to be UTF-8; this path
1076    /// is only reached on protocol corruption.
1077    #[inline]
1078    pub fn value(&self) -> io::Result<&str> {
1079        str::from_utf8(self.value).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
1080    }
1081}
1082
1083/// Notice response body (same format as error).
1084///
1085/// Contains warning or informational messages.
1086/// Uses the same field format as `ErrorResponse` but indicates a warning, not an error.
1087#[derive(Debug)]
1088pub struct NoticeResponseBody {
1089    /// Raw notice message bytes.
1090    storage: Bytes,
1091}
1092
1093impl NoticeResponseBody {
1094    /// Returns an iterator over notice fields.
1095    #[inline]
1096    pub fn fields(&self) -> ErrorFields<'_> {
1097        ErrorFields { buf: &self.storage }
1098    }
1099}
1100
1101/// Notification response body.
1102///
1103/// Sent when a NOTIFY event occurs that the client is listening to.
1104/// Used for asynchronous notifications between database sessions.
1105#[derive(Debug)]
1106pub struct NotificationResponseBody {
1107    /// Process ID of the backend that sent the notification.
1108    process_id: i32,
1109    /// Channel name (null-terminated).
1110    channel: Bytes,
1111    /// Notification payload (null-terminated).
1112    message: Bytes,
1113}
1114
1115impl NotificationResponseBody {
1116    /// Returns the process ID.
1117    #[inline]
1118    pub fn process_id(&self) -> i32 {
1119        self.process_id
1120    }
1121
1122    /// Returns the channel name.
1123    ///
1124    /// # Errors
1125    ///
1126    /// Returns [`io::ErrorKind::InvalidInput`] if the channel bytes are
1127    /// not valid UTF-8 (protocol corruption — server always sends UTF-8).
1128    #[inline]
1129    pub fn channel(&self) -> io::Result<&str> {
1130        str::from_utf8(&self.channel).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
1131    }
1132
1133    /// Returns the message.
1134    ///
1135    /// # Errors
1136    ///
1137    /// Returns [`io::ErrorKind::InvalidInput`] if the payload bytes are
1138    /// not valid UTF-8 (protocol corruption — server always sends UTF-8).
1139    #[inline]
1140    pub fn message(&self) -> io::Result<&str> {
1141        str::from_utf8(&self.message).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
1142    }
1143}
1144
1145/// Parameter description body.
1146///
1147/// Sent in response to a Parse message, describing the parameter types
1148/// expected by the prepared statement.
1149#[derive(Debug)]
1150pub struct ParameterDescriptionBody {
1151    /// Raw parameter OID bytes.
1152    storage: Bytes,
1153    /// Number of parameters.
1154    len: u16,
1155}
1156
1157impl ParameterDescriptionBody {
1158    /// Returns an iterator over parameter OIDs.
1159    #[inline]
1160    pub fn parameters(&self) -> Parameters<'_> {
1161        Parameters {
1162            buf: &self.storage,
1163            remaining: self.len,
1164        }
1165    }
1166}
1167
1168/// Iterator over parameter OIDs in a `ParameterDescription`.
1169///
1170/// Each parameter is represented by a 4-byte OID (`BigEndian`).
1171#[derive(Debug)]
1172pub struct Parameters<'a> {
1173    /// Buffer containing parameter OIDs.
1174    buf: &'a [u8],
1175    /// Number of parameters remaining to parse.
1176    remaining: u16,
1177}
1178
1179impl Iterator for Parameters<'_> {
1180    type Item = io::Result<Oid>;
1181
1182    fn next(&mut self) -> Option<Self::Item> {
1183        if self.remaining == 0 {
1184            return None;
1185        }
1186
1187        self.remaining -= 1;
1188
1189        if self.buf.len() < 4 {
1190            return Some(Err(io::Error::new(
1191                io::ErrorKind::UnexpectedEof,
1192                "unexpected EOF",
1193            )));
1194        }
1195
1196        let oid = u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]);
1197        self.buf = &self.buf[4..];
1198        Some(Ok(Oid::new(oid)))
1199    }
1200}
1201
1202/// Parameter status body.
1203///
1204/// Sent by the server to inform the client about configuration parameter changes.
1205/// Common parameters include "`application_name`", "`server_version`", etc.
1206#[derive(Debug)]
1207pub struct ParameterStatusBody {
1208    /// Parameter name (null-terminated).
1209    name: Bytes,
1210    /// Parameter value (null-terminated).
1211    value: Bytes,
1212}
1213
1214impl ParameterStatusBody {
1215    /// Returns the parameter name.
1216    ///
1217    /// # Errors
1218    ///
1219    /// Returns [`io::ErrorKind::InvalidInput`] if the parameter name bytes
1220    /// are not valid UTF-8 (protocol corruption).
1221    #[inline]
1222    pub fn name(&self) -> io::Result<&str> {
1223        str::from_utf8(&self.name).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
1224    }
1225
1226    /// Returns the parameter value.
1227    ///
1228    /// # Errors
1229    ///
1230    /// Returns [`io::ErrorKind::InvalidInput`] if the parameter value bytes
1231    /// are not valid UTF-8 (protocol corruption).
1232    #[inline]
1233    pub fn value(&self) -> io::Result<&str> {
1234        str::from_utf8(&self.value).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
1235    }
1236}
1237
1238/// Ready for query body.
1239///
1240/// Sent by the server to indicate it's ready to accept a new query.
1241/// The status indicates the current transaction state.
1242#[derive(Debug)]
1243pub struct ReadyForQueryBody {
1244    /// Transaction status: 'I' = idle, 'T' = in transaction, 'E' = in failed transaction.
1245    status: u8,
1246}
1247
1248impl ReadyForQueryBody {
1249    /// Returns the transaction status.
1250    /// 'I' = idle, 'T' = in transaction, 'E' = in failed transaction
1251    #[inline]
1252    #[must_use]
1253    pub fn status(&self) -> u8 {
1254        self.status
1255    }
1256}
1257
1258/// Row description body.
1259///
1260/// Sent in response to a Describe or Execute message, describing the columns
1261/// that will be returned by a query. Each field description includes name,
1262/// type OID, format, and other metadata.
1263#[derive(Debug)]
1264pub struct RowDescriptionBody {
1265    /// Raw field description bytes.
1266    storage: Bytes,
1267    /// Number of fields (columns).
1268    len: u16,
1269}
1270
1271impl RowDescriptionBody {
1272    /// Returns an iterator over field descriptions.
1273    #[inline]
1274    pub fn fields(&self) -> Fields<'_> {
1275        Fields {
1276            buf: &self.storage,
1277            remaining: self.len,
1278        }
1279    }
1280
1281    /// Returns the number of fields.
1282    #[inline]
1283    pub fn field_count(&self) -> u16 {
1284        self.len
1285    }
1286}
1287
1288/// Iterator over field descriptions in a `RowDescription`.
1289///
1290/// Each field contains name, table OID, column ID, type OID, type size,
1291/// type modifier, and format code.
1292#[derive(Debug)]
1293pub struct Fields<'a> {
1294    /// Buffer containing field descriptions.
1295    buf: &'a [u8],
1296    /// Number of fields remaining to parse.
1297    remaining: u16,
1298}
1299
1300impl<'a> Iterator for Fields<'a> {
1301    type Item = io::Result<Field<'a>>;
1302
1303    fn next(&mut self) -> Option<Self::Item> {
1304        if self.remaining == 0 {
1305            return None;
1306        }
1307
1308        self.remaining -= 1;
1309
1310        // Parse field name (null-terminated string)
1311        let Some(name_end) = memchr(0, self.buf) else {
1312            return Some(Err(io::Error::new(
1313                io::ErrorKind::UnexpectedEof,
1314                "unexpected EOF in field name",
1315            )));
1316        };
1317
1318        let name = match str::from_utf8(&self.buf[..name_end]) {
1319            Ok(s) => s,
1320            Err(e) => return Some(Err(io::Error::new(io::ErrorKind::InvalidInput, e))),
1321        };
1322        self.buf = &self.buf[name_end + 1..];
1323
1324        // Parse fixed fields (18 bytes total)
1325        if self.buf.len() < 18 {
1326            return Some(Err(io::Error::new(
1327                io::ErrorKind::UnexpectedEof,
1328                "unexpected EOF in field description",
1329            )));
1330        }
1331
1332        let table_oid = u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]);
1333        let column_id = i16::from_be_bytes([self.buf[4], self.buf[5]]);
1334        let type_oid = u32::from_be_bytes([self.buf[6], self.buf[7], self.buf[8], self.buf[9]]);
1335        let type_size = i16::from_be_bytes([self.buf[10], self.buf[11]]);
1336        let type_modifier =
1337            i32::from_be_bytes([self.buf[12], self.buf[13], self.buf[14], self.buf[15]]);
1338        let format = i16::from_be_bytes([self.buf[16], self.buf[17]]);
1339        self.buf = &self.buf[18..];
1340
1341        Some(Ok(Field {
1342            name,
1343            table_oid: Oid::new(table_oid),
1344            column_id,
1345            type_oid: Oid::new(type_oid),
1346            type_size,
1347            type_modifier,
1348            format,
1349        }))
1350    }
1351}
1352
1353/// A field description in a `RowDescription`.
1354///
1355/// Describes a single column that will be returned by a query.
1356#[derive(Debug)]
1357pub struct Field<'a> {
1358    /// Column name.
1359    name: &'a str,
1360    /// OID of the table this column belongs to (0 if not from a table).
1361    table_oid: Oid,
1362    /// Column number within the table (attribute number).
1363    column_id: i16,
1364    /// OID of the column's data type.
1365    type_oid: Oid,
1366    /// Size of the type in bytes (-1 for variable-length types).
1367    type_size: i16,
1368    /// Type modifier (e.g., precision/scale for numeric types).
1369    type_modifier: i32,
1370    /// Format code: 0 = text, 1 = binary.
1371    format: i16,
1372}
1373
1374impl<'a> Field<'a> {
1375    /// Returns the field name.
1376    #[inline]
1377    #[must_use]
1378    pub fn name(&self) -> &'a str {
1379        self.name
1380    }
1381
1382    /// Returns the table OID.
1383    #[inline]
1384    #[must_use]
1385    pub fn table_oid(&self) -> Oid {
1386        self.table_oid
1387    }
1388
1389    /// Returns the column ID within the table.
1390    #[inline]
1391    #[must_use]
1392    pub fn column_id(&self) -> i16 {
1393        self.column_id
1394    }
1395
1396    /// Returns the type OID.
1397    #[inline]
1398    #[must_use]
1399    pub fn type_oid(&self) -> Oid {
1400        self.type_oid
1401    }
1402
1403    /// Returns the type size.
1404    #[inline]
1405    #[must_use]
1406    pub fn type_size(&self) -> i16 {
1407        self.type_size
1408    }
1409
1410    /// Returns the type modifier.
1411    #[inline]
1412    #[must_use]
1413    pub fn type_modifier(&self) -> i32 {
1414        self.type_modifier
1415    }
1416
1417    /// Returns the format code (0 = text, 1 = binary).
1418    #[inline]
1419    #[must_use]
1420    pub fn format(&self) -> i16 {
1421        self.format
1422    }
1423}
1424
1425#[cfg(test)]
1426mod tests {
1427    use super::*;
1428
1429    #[test]
1430    fn compute_all_offsets_handles_many_columns() {
1431        // Build a row with 40 columns to verify dynamic allocation works correctly.
1432        let mut buf = Vec::new();
1433        for i in 0u8..40 {
1434            buf.extend_from_slice(&1i32.to_be_bytes());
1435            buf.push(i);
1436        }
1437        let row = DataRowBody {
1438            storage: Bytes::from(buf),
1439            len: 40,
1440        };
1441
1442        let offsets = row.compute_all_offsets();
1443        assert_eq!(offsets.len(), 40);
1444
1445        let expect_range = |idx: usize, val: u8| {
1446            let (start, end) = offsets[idx].expect("expected non-null");
1447            let slice = &row.buffer()[start..end];
1448            assert_eq!(slice, &[val]);
1449        };
1450
1451        expect_range(0, 0);
1452        expect_range(10, 10);
1453        expect_range(39, 39);
1454    }
1455
1456    #[test]
1457    fn compute_all_offsets_tracks_nulls() {
1458        // Build: col0=1 byte, col1=NULL, col2=1 byte
1459        let mut buf = Vec::new();
1460        buf.extend_from_slice(&1i32.to_be_bytes());
1461        buf.push(0xAA);
1462        buf.extend_from_slice(&(-1i32).to_be_bytes()); // NULL
1463        buf.extend_from_slice(&1i32.to_be_bytes());
1464        buf.push(0xBB);
1465
1466        let row = DataRowBody {
1467            storage: Bytes::from(buf),
1468            len: 3,
1469        };
1470
1471        let offsets = row.compute_all_offsets();
1472        assert_eq!(offsets.len(), 3);
1473        assert!(offsets[0].is_some());
1474        assert!(offsets[1].is_none());
1475        assert!(offsets[2].is_some());
1476    }
1477}