hyperdb_api_core/protocol/message/backend.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Backend (server-to-client) messages.
5//!
6//! This module parses messages sent from the Hyper server to the client.
7//! Each message variant is discriminated by a single-byte tag; see the
8//! `*_TAG` constants and the [`Message`] enum for the full catalog.
9//!
10//! # Authentication Flow
11//!
12//! The server initiates authentication with an `Authentication*` message.
13//! Supported methods:
14//!
15//! | Method | Tag subtype | Flow |
16//! |---|---|---|
17//! | Trust | `R(0)` | Server sends `AuthenticationOk` immediately |
18//! | Cleartext | `R(3)` | Server requests password; client sends `PasswordMessage` |
19//! | MD5 | `R(5)` + 4-byte salt | Client hashes `md5(md5(password + user) + salt)` |
20//! | SCRAM-SHA-256 | `R(10)` | Multi-step challenge-response via `SASLInitialResponse` / `SASLResponse` |
21//!
22//! After successful authentication the server sends `AuthenticationOk`,
23//! followed by `ParameterStatus` messages, `BackendKeyData`, and finally
24//! `ReadyForQuery`.
25//!
26//! # Attribution
27//!
28//! Portions of this module were adapted from
29//! [`postgres-protocol`](https://github.com/sfackler/rust-postgres)'s
30//! `message/backend.rs` (Copyright (c) 2016 Steven Fackler, MIT or
31//! Apache-2.0). Adapted material includes the message-tag constants
32//! (`PARSE_COMPLETE_TAG = b'1'`, `BIND_COMPLETE_TAG = b'2'`, ...,
33//! `AUTHENTICATION_TAG = b'R'`, etc.), the `Header` struct shape, and
34//! message-framing logic. Hyper-specific changes added on top include the
35//! HyperBinary COPY format and Hyper-specific message variants. See the
36//! `NOTICE` file at the repo root for the full upstream copyright and
37//! reproduced license text.
38
39use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
40use bytes::{Bytes, BytesMut};
41use memchr::memchr;
42use std::io::{self, Read};
43use std::ops::Range;
44use std::str;
45
46use crate::types::Oid;
47
/// Tag byte `'1'`: identifies a `ParseComplete` message.
pub const PARSE_COMPLETE_TAG: u8 = b'1';
/// Tag byte `'2'`: identifies a `BindComplete` message.
pub const BIND_COMPLETE_TAG: u8 = b'2';
/// Tag byte `'3'`: identifies a `CloseComplete` message.
pub const CLOSE_COMPLETE_TAG: u8 = b'3';
/// Tag byte `'A'`: identifies a `NotificationResponse` message.
pub const NOTIFICATION_RESPONSE_TAG: u8 = b'A';
/// Tag byte `'c'` (lowercase): identifies a `CopyDone` message.
pub const COPY_DONE_TAG: u8 = b'c';
/// Tag byte `'C'` (uppercase): identifies a `CommandComplete` message.
pub const COMMAND_COMPLETE_TAG: u8 = b'C';
/// Tag byte `'d'` (lowercase): identifies a `CopyData` message.
pub const COPY_DATA_TAG: u8 = b'd';
/// Tag byte `'D'` (uppercase): identifies a `DataRow` message.
pub const DATA_ROW_TAG: u8 = b'D';
/// Tag byte `'E'`: identifies an `ErrorResponse` message.
pub const ERROR_RESPONSE_TAG: u8 = b'E';
/// Tag byte `'G'`: identifies a `CopyInResponse` message.
pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
/// Tag byte `'H'`: identifies a `CopyOutResponse` message.
pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';
/// Tag byte `'I'`: identifies an `EmptyQueryResponse` message.
pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
/// Tag byte `'K'`: identifies a `BackendKeyData` message.
pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
/// Tag byte `'n'` (lowercase): identifies a `NoData` message.
pub const NO_DATA_TAG: u8 = b'n';
/// Tag byte `'N'` (uppercase): identifies a `NoticeResponse` message.
pub const NOTICE_RESPONSE_TAG: u8 = b'N';
/// Tag byte `'R'`: shared by every `Authentication*` message; the variant
/// is selected by the sub-code that follows the header.
pub const AUTHENTICATION_TAG: u8 = b'R';
/// Tag byte `'s'` (lowercase): identifies a `PortalSuspended` message.
pub const PORTAL_SUSPENDED_TAG: u8 = b's';
/// Tag byte `'S'` (uppercase): identifies a `ParameterStatus` message.
pub const PARAMETER_STATUS_TAG: u8 = b'S';
/// Tag byte `'t'` (lowercase): identifies a `ParameterDescription` message.
pub const PARAMETER_DESCRIPTION_TAG: u8 = b't';
/// Tag byte `'T'` (uppercase): identifies a `RowDescription` message.
pub const ROW_DESCRIPTION_TAG: u8 = b'T';
/// Tag byte `'Z'`: identifies a `ReadyForQuery` message.
pub const READY_FOR_QUERY_TAG: u8 = b'Z';
90/// Message header information.
91///
92/// `PostgreSQL` wire protocol messages start with a 5-byte header:
93/// - 1 byte: message type tag
94/// - 4 bytes: message length (`BigEndian`, including the length field itself)
95#[derive(Debug, Copy, Clone)]
96pub struct Header {
97 /// Message type tag (e.g., 'R' for Authentication, 'T' for `RowDescription`).
98 tag: u8,
99 /// Total message length in bytes, including the 4-byte length field itself.
100 len: i32,
101}
102
103impl Header {
104 /// Parses a message header from a buffer.
105 ///
106 /// Returns `Ok(None)` if the buffer is too short (< 5 bytes).
107 /// Returns `Err` if the parsed length is invalid (< 4).
108 ///
109 /// # Arguments
110 ///
111 /// * `buf` - Buffer containing at least the first 5 bytes of a message
112 ///
113 /// # Errors
114 ///
115 /// Returns [`io::ErrorKind::InvalidData`] if the parsed length field
116 /// is less than 4 (which is the minimum valid length — the 4-byte
117 /// length field itself).
118 #[inline]
119 pub fn parse(buf: &[u8]) -> io::Result<Option<Header>> {
120 if buf.len() < 5 {
121 return Ok(None);
122 }
123
124 let tag = buf[0];
125 let len = BigEndian::read_i32(&buf[1..]);
126
127 if len < 4 {
128 return Err(io::Error::new(
129 io::ErrorKind::InvalidData,
130 "invalid message length: header length < 4",
131 ));
132 }
133
134 Ok(Some(Header { tag, len }))
135 }
136
137 /// Returns the message tag.
138 #[inline]
139 #[must_use]
140 pub fn tag(self) -> u8 {
141 self.tag
142 }
143
144 /// Returns the message length (including itself).
145 #[inline]
146 #[must_use]
147 pub fn len(self) -> i32 {
148 self.len
149 }
150
151 /// Returns true if the message length is 0.
152 ///
153 /// Note: A valid message always has a length of at least 4 (for the length field itself),
154 /// so this should always return false for valid messages.
155 #[inline]
156 #[must_use]
157 pub fn is_empty(self) -> bool {
158 self.len == 0
159 }
160}
161
162/// An enum representing backend messages from Hyper server.
163#[non_exhaustive]
164#[derive(Debug)]
165pub enum Message {
166 /// Authentication request completed successfully.
167 AuthenticationOk,
168 /// Authentication via cleartext password.
169 AuthenticationCleartextPassword,
170 /// Authentication via MD5 password.
171 AuthenticationMd5Password(AuthenticationMd5PasswordBody),
172 /// Authentication via SASL.
173 AuthenticationSasl(AuthenticationSaslBody),
174 /// SASL authentication continue.
175 AuthenticationSaslContinue(AuthenticationSaslContinueBody),
176 /// SASL authentication final.
177 AuthenticationSaslFinal(AuthenticationSaslFinalBody),
178 /// Backend key data for cancel requests.
179 BackendKeyData(BackendKeyDataBody),
180 /// Bind operation complete.
181 BindComplete,
182 /// Close operation complete.
183 CloseComplete,
184 /// Command completed.
185 CommandComplete(CommandCompleteBody),
186 /// COPY data.
187 CopyData(CopyDataBody),
188 /// COPY operation done.
189 CopyDone,
190 /// COPY IN response (client should send data).
191 CopyInResponse(CopyInResponseBody),
192 /// COPY OUT response (server will send data).
193 CopyOutResponse(CopyOutResponseBody),
194 /// A row of data.
195 DataRow(DataRowBody),
196 /// Empty query response.
197 EmptyQueryResponse,
198 /// Error response.
199 ErrorResponse(ErrorResponseBody),
200 /// No data (for empty result sets).
201 NoData,
202 /// Notice response (warning).
203 NoticeResponse(NoticeResponseBody),
204 /// Notification from LISTEN/NOTIFY.
205 NotificationResponse(NotificationResponseBody),
206 /// Parameter description for prepared statement.
207 ParameterDescription(ParameterDescriptionBody),
208 /// Parameter status update.
209 ParameterStatus(ParameterStatusBody),
210 /// Parse operation complete.
211 ParseComplete,
212 /// Portal suspended (for partial results).
213 PortalSuspended,
214 /// Ready for query.
215 ReadyForQuery(ReadyForQueryBody),
216 /// Row description (column metadata).
217 RowDescription(RowDescriptionBody),
218}
219
220impl Message {
221 /// Parses a backend message from a buffer.
222 ///
223 /// Reads the message header (5 bytes: tag + length) and parses the complete
224 /// message body. Returns `Ok(None)` if the buffer doesn't contain a complete
225 /// message yet. The buffer is advanced when a complete message is parsed.
226 ///
227 /// # Arguments
228 ///
229 /// * `buf` - Buffer containing message data (may be partial)
230 ///
231 /// # Errors
232 ///
233 /// - Returns [`io::ErrorKind::InvalidInput`] if the declared length
234 /// field is less than 4 (below the minimum valid message size).
235 /// - Returns an I/O error from the inner `Buffer` reads when a body
236 /// field is truncated, a C-string is not NUL-terminated, or a
237 /// string field contains invalid UTF-8.
238 /// - Returns [`io::ErrorKind::InvalidInput`] for unexpected
239 /// authentication sub-tags or unknown top-level message tags.
240 ///
241 /// # Panics
242 ///
243 /// Does not panic in practice. The `read_u32` call on `&buf[1..5]`
244 /// is proven infallible by the preceding `buf.len() < 5` short-circuit.
245 #[inline]
246 pub fn parse(buf: &mut BytesMut) -> io::Result<Option<Message>> {
247 if buf.len() < 5 {
248 let to_read = 5 - buf.len();
249 buf.reserve(to_read);
250 return Ok(None);
251 }
252
253 let tag = buf[0];
254 let len = (&buf[1..5]).read_u32::<BigEndian>().unwrap();
255
256 if len < 4 {
257 return Err(io::Error::new(
258 io::ErrorKind::InvalidInput,
259 "invalid message length: parsing u32",
260 ));
261 }
262
263 // Defensively use checked arithmetic. `len` is u32 from the wire — a
264 // hostile or buggy server could send u32::MAX, which on 32-bit platforms
265 // wraps usize and on 64-bit produces an oversized allocation.
266 let Some(total_len) = (len as usize).checked_add(1) else {
267 return Err(io::Error::new(
268 io::ErrorKind::InvalidInput,
269 format!("invalid message length: {len} + 1 overflows usize"),
270 ));
271 };
272 if buf.len() < total_len {
273 let to_read = total_len - buf.len();
274 buf.reserve(to_read);
275 return Ok(None);
276 }
277
278 let mut buf = Buffer {
279 bytes: buf.split_to(total_len).freeze(),
280 idx: 5,
281 };
282
283 let message = match tag {
284 PARSE_COMPLETE_TAG => Message::ParseComplete,
285 BIND_COMPLETE_TAG => Message::BindComplete,
286 CLOSE_COMPLETE_TAG => Message::CloseComplete,
287 NOTIFICATION_RESPONSE_TAG => {
288 let process_id = buf.read_i32::<BigEndian>()?;
289 let channel = buf.read_cstr()?;
290 let message = buf.read_cstr()?;
291 Message::NotificationResponse(NotificationResponseBody {
292 process_id,
293 channel,
294 message,
295 })
296 }
297 COPY_DONE_TAG => Message::CopyDone,
298 COMMAND_COMPLETE_TAG => {
299 let tag = buf.read_cstr()?;
300 Message::CommandComplete(CommandCompleteBody { tag })
301 }
302 COPY_DATA_TAG => {
303 let storage = buf.read_all();
304 Message::CopyData(CopyDataBody { storage })
305 }
306 DATA_ROW_TAG => {
307 let len = buf.read_u16::<BigEndian>()?;
308 let storage = buf.read_all();
309 Message::DataRow(DataRowBody { storage, len })
310 }
311 ERROR_RESPONSE_TAG => {
312 let storage = buf.read_all();
313 Message::ErrorResponse(ErrorResponseBody { storage })
314 }
315 COPY_IN_RESPONSE_TAG => {
316 let format = buf.read_u8()?;
317 let len = buf.read_u16::<BigEndian>()?;
318 let storage = buf.read_all();
319 Message::CopyInResponse(CopyInResponseBody {
320 format,
321 len,
322 storage,
323 })
324 }
325 COPY_OUT_RESPONSE_TAG => {
326 let format = buf.read_u8()?;
327 let len = buf.read_u16::<BigEndian>()?;
328 let storage = buf.read_all();
329 Message::CopyOutResponse(CopyOutResponseBody {
330 format,
331 len,
332 storage,
333 })
334 }
335 EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
336 BACKEND_KEY_DATA_TAG => {
337 let process_id = buf.read_i32::<BigEndian>()?;
338 let secret_key = buf.read_i32::<BigEndian>()?;
339 Message::BackendKeyData(BackendKeyDataBody {
340 process_id,
341 secret_key,
342 })
343 }
344 NO_DATA_TAG => Message::NoData,
345 NOTICE_RESPONSE_TAG => {
346 let storage = buf.read_all();
347 Message::NoticeResponse(NoticeResponseBody { storage })
348 }
349 AUTHENTICATION_TAG => match buf.read_i32::<BigEndian>()? {
350 0 => Message::AuthenticationOk,
351 3 => Message::AuthenticationCleartextPassword,
352 5 => {
353 let mut salt = [0; 4];
354 buf.read_exact(&mut salt)?;
355 Message::AuthenticationMd5Password(AuthenticationMd5PasswordBody { salt })
356 }
357 10 => {
358 let storage = buf.read_all();
359 Message::AuthenticationSasl(AuthenticationSaslBody(storage))
360 }
361 11 => {
362 let storage = buf.read_all();
363 Message::AuthenticationSaslContinue(AuthenticationSaslContinueBody(storage))
364 }
365 12 => {
366 let storage = buf.read_all();
367 Message::AuthenticationSaslFinal(AuthenticationSaslFinalBody(storage))
368 }
369 tag => {
370 return Err(io::Error::new(
371 io::ErrorKind::InvalidInput,
372 format!("unknown authentication tag `{tag}`"),
373 ));
374 }
375 },
376 PORTAL_SUSPENDED_TAG => Message::PortalSuspended,
377 PARAMETER_STATUS_TAG => {
378 let name = buf.read_cstr()?;
379 let value = buf.read_cstr()?;
380 Message::ParameterStatus(ParameterStatusBody { name, value })
381 }
382 PARAMETER_DESCRIPTION_TAG => {
383 let len = buf.read_u16::<BigEndian>()?;
384 let storage = buf.read_all();
385 Message::ParameterDescription(ParameterDescriptionBody { storage, len })
386 }
387 ROW_DESCRIPTION_TAG => {
388 let len = buf.read_u16::<BigEndian>()?;
389 let storage = buf.read_all();
390 Message::RowDescription(RowDescriptionBody { storage, len })
391 }
392 READY_FOR_QUERY_TAG => {
393 let status = buf.read_u8()?;
394 Message::ReadyForQuery(ReadyForQueryBody { status })
395 }
396 tag => {
397 return Err(io::Error::new(
398 io::ErrorKind::InvalidInput,
399 format!("unknown message tag `{tag}`"),
400 ));
401 }
402 };
403
404 if !buf.is_empty() {
405 return Err(io::Error::new(
406 io::ErrorKind::InvalidInput,
407 "invalid message length: expected buffer to be empty",
408 ));
409 }
410
411 Ok(Some(message))
412 }
413}
414
415/// Internal buffer for parsing message bodies.
416///
417/// Maintains a position index to track parsing progress through the message bytes.
418struct Buffer {
419 /// The complete message bytes.
420 bytes: Bytes,
421 /// Current parsing position within the bytes.
422 idx: usize,
423}
424
425impl Buffer {
426 /// Returns a slice of the remaining unparsed bytes.
427 #[inline]
428 fn slice(&self) -> &[u8] {
429 &self.bytes[self.idx..]
430 }
431
432 /// Returns true if all bytes have been consumed.
433 #[inline]
434 fn is_empty(&self) -> bool {
435 self.slice().is_empty()
436 }
437
438 /// Reads a null-terminated C string from the buffer.
439 ///
440 /// Advances the position past the null terminator.
441 /// Returns the bytes without the null terminator.
442 #[inline]
443 fn read_cstr(&mut self) -> io::Result<Bytes> {
444 match memchr(0, self.slice()) {
445 Some(pos) => {
446 let start = self.idx;
447 let end = start + pos;
448 let cstr = self.bytes.slice(start..end);
449 self.idx = end + 1;
450 Ok(cstr)
451 }
452 None => Err(io::Error::new(
453 io::ErrorKind::UnexpectedEof,
454 "unexpected EOF",
455 )),
456 }
457 }
458
459 /// Reads all remaining bytes from the buffer.
460 ///
461 /// Advances the position to the end of the buffer.
462 #[inline]
463 fn read_all(&mut self) -> Bytes {
464 let buf = self.bytes.slice(self.idx..);
465 self.idx = self.bytes.len();
466 buf
467 }
468}
469
470impl Read for Buffer {
471 #[inline]
472 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
473 let len = {
474 let slice = self.slice();
475 let len = std::cmp::min(slice.len(), buf.len());
476 buf[..len].copy_from_slice(&slice[..len]);
477 len
478 };
479 self.idx += len;
480 Ok(len)
481 }
482}
483
484// Message body types
485
/// Payload of an MD5 password authentication request.
///
/// Holds the 4-byte salt that follows the authentication sub-code; the
/// client combines it with the password hash in its reply.
#[derive(Debug)]
pub struct AuthenticationMd5PasswordBody {
    /// Salt bytes exactly as received from the server.
    salt: [u8; 4],
}

