Skip to main content

nodedb_types/protocol/
handshake.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Handshake frames and per-operation limits for the native binary protocol.
4
5use serde::{Deserialize, Serialize};
6
7// ─── Protocol Constants ─────────────────────────────────────────────
8
9/// Maximum frame payload size (16 MiB).
10pub const MAX_FRAME_SIZE: u32 = 16 * 1024 * 1024;
11
12/// Length of the frame header (4-byte big-endian u32 payload length).
13pub const FRAME_HEADER_LEN: usize = 4;
14
15/// Default server port for the native protocol.
16pub const DEFAULT_NATIVE_PORT: u16 = 6433;
17
18/// Current native protocol version advertised in `HelloFrame`.
19pub const PROTO_VERSION: u16 = 1;
20
21/// Minimum protocol version this server accepts from clients.
22pub const PROTO_VERSION_MIN: u16 = 1;
23
24/// Maximum protocol version this server can speak.
25pub const PROTO_VERSION_MAX: u16 = PROTO_VERSION;
26
27// ─── Capability Bits ────────────────────────────────────────────────
28
29/// Capability bit: server supports streaming (partial-response chunking).
30pub const CAP_STREAMING: u64 = 1 << 0;
31/// Capability bit: server supports GraphRAG fusion (`GraphRagFusion` opcode).
32pub const CAP_GRAPHRAG: u64 = 1 << 1;
33/// Capability bit: server supports full-text search opcodes.
34pub const CAP_FTS: u64 = 1 << 2;
35/// Capability bit: server supports CRDT sync.
36pub const CAP_CRDT: u64 = 1 << 3;
37/// Capability bit: server supports spatial operations.
38pub const CAP_SPATIAL: u64 = 1 << 4;
39/// Capability bit: server supports timeseries operations.
40pub const CAP_TIMESERIES: u64 = 1 << 5;
41/// Capability bit: server supports columnar scan.
42pub const CAP_COLUMNAR: u64 = 1 << 6;
43
44/// Capability bit: connection uses MessagePack framing (always set for native protocol).
45pub const CAP_MSGPACK: u64 = 1 << 7;
46
47// ─── Per-Operation Limits ───────────────────────────────────────────
48
49/// Per-operation capability limits negotiated during the connection handshake.
50#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
51pub struct Limits {
52    pub max_vector_dim: Option<u32>,
53    pub max_top_k: Option<u32>,
54    pub max_scan_limit: Option<u32>,
55    pub max_batch_size: Option<u32>,
56    pub max_crdt_delta_bytes: Option<u32>,
57    pub max_query_text_bytes: Option<u32>,
58    pub max_graph_depth: Option<u32>,
59}
60
61// ─── HelloFrame ─────────────────────────────────────────────────────
62
63/// First frame sent by the client after TCP connection.
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub struct HelloFrame {
66    pub proto_min: u16,
67    pub proto_max: u16,
68    pub capabilities: u64,
69}
70
71/// Magic bytes for `HelloFrame`: b"NDBH".
72pub const HELLO_MAGIC: u32 = 0x4E44_4248;
73
74impl HelloFrame {
75    pub const WIRE_SIZE: usize = 16;
76
77    /// Build a `HelloFrame` advertising the current protocol range and all capabilities.
78    pub fn current() -> Self {
79        Self {
80            proto_min: PROTO_VERSION_MIN,
81            proto_max: PROTO_VERSION_MAX,
82            capabilities: CAP_STREAMING
83                | CAP_GRAPHRAG
84                | CAP_FTS
85                | CAP_CRDT
86                | CAP_SPATIAL
87                | CAP_TIMESERIES
88                | CAP_COLUMNAR
89                | CAP_MSGPACK,
90        }
91    }
92
93    pub fn encode(&self) -> [u8; Self::WIRE_SIZE] {
94        let mut buf = [0u8; Self::WIRE_SIZE];
95        buf[0..4].copy_from_slice(&HELLO_MAGIC.to_be_bytes());
96        buf[4..6].copy_from_slice(&self.proto_min.to_be_bytes());
97        buf[6..8].copy_from_slice(&self.proto_max.to_be_bytes());
98        buf[8..16].copy_from_slice(&self.capabilities.to_be_bytes());
99        buf
100    }
101
102    pub fn decode(buf: &[u8; Self::WIRE_SIZE]) -> Option<Self> {
103        let magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
104        if magic != HELLO_MAGIC {
105            return None;
106        }
107        let proto_min = u16::from_be_bytes([buf[4], buf[5]]);
108        let proto_max = u16::from_be_bytes([buf[6], buf[7]]);
109        let capabilities = u64::from_be_bytes([
110            buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
111        ]);
112        Some(Self {
113            proto_min,
114            proto_max,
115            capabilities,
116        })
117    }
118}
119
120// ─── HelloAckFrame ──────────────────────────────────────────────────
121
122/// Server's response to a `HelloFrame`.
123#[derive(Debug, Clone, PartialEq, Eq)]
124pub struct HelloAckFrame {
125    pub proto_version: u16,
126    pub capabilities: u64,
127    pub server_version: String,
128    pub limits: Limits,
129}
130
131/// Magic bytes for `HelloAckFrame`: b"NDBA".
132pub const HELLO_ACK_MAGIC: u32 = 0x4E44_4241;
133
134impl HelloAckFrame {
135    pub fn encode(&self) -> Vec<u8> {
136        let sv = self.server_version.as_bytes();
137        let sv_len = sv.len().min(255) as u8;
138        let mut buf = Vec::with_capacity(15 + sv_len as usize + 1 + 7 * 5);
139        buf.extend_from_slice(&HELLO_ACK_MAGIC.to_be_bytes());
140        buf.extend_from_slice(&self.proto_version.to_be_bytes());
141        buf.extend_from_slice(&self.capabilities.to_be_bytes());
142        buf.push(sv_len);
143        buf.extend_from_slice(&sv[..sv_len as usize]);
144        buf.push(1u8);
145        encode_limit_field(&mut buf, self.limits.max_vector_dim);
146        encode_limit_field(&mut buf, self.limits.max_top_k);
147        encode_limit_field(&mut buf, self.limits.max_scan_limit);
148        encode_limit_field(&mut buf, self.limits.max_batch_size);
149        encode_limit_field(&mut buf, self.limits.max_crdt_delta_bytes);
150        encode_limit_field(&mut buf, self.limits.max_query_text_bytes);
151        encode_limit_field(&mut buf, self.limits.max_graph_depth);
152        buf
153    }
154
155    pub fn decode(data: &[u8]) -> Option<Self> {
156        if data.len() < 15 {
157            return None;
158        }
159        let magic = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
160        if magic != HELLO_ACK_MAGIC {
161            return None;
162        }
163        let proto_version = u16::from_be_bytes([data[4], data[5]]);
164        let capabilities = u64::from_be_bytes([
165            data[6], data[7], data[8], data[9], data[10], data[11], data[12], data[13],
166        ]);
167        let sv_len = data[14] as usize;
168        let sv_end = 15 + sv_len;
169        if data.len() < sv_end {
170            return None;
171        }
172        let server_version = String::from_utf8_lossy(&data[15..sv_end]).into_owned();
173        let mut limits = Limits::default();
174        if data.len() > sv_end && data[sv_end] == 1 {
175            let mut pos = sv_end + 1;
176            limits.max_vector_dim = decode_limit_field(data, &mut pos);
177            limits.max_top_k = decode_limit_field(data, &mut pos);
178            limits.max_scan_limit = decode_limit_field(data, &mut pos);
179            limits.max_batch_size = decode_limit_field(data, &mut pos);
180            limits.max_crdt_delta_bytes = decode_limit_field(data, &mut pos);
181            limits.max_query_text_bytes = decode_limit_field(data, &mut pos);
182            limits.max_graph_depth = decode_limit_field(data, &mut pos);
183        }
184        Some(Self {
185            proto_version,
186            capabilities,
187            server_version,
188            limits,
189        })
190    }
191}
192
193// ─── HelloErrorFrame ─────────────────────────────────────────────────
194
195/// Error code sent in a `HelloErrorFrame` when the server rejects a handshake.
196#[derive(Debug, Clone, Copy, PartialEq, Eq)]
197pub enum HelloErrorCode {
198    /// The client sent a frame with an unrecognised magic number.
199    BadMagic,
200    /// The client's version range does not overlap with the server's.
201    VersionMismatch,
202    /// The frame was otherwise malformed (truncated, invalid field values, etc.)
203    Malformed,
204}
205
206impl std::fmt::Display for HelloErrorCode {
207    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208        match self {
209            HelloErrorCode::BadMagic => write!(f, "BadMagic"),
210            HelloErrorCode::VersionMismatch => write!(f, "VersionMismatch"),
211            HelloErrorCode::Malformed => write!(f, "Malformed"),
212        }
213    }
214}
215
216/// Frame sent by the server when it rejects a `HelloFrame`.
217///
218/// Wire format (all big-endian):
219/// - magic: `b"NDBE"` (4 bytes)
220/// - code:  u8  (0=BadMagic, 1=VersionMismatch, 2=Malformed)
221/// - msg_len: u8
222/// - message: UTF-8 bytes (up to 255)
223#[derive(Debug, Clone, PartialEq, Eq)]
224pub struct HelloErrorFrame {
225    pub code: HelloErrorCode,
226    pub message: String,
227}
228
229/// Magic bytes for `HelloErrorFrame`: `b"NDBE"`.
230pub const HELLO_ERROR_MAGIC: &[u8; 4] = b"NDBE";
231
232/// Magic for `HelloErrorFrame` as a `u32` (big-endian: 0x4E44_4245).
233pub const HELLO_ERROR_MAGIC_U32: u32 = 0x4E44_4245;
234
235impl HelloErrorFrame {
236    pub fn encode(&self) -> Vec<u8> {
237        let msg = self.message.as_bytes();
238        let msg_len = msg.len().min(255) as u8;
239        let code_byte = match self.code {
240            HelloErrorCode::BadMagic => 0u8,
241            HelloErrorCode::VersionMismatch => 1u8,
242            HelloErrorCode::Malformed => 2u8,
243        };
244        let mut buf = Vec::with_capacity(6 + msg_len as usize);
245        buf.extend_from_slice(HELLO_ERROR_MAGIC);
246        buf.push(code_byte);
247        buf.push(msg_len);
248        buf.extend_from_slice(&msg[..msg_len as usize]);
249        buf
250    }
251
252    pub fn decode(data: &[u8]) -> Option<Self> {
253        if data.len() < 6 {
254            return None;
255        }
256        if &data[0..4] != HELLO_ERROR_MAGIC {
257            return None;
258        }
259        let code = match data[4] {
260            0 => HelloErrorCode::BadMagic,
261            1 => HelloErrorCode::VersionMismatch,
262            2 => HelloErrorCode::Malformed,
263            _ => return None,
264        };
265        let msg_len = data[5] as usize;
266        if data.len() < 6 + msg_len {
267            return None;
268        }
269        let message = String::from_utf8_lossy(&data[6..6 + msg_len]).into_owned();
270        Some(Self { code, message })
271    }
272}
273
274fn encode_limit_field(buf: &mut Vec<u8>, val: Option<u32>) {
275    match val {
276        Some(v) => {
277            buf.push(1u8);
278            buf.extend_from_slice(&v.to_be_bytes());
279        }
280        None => {
281            buf.push(0u8);
282            buf.extend_from_slice(&0u32.to_be_bytes());
283        }
284    }
285}
286
287fn decode_limit_field(data: &[u8], pos: &mut usize) -> Option<u32> {
288    if *pos + 5 > data.len() {
289        return None;
290    }
291    let present = data[*pos];
292    let value = u32::from_be_bytes([
293        data[*pos + 1],
294        data[*pos + 2],
295        data[*pos + 3],
296        data[*pos + 4],
297    ]);
298    *pos += 5;
299    if present == 1 { Some(value) } else { None }
300}