use serde::{Deserialize, Serialize};
use super::error::MeshError;
use super::planner::ExecutionPlan;
use super::query::ResultRow;
pub const SUBPROTOCOL_MESHDB: u16 = 0x0F00;
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ContinuationToken(pub Vec<u8>);
impl ContinuationToken {
pub fn new(bytes: Vec<u8>) -> Self {
Self(bytes)
}
pub fn as_bytes(&self) -> &[u8] {
&self.0
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ResultBatch {
pub rows: Vec<ResultRow>,
pub r#final: bool,
}
impl ResultBatch {
pub fn chunk(rows: Vec<ResultRow>) -> Self {
Self {
rows,
r#final: false,
}
}
pub fn last(rows: Vec<ResultRow>) -> Self {
Self {
rows,
r#final: true,
}
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum MeshDbRequest {
Execute {
call_id: u64,
plan: ExecutionPlan,
},
Resume {
call_id: u64,
token: ContinuationToken,
},
Cancel {
call_id: u64,
},
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum MeshDbResponse {
Batch {
call_id: u64,
batch: ResultBatch,
},
End {
call_id: u64,
},
Error {
call_id: u64,
error: MeshError,
},
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum MeshDbFrame {
Request(MeshDbRequest),
Response(MeshDbResponse),
}
impl MeshDbFrame {
pub fn encode(&self) -> Result<Vec<u8>, postcard::Error> {
postcard::to_allocvec(self)
}
pub fn decode(bytes: &[u8]) -> Result<Self, postcard::Error> {
postcard::from_bytes(bytes)
}
}
#[cfg(test)]
mod tests {
use std::ops::Range;
use super::super::query::SeqNum;
use super::*;
fn sample_row() -> ResultRow {
ResultRow {
origin: 0xABCD_EF01_2345_6789,
seq: SeqNum(42),
payload: b"hello".to_vec(),
}
}
fn sample_plan() -> ExecutionPlan {
use super::super::planner::{CostEstimate, OperatorNode, OperatorPlan};
ExecutionPlan {
root: OperatorNode {
operator: OperatorPlan::LatestRead {
origin: 0xABCD_EF01_2345_6789,
},
target_nodes: vec![0xDEAD, 0xBEEF],
cost: CostEstimate::default(),
},
total_cost: CostEstimate::default(),
}
}
#[test]
fn subprotocol_slot_is_stable() {
assert_eq!(SUBPROTOCOL_MESHDB, 0x0F00);
}
#[test]
fn continuation_token_round_trips_through_postcard() {
let t = ContinuationToken::new(vec![1, 2, 3, 4, 5]);
let bytes = postcard::to_allocvec(&t).expect("encode");
let decoded: ContinuationToken = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, t);
assert_eq!(decoded.as_bytes(), &[1, 2, 3, 4, 5]);
assert!(!decoded.is_empty());
}
#[test]
fn continuation_token_empty_round_trips() {
let t = ContinuationToken::default();
assert!(t.is_empty());
let bytes = postcard::to_allocvec(&t).expect("encode");
let decoded: ContinuationToken = postcard::from_bytes(&bytes).expect("decode");
assert!(decoded.is_empty());
}
#[test]
fn result_batch_chunk_vs_last_flags() {
let chunk = ResultBatch::chunk(vec![sample_row()]);
assert!(!chunk.r#final);
let last = ResultBatch::last(vec![sample_row()]);
assert!(last.r#final);
let empty_last = ResultBatch::last(vec![]);
assert!(empty_last.r#final);
assert!(empty_last.rows.is_empty());
}
#[test]
fn result_batch_round_trips_through_postcard() {
let b = ResultBatch::chunk(vec![sample_row(), sample_row()]);
let bytes = postcard::to_allocvec(&b).expect("encode");
let decoded: ResultBatch = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, b);
}
#[test]
fn request_execute_round_trips() {
let req = MeshDbRequest::Execute {
call_id: 0x1234_5678_9ABC_DEF0,
plan: sample_plan(),
};
let bytes = postcard::to_allocvec(&req).expect("encode");
let decoded: MeshDbRequest = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, req);
}
#[test]
fn request_resume_round_trips() {
let req = MeshDbRequest::Resume {
call_id: 7,
token: ContinuationToken::new(b"opaque-cursor-bytes".to_vec()),
};
let bytes = postcard::to_allocvec(&req).expect("encode");
let decoded: MeshDbRequest = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, req);
}
#[test]
fn request_cancel_round_trips_with_no_extra_payload() {
let req = MeshDbRequest::Cancel { call_id: 99 };
let bytes = postcard::to_allocvec(&req).expect("encode");
let decoded: MeshDbRequest = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, req);
}
#[test]
fn response_batch_round_trips() {
let resp = MeshDbResponse::Batch {
call_id: 1,
batch: ResultBatch::chunk(vec![sample_row()]),
};
let bytes = postcard::to_allocvec(&resp).expect("encode");
let decoded: MeshDbResponse = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, resp);
}
#[test]
fn response_end_round_trips() {
let resp = MeshDbResponse::End { call_id: 1 };
let bytes = postcard::to_allocvec(&resp).expect("encode");
let decoded: MeshDbResponse = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, resp);
}
#[test]
fn response_error_round_trips_all_mesh_error_variants() {
let cases = vec![
MeshError::HistoricalRangeUnavailable {
origin: 0xAA,
requested: SeqNum(10)..SeqNum(20),
available: vec![SeqNum(0)..SeqNum(5)],
},
MeshError::LineageMaxDepthExceeded {
origin: 0xBB,
depth: 16,
},
MeshError::LineageCycleDetected {
origin: 0xCC,
cycle: vec![0xCC, 0xDD, 0xCC],
},
MeshError::JoinMemoryExceeded {
strategy: "broadcast".to_string(),
threshold_bytes: 1 << 20,
},
MeshError::QueryBudgetExceeded {
metric: super::super::error::BudgetMetric::MaxRows,
used: 1001,
limit: 1000,
},
MeshError::PartialResult {
rows: vec![sample_row()],
continuation: b"cursor".to_vec(),
reason: "watermark expired".to_string(),
},
MeshError::PlannerError {
detail: "test".to_string(),
},
MeshError::ExecutorError {
node: 0xEE,
detail: "boom".to_string(),
},
MeshError::NoCapableHolder {
origin: 0xFF,
requirement: "causal:abc".to_string(),
},
MeshError::QueryCancelled,
];
for err in cases {
let resp = MeshDbResponse::Error {
call_id: 42,
error: err,
};
let bytes = postcard::to_allocvec(&resp).expect("encode");
let decoded: MeshDbResponse = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(format!("{decoded:?}"), format!("{resp:?}"));
}
}
#[test]
fn full_session_round_trips_through_postcard() {
let session = vec![MeshDbRequest::Execute {
call_id: 1,
plan: sample_plan(),
}];
let responses = vec![
MeshDbResponse::Batch {
call_id: 1,
batch: ResultBatch::chunk(vec![sample_row()]),
},
MeshDbResponse::Batch {
call_id: 1,
batch: ResultBatch::last(vec![sample_row()]),
},
MeshDbResponse::End { call_id: 1 },
];
for r in &session {
let bytes = postcard::to_allocvec(r).expect("encode");
let _: MeshDbRequest = postcard::from_bytes(&bytes).expect("decode");
}
for r in &responses {
let bytes = postcard::to_allocvec(r).expect("encode");
let _: MeshDbResponse = postcard::from_bytes(&bytes).expect("decode");
}
}
#[test]
fn frame_round_trips_request_and_response_through_postcard() {
let request_frame = MeshDbFrame::Request(MeshDbRequest::Execute {
call_id: 0xDEAD_BEEF,
plan: sample_plan(),
});
let bytes = request_frame.encode().expect("encode request frame");
let decoded = MeshDbFrame::decode(&bytes).expect("decode request frame");
assert_eq!(decoded, request_frame);
let response_frame = MeshDbFrame::Response(MeshDbResponse::Batch {
call_id: 0xDEAD_BEEF,
batch: ResultBatch::last(vec![sample_row()]),
});
let bytes = response_frame.encode().expect("encode response frame");
let decoded = MeshDbFrame::decode(&bytes).expect("decode response frame");
assert_eq!(decoded, response_frame);
}
#[test]
fn frame_decode_rejects_garbage() {
let result = MeshDbFrame::decode(&[0xFF, 0xFF, 0xFF, 0xFF]);
assert!(result.is_err());
}
#[test]
fn envelopes_round_trip_through_json() {
let req = MeshDbRequest::Execute {
call_id: 1,
plan: sample_plan(),
};
let json = serde_json::to_string(&req).expect("json encode");
let _: MeshDbRequest = serde_json::from_str(&json).expect("json decode");
let resp = MeshDbResponse::Batch {
call_id: 1,
batch: ResultBatch::chunk(vec![sample_row()]),
};
let json = serde_json::to_string(&resp).expect("json encode");
let _: MeshDbResponse = serde_json::from_str(&json).expect("json decode");
}
#[allow(dead_code)]
fn _range_in_scope() -> Range<SeqNum> {
SeqNum(0)..SeqNum(1)
}
}