impl AuthenticationMd5PasswordBody {
    /// Returns a copy of the 4-byte salt.
    #[inline]
    #[must_use]
    pub fn salt(&self) -> [u8; 4] {
        let Self { salt } = self;
        *salt
    }
}
504
505/// SASL authentication body.
506///
507/// Contains a list of supported SASL authentication mechanisms,
508/// each as a null-terminated string. The list ends with a double null.
509#[derive(Debug)]
510pub struct AuthenticationSaslBody(Bytes);
511
512impl AuthenticationSaslBody {
513 /// Returns the raw data.
514 #[inline]
515 pub fn data(&self) -> &[u8] {
516 &self.0
517 }
518
519 /// Returns an iterator over the available SASL mechanisms.
520 #[inline]
521 pub fn mechanisms(&self) -> SaslMechanisms<'_> {
522 SaslMechanisms { buf: &self.0 }
523 }
524}
525
526/// Iterator over SASL mechanism names in an authentication message.
527///
528/// Parses null-terminated strings from the SASL authentication body.
529#[derive(Debug)]
530pub struct SaslMechanisms<'a> {
531 /// Buffer containing null-terminated mechanism names.
532 buf: &'a [u8],
533}
534
535impl<'a> Iterator for SaslMechanisms<'a> {
536 type Item = &'a str;
537
538 fn next(&mut self) -> Option<Self::Item> {
539 if self.buf.is_empty() || self.buf[0] == 0 {
540 return None;
541 }
542
543 match memchr(0, self.buf) {
544 Some(pos) => {
545 let mechanism = str::from_utf8(&self.buf[..pos]).ok()?;
546 self.buf = &self.buf[pos + 1..];
547 Some(mechanism)
548 }
549 None => None,
550 }
551 }
552}
553
554/// SASL continue authentication body.
555///
556/// Contains server challenge data for the next step in SASL authentication.
557/// The format depends on the specific SASL mechanism being used.
558#[derive(Debug)]
559pub struct AuthenticationSaslContinueBody(Bytes);
560
561impl AuthenticationSaslContinueBody {
562 /// Returns the raw data.
563 #[inline]
564 pub fn data(&self) -> &[u8] {
565 &self.0
566 }
567}
568
569/// SASL final authentication body.
570///
571/// Contains the final server response in SASL authentication.
572/// Typically includes a server signature for verification.
573#[derive(Debug)]
574pub struct AuthenticationSaslFinalBody(Bytes);
575
576impl AuthenticationSaslFinalBody {
577 /// Returns the raw data.
578 #[inline]
579 pub fn data(&self) -> &[u8] {
580 &self.0
581 }
582}
583
/// Payload of a `BackendKeyData` message.
///
/// Carries the two values a client needs in order to issue a cancel
/// request for a query running on this connection.
#[derive(Debug)]
pub struct BackendKeyDataBody {
    /// Identifier of the backend process serving the connection.
    process_id: i32,
    /// Secret the server expects alongside the process id in a cancel request.
    secret_key: i32,
}

