//! RedDB Wire Protocol — binary TCP, zero JSON overhead.
//!
//! Frame: [total_len: u32 LE][msg_type: u8][payload...]
//!
//! Message types (client → server):
//! 0x01 Query [sql_bytes...]
//! 0x04 BulkInsert [coll_len:u16][coll_bytes][n:u32][json_len:u32 + json_bytes]...
//!
//! Message types (server → client):
//! 0x02 Result [ncols:u16][col_name_len:u16 + col_name]...[nrows:u32][row...]
//! row = [val_type:u8 + val_data]... per column
//! 0x03 Error [error_bytes...]
//! 0x05 BulkOk [count:u64]
//!
//! Later message types (0x06–0x15: binary and prevalidated bulk
//! inserts, streaming inserts, prepared statements, cursors) are
//! documented at their constants below.
// --- Message type constants ---
pub const MSG_QUERY: u8 = 0x01;
pub const MSG_RESULT: u8 = 0x02;
pub const MSG_ERROR: u8 = 0x03;
pub const MSG_BULK_INSERT: u8 = 0x04;
pub const MSG_BULK_OK: u8 = 0x05;
pub const MSG_BULK_INSERT_BINARY: u8 = 0x06;
pub const MSG_QUERY_BINARY: u8 = 0x07;
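// ── Illustrative frame I/O (sketch) ────────────────────────────────
// Minimal helpers showing the frame layout from the module doc, e.g.
// `write_frame(&mut sock, MSG_QUERY, b"SELECT 1")`. The names
// `write_frame`/`read_frame` are hypothetical, not the shipped API,
// and `total_len` is assumed to count the msg_type byte plus the
// payload (not the length field itself).
use std::io::{Read, Write};

#[allow(dead_code)]
fn write_frame<W: Write>(w: &mut W, msg_type: u8, payload: &[u8]) -> std::io::Result<()> {
    w.write_all(&(payload.len() as u32 + 1).to_le_bytes())?; // total_len: u32 LE
    w.write_all(&[msg_type])?;                               // msg_type: u8
    w.write_all(payload)                                     // payload bytes
}

#[allow(dead_code)]
fn read_frame<R: Read>(r: &mut R) -> std::io::Result<(u8, Vec<u8>)> {
    let mut len = [0u8; 4];
    r.read_exact(&mut len)?;
    let total = u32::from_le_bytes(len) as usize;
    if total == 0 {
        return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "empty frame"));
    }
    let mut body = vec![0u8; total];
    r.read_exact(&mut body)?;
    let msg_type = body[0];
    let payload = body.split_off(1);
    Ok((msg_type, payload))
}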
/// Fast-path bulk insert: payload is `[coll_len u16][coll][ncols u16]
/// ([col_name u16 len + bytes])*ncols [nrows u32] ([val_tag u8 +
/// val_data]*ncols)*nrows`, IDENTICAL to `MSG_BULK_INSERT_BINARY`.
/// The only difference from 0x06 is semantic: the caller guarantees
/// every value already matches the declared column type and that
/// contract / uniqueness rules either don't apply or were already
/// checked client-side. The server skips
/// `normalize_row_fields_for_contract`, `enforce_row_uniqueness`
/// and `enforce_row_batch_uniqueness` on the whole batch, cutting
/// 15-column typed inserts from O(nrows × ncols) contract work
/// down to O(nrows) serialise-and-insert. Intended for typed-bench
/// workloads and driver-generated inserts where types were already
/// validated before the send. Old servers that don't know 0x08
/// reply with `MSG_ERROR "unknown message type"` so clients can
/// fall back to the safe path.
pub const MSG_BULK_INSERT_PREVALIDATED: u8 = 0x08;
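// Sketch: building a MSG_BULK_INSERT_PREVALIDATED payload for an
// all-i64 batch, following the layout documented above; send it with
// the illustrative `write_frame`. The helper name is hypothetical,
// and a real driver would tag each value by its column type rather
// than hard-coding VAL_I64.
#[allow(dead_code)]
fn encode_prevalidated_i64_batch(coll: &str, cols: &[&str], rows: &[Vec<i64>]) -> Vec<u8> {
    let mut p = Vec::new();
    p.extend_from_slice(&(coll.len() as u16).to_le_bytes()); // coll_len: u16
    p.extend_from_slice(coll.as_bytes());                    // coll bytes
    p.extend_from_slice(&(cols.len() as u16).to_le_bytes()); // ncols: u16
    for c in cols {
        p.extend_from_slice(&(c.len() as u16).to_le_bytes()); // col_name len: u16
        p.extend_from_slice(c.as_bytes());
    }
    p.extend_from_slice(&(rows.len() as u32).to_le_bytes()); // nrows: u32
    for row in rows {
        for v in row {
            p.push(VAL_I64);                       // val_tag
            p.extend_from_slice(&v.to_le_bytes()); // val_data: i64 LE
        }
    }
    p
}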
// ── Streaming bulk insert (PG COPY-equivalent) ─────────────────────
//
// The prevalidated path (0x08) still pays one TCP round-trip per
// batch. For a 25k-row typed_insert chunked at BULK_BATCH_SIZE=1000
// that's 25 round-trips — each with its own 3ms wire latency and
// full schema re-declaration. PG's COPY BINARY sends the schema
// once and streams all rows in a single logical transaction, which
// is why PG typed_insert hits 120k ops/s versus our ~11k.
//
// The streaming protocol closes that gap:
//
// client → MSG_BULK_STREAM_START (collection + schema, once)
// client → MSG_BULK_STREAM_ROWS (nrows + row data, 1…N times)
// client → MSG_BULK_STREAM_COMMIT (empty; server finalises)
// server → MSG_BULK_OK (total inserted count)
//
// Schema sent once, commit round-trip amortised across every ROWS
// frame. Rows are accumulated server-side in one `Vec<Vec<Value>>`
// and handed to the columnar pre-validated insert path as a single
// batch — same server-side semantics as 0x08, just amortised wire
// framing. If the caller needs lower peak memory it can still
// split with a COMMIT per N rows.
pub const MSG_BULK_STREAM_START: u8 = 0x09;
pub const MSG_BULK_STREAM_ROWS: u8 = 0x0A;
pub const MSG_BULK_STREAM_COMMIT: u8 = 0x0B;
/// Intermediate ack sent back for START and ROWS frames so the
/// client can pipeline safely (it receives exactly one response
/// frame per frame sent) without conflating progress acks with the
/// terminal MSG_BULK_OK at COMMIT time.
pub const MSG_BULK_STREAM_ACK: u8 = 0x0C;
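// Sketch of the client side of the streaming handshake above, using
// the illustrative `write_frame`/`read_frame` helpers. All frames
// are written first (pipelined), then the per-frame acks and the
// terminal MSG_BULK_OK are drained; MSG_ERROR handling is elided.
#[allow(dead_code)]
fn stream_bulk_insert<S: Read + Write>(
    sock: &mut S,
    start_payload: &[u8],    // collection + schema, sent once
    rows_frames: &[Vec<u8>], // each: [nrows u32] + row data
) -> std::io::Result<u64> {
    write_frame(sock, MSG_BULK_STREAM_START, start_payload)?;
    for rows in rows_frames {
        write_frame(sock, MSG_BULK_STREAM_ROWS, rows)?;
    }
    write_frame(sock, MSG_BULK_STREAM_COMMIT, &[])?;
    // One MSG_BULK_STREAM_ACK per START/ROWS frame sent...
    for _ in 0..=rows_frames.len() {
        let (ty, _ack) = read_frame(sock)?;
        debug_assert_eq!(ty, MSG_BULK_STREAM_ACK);
    }
    // ...then MSG_BULK_OK carrying the total inserted count (u64 LE).
    let (_ty, body) = read_frame(sock)?;
    let mut count = [0u8; 8];
    count.copy_from_slice(&body[..8]);
    Ok(u64::from_le_bytes(count))
}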
// ── Prepared statements (PG-style Prepare/Execute) ────────────────
//
// The plan cache already parameterizes literals and caches by shape
// key, but every MSG_QUERY_BINARY still pays: (a) the byte-scan
// `normalize_and_extract` that builds the shape key from text, (b) a
// `HashMap<String, CachedPlan>` lookup, and (c) Arc clones. On a tight
// `select_point` loop that's ~50-100µs per call, dominated by the
// parse/normalize work.
//
// Prepared statements skip all three by letting the client allocate a
// per-connection `stmt_id: u32` at PREPARE time (parse + parameterize
// once) and later reference the compiled shape by that integer at
// EXECUTE time (bind + run, no text involved).
//
// client → MSG_PREPARE [stmt_id u32][sql_len u32][sql]
// server → MSG_PREPARED_OK [stmt_id u32][param_count u16]
// client → MSG_EXECUTE_PREPARED [stmt_id u32][nparams u16](val_tag+data)*
// server → MSG_RESULT / MSG_ERROR (same shape as MSG_QUERY_BINARY)
// client → MSG_DEALLOCATE [stmt_id u32]
// server → (no response on success; MSG_ERROR on unknown id)
//
// State is per-connection. Disconnect drops all prepared statements
// the connection owned. Clients that never send these messages are
// unaffected, and old servers reply `unknown message type` to the
// new ones, so a client can transparently fall back to
// MSG_QUERY_BINARY.
pub const MSG_PREPARE: u8 = 0x0D;
pub const MSG_PREPARED_OK: u8 = 0x0E;
pub const MSG_EXECUTE_PREPARED: u8 = 0x0F;
pub const MSG_DEALLOCATE: u8 = 0x10;
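// Sketch of the PREPARE round-trip above; `prepare` is a
// hypothetical client helper built on the illustrative frame I/O.
// EXECUTE would reuse the same framing with (val_tag + data)
// parameter encoding, as sketched by `encode_value` below.
#[allow(dead_code)]
fn prepare<S: Read + Write>(sock: &mut S, stmt_id: u32, sql: &str) -> std::io::Result<u16> {
    let mut p = Vec::with_capacity(8 + sql.len());
    p.extend_from_slice(&stmt_id.to_le_bytes());            // stmt_id: u32
    p.extend_from_slice(&(sql.len() as u32).to_le_bytes()); // sql_len: u32
    p.extend_from_slice(sql.as_bytes());                    // sql bytes
    write_frame(sock, MSG_PREPARE, &p)?;
    // MSG_PREPARED_OK: [stmt_id u32][param_count u16]
    let (_ty, body) = read_frame(sock)?;
    let mut n = [0u8; 2];
    n.copy_from_slice(&body[4..6]);
    Ok(u16::from_le_bytes(n))
}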
// ── Cursors (server-side paginated SELECT) ────────────────────────
//
// Large SELECTs force one of two bad choices today: return every row
// in a single MSG_RESULT frame (O(result) memory on both client and
// server) or paginate via LIMIT/OFFSET (O(offset) re-scan per page).
// Cursors cut both: DECLARE runs the query once and parks the result,
// each FETCH slices the next N rows without re-executing.
//
// V1 materialises the full result set at DECLARE and serves FETCH
// from the buffered Vec. Trades peak memory for scan-once semantics —
// fine for bench scenarios and the typical range-pagination pattern.
// Streaming variant with snapshot pinning lands in V2 alongside the
// MVCC context capture work.
//
// client → MSG_DECLARE_CURSOR [cursor_id u32][sql_len u32][sql]
// server → MSG_CURSOR_OK [cursor_id u32][ncols u16]
// ([col_len u16][col_name])*ncols
// [total_rows u64]
// client → MSG_FETCH [cursor_id u32][max_rows u32]
// server → MSG_CURSOR_BATCH [cursor_id u32][nrows u32][has_more u8]
// ([val_tag u8][val_data])* per row/col
// client → MSG_CLOSE_CURSOR [cursor_id u32]
// server → (no response; MSG_ERROR on unknown id)
//
// State is per-connection. Disconnect drops every cursor the
// connection owned. Max 16 open cursors per connection — DECLAREs
// past the cap return MSG_ERROR so runaway clients can't OOM the
// server.
pub const MSG_DECLARE_CURSOR: u8 = 0x11;
pub const MSG_CURSOR_OK: u8 = 0x12;
pub const MSG_FETCH: u8 = 0x13;
pub const MSG_CURSOR_BATCH: u8 = 0x14;
pub const MSG_CLOSE_CURSOR: u8 = 0x15;
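// Sketch of a DECLARE → FETCH-until-drained → CLOSE loop per the
// comment above; `fetch_all` is a hypothetical helper, and decoding
// the raw batch bodies (via `decode_value`, sketched below) is left
// to the caller.
#[allow(dead_code)]
fn fetch_all<S: Read + Write>(
    sock: &mut S,
    cursor_id: u32,
    sql: &str,
    max_rows: u32,
) -> std::io::Result<Vec<Vec<u8>>> {
    let mut p = Vec::with_capacity(8 + sql.len());
    p.extend_from_slice(&cursor_id.to_le_bytes());          // cursor_id: u32
    p.extend_from_slice(&(sql.len() as u32).to_le_bytes()); // sql_len: u32
    p.extend_from_slice(sql.as_bytes());
    write_frame(sock, MSG_DECLARE_CURSOR, &p)?;
    let _cursor_ok = read_frame(sock)?; // schema + total_rows, unused here
    let mut batches = Vec::new();
    loop {
        let mut f = Vec::with_capacity(8);
        f.extend_from_slice(&cursor_id.to_le_bytes()); // cursor_id: u32
        f.extend_from_slice(&max_rows.to_le_bytes());  // max_rows: u32
        write_frame(sock, MSG_FETCH, &f)?;
        let (_ty, body) = read_frame(sock)?; // MSG_CURSOR_BATCH
        let has_more = body[8]; // after [cursor_id u32][nrows u32]
        batches.push(body);
        if has_more == 0 {
            break;
        }
    }
    write_frame(sock, MSG_CLOSE_CURSOR, &cursor_id.to_le_bytes())?;
    Ok(batches)
}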
// --- Value type tags ---
pub const VAL_NULL: u8 = 0;
pub const VAL_I64: u8 = 1;
pub const VAL_F64: u8 = 2;
pub const VAL_TEXT: u8 = 3;
pub const VAL_BOOL: u8 = 4;
pub const VAL_U64: u8 = 5;
use crate::Value;
/// Write a frame header: [total_len: u32 LE][msg_type: u8]
/// Encode a Value to wire format bytes, appending to buf.
/// Decode a Value from wire bytes at the given position.
/// Encode a column name to wire format.
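// The four helpers documented above are sketched below. Signatures
// and the `Value` variants (Null / I64 / F64 / Text / Bool / U64,
// mirroring the tags above) are assumptions for illustration, not
// the shipped implementation.
#[allow(dead_code)]
fn write_frame_header(buf: &mut Vec<u8>, total_len: u32, msg_type: u8) {
    buf.extend_from_slice(&total_len.to_le_bytes()); // total_len: u32 LE
    buf.push(msg_type);                              // msg_type: u8
}

#[allow(dead_code)]
fn encode_value(buf: &mut Vec<u8>, val: &Value) {
    match val {
        Value::Null => buf.push(VAL_NULL),
        Value::I64(i) => { buf.push(VAL_I64); buf.extend_from_slice(&i.to_le_bytes()); }
        Value::F64(f) => { buf.push(VAL_F64); buf.extend_from_slice(&f.to_le_bytes()); }
        Value::Text(s) => {
            buf.push(VAL_TEXT);
            buf.extend_from_slice(&(s.len() as u32).to_le_bytes()); // text len: u32 (assumed)
            buf.extend_from_slice(s.as_bytes());
        }
        Value::Bool(b) => { buf.push(VAL_BOOL); buf.push(*b as u8); }
        Value::U64(u) => { buf.push(VAL_U64); buf.extend_from_slice(&u.to_le_bytes()); }
    }
}

#[allow(dead_code)]
fn decode_value(bytes: &[u8], pos: &mut usize) -> Result<Value, String> {
    let tag = bytes[*pos];
    *pos += 1;
    match tag {
        VAL_NULL => Ok(Value::Null),
        VAL_I64 => {
            let mut b = [0u8; 8];
            b.copy_from_slice(&bytes[*pos..*pos + 8]);
            *pos += 8;
            Ok(Value::I64(i64::from_le_bytes(b)))
        }
        VAL_F64 => {
            let mut b = [0u8; 8];
            b.copy_from_slice(&bytes[*pos..*pos + 8]);
            *pos += 8;
            Ok(Value::F64(f64::from_le_bytes(b)))
        }
        VAL_TEXT => {
            let mut b = [0u8; 4];
            b.copy_from_slice(&bytes[*pos..*pos + 4]);
            *pos += 4;
            let len = u32::from_le_bytes(b) as usize;
            let s = String::from_utf8_lossy(&bytes[*pos..*pos + len]).into_owned();
            *pos += len;
            Ok(Value::Text(s))
        }
        VAL_BOOL => {
            let v = bytes[*pos] != 0;
            *pos += 1;
            Ok(Value::Bool(v))
        }
        VAL_U64 => {
            let mut b = [0u8; 8];
            b.copy_from_slice(&bytes[*pos..*pos + 8]);
            *pos += 8;
            Ok(Value::U64(u64::from_le_bytes(b)))
        }
        other => Err(format!("unknown value tag {}", other)),
    }
}

#[allow(dead_code)]
fn encode_col_name(buf: &mut Vec<u8>, name: &str) {
    buf.extend_from_slice(&(name.len() as u16).to_le_bytes()); // col_name len: u16
    buf.extend_from_slice(name.as_bytes());
}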