1use serde::{Deserialize, Serialize};
7
8use crate::value::Value;
9
/// Operation codes understood by the native protocol.
///
/// The enum is `#[repr(u8)]` with explicit discriminants; variants are laid
/// out in blocks that share a high nibble (`0x0_`, `0x1_`, ... `0x7_`).
///
/// NOTE(review): the derived `Serialize`/`Deserialize` impls encode a unit
/// variant by its name or index, not by the discriminant listed here — confirm
/// which representation the wire format is expected to carry.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum OpCode {
    // 0x0_: authentication and ping.
    Auth = 0x01,
    Ping = 0x02,

    // 0x1_: point, vector, range, CRDT, GraphRAG and collection-policy operations.
    PointGet = 0x10,
    PointPut = 0x11,
    PointDelete = 0x12,
    VectorSearch = 0x13,
    RangeScan = 0x14,
    CrdtRead = 0x15,
    CrdtApply = 0x16,
    GraphRagFusion = 0x17,
    AlterCollectionPolicy = 0x18,

    // 0x2_: SQL-style statements.
    Sql = 0x20,
    Ddl = 0x21,
    Explain = 0x22,
    CopyFrom = 0x23,

    // 0x3_: SET / SHOW / RESET.
    Set = 0x30,
    Show = 0x31,
    Reset = 0x32,

    // 0x4_: transaction control.
    Begin = 0x40,
    Commit = 0x41,
    Rollback = 0x42,

    // 0x5_: graph traversal and edge mutation.
    GraphHop = 0x50,
    GraphNeighbors = 0x51,
    GraphPath = 0x52,
    GraphSubgraph = 0x53,
    EdgePut = 0x54,
    EdgeDelete = 0x55,

    // 0x6_: text and hybrid search.
    TextSearch = 0x60,
    HybridSearch = 0x61,

    // 0x7_: batch inserts.
    VectorBatchInsert = 0x70,
    DocumentBatchInsert = 0x71,
}
67
68impl OpCode {
69 pub fn is_write(&self) -> bool {
71 matches!(
72 self,
73 OpCode::PointPut
74 | OpCode::PointDelete
75 | OpCode::CrdtApply
76 | OpCode::EdgePut
77 | OpCode::EdgeDelete
78 | OpCode::VectorBatchInsert
79 | OpCode::DocumentBatchInsert
80 | OpCode::AlterCollectionPolicy
81 )
82 }
83}
84
/// Outcome marker carried by every [`NativeResponse`].
///
/// `#[repr(u8)]` with explicit discriminants 0, 1 and 2.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResponseStatus {
    /// The request succeeded.
    Ok = 0,
    /// NOTE(review): presumably an incomplete/streamed result with more to
    /// follow — no constructor in this file produces it; confirm with callers.
    Partial = 1,
    /// The request failed; `NativeResponse::error` sets this status together
    /// with an `ErrorPayload`.
    Error = 2,
}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
103#[serde(tag = "method", rename_all = "snake_case")]
104pub enum AuthMethod {
105 Trust {
106 #[serde(default = "default_username")]
107 username: String,
108 },
109 Password {
110 username: String,
111 password: String,
112 },
113 ApiKey {
114 token: String,
115 },
116}
117
/// Serde default for `AuthMethod::Trust::username`: the literal `"admin"`.
fn default_username() -> String {
    String::from("admin")
}
121
/// Identity details attached to a successful authentication response
/// (see `NativeResponse::auth_ok`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthResponse {
    /// User name passed to `NativeResponse::auth_ok`.
    pub username: String,
    /// Tenant identifier passed to `NativeResponse::auth_ok`.
    pub tenant_id: u32,
}
128
/// A single request of the native protocol.
///
/// Because `fields` is `#[serde(flatten)]`, the operation-specific keys are
/// serialized into the same map as `op` and `seq` rather than nested.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NativeRequest {
    /// Operation to perform.
    pub op: OpCode,
    /// Sequence number chosen by the sender.
    /// NOTE(review): presumably echoed back in `NativeResponse::seq` — confirm.
    pub seq: u64,
    /// Operation-specific payload, flattened into the enclosing map.
    #[serde(flatten)]
    pub fields: RequestFields,
}
145
/// Payload of a [`NativeRequest`].
///
/// `#[serde(untagged)]`: no variant tag is written; deserialization tries the
/// variants in declaration order. Only one variant is defined here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum RequestFields {
    /// Payload expressed as a [`TextFields`] bag of optional keys.
    Text(TextFields),
}
157
/// Flat bag of optional request fields.
///
/// Every field is an `Option` and is left out of the serialized form when it
/// is `None`; `Default` yields a value with all fields unset, which lets
/// callers use `..Default::default()` and fill in only what they need.
///
/// NOTE(review): the grouping comments below follow the field names; which
/// opcode reads which field is not visible in this file.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TextFields {
    // Authentication.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub auth: Option<AuthMethod>,

    // SQL text and key/value access.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sql: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub key: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<String>,

    // Collection / document addressing and raw payload bytes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub collection: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub document_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<Vec<u8>>,

    // Vector query.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub query_vector: Option<Vec<f32>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub top_k: Option<u64>,

    // Field selector and result limit.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub field: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: Option<u64>,

    // CRDT delta bytes and originating peer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub delta: Option<Vec<u8>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub peer_id: Option<u64>,

    // Combined vector + graph retrieval parameters.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub vector_top_k: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub edge_label: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub direction: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expansion_depth: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub final_top_k: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub vector_k: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub graph_k: Option<f64>,

    // Graph traversal endpoints and edge description.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub start_node: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub end_node: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub depth: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub from_node: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub to_node: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub edge_type: Option<String>,
    // Arbitrary JSON value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub properties: Option<serde_json::Value>,

    // Text query and vector weighting.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub query_text: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub vector_weight: Option<f64>,

    // Batch payloads.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub vectors: Option<Vec<BatchVector>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub documents: Option<Vec<BatchDocument>>,

    // Arbitrary JSON policy value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub policy: Option<serde_json::Value>,
}
249
/// One element of `TextFields::vectors`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchVector {
    /// Identifier of the vector.
    pub id: String,
    /// Embedding components as `f32` values.
    pub embedding: Vec<f32>,
    /// Optional JSON metadata; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
