// reddb_server/wire/protocol.rs
//! RedDB Wire Protocol — binary TCP, zero JSON overhead.
//!
//! Frame: [total_len: u32 LE][msg_type: u8][payload...]
//!
//! Message types (client → server):
//!   0x01 Query       [sql_bytes...]
//!   0x04 BulkInsert  [coll_len:u16][coll_bytes][n:u32][json_len:u32 + json_bytes]...
//!
//! Message types (server → client):
//!   0x02 Result  [ncols:u16][col_name_len:u16 + col_name]...[nrows:u32][row...]
//!          row = [val_type:u8 + val_data]... per column
//!   0x03 Error   [error_bytes...]
//!   0x05 BulkOk  [count:u64]

// --- Message type constants ---
pub const MSG_QUERY: u8 = 0x01;
pub const MSG_RESULT: u8 = 0x02;
pub const MSG_ERROR: u8 = 0x03;
pub const MSG_BULK_INSERT: u8 = 0x04;
pub const MSG_BULK_OK: u8 = 0x05;
pub const MSG_BULK_INSERT_BINARY: u8 = 0x06;
pub const MSG_QUERY_BINARY: u8 = 0x07;

/// Fast-path bulk insert: payload is `[coll_len u16][coll][ncols u16]
/// ([col_name u16 len + bytes])*ncols [nrows u32] ([val_tag u8 +
/// val_data]*ncols)*nrows`, IDENTICAL to `MSG_BULK_INSERT_BINARY`.
/// The only difference from 0x06 is semantic: the caller guarantees
/// every value already matches the declared column type and that
/// contract / uniqueness rules either don't apply or were already
/// checked client-side. The server skips
/// `normalize_row_fields_for_contract`, `enforce_row_uniqueness`
/// and `enforce_row_batch_uniqueness` on the whole batch, cutting
/// 15-column typed inserts from O(nrows × ncols) contract work
/// down to O(nrows) serialise-and-insert. Intended for typed-bench
/// workloads and driver-generated inserts where types were already
/// validated before the send. Old servers that don't know 0x08
/// reply with `MSG_ERROR "unknown message type"` so clients can
/// fall back to the safe path.
pub const MSG_BULK_INSERT_PREVALIDATED: u8 = 0x08;

// ── Streaming bulk insert (PG COPY-equivalent) ─────────────────────
//
// The prevalidated path (0x08) still pays one TCP round-trip per
// batch. For a 25k-row typed_insert chunked at BULK_BATCH_SIZE=1000
// that's 25 round-trips — each with its own 3ms wire latency and
// full schema re-declaration. PG's COPY BINARY sends the schema
// once and streams all rows in a single logical transaction, which
// is why PG typed_insert hits 120k ops/s versus our ~11k.
//
// The streaming protocol closes that gap:
//
//   client → MSG_BULK_STREAM_START  (collection + schema, once)
//   client → MSG_BULK_STREAM_ROWS   (nrows + row data, 1…N times)
//   client → MSG_BULK_STREAM_COMMIT (empty; server finalises)
//   server → MSG_BULK_OK            (total inserted count)
//
// Schema sent once, commit round-trip amortised across every ROWS
// frame. Rows are accumulated server-side in one `Vec<Vec<Value>>`
// and handed to the columnar pre-validated insert path as a single
// batch — same server-side semantics as 0x08, just amortised wire
// framing. If the caller needs lower peak memory it can still
// split with a COMMIT per N rows.
pub const MSG_BULK_STREAM_START: u8 = 0x09;
pub const MSG_BULK_STREAM_ROWS: u8 = 0x0A;
pub const MSG_BULK_STREAM_COMMIT: u8 = 0x0B;
/// Intermediate ack sent back for START + ROWS frames so the
/// client can pipeline safely (it always receives a frame per
/// frame sent) without conflating progress with the terminal
/// MSG_BULK_OK at COMMIT time.
pub const MSG_BULK_STREAM_ACK: u8 = 0x0C;

