nodedb_cluster/rpc_codec/
execute.rs1use super::discriminants::*;
6use super::header::write_frame;
7use super::raft_rpc::RaftRpc;
8use crate::error::{ClusterError, Result};
9
10#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
17pub struct DescriptorVersionEntry {
18 pub collection: String,
19 pub version: u64,
20}
21
22#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
24pub struct ExecuteRequest {
25 pub plan_bytes: Vec<u8>,
27 pub tenant_id: u32,
29 pub deadline_remaining_ms: u64,
32 pub trace_id: u64,
34 pub descriptor_versions: Vec<DescriptorVersionEntry>,
36}
37
38#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
40pub struct ExecuteResponse {
41 pub success: bool,
42 pub payloads: Vec<Vec<u8>>,
44 pub error: Option<TypedClusterError>,
45}
46
47#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
49pub enum TypedClusterError {
50 NotLeader {
51 group_id: u64,
52 leader_node_id: Option<u64>,
53 leader_addr: Option<String>,
54 term: u64,
55 },
56 DescriptorMismatch {
57 collection: String,
58 expected_version: u64,
59 actual_version: u64,
60 },
61 DeadlineExceeded {
62 elapsed_ms: u64,
63 },
64 Internal {
66 code: u32,
67 message: String,
68 },
69}
70
71impl ExecuteResponse {
72 pub fn ok(payloads: Vec<Vec<u8>>) -> Self {
73 Self {
74 success: true,
75 payloads,
76 error: None,
77 }
78 }
79 pub fn err(error: TypedClusterError) -> Self {
80 Self {
81 success: false,
82 payloads: vec![],
83 error: Some(error),
84 }
85 }
86}
87
88macro_rules! to_bytes {
91 ($msg:expr) => {
92 rkyv::to_bytes::<rkyv::rancor::Error>($msg)
93 .map(|b| b.to_vec())
94 .map_err(|e| ClusterError::Codec {
95 detail: format!("rkyv serialize: {e}"),
96 })
97 };
98}
99
100macro_rules! from_bytes {
101 ($payload:expr, $T:ty, $name:expr) => {{
102 let mut aligned = rkyv::util::AlignedVec::<16>::with_capacity($payload.len());
103 aligned.extend_from_slice($payload);
104 rkyv::from_bytes::<$T, rkyv::rancor::Error>(&aligned).map_err(|e| ClusterError::Codec {
105 detail: format!("rkyv deserialize {}: {e}", $name),
106 })
107 }};
108}
109
110pub(super) fn encode_execute_req(msg: &ExecuteRequest, out: &mut Vec<u8>) -> Result<()> {
111 write_frame(RPC_EXECUTE_REQ, &to_bytes!(msg)?, out)
112}
113pub(super) fn encode_execute_resp(msg: &ExecuteResponse, out: &mut Vec<u8>) -> Result<()> {
114 write_frame(RPC_EXECUTE_RESP, &to_bytes!(msg)?, out)
115}
116
117pub(super) fn decode_execute_req(payload: &[u8]) -> Result<RaftRpc> {
118 Ok(RaftRpc::ExecuteRequest(from_bytes!(
119 payload,
120 ExecuteRequest,
121 "ExecuteRequest"
122 )?))
123}
124pub(super) fn decode_execute_resp(payload: &[u8]) -> Result<RaftRpc> {
125 Ok(RaftRpc::ExecuteResponse(from_bytes!(
126 payload,
127 ExecuteResponse,
128 "ExecuteResponse"
129 )?))
130}
131
132pub const PLAN_DECODE_FAILED: u32 = 0x_CE00_0001;
134
135#[cfg(test)]
136mod tests {
137 use super::*;
138
139 fn roundtrip_req(req: ExecuteRequest) -> ExecuteRequest {
140 let rpc = RaftRpc::ExecuteRequest(req);
141 let encoded = super::super::encode(&rpc).unwrap();
142 match super::super::decode(&encoded).unwrap() {
143 RaftRpc::ExecuteRequest(r) => r,
144 other => panic!("expected ExecuteRequest, got {other:?}"),
145 }
146 }
147
148 fn roundtrip_resp(resp: ExecuteResponse) -> ExecuteResponse {
149 let rpc = RaftRpc::ExecuteResponse(resp);
150 let encoded = super::super::encode(&rpc).unwrap();
151 match super::super::decode(&encoded).unwrap() {
152 RaftRpc::ExecuteResponse(r) => r,
153 other => panic!("expected ExecuteResponse, got {other:?}"),
154 }
155 }
156
157 #[test]
158 fn roundtrip_execute_request_basic() {
159 let req = ExecuteRequest {
160 plan_bytes: b"msgpack-plan-bytes".to_vec(),
161 tenant_id: 7,
162 deadline_remaining_ms: 5000,
163 trace_id: 0xDEAD_BEEF_1234_5678,
164 descriptor_versions: vec![
165 DescriptorVersionEntry {
166 collection: "orders".into(),
167 version: 42,
168 },
169 DescriptorVersionEntry {
170 collection: "users".into(),
171 version: 1,
172 },
173 ],
174 };
175 let decoded = roundtrip_req(req.clone());
176 assert_eq!(decoded.plan_bytes, req.plan_bytes);
177 assert_eq!(decoded.tenant_id, 7);
178 assert_eq!(decoded.deadline_remaining_ms, 5000);
179 assert_eq!(decoded.trace_id, req.trace_id);
180 assert_eq!(decoded.descriptor_versions.len(), 2);
181 assert_eq!(decoded.descriptor_versions[0].collection, "orders");
182 assert_eq!(decoded.descriptor_versions[0].version, 42);
183 }
184
185 #[test]
186 fn roundtrip_execute_request_empty_descriptors() {
187 let req = ExecuteRequest {
188 plan_bytes: vec![0xAB, 0xCD],
189 tenant_id: 0,
190 deadline_remaining_ms: 1000,
191 trace_id: 0,
192 descriptor_versions: vec![],
193 };
194 let decoded = roundtrip_req(req);
195 assert!(decoded.descriptor_versions.is_empty());
196 }
197
198 #[test]
199 fn roundtrip_execute_response_success() {
200 let resp = ExecuteResponse::ok(vec![b"row1".to_vec(), b"row2".to_vec()]);
201 let decoded = roundtrip_resp(resp);
202 assert!(decoded.success);
203 assert_eq!(decoded.payloads.len(), 2);
204 assert_eq!(decoded.payloads[0], b"row1");
205 assert!(decoded.error.is_none());
206 }
207
208 #[test]
209 fn roundtrip_execute_response_not_leader() {
210 let resp = ExecuteResponse::err(TypedClusterError::NotLeader {
211 group_id: 3,
212 leader_node_id: Some(1),
213 leader_addr: Some("10.0.0.1:9400".into()),
214 term: 7,
215 });
216 let decoded = roundtrip_resp(resp);
217 assert!(!decoded.success);
218 match decoded.error {
219 Some(TypedClusterError::NotLeader {
220 group_id,
221 leader_node_id,
222 leader_addr,
223 term,
224 }) => {
225 assert_eq!(group_id, 3);
226 assert_eq!(leader_node_id, Some(1));
227 assert_eq!(leader_addr.as_deref(), Some("10.0.0.1:9400"));
228 assert_eq!(term, 7);
229 }
230 other => panic!("expected NotLeader, got {other:?}"),
231 }
232 }
233
234 #[test]
235 fn roundtrip_execute_response_descriptor_mismatch() {
236 let resp = ExecuteResponse::err(TypedClusterError::DescriptorMismatch {
237 collection: "orders".into(),
238 expected_version: 5,
239 actual_version: 6,
240 });
241 let decoded = roundtrip_resp(resp);
242 match decoded.error {
243 Some(TypedClusterError::DescriptorMismatch {
244 collection,
245 expected_version,
246 actual_version,
247 }) => {
248 assert_eq!(collection, "orders");
249 assert_eq!(expected_version, 5);
250 assert_eq!(actual_version, 6);
251 }
252 other => panic!("expected DescriptorMismatch, got {other:?}"),
253 }
254 }
255
256 #[test]
257 fn roundtrip_execute_response_deadline_exceeded() {
258 let resp = ExecuteResponse::err(TypedClusterError::DeadlineExceeded { elapsed_ms: 3000 });
259 let decoded = roundtrip_resp(resp);
260 match decoded.error {
261 Some(TypedClusterError::DeadlineExceeded { elapsed_ms }) => {
262 assert_eq!(elapsed_ms, 3000)
263 }
264 other => panic!("expected DeadlineExceeded, got {other:?}"),
265 }
266 }
267
268 #[test]
269 fn roundtrip_execute_response_internal_error() {
270 let resp = ExecuteResponse::err(TypedClusterError::Internal {
271 code: PLAN_DECODE_FAILED,
272 message: "failed to decode plan".into(),
273 });
274 let decoded = roundtrip_resp(resp);
275 match decoded.error {
276 Some(TypedClusterError::Internal { code, message }) => {
277 assert_eq!(code, PLAN_DECODE_FAILED);
278 assert!(message.contains("plan"));
279 }
280 other => panic!("expected Internal, got {other:?}"),
281 }
282 }
283
284 #[test]
285 fn roundtrip_execute_response_not_leader_no_hint() {
286 let resp = ExecuteResponse::err(TypedClusterError::NotLeader {
287 group_id: 0,
288 leader_node_id: None,
289 leader_addr: None,
290 term: 0,
291 });
292 let decoded = roundtrip_resp(resp);
293 match decoded.error {
294 Some(TypedClusterError::NotLeader {
295 leader_node_id,
296 leader_addr,
297 ..
298 }) => {
299 assert!(leader_node_id.is_none());
300 assert!(leader_addr.is_none());
301 }
302 other => panic!("expected NotLeader, got {other:?}"),
303 }
304 }
305}