258
/// One element of `TextFields::documents`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchDocument {
    /// Identifier of the document.
    pub id: String,
    /// Document content as an arbitrary JSON value.
    pub fields: serde_json::Value,
}
265
/// A single response of the native protocol.
///
/// `seq`, `status` and `watermark_lsn` are always serialized; the optional
/// members are omitted when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NativeResponse {
    /// Sequence number supplied to the constructor.
    /// NOTE(review): presumably the `seq` of the request being answered — confirm.
    pub seq: u64,
    /// Outcome of the request.
    pub status: ResponseStatus,
    /// Column names of the result set, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub columns: Option<Vec<String>>,
    /// Result rows, one `Vec<Value>` per row, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rows: Option<Vec<Vec<Value>>>,
    /// Affected-row count, if reported.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rows_affected: Option<u64>,
    /// LSN watermark; the constructors in this file set it to 0 except
    /// `from_query_result`, which stores the caller-provided value.
    pub watermark_lsn: u64,
    /// Error details; set by `NativeResponse::error`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<ErrorPayload>,
    /// Authentication details; set by `NativeResponse::auth_ok`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub auth: Option<AuthResponse>,
}
293
/// Error details carried by a failed [`NativeResponse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorPayload {
    /// Error code string (the tests in this file use `"42P01"`).
    /// NOTE(review): looks like an SQLSTATE-style code — confirm.
    pub code: String,
    /// Human-readable description of the failure.
    pub message: String,
}
302
303impl NativeResponse {
304 pub fn ok(seq: u64) -> Self {
306 Self {
307 seq,
308 status: ResponseStatus::Ok,
309 columns: None,
310 rows: None,
311 rows_affected: None,
312 watermark_lsn: 0,
313 error: None,
314 auth: None,
315 }
316 }
317
318 pub fn from_query_result(seq: u64, qr: crate::result::QueryResult, lsn: u64) -> Self {
320 Self {
321 seq,
322 status: ResponseStatus::Ok,
323 columns: Some(qr.columns),
324 rows: Some(qr.rows),
325 rows_affected: Some(qr.rows_affected),
326 watermark_lsn: lsn,
327 error: None,
328 auth: None,
329 }
330 }
331
332 pub fn error(seq: u64, code: impl Into<String>, message: impl Into<String>) -> Self {
334 Self {
335 seq,
336 status: ResponseStatus::Error,
337 columns: None,
338 rows: None,
339 rows_affected: None,
340 watermark_lsn: 0,
341 error: Some(ErrorPayload {
342 code: code.into(),
343 message: message.into(),
344 }),
345 auth: None,
346 }
347 }
348
349 pub fn auth_ok(seq: u64, username: String, tenant_id: u32) -> Self {
351 Self {
352 seq,
353 status: ResponseStatus::Ok,
354 columns: None,
355 rows: None,
356 rows_affected: None,
357 watermark_lsn: 0,
358 error: None,
359 auth: Some(AuthResponse {
360 username,
361 tenant_id,
362 }),
363 }
364 }
365
366 pub fn status_row(seq: u64, message: impl Into<String>) -> Self {
368 Self {
369 seq,
370 status: ResponseStatus::Ok,
371 columns: Some(vec!["status".into()]),
372 rows: Some(vec![vec![Value::String(message.into())]]),
373 rows_affected: Some(1),
374 watermark_lsn: 0,
375 error: None,
376 auth: None,
377 }
378 }
379}
380
/// Largest frame size, in bytes: 16 MiB (16 * 1024 * 1024).
/// NOTE(review): whether this bound includes the header is not visible here.
pub const MAX_FRAME_SIZE: u32 = 16 << 20;

