use serde::{Deserialize, Serialize};
use crate::value::Value;
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum OpCode {
Auth = 0x01,
Ping = 0x02,
PointGet = 0x10,
PointPut = 0x11,
PointDelete = 0x12,
VectorSearch = 0x13,
RangeScan = 0x14,
CrdtRead = 0x15,
CrdtApply = 0x16,
GraphRagFusion = 0x17,
AlterCollectionPolicy = 0x18,
Sql = 0x20,
Ddl = 0x21,
Explain = 0x22,
CopyFrom = 0x23,
Set = 0x30,
Show = 0x31,
Reset = 0x32,
Begin = 0x40,
Commit = 0x41,
Rollback = 0x42,
GraphHop = 0x50,
GraphNeighbors = 0x51,
GraphPath = 0x52,
GraphSubgraph = 0x53,
EdgePut = 0x54,
EdgeDelete = 0x55,
TextSearch = 0x60,
HybridSearch = 0x61,
VectorBatchInsert = 0x70,
DocumentBatchInsert = 0x71,
}
impl OpCode {
pub fn is_write(&self) -> bool {
matches!(
self,
OpCode::PointPut
| OpCode::PointDelete
| OpCode::CrdtApply
| OpCode::EdgePut
| OpCode::EdgeDelete
| OpCode::VectorBatchInsert
| OpCode::DocumentBatchInsert
| OpCode::AlterCollectionPolicy
)
}
}
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResponseStatus {
Ok = 0,
Partial = 1,
Error = 2,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum AuthMethod {
Trust {
#[serde(default = "default_username")]
username: String,
},
Password {
username: String,
password: String,
},
ApiKey {
token: String,
},
}
fn default_username() -> String {
"admin".into()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthResponse {
pub username: String,
pub tenant_id: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NativeRequest {
pub op: OpCode,
pub seq: u64,
#[serde(flatten)]
pub fields: RequestFields,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum RequestFields {
Text(TextFields),
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TextFields {
#[serde(skip_serializing_if = "Option::is_none")]
pub auth: Option<AuthMethod>,
#[serde(skip_serializing_if = "Option::is_none")]
pub sql: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub collection: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub document_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<Vec<u8>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub query_vector: Option<Vec<f32>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub top_k: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub field: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub delta: Option<Vec<u8>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub peer_id: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub vector_top_k: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub edge_label: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub direction: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub expansion_depth: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub final_top_k: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub vector_k: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub graph_k: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub start_node: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub end_node: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub depth: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub from_node: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub to_node: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub edge_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub properties: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub query_text: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub vector_weight: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub vectors: Option<Vec<BatchVector>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub documents: Option<Vec<BatchDocument>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub policy: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchVector {
pub id: String,
pub embedding: Vec<f32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchDocument {
pub id: String,
pub fields: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NativeResponse {
pub seq: u64,
pub status: ResponseStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub columns: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub rows: Option<Vec<Vec<Value>>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub rows_affected: Option<u64>,
pub watermark_lsn: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<ErrorPayload>,
#[serde(skip_serializing_if = "Option::is_none")]
pub auth: Option<AuthResponse>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorPayload {
pub code: String,
pub message: String,
}
impl NativeResponse {
pub fn ok(seq: u64) -> Self {
Self {
seq,
status: ResponseStatus::Ok,
columns: None,
rows: None,
rows_affected: None,
watermark_lsn: 0,
error: None,
auth: None,
}
}
pub fn from_query_result(seq: u64, qr: crate::result::QueryResult, lsn: u64) -> Self {
Self {
seq,
status: ResponseStatus::Ok,
columns: Some(qr.columns),
rows: Some(qr.rows),
rows_affected: Some(qr.rows_affected),
watermark_lsn: lsn,
error: None,
auth: None,
}
}
pub fn error(seq: u64, code: impl Into<String>, message: impl Into<String>) -> Self {
Self {
seq,
status: ResponseStatus::Error,
columns: None,
rows: None,
rows_affected: None,
watermark_lsn: 0,
error: Some(ErrorPayload {
code: code.into(),
message: message.into(),
}),
auth: None,
}
}
pub fn auth_ok(seq: u64, username: String, tenant_id: u32) -> Self {
Self {
seq,
status: ResponseStatus::Ok,
columns: None,
rows: None,
rows_affected: None,
watermark_lsn: 0,
error: None,
auth: Some(AuthResponse {
username,
tenant_id,
}),
}
}
pub fn status_row(seq: u64, message: impl Into<String>) -> Self {
Self {
seq,
status: ResponseStatus::Ok,
columns: Some(vec!["status".into()]),
rows: Some(vec![vec![Value::String(message.into())]]),
rows_affected: Some(1),
watermark_lsn: 0,
error: None,
auth: None,
}
}
}
pub const MAX_FRAME_SIZE: u32 = 16 * 1024 * 1024;
pub const FRAME_HEADER_LEN: usize = 4;
pub const DEFAULT_NATIVE_PORT: u16 = 6433;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn opcode_repr() {
assert_eq!(OpCode::Auth as u8, 0x01);
assert_eq!(OpCode::Sql as u8, 0x20);
assert_eq!(OpCode::Begin as u8, 0x40);
assert_eq!(OpCode::GraphHop as u8, 0x50);
assert_eq!(OpCode::TextSearch as u8, 0x60);
assert_eq!(OpCode::VectorBatchInsert as u8, 0x70);
}
#[test]
fn opcode_is_write() {
assert!(OpCode::PointPut.is_write());
assert!(OpCode::PointDelete.is_write());
assert!(OpCode::CrdtApply.is_write());
assert!(OpCode::EdgePut.is_write());
assert!(!OpCode::PointGet.is_write());
assert!(!OpCode::Sql.is_write());
assert!(!OpCode::VectorSearch.is_write());
assert!(!OpCode::Ping.is_write());
}
#[test]
fn response_status_repr() {
assert_eq!(ResponseStatus::Ok as u8, 0);
assert_eq!(ResponseStatus::Partial as u8, 1);
assert_eq!(ResponseStatus::Error as u8, 2);
}
#[test]
fn native_response_ok() {
let r = NativeResponse::ok(42);
assert_eq!(r.seq, 42);
assert_eq!(r.status, ResponseStatus::Ok);
assert!(r.error.is_none());
}
#[test]
fn native_response_error() {
let r = NativeResponse::error(1, "42P01", "collection not found");
assert_eq!(r.status, ResponseStatus::Error);
let e = r.error.unwrap();
assert_eq!(e.code, "42P01");
assert_eq!(e.message, "collection not found");
}
#[test]
fn native_response_from_query_result() {
let qr = crate::result::QueryResult {
columns: vec!["id".into(), "name".into()],
rows: vec![vec![
Value::String("u1".into()),
Value::String("Alice".into()),
]],
rows_affected: 0,
};
let r = NativeResponse::from_query_result(5, qr, 100);
assert_eq!(r.seq, 5);
assert_eq!(r.watermark_lsn, 100);
assert_eq!(r.columns.as_ref().unwrap().len(), 2);
assert_eq!(r.rows.as_ref().unwrap().len(), 1);
}
#[test]
fn native_response_status_row() {
let r = NativeResponse::status_row(3, "OK");
assert_eq!(r.columns.as_ref().unwrap(), &["status"]);
assert_eq!(r.rows.as_ref().unwrap()[0][0].as_str(), Some("OK"));
}
#[test]
fn msgpack_roundtrip_request() {
let req = NativeRequest {
op: OpCode::Sql,
seq: 1,
fields: RequestFields::Text(TextFields {
sql: Some("SELECT 1".into()),
..Default::default()
}),
};
let bytes = rmp_serde::to_vec_named(&req).unwrap();
let decoded: NativeRequest = rmp_serde::from_slice(&bytes).unwrap();
assert_eq!(decoded.op, OpCode::Sql);
assert_eq!(decoded.seq, 1);
}
#[test]
fn msgpack_roundtrip_response() {
let resp = NativeResponse::from_query_result(
7,
crate::result::QueryResult {
columns: vec!["x".into()],
rows: vec![vec![Value::Integer(42)]],
rows_affected: 0,
},
99,
);
let bytes = rmp_serde::to_vec_named(&resp).unwrap();
let decoded: NativeResponse = rmp_serde::from_slice(&bytes).unwrap();
assert_eq!(decoded.seq, 7);
assert_eq!(decoded.watermark_lsn, 99);
assert_eq!(decoded.rows.unwrap()[0][0].as_i64(), Some(42));
}
#[test]
fn auth_method_variants() {
let trust = AuthMethod::Trust {
username: "admin".into(),
};
let bytes = rmp_serde::to_vec_named(&trust).unwrap();
let decoded: AuthMethod = rmp_serde::from_slice(&bytes).unwrap();
match decoded {
AuthMethod::Trust { username } => assert_eq!(username, "admin"),
_ => panic!("expected Trust variant"),
}
let pw = AuthMethod::Password {
username: "user".into(),
password: "secret".into(),
};
let bytes = rmp_serde::to_vec_named(&pw).unwrap();
let decoded: AuthMethod = rmp_serde::from_slice(&bytes).unwrap();
match decoded {
AuthMethod::Password { username, password } => {
assert_eq!(username, "user");
assert_eq!(password, "secret");
}
_ => panic!("expected Password variant"),
}
}
}