// reddb_server/wire/protocol.rs
//! RedDB Wire Protocol — binary TCP, zero JSON overhead.
//!
//! Frame: [total_len: u32 LE][msg_type: u8][payload...]
//!
//! Message types (client → server):
//!   0x01 Query       [sql_bytes...]
//!   0x04 BulkInsert  [coll_len:u16][coll_bytes][n:u32][json_len:u32 + json_bytes]...
//!
//! Message types (server → client):
//!   0x02 Result      [ncols:u16][col_name_len:u16 + col_name]...[nrows:u32][row...]
//!                     row = [val_type:u8 + val_data]... per column
//!   0x03 Error       [error_bytes...]
//!   0x05 BulkOk      [count:u64]

// --- Message type constants ---
// Directions per the frame doc above: 0x01/0x04 travel client → server,
// 0x02/0x03/0x05 server → client. 0x06/0x07 are binary-payload variants
// of bulk insert and query (0x06's layout is documented at
// MSG_BULK_INSERT_PREVALIDATED below, which shares it).
pub const MSG_QUERY: u8 = 0x01; // [sql_bytes...]
pub const MSG_RESULT: u8 = 0x02; // column names + typed row values
pub const MSG_ERROR: u8 = 0x03; // [error_bytes...]
pub const MSG_BULK_INSERT: u8 = 0x04; // JSON-encoded rows
pub const MSG_BULK_OK: u8 = 0x05; // [count:u64]
pub const MSG_BULK_INSERT_BINARY: u8 = 0x06; // typed values instead of JSON
pub const MSG_QUERY_BINARY: u8 = 0x07; // query whose result uses binary values
23
/// Fast-path bulk insert: payload is `[coll_len u16][coll][ncols u16]
/// ([col_name u16 len + bytes])*ncols [nrows u32] ([val_tag u8 +
/// val_data]*ncols)*nrows`, IDENTICAL to `MSG_BULK_INSERT_BINARY`.
/// The only difference from 0x06 is semantic: the caller guarantees
/// every value already matches the declared column type and that
/// contract / uniqueness rules either don't apply or were already
/// checked client-side. The server skips
/// `normalize_row_fields_for_contract`, `enforce_row_uniqueness`
/// and `enforce_row_batch_uniqueness` on the whole batch, cutting
/// 15-column typed inserts from O(nrows × ncols) contract work
/// down to O(nrows) serialise-and-insert. Intended for typed-bench
/// workloads and driver-generated inserts where types were already
/// validated before the send. Old servers that don't know 0x08
/// reply with `MSG_ERROR "unknown message type"` so clients can
/// fall back to the safe path.
pub const MSG_BULK_INSERT_PREVALIDATED: u8 = 0x08;
40
// ── Streaming bulk insert (PG COPY-equivalent) ─────────────────────
//
// The prevalidated path (0x08) still pays one TCP round-trip per
// batch. For a 25k-row typed_insert chunked at BULK_BATCH_SIZE=1000
// that's 25 round-trips — each with its own 3ms wire latency and
// full schema re-declaration. PG's COPY BINARY sends the schema
// once and streams all rows in a single logical transaction, which
// is why PG typed_insert hits 120k ops/s versus our ~11k.
//
// The streaming protocol closes that gap:
//
//   client → MSG_BULK_STREAM_START  (collection + schema, once)
//   client → MSG_BULK_STREAM_ROWS   (nrows + row data, 1…N times)
//   client → MSG_BULK_STREAM_COMMIT (empty; server finalises)
//   server → MSG_BULK_OK            (total inserted count)
//
// Schema sent once, commit round-trip amortised across every ROWS
// frame. Rows are accumulated server-side in one `Vec<Vec<Value>>`
// and handed to the columnar pre-validated insert path as a single
// batch — same server-side semantics as 0x08, just amortised wire
// framing. If the caller needs lower peak memory it can still
// split with a COMMIT per N rows.
pub const MSG_BULK_STREAM_START: u8 = 0x09;
pub const MSG_BULK_STREAM_ROWS: u8 = 0x0A;
pub const MSG_BULK_STREAM_COMMIT: u8 = 0x0B;
/// Intermediate ack sent back for START + ROWS frames so the
/// client can pipeline safely (it always receives a frame per
/// frame sent) without conflating progress with the terminal
/// MSG_BULK_OK at COMMIT time.
pub const MSG_BULK_STREAM_ACK: u8 = 0x0C;
71
// ── Prepared statements (PG-style Prepare/Execute) ────────────────
//
// The plan cache already parameterizes literals and caches by shape
// key, but every MSG_QUERY_BINARY still pays: (a) the byte-scan
// `normalize_and_extract` that builds the shape key from text, (b) a
// `HashMap<String, CachedPlan>` lookup, and (c) Arc clones. On a tight
// `select_point` loop that's ~50-100µs per call, dominated by the
// parse/normalize work.
//
// Prepared statements skip all three by letting the client allocate a
// per-connection `stmt_id: u32` at PREPARE time (parse + parameterize
// once) and later reference the compiled shape by that integer at
// EXECUTE time (bind + run, no text involved).
//
//   client → MSG_PREPARE          [stmt_id u32][sql_len u32][sql]
//   server → MSG_PREPARED_OK      [stmt_id u32][param_count u16]
//   client → MSG_EXECUTE_PREPARED [stmt_id u32][nparams u16](val_tag+data)*
//   server → MSG_RESULT / MSG_ERROR  (same shape as MSG_QUERY_BINARY)
//   client → MSG_DEALLOCATE       [stmt_id u32]
//   server → (no response on success; MSG_ERROR on unknown id)
//
// State is per-connection. Disconnect drops all prepared statements
// the connection owned. Clients that don't implement these messages
// are unaffected — old servers reply `unknown message type` so the
// client transparently falls back to MSG_QUERY_BINARY.
pub const MSG_PREPARE: u8 = 0x0D;
pub const MSG_PREPARED_OK: u8 = 0x0E;
pub const MSG_EXECUTE_PREPARED: u8 = 0x0F;
pub const MSG_DEALLOCATE: u8 = 0x10;
101
// ── Cursors (server-side paginated SELECT) ────────────────────────
//
// Large SELECTs force one of two bad choices today: return every row
// in a single MSG_RESULT frame (O(result) memory on both client and
// server) or paginate via LIMIT/OFFSET (O(offset) re-scan per page).
// Cursors cut both: DECLARE runs the query once and parks the result,
// each FETCH slices the next N rows without re-executing.
//
// V1 materialises the full result set at DECLARE and serves FETCH
// from the buffered Vec. Trades peak memory for scan-once semantics —
// fine for bench scenarios and the typical range-pagination pattern.
// Streaming variant with snapshot pinning lands in V2 alongside the
// MVCC context capture work.
//
//   client → MSG_DECLARE_CURSOR [cursor_id u32][sql_len u32][sql]
//   server → MSG_CURSOR_OK      [cursor_id u32][ncols u16]
//                               ([col_len u16][col_name])*ncols
//                               [total_rows u64]
//   client → MSG_FETCH          [cursor_id u32][max_rows u32]
//   server → MSG_CURSOR_BATCH   [cursor_id u32][nrows u32][has_more u8]
//                               ([val_tag u8][val_data])* per row/col
//   client → MSG_CLOSE_CURSOR   [cursor_id u32]
//   server → (no response; MSG_ERROR on unknown id)
//
// State is per-connection. Disconnect drops every cursor the
// connection owned. Max 16 open cursors per connection — DECLAREs
// past the cap return MSG_ERROR so runaway clients can't OOM the
// server.
pub const MSG_DECLARE_CURSOR: u8 = 0x11;
pub const MSG_CURSOR_OK: u8 = 0x12;
pub const MSG_FETCH: u8 = 0x13;
pub const MSG_CURSOR_BATCH: u8 = 0x14;
pub const MSG_CLOSE_CURSOR: u8 = 0x15;
135
// --- Value type tags ---
// One tag byte precedes every value on the wire (see `encode_value` /
// `try_decode_value`). There is no dedicated timestamp tag:
// `encode_value` writes `Value::Timestamp` as VAL_U64, so it decodes
// back as `Value::UnsignedInteger` on the receiving side.
pub const VAL_NULL: u8 = 0;
pub const VAL_I64: u8 = 1;
pub const VAL_F64: u8 = 2;
pub const VAL_TEXT: u8 = 3;
pub const VAL_BOOL: u8 = 4;
pub const VAL_U64: u8 = 5;
143
use crate::storage::schema::Value;
145
/// Write a frame header: `[total_len: u32 LE][msg_type: u8]`.
///
/// `total_len` counts the msg_type byte plus the payload, matching the
/// frame layout in the module header. The caller appends the payload
/// bytes itself after this returns.
///
/// # Panics
/// Debug builds panic when `payload_len == u32::MAX`, where the `+1`
/// would otherwise wrap to 0 in release and emit a corrupt frame length.
#[inline]
pub fn write_frame_header(buf: &mut Vec<u8>, msg_type: u8, payload_len: u32) {
    debug_assert!(
        payload_len < u32::MAX,
        "payload_len + 1 overflows the u32 frame length"
    );
    let total = payload_len.wrapping_add(1); // +1 for msg_type
    buf.reserve(5); // 4-byte length + 1-byte msg_type in one grow
    buf.extend_from_slice(&total.to_le_bytes());
    buf.push(msg_type);
}
153
154/// Encode a Value to wire format bytes, appending to buf.
155#[inline]
156pub fn encode_value(buf: &mut Vec<u8>, value: &Value) {
157    match value {
158        Value::Null => buf.push(VAL_NULL),
159        Value::Integer(n) => {
160            buf.push(VAL_I64);
161            buf.extend_from_slice(&n.to_le_bytes());
162        }
163        Value::UnsignedInteger(n) => {
164            buf.push(VAL_U64);
165            buf.extend_from_slice(&n.to_le_bytes());
166        }
167        Value::Float(f) => {
168            buf.push(VAL_F64);
169            buf.extend_from_slice(&f.to_le_bytes());
170        }
171        Value::Text(s) => {
172            buf.push(VAL_TEXT);
173            let bytes = s.as_bytes();
174            buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
175            buf.extend_from_slice(bytes);
176        }
177        Value::Boolean(b) => {
178            buf.push(VAL_BOOL);
179            buf.push(*b as u8);
180        }
181        Value::Timestamp(t) => {
182            buf.push(VAL_U64);
183            buf.extend_from_slice(&t.to_le_bytes());
184        }
185        _ => buf.push(VAL_NULL),
186    }
187}
188
189/// Decode a Value from wire bytes at the given position.
190#[inline]
191pub fn decode_value(data: &[u8], pos: &mut usize) -> Value {
192    try_decode_value(data, pos).unwrap_or(Value::Null)
193}
194
195#[inline]
196pub fn try_decode_value(data: &[u8], pos: &mut usize) -> Result<Value, &'static str> {
197    if *pos >= data.len() {
198        return Err("missing value tag");
199    }
200
201    let tag = data[*pos];
202    *pos += 1;
203
204    match tag {
205        VAL_NULL => Ok(Value::Null),
206        VAL_I64 => Ok(Value::Integer(i64::from_le_bytes(read_array::<8>(
207            data,
208            pos,
209            "truncated i64 value",
210        )?))),
211        VAL_U64 => Ok(Value::UnsignedInteger(u64::from_le_bytes(read_array::<8>(
212            data,
213            pos,
214            "truncated u64 value",
215        )?))),
216        VAL_F64 => Ok(Value::Float(f64::from_le_bytes(read_array::<8>(
217            data,
218            pos,
219            "truncated f64 value",
220        )?))),
221        VAL_TEXT => {
222            let len =
223                u32::from_le_bytes(read_array::<4>(data, pos, "truncated text length")?) as usize;
224            let bytes = read_bytes(data, pos, len, "truncated text value")?;
225            // Avoid the double allocation the previous code paid:
226            //   bytes → String (via Cow::to_string) → Arc<str> (via Value::text).
227            // The common case (valid UTF-8 from a well-behaved client) borrows
228            // without allocating from the Cow, then `Arc::<str>::from(&str)`
229            // copies once into the shared ref-counted buffer.
230            let cow = std::string::String::from_utf8_lossy(bytes);
231            Ok(Value::text(std::sync::Arc::<str>::from(cow.as_ref())))
232        }
233        VAL_BOOL => {
234            let bytes = read_bytes(data, pos, 1, "truncated bool value")?;
235            Ok(Value::Boolean(bytes[0] != 0))
236        }
237        _ => Err("unknown value tag"),
238    }
239}
240
/// Borrow `len` bytes starting at `*pos`, advancing `pos` on success.
/// Returns `err` (and leaves `pos` untouched) when the slice would run
/// past the end of `data`; `saturating_add` keeps `pos + len` from
/// wrapping on hostile lengths.
#[inline]
fn read_bytes<'a>(
    data: &'a [u8],
    pos: &mut usize,
    len: usize,
    err: &'static str,
) -> Result<&'a [u8], &'static str> {
    let end = pos.saturating_add(len);
    match data.get(*pos..end) {
        Some(slice) => {
            *pos = end;
            Ok(slice)
        }
        None => Err(err),
    }
}
256
/// Read exactly `N` bytes starting at `*pos` into a fixed-size array,
/// advancing `pos` on success. Returns `err` (leaving `pos` untouched)
/// when fewer than `N` bytes remain; `saturating_add` guards the bound
/// check against `pos + N` wrapping.
#[inline]
fn read_array<const N: usize>(
    data: &[u8],
    pos: &mut usize,
    err: &'static str,
) -> Result<[u8; N], &'static str> {
    let end = pos.saturating_add(N);
    if end > data.len() {
        return Err(err);
    }
    let mut out = [0u8; N];
    out.copy_from_slice(&data[*pos..end]);
    *pos = end;
    Ok(out)
}
268
/// Encode a column name as `[len: u16 LE][name bytes]`, appending to `buf`.
///
/// The wire format caps a column name at `u16::MAX` bytes. A longer
/// name would previously have its length silently truncated by the
/// `as u16` cast, corrupting the frame for the reader; debug builds now
/// assert the bound instead.
///
/// # Panics
/// Debug builds panic when `name` is longer than `u16::MAX` bytes.
#[inline]
pub fn encode_column_name(buf: &mut Vec<u8>, name: &str) {
    let bytes = name.as_bytes();
    debug_assert!(
        bytes.len() <= u16::MAX as usize,
        "column name exceeds u16::MAX bytes and cannot be framed"
    );
    buf.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
    buf.extend_from_slice(bytes);
}
275}