Skip to main content

hyperdb_api_core/protocol/message/
frontend.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Frontend (client-to-server) messages.
5//!
6//! Each function in this module writes a single `PostgreSQL` wire protocol
7//! message into a [`BytesMut`] buffer. All structural fields (tags, lengths,
8//! counts) are written in **`BigEndian`** per the `PostgreSQL` specification.
9//!
10//! The functions are intentionally stateless -- they append bytes to a buffer
11//! and do not manage connection state. The connection layer in `hyper-client`
12//! is responsible for sequencing messages correctly (e.g. Parse/Bind/Execute
13//! followed by Sync).
14//!
15//! # Attribution
16//!
17//! Portions of this module were adapted from
18//! [`postgres-protocol`](https://github.com/sfackler/rust-postgres)'s
19//! `message/frontend.rs` (Copyright (c) 2016 Steven Fackler, MIT or
20//! Apache-2.0). Adapted material includes the PostgreSQL v3.0
21//! protocol-version literal (`196608`, `(3 << 16) | 0`), the startup-message
22//! framing layout, and the `msg_len` helper. Hyper-specific changes added
23//! on top include performance work and Hyper-specific message variants.
24//! See the `NOTICE` file at the repo root for the full upstream copyright
25//! and reproduced license text.
26
27use bytes::{BufMut, BytesMut};
28use std::io;
29
30use crate::types::Oid;
31
32/// Narrows a `usize` length to the `PostgreSQL` wire-protocol `i32` length prefix.
33///
34/// Panics on overflow. The `PostgreSQL` protocol caps individual messages at
35/// `i32::MAX` bytes (~2 GiB); any single string, query, or byte slice exceeding
36/// that is a programming error, not a runtime-recoverable condition. Per the
37/// M-PANIC-ON-BUG guideline, we panic rather than threading a `Result` through
38/// every wire-format helper.
39#[inline]
40fn msg_len(n: usize) -> i32 {
41    i32::try_from(n).expect("PostgreSQL wire message field exceeds i32::MAX bytes")
42}
43
44#[expect(
45    clippy::similar_names,
46    reason = "paired bindings (request/response, reader/writer, etc.) are more readable with symmetric names than artificially distinct ones"
47)]
48/// Narrows a `usize` count to the `PostgreSQL` wire-protocol 16-bit parameter /
49/// format count.
50///
51/// The wire field is `Int16` (signed on the wire, but values `0..=65_535` are
52/// legal — the high bit is a permitted value, not a sign). We go through `u16`
53/// first to trap genuine overflow, then bit-reinterpret to `i16` so
54/// [`bytes::BufMut::put_i16`] writes the expected `BigEndian` representation.
55///
56/// Panics if the count exceeds `65_535`. In practice Hyper/PostgreSQL cap this
57/// far lower (1600 parameters per query), so overflow is a programming error.
58#[inline]
59fn msg_count(n: usize) -> i16 {
60    let as_u16 = u16::try_from(n)
61        .expect("PostgreSQL wire message count exceeds u16::MAX (max 65_535 parameters)");
62    // Bit-pattern reinterpret: `put_i16` writes the same 16 bits regardless of
63    // signed/unsigned interpretation.
64    #[expect(
65        clippy::cast_possible_wrap,
66        reason = "intentional bit-pattern reinterpret; wire field is semantically 0..=65_535"
67    )]
68    let as_i16 = as_u16 as i16;
69    as_i16
70}
71
72/// Writes a startup message to the buffer.
73///
74/// The startup message establishes the connection and sets initial parameters.
75/// This is the first message sent by the client after establishing a TCP connection.
76///
77/// # Arguments
78///
79/// * `parameters` - Key-value pairs of connection parameters (e.g., "user", "database")
80/// * `buf` - Buffer to write the message to
81///
82/// # Errors
83///
84/// Currently infallible — always returns `Ok(())`. The `io::Result` return
85/// type is preserved for forward compatibility so that future validation
86/// (e.g. parameter-length checks) can surface errors without a breaking
87/// signature change.
88///
89/// # Panics
90///
91/// Panics via `msg_len` if the total encoded message length exceeds
92/// `i32::MAX` bytes (~2 GiB). PostgreSQL caps messages at that size, so
93/// exceeding it is a programming error.
94pub fn startup_message(parameters: &[(&str, &str)], buf: &mut BytesMut) -> io::Result<()> {
95    // Reserve space for length
96    let len_idx = buf.len();
97    buf.put_i32(0);
98
99    // Protocol version 3.0
100    buf.put_i32(196608); // (3 << 16) | 0
101
102    // Parameters
103    for (name, value) in parameters {
104        buf.put_slice(name.as_bytes());
105        buf.put_u8(0);
106        buf.put_slice(value.as_bytes());
107        buf.put_u8(0);
108    }
109
110    // Terminator
111    buf.put_u8(0);
112
113    // Update length
114    let len = msg_len(buf.len() - len_idx);
115    buf[len_idx..len_idx + 4].copy_from_slice(&len.to_be_bytes());
116
117    Ok(())
118}
119
120/// Writes a password message (for authentication).
121///
122/// Sent in response to `AuthenticationCleartextPassword` or after MD5 hashing.
123///
124/// # Arguments
125///
126/// * `password` - The password to send (plaintext or MD5 hash)
127/// * `buf` - Buffer to write the message to
128///
129/// # Errors
130///
131/// Currently infallible — always returns `Ok(())`. The `io::Result` return
132/// type is preserved for forward compatibility.
133///
134/// # Panics
135///
136/// Panics via `msg_len` if `password.len()` exceeds `i32::MAX` bytes.
137pub fn password_message(password: &str, buf: &mut BytesMut) -> io::Result<()> {
138    buf.put_u8(b'p');
139    buf.put_i32(4 + msg_len(password.len()) + 1);
140    buf.put_slice(password.as_bytes());
141    buf.put_u8(0);
142    Ok(())
143}
144
145/// Writes a SASL initial response message.
146///
147/// Used to initiate SASL authentication (e.g., SCRAM-SHA-256).
148/// Sent in response to `AuthenticationSasl` message.
149///
150/// # Arguments
151///
152/// * `mechanism` - The SASL mechanism name to use
153/// * `data` - Initial client response data (may be empty)
154/// * `buf` - Buffer to write the message to
155///
156/// # Errors
157///
158/// Currently infallible — always returns `Ok(())`. The `io::Result` return
159/// type is preserved for forward compatibility.
160///
161/// # Panics
162///
163/// Panics via `msg_len` if `mechanism.len()` or `data.len()` exceeds
164/// `i32::MAX` bytes.
165pub fn sasl_initial_response(mechanism: &str, data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
166    buf.put_u8(b'p');
167    let len = 4 + msg_len(mechanism.len()) + 1 + 4 + msg_len(data.len());
168    buf.put_i32(len);
169    buf.put_slice(mechanism.as_bytes());
170    buf.put_u8(0);
171    buf.put_i32(msg_len(data.len()));
172    buf.put_slice(data);
173    Ok(())
174}
175
176/// Writes a SASL response message.
177///
178/// Used to continue SASL authentication exchange.
179/// Sent in response to `AuthenticationSaslContinue` messages.
180///
181/// # Arguments
182///
183/// * `data` - Client response data for this authentication step
184/// * `buf` - Buffer to write the message to
185///
186/// # Errors
187///
188/// Currently infallible — always returns `Ok(())`. The `io::Result` return
189/// type is preserved for forward compatibility.
190///
191/// # Panics
192///
193/// Panics via `msg_len` if `data.len()` exceeds `i32::MAX` bytes.
194pub fn sasl_response(data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
195    buf.put_u8(b'p');
196    buf.put_i32(4 + msg_len(data.len()));
197    buf.put_slice(data);
198    Ok(())
199}
200
201/// Writes a simple query message.
202///
203/// Executes a SQL query directly without using prepared statements.
204/// The server will respond with `RowDescription`, `DataRow`, `CommandComplete`, etc.
205///
206/// # Arguments
207///
208/// * `query` - SQL query string (null-terminated)
209/// * `buf` - Buffer to write the message to
210///
211/// # Errors
212///
213/// Currently infallible — always returns `Ok(())`. The `io::Result` return
214/// type is preserved for forward compatibility.
215///
216/// # Panics
217///
218/// Panics via `msg_len` if `query.len()` exceeds `i32::MAX` bytes
219/// (PostgreSQL's per-message cap).
220pub fn query(query: &str, buf: &mut BytesMut) -> io::Result<()> {
221    buf.put_u8(b'Q');
222    buf.put_i32(4 + msg_len(query.len()) + 1);
223    buf.put_slice(query.as_bytes());
224    buf.put_u8(0);
225    Ok(())
226}
227
228/// Writes a parse message (prepare statement).
229///
230/// Prepares a SQL statement for later execution. The server responds with
231/// `ParseComplete` and may send `ParameterDescription`.
232///
233/// # Arguments
234///
235/// * `name` - Statement name (null-terminated, empty string for unnamed)
236/// * `query` - SQL query string with parameter placeholders ($1, $2, etc.)
237/// * `param_types` - OIDs of parameter types (empty if types should be inferred)
238/// * `buf` - Buffer to write the message to
239///
240/// # Errors
241///
242/// Currently infallible — always returns `Ok(())`. The `io::Result` return
243/// type is preserved for forward compatibility.
244///
245/// # Panics
246///
247/// - Panics via `msg_len` if `name.len()`, `query.len()`, or the encoded
248///   message length exceeds `i32::MAX` bytes.
249/// - Panics via `msg_count` if `param_types.len()` exceeds `u16::MAX`
250///   (65 535). PostgreSQL caps parameters per query at 1 600, so overflow
251///   is a programming error.
252pub fn parse(name: &str, query: &str, param_types: &[Oid], buf: &mut BytesMut) -> io::Result<()> {
253    buf.put_u8(b'P');
254
255    let len = 4  // length itself
256        + msg_len(name.len()) + 1  // statement name
257        + msg_len(query.len()) + 1  // query string
258        + 2  // parameter count
259        + (msg_len(param_types.len()) * 4); // parameter OIDs
260
261    buf.put_i32(len);
262    buf.put_slice(name.as_bytes());
263    buf.put_u8(0);
264    buf.put_slice(query.as_bytes());
265    buf.put_u8(0);
266    buf.put_i16(msg_count(param_types.len()));
267    for oid in param_types {
268        buf.put_u32(oid.value());
269    }
270
271    Ok(())
272}
273
274/// Writes a bind message.
275///
276/// Binds parameter values to a prepared statement, creating a portal.
277/// The server responds with `BindComplete`.
278///
279/// # Arguments
280///
281/// * `portal` - Portal name (null-terminated, empty string for unnamed)
282/// * `statement` - Statement name (from Parse message)
283/// * `param_formats` - Format codes for each parameter (0 = text, 1 = binary)
284/// * `params` - Parameter values (None for NULL)
285/// * `result_formats` - Format codes for result columns (0 = text, 1 = binary)
286/// * `buf` - Buffer to write the message to
287///
288/// # Errors
289///
290/// Currently infallible — always returns `Ok(())`. The `io::Result` return
291/// type is preserved for forward compatibility.
292///
293/// # Panics
294///
295/// - Panics via `msg_len` if any string length, parameter length, or the
296///   total encoded message length exceeds `i32::MAX` bytes.
297/// - Panics via `msg_count` if `param_formats.len()`, `params.len()`, or
298///   `result_formats.len()` exceeds `u16::MAX` (65 535).
299pub fn bind(
300    portal: &str,
301    statement: &str,
302    param_formats: &[i16],
303    params: &[Option<&[u8]>],
304    result_formats: &[i16],
305    buf: &mut BytesMut,
306) -> io::Result<()> {
307    buf.put_u8(b'B');
308
309    // Calculate length
310    let mut len = 4  // length itself
311        + msg_len(portal.len()) + 1  // portal name
312        + msg_len(statement.len()) + 1  // statement name
313        + 2  // parameter format count
314        + (msg_len(param_formats.len()) * 2)  // format codes
315        + 2; // parameter value count
316
317    for param in params {
318        len += 4; // length field
319        if let Some(data) = param {
320            len += msg_len(data.len());
321        }
322    }
323
324    len += 2; // result format count
325    len += (msg_len(result_formats.len())) * 2; // result format codes
326
327    buf.put_i32(len);
328    buf.put_slice(portal.as_bytes());
329    buf.put_u8(0);
330    buf.put_slice(statement.as_bytes());
331    buf.put_u8(0);
332
333    // Parameter formats
334    buf.put_i16(msg_count(param_formats.len()));
335    for &format in param_formats {
336        buf.put_i16(format);
337    }
338
339    // Parameter values
340    buf.put_i16(msg_count(params.len()));
341    for param in params {
342        match param {
343            Some(data) => {
344                buf.put_i32(msg_len(data.len()));
345                buf.put_slice(data);
346            }
347            None => {
348                buf.put_i32(-1); // NULL
349            }
350        }
351    }
352
353    // Result formats
354    buf.put_i16(msg_count(result_formats.len()));
355    for &format in result_formats {
356        buf.put_i16(format);
357    }
358
359    Ok(())
360}
361
362/// Writes a describe message.
363///
364/// Requests metadata about a prepared statement or portal.
365/// The server responds with `RowDescription` or `ParameterDescription`.
366///
367/// # Arguments
368///
369/// * `kind` - 'S' for statement, 'P' for portal
370/// * `name` - Statement or portal name (null-terminated, empty string for unnamed)
371/// * `buf` - Buffer to write the message to
372///
373/// # Errors
374///
375/// Currently infallible — always returns `Ok(())`. The `io::Result` return
376/// type is preserved for forward compatibility.
377///
378/// # Panics
379///
380/// Panics via `msg_len` if `name.len()` exceeds `i32::MAX` bytes.
381pub fn describe(kind: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
382    buf.put_u8(b'D');
383    buf.put_i32(4 + 1 + msg_len(name.len()) + 1);
384    buf.put_u8(kind); // 'S' for statement, 'P' for portal
385    buf.put_slice(name.as_bytes());
386    buf.put_u8(0);
387    Ok(())
388}
389
390/// Writes an execute message.
391///
392/// Executes a portal (bound statement). The server responds with `DataRow`
393/// messages and `CommandComplete`. Use 0 for `max_rows` to fetch all rows.
394///
395/// # Arguments
396///
397/// * `portal` - Portal name (null-terminated, empty string for unnamed)
398/// * `max_rows` - Maximum number of rows to return (0 = unlimited)
399/// * `buf` - Buffer to write the message to
400///
401/// # Errors
402///
403/// Currently infallible — always returns `Ok(())`. The `io::Result` return
404/// type is preserved for forward compatibility.
405///
406/// # Panics
407///
408/// Panics via `msg_len` if `portal.len()` exceeds `i32::MAX` bytes.
409pub fn execute(portal: &str, max_rows: i32, buf: &mut BytesMut) -> io::Result<()> {
410    buf.put_u8(b'E');
411    buf.put_i32(4 + msg_len(portal.len()) + 1 + 4);
412    buf.put_slice(portal.as_bytes());
413    buf.put_u8(0);
414    buf.put_i32(max_rows);
415    Ok(())
416}
417
418/// Writes a sync message.
419///
420/// Forces the server to process all pending messages and respond with `ReadyForQuery`.
421/// Should be sent after completing a query sequence (Parse/Bind/Execute).
422pub fn sync(buf: &mut BytesMut) {
423    buf.put_u8(b'S');
424    buf.put_i32(4);
425}
426
427/// Writes a flush message.
428///
429/// Requests that the server flush its output buffer.
430/// The server will send any pending messages but does not send a response.
431pub fn flush(buf: &mut BytesMut) {
432    buf.put_u8(b'H');
433    buf.put_i32(4);
434}
435
436/// Writes a close message.
437///
438/// Closes a prepared statement or portal. The server responds with `CloseComplete`.
439///
440/// # Arguments
441///
442/// * `kind` - 'S' for statement, 'P' for portal
443/// * `name` - Statement or portal name (null-terminated, empty string for unnamed)
444/// * `buf` - Buffer to write the message to
445///
446/// # Errors
447///
448/// Currently infallible — always returns `Ok(())`. The `io::Result` return
449/// type is preserved for forward compatibility.
450///
451/// # Panics
452///
453/// Panics via `msg_len` if `name.len()` exceeds `i32::MAX` bytes.
454pub fn close(kind: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
455    buf.put_u8(b'C');
456    buf.put_i32(4 + 1 + msg_len(name.len()) + 1);
457    buf.put_u8(kind); // 'S' for statement, 'P' for portal
458    buf.put_slice(name.as_bytes());
459    buf.put_u8(0);
460    Ok(())
461}
462
463/// Writes a terminate message.
464///
465/// Closes the connection gracefully. The server closes the connection
466/// after receiving this message. No response is sent.
467pub fn terminate(buf: &mut BytesMut) {
468    buf.put_u8(b'X');
469    buf.put_i32(4);
470}
471
472/// Writes a cancel request.
473///
474/// Cancels a running query. This is sent on a separate connection from
475/// the main connection. The server does not send a response.
476///
477/// # Arguments
478///
479/// * `process_id` - Backend process ID (from `BackendKeyData` message)
480/// * `secret_key` - Secret key (from `BackendKeyData` message)
481/// * `buf` - Buffer to write the message to
482pub fn cancel_request(process_id: i32, secret_key: i32, buf: &mut BytesMut) {
483    buf.put_i32(16); // Length
484    buf.put_i32(80877102); // Cancel request code
485    buf.put_i32(process_id);
486    buf.put_i32(secret_key);
487}
488
489/// Writes a copy data message.
490///
491/// Sends a chunk of COPY data to the server during COPY IN operation.
492/// The data format depends on the COPY format (text or binary).
493///
494/// # Arguments
495///
496/// * `data` - COPY data bytes
497/// * `buf` - Buffer to write the message to
498pub fn copy_data(data: &[u8], buf: &mut BytesMut) {
499    buf.put_u8(b'd');
500    buf.put_i32(4 + msg_len(data.len()));
501    buf.put_slice(data);
502}
503
504/// Writes a copy done message.
505///
506/// Indicates that all COPY data has been sent (end of COPY IN operation).
507/// The server responds with `CommandComplete`.
508pub fn copy_done(buf: &mut BytesMut) {
509    buf.put_u8(b'c');
510    buf.put_i32(4);
511}
512
513/// Writes a copy fail message.
514///
515/// Aborts a COPY IN operation with an error message.
516/// The server responds with `ErrorResponse`.
517///
518/// # Arguments
519///
520/// * `message` - Error message explaining why COPY failed
521/// * `buf` - Buffer to write the message to
522pub fn copy_fail(message: &str, buf: &mut BytesMut) {
523    buf.put_u8(b'f');
524    buf.put_i32(4 + msg_len(message.len()) + 1);
525    buf.put_slice(message.as_bytes());
526    buf.put_u8(0);
527}