// ── Prepared statements (PG-style Prepare/Execute) ────────────────
//
// The plan cache already parameterizes literals and caches by shape
// key, but every MSG_QUERY_BINARY still pays: (a) the byte-scan
// `normalize_and_extract` that builds the shape key from text, (b) a
// `HashMap<String, CachedPlan>` lookup, and (c) Arc clones. On a tight
// `select_point` loop that's ~50-100µs per call, dominated by the
// parse/normalize work.
//
// Prepared statements skip all three by letting the client allocate a
// per-connection `stmt_id: u32` at PREPARE time (parse + parameterize
// once) and later reference the compiled shape by that integer at
// EXECUTE time (bind + run, no text involved).
//
//   client → MSG_PREPARE          [stmt_id u32][sql_len u32][sql]
//   server → MSG_PREPARED_OK      [stmt_id u32][param_count u16]
//   client → MSG_EXECUTE_PREPARED [stmt_id u32][nparams u16](val_tag+data)*
//   server → MSG_RESULT / MSG_ERROR (same shape as MSG_QUERY_BINARY)
//   client → MSG_DEALLOCATE      [stmt_id u32]
//   server → (no response on success; MSG_ERROR on unknown id)
//
// State is per-connection. Disconnect drops all prepared statements
// the connection owned. Clients that don't implement these messages
// are unaffected — old servers reply `unknown message type` so the
// client transparently falls back to MSG_QUERY_BINARY.
pub const MSG_PREPARE: u8 = 0x0D;
pub const MSG_PREPARED_OK: u8 = 0x0E;
pub const MSG_EXECUTE_PREPARED: u8 = 0x0F;
pub const MSG_DEALLOCATE: u8 = 0x10;

// ── Cursors (server-side paginated SELECT) ────────────────────────
//
// Large SELECTs force one of two bad choices today: return every row
// in a single MSG_RESULT frame (O(result) memory on both client and
// server) or paginate via LIMIT/OFFSET (O(offset) re-scan per page).
// Cursors cut both: DECLARE runs the query once and parks the result,
// each FETCH slices the next N rows without re-executing.
//
// V1 materialises the full result set at DECLARE and serves FETCH
// from the buffered Vec. Trades peak memory for scan-once semantics —
// fine for bench scenarios and the typical range-pagination pattern.
// Streaming variant with snapshot pinning lands in V2 alongside the
// MVCC context capture work.
//
//   client → MSG_DECLARE_CURSOR [cursor_id u32][sql_len u32][sql]
//   server → MSG_CURSOR_OK      [cursor_id u32][ncols u16]
//                               ([col_len u16][col_name])*ncols
//                               [total_rows u64]
//   client → MSG_FETCH          [cursor_id u32][max_rows u32]
//   server → MSG_CURSOR_BATCH   [cursor_id u32][nrows u32][has_more u8]
//                               ([val_tag u8][val_data])* per row/col
//   client → MSG_CLOSE_CURSOR   [cursor_id u32]
//   server → (no response; MSG_ERROR on unknown id)
//
// State is per-connection. Disconnect drops every cursor the
// connection owned. Max 16 open cursors per connection — DECLAREs
// past the cap return MSG_ERROR so runaway clients can't OOM the
// server.
pub const MSG_DECLARE_CURSOR: u8 = 0x11;
pub const MSG_CURSOR_OK: u8 = 0x12;
pub const MSG_FETCH: u8 = 0x13;
pub const MSG_CURSOR_BATCH: u8 = 0x14;
pub const MSG_CLOSE_CURSOR: u8 = 0x15;

// --- Value type tags ---
pub const VAL_NULL: u8 = 0;
pub const VAL_I64: u8 = 1;
pub const VAL_F64: u8 = 2;
pub const VAL_TEXT: u8 = 3;
pub const VAL_BOOL: u8 = 4;
pub const VAL_U64: u8 = 5;

use crate::storage::schema::Value;

/// Write a frame header: `[total_len: u32 LE][msg_type: u8]`.
///
/// `total_len` counts the msg_type byte plus the payload, so the
/// value written is `payload_len + 1`. The payload itself is
/// appended by the caller after this returns.
#[inline]
pub fn write_frame_header(buf: &mut Vec<u8>, msg_type: u8, payload_len: u32) {
    let total = payload_len + 1; // +1 for the msg_type byte itself
    buf.extend_from_slice(&total.to_le_bytes());
    buf.push(msg_type);
}