/// Size of the frame header in bytes.
/// NOTE(review): presumably a fixed-width length prefix — confirm against the codec.
pub const FRAME_HEADER_LEN: usize = 4;

/// Default port for the native protocol.
pub const DEFAULT_NATIVE_PORT: u16 = 6433;
391
#[cfg(test)]
mod tests {
    //! Unit tests for the protocol types: discriminant values, write
    //! classification, response constructors and MessagePack round-trips.

    use super::*;

    #[test]
    fn opcode_repr() {
        assert_eq!(OpCode::Auth as u8, 0x01);
        assert_eq!(OpCode::Ping as u8, 0x02);
        assert_eq!(OpCode::PointGet as u8, 0x10);
        assert_eq!(OpCode::Sql as u8, 0x20);
        assert_eq!(OpCode::Set as u8, 0x30);
        assert_eq!(OpCode::Begin as u8, 0x40);
        assert_eq!(OpCode::GraphHop as u8, 0x50);
        assert_eq!(OpCode::TextSearch as u8, 0x60);
        assert_eq!(OpCode::VectorBatchInsert as u8, 0x70);
        assert_eq!(OpCode::DocumentBatchInsert as u8, 0x71);
    }

    #[test]
    fn opcode_is_write() {
        // Every opcode in the write set must be reported as a write.
        let writes = [
            OpCode::PointPut,
            OpCode::PointDelete,
            OpCode::CrdtApply,
            OpCode::EdgePut,
            OpCode::EdgeDelete,
            OpCode::VectorBatchInsert,
            OpCode::DocumentBatchInsert,
            OpCode::AlterCollectionPolicy,
        ];
        for op in writes.iter() {
            assert!(op.is_write(), "{:?} should be classified as a write", op);
        }

        // A sample of opcodes outside the write set.
        let non_writes = [
            OpCode::Auth,
            OpCode::Ping,
            OpCode::PointGet,
            OpCode::VectorSearch,
            OpCode::CrdtRead,
            OpCode::Sql,
            OpCode::Begin,
            OpCode::GraphHop,
            OpCode::TextSearch,
        ];
        for op in non_writes.iter() {
            assert!(!op.is_write(), "{:?} should not be classified as a write", op);
        }
    }

    #[test]
    fn response_status_repr() {
        assert_eq!(ResponseStatus::Ok as u8, 0);
        assert_eq!(ResponseStatus::Partial as u8, 1);
        assert_eq!(ResponseStatus::Error as u8, 2);
    }

    #[test]
    fn native_response_ok() {
        let r = NativeResponse::ok(42);
        assert_eq!(r.seq, 42);
        assert_eq!(r.status, ResponseStatus::Ok);
        assert_eq!(r.watermark_lsn, 0);
        assert!(r.columns.is_none());
        assert!(r.rows.is_none());
        assert!(r.rows_affected.is_none());
        assert!(r.error.is_none());
        assert!(r.auth.is_none());
    }

    #[test]
    fn native_response_error() {
        let r = NativeResponse::error(1, "42P01", "collection not found");
        assert_eq!(r.seq, 1);
        assert_eq!(r.status, ResponseStatus::Error);
        assert!(r.columns.is_none());
        assert!(r.rows.is_none());
        assert!(r.auth.is_none());
        let e = r.error.unwrap();
        assert_eq!(e.code, "42P01");
        assert_eq!(e.message, "collection not found");
    }

