Skip to main content

chopin_pg/
codec.rs

1//! Zero-copy binary codec for PostgreSQL v3 wire protocol.
2//!
3//! All encoding writes directly into a caller-provided buffer.
4//! All decoding slices directly from the read buffer.
5
6use crate::protocol::*;
7
8/// Maximum size of a single PG message we'll accept (16 MB).
9pub const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
10
11// ─── Encoding (Frontend → Server) ─────────────────────────────
12
13/// Encode a StartupMessage into `buf`. Returns bytes written.
14///
15/// Format: Int32(len) Int32(196608=v3.0) { CString(param) CString(value) }* \0
16pub fn encode_startup(
17    buf: &mut [u8],
18    user: &str,
19    database: &str,
20    params: &[(&str, &str)],
21) -> usize {
22    let mut pos = 4; // reserve length prefix
23
24    // Protocol version 3.0
25    put_i32(buf, pos, 196608);
26    pos += 4;
27
28    // user
29    pos += put_cstring(buf, pos, "user");
30    pos += put_cstring(buf, pos, user);
31
32    // database
33    pos += put_cstring(buf, pos, "database");
34    pos += put_cstring(buf, pos, database);
35
36    // extra params
37    for (k, v) in params {
38        pos += put_cstring(buf, pos, k);
39        pos += put_cstring(buf, pos, v);
40    }
41
42    // terminator
43    buf[pos] = 0;
44    pos += 1;
45
46    // write length
47    put_i32(buf, 0, pos as i32);
48    pos
49}
50
51/// Encode a PasswordMessage (cleartext or md5 response).
52pub fn encode_password(buf: &mut [u8], password: &str) -> usize {
53    let mut pos = 0;
54    buf[pos] = b'p';
55    pos += 1;
56    let len = 4 + password.len() + 1; // length includes self + string + null
57    put_i32(buf, pos, len as i32);
58    pos += 4;
59    pos += put_cstring(buf, pos, password);
60    pos
61}
62
63/// Encode a SASLInitialResponse message.
64pub fn encode_sasl_initial(buf: &mut [u8], mechanism: &str, data: &[u8]) -> usize {
65    let mut pos = 0;
66    buf[pos] = b'p';
67    pos += 1;
68
69    // length placeholder
70    let len_pos = pos;
71    pos += 4;
72
73    // mechanism name (C string)
74    pos += put_cstring(buf, pos, mechanism);
75
76    // client-first-message length
77    put_i32(buf, pos, data.len() as i32);
78    pos += 4;
79
80    // client-first-message data
81    buf[pos..pos + data.len()].copy_from_slice(data);
82    pos += data.len();
83
84    // fill in length
85    put_i32(buf, len_pos, (pos - len_pos) as i32);
86    pos
87}
88
89/// Encode a SASLResponse message (client-final-message).
90pub fn encode_sasl_response(buf: &mut [u8], data: &[u8]) -> usize {
91    let mut pos = 0;
92    buf[pos] = b'p';
93    pos += 1;
94
95    let len = 4 + data.len();
96    put_i32(buf, pos, len as i32);
97    pos += 4;
98
99    buf[pos..pos + data.len()].copy_from_slice(data);
100    pos += data.len();
101    pos
102}
103
104/// Encode a Simple Query message ('Q').
105pub fn encode_query(buf: &mut [u8], sql: &str) -> usize {
106    let mut pos = 0;
107    buf[pos] = b'Q';
108    pos += 1;
109    let len = 4 + sql.len() + 1;
110    put_i32(buf, pos, len as i32);
111    pos += 4;
112    pos += put_cstring(buf, pos, sql);
113    pos
114}
115
116/// Encode a Parse message ('P') for the extended query protocol.
117pub fn encode_parse(buf: &mut [u8], stmt_name: &str, sql: &str, param_oids: &[i32]) -> usize {
118    let mut pos = 0;
119    buf[pos] = b'P';
120    pos += 1;
121
122    let len_pos = pos;
123    pos += 4;
124
125    pos += put_cstring(buf, pos, stmt_name);
126    pos += put_cstring(buf, pos, sql);
127
128    // number of parameter type OIDs
129    put_i16(buf, pos, param_oids.len() as i16);
130    pos += 2;
131    for &oid in param_oids {
132        put_i32(buf, pos, oid);
133        pos += 4;
134    }
135
136    put_i32(buf, len_pos, (pos - len_pos) as i32);
137    pos
138}
139
140/// Encode a Bind message ('B').
141pub fn encode_bind(
142    buf: &mut [u8],
143    portal: &str,
144    stmt_name: &str,
145    param_formats: &[i16],
146    param_values: &[Option<&[u8]>],
147    result_formats: &[i16],
148) -> usize {
149    let mut pos = 0;
150    buf[pos] = b'B';
151    pos += 1;
152
153    let len_pos = pos;
154    pos += 4;
155
156    pos += put_cstring(buf, pos, portal);
157    pos += put_cstring(buf, pos, stmt_name);
158
159    // parameter format codes
160    put_i16(buf, pos, param_formats.len() as i16);
161    pos += 2;
162    for &f in param_formats {
163        put_i16(buf, pos, f);
164        pos += 2;
165    }
166
167    // parameter values
168    put_i16(buf, pos, param_values.len() as i16);
169    pos += 2;
170    for val in param_values {
171        match val {
172            Some(data) => {
173                put_i32(buf, pos, data.len() as i32);
174                pos += 4;
175                buf[pos..pos + data.len()].copy_from_slice(data);
176                pos += data.len();
177            }
178            None => {
179                put_i32(buf, pos, -1); // NULL
180                pos += 4;
181            }
182        }
183    }
184
185    // result column format codes
186    put_i16(buf, pos, result_formats.len() as i16);
187    pos += 2;
188    for &f in result_formats {
189        put_i16(buf, pos, f);
190        pos += 2;
191    }
192
193    put_i32(buf, len_pos, (pos - len_pos) as i32);
194    pos
195}
196
197/// Encode a Describe message ('D').
198pub fn encode_describe(buf: &mut [u8], target: DescribeTarget, name: &str) -> usize {
199    let mut pos = 0;
200    buf[pos] = b'D';
201    pos += 1;
202
203    let len_pos = pos;
204    pos += 4;
205
206    buf[pos] = match target {
207        DescribeTarget::Statement => b'S',
208        DescribeTarget::Portal => b'P',
209    };
210    pos += 1;
211
212    pos += put_cstring(buf, pos, name);
213
214    put_i32(buf, len_pos, (pos - len_pos) as i32);
215    pos
216}
217
218/// Encode an Execute message ('E').
219pub fn encode_execute(buf: &mut [u8], portal: &str, max_rows: i32) -> usize {
220    let mut pos = 0;
221    buf[pos] = b'E';
222    pos += 1;
223
224    let len_pos = pos;
225    pos += 4;
226
227    pos += put_cstring(buf, pos, portal);
228    put_i32(buf, pos, max_rows);
229    pos += 4;
230
231    put_i32(buf, len_pos, (pos - len_pos) as i32);
232    pos
233}
234
235/// Encode a Sync message ('S').
236pub fn encode_sync(buf: &mut [u8]) -> usize {
237    buf[0] = b'S';
238    put_i32(buf, 1, 4);
239    5
240}
241
242/// Encode a Flush message ('H').
243pub fn encode_flush(buf: &mut [u8]) -> usize {
244    buf[0] = b'H';
245    put_i32(buf, 1, 4);
246    5
247}
248
249/// Encode a Terminate message ('X').
250pub fn encode_terminate(buf: &mut [u8]) -> usize {
251    buf[0] = b'X';
252    put_i32(buf, 1, 4);
253    5
254}
255
256/// Encode a Close message ('C').
257pub fn encode_close(buf: &mut [u8], target: CloseTarget, name: &str) -> usize {
258    let mut pos = 0;
259    buf[pos] = b'C';
260    pos += 1;
261
262    let len_pos = pos;
263    pos += 4;
264
265    buf[pos] = match target {
266        CloseTarget::Statement => b'S',
267        CloseTarget::Portal => b'P',
268    };
269    pos += 1;
270    pos += put_cstring(buf, pos, name);
271
272    put_i32(buf, len_pos, (pos - len_pos) as i32);
273    pos
274}
275
276/// Encode a CopyData message ('d').
277pub fn encode_copy_data(buf: &mut [u8], data: &[u8]) -> usize {
278    let mut pos = 0;
279    buf[pos] = b'd';
280    pos += 1;
281    let len = 4 + data.len();
282    put_i32(buf, pos, len as i32);
283    pos += 4;
284    buf[pos..pos + data.len()].copy_from_slice(data);
285    pos += data.len();
286    pos
287}
288
289/// Encode a CopyDone message ('c').
290pub fn encode_copy_done(buf: &mut [u8]) -> usize {
291    buf[0] = b'c';
292    put_i32(buf, 1, 4);
293    5
294}
295
296/// Encode a CopyFail message ('f').
297///
298/// Sent by the frontend to abort a COPY FROM STDIN operation.
299/// The `reason` string is included in the server's error response.
300pub fn encode_copy_fail(buf: &mut [u8], reason: &str) -> usize {
301    let mut pos = 0;
302    buf[pos] = b'f';
303    pos += 1;
304    let len = 4 + reason.len() + 1;
305    put_i32(buf, pos, len as i32);
306    pos += 4;
307    pos += put_cstring(buf, pos, reason);
308    pos
309}
310
311// ─── Decoding (Server → Frontend) ─────────────────────────────
312
313/// A decoded backend message header.
314#[derive(Debug, Clone, Copy)]
315pub struct MessageHeader {
316    pub tag: BackendTag,
317    pub length: u32, // length including 4-byte self but excluding tag byte
318}
319
320/// Try to read a complete message header from `buf`.
321/// Returns None if not enough data is available.
322pub fn decode_header(buf: &[u8]) -> Option<MessageHeader> {
323    if buf.len() < 5 {
324        return None;
325    }
326    let tag = BackendTag::from(buf[0]);
327    let length = read_u32(buf, 1);
328    Some(MessageHeader { tag, length })
329}
330
331/// Check if a complete message is available in `buf`.
332/// Returns `None` if not enough data, or if the message exceeds MAX_MESSAGE_SIZE.
333pub fn message_complete(buf: &[u8]) -> Option<usize> {
334    if buf.len() < 5 {
335        return None;
336    }
337    let length = read_u32(buf, 1) as usize;
338    // Reject messages that exceed our safety limit
339    if length > MAX_MESSAGE_SIZE {
340        return None;
341    }
342    let total = 1 + length; // tag + length-included body
343    if buf.len() >= total {
344        Some(total)
345    } else {
346        None
347    }
348}
349
350/// Read an i32 from a backend message body.
351pub fn read_i32(buf: &[u8], offset: usize) -> i32 {
352    i32::from_be_bytes([
353        buf[offset],
354        buf[offset + 1],
355        buf[offset + 2],
356        buf[offset + 3],
357    ])
358}
359
360/// Read a u32 from a backend message body.
361pub fn read_u32(buf: &[u8], offset: usize) -> u32 {
362    u32::from_be_bytes([
363        buf[offset],
364        buf[offset + 1],
365        buf[offset + 2],
366        buf[offset + 3],
367    ])
368}
369
370/// Read an i16 from a backend message body.
371pub fn read_i16(buf: &[u8], offset: usize) -> i16 {
372    i16::from_be_bytes([buf[offset], buf[offset + 1]])
373}
374
375/// Read a C-string from `buf[offset..]`. Returns the string slice and bytes consumed (including null).
376pub fn read_cstring(buf: &[u8], offset: usize) -> (&str, usize) {
377    let start = offset;
378    let mut end = start;
379    while end < buf.len() && buf[end] != 0 {
380        end += 1;
381    }
382    let s = std::str::from_utf8(&buf[start..end]).unwrap_or("");
383    (s, end - start + 1) // +1 for null terminator
384}
385
386/// Parse an ErrorResponse or NoticeResponse message body.
387/// Returns a list of (field_type, value) pairs.
388pub fn parse_error_fields(body: &[u8]) -> Vec<(u8, String)> {
389    let mut fields = Vec::new();
390    let mut pos = 0;
391    while pos < body.len() {
392        let field_type = body[pos];
393        pos += 1;
394        if field_type == 0 {
395            break;
396        }
397        let (value, consumed) = read_cstring(body, pos);
398        fields.push((field_type, value.to_string()));
399        pos += consumed;
400    }
401    fields
402}
403
404/// Parse a RowDescription message body.
405/// Returns column descriptors: (name, table_oid, col_attr, type_oid, type_size, type_modifier, format_code)
406pub fn parse_row_description(body: &[u8]) -> Vec<ColumnDesc> {
407    let num_fields = read_i16(body, 0) as usize;
408    let mut columns = Vec::with_capacity(num_fields);
409    let mut pos = 2;
410
411    for _ in 0..num_fields {
412        let (name, consumed) = read_cstring(body, pos);
413        pos += consumed;
414
415        let table_oid = read_i32(body, pos) as u32;
416        pos += 4;
417        let col_attr = read_i16(body, pos);
418        pos += 2;
419        let type_oid = read_i32(body, pos) as u32;
420        pos += 4;
421        let type_size = read_i16(body, pos);
422        pos += 2;
423        let type_modifier = read_i32(body, pos);
424        pos += 4;
425        let format_code = FormatCode::from(read_i16(body, pos));
426        pos += 2;
427
428        columns.push(ColumnDesc {
429            name: name.to_string(),
430            table_oid,
431            col_attr,
432            type_oid,
433            type_size,
434            type_modifier,
435            format_code,
436        });
437    }
438    columns
439}
440
441/// Parse a DataRow message body. Returns column byte slices.
442/// Each column is Option<&[u8]> where None = SQL NULL.
443pub fn parse_data_row(body: &[u8]) -> Vec<Option<&[u8]>> {
444    let num_columns = read_i16(body, 0) as usize;
445    let mut columns = Vec::with_capacity(num_columns);
446    let mut pos = 2;
447
448    for _ in 0..num_columns {
449        let len = read_i32(body, pos);
450        pos += 4;
451        if len < 0 {
452            columns.push(None); // NULL
453        } else {
454            let len = len as usize;
455            columns.push(Some(&body[pos..pos + len]));
456            pos += len;
457        }
458    }
459    columns
460}
461
462/// A column descriptor from RowDescription.
463#[derive(Debug, Clone)]
464pub struct ColumnDesc {
465    pub name: String,
466    pub table_oid: u32,
467    pub col_attr: i16,
468    pub type_oid: u32,
469    pub type_size: i16,
470    pub type_modifier: i32,
471    pub format_code: FormatCode,
472}
473
474// ─── Helper Functions ──────────────────────────────────────────
475
476fn put_i32(buf: &mut [u8], offset: usize, value: i32) {
477    let bytes = value.to_be_bytes();
478    buf[offset..offset + 4].copy_from_slice(&bytes);
479}
480
481fn put_i16(buf: &mut [u8], offset: usize, value: i16) {
482    let bytes = value.to_be_bytes();
483    buf[offset..offset + 2].copy_from_slice(&bytes);
484}
485
486fn put_cstring(buf: &mut [u8], offset: usize, s: &str) -> usize {
487    let bytes = s.as_bytes();
488    buf[offset..offset + bytes.len()].copy_from_slice(bytes);
489    buf[offset + bytes.len()] = 0;
490    bytes.len() + 1
491}
492
493#[cfg(test)]
494mod tests {
495    use super::*;
496    use crate::protocol::{BackendTag, CloseTarget, DescribeTarget, FormatCode};
497
498    #[test]
499    fn test_startup_encoding() {
500        let mut buf = [0u8; 256];
501        let n = encode_startup(&mut buf, "postgres", "mydb", &[]);
502        assert!(n > 0);
503        // Check protocol version at offset 4
504        assert_eq!(read_i32(&buf, 4), 196608);
505    }
506
507    #[test]
508    fn test_query_encoding() {
509        let mut buf = [0u8; 256];
510        let n = encode_query(&mut buf, "SELECT 1");
511        assert_eq!(buf[0], b'Q');
512        assert!(n > 5);
513    }
514
515    #[test]
516    fn test_sync_encoding() {
517        let mut buf = [0u8; 8];
518        let n = encode_sync(&mut buf);
519        assert_eq!(n, 5);
520        assert_eq!(buf[0], b'S');
521    }
522
523    #[test]
524    fn test_message_complete() {
525        // tag(1) + length(4) = 5 bytes minimum
526        let msg = [b'Z', 0, 0, 0, 5, b'I'];
527        assert_eq!(message_complete(&msg), Some(6));
528        assert_eq!(message_complete(&msg[..4]), None); // incomplete
529    }
530
531    #[test]
532    fn test_message_complete_rejects_oversized() {
533        // Length field = MAX_MESSAGE_SIZE + 1 → should return None
534        let huge_len = (MAX_MESSAGE_SIZE + 1) as u32;
535        let mut msg = [0u8; 6];
536        msg[0] = b'D';
537        msg[1..5].copy_from_slice(&huge_len.to_be_bytes());
538        assert_eq!(message_complete(&msg), None);
539    }
540
541    #[test]
542    fn test_copy_fail_encoding() {
543        let mut buf = [0u8; 256];
544        let n = encode_copy_fail(&mut buf, "abort test");
545        assert_eq!(buf[0], b'f');
546        // length = 4 + len("abort test") + 1 = 15
547        assert_eq!(read_i32(&buf, 1), 15);
548        assert_eq!(n, 1 + 15); // tag + length-included body
549    }
550
551    // ─── Extended Query Protocol Encoding ────────────────────────────────────
552
553    #[test]
554    fn test_parse_encoding_tag_and_name() {
555        let mut buf = [0u8; 256];
556        let n = encode_parse(&mut buf, "s0", "SELECT $1", &[23]); // 23 = INT4 OID
557        assert!(n > 0);
558        assert_eq!(buf[0], b'P');
559        // cstring "s0\0SELECT $1\0" + i16(1) + i32(23)
560        // length = 4 + 3 + 10 + 2 + 4 = 23
561        let length = read_i32(&buf, 1);
562        assert_eq!(length as usize, n - 1); // length field excludes the tag byte
563        assert!(n < 256, "shouldn't exceed buffer");
564    }
565
566    #[test]
567    fn test_parse_encoding_anonymous_no_oids() {
568        let mut buf = [0u8; 128];
569        let n = encode_parse(&mut buf, "", "SELECT 1", &[]);
570        assert_eq!(buf[0], b'P');
571        assert!(n > 5);
572    }
573
574    #[test]
575    fn test_bind_encoding_no_params() {
576        let mut buf = [0u8; 256];
577        let n = encode_bind(&mut buf, "", "s0", &[], &[], &[1]); // binary results
578        assert_eq!(buf[0], b'B');
579        let length = read_i32(&buf, 1) as usize;
580        assert_eq!(length, n - 1);
581        // result format count should be 1
582        // find it: after portal(""\0) + stmt("s0\0") + param_format_count(i16) + param_count(i16)
583        // = 1 + 3 + 2 + 2 = 8 bytes after tag+length
584        let result_format_count = read_i16(&buf, 1 + 4 + 1 + 3 + 2 + 2);
585        assert_eq!(result_format_count, 1);
586    }
587
588    #[test]
589    fn test_bind_encoding_null_param() {
590        let mut buf = [0u8; 256];
591        let n = encode_bind(&mut buf, "", "s0", &[0], &[None], &[]);
592        assert_eq!(buf[0], b'B');
593        assert!(n > 5);
594        // The NULL param should encode -1 as i32
595        // Layout: tag(1) + len(4) + portal(""\0=1) + stmt("s0\0"=3)
596        //       + param_fmt_cnt(2) + 1 fmt_code(2) + param_val_cnt(2) = 15
597        let null_marker = read_i32(&buf, 15);
598        assert_eq!(null_marker, -1, "NULL param must encode as -1");
599    }
600
601    #[test]
602    fn test_bind_encoding_with_text_param() {
603        let mut buf = [0u8; 256];
604        let value = b"hello";
605        let n = encode_bind(&mut buf, "", "s0", &[0], &[Some(value)], &[]);
606        assert_eq!(buf[0], b'B');
607        assert!(n > 5);
608    }
609
610    #[test]
611    fn test_execute_encoding() {
612        let mut buf = [0u8; 64];
613        let n = encode_execute(&mut buf, "", 0); // unlimited rows
614        assert_eq!(buf[0], b'E');
615        let length = read_i32(&buf, 1) as usize;
616        assert_eq!(length, n - 1);
617        // max_rows = 0 is at the end
618        let max_rows = read_i32(&buf, n - 4);
619        assert_eq!(max_rows, 0);
620    }
621
622    #[test]
623    fn test_execute_encoding_with_max_rows() {
624        let mut buf = [0u8; 64];
625        let n = encode_execute(&mut buf, "", 100);
626        assert_eq!(buf[0], b'E');
627        let max_rows = read_i32(&buf, n - 4);
628        assert_eq!(max_rows, 100);
629    }
630
631    #[test]
632    fn test_describe_statement_encoding() {
633        let mut buf = [0u8; 64];
634        let n = encode_describe(&mut buf, DescribeTarget::Statement, "s0");
635        assert_eq!(buf[0], b'D');
636        let length = read_i32(&buf, 1) as usize;
637        assert_eq!(length, n - 1);
638        // Target byte: 'S' for Statement
639        assert_eq!(buf[5], b'S');
640        // Statement name 's0\0' starts at offset 6
641        assert_eq!(&buf[6..9], b"s0\0");
642    }
643
644    #[test]
645    fn test_describe_portal_encoding() {
646        let mut buf = [0u8; 64];
647        let n = encode_describe(&mut buf, DescribeTarget::Portal, "myportal");
648        assert_eq!(buf[0], b'D');
649        assert_eq!(buf[5], b'P');
650        assert!(n > 5);
651    }
652
653    #[test]
654    fn test_close_statement_encoding() {
655        let mut buf = [0u8; 64];
656        let n = encode_close(&mut buf, CloseTarget::Statement, "s7");
657        assert_eq!(buf[0], b'C');
658        let length = read_i32(&buf, 1) as usize;
659        assert_eq!(length, n - 1);
660        assert_eq!(buf[5], b'S');
661        assert_eq!(&buf[6..9], b"s7\0");
662    }
663
664    #[test]
665    fn test_close_portal_encoding() {
666        let mut buf = [0u8; 64];
667        let n = encode_close(&mut buf, CloseTarget::Portal, "");
668        assert_eq!(buf[0], b'C');
669        assert_eq!(buf[5], b'P');
670        assert!(n > 0);
671    }
672
673    #[test]
674    fn test_terminate_encoding() {
675        let mut buf = [0u8; 8];
676        let n = encode_terminate(&mut buf);
677        assert_eq!(n, 5);
678        assert_eq!(buf[0], b'X');
679        assert_eq!(read_i32(&buf, 1), 4);
680    }
681
682    #[test]
683    fn test_flush_encoding() {
684        let mut buf = [0u8; 8];
685        let n = encode_flush(&mut buf);
686        assert_eq!(n, 5);
687        assert_eq!(buf[0], b'H');
688        assert_eq!(read_i32(&buf, 1), 4);
689    }
690
691    #[test]
692    fn test_copy_data_encoding() {
693        let mut buf = [0u8; 64];
694        let data = b"col1\tcol2\n";
695        let n = encode_copy_data(&mut buf, data);
696        assert_eq!(buf[0], b'd');
697        let length = read_i32(&buf, 1) as usize;
698        assert_eq!(length, 4 + data.len());
699        assert_eq!(n, 1 + length);
700        assert_eq!(&buf[5..5 + data.len()], data);
701    }
702
703    #[test]
704    fn test_copy_done_encoding() {
705        let mut buf = [0u8; 8];
706        let n = encode_copy_done(&mut buf);
707        assert_eq!(n, 5);
708        assert_eq!(buf[0], b'c');
709        assert_eq!(read_i32(&buf, 1), 4);
710    }
711
712    // ─── Decoding ─────────────────────────────────────────────────────────────
713
714    #[test]
715    fn test_decode_header_basic() {
716        let msg = [b'Z', 0, 0, 0, 5, b'I']; // ReadyForQuery
717        let hdr = decode_header(&msg).unwrap();
718        assert_eq!(hdr.tag, BackendTag::ReadyForQuery);
719        assert_eq!(hdr.length, 5);
720    }
721
722    #[test]
723    fn test_decode_header_too_short() {
724        let msg = [b'Z', 0, 0]; // only 3 bytes
725        assert!(decode_header(&msg).is_none());
726    }
727
728    #[test]
729    fn test_message_complete_exact_size() {
730        // tag(1) + len(4) = 5-byte header, body = 1 byte → total = 6
731        let msg = [b'Z', 0, 0, 0, 5, b'I'];
732        assert_eq!(message_complete(&msg), Some(6));
733    }
734
735    #[test]
736    fn test_message_complete_one_byte_short() {
737        let msg = [b'Z', 0, 0, 0, 5]; // header says 5 bytes body, but we only have 4 (no body)
738        assert_eq!(message_complete(&msg), None);
739    }
740
741    #[test]
742    fn test_message_complete_needs_exactly_5_bytes() {
743        // 4 bytes → None
744        assert_eq!(message_complete(&[b'Z', 0, 0, 0]), None);
745        // 5 bytes with length=4 (empty body) → Some(5)
746        let msg = [b'C', 0, 0, 0, 4]; // CommandComplete with no text
747        assert_eq!(message_complete(&msg), Some(5));
748    }
749
750    #[test]
751    fn test_message_complete_large_but_valid_payload() {
752        // Build a 10-byte payload message
753        let payload = [0u8; 10];
754        let mut msg = vec![b'D', 0, 0, 0, 14]; // length = 4 + 10 = 14
755        msg.extend_from_slice(&payload);
756        assert_eq!(message_complete(&msg), Some(15)); // 1 + 14
757    }
758
759    #[test]
760    fn test_parse_data_row_all_non_null() {
761        // DataRow with 2 columns: "hello" and "42"
762        // Format: i16(num_cols) | i32(len1) bytes1 | i32(len2) bytes2
763        let mut body = vec![];
764        body.extend_from_slice(&2i16.to_be_bytes()); // 2 columns
765        body.extend_from_slice(&5i32.to_be_bytes()); // col0 len = 5
766        body.extend_from_slice(b"hello");
767        body.extend_from_slice(&2i32.to_be_bytes()); // col1 len = 2
768        body.extend_from_slice(b"42");
769        let cols = parse_data_row(&body);
770        assert_eq!(cols.len(), 2);
771        assert_eq!(cols[0], Some(b"hello" as &[u8]));
772        assert_eq!(cols[1], Some(b"42" as &[u8]));
773    }
774
775    #[test]
776    fn test_parse_data_row_with_null() {
777        // DataRow with 2 columns: NULL and "value"
778        let mut body = vec![];
779        body.extend_from_slice(&2i16.to_be_bytes());
780        body.extend_from_slice(&(-1i32).to_be_bytes()); // NULL
781        body.extend_from_slice(&5i32.to_be_bytes());
782        body.extend_from_slice(b"value");
783        let cols = parse_data_row(&body);
784        assert_eq!(cols.len(), 2);
785        assert_eq!(cols[0], None);
786        assert_eq!(cols[1], Some(b"value" as &[u8]));
787    }
788
789    #[test]
790    fn test_parse_data_row_empty_row() {
791        let mut body = vec![];
792        body.extend_from_slice(&0i16.to_be_bytes()); // 0 columns
793        let cols = parse_data_row(&body);
794        assert_eq!(cols.len(), 0);
795    }
796
797    #[test]
798    fn test_parse_row_description_single_column() {
799        // Build a RowDescription body for 1 column "id" INT4 (OID=23, size=4)
800        let mut body = vec![];
801        body.extend_from_slice(&1i16.to_be_bytes()); // num_fields = 1
802        body.extend_from_slice(b"id\0"); // name + null terminator
803        body.extend_from_slice(&0i32.to_be_bytes()); // table_oid = 0
804        body.extend_from_slice(&0i16.to_be_bytes()); // col_attr = 0
805        body.extend_from_slice(&23i32.to_be_bytes()); // type_oid = INT4
806        body.extend_from_slice(&4i16.to_be_bytes()); // type_size = 4
807        body.extend_from_slice(&(-1i32).to_be_bytes()); // type_modifier = -1
808        body.extend_from_slice(&0i16.to_be_bytes()); // format_code = text
809        let cols = parse_row_description(&body);
810        assert_eq!(cols.len(), 1);
811        assert_eq!(cols[0].name, "id");
812        assert_eq!(cols[0].type_oid, 23);
813        assert_eq!(cols[0].type_size, 4);
814        assert!(matches!(cols[0].format_code, FormatCode::Text));
815    }
816
817    #[test]
818    fn test_parse_row_description_binary_format() {
819        let mut body = vec![];
820        body.extend_from_slice(&1i16.to_be_bytes());
821        body.extend_from_slice(b"score\0");
822        body.extend_from_slice(&0i32.to_be_bytes());
823        body.extend_from_slice(&0i16.to_be_bytes());
824        body.extend_from_slice(&701i32.to_be_bytes()); // FLOAT8 OID
825        body.extend_from_slice(&8i16.to_be_bytes());
826        body.extend_from_slice(&(-1i32).to_be_bytes());
827        body.extend_from_slice(&1i16.to_be_bytes()); // format = binary
828        let cols = parse_row_description(&body);
829        assert_eq!(cols.len(), 1);
830        assert_eq!(cols[0].name, "score");
831        assert!(matches!(cols[0].format_code, FormatCode::Binary));
832    }
833
834    #[test]
835    fn test_parse_error_fields_basic() {
836        // Severity='S', Code='C', Message='M', terminator='\0'
837        let mut body = vec![];
838        body.push(b'S');
839        body.extend_from_slice(b"ERROR\0");
840        body.push(b'C');
841        body.extend_from_slice(b"42601\0");
842        body.push(b'M');
843        body.extend_from_slice(b"syntax error\0");
844        body.push(0); // terminator
845        let fields = parse_error_fields(&body);
846        assert_eq!(fields.len(), 3);
847        assert_eq!(fields[0], (b'S', "ERROR".to_string()));
848        assert_eq!(fields[1], (b'C', "42601".to_string()));
849        assert_eq!(fields[2], (b'M', "syntax error".to_string()));
850    }
851
852    #[test]
853    fn test_parse_error_fields_empty() {
854        let body = [0u8]; // just the terminator
855        let fields = parse_error_fields(&body);
856        assert!(fields.is_empty());
857    }
858
859    // ─── Helper read functions ────────────────────────────────────────────────
860
861    #[test]
862    fn test_read_i32_big_endian() {
863        let buf = [0x00, 0x01, 0x86, 0xA0u8]; // 100000
864        assert_eq!(read_i32(&buf, 0), 100_000);
865    }
866
867    #[test]
868    fn test_read_i32_negative() {
869        let buf = (-1i32).to_be_bytes();
870        assert_eq!(read_i32(&buf, 0), -1);
871    }
872
873    #[test]
874    fn test_read_i16() {
875        let buf = [0x01, 0x00u8]; // 256
876        assert_eq!(read_i16(&buf, 0), 256);
877    }
878
879    #[test]
880    fn test_read_u32() {
881        let buf = 0xFF_FF_FF_FFu32.to_be_bytes();
882        assert_eq!(read_u32(&buf, 0), 0xFF_FF_FF_FF);
883    }
884
885    #[test]
886    fn test_read_cstring_normal() {
887        let buf = b"hello\0world";
888        let (s, consumed) = read_cstring(buf, 0);
889        assert_eq!(s, "hello");
890        assert_eq!(consumed, 6); // 5 chars + null
891    }
892
893    #[test]
894    fn test_read_cstring_empty() {
895        let buf = b"\0rest";
896        let (s, consumed) = read_cstring(buf, 0);
897        assert_eq!(s, "");
898        assert_eq!(consumed, 1);
899    }
900
901    #[test]
902    fn test_read_cstring_with_offset() {
903        let buf = b"skip\0name\0";
904        let (s, consumed) = read_cstring(buf, 5);
905        assert_eq!(s, "name");
906        assert_eq!(consumed, 5); // 4 chars + null
907    }
908}