Skip to main content

chopin_pg/
codec.rs

1//! Zero-copy binary codec for PostgreSQL v3 wire protocol.
2//!
3//! All encoding writes directly into a caller-provided buffer.
4//! All decoding slices directly from the read buffer.
5
6use crate::error::PgError;
7use crate::protocol::*;
8
9/// Maximum size of a single PG message we'll accept (16 MB).
10pub const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
11
12// ─── Encoding (Frontend → Server) ─────────────────────────────
13
14/// Encode a StartupMessage into `buf`. Returns bytes written.
15///
16/// Format: Int32(len) Int32(196608=v3.0) { CString(param) CString(value) }* \0
17pub fn encode_startup(
18    buf: &mut [u8],
19    user: &str,
20    database: &str,
21    params: &[(&str, &str)],
22) -> usize {
23    let mut pos = 4; // reserve length prefix
24
25    // Protocol version 3.0
26    put_i32(buf, pos, 196608);
27    pos += 4;
28
29    // user
30    pos += put_cstring(buf, pos, "user");
31    pos += put_cstring(buf, pos, user);
32
33    // database
34    pos += put_cstring(buf, pos, "database");
35    pos += put_cstring(buf, pos, database);
36
37    // extra params
38    for (k, v) in params {
39        pos += put_cstring(buf, pos, k);
40        pos += put_cstring(buf, pos, v);
41    }
42
43    // terminator
44    buf[pos] = 0;
45    pos += 1;
46
47    // write length
48    put_i32(buf, 0, pos as i32);
49    pos
50}
51
52/// Encode a PasswordMessage (cleartext or md5 response).
53pub fn encode_password(buf: &mut [u8], password: &str) -> usize {
54    let mut pos = 0;
55    buf[pos] = b'p';
56    pos += 1;
57    let len = 4 + password.len() + 1; // length includes self + string + null
58    put_i32(buf, pos, len as i32);
59    pos += 4;
60    pos += put_cstring(buf, pos, password);
61    pos
62}
63
64/// Encode a SASLInitialResponse message.
65pub fn encode_sasl_initial(buf: &mut [u8], mechanism: &str, data: &[u8]) -> usize {
66    let mut pos = 0;
67    buf[pos] = b'p';
68    pos += 1;
69
70    // length placeholder
71    let len_pos = pos;
72    pos += 4;
73
74    // mechanism name (C string)
75    pos += put_cstring(buf, pos, mechanism);
76
77    // client-first-message length
78    put_i32(buf, pos, data.len() as i32);
79    pos += 4;
80
81    // client-first-message data
82    buf[pos..pos + data.len()].copy_from_slice(data);
83    pos += data.len();
84
85    // fill in length
86    put_i32(buf, len_pos, (pos - len_pos) as i32);
87    pos
88}
89
90/// Encode a SASLResponse message (client-final-message).
91pub fn encode_sasl_response(buf: &mut [u8], data: &[u8]) -> usize {
92    let mut pos = 0;
93    buf[pos] = b'p';
94    pos += 1;
95
96    let len = 4 + data.len();
97    put_i32(buf, pos, len as i32);
98    pos += 4;
99
100    buf[pos..pos + data.len()].copy_from_slice(data);
101    pos += data.len();
102    pos
103}
104
105/// Encode a Simple Query message ('Q').
106pub fn encode_query(buf: &mut [u8], sql: &str) -> usize {
107    let mut pos = 0;
108    buf[pos] = b'Q';
109    pos += 1;
110    let len = 4 + sql.len() + 1;
111    put_i32(buf, pos, len as i32);
112    pos += 4;
113    pos += put_cstring(buf, pos, sql);
114    pos
115}
116
117/// Encode a Parse message ('P') for the extended query protocol.
118pub fn encode_parse(buf: &mut [u8], stmt_name: &str, sql: &str, param_oids: &[i32]) -> usize {
119    let mut pos = 0;
120    buf[pos] = b'P';
121    pos += 1;
122
123    let len_pos = pos;
124    pos += 4;
125
126    pos += put_cstring(buf, pos, stmt_name);
127    pos += put_cstring(buf, pos, sql);
128
129    // number of parameter type OIDs
130    put_i16(buf, pos, param_oids.len() as i16);
131    pos += 2;
132    for &oid in param_oids {
133        put_i32(buf, pos, oid);
134        pos += 4;
135    }
136
137    put_i32(buf, len_pos, (pos - len_pos) as i32);
138    pos
139}
140
141/// Encode a Bind message ('B').
142pub fn encode_bind(
143    buf: &mut [u8],
144    portal: &str,
145    stmt_name: &str,
146    param_formats: &[i16],
147    param_values: &[Option<&[u8]>],
148    result_formats: &[i16],
149) -> usize {
150    let mut pos = 0;
151    buf[pos] = b'B';
152    pos += 1;
153
154    let len_pos = pos;
155    pos += 4;
156
157    pos += put_cstring(buf, pos, portal);
158    pos += put_cstring(buf, pos, stmt_name);
159
160    // parameter format codes
161    put_i16(buf, pos, param_formats.len() as i16);
162    pos += 2;
163    for &f in param_formats {
164        put_i16(buf, pos, f);
165        pos += 2;
166    }
167
168    // parameter values
169    put_i16(buf, pos, param_values.len() as i16);
170    pos += 2;
171    for val in param_values {
172        match val {
173            Some(data) => {
174                put_i32(buf, pos, data.len() as i32);
175                pos += 4;
176                buf[pos..pos + data.len()].copy_from_slice(data);
177                pos += data.len();
178            }
179            None => {
180                put_i32(buf, pos, -1); // NULL
181                pos += 4;
182            }
183        }
184    }
185
186    // result column format codes
187    put_i16(buf, pos, result_formats.len() as i16);
188    pos += 2;
189    for &f in result_formats {
190        put_i16(buf, pos, f);
191        pos += 2;
192    }
193
194    put_i32(buf, len_pos, (pos - len_pos) as i32);
195    pos
196}
197
198/// Encode a Describe message ('D').
199pub fn encode_describe(buf: &mut [u8], target: DescribeTarget, name: &str) -> usize {
200    let mut pos = 0;
201    buf[pos] = b'D';
202    pos += 1;
203
204    let len_pos = pos;
205    pos += 4;
206
207    buf[pos] = match target {
208        DescribeTarget::Statement => b'S',
209        DescribeTarget::Portal => b'P',
210    };
211    pos += 1;
212
213    pos += put_cstring(buf, pos, name);
214
215    put_i32(buf, len_pos, (pos - len_pos) as i32);
216    pos
217}
218
219/// Encode an Execute message ('E').
220pub fn encode_execute(buf: &mut [u8], portal: &str, max_rows: i32) -> usize {
221    let mut pos = 0;
222    buf[pos] = b'E';
223    pos += 1;
224
225    let len_pos = pos;
226    pos += 4;
227
228    pos += put_cstring(buf, pos, portal);
229    put_i32(buf, pos, max_rows);
230    pos += 4;
231
232    put_i32(buf, len_pos, (pos - len_pos) as i32);
233    pos
234}
235
236/// Encode a Sync message ('S').
237pub fn encode_sync(buf: &mut [u8]) -> usize {
238    buf[0] = b'S';
239    put_i32(buf, 1, 4);
240    5
241}
242
243/// Encode a Flush message ('H').
244pub fn encode_flush(buf: &mut [u8]) -> usize {
245    buf[0] = b'H';
246    put_i32(buf, 1, 4);
247    5
248}
249
250/// Encode a Terminate message ('X').
251pub fn encode_terminate(buf: &mut [u8]) -> usize {
252    buf[0] = b'X';
253    put_i32(buf, 1, 4);
254    5
255}
256
257/// Encode a Close message ('C').
258pub fn encode_close(buf: &mut [u8], target: CloseTarget, name: &str) -> usize {
259    let mut pos = 0;
260    buf[pos] = b'C';
261    pos += 1;
262
263    let len_pos = pos;
264    pos += 4;
265
266    buf[pos] = match target {
267        CloseTarget::Statement => b'S',
268        CloseTarget::Portal => b'P',
269    };
270    pos += 1;
271    pos += put_cstring(buf, pos, name);
272
273    put_i32(buf, len_pos, (pos - len_pos) as i32);
274    pos
275}
276
277/// Encode a CopyData message ('d').
278pub fn encode_copy_data(buf: &mut [u8], data: &[u8]) -> usize {
279    let mut pos = 0;
280    buf[pos] = b'd';
281    pos += 1;
282    let len = 4 + data.len();
283    put_i32(buf, pos, len as i32);
284    pos += 4;
285    buf[pos..pos + data.len()].copy_from_slice(data);
286    pos += data.len();
287    pos
288}
289
290/// Encode a CopyDone message ('c').
291pub fn encode_copy_done(buf: &mut [u8]) -> usize {
292    buf[0] = b'c';
293    put_i32(buf, 1, 4);
294    5
295}
296
297/// Encode a CopyFail message ('f').
298///
299/// Sent by the frontend to abort a COPY FROM STDIN operation.
300/// The `reason` string is included in the server's error response.
301pub fn encode_copy_fail(buf: &mut [u8], reason: &str) -> usize {
302    let mut pos = 0;
303    buf[pos] = b'f';
304    pos += 1;
305    let len = 4 + reason.len() + 1;
306    put_i32(buf, pos, len as i32);
307    pos += 4;
308    pos += put_cstring(buf, pos, reason);
309    pos
310}
311
312// ─── Decoding (Server → Frontend) ─────────────────────────────
313
314/// A decoded backend message header.
315#[derive(Debug, Clone, Copy)]
316pub struct MessageHeader {
317    pub tag: BackendTag,
318    pub length: u32, // length including 4-byte self but excluding tag byte
319}
320
321/// Try to read a complete message header from `buf`.
322/// Returns None if not enough data is available.
323pub fn decode_header(buf: &[u8]) -> Option<MessageHeader> {
324    if buf.len() < 5 {
325        return None;
326    }
327    let tag = BackendTag::from(buf[0]);
328    let length = read_u32(buf, 1);
329    Some(MessageHeader { tag, length })
330}
331
332/// Check if a complete message is available in `buf`.
333///
334/// Returns:
335/// - `Ok(Some(n))` — a complete message of `n` bytes is ready.
336/// - `Ok(None)` — not enough data yet; caller should read more.
337/// - `Err(PgError::BufferOverflow)` — the message length field exceeds
338///   `MAX_MESSAGE_SIZE`; the connection must be closed.
339pub fn message_complete(buf: &[u8]) -> Result<Option<usize>, PgError> {
340    if buf.len() < 5 {
341        return Ok(None);
342    }
343    let length = read_u32(buf, 1) as usize;
344    if length > MAX_MESSAGE_SIZE {
345        return Err(PgError::BufferOverflow);
346    }
347    let total = 1 + length; // tag + length-included body
348    if buf.len() >= total {
349        Ok(Some(total))
350    } else {
351        Ok(None)
352    }
353}
354
355/// Read an i32 from a backend message body.
356pub fn read_i32(buf: &[u8], offset: usize) -> i32 {
357    i32::from_be_bytes([
358        buf[offset],
359        buf[offset + 1],
360        buf[offset + 2],
361        buf[offset + 3],
362    ])
363}
364
365/// Read a u32 from a backend message body.
366pub fn read_u32(buf: &[u8], offset: usize) -> u32 {
367    u32::from_be_bytes([
368        buf[offset],
369        buf[offset + 1],
370        buf[offset + 2],
371        buf[offset + 3],
372    ])
373}
374
375/// Read an i16 from a backend message body.
376pub fn read_i16(buf: &[u8], offset: usize) -> i16 {
377    i16::from_be_bytes([buf[offset], buf[offset + 1]])
378}
379
380/// Read a C-string from `buf[offset..]`. Returns the string slice and bytes consumed (including null).
381pub fn read_cstring(buf: &[u8], offset: usize) -> (&str, usize) {
382    let start = offset;
383    let mut end = start;
384    while end < buf.len() && buf[end] != 0 {
385        end += 1;
386    }
387    let s = std::str::from_utf8(&buf[start..end]).unwrap_or("");
388    (s, end - start + 1) // +1 for null terminator
389}
390
391/// Parse an ErrorResponse or NoticeResponse message body.
392/// Returns a list of (field_type, value) pairs.
393pub fn parse_error_fields(body: &[u8]) -> Vec<(u8, String)> {
394    let mut fields = Vec::new();
395    let mut pos = 0;
396    while pos < body.len() {
397        let field_type = body[pos];
398        pos += 1;
399        if field_type == 0 {
400            break;
401        }
402        let (value, consumed) = read_cstring(body, pos);
403        fields.push((field_type, value.to_string()));
404        pos += consumed;
405    }
406    fields
407}
408
409/// Parse a RowDescription message body.
410/// Returns column descriptors: (name, table_oid, col_attr, type_oid, type_size, type_modifier, format_code)
411pub fn parse_row_description(body: &[u8]) -> Vec<ColumnDesc> {
412    let num_fields = read_i16(body, 0) as usize;
413    let mut columns = Vec::with_capacity(num_fields);
414    let mut pos = 2;
415
416    for _ in 0..num_fields {
417        let (name, consumed) = read_cstring(body, pos);
418        pos += consumed;
419
420        let table_oid = read_i32(body, pos) as u32;
421        pos += 4;
422        let col_attr = read_i16(body, pos);
423        pos += 2;
424        let type_oid = read_i32(body, pos) as u32;
425        pos += 4;
426        let type_size = read_i16(body, pos);
427        pos += 2;
428        let type_modifier = read_i32(body, pos);
429        pos += 4;
430        let format_code = FormatCode::from(read_i16(body, pos));
431        pos += 2;
432
433        columns.push(ColumnDesc {
434            name: name.to_string(),
435            table_oid,
436            col_attr,
437            type_oid,
438            type_size,
439            type_modifier,
440            format_code,
441        });
442    }
443    columns
444}
445
446/// Parse a DataRow message body. Returns column byte slices.
447/// Each column is Option<&[u8]> where None = SQL NULL.
448pub fn parse_data_row(body: &[u8]) -> Vec<Option<&[u8]>> {
449    let num_columns = read_i16(body, 0) as usize;
450    let mut columns = Vec::with_capacity(num_columns);
451    let mut pos = 2;
452
453    for _ in 0..num_columns {
454        let len = read_i32(body, pos);
455        pos += 4;
456        if len < 0 {
457            columns.push(None); // NULL
458        } else {
459            let len = len as usize;
460            columns.push(Some(&body[pos..pos + len]));
461            pos += len;
462        }
463    }
464    columns
465}
466
467/// A column descriptor from RowDescription.
468#[derive(Debug, Clone)]
469pub struct ColumnDesc {
470    pub name: String,
471    pub table_oid: u32,
472    pub col_attr: i16,
473    pub type_oid: u32,
474    pub type_size: i16,
475    pub type_modifier: i32,
476    pub format_code: FormatCode,
477}
478
479// ─── Helper Functions ──────────────────────────────────────────
480
481fn put_i32(buf: &mut [u8], offset: usize, value: i32) {
482    let bytes = value.to_be_bytes();
483    buf[offset..offset + 4].copy_from_slice(&bytes);
484}
485
486fn put_i16(buf: &mut [u8], offset: usize, value: i16) {
487    let bytes = value.to_be_bytes();
488    buf[offset..offset + 2].copy_from_slice(&bytes);
489}
490
491fn put_cstring(buf: &mut [u8], offset: usize, s: &str) -> usize {
492    let bytes = s.as_bytes();
493    buf[offset..offset + bytes.len()].copy_from_slice(bytes);
494    buf[offset + bytes.len()] = 0;
495    bytes.len() + 1
496}
497
498#[cfg(test)]
499mod tests {
500    use super::*;
501    use crate::error::PgError;
502    use crate::protocol::{BackendTag, CloseTarget, DescribeTarget, FormatCode};
503
504    #[test]
505    fn test_startup_encoding() {
506        let mut buf = [0u8; 256];
507        let n = encode_startup(&mut buf, "postgres", "mydb", &[]);
508        assert!(n > 0);
509        // Check protocol version at offset 4
510        assert_eq!(read_i32(&buf, 4), 196608);
511    }
512
513    #[test]
514    fn test_query_encoding() {
515        let mut buf = [0u8; 256];
516        let n = encode_query(&mut buf, "SELECT 1");
517        assert_eq!(buf[0], b'Q');
518        assert!(n > 5);
519    }
520
521    #[test]
522    fn test_sync_encoding() {
523        let mut buf = [0u8; 8];
524        let n = encode_sync(&mut buf);
525        assert_eq!(n, 5);
526        assert_eq!(buf[0], b'S');
527    }
528
529    #[test]
530    fn test_message_complete() {
531        // tag(1) + length(4) = 5 bytes minimum
532        let msg = [b'Z', 0, 0, 0, 5, b'I'];
533        assert_eq!(message_complete(&msg).unwrap(), Some(6));
534        assert_eq!(message_complete(&msg[..4]).unwrap(), None); // incomplete
535    }
536
537    #[test]
538    fn test_message_complete_rejects_oversized() {
539        // Length field = MAX_MESSAGE_SIZE + 1 → should return Err(BufferOverflow)
540        let huge_len = (MAX_MESSAGE_SIZE + 1) as u32;
541        let mut msg = [0u8; 6];
542        msg[0] = b'D';
543        msg[1..5].copy_from_slice(&huge_len.to_be_bytes());
544        assert!(matches!(
545            message_complete(&msg),
546            Err(PgError::BufferOverflow)
547        ));
548    }
549
550    #[test]
551    fn test_copy_fail_encoding() {
552        let mut buf = [0u8; 256];
553        let n = encode_copy_fail(&mut buf, "abort test");
554        assert_eq!(buf[0], b'f');
555        // length = 4 + len("abort test") + 1 = 15
556        assert_eq!(read_i32(&buf, 1), 15);
557        assert_eq!(n, 1 + 15); // tag + length-included body
558    }
559
560    // ─── Extended Query Protocol Encoding ────────────────────────────────────
561
562    #[test]
563    fn test_parse_encoding_tag_and_name() {
564        let mut buf = [0u8; 256];
565        let n = encode_parse(&mut buf, "s0", "SELECT $1", &[23]); // 23 = INT4 OID
566        assert!(n > 0);
567        assert_eq!(buf[0], b'P');
568        // cstring "s0\0SELECT $1\0" + i16(1) + i32(23)
569        // length = 4 + 3 + 10 + 2 + 4 = 23
570        let length = read_i32(&buf, 1);
571        assert_eq!(length as usize, n - 1); // length field excludes the tag byte
572        assert!(n < 256, "shouldn't exceed buffer");
573    }
574
575    #[test]
576    fn test_parse_encoding_anonymous_no_oids() {
577        let mut buf = [0u8; 128];
578        let n = encode_parse(&mut buf, "", "SELECT 1", &[]);
579        assert_eq!(buf[0], b'P');
580        assert!(n > 5);
581    }
582
583    #[test]
584    fn test_bind_encoding_no_params() {
585        let mut buf = [0u8; 256];
586        let n = encode_bind(&mut buf, "", "s0", &[], &[], &[1]); // binary results
587        assert_eq!(buf[0], b'B');
588        let length = read_i32(&buf, 1) as usize;
589        assert_eq!(length, n - 1);
590        // result format count should be 1
591        // find it: after portal(""\0) + stmt("s0\0") + param_format_count(i16) + param_count(i16)
592        // = 1 + 3 + 2 + 2 = 8 bytes after tag+length
593        let result_format_count = read_i16(&buf, 1 + 4 + 1 + 3 + 2 + 2);
594        assert_eq!(result_format_count, 1);
595    }
596
597    #[test]
598    fn test_bind_encoding_null_param() {
599        let mut buf = [0u8; 256];
600        let n = encode_bind(&mut buf, "", "s0", &[0], &[None], &[]);
601        assert_eq!(buf[0], b'B');
602        assert!(n > 5);
603        // The NULL param should encode -1 as i32
604        // Layout: tag(1) + len(4) + portal(""\0=1) + stmt("s0\0"=3)
605        //       + param_fmt_cnt(2) + 1 fmt_code(2) + param_val_cnt(2) = 15
606        let null_marker = read_i32(&buf, 15);
607        assert_eq!(null_marker, -1, "NULL param must encode as -1");
608    }
609
610    #[test]
611    fn test_bind_encoding_with_text_param() {
612        let mut buf = [0u8; 256];
613        let value = b"hello";
614        let n = encode_bind(&mut buf, "", "s0", &[0], &[Some(value)], &[]);
615        assert_eq!(buf[0], b'B');
616        assert!(n > 5);
617    }
618
619    #[test]
620    fn test_execute_encoding() {
621        let mut buf = [0u8; 64];
622        let n = encode_execute(&mut buf, "", 0); // unlimited rows
623        assert_eq!(buf[0], b'E');
624        let length = read_i32(&buf, 1) as usize;
625        assert_eq!(length, n - 1);
626        // max_rows = 0 is at the end
627        let max_rows = read_i32(&buf, n - 4);
628        assert_eq!(max_rows, 0);
629    }
630
631    #[test]
632    fn test_execute_encoding_with_max_rows() {
633        let mut buf = [0u8; 64];
634        let n = encode_execute(&mut buf, "", 100);
635        assert_eq!(buf[0], b'E');
636        let max_rows = read_i32(&buf, n - 4);
637        assert_eq!(max_rows, 100);
638    }
639
640    #[test]
641    fn test_describe_statement_encoding() {
642        let mut buf = [0u8; 64];
643        let n = encode_describe(&mut buf, DescribeTarget::Statement, "s0");
644        assert_eq!(buf[0], b'D');
645        let length = read_i32(&buf, 1) as usize;
646        assert_eq!(length, n - 1);
647        // Target byte: 'S' for Statement
648        assert_eq!(buf[5], b'S');
649        // Statement name 's0\0' starts at offset 6
650        assert_eq!(&buf[6..9], b"s0\0");
651    }
652
653    #[test]
654    fn test_describe_portal_encoding() {
655        let mut buf = [0u8; 64];
656        let n = encode_describe(&mut buf, DescribeTarget::Portal, "myportal");
657        assert_eq!(buf[0], b'D');
658        assert_eq!(buf[5], b'P');
659        assert!(n > 5);
660    }
661
662    #[test]
663    fn test_close_statement_encoding() {
664        let mut buf = [0u8; 64];
665        let n = encode_close(&mut buf, CloseTarget::Statement, "s7");
666        assert_eq!(buf[0], b'C');
667        let length = read_i32(&buf, 1) as usize;
668        assert_eq!(length, n - 1);
669        assert_eq!(buf[5], b'S');
670        assert_eq!(&buf[6..9], b"s7\0");
671    }
672
673    #[test]
674    fn test_close_portal_encoding() {
675        let mut buf = [0u8; 64];
676        let n = encode_close(&mut buf, CloseTarget::Portal, "");
677        assert_eq!(buf[0], b'C');
678        assert_eq!(buf[5], b'P');
679        assert!(n > 0);
680    }
681
682    #[test]
683    fn test_terminate_encoding() {
684        let mut buf = [0u8; 8];
685        let n = encode_terminate(&mut buf);
686        assert_eq!(n, 5);
687        assert_eq!(buf[0], b'X');
688        assert_eq!(read_i32(&buf, 1), 4);
689    }
690
691    #[test]
692    fn test_flush_encoding() {
693        let mut buf = [0u8; 8];
694        let n = encode_flush(&mut buf);
695        assert_eq!(n, 5);
696        assert_eq!(buf[0], b'H');
697        assert_eq!(read_i32(&buf, 1), 4);
698    }
699
700    #[test]
701    fn test_copy_data_encoding() {
702        let mut buf = [0u8; 64];
703        let data = b"col1\tcol2\n";
704        let n = encode_copy_data(&mut buf, data);
705        assert_eq!(buf[0], b'd');
706        let length = read_i32(&buf, 1) as usize;
707        assert_eq!(length, 4 + data.len());
708        assert_eq!(n, 1 + length);
709        assert_eq!(&buf[5..5 + data.len()], data);
710    }
711
712    #[test]
713    fn test_copy_done_encoding() {
714        let mut buf = [0u8; 8];
715        let n = encode_copy_done(&mut buf);
716        assert_eq!(n, 5);
717        assert_eq!(buf[0], b'c');
718        assert_eq!(read_i32(&buf, 1), 4);
719    }
720
721    // ─── Decoding ─────────────────────────────────────────────────────────────
722
723    #[test]
724    fn test_decode_header_basic() {
725        let msg = [b'Z', 0, 0, 0, 5, b'I']; // ReadyForQuery
726        let hdr = decode_header(&msg).unwrap();
727        assert_eq!(hdr.tag, BackendTag::ReadyForQuery);
728        assert_eq!(hdr.length, 5);
729    }
730
731    #[test]
732    fn test_decode_header_too_short() {
733        let msg = [b'Z', 0, 0]; // only 3 bytes
734        assert!(decode_header(&msg).is_none());
735    }
736
737    #[test]
738    fn test_message_complete_exact_size() {
739        // tag(1) + len(4) = 5-byte header, body = 1 byte → total = 6
740        let msg = [b'Z', 0, 0, 0, 5, b'I'];
741        assert_eq!(message_complete(&msg).unwrap(), Some(6));
742    }
743
744    #[test]
745    fn test_message_complete_one_byte_short() {
746        let msg = [b'Z', 0, 0, 0, 5]; // header says 5 bytes body, but we only have 4 (no body)
747        assert_eq!(message_complete(&msg).unwrap(), None);
748    }
749
750    #[test]
751    fn test_message_complete_needs_exactly_5_bytes() {
752        // 4 bytes → None
753        assert_eq!(message_complete(&[b'Z', 0, 0, 0]).unwrap(), None);
754        // 5 bytes with length=4 (empty body) → Some(5)
755        let msg = [b'C', 0, 0, 0, 4]; // CommandComplete with no text
756        assert_eq!(message_complete(&msg).unwrap(), Some(5));
757    }
758
759    #[test]
760    fn test_message_complete_large_but_valid_payload() {
761        // Build a 10-byte payload message
762        let payload = [0u8; 10];
763        let mut msg = vec![b'D', 0, 0, 0, 14]; // length = 4 + 10 = 14
764        msg.extend_from_slice(&payload);
765        assert_eq!(message_complete(&msg).unwrap(), Some(15)); // 1 + 14
766    }
767
768    #[test]
769    fn test_parse_data_row_all_non_null() {
770        // DataRow with 2 columns: "hello" and "42"
771        // Format: i16(num_cols) | i32(len1) bytes1 | i32(len2) bytes2
772        let mut body = vec![];
773        body.extend_from_slice(&2i16.to_be_bytes()); // 2 columns
774        body.extend_from_slice(&5i32.to_be_bytes()); // col0 len = 5
775        body.extend_from_slice(b"hello");
776        body.extend_from_slice(&2i32.to_be_bytes()); // col1 len = 2
777        body.extend_from_slice(b"42");
778        let cols = parse_data_row(&body);
779        assert_eq!(cols.len(), 2);
780        assert_eq!(cols[0], Some(b"hello" as &[u8]));
781        assert_eq!(cols[1], Some(b"42" as &[u8]));
782    }
783
784    #[test]
785    fn test_parse_data_row_with_null() {
786        // DataRow with 2 columns: NULL and "value"
787        let mut body = vec![];
788        body.extend_from_slice(&2i16.to_be_bytes());
789        body.extend_from_slice(&(-1i32).to_be_bytes()); // NULL
790        body.extend_from_slice(&5i32.to_be_bytes());
791        body.extend_from_slice(b"value");
792        let cols = parse_data_row(&body);
793        assert_eq!(cols.len(), 2);
794        assert_eq!(cols[0], None);
795        assert_eq!(cols[1], Some(b"value" as &[u8]));
796    }
797
798    #[test]
799    fn test_parse_data_row_empty_row() {
800        let mut body = vec![];
801        body.extend_from_slice(&0i16.to_be_bytes()); // 0 columns
802        let cols = parse_data_row(&body);
803        assert_eq!(cols.len(), 0);
804    }
805
806    #[test]
807    fn test_parse_row_description_single_column() {
808        // Build a RowDescription body for 1 column "id" INT4 (OID=23, size=4)
809        let mut body = vec![];
810        body.extend_from_slice(&1i16.to_be_bytes()); // num_fields = 1
811        body.extend_from_slice(b"id\0"); // name + null terminator
812        body.extend_from_slice(&0i32.to_be_bytes()); // table_oid = 0
813        body.extend_from_slice(&0i16.to_be_bytes()); // col_attr = 0
814        body.extend_from_slice(&23i32.to_be_bytes()); // type_oid = INT4
815        body.extend_from_slice(&4i16.to_be_bytes()); // type_size = 4
816        body.extend_from_slice(&(-1i32).to_be_bytes()); // type_modifier = -1
817        body.extend_from_slice(&0i16.to_be_bytes()); // format_code = text
818        let cols = parse_row_description(&body);
819        assert_eq!(cols.len(), 1);
820        assert_eq!(cols[0].name, "id");
821        assert_eq!(cols[0].type_oid, 23);
822        assert_eq!(cols[0].type_size, 4);
823        assert!(matches!(cols[0].format_code, FormatCode::Text));
824    }
825
826    #[test]
827    fn test_parse_row_description_binary_format() {
828        let mut body = vec![];
829        body.extend_from_slice(&1i16.to_be_bytes());
830        body.extend_from_slice(b"score\0");
831        body.extend_from_slice(&0i32.to_be_bytes());
832        body.extend_from_slice(&0i16.to_be_bytes());
833        body.extend_from_slice(&701i32.to_be_bytes()); // FLOAT8 OID
834        body.extend_from_slice(&8i16.to_be_bytes());
835        body.extend_from_slice(&(-1i32).to_be_bytes());
836        body.extend_from_slice(&1i16.to_be_bytes()); // format = binary
837        let cols = parse_row_description(&body);
838        assert_eq!(cols.len(), 1);
839        assert_eq!(cols[0].name, "score");
840        assert!(matches!(cols[0].format_code, FormatCode::Binary));
841    }
842
843    #[test]
844    fn test_parse_error_fields_basic() {
845        // Severity='S', Code='C', Message='M', terminator='\0'
846        let mut body = vec![];
847        body.push(b'S');
848        body.extend_from_slice(b"ERROR\0");
849        body.push(b'C');
850        body.extend_from_slice(b"42601\0");
851        body.push(b'M');
852        body.extend_from_slice(b"syntax error\0");
853        body.push(0); // terminator
854        let fields = parse_error_fields(&body);
855        assert_eq!(fields.len(), 3);
856        assert_eq!(fields[0], (b'S', "ERROR".to_string()));
857        assert_eq!(fields[1], (b'C', "42601".to_string()));
858        assert_eq!(fields[2], (b'M', "syntax error".to_string()));
859    }
860
861    #[test]
862    fn test_parse_error_fields_empty() {
863        let body = [0u8]; // just the terminator
864        let fields = parse_error_fields(&body);
865        assert!(fields.is_empty());
866    }
867
868    // ─── Helper read functions ────────────────────────────────────────────────
869
870    #[test]
871    fn test_read_i32_big_endian() {
872        let buf = [0x00, 0x01, 0x86, 0xA0u8]; // 100000
873        assert_eq!(read_i32(&buf, 0), 100_000);
874    }
875
876    #[test]
877    fn test_read_i32_negative() {
878        let buf = (-1i32).to_be_bytes();
879        assert_eq!(read_i32(&buf, 0), -1);
880    }
881
882    #[test]
883    fn test_read_i16() {
884        let buf = [0x01, 0x00u8]; // 256
885        assert_eq!(read_i16(&buf, 0), 256);
886    }
887
888    #[test]
889    fn test_read_u32() {
890        let buf = 0xFF_FF_FF_FFu32.to_be_bytes();
891        assert_eq!(read_u32(&buf, 0), 0xFF_FF_FF_FF);
892    }
893
894    #[test]
895    fn test_read_cstring_normal() {
896        let buf = b"hello\0world";
897        let (s, consumed) = read_cstring(buf, 0);
898        assert_eq!(s, "hello");
899        assert_eq!(consumed, 6); // 5 chars + null
900    }
901
902    #[test]
903    fn test_read_cstring_empty() {
904        let buf = b"\0rest";
905        let (s, consumed) = read_cstring(buf, 0);
906        assert_eq!(s, "");
907        assert_eq!(consumed, 1);
908    }
909
910    #[test]
911    fn test_read_cstring_with_offset() {
912        let buf = b"skip\0name\0";
913        let (s, consumed) = read_cstring(buf, 5);
914        assert_eq!(s, "name");
915        assert_eq!(consumed, 5); // 4 chars + null
916    }
917}