nodedb_cluster/rpc_codec/
execute.rs1use super::discriminants::*;
8use super::header::write_frame;
9use super::raft_rpc::RaftRpc;
10use crate::error::{ClusterError, Result};
11
12#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
19pub struct DescriptorVersionEntry {
20 pub collection: String,
21 pub version: u64,
22}
23
24#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
26pub struct ExecuteRequest {
27 pub plan_bytes: Vec<u8>,
29 pub tenant_id: u64,
31 pub database_id: u64,
34 pub deadline_remaining_ms: u64,
37 pub trace_id: [u8; 16],
39 pub descriptor_versions: Vec<DescriptorVersionEntry>,
41}
42
43#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
45pub struct ExecuteResponse {
46 pub success: bool,
47 pub payloads: Vec<Vec<u8>>,
49 pub error: Option<TypedClusterError>,
50}
51
52#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
54pub enum TypedClusterError {
55 NotLeader {
56 group_id: u64,
57 leader_node_id: Option<u64>,
58 leader_addr: Option<String>,
59 term: u64,
60 },
61 DescriptorMismatch {
62 collection: String,
63 expected_version: u64,
64 actual_version: u64,
65 },
66 DeadlineExceeded {
67 elapsed_ms: u64,
68 },
69 Internal {
71 code: u32,
72 message: String,
73 },
74}
75
76impl ExecuteResponse {
77 pub fn ok(payloads: Vec<Vec<u8>>) -> Self {
78 Self {
79 success: true,
80 payloads,
81 error: None,
82 }
83 }
84 pub fn err(error: TypedClusterError) -> Self {
85 Self {
86 success: false,
87 payloads: vec![],
88 error: Some(error),
89 }
90 }
91}
92
93macro_rules! to_bytes {
96 ($msg:expr) => {
97 rkyv::to_bytes::<rkyv::rancor::Error>($msg)
98 .map(|b| b.to_vec())
99 .map_err(|e| ClusterError::Codec {
100 detail: format!("rkyv serialize: {e}"),
101 })
102 };
103}
104
105macro_rules! from_bytes {
106 ($payload:expr, $T:ty, $name:expr) => {{
107 let mut aligned = rkyv::util::AlignedVec::<16>::with_capacity($payload.len());
108 aligned.extend_from_slice($payload);
109 rkyv::from_bytes::<$T, rkyv::rancor::Error>(&aligned).map_err(|e| ClusterError::Codec {
110 detail: format!("rkyv deserialize {}: {e}", $name),
111 })
112 }};
113}
114
115pub(super) fn encode_execute_req(msg: &ExecuteRequest, out: &mut Vec<u8>) -> Result<()> {
116 write_frame(RPC_EXECUTE_REQ, &to_bytes!(msg)?, out)
117}
118pub(super) fn encode_execute_resp(msg: &ExecuteResponse, out: &mut Vec<u8>) -> Result<()> {
119 write_frame(RPC_EXECUTE_RESP, &to_bytes!(msg)?, out)
120}
121
122pub(super) fn decode_execute_req(payload: &[u8]) -> Result<RaftRpc> {
123 Ok(RaftRpc::ExecuteRequest(from_bytes!(
124 payload,
125 ExecuteRequest,
126 "ExecuteRequest"
127 )?))
128}
129pub(super) fn decode_execute_resp(payload: &[u8]) -> Result<RaftRpc> {
130 Ok(RaftRpc::ExecuteResponse(from_bytes!(
131 payload,
132 ExecuteResponse,
133 "ExecuteResponse"
134 )?))
135}
136
137pub const PLAN_DECODE_FAILED: u32 = 0x_CE00_0001;
139
140#[cfg(test)]
141mod tests {
142 use super::*;
143
144 fn roundtrip_req(req: ExecuteRequest) -> ExecuteRequest {
145 let rpc = RaftRpc::ExecuteRequest(req);
146 let encoded = super::super::encode(&rpc).unwrap();
147 match super::super::decode(&encoded).unwrap() {
148 RaftRpc::ExecuteRequest(r) => r,
149 other => panic!("expected ExecuteRequest, got {other:?}"),
150 }
151 }
152
153 fn roundtrip_resp(resp: ExecuteResponse) -> ExecuteResponse {
154 let rpc = RaftRpc::ExecuteResponse(resp);
155 let encoded = super::super::encode(&rpc).unwrap();
156 match super::super::decode(&encoded).unwrap() {
157 RaftRpc::ExecuteResponse(r) => r,
158 other => panic!("expected ExecuteResponse, got {other:?}"),
159 }
160 }
161
162 #[test]
163 fn roundtrip_execute_request_basic() {
164 let req = ExecuteRequest {
165 plan_bytes: b"msgpack-plan-bytes".to_vec(),
166 tenant_id: 7,
167 database_id: 0,
168 deadline_remaining_ms: 5000,
169 trace_id: [
170 0xDE, 0xAD, 0xBE, 0xEF, 0x12, 0x34, 0x56, 0x78, 0xDE, 0xAD, 0xBE, 0xEF, 0x12, 0x34,
171 0x56, 0x78,
172 ],
173 descriptor_versions: vec![
174 DescriptorVersionEntry {
175 collection: "orders".into(),
176 version: 42,
177 },
178 DescriptorVersionEntry {
179 collection: "users".into(),
180 version: 1,
181 },
182 ],
183 };
184 let decoded = roundtrip_req(req.clone());
185 assert_eq!(decoded.plan_bytes, req.plan_bytes);
186 assert_eq!(decoded.tenant_id, 7);
187 assert_eq!(decoded.deadline_remaining_ms, 5000);
188 assert_eq!(
189 decoded.trace_id, req.trace_id,
190 "trace_id roundtrips correctly"
191 );
192 assert_eq!(decoded.descriptor_versions.len(), 2);
193 assert_eq!(decoded.descriptor_versions[0].collection, "orders");
194 assert_eq!(decoded.descriptor_versions[0].version, 42);
195 }
196
197 #[test]
198 fn roundtrip_execute_request_empty_descriptors() {
199 let req = ExecuteRequest {
200 plan_bytes: vec![0xAB, 0xCD],
201 tenant_id: 0,
202 database_id: 0,
203 deadline_remaining_ms: 1000,
204 trace_id: [0u8; 16],
205 descriptor_versions: vec![],
206 };
207 let decoded = roundtrip_req(req);
208 assert!(decoded.descriptor_versions.is_empty());
209 }
210
211 #[test]
212 fn roundtrip_execute_response_success() {
213 let resp = ExecuteResponse::ok(vec![b"row1".to_vec(), b"row2".to_vec()]);
214 let decoded = roundtrip_resp(resp);
215 assert!(decoded.success);
216 assert_eq!(decoded.payloads.len(), 2);
217 assert_eq!(decoded.payloads[0], b"row1");
218 assert!(decoded.error.is_none());
219 }
220
221 #[test]
222 fn roundtrip_execute_response_not_leader() {
223 let resp = ExecuteResponse::err(TypedClusterError::NotLeader {
224 group_id: 3,
225 leader_node_id: Some(1),
226 leader_addr: Some("10.0.0.1:9400".into()),
227 term: 7,
228 });
229 let decoded = roundtrip_resp(resp);
230 assert!(!decoded.success);
231 match decoded.error {
232 Some(TypedClusterError::NotLeader {
233 group_id,
234 leader_node_id,
235 leader_addr,
236 term,
237 }) => {
238 assert_eq!(group_id, 3);
239 assert_eq!(leader_node_id, Some(1));
240 assert_eq!(leader_addr.as_deref(), Some("10.0.0.1:9400"));
241 assert_eq!(term, 7);
242 }
243 other => panic!("expected NotLeader, got {other:?}"),
244 }
245 }
246
247 #[test]
248 fn roundtrip_execute_response_descriptor_mismatch() {
249 let resp = ExecuteResponse::err(TypedClusterError::DescriptorMismatch {
250 collection: "orders".into(),
251 expected_version: 5,
252 actual_version: 6,
253 });
254 let decoded = roundtrip_resp(resp);
255 match decoded.error {
256 Some(TypedClusterError::DescriptorMismatch {
257 collection,
258 expected_version,
259 actual_version,
260 }) => {
261 assert_eq!(collection, "orders");
262 assert_eq!(expected_version, 5);
263 assert_eq!(actual_version, 6);
264 }
265 other => panic!("expected DescriptorMismatch, got {other:?}"),
266 }
267 }
268
269 #[test]
270 fn roundtrip_execute_response_deadline_exceeded() {
271 let resp = ExecuteResponse::err(TypedClusterError::DeadlineExceeded { elapsed_ms: 3000 });
272 let decoded = roundtrip_resp(resp);
273 match decoded.error {
274 Some(TypedClusterError::DeadlineExceeded { elapsed_ms }) => {
275 assert_eq!(elapsed_ms, 3000)
276 }
277 other => panic!("expected DeadlineExceeded, got {other:?}"),
278 }
279 }
280
281 #[test]
282 fn roundtrip_execute_response_internal_error() {
283 let resp = ExecuteResponse::err(TypedClusterError::Internal {
284 code: PLAN_DECODE_FAILED,
285 message: "failed to decode plan".into(),
286 });
287 let decoded = roundtrip_resp(resp);
288 match decoded.error {
289 Some(TypedClusterError::Internal { code, message }) => {
290 assert_eq!(code, PLAN_DECODE_FAILED);
291 assert!(message.contains("plan"));
292 }
293 other => panic!("expected Internal, got {other:?}"),
294 }
295 }
296
297 #[test]
298 fn roundtrip_execute_response_not_leader_no_hint() {
299 let resp = ExecuteResponse::err(TypedClusterError::NotLeader {
300 group_id: 0,
301 leader_node_id: None,
302 leader_addr: None,
303 term: 0,
304 });
305 let decoded = roundtrip_resp(resp);
306 match decoded.error {
307 Some(TypedClusterError::NotLeader {
308 leader_node_id,
309 leader_addr,
310 ..
311 }) => {
312 assert!(leader_node_id.is_none());
313 assert!(leader_addr.is_none());
314 }
315 other => panic!("expected NotLeader, got {other:?}"),
316 }
317 }
318}