impl BackendKeyDataBody {
    /// Returns the backend process id.
    #[inline]
    #[must_use]
    pub fn process_id(&self) -> i32 {
        let Self { process_id, .. } = self;
        *process_id
    }

    /// Returns the secret key that authenticates cancel requests.
    #[inline]
    #[must_use]
    pub fn secret_key(&self) -> i32 {
        let Self { secret_key, .. } = self;
        *secret_key
    }
}
611
612/// Command complete body.
613///
614/// Sent after a command (SELECT, INSERT, UPDATE, DELETE, etc.) completes.
615/// The tag indicates what command was executed and may include row counts.
616#[derive(Debug)]
617pub struct CommandCompleteBody {
618 /// Command tag (e.g., "SELECT 42", "INSERT 0 1", "UPDATE 5").
619 tag: Bytes,
620}
621
622impl CommandCompleteBody {
623 /// Returns the command tag.
624 ///
625 /// # Errors
626 ///
627 /// Returns [`io::ErrorKind::InvalidInput`] if the tag bytes are not
628 /// valid UTF-8. The server sends ASCII command tags like `SELECT 42`,
629 /// so this path is only reachable on protocol corruption.
630 #[inline]
631 pub fn tag(&self) -> io::Result<&str> {
632 str::from_utf8(&self.tag).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
633 }
634}
635
636/// COPY data body.
637///
638/// Contains a chunk of COPY data during COPY IN or COPY OUT operations.
639/// The format depends on the COPY format (text or binary).
640#[derive(Debug)]
641pub struct CopyDataBody {
642 /// Raw COPY data bytes.
643 storage: Bytes,
644}
645
646impl CopyDataBody {
647 /// Returns the data.
648 #[inline]
649 pub fn data(&self) -> &[u8] {
650 &self.storage
651 }
652
653 /// Consumes self and returns the bytes.
654 #[inline]
655 pub fn into_bytes(self) -> Bytes {
656 self.storage
657 }
658}
659
660/// COPY IN response body.
661///
662/// Sent by the server to indicate it's ready to receive COPY data.
663/// The client should send `CopyData` messages until `CopyDone`.
664#[derive(Debug)]
665pub struct CopyInResponseBody {
666 /// Overall format: 0 = text, 1 = binary (`HyperBinary`).
667 format: u8,
668 /// Number of columns in the COPY operation.
669 len: u16,
670 /// Kept for memory ownership of parsed data.
671 #[expect(
672 dead_code,
673 reason = "owns the backing buffer referenced by `format`/`len`; must live as long as the message"
674 )]
675 storage: Bytes,
676}
677
678impl CopyInResponseBody {
679 /// Returns the overall format (0 = text, 1 = binary).
680 #[inline]
681 pub fn format(&self) -> u8 {
682 self.format
683 }
684
685 /// Returns the number of columns.
686 #[inline]
687 pub fn column_count(&self) -> u16 {
688 self.len
689 }
690}
691
692/// COPY OUT response body.
693///
694/// Sent by the server to indicate it will send COPY data.
695/// The client should expect `CopyData` messages until `CopyDone`.
696#[derive(Debug)]
697pub struct CopyOutResponseBody {
698 /// Overall format: 0 = text, 1 = binary (`HyperBinary`).
699 format: u8,
700 /// Number of columns in the COPY operation.
701 len: u16,
702 /// Kept for memory ownership of parsed data.
703 #[expect(
704 dead_code,
705 reason = "owns the backing buffer referenced by `format`/`len`; must live as long as the message"
706 )]
707 storage: Bytes,
708}
709
710impl CopyOutResponseBody {
711 /// Returns the overall format (0 = text, 1 = binary).
712 #[inline]
713 pub fn format(&self) -> u8 {
714 self.format
715 }
716
717 /// Returns the number of columns.
718 #[inline]
719 pub fn column_count(&self) -> u16 {
720 self.len
721 }
722}
723
724/// Data row body.
725///
726/// Contains a single row of data from a query result.
727/// Each column is prefixed with a 4-byte length (BigEndian):
728/// - Positive length: non-NULL value of that length
729/// - -1: NULL value
730#[derive(Debug, Clone)]
731pub struct DataRowBody {
732 /// Raw row data bytes.
733 storage: Bytes,
734 /// Number of columns in this row.
735 len: u16,
736}
737
738impl DataRowBody {
739 /// Returns an iterator over the column ranges.
740 #[inline]
741 pub fn ranges(&self) -> DataRowRanges<'_> {
742 DataRowRanges {
743 buf: &self.storage,
744 len: self.storage.len(),
745 remaining: self.len,
746 }
747 }
748
749 /// Returns the raw buffer.
750 #[inline]
751 pub fn buffer(&self) -> &[u8] {
752 &self.storage
753 }
754
755 /// Returns the number of columns.
756 #[inline]
757 pub fn column_count(&self) -> u16 {
758 self.len
759 }
760
761 /// Pre-computes offsets for **all** columns regardless of count.
762 ///
763 /// Computes offsets for all columns in this row.
764 ///
765 /// This provides O(1) access for any column index using dynamic allocation.
766 /// Each entry is:
767 /// - `Some((start, end))` for non-NULL values (byte range within the buffer)
768 /// - `None` for NULL values or when parsing fails due to truncated data
769 ///
770 /// # Performance Tradeoffs
771 ///
772 /// This method allocates a `Vec` for all columns upfront. For tables with many
773 /// columns where only a few are accessed, this may use more memory than needed.
774 ///
775 /// **When to use this method:**
776 /// - Accessing multiple columns from the same row
777 /// - Random access patterns where column indices vary
778 /// - Bulk processing where setup cost is amortized
779 ///
780 /// **Alternative for sparse access:**
781 /// If you only need one or two columns from a wide table, consider using
782 /// `get_raw(index)` which computes offset on-demand (O(n) per call, but no
783 /// allocation).
784 ///
785 /// # Memory Usage
786 ///
787 /// Allocates `column_count * size_of::<Option<(usize, usize)>>()` bytes.
788 /// For a table with 100 columns, this is approximately 2.4 KB per row.
789 ///
790 /// # Panics
791 ///
792 /// Does not panic in practice. The `usize::try_from(len)` call is
793 /// guarded by a `len >= 0` check, so the conversion from `i32` to
794 /// `usize` is always infallible.
795 #[inline]
796 pub fn compute_all_offsets(&self) -> Vec<Option<(usize, usize)>> {
797 let mut offsets = Vec::with_capacity(self.len as usize);
798
799 let mut pos = 0usize;
800 let buf = &self.storage[..];
801
802 for _ in 0..self.len as usize {
803 if pos + 4 > buf.len() {
804 break;
805 }
806 let len = i32::from_be_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]);
807 pos += 4;
808
809 if len >= 0 {
810 let len = usize::try_from(len).expect("len >= 0 checked above");
811 let start = pos;
812 let end = start.saturating_add(len);
813 if end > buf.len() {
814 break;
815 }
816 offsets.push(Some((start, end)));
817 pos = end;
818 } else {
819 offsets.push(None);
820 }
821 }
822
823 offsets
824 }
825
826 /// Gets raw bytes for a column by index, computing offset on demand.
827 /// Returns None if NULL or out of bounds.
828 /// This avoids allocating a Vec of ranges.
829 ///
830 /// # Panics
831 ///
832 /// Does not panic in practice. The `usize::try_from(len)` calls are
833 /// guarded by a `len >= 0` check in each branch, making the
834 /// `i32 -> usize` conversion infallible.
835 #[inline]
836 pub fn get_column_bytes(&self, idx: usize) -> Option<&[u8]> {
837 if idx >= self.len as usize {
838 return None;
839 }
840
841 let mut buf = &self.storage[..];
842
843 // Skip to the requested column
844 for _ in 0..idx {
845 if buf.len() < 4 {
846 return None;
847 }
848 let len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
849 buf = &buf[4..];
850 if len >= 0 {
851 let len = usize::try_from(len).expect("len >= 0 checked above");
852 if buf.len() < len {
853 return None;
854 }
855 buf = &buf[len..];
856 }
857 }
858
859 // Read the target column
860 if buf.len() < 4 {
861 return None;
862 }
863 let len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
864 if len < 0 {
865 None // NULL
866 } else {
867 let len = usize::try_from(len).expect("len >= 0 checked above");
868 let data_start = 4;
869 if buf.len() < data_start + len {
870 return None;
871 }
872 Some(&buf[data_start..data_start + len])
873 }
874 }
875
876 /// Checks if a column is NULL.
877 ///
878 /// # Panics
879 ///
880 /// Does not panic in practice. Each `usize::try_from(len)` is guarded
881 /// by a `len >= 0` check, so the conversion from `i32` cannot fail.
882 #[inline]
883 pub fn is_column_null(&self, idx: usize) -> bool {
884 if idx >= self.len as usize {
885 return true;
886 }
887
888 let mut buf = &self.storage[..];
889
890 // Skip to the requested column
891 for _ in 0..idx {
892 if buf.len() < 4 {
893 return true;
894 }
895 let len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
896 buf = &buf[4..];
897 if len >= 0 {
898 let len = usize::try_from(len).expect("len >= 0 checked above");
899 if buf.len() < len {
900 return true;
901 }
902 buf = &buf[len..];
903 }
904 }
905
906 // Check the target column
907 if buf.len() < 4 {
908 return true;
909 }
910 let len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
911 len < 0
912 }
913}
914
/// Iterator over data row column ranges.
///
/// Yields one item per column of a `DataRow`: `Ok(None)` for a NULL column
/// and `Ok(Some(range))` for a non-NULL column, where `range` indexes into
/// the row buffer the iterator was created from.
///
/// Malformed data is reported as a single `Err` item, after which the
/// iterator is finished and returns `None`. This guarantees termination
/// even for callers that skip over errors instead of stopping at the first
/// one.
#[derive(Debug)]
pub struct DataRowRanges<'a> {
    /// Row bytes not yet consumed.
    buf: &'a [u8],
    /// Original buffer length (for computing absolute offsets).
    len: usize,
    /// Number of columns remaining to parse.
    remaining: u16,
}