    #[test]
    fn native_response_auth_ok() {
        let r = NativeResponse::auth_ok(9, "alice".into(), 7);
        assert_eq!(r.seq, 9);
        assert_eq!(r.status, ResponseStatus::Ok);
        assert!(r.error.is_none());
        assert!(r.rows.is_none());
        let auth = r.auth.unwrap();
        assert_eq!(auth.username, "alice");
        assert_eq!(auth.tenant_id, 7);
    }

    #[test]
    fn native_response_from_query_result() {
        let qr = crate::result::QueryResult {
            columns: vec!["id".into(), "name".into()],
            rows: vec![vec![
                Value::String("u1".into()),
                Value::String("Alice".into()),
            ]],
            rows_affected: 0,
        };
        let r = NativeResponse::from_query_result(5, qr, 100);
        assert_eq!(r.seq, 5);
        assert_eq!(r.status, ResponseStatus::Ok);
        assert_eq!(r.watermark_lsn, 100);
        assert_eq!(r.columns.as_ref().unwrap().len(), 2);
        assert_eq!(r.rows.as_ref().unwrap().len(), 1);
        assert_eq!(r.rows_affected, Some(0));
        assert!(r.error.is_none());
    }

    #[test]
    fn native_response_status_row() {
        let r = NativeResponse::status_row(3, "OK");
        assert_eq!(r.seq, 3);
        assert_eq!(r.status, ResponseStatus::Ok);
        assert_eq!(r.columns.as_ref().unwrap(), &["status"]);
        assert_eq!(r.rows.as_ref().unwrap()[0][0].as_str(), Some("OK"));
        assert_eq!(r.rows_affected, Some(1));
    }

    #[test]
    fn msgpack_roundtrip_request() {
        let req = NativeRequest {
            op: OpCode::Sql,
            seq: 1,
            fields: RequestFields::Text(TextFields {
                sql: Some("SELECT 1".into()),
                ..Default::default()
            }),
        };
        let bytes = rmp_serde::to_vec_named(&req).unwrap();
        let decoded: NativeRequest = rmp_serde::from_slice(&bytes).unwrap();
        assert_eq!(decoded.op, OpCode::Sql);
        assert_eq!(decoded.seq, 1);
        // The flattened payload must survive the round-trip as well.
        match decoded.fields {
            RequestFields::Text(fields) => {
                assert_eq!(fields.sql.as_deref(), Some("SELECT 1"));
                assert!(fields.key.is_none());
            }
        }
    }

    #[test]
    fn msgpack_roundtrip_response() {
        let resp = NativeResponse::from_query_result(
            7,
            crate::result::QueryResult {
                columns: vec!["x".into()],
                rows: vec![vec![Value::Integer(42)]],
                rows_affected: 0,
            },
            99,
        );
        let bytes = rmp_serde::to_vec_named(&resp).unwrap();
        let decoded: NativeResponse = rmp_serde::from_slice(&bytes).unwrap();
        assert_eq!(decoded.seq, 7);
        assert_eq!(decoded.status, ResponseStatus::Ok);
        assert_eq!(decoded.watermark_lsn, 99);
        assert_eq!(decoded.rows.unwrap()[0][0].as_i64(), Some(42));
    }

    #[test]
    fn auth_method_variants() {
        let trust = AuthMethod::Trust {
            username: "admin".into(),
        };
        let bytes = rmp_serde::to_vec_named(&trust).unwrap();
        let decoded: AuthMethod = rmp_serde::from_slice(&bytes).unwrap();
        match decoded {
            AuthMethod::Trust { username } => assert_eq!(username, "admin"),
            _ => panic!("expected Trust variant"),
        }

        let pw = AuthMethod::Password {
            username: "user".into(),
            password: "secret".into(),
        };
        let bytes = rmp_serde::to_vec_named(&pw).unwrap();
        let decoded: AuthMethod = rmp_serde::from_slice(&bytes).unwrap();
        match decoded {
            AuthMethod::Password { username, password } => {
                assert_eq!(username, "user");
                assert_eq!(password, "secret");
            }
            _ => panic!("expected Password variant"),
        }

        let key = AuthMethod::ApiKey {
            token: "tok-123".into(),
        };
        let bytes = rmp_serde::to_vec_named(&key).unwrap();
        let decoded: AuthMethod = rmp_serde::from_slice(&bytes).unwrap();
        match decoded {
            AuthMethod::ApiKey { token } => assert_eq!(token, "tok-123"),
            _ => panic!("expected ApiKey variant"),
        }
    }
}