Skip to main content

nodedb_types/
protocol.rs

1//! Native binary wire protocol types.
2//!
3//! Shared between the server (`nodedb`) and the client (`nodedb-client`).
4//! All types derive `Serialize`/`Deserialize` for MessagePack encoding.
5
6use serde::{Deserialize, Serialize};
7
8use crate::value::Value;
9
10// ─── Operation Codes ────────────────────────────────────────────────
11
12/// Operation codes for the native binary protocol.
13///
14/// Encoded as a single `u8` in the MessagePack request frame.
15/// Opcodes are grouped by functional area with 16-slot gaps to allow
16/// future additions without renumbering.
17#[repr(u8)]
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
19pub enum OpCode {
20    // ── Auth & session ──────────────────────────────────────────
21    Auth = 0x01,
22    Ping = 0x02,
23
24    // ── Data operations (direct Data Plane dispatch) ────────────
25    PointGet = 0x10,
26    PointPut = 0x11,
27    PointDelete = 0x12,
28    VectorSearch = 0x13,
29    RangeScan = 0x14,
30    CrdtRead = 0x15,
31    CrdtApply = 0x16,
32    GraphRagFusion = 0x17,
33    AlterCollectionPolicy = 0x18,
34
35    // ── SQL & DDL ───────────────────────────────────────────────
36    Sql = 0x20,
37    Ddl = 0x21,
38    Explain = 0x22,
39    CopyFrom = 0x23,
40
41    // ── Session parameters ──────────────────────────────────────
42    Set = 0x30,
43    Show = 0x31,
44    Reset = 0x32,
45
46    // ── Transaction control ─────────────────────────────────────
47    Begin = 0x40,
48    Commit = 0x41,
49    Rollback = 0x42,
50
51    // ── Graph operations (direct Data Plane dispatch) ───────────
52    GraphHop = 0x50,
53    GraphNeighbors = 0x51,
54    GraphPath = 0x52,
55    GraphSubgraph = 0x53,
56    EdgePut = 0x54,
57    EdgeDelete = 0x55,
58
59    // ── Search operations (direct Data Plane dispatch) ──────────
60    TextSearch = 0x60,
61    HybridSearch = 0x61,
62
63    // ── Batch operations ────────────────────────────────────────
64    VectorBatchInsert = 0x70,
65    DocumentBatchInsert = 0x71,
66}
67
68impl OpCode {
69    /// Returns true if this operation is a write that requires WAL append.
70    pub fn is_write(&self) -> bool {
71        matches!(
72            self,
73            OpCode::PointPut
74                | OpCode::PointDelete
75                | OpCode::CrdtApply
76                | OpCode::EdgePut
77                | OpCode::EdgeDelete
78                | OpCode::VectorBatchInsert
79                | OpCode::DocumentBatchInsert
80                | OpCode::AlterCollectionPolicy
81        )
82    }
83}
84
85// ─── Response Status ────────────────────────────────────────────────
86
87/// Status code in response frames.
88#[repr(u8)]
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
90pub enum ResponseStatus {
91    /// Request completed successfully.
92    Ok = 0,
93    /// Partial result — more chunks follow with the same `seq`.
94    Partial = 1,
95    /// Request failed — see `error` field.
96    Error = 2,
97}
98
99// ─── Auth ───────────────────────────────────────────────────────────
100
101/// Authentication method in an `Auth` request.
102#[derive(Debug, Clone, Serialize, Deserialize)]
103#[serde(tag = "method", rename_all = "snake_case")]
104pub enum AuthMethod {
105    Trust {
106        #[serde(default = "default_username")]
107        username: String,
108    },
109    Password {
110        username: String,
111        password: String,
112    },
113    ApiKey {
114        token: String,
115    },
116}
117
118fn default_username() -> String {
119    "admin".into()
120}
121
122/// Successful auth response payload.
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct AuthResponse {
125    pub username: String,
126    pub tenant_id: u32,
127}
128
129// ─── Request Frame ──────────────────────────────────────────────────
130
131/// A request sent from client to server over the native protocol.
132///
133/// Serialized as MessagePack. The `op` field selects the handler,
134/// `seq` correlates request to response.
135#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct NativeRequest {
137    /// Operation code.
138    pub op: OpCode,
139    /// Client-assigned sequence number for request/response correlation.
140    pub seq: u64,
141    /// Operation-specific fields (flattened into the same map).
142    #[serde(flatten)]
143    pub fields: RequestFields,
144}
145
146/// Operation-specific request fields.
147///
148/// Each variant carries only the fields needed for that operation.
149/// Unknown fields are silently ignored during deserialization.
150#[derive(Debug, Clone, Serialize, Deserialize)]
151#[serde(untagged)]
152pub enum RequestFields {
153    /// Auth, SQL, DDL, Explain, Set, Show, Reset, Begin, Commit, Rollback,
154    /// CopyFrom, Ping — all use a subset of these text fields.
155    Text(TextFields),
156}
157
158/// Catch-all text fields used by most operations.
159///
160/// Each operation uses a subset; unused fields default to `None`/empty.
161#[derive(Debug, Clone, Default, Serialize, Deserialize)]
162pub struct TextFields {
163    // ── Auth ─────────────────────────────────────────────────
164    #[serde(skip_serializing_if = "Option::is_none")]
165    pub auth: Option<AuthMethod>,
166
167    // ── SQL/DDL/Explain/Set/Show/Reset/CopyFrom ─────────────
168    #[serde(skip_serializing_if = "Option::is_none")]
169    pub sql: Option<String>,
170    #[serde(skip_serializing_if = "Option::is_none")]
171    pub key: Option<String>,
172    #[serde(skip_serializing_if = "Option::is_none")]
173    pub value: Option<String>,
174
175    // ── Collection + document targeting ──────────────────────
176    #[serde(skip_serializing_if = "Option::is_none")]
177    pub collection: Option<String>,
178    #[serde(skip_serializing_if = "Option::is_none")]
179    pub document_id: Option<String>,
180    #[serde(skip_serializing_if = "Option::is_none")]
181    pub data: Option<Vec<u8>>,
182
183    // ── Vector search ────────────────────────────────────────
184    #[serde(skip_serializing_if = "Option::is_none")]
185    pub query_vector: Option<Vec<f32>>,
186    #[serde(skip_serializing_if = "Option::is_none")]
187    pub top_k: Option<u64>,
188
189    // ── Range scan ───────────────────────────────────────────
190    #[serde(skip_serializing_if = "Option::is_none")]
191    pub field: Option<String>,
192    #[serde(skip_serializing_if = "Option::is_none")]
193    pub limit: Option<u64>,
194
195    // ── CRDT ─────────────────────────────────────────────────
196    #[serde(skip_serializing_if = "Option::is_none")]
197    pub delta: Option<Vec<u8>>,
198    #[serde(skip_serializing_if = "Option::is_none")]
199    pub peer_id: Option<u64>,
200
201    // ── Graph RAG fusion ─────────────────────────────────────
202    #[serde(skip_serializing_if = "Option::is_none")]
203    pub vector_top_k: Option<u64>,
204    #[serde(skip_serializing_if = "Option::is_none")]
205    pub edge_label: Option<String>,
206    #[serde(skip_serializing_if = "Option::is_none")]
207    pub direction: Option<String>,
208    #[serde(skip_serializing_if = "Option::is_none")]
209    pub expansion_depth: Option<u64>,
210    #[serde(skip_serializing_if = "Option::is_none")]
211    pub final_top_k: Option<u64>,
212    #[serde(skip_serializing_if = "Option::is_none")]
213    pub vector_k: Option<f64>,
214    #[serde(skip_serializing_if = "Option::is_none")]
215    pub graph_k: Option<f64>,
216
217    // ── Graph ops ────────────────────────────────────────────
218    #[serde(skip_serializing_if = "Option::is_none")]
219    pub start_node: Option<String>,
220    #[serde(skip_serializing_if = "Option::is_none")]
221    pub end_node: Option<String>,
222    #[serde(skip_serializing_if = "Option::is_none")]
223    pub depth: Option<u64>,
224    #[serde(skip_serializing_if = "Option::is_none")]
225    pub from_node: Option<String>,
226    #[serde(skip_serializing_if = "Option::is_none")]
227    pub to_node: Option<String>,
228    #[serde(skip_serializing_if = "Option::is_none")]
229    pub edge_type: Option<String>,
230    #[serde(skip_serializing_if = "Option::is_none")]
231    pub properties: Option<serde_json::Value>,
232
233    // ── Text/Hybrid search ───────────────────────────────────
234    #[serde(skip_serializing_if = "Option::is_none")]
235    pub query_text: Option<String>,
236    #[serde(skip_serializing_if = "Option::is_none")]
237    pub vector_weight: Option<f64>,
238
239    // ── Batch operations ─────────────────────────────────────
240    #[serde(skip_serializing_if = "Option::is_none")]
241    pub vectors: Option<Vec<BatchVector>>,
242    #[serde(skip_serializing_if = "Option::is_none")]
243    pub documents: Option<Vec<BatchDocument>>,
244
245    // ── Collection policy ────────────────────────────────────
246    #[serde(skip_serializing_if = "Option::is_none")]
247    pub policy: Option<serde_json::Value>,
248}
249
250/// A single vector in a batch insert.
251#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct BatchVector {
253    pub id: String,
254    pub embedding: Vec<f32>,
255    #[serde(skip_serializing_if = "Option::is_none")]
256    pub metadata: Option<serde_json::Value>,
257}
258
259/// A single document in a batch insert.
260#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct BatchDocument {
262    pub id: String,
263    pub fields: serde_json::Value,
264}
265
266// ─── Response Frame ─────────────────────────────────────────────────
267
268/// A response sent from server to client over the native protocol.
269#[derive(Debug, Clone, Serialize, Deserialize)]
270pub struct NativeResponse {
271    /// Echoed from the request for correlation.
272    pub seq: u64,
273    /// Execution outcome.
274    pub status: ResponseStatus,
275    /// Column names (for query results).
276    #[serde(skip_serializing_if = "Option::is_none")]
277    pub columns: Option<Vec<String>>,
278    /// Row data (for query results). Each row is a Vec of Values.
279    #[serde(skip_serializing_if = "Option::is_none")]
280    pub rows: Option<Vec<Vec<Value>>>,
281    /// Number of rows affected (for writes).
282    #[serde(skip_serializing_if = "Option::is_none")]
283    pub rows_affected: Option<u64>,
284    /// WAL LSN watermark at time of computation.
285    pub watermark_lsn: u64,
286    /// Error details (if status == Error).
287    #[serde(skip_serializing_if = "Option::is_none")]
288    pub error: Option<ErrorPayload>,
289    /// Auth response (if op == Auth and status == Ok).
290    #[serde(skip_serializing_if = "Option::is_none")]
291    pub auth: Option<AuthResponse>,
292}
293
294/// Error details in a response.
295#[derive(Debug, Clone, Serialize, Deserialize)]
296pub struct ErrorPayload {
297    /// SQLSTATE-style error code (e.g., "42P01" for undefined table).
298    pub code: String,
299    /// Human-readable error message.
300    pub message: String,
301}
302
303impl NativeResponse {
304    /// Create a successful response with no data.
305    pub fn ok(seq: u64) -> Self {
306        Self {
307            seq,
308            status: ResponseStatus::Ok,
309            columns: None,
310            rows: None,
311            rows_affected: None,
312            watermark_lsn: 0,
313            error: None,
314            auth: None,
315        }
316    }
317
318    /// Create a successful response from a `QueryResult`.
319    pub fn from_query_result(seq: u64, qr: crate::result::QueryResult, lsn: u64) -> Self {
320        Self {
321            seq,
322            status: ResponseStatus::Ok,
323            columns: Some(qr.columns),
324            rows: Some(qr.rows),
325            rows_affected: Some(qr.rows_affected),
326            watermark_lsn: lsn,
327            error: None,
328            auth: None,
329        }
330    }
331
332    /// Create an error response.
333    pub fn error(seq: u64, code: impl Into<String>, message: impl Into<String>) -> Self {
334        Self {
335            seq,
336            status: ResponseStatus::Error,
337            columns: None,
338            rows: None,
339            rows_affected: None,
340            watermark_lsn: 0,
341            error: Some(ErrorPayload {
342                code: code.into(),
343                message: message.into(),
344            }),
345            auth: None,
346        }
347    }
348
349    /// Create an auth success response.
350    pub fn auth_ok(seq: u64, username: String, tenant_id: u32) -> Self {
351        Self {
352            seq,
353            status: ResponseStatus::Ok,
354            columns: None,
355            rows: None,
356            rows_affected: None,
357            watermark_lsn: 0,
358            error: None,
359            auth: Some(AuthResponse {
360                username,
361                tenant_id,
362            }),
363        }
364    }
365
366    /// Create a response with a single "status" column and one row.
367    pub fn status_row(seq: u64, message: impl Into<String>) -> Self {
368        Self {
369            seq,
370            status: ResponseStatus::Ok,
371            columns: Some(vec!["status".into()]),
372            rows: Some(vec![vec![Value::String(message.into())]]),
373            rows_affected: Some(1),
374            watermark_lsn: 0,
375            error: None,
376            auth: None,
377        }
378    }
379}
380
381// ─── Protocol Constants ─────────────────────────────────────────────
382
383/// Maximum frame payload size (16 MiB).
384pub const MAX_FRAME_SIZE: u32 = 16 * 1024 * 1024;
385
386/// Length of the frame header (4-byte big-endian u32 payload length).
387pub const FRAME_HEADER_LEN: usize = 4;
388
389/// Default server port for the native protocol.
390pub const DEFAULT_NATIVE_PORT: u16 = 6433;
391
392#[cfg(test)]
393mod tests {
394    use super::*;
395
396    #[test]
397    fn opcode_repr() {
398        assert_eq!(OpCode::Auth as u8, 0x01);
399        assert_eq!(OpCode::Sql as u8, 0x20);
400        assert_eq!(OpCode::Begin as u8, 0x40);
401        assert_eq!(OpCode::GraphHop as u8, 0x50);
402        assert_eq!(OpCode::TextSearch as u8, 0x60);
403        assert_eq!(OpCode::VectorBatchInsert as u8, 0x70);
404    }
405
406    #[test]
407    fn opcode_is_write() {
408        assert!(OpCode::PointPut.is_write());
409        assert!(OpCode::PointDelete.is_write());
410        assert!(OpCode::CrdtApply.is_write());
411        assert!(OpCode::EdgePut.is_write());
412        assert!(!OpCode::PointGet.is_write());
413        assert!(!OpCode::Sql.is_write());
414        assert!(!OpCode::VectorSearch.is_write());
415        assert!(!OpCode::Ping.is_write());
416    }
417
418    #[test]
419    fn response_status_repr() {
420        assert_eq!(ResponseStatus::Ok as u8, 0);
421        assert_eq!(ResponseStatus::Partial as u8, 1);
422        assert_eq!(ResponseStatus::Error as u8, 2);
423    }
424
425    #[test]
426    fn native_response_ok() {
427        let r = NativeResponse::ok(42);
428        assert_eq!(r.seq, 42);
429        assert_eq!(r.status, ResponseStatus::Ok);
430        assert!(r.error.is_none());
431    }
432
433    #[test]
434    fn native_response_error() {
435        let r = NativeResponse::error(1, "42P01", "collection not found");
436        assert_eq!(r.status, ResponseStatus::Error);
437        let e = r.error.unwrap();
438        assert_eq!(e.code, "42P01");
439        assert_eq!(e.message, "collection not found");
440    }
441
442    #[test]
443    fn native_response_from_query_result() {
444        let qr = crate::result::QueryResult {
445            columns: vec!["id".into(), "name".into()],
446            rows: vec![vec![
447                Value::String("u1".into()),
448                Value::String("Alice".into()),
449            ]],
450            rows_affected: 0,
451        };
452        let r = NativeResponse::from_query_result(5, qr, 100);
453        assert_eq!(r.seq, 5);
454        assert_eq!(r.watermark_lsn, 100);
455        assert_eq!(r.columns.as_ref().unwrap().len(), 2);
456        assert_eq!(r.rows.as_ref().unwrap().len(), 1);
457    }
458
459    #[test]
460    fn native_response_status_row() {
461        let r = NativeResponse::status_row(3, "OK");
462        assert_eq!(r.columns.as_ref().unwrap(), &["status"]);
463        assert_eq!(r.rows.as_ref().unwrap()[0][0].as_str(), Some("OK"));
464    }
465
466    #[test]
467    fn msgpack_roundtrip_request() {
468        let req = NativeRequest {
469            op: OpCode::Sql,
470            seq: 1,
471            fields: RequestFields::Text(TextFields {
472                sql: Some("SELECT 1".into()),
473                ..Default::default()
474            }),
475        };
476        let bytes = rmp_serde::to_vec_named(&req).unwrap();
477        let decoded: NativeRequest = rmp_serde::from_slice(&bytes).unwrap();
478        assert_eq!(decoded.op, OpCode::Sql);
479        assert_eq!(decoded.seq, 1);
480    }
481
482    #[test]
483    fn msgpack_roundtrip_response() {
484        let resp = NativeResponse::from_query_result(
485            7,
486            crate::result::QueryResult {
487                columns: vec!["x".into()],
488                rows: vec![vec![Value::Integer(42)]],
489                rows_affected: 0,
490            },
491            99,
492        );
493        let bytes = rmp_serde::to_vec_named(&resp).unwrap();
494        let decoded: NativeResponse = rmp_serde::from_slice(&bytes).unwrap();
495        assert_eq!(decoded.seq, 7);
496        assert_eq!(decoded.watermark_lsn, 99);
497        assert_eq!(decoded.rows.unwrap()[0][0].as_i64(), Some(42));
498    }
499
500    #[test]
501    fn auth_method_variants() {
502        let trust = AuthMethod::Trust {
503            username: "admin".into(),
504        };
505        let bytes = rmp_serde::to_vec_named(&trust).unwrap();
506        let decoded: AuthMethod = rmp_serde::from_slice(&bytes).unwrap();
507        match decoded {
508            AuthMethod::Trust { username } => assert_eq!(username, "admin"),
509            _ => panic!("expected Trust variant"),
510        }
511
512        let pw = AuthMethod::Password {
513            username: "user".into(),
514            password: "secret".into(),
515        };
516        let bytes = rmp_serde::to_vec_named(&pw).unwrap();
517        let decoded: AuthMethod = rmp_serde::from_slice(&bytes).unwrap();
518        match decoded {
519            AuthMethod::Password { username, password } => {
520                assert_eq!(username, "user");
521                assert_eq!(password, "secret");
522            }
523            _ => panic!("expected Password variant"),
524        }
525    }
526}