impl DataRowRanges<'_> {
    /// Builds the error item for `next` and moves the iterator into its
    /// finished state, so that every later call returns `None`.
    fn fail(
        &mut self,
        kind: io::ErrorKind,
        message: &'static str,
    ) -> Option<io::Result<Option<Range<usize>>>> {
        self.buf = &[];
        self.remaining = 0;
        Some(Err(io::Error::new(kind, message)))
    }
}

impl Iterator for DataRowRanges<'_> {
    type Item = io::Result<Option<Range<usize>>>;

    /// Decodes the next column.
    ///
    /// Errors (each reported once, then iteration ends):
    /// - [`io::ErrorKind::InvalidInput`] when bytes remain after the last
    ///   declared column
    /// - [`io::ErrorKind::UnexpectedEof`] when a length prefix or a value is
    ///   cut short
    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining == 0 {
            if self.buf.is_empty() {
                return None;
            }
            // Previously this branch reported the same error on every call,
            // so the iterator never ended when trailing bytes were present.
            return self.fail(
                io::ErrorKind::InvalidInput,
                "invalid message length: extra data in data row",
            );
        }

        self.remaining -= 1;

        if self.buf.len() < 4 {
            return self.fail(
                io::ErrorKind::UnexpectedEof,
                "unexpected EOF reading column length",
            );
        }

        let (prefix, rest) = self.buf.split_at(4);
        let len = i32::from_be_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]);
        self.buf = rest;

        // A negative length marks a NULL column with no value bytes.
        let Ok(len) = usize::try_from(len) else {
            return Some(Ok(None));
        };

        if self.buf.len() < len {
            return self.fail(io::ErrorKind::UnexpectedEof, "unexpected EOF");
        }

        // Absolute offset = bytes consumed so far from the original buffer.
        let base = self.len - self.buf.len();
        self.buf = &self.buf[len..];
        Some(Ok(Some(base..base + len)))
    }
}
972
973/// Error response body.
974///
975/// Contains error information in the form of key-value pairs.
976/// Each field has a type code (single byte) followed by a null-terminated value.
977/// The message ends with a null byte (type code 0).
978#[derive(Debug)]
979pub struct ErrorResponseBody {
980 /// Raw error message bytes.
981 storage: Bytes,
982}
983
984impl ErrorResponseBody {
985 /// Returns an iterator over error fields.
986 #[inline]
987 pub fn fields(&self) -> ErrorFields<'_> {
988 ErrorFields { buf: &self.storage }
989 }
990}
991
992/// Iterator over error fields in an `ErrorResponse` or `NoticeResponse`.
993///
994/// Each field consists of a type code byte followed by a null-terminated value string.
995#[derive(Debug)]
996pub struct ErrorFields<'a> {
997 /// Buffer containing the error/notice fields.
998 buf: &'a [u8],
999}
1000
1001impl<'a> Iterator for ErrorFields<'a> {
1002 type Item = io::Result<ErrorField<'a>>;
1003
1004 fn next(&mut self) -> Option<Self::Item> {
1005 if self.buf.is_empty() {
1006 return None;
1007 }
1008
1009 let type_ = self.buf[0];
1010 self.buf = &self.buf[1..];
1011
1012 if type_ == 0 {
1013 return None;
1014 }
1015
1016 match memchr(0, self.buf) {
1017 Some(pos) => {
1018 let value = &self.buf[..pos];
1019 self.buf = &self.buf[pos + 1..];
1020 Some(Ok(ErrorField { type_, value }))
1021 }
1022 None => Some(Err(io::Error::new(
1023 io::ErrorKind::UnexpectedEof,
1024 "unexpected EOF in error field",
1025 ))),
1026 }
1027 }
1028}
1029
/// One field of an error or notice report.
///
/// Commonly seen type codes:
/// - 'S': Severity
/// - 'C': Code (SQLSTATE)
/// - 'M': Message
/// - 'D': Detail
/// - 'H': Hint
/// - 'P': Position
/// - 'p': Internal position
/// - 'q': Internal query
/// - 'W': Where
/// - 's': Schema name
/// - 't': Table name
/// - 'c': Column name
/// - 'd': Data type name
/// - 'n': Constraint name
#[derive(Debug)]
pub struct ErrorField<'a> {
    /// One-byte code identifying the kind of field.
    type_: u8,
    /// Value bytes, excluding the NUL terminator.
    value: &'a [u8],
}

