hyperdb_api_core/protocol/message/frontend.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Frontend (client-to-server) messages.
5//!
6//! Each function in this module writes a single `PostgreSQL` wire protocol
7//! message into a [`BytesMut`] buffer. All structural fields (tags, lengths,
8//! counts) are written in **`BigEndian`** per the `PostgreSQL` specification.
9//!
10//! The functions are intentionally stateless -- they append bytes to a buffer
11//! and do not manage connection state. The connection layer in `hyper-client`
12//! is responsible for sequencing messages correctly (e.g. Parse/Bind/Execute
13//! followed by Sync).
14//!
15//! # Attribution
16//!
17//! Portions of this module were adapted from
18//! [`postgres-protocol`](https://github.com/sfackler/rust-postgres)'s
19//! `message/frontend.rs` (Copyright (c) 2016 Steven Fackler, MIT or
20//! Apache-2.0). Adapted material includes the PostgreSQL v3.0
21//! protocol-version literal (`196608`, `(3 << 16) | 0`), the startup-message
22//! framing layout, and the `msg_len` helper. Hyper-specific changes added
23//! on top include performance work and Hyper-specific message variants.
24//! See the `NOTICE` file at the repo root for the full upstream copyright
25//! and reproduced license text.
26
27use bytes::{BufMut, BytesMut};
28use std::io;
29
30use crate::types::Oid;
31
32/// Narrows a `usize` length to the `PostgreSQL` wire-protocol `i32` length prefix.
33///
34/// Panics on overflow. The `PostgreSQL` protocol caps individual messages at
35/// `i32::MAX` bytes (~2 GiB); any single string, query, or byte slice exceeding
36/// that is a programming error, not a runtime-recoverable condition. Per the
37/// M-PANIC-ON-BUG guideline, we panic rather than threading a `Result` through
38/// every wire-format helper.
39#[inline]
40fn msg_len(n: usize) -> i32 {
41 i32::try_from(n).expect("PostgreSQL wire message field exceeds i32::MAX bytes")
42}
43
44#[expect(
45 clippy::similar_names,
46 reason = "paired bindings (request/response, reader/writer, etc.) are more readable with symmetric names than artificially distinct ones"
47)]
48/// Narrows a `usize` count to the `PostgreSQL` wire-protocol 16-bit parameter /
49/// format count.
50///
51/// The wire field is `Int16` (signed on the wire, but values `0..=65_535` are
52/// legal — the high bit is a permitted value, not a sign). We go through `u16`
53/// first to trap genuine overflow, then bit-reinterpret to `i16` so
54/// [`bytes::BufMut::put_i16`] writes the expected `BigEndian` representation.
55///
56/// Panics if the count exceeds `65_535`. In practice Hyper/PostgreSQL cap this
57/// far lower (1600 parameters per query), so overflow is a programming error.
58#[inline]
59fn msg_count(n: usize) -> i16 {
60 let as_u16 = u16::try_from(n)
61 .expect("PostgreSQL wire message count exceeds u16::MAX (max 65_535 parameters)");
62 // Bit-pattern reinterpret: `put_i16` writes the same 16 bits regardless of
63 // signed/unsigned interpretation.
64 #[expect(
65 clippy::cast_possible_wrap,
66 reason = "intentional bit-pattern reinterpret; wire field is semantically 0..=65_535"
67 )]
68 let as_i16 = as_u16 as i16;
69 as_i16
70}
71
72/// Writes a startup message to the buffer.
73///
74/// The startup message establishes the connection and sets initial parameters.
75/// This is the first message sent by the client after establishing a TCP connection.
76///
77/// # Arguments
78///
79/// * `parameters` - Key-value pairs of connection parameters (e.g., "user", "database")
80/// * `buf` - Buffer to write the message to
81///
82/// # Errors
83///
84/// Currently infallible — always returns `Ok(())`. The `io::Result` return
85/// type is preserved for forward compatibility so that future validation
86/// (e.g. parameter-length checks) can surface errors without a breaking
87/// signature change.
88///
89/// # Panics
90///
91/// Panics via `msg_len` if the total encoded message length exceeds
92/// `i32::MAX` bytes (~2 GiB). PostgreSQL caps messages at that size, so
93/// exceeding it is a programming error.
94pub fn startup_message(parameters: &[(&str, &str)], buf: &mut BytesMut) -> io::Result<()> {
95 // Reserve space for length
96 let len_idx = buf.len();
97 buf.put_i32(0);
98
99 // Protocol version 3.0
100 buf.put_i32(196608); // (3 << 16) | 0
101
102 // Parameters
103 for (name, value) in parameters {
104 buf.put_slice(name.as_bytes());
105 buf.put_u8(0);
106 buf.put_slice(value.as_bytes());
107 buf.put_u8(0);
108 }
109
110 // Terminator
111 buf.put_u8(0);
112
113 // Update length
114 let len = msg_len(buf.len() - len_idx);
115 buf[len_idx..len_idx + 4].copy_from_slice(&len.to_be_bytes());
116
117 Ok(())
118}
119
120/// Writes a password message (for authentication).
121///
122/// Sent in response to `AuthenticationCleartextPassword` or after MD5 hashing.
123///
124/// # Arguments
125///
126/// * `password` - The password to send (plaintext or MD5 hash)
127/// * `buf` - Buffer to write the message to
128///
129/// # Errors
130///
131/// Currently infallible — always returns `Ok(())`. The `io::Result` return
132/// type is preserved for forward compatibility.
133///
134/// # Panics
135///
136/// Panics via `msg_len` if `password.len()` exceeds `i32::MAX` bytes.
137pub fn password_message(password: &str, buf: &mut BytesMut) -> io::Result<()> {
138 buf.put_u8(b'p');
139 buf.put_i32(4 + msg_len(password.len()) + 1);
140 buf.put_slice(password.as_bytes());
141 buf.put_u8(0);
142 Ok(())
143}
144
145/// Writes a SASL initial response message.
146///
147/// Used to initiate SASL authentication (e.g., SCRAM-SHA-256).
148/// Sent in response to `AuthenticationSasl` message.
149///
150/// # Arguments
151///
152/// * `mechanism` - The SASL mechanism name to use
153/// * `data` - Initial client response data (may be empty)
154/// * `buf` - Buffer to write the message to
155///
156/// # Errors
157///
158/// Currently infallible — always returns `Ok(())`. The `io::Result` return
159/// type is preserved for forward compatibility.
160///
161/// # Panics
162///
163/// Panics via `msg_len` if `mechanism.len()` or `data.len()` exceeds
164/// `i32::MAX` bytes.
165pub fn sasl_initial_response(mechanism: &str, data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
166 buf.put_u8(b'p');
167 let len = 4 + msg_len(mechanism.len()) + 1 + 4 + msg_len(data.len());
168 buf.put_i32(len);
169 buf.put_slice(mechanism.as_bytes());
170 buf.put_u8(0);
171 buf.put_i32(msg_len(data.len()));
172 buf.put_slice(data);
173 Ok(())
174}
175
176/// Writes a SASL response message.
177///
178/// Used to continue SASL authentication exchange.
179/// Sent in response to `AuthenticationSaslContinue` messages.
180///
181/// # Arguments
182///
183/// * `data` - Client response data for this authentication step
184/// * `buf` - Buffer to write the message to
185///
186/// # Errors
187///
188/// Currently infallible — always returns `Ok(())`. The `io::Result` return
189/// type is preserved for forward compatibility.
190///
191/// # Panics
192///
193/// Panics via `msg_len` if `data.len()` exceeds `i32::MAX` bytes.
194pub fn sasl_response(data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
195 buf.put_u8(b'p');
196 buf.put_i32(4 + msg_len(data.len()));
197 buf.put_slice(data);
198 Ok(())
199}
200
201/// Writes a simple query message.
202///
203/// Executes a SQL query directly without using prepared statements.
204/// The server will respond with `RowDescription`, `DataRow`, `CommandComplete`, etc.
205///
206/// # Arguments
207///
208/// * `query` - SQL query string (null-terminated)
209/// * `buf` - Buffer to write the message to
210///
211/// # Errors
212///
213/// Currently infallible — always returns `Ok(())`. The `io::Result` return
214/// type is preserved for forward compatibility.
215///
216/// # Panics
217///
218/// Panics via `msg_len` if `query.len()` exceeds `i32::MAX` bytes
219/// (PostgreSQL's per-message cap).
220pub fn query(query: &str, buf: &mut BytesMut) -> io::Result<()> {
221 buf.put_u8(b'Q');
222 buf.put_i32(4 + msg_len(query.len()) + 1);
223 buf.put_slice(query.as_bytes());
224 buf.put_u8(0);
225 Ok(())
226}
227
228/// Writes a parse message (prepare statement).
229///
230/// Prepares a SQL statement for later execution. The server responds with
231/// `ParseComplete` and may send `ParameterDescription`.
232///
233/// # Arguments
234///
235/// * `name` - Statement name (null-terminated, empty string for unnamed)
236/// * `query` - SQL query string with parameter placeholders ($1, $2, etc.)
237/// * `param_types` - OIDs of parameter types (empty if types should be inferred)
238/// * `buf` - Buffer to write the message to
239///
240/// # Errors
241///
242/// Currently infallible — always returns `Ok(())`. The `io::Result` return
243/// type is preserved for forward compatibility.
244///
245/// # Panics
246///
247/// - Panics via `msg_len` if `name.len()`, `query.len()`, or the encoded
248/// message length exceeds `i32::MAX` bytes.
249/// - Panics via `msg_count` if `param_types.len()` exceeds `u16::MAX`
250/// (65 535). PostgreSQL caps parameters per query at 1 600, so overflow
251/// is a programming error.
252pub fn parse(name: &str, query: &str, param_types: &[Oid], buf: &mut BytesMut) -> io::Result<()> {
253 buf.put_u8(b'P');
254
255 let len = 4 // length itself
256 + msg_len(name.len()) + 1 // statement name
257 + msg_len(query.len()) + 1 // query string
258 + 2 // parameter count
259 + (msg_len(param_types.len()) * 4); // parameter OIDs
260
261 buf.put_i32(len);
262 buf.put_slice(name.as_bytes());
263 buf.put_u8(0);
264 buf.put_slice(query.as_bytes());
265 buf.put_u8(0);
266 buf.put_i16(msg_count(param_types.len()));
267 for oid in param_types {
268 buf.put_u32(oid.value());
269 }
270
271 Ok(())
272}
273
274/// Writes a bind message.
275///
276/// Binds parameter values to a prepared statement, creating a portal.
277/// The server responds with `BindComplete`.
278///
279/// # Arguments
280///
281/// * `portal` - Portal name (null-terminated, empty string for unnamed)
282/// * `statement` - Statement name (from Parse message)
283/// * `param_formats` - Format codes for each parameter (0 = text, 1 = binary)
284/// * `params` - Parameter values (None for NULL)
285/// * `result_formats` - Format codes for result columns (0 = text, 1 = binary)
286/// * `buf` - Buffer to write the message to
287///
288/// # Errors
289///
290/// Currently infallible — always returns `Ok(())`. The `io::Result` return
291/// type is preserved for forward compatibility.
292///
293/// # Panics
294///
295/// - Panics via `msg_len` if any string length, parameter length, or the
296/// total encoded message length exceeds `i32::MAX` bytes.
297/// - Panics via `msg_count` if `param_formats.len()`, `params.len()`, or
298/// `result_formats.len()` exceeds `u16::MAX` (65 535).
299pub fn bind(
300 portal: &str,
301 statement: &str,
302 param_formats: &[i16],
303 params: &[Option<&[u8]>],
304 result_formats: &[i16],
305 buf: &mut BytesMut,
306) -> io::Result<()> {
307 buf.put_u8(b'B');
308
309 // Calculate length
310 let mut len = 4 // length itself
311 + msg_len(portal.len()) + 1 // portal name
312 + msg_len(statement.len()) + 1 // statement name
313 + 2 // parameter format count
314 + (msg_len(param_formats.len()) * 2) // format codes
315 + 2; // parameter value count
316
317 for param in params {
318 len += 4; // length field
319 if let Some(data) = param {
320 len += msg_len(data.len());
321 }
322 }
323
324 len += 2; // result format count
325 len += (msg_len(result_formats.len())) * 2; // result format codes
326
327 buf.put_i32(len);
328 buf.put_slice(portal.as_bytes());
329 buf.put_u8(0);
330 buf.put_slice(statement.as_bytes());
331 buf.put_u8(0);
332
333 // Parameter formats
334 buf.put_i16(msg_count(param_formats.len()));
335 for &format in param_formats {
336 buf.put_i16(format);
337 }
338
339 // Parameter values
340 buf.put_i16(msg_count(params.len()));
341 for param in params {
342 match param {
343 Some(data) => {
344 buf.put_i32(msg_len(data.len()));
345 buf.put_slice(data);
346 }
347 None => {
348 buf.put_i32(-1); // NULL
349 }
350 }
351 }
352
353 // Result formats
354 buf.put_i16(msg_count(result_formats.len()));
355 for &format in result_formats {
356 buf.put_i16(format);
357 }
358
359 Ok(())
360}
361
362/// Writes a describe message.
363///
364/// Requests metadata about a prepared statement or portal.
365/// The server responds with `RowDescription` or `ParameterDescription`.
366///
367/// # Arguments
368///
369/// * `kind` - 'S' for statement, 'P' for portal
370/// * `name` - Statement or portal name (null-terminated, empty string for unnamed)
371/// * `buf` - Buffer to write the message to
372///
373/// # Errors
374///
375/// Currently infallible — always returns `Ok(())`. The `io::Result` return
376/// type is preserved for forward compatibility.
377///
378/// # Panics
379///
380/// Panics via `msg_len` if `name.len()` exceeds `i32::MAX` bytes.
381pub fn describe(kind: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
382 buf.put_u8(b'D');
383 buf.put_i32(4 + 1 + msg_len(name.len()) + 1);
384 buf.put_u8(kind); // 'S' for statement, 'P' for portal
385 buf.put_slice(name.as_bytes());
386 buf.put_u8(0);
387 Ok(())
388}
389
390/// Writes an execute message.
391///
392/// Executes a portal (bound statement). The server responds with `DataRow`
393/// messages and `CommandComplete`. Use 0 for `max_rows` to fetch all rows.
394///
395/// # Arguments
396///
397/// * `portal` - Portal name (null-terminated, empty string for unnamed)
398/// * `max_rows` - Maximum number of rows to return (0 = unlimited)
399/// * `buf` - Buffer to write the message to
400///
401/// # Errors
402///
403/// Currently infallible — always returns `Ok(())`. The `io::Result` return
404/// type is preserved for forward compatibility.
405///
406/// # Panics
407///
408/// Panics via `msg_len` if `portal.len()` exceeds `i32::MAX` bytes.
409pub fn execute(portal: &str, max_rows: i32, buf: &mut BytesMut) -> io::Result<()> {
410 buf.put_u8(b'E');
411 buf.put_i32(4 + msg_len(portal.len()) + 1 + 4);
412 buf.put_slice(portal.as_bytes());
413 buf.put_u8(0);
414 buf.put_i32(max_rows);
415 Ok(())
416}
417
418/// Writes a sync message.
419///
420/// Forces the server to process all pending messages and respond with `ReadyForQuery`.
421/// Should be sent after completing a query sequence (Parse/Bind/Execute).
422pub fn sync(buf: &mut BytesMut) {
423 buf.put_u8(b'S');
424 buf.put_i32(4);
425}
426
427/// Writes a flush message.
428///
429/// Requests that the server flush its output buffer.
430/// The server will send any pending messages but does not send a response.
431pub fn flush(buf: &mut BytesMut) {
432 buf.put_u8(b'H');
433 buf.put_i32(4);
434}
435
436/// Writes a close message.
437///
438/// Closes a prepared statement or portal. The server responds with `CloseComplete`.
439///
440/// # Arguments
441///
442/// * `kind` - 'S' for statement, 'P' for portal
443/// * `name` - Statement or portal name (null-terminated, empty string for unnamed)
444/// * `buf` - Buffer to write the message to
445///
446/// # Errors
447///
448/// Currently infallible — always returns `Ok(())`. The `io::Result` return
449/// type is preserved for forward compatibility.
450///
451/// # Panics
452///
453/// Panics via `msg_len` if `name.len()` exceeds `i32::MAX` bytes.
454pub fn close(kind: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
455 buf.put_u8(b'C');
456 buf.put_i32(4 + 1 + msg_len(name.len()) + 1);
457 buf.put_u8(kind); // 'S' for statement, 'P' for portal
458 buf.put_slice(name.as_bytes());
459 buf.put_u8(0);
460 Ok(())
461}
462
463/// Writes a terminate message.
464///
465/// Closes the connection gracefully. The server closes the connection
466/// after receiving this message. No response is sent.
467pub fn terminate(buf: &mut BytesMut) {
468 buf.put_u8(b'X');
469 buf.put_i32(4);
470}
471
472/// Writes a cancel request.
473///
474/// Cancels a running query. This is sent on a separate connection from
475/// the main connection. The server does not send a response.
476///
477/// # Arguments
478///
479/// * `process_id` - Backend process ID (from `BackendKeyData` message)
480/// * `secret_key` - Secret key (from `BackendKeyData` message)
481/// * `buf` - Buffer to write the message to
482pub fn cancel_request(process_id: i32, secret_key: i32, buf: &mut BytesMut) {
483 buf.put_i32(16); // Length
484 buf.put_i32(80877102); // Cancel request code
485 buf.put_i32(process_id);
486 buf.put_i32(secret_key);
487}
488
489/// Writes a copy data message.
490///
491/// Sends a chunk of COPY data to the server during COPY IN operation.
492/// The data format depends on the COPY format (text or binary).
493///
494/// # Arguments
495///
496/// * `data` - COPY data bytes
497/// * `buf` - Buffer to write the message to
498pub fn copy_data(data: &[u8], buf: &mut BytesMut) {
499 buf.put_u8(b'd');
500 buf.put_i32(4 + msg_len(data.len()));
501 buf.put_slice(data);
502}
503
504/// Writes a copy done message.
505///
506/// Indicates that all COPY data has been sent (end of COPY IN operation).
507/// The server responds with `CommandComplete`.
508pub fn copy_done(buf: &mut BytesMut) {
509 buf.put_u8(b'c');
510 buf.put_i32(4);
511}
512
513/// Writes a copy fail message.
514///
515/// Aborts a COPY IN operation with an error message.
516/// The server responds with `ErrorResponse`.
517///
518/// # Arguments
519///
520/// * `message` - Error message explaining why COPY failed
521/// * `buf` - Buffer to write the message to
522pub fn copy_fail(message: &str, buf: &mut BytesMut) {
523 buf.put_u8(b'f');
524 buf.put_i32(4 + msg_len(message.len()) + 1);
525 buf.put_slice(message.as_bytes());
526 buf.put_u8(0);
527}