Skip to main content

sentinel_driver/protocol/
frontend.rs

1use bytes::{BufMut, BytesMut};
2
3/// Encode the startup message (no type byte — special case).
4///
5/// Format: `[length: i32][protocol_version: i32][param_name\0param_value\0...]\0`
6pub fn startup(buf: &mut BytesMut, user: &str, database: &str, params: &[(&str, &str)]) {
7    let start = buf.len();
8    buf.put_i32(0); // placeholder for length
9    buf.put_i32(196608); // protocol version 3.0
10
11    put_cstr(buf, "user");
12    put_cstr(buf, user);
13
14    if !database.is_empty() {
15        put_cstr(buf, "database");
16        put_cstr(buf, database);
17    }
18
19    for &(key, value) in params {
20        put_cstr(buf, key);
21        put_cstr(buf, value);
22    }
23
24    buf.put_u8(0); // terminator
25
26    let len = (buf.len() - start) as i32;
27    buf[start..start + 4].copy_from_slice(&len.to_be_bytes());
28}
29
30/// SSLRequest message — asks server if TLS is supported.
31///
32/// Format: `[length: i32 = 8][magic: i32 = 80877103]`
33pub fn ssl_request(buf: &mut BytesMut) {
34    buf.put_i32(8);
35    buf.put_i32(80877103);
36}
37
38/// Simple Query (Q).
39pub fn query(buf: &mut BytesMut, sql: &str) {
40    encode_message(buf, b'Q', |buf| {
41        put_cstr(buf, sql);
42    });
43}
44
45/// Parse (P) — prepare a statement.
46pub fn parse(buf: &mut BytesMut, name: &str, sql: &str, param_types: &[u32]) {
47    encode_message(buf, b'P', |buf| {
48        put_cstr(buf, name);
49        put_cstr(buf, sql);
50        buf.put_i16(param_types.len() as i16);
51        for &oid in param_types {
52            buf.put_u32(oid);
53        }
54    });
55}
56
57/// Bind (B) — bind parameters to a prepared statement.
58///
59/// `params` are pre-encoded binary values. `None` represents NULL.
60pub fn bind(
61    buf: &mut BytesMut,
62    portal: &str,
63    statement: &str,
64    params: &[Option<&[u8]>],
65    result_formats: &[i16],
66) {
67    encode_message(buf, b'B', |buf| {
68        put_cstr(buf, portal);
69        put_cstr(buf, statement);
70
71        // Parameter format codes: all binary (1)
72        buf.put_i16(1); // one format code
73        buf.put_i16(1); // binary
74
75        // Parameters
76        buf.put_i16(params.len() as i16);
77        for param in params {
78            match param {
79                Some(data) => {
80                    buf.put_i32(data.len() as i32);
81                    buf.put_slice(data);
82                }
83                None => {
84                    buf.put_i32(-1); // NULL
85                }
86            }
87        }
88
89        // Result format codes
90        if result_formats.is_empty() {
91            buf.put_i16(1); // one format code
92            buf.put_i16(1); // binary
93        } else {
94            buf.put_i16(result_formats.len() as i16);
95            for &fmt in result_formats {
96                buf.put_i16(fmt);
97            }
98        }
99    });
100}
101
102/// Describe (D) — describe a statement.
103pub fn describe_statement(buf: &mut BytesMut, name: &str) {
104    encode_message(buf, b'D', |buf| {
105        buf.put_u8(b'S');
106        put_cstr(buf, name);
107    });
108}
109
110/// Describe (D) — describe a portal.
111pub fn describe_portal(buf: &mut BytesMut, name: &str) {
112    encode_message(buf, b'D', |buf| {
113        buf.put_u8(b'P');
114        put_cstr(buf, name);
115    });
116}
117
118/// Execute (E) — execute a bound portal.
119pub fn execute(buf: &mut BytesMut, portal: &str, max_rows: i32) {
120    encode_message(buf, b'E', |buf| {
121        put_cstr(buf, portal);
122        buf.put_i32(max_rows); // 0 = no limit
123    });
124}
125
126/// Sync (S) — end of an extended query pipeline.
127pub fn sync(buf: &mut BytesMut) {
128    encode_message(buf, b'S', |_| {});
129}
130
131/// Flush (H) — request server to flush output.
132pub fn flush(buf: &mut BytesMut) {
133    encode_message(buf, b'H', |_| {});
134}
135
136/// Close (C) — close a statement.
137pub fn close_statement(buf: &mut BytesMut, name: &str) {
138    encode_message(buf, b'C', |buf| {
139        buf.put_u8(b'S');
140        put_cstr(buf, name);
141    });
142}
143
144/// Close (C) — close a portal.
145pub fn close_portal(buf: &mut BytesMut, name: &str) {
146    encode_message(buf, b'C', |buf| {
147        buf.put_u8(b'P');
148        put_cstr(buf, name);
149    });
150}
151
152/// Terminate (X) — disconnect.
153pub fn terminate(buf: &mut BytesMut) {
154    encode_message(buf, b'X', |_| {});
155}
156
157/// CopyData (d) — a chunk of COPY data.
158pub fn copy_data(buf: &mut BytesMut, data: &[u8]) {
159    encode_message(buf, b'd', |buf| {
160        buf.put_slice(data);
161    });
162}
163
164/// CopyDone (c) — end of COPY IN data.
165pub fn copy_done(buf: &mut BytesMut) {
166    encode_message(buf, b'c', |_| {});
167}
168
169/// CopyFail (f) — abort COPY IN with error message.
170pub fn copy_fail(buf: &mut BytesMut, message: &str) {
171    encode_message(buf, b'f', |buf| {
172        put_cstr(buf, message);
173    });
174}
175
176/// PasswordMessage (p) — send password (cleartext or MD5).
177pub fn password(buf: &mut BytesMut, password: &str) {
178    encode_message(buf, b'p', |buf| {
179        put_cstr(buf, password);
180    });
181}
182
183/// SASLInitialResponse (p) — first SCRAM message.
184pub fn sasl_initial_response(buf: &mut BytesMut, mechanism: &str, data: &[u8]) {
185    encode_message(buf, b'p', |buf| {
186        put_cstr(buf, mechanism);
187        buf.put_i32(data.len() as i32);
188        buf.put_slice(data);
189    });
190}
191
192/// SASLResponse (p) — subsequent SCRAM message.
193pub fn sasl_response(buf: &mut BytesMut, data: &[u8]) {
194    encode_message(buf, b'p', |buf| {
195        buf.put_slice(data);
196    });
197}
198
199/// CancelRequest — sent on a new connection to cancel a running query.
200///
201/// Format: `[length: i32 = 16][magic: i32 = 80877102][process_id: i32][secret_key: i32]`
202///
203/// Unlike other messages, CancelRequest has no type byte — it uses a
204/// length prefix + magic number, similar to StartupMessage and SSLRequest.
205pub fn cancel_request(buf: &mut BytesMut, process_id: i32, secret_key: i32) {
206    buf.put_i32(16); // total length
207    buf.put_i32(80877102); // cancel request code
208    buf.put_i32(process_id);
209    buf.put_i32(secret_key);
210}
211
212// ── Helpers ──────────────────────────────────────────
213
214/// Encode a PG wire protocol message: `[type: u8][length: i32][payload]`.
215fn encode_message(buf: &mut BytesMut, msg_type: u8, encode_body: impl FnOnce(&mut BytesMut)) {
216    buf.put_u8(msg_type);
217    let len_idx = buf.len();
218    buf.put_i32(0); // placeholder
219    encode_body(buf);
220    let len = (buf.len() - len_idx) as i32;
221    buf[len_idx..len_idx + 4].copy_from_slice(&len.to_be_bytes());
222}
223
224/// Write a C-string (null-terminated).
225fn put_cstr(buf: &mut BytesMut, s: &str) {
226    buf.put_slice(s.as_bytes());
227    buf.put_u8(0);
228}