impl ErrorField<'_> {
    /// Returns the one-byte type code of the field.
    #[inline]
    #[must_use]
    pub fn type_(&self) -> u8 {
        let Self { type_, .. } = self;
        *type_
    }

    /// Returns the undecoded value bytes.
    #[inline]
    #[must_use]
    pub fn value_bytes(&self) -> &[u8] {
        let Self { value, .. } = self;
        value
    }

    /// Returns the value decoded as text.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] when the value is not valid
    /// UTF-8. Field values sent by the server are expected to be UTF-8, so
    /// this indicates protocol corruption.
    #[inline]
    pub fn value(&self) -> io::Result<&str> {
        match str::from_utf8(self.value) {
            Ok(text) => Ok(text),
            Err(utf8_error) => Err(io::Error::new(io::ErrorKind::InvalidInput, utf8_error)),
        }
    }
}
1082
1083/// Notice response body (same format as error).
1084///
1085/// Contains warning or informational messages.
1086/// Uses the same field format as `ErrorResponse` but indicates a warning, not an error.
1087#[derive(Debug)]
1088pub struct NoticeResponseBody {
1089 /// Raw notice message bytes.
1090 storage: Bytes,
1091}
1092
1093impl NoticeResponseBody {
1094 /// Returns an iterator over notice fields.
1095 #[inline]
1096 pub fn fields(&self) -> ErrorFields<'_> {
1097 ErrorFields { buf: &self.storage }
1098 }
1099}
1100
1101/// Notification response body.
1102///
1103/// Sent when a NOTIFY event occurs that the client is listening to.
1104/// Used for asynchronous notifications between database sessions.
1105#[derive(Debug)]
1106pub struct NotificationResponseBody {
1107 /// Process ID of the backend that sent the notification.
1108 process_id: i32,
1109 /// Channel name (null-terminated).
1110 channel: Bytes,
1111 /// Notification payload (null-terminated).
1112 message: Bytes,
1113}
1114
1115impl NotificationResponseBody {
1116 /// Returns the process ID.
1117 #[inline]
1118 pub fn process_id(&self) -> i32 {
1119 self.process_id
1120 }
1121
1122 /// Returns the channel name.
1123 ///
1124 /// # Errors
1125 ///
1126 /// Returns [`io::ErrorKind::InvalidInput`] if the channel bytes are
1127 /// not valid UTF-8 (protocol corruption — server always sends UTF-8).
1128 #[inline]
1129 pub fn channel(&self) -> io::Result<&str> {
1130 str::from_utf8(&self.channel).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
1131 }
1132
1133 /// Returns the message.
1134 ///
1135 /// # Errors
1136 ///
1137 /// Returns [`io::ErrorKind::InvalidInput`] if the payload bytes are
1138 /// not valid UTF-8 (protocol corruption — server always sends UTF-8).
1139 #[inline]
1140 pub fn message(&self) -> io::Result<&str> {
1141 str::from_utf8(&self.message).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
1142 }
1143}
1144
1145/// Parameter description body.
1146///
1147/// Sent in response to a Parse message, describing the parameter types
1148/// expected by the prepared statement.
1149#[derive(Debug)]
1150pub struct ParameterDescriptionBody {
1151 /// Raw parameter OID bytes.
1152 storage: Bytes,
1153 /// Number of parameters.
1154 len: u16,
1155}
1156
1157impl ParameterDescriptionBody {
1158 /// Returns an iterator over parameter OIDs.
1159 #[inline]
1160 pub fn parameters(&self) -> Parameters<'_> {
1161 Parameters {
1162 buf: &self.storage,
1163 remaining: self.len,
1164 }
1165 }
1166}
1167
1168/// Iterator over parameter OIDs in a `ParameterDescription`.
1169///
1170/// Each parameter is represented by a 4-byte OID (`BigEndian`).
1171#[derive(Debug)]
1172pub struct Parameters<'a> {
1173 /// Buffer containing parameter OIDs.
1174 buf: &'a [u8],
1175 /// Number of parameters remaining to parse.
1176 remaining: u16,
1177}
1178
1179impl Iterator for Parameters<'_> {
1180 type Item = io::Result<Oid>;
1181
1182 fn next(&mut self) -> Option<Self::Item> {
1183 if self.remaining == 0 {
1184 return None;
1185 }
1186
1187 self.remaining -= 1;
1188
1189 if self.buf.len() < 4 {
1190 return Some(Err(io::Error::new(
1191 io::ErrorKind::UnexpectedEof,
1192 "unexpected EOF",
1193 )));
1194 }
1195
1196 let oid = u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]);
1197 self.buf = &self.buf[4..];
1198 Some(Ok(Oid::new(oid)))
1199 }
1200}
1201
1202/// Parameter status body.
1203///
1204/// Sent by the server to inform the client about configuration parameter changes.
1205/// Common parameters include "`application_name`", "`server_version`", etc.
1206#[derive(Debug)]
1207pub struct ParameterStatusBody {
1208 /// Parameter name (null-terminated).
1209 name: Bytes,
1210 /// Parameter value (null-terminated).
1211 value: Bytes,
1212}
1213
1214impl ParameterStatusBody {
1215 /// Returns the parameter name.
1216 ///
1217 /// # Errors
1218 ///
1219 /// Returns [`io::ErrorKind::InvalidInput`] if the parameter name bytes
1220 /// are not valid UTF-8 (protocol corruption).
1221 #[inline]
1222 pub fn name(&self) -> io::Result<&str> {
1223 str::from_utf8(&self.name).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
1224 }
1225
1226 /// Returns the parameter value.
1227 ///
1228 /// # Errors
1229 ///
1230 /// Returns [`io::ErrorKind::InvalidInput`] if the parameter value bytes
1231 /// are not valid UTF-8 (protocol corruption).
1232 #[inline]
1233 pub fn value(&self) -> io::Result<&str> {
1234 str::from_utf8(&self.value).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
1235 }
1236}
1237
/// Body of a ready-for-query message.
///
/// The server sends it to signal that a new query may be submitted; the
/// single status byte reports the current transaction state.
#[derive(Debug)]
pub struct ReadyForQueryBody {
    /// Transaction status: 'I' = idle, 'T' = in transaction, 'E' = in failed transaction.
    status: u8,
}