154/// Encode a Value to wire format bytes, appending to buf.
155#[inline]
156pub fn encode_value(buf: &mut Vec<u8>, value: &Value) {
157 match value {
158 Value::Null => buf.push(VAL_NULL),
159 Value::Integer(n) => {
160 buf.push(VAL_I64);
161 buf.extend_from_slice(&n.to_le_bytes());
162 }
163 Value::UnsignedInteger(n) => {
164 buf.push(VAL_U64);
165 buf.extend_from_slice(&n.to_le_bytes());
166 }
167 Value::Float(f) => {
168 buf.push(VAL_F64);
169 buf.extend_from_slice(&f.to_le_bytes());
170 }
171 Value::Text(s) => {
172 buf.push(VAL_TEXT);
173 let bytes = s.as_bytes();
174 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
175 buf.extend_from_slice(bytes);
176 }
177 Value::Boolean(b) => {
178 buf.push(VAL_BOOL);
179 buf.push(*b as u8);
180 }
181 Value::Timestamp(t) => {
182 buf.push(VAL_U64);
183 buf.extend_from_slice(&t.to_le_bytes());
184 }
185 _ => buf.push(VAL_NULL),
186 }
187}
188
189/// Decode a Value from wire bytes at the given position.
190#[inline]
191pub fn decode_value(data: &[u8], pos: &mut usize) -> Value {
192 try_decode_value(data, pos).unwrap_or(Value::Null)
193}
194
195#[inline]
196pub fn try_decode_value(data: &[u8], pos: &mut usize) -> Result<Value, &'static str> {
197 if *pos >= data.len() {
198 return Err("missing value tag");
199 }
200
201 let tag = data[*pos];
202 *pos += 1;
203
204 match tag {
205 VAL_NULL => Ok(Value::Null),
206 VAL_I64 => Ok(Value::Integer(i64::from_le_bytes(read_array::<8>(
207 data,
208 pos,
209 "truncated i64 value",
210 )?))),
211 VAL_U64 => Ok(Value::UnsignedInteger(u64::from_le_bytes(read_array::<8>(
212 data,
213 pos,
214 "truncated u64 value",
215 )?))),
216 VAL_F64 => Ok(Value::Float(f64::from_le_bytes(read_array::<8>(
217 data,
218 pos,
219 "truncated f64 value",
220 )?))),
221 VAL_TEXT => {
222 let len =
223 u32::from_le_bytes(read_array::<4>(data, pos, "truncated text length")?) as usize;
224 let bytes = read_bytes(data, pos, len, "truncated text value")?;
225 // Avoid the double allocation the previous code paid:
226 // bytes → String (via Cow::to_string) → Arc<str> (via Value::text).
227 // The common case (valid UTF-8 from a well-behaved client) borrows
228 // without allocating from the Cow, then `Arc::<str>::from(&str)`
229 // copies once into the shared ref-counted buffer.
230 let cow = std::string::String::from_utf8_lossy(bytes);
231 Ok(Value::text(std::sync::Arc::<str>::from(cow.as_ref())))
232 }
233 VAL_BOOL => {
234 let bytes = read_bytes(data, pos, 1, "truncated bool value")?;
235 Ok(Value::Boolean(bytes[0] != 0))
236 }
237 _ => Err("unknown value tag"),
238 }
239}
240
/// Borrow `len` bytes from `data` at `*pos`, advancing `*pos` past them.
///
/// # Errors
/// Returns `err` — leaving `*pos` untouched — when fewer than `len`
/// bytes remain. `saturating_add` guards the (practically impossible)
/// `*pos + len` overflow so the bounds check can never wrap.
#[inline]
fn read_bytes<'a>(
    data: &'a [u8],
    pos: &mut usize,
    len: usize,
    err: &'static str,
) -> Result<&'a [u8], &'static str> {
    let end = pos.saturating_add(len);
    if end > data.len() {
        return Err(err);
    }
    let bytes = &data[*pos..end];
    *pos = end;
    Ok(bytes)
}

257#[inline]
258fn read_array<const N: usize>(
259 data: &[u8],
260 pos: &mut usize,
261 err: &'static str,
262) -> Result<[u8; N], &'static str> {
263 let bytes = read_bytes(data, pos, N, err)?;
264 let mut array = [0u8; N];
265 array.copy_from_slice(bytes);
266 Ok(array)
267}
268
/// Encode a column name to wire format: `[len: u16 LE][utf8 bytes]`.
///
/// NOTE(review): `bytes.len() as u16` silently truncates the length
/// word for names longer than 65535 bytes — callers are assumed to
/// keep column names short. Confirm against schema validation.
#[inline]
pub fn encode_column_name(buf: &mut Vec<u8>, name: &str) {
    let bytes = name.as_bytes();
    buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
    buf.extend_from_slice(bytes);
}