Skip to main content

nodedb_cluster/rpc_codec/
execute.rs

1//! ExecuteRequest / ExecuteResponse — cross-node physical-plan execution RPC.
2//!
3//! Discriminants 18 and 19 are permanently assigned to these variants.
4
5use super::discriminants::*;
6use super::header::write_frame;
7use super::raft_rpc::RaftRpc;
8use crate::error::{ClusterError, Result};
9
10// ── Wire types ──────────────────────────────────────────────────────────────
11
12/// A single (collection, version) entry sent by the caller to let the receiver
13/// validate descriptor freshness before executing the plan.
14///
15/// Cross-version safety: new optional fields should be added as `Option<T>`.
16#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
17pub struct DescriptorVersionEntry {
18    pub collection: String,
19    pub version: u64,
20}
21
22/// Send an already-planned `PhysicalPlan` to a remote node for execution.
23#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
24pub struct ExecuteRequest {
25    /// zerompk-encoded PhysicalPlan (via nodedb::bridge::physical_plan::wire::encode).
26    pub plan_bytes: Vec<u8>,
27    /// Tenant ID authenticated on the originating node; trusted on the receiver.
28    pub tenant_id: u32,
29    /// Milliseconds remaining until the caller's deadline.
30    /// 0 means the deadline has already expired — receiver returns DeadlineExceeded.
31    pub deadline_remaining_ms: u64,
32    /// Distributed trace ID for observability.
33    pub trace_id: u64,
34    /// Caller's view of descriptor versions for every collection touched by the plan.
35    pub descriptor_versions: Vec<DescriptorVersionEntry>,
36}
37
38/// Response to an `ExecuteRequest`.
39#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
40pub struct ExecuteResponse {
41    pub success: bool,
42    /// Raw Data Plane response payloads, one per result set.
43    pub payloads: Vec<Vec<u8>>,
44    pub error: Option<TypedClusterError>,
45}
46
47/// Typed error returned by the remote executor.
48#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
49pub enum TypedClusterError {
50    NotLeader {
51        group_id: u64,
52        leader_node_id: Option<u64>,
53        leader_addr: Option<String>,
54        term: u64,
55    },
56    DescriptorMismatch {
57        collection: String,
58        expected_version: u64,
59        actual_version: u64,
60    },
61    DeadlineExceeded {
62        elapsed_ms: u64,
63    },
64    /// Catch-all. `code` is a `nodedb_types::error::ErrorCode` as u32.
65    Internal {
66        code: u32,
67        message: String,
68    },
69}
70
71impl ExecuteResponse {
72    pub fn ok(payloads: Vec<Vec<u8>>) -> Self {
73        Self {
74            success: true,
75            payloads,
76            error: None,
77        }
78    }
79    pub fn err(error: TypedClusterError) -> Self {
80        Self {
81            success: false,
82            payloads: vec![],
83            error: Some(error),
84        }
85    }
86}
87
88// ── Codec ────────────────────────────────────────────────────────────────────
89
90macro_rules! to_bytes {
91    ($msg:expr) => {
92        rkyv::to_bytes::<rkyv::rancor::Error>($msg)
93            .map(|b| b.to_vec())
94            .map_err(|e| ClusterError::Codec {
95                detail: format!("rkyv serialize: {e}"),
96            })
97    };
98}
99
100macro_rules! from_bytes {
101    ($payload:expr, $T:ty, $name:expr) => {{
102        let mut aligned = rkyv::util::AlignedVec::<16>::with_capacity($payload.len());
103        aligned.extend_from_slice($payload);
104        rkyv::from_bytes::<$T, rkyv::rancor::Error>(&aligned).map_err(|e| ClusterError::Codec {
105            detail: format!("rkyv deserialize {}: {e}", $name),
106        })
107    }};
108}
109
110pub(super) fn encode_execute_req(msg: &ExecuteRequest, out: &mut Vec<u8>) -> Result<()> {
111    write_frame(RPC_EXECUTE_REQ, &to_bytes!(msg)?, out)
112}
113pub(super) fn encode_execute_resp(msg: &ExecuteResponse, out: &mut Vec<u8>) -> Result<()> {
114    write_frame(RPC_EXECUTE_RESP, &to_bytes!(msg)?, out)
115}
116
117pub(super) fn decode_execute_req(payload: &[u8]) -> Result<RaftRpc> {
118    Ok(RaftRpc::ExecuteRequest(from_bytes!(
119        payload,
120        ExecuteRequest,
121        "ExecuteRequest"
122    )?))
123}
124pub(super) fn decode_execute_resp(payload: &[u8]) -> Result<RaftRpc> {
125    Ok(RaftRpc::ExecuteResponse(from_bytes!(
126        payload,
127        ExecuteResponse,
128        "ExecuteResponse"
129    )?))
130}
131
132/// Numeric code for `TypedClusterError::Internal` when plan bytes fail to decode.
133pub const PLAN_DECODE_FAILED: u32 = 0x_CE00_0001;
134
135#[cfg(test)]
136mod tests {
137    use super::*;
138
139    fn roundtrip_req(req: ExecuteRequest) -> ExecuteRequest {
140        let rpc = RaftRpc::ExecuteRequest(req);
141        let encoded = super::super::encode(&rpc).unwrap();
142        match super::super::decode(&encoded).unwrap() {
143            RaftRpc::ExecuteRequest(r) => r,
144            other => panic!("expected ExecuteRequest, got {other:?}"),
145        }
146    }
147
148    fn roundtrip_resp(resp: ExecuteResponse) -> ExecuteResponse {
149        let rpc = RaftRpc::ExecuteResponse(resp);
150        let encoded = super::super::encode(&rpc).unwrap();
151        match super::super::decode(&encoded).unwrap() {
152            RaftRpc::ExecuteResponse(r) => r,
153            other => panic!("expected ExecuteResponse, got {other:?}"),
154        }
155    }
156
157    #[test]
158    fn roundtrip_execute_request_basic() {
159        let req = ExecuteRequest {
160            plan_bytes: b"msgpack-plan-bytes".to_vec(),
161            tenant_id: 7,
162            deadline_remaining_ms: 5000,
163            trace_id: 0xDEAD_BEEF_1234_5678,
164            descriptor_versions: vec![
165                DescriptorVersionEntry {
166                    collection: "orders".into(),
167                    version: 42,
168                },
169                DescriptorVersionEntry {
170                    collection: "users".into(),
171                    version: 1,
172                },
173            ],
174        };
175        let decoded = roundtrip_req(req.clone());
176        assert_eq!(decoded.plan_bytes, req.plan_bytes);
177        assert_eq!(decoded.tenant_id, 7);
178        assert_eq!(decoded.deadline_remaining_ms, 5000);
179        assert_eq!(decoded.trace_id, req.trace_id);
180        assert_eq!(decoded.descriptor_versions.len(), 2);
181        assert_eq!(decoded.descriptor_versions[0].collection, "orders");
182        assert_eq!(decoded.descriptor_versions[0].version, 42);
183    }
184
185    #[test]
186    fn roundtrip_execute_request_empty_descriptors() {
187        let req = ExecuteRequest {
188            plan_bytes: vec![0xAB, 0xCD],
189            tenant_id: 0,
190            deadline_remaining_ms: 1000,
191            trace_id: 0,
192            descriptor_versions: vec![],
193        };
194        let decoded = roundtrip_req(req);
195        assert!(decoded.descriptor_versions.is_empty());
196    }
197
198    #[test]
199    fn roundtrip_execute_response_success() {
200        let resp = ExecuteResponse::ok(vec![b"row1".to_vec(), b"row2".to_vec()]);
201        let decoded = roundtrip_resp(resp);
202        assert!(decoded.success);
203        assert_eq!(decoded.payloads.len(), 2);
204        assert_eq!(decoded.payloads[0], b"row1");
205        assert!(decoded.error.is_none());
206    }
207
208    #[test]
209    fn roundtrip_execute_response_not_leader() {
210        let resp = ExecuteResponse::err(TypedClusterError::NotLeader {
211            group_id: 3,
212            leader_node_id: Some(1),
213            leader_addr: Some("10.0.0.1:9400".into()),
214            term: 7,
215        });
216        let decoded = roundtrip_resp(resp);
217        assert!(!decoded.success);
218        match decoded.error {
219            Some(TypedClusterError::NotLeader {
220                group_id,
221                leader_node_id,
222                leader_addr,
223                term,
224            }) => {
225                assert_eq!(group_id, 3);
226                assert_eq!(leader_node_id, Some(1));
227                assert_eq!(leader_addr.as_deref(), Some("10.0.0.1:9400"));
228                assert_eq!(term, 7);
229            }
230            other => panic!("expected NotLeader, got {other:?}"),
231        }
232    }
233
234    #[test]
235    fn roundtrip_execute_response_descriptor_mismatch() {
236        let resp = ExecuteResponse::err(TypedClusterError::DescriptorMismatch {
237            collection: "orders".into(),
238            expected_version: 5,
239            actual_version: 6,
240        });
241        let decoded = roundtrip_resp(resp);
242        match decoded.error {
243            Some(TypedClusterError::DescriptorMismatch {
244                collection,
245                expected_version,
246                actual_version,
247            }) => {
248                assert_eq!(collection, "orders");
249                assert_eq!(expected_version, 5);
250                assert_eq!(actual_version, 6);
251            }
252            other => panic!("expected DescriptorMismatch, got {other:?}"),
253        }
254    }
255
256    #[test]
257    fn roundtrip_execute_response_deadline_exceeded() {
258        let resp = ExecuteResponse::err(TypedClusterError::DeadlineExceeded { elapsed_ms: 3000 });
259        let decoded = roundtrip_resp(resp);
260        match decoded.error {
261            Some(TypedClusterError::DeadlineExceeded { elapsed_ms }) => {
262                assert_eq!(elapsed_ms, 3000)
263            }
264            other => panic!("expected DeadlineExceeded, got {other:?}"),
265        }
266    }
267
268    #[test]
269    fn roundtrip_execute_response_internal_error() {
270        let resp = ExecuteResponse::err(TypedClusterError::Internal {
271            code: PLAN_DECODE_FAILED,
272            message: "failed to decode plan".into(),
273        });
274        let decoded = roundtrip_resp(resp);
275        match decoded.error {
276            Some(TypedClusterError::Internal { code, message }) => {
277                assert_eq!(code, PLAN_DECODE_FAILED);
278                assert!(message.contains("plan"));
279            }
280            other => panic!("expected Internal, got {other:?}"),
281        }
282    }
283
284    #[test]
285    fn roundtrip_execute_response_not_leader_no_hint() {
286        let resp = ExecuteResponse::err(TypedClusterError::NotLeader {
287            group_id: 0,
288            leader_node_id: None,
289            leader_addr: None,
290            term: 0,
291        });
292        let decoded = roundtrip_resp(resp);
293        match decoded.error {
294            Some(TypedClusterError::NotLeader {
295                leader_node_id,
296                leader_addr,
297                ..
298            }) => {
299                assert!(leader_node_id.is_none());
300                assert!(leader_addr.is_none());
301            }
302            other => panic!("expected NotLeader, got {other:?}"),
303        }
304    }
305}