impl ReadyForQueryBody {
    /// Returns the raw transaction status byte:
    /// 'I' = idle, 'T' = in transaction, 'E' = in failed transaction.
    #[must_use]
    #[inline]
    pub fn status(&self) -> u8 {
        let Self { status } = *self;
        status
    }
}
1257
1258/// Row description body.
1259///
1260/// Sent in response to a Describe or Execute message, describing the columns
1261/// that will be returned by a query. Each field description includes name,
1262/// type OID, format, and other metadata.
1263#[derive(Debug)]
1264pub struct RowDescriptionBody {
1265 /// Raw field description bytes.
1266 storage: Bytes,
1267 /// Number of fields (columns).
1268 len: u16,
1269}
1270
1271impl RowDescriptionBody {
1272 /// Returns an iterator over field descriptions.
1273 #[inline]
1274 pub fn fields(&self) -> Fields<'_> {
1275 Fields {
1276 buf: &self.storage,
1277 remaining: self.len,
1278 }
1279 }
1280
1281 /// Returns the number of fields.
1282 #[inline]
1283 pub fn field_count(&self) -> u16 {
1284 self.len
1285 }
1286}
1287
1288/// Iterator over field descriptions in a `RowDescription`.
1289///
1290/// Each field contains name, table OID, column ID, type OID, type size,
1291/// type modifier, and format code.
1292#[derive(Debug)]
1293pub struct Fields<'a> {
1294 /// Buffer containing field descriptions.
1295 buf: &'a [u8],
1296 /// Number of fields remaining to parse.
1297 remaining: u16,
1298}
1299
1300impl<'a> Iterator for Fields<'a> {
1301 type Item = io::Result<Field<'a>>;
1302
1303 fn next(&mut self) -> Option<Self::Item> {
1304 if self.remaining == 0 {
1305 return None;
1306 }
1307
1308 self.remaining -= 1;
1309
1310 // Parse field name (null-terminated string)
1311 let Some(name_end) = memchr(0, self.buf) else {
1312 return Some(Err(io::Error::new(
1313 io::ErrorKind::UnexpectedEof,
1314 "unexpected EOF in field name",
1315 )));
1316 };
1317
1318 let name = match str::from_utf8(&self.buf[..name_end]) {
1319 Ok(s) => s,
1320 Err(e) => return Some(Err(io::Error::new(io::ErrorKind::InvalidInput, e))),
1321 };
1322 self.buf = &self.buf[name_end + 1..];
1323
1324 // Parse fixed fields (18 bytes total)
1325 if self.buf.len() < 18 {
1326 return Some(Err(io::Error::new(
1327 io::ErrorKind::UnexpectedEof,
1328 "unexpected EOF in field description",
1329 )));
1330 }
1331
1332 let table_oid = u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]);
1333 let column_id = i16::from_be_bytes([self.buf[4], self.buf[5]]);
1334 let type_oid = u32::from_be_bytes([self.buf[6], self.buf[7], self.buf[8], self.buf[9]]);
1335 let type_size = i16::from_be_bytes([self.buf[10], self.buf[11]]);
1336 let type_modifier =
1337 i32::from_be_bytes([self.buf[12], self.buf[13], self.buf[14], self.buf[15]]);
1338 let format = i16::from_be_bytes([self.buf[16], self.buf[17]]);
1339 self.buf = &self.buf[18..];
1340
1341 Some(Ok(Field {
1342 name,
1343 table_oid: Oid::new(table_oid),
1344 column_id,
1345 type_oid: Oid::new(type_oid),
1346 type_size,
1347 type_modifier,
1348 format,
1349 }))
1350 }
1351}
1352
1353/// A field description in a `RowDescription`.
1354///
1355/// Describes a single column that will be returned by a query.
1356#[derive(Debug)]
1357pub struct Field<'a> {
1358 /// Column name.
1359 name: &'a str,
1360 /// OID of the table this column belongs to (0 if not from a table).
1361 table_oid: Oid,
1362 /// Column number within the table (attribute number).
1363 column_id: i16,
1364 /// OID of the column's data type.
1365 type_oid: Oid,
1366 /// Size of the type in bytes (-1 for variable-length types).
1367 type_size: i16,
1368 /// Type modifier (e.g., precision/scale for numeric types).
1369 type_modifier: i32,
1370 /// Format code: 0 = text, 1 = binary.
1371 format: i16,
1372}
1373
1374impl<'a> Field<'a> {
1375 /// Returns the field name.
1376 #[inline]
1377 #[must_use]
1378 pub fn name(&self) -> &'a str {
1379 self.name
1380 }
1381
1382 /// Returns the table OID.
1383 #[inline]
1384 #[must_use]
1385 pub fn table_oid(&self) -> Oid {
1386 self.table_oid
1387 }
1388
1389 /// Returns the column ID within the table.
1390 #[inline]
1391 #[must_use]
1392 pub fn column_id(&self) -> i16 {
1393 self.column_id
1394 }
1395
1396 /// Returns the type OID.
1397 #[inline]
1398 #[must_use]
1399 pub fn type_oid(&self) -> Oid {
1400 self.type_oid
1401 }
1402
1403 /// Returns the type size.
1404 #[inline]
1405 #[must_use]
1406 pub fn type_size(&self) -> i16 {
1407 self.type_size
1408 }
1409
1410 /// Returns the type modifier.
1411 #[inline]
1412 #[must_use]
1413 pub fn type_modifier(&self) -> i32 {
1414 self.type_modifier
1415 }
1416
1417 /// Returns the format code (0 = text, 1 = binary).
1418 #[inline]
1419 #[must_use]
1420 pub fn format(&self) -> i16 {
1421 self.format
1422 }
1423}
1424
#[cfg(test)]
mod tests {
    use super::*;

    /// Every one of 40 single-byte columns must map back to its own value.
    #[test]
    fn compute_all_offsets_handles_many_columns() {
        // Build a row with 40 columns to verify dynamic allocation works correctly.
        // Each column is a 4-byte big-endian length (1) followed by one value byte.
        let mut buf = Vec::new();
        for i in 0u8..40 {
            buf.extend_from_slice(&1i32.to_be_bytes());
            buf.push(i);
        }
        let row = DataRowBody {
            storage: Bytes::from(buf),
            len: 40,
        };

        let offsets = row.compute_all_offsets();
        assert_eq!(offsets.len(), 40);

        let expect_range = |idx: usize, val: u8| {
            let (start, end) = offsets[idx].expect("expected non-null");
            let slice = &row.buffer()[start..end];
            assert_eq!(slice, &[val]);
        };

        // Check all columns, not just a sample, so a drift in the middle of
        // the row cannot go unnoticed.
        for i in 0u8..40 {
            expect_range(usize::from(i), i);
        }
    }

    /// A NULL column yields `None` and must not shift the ranges around it.
    #[test]
    fn compute_all_offsets_tracks_nulls() {
        // Build: col0=1 byte, col1=NULL, col2=1 byte
        let mut buf = Vec::new();
        buf.extend_from_slice(&1i32.to_be_bytes());
        buf.push(0xAA);
        buf.extend_from_slice(&(-1i32).to_be_bytes()); // NULL
        buf.extend_from_slice(&1i32.to_be_bytes());
        buf.push(0xBB);

        let row = DataRowBody {
            storage: Bytes::from(buf),
            len: 3,
        };

        let offsets = row.compute_all_offsets();
        assert_eq!(offsets.len(), 3);
        assert!(offsets[0].is_some());
        assert!(offsets[1].is_none());
        assert!(offsets[2].is_some());

        let expect_range = |idx: usize, val: u8| {
            let (start, end) = offsets[idx].expect("expected non-null");
            let slice = &row.buffer()[start..end];
            assert_eq!(slice, &[val]);
        };

        // Presence alone is not enough: the column after the NULL must still
        // point at its own byte, not at the NULL marker or a neighbour.
        expect_range(0, 0xAA);
        expect_range(2, 0xBB);
    }
}