Skip to main content

nodedb_cluster/rpc_codec/
execute.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! ExecuteRequest / ExecuteResponse — cross-node physical-plan execution RPC.
4//!
5//! Discriminants 18 and 19 are permanently assigned to these variants.
6
7use super::discriminants::*;
8use super::header::write_frame;
9use super::raft_rpc::RaftRpc;
10use crate::error::{ClusterError, Result};
11
12// ── Wire types ──────────────────────────────────────────────────────────────
13
14/// A single (collection, version) entry sent by the caller to let the receiver
15/// validate descriptor freshness before executing the plan.
16///
17/// Cross-version safety: new optional fields should be added as `Option<T>`.
18#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
19pub struct DescriptorVersionEntry {
20    pub collection: String,
21    pub version: u64,
22}
23
24/// Send an already-planned `PhysicalPlan` to a remote node for execution.
25#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
26pub struct ExecuteRequest {
27    /// zerompk-encoded PhysicalPlan (via nodedb::bridge::physical_plan::wire::encode).
28    pub plan_bytes: Vec<u8>,
29    /// Tenant ID authenticated on the originating node; trusted on the receiver.
30    pub tenant_id: u64,
31    /// Database scope authenticated on the originating node; trusted on the receiver.
32    /// `0` maps to `DatabaseId::DEFAULT` (the built-in `default` database).
33    pub database_id: u64,
34    /// Milliseconds remaining until the caller's deadline.
35    /// 0 means the deadline has already expired — receiver returns DeadlineExceeded.
36    pub deadline_remaining_ms: u64,
37    /// Distributed trace ID for observability (16-byte W3C-compatible TraceId).
38    pub trace_id: [u8; 16],
39    /// Caller's view of descriptor versions for every collection touched by the plan.
40    pub descriptor_versions: Vec<DescriptorVersionEntry>,
41}
42
43/// Response to an `ExecuteRequest`.
44#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
45pub struct ExecuteResponse {
46    pub success: bool,
47    /// Raw Data Plane response payloads, one per result set.
48    pub payloads: Vec<Vec<u8>>,
49    pub error: Option<TypedClusterError>,
50}
51
52/// Typed error returned by the remote executor.
53#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
54pub enum TypedClusterError {
55    NotLeader {
56        group_id: u64,
57        leader_node_id: Option<u64>,
58        leader_addr: Option<String>,
59        term: u64,
60    },
61    DescriptorMismatch {
62        collection: String,
63        expected_version: u64,
64        actual_version: u64,
65    },
66    DeadlineExceeded {
67        elapsed_ms: u64,
68    },
69    /// Catch-all. `code` is a `nodedb_types::error::ErrorCode` as u32.
70    Internal {
71        code: u32,
72        message: String,
73    },
74}
75
76impl ExecuteResponse {
77    pub fn ok(payloads: Vec<Vec<u8>>) -> Self {
78        Self {
79            success: true,
80            payloads,
81            error: None,
82        }
83    }
84    pub fn err(error: TypedClusterError) -> Self {
85        Self {
86            success: false,
87            payloads: vec![],
88            error: Some(error),
89        }
90    }
91}
92
93// ── Codec ────────────────────────────────────────────────────────────────────
94
95macro_rules! to_bytes {
96    ($msg:expr) => {
97        rkyv::to_bytes::<rkyv::rancor::Error>($msg)
98            .map(|b| b.to_vec())
99            .map_err(|e| ClusterError::Codec {
100                detail: format!("rkyv serialize: {e}"),
101            })
102    };
103}
104
105macro_rules! from_bytes {
106    ($payload:expr, $T:ty, $name:expr) => {{
107        let mut aligned = rkyv::util::AlignedVec::<16>::with_capacity($payload.len());
108        aligned.extend_from_slice($payload);
109        rkyv::from_bytes::<$T, rkyv::rancor::Error>(&aligned).map_err(|e| ClusterError::Codec {
110            detail: format!("rkyv deserialize {}: {e}", $name),
111        })
112    }};
113}
114
115pub(super) fn encode_execute_req(msg: &ExecuteRequest, out: &mut Vec<u8>) -> Result<()> {
116    write_frame(RPC_EXECUTE_REQ, &to_bytes!(msg)?, out)
117}
118pub(super) fn encode_execute_resp(msg: &ExecuteResponse, out: &mut Vec<u8>) -> Result<()> {
119    write_frame(RPC_EXECUTE_RESP, &to_bytes!(msg)?, out)
120}
121
122pub(super) fn decode_execute_req(payload: &[u8]) -> Result<RaftRpc> {
123    Ok(RaftRpc::ExecuteRequest(from_bytes!(
124        payload,
125        ExecuteRequest,
126        "ExecuteRequest"
127    )?))
128}
129pub(super) fn decode_execute_resp(payload: &[u8]) -> Result<RaftRpc> {
130    Ok(RaftRpc::ExecuteResponse(from_bytes!(
131        payload,
132        ExecuteResponse,
133        "ExecuteResponse"
134    )?))
135}
136
137/// Numeric code for `TypedClusterError::Internal` when plan bytes fail to decode.
138pub const PLAN_DECODE_FAILED: u32 = 0x_CE00_0001;
139
140#[cfg(test)]
141mod tests {
142    use super::*;
143
144    fn roundtrip_req(req: ExecuteRequest) -> ExecuteRequest {
145        let rpc = RaftRpc::ExecuteRequest(req);
146        let encoded = super::super::encode(&rpc).unwrap();
147        match super::super::decode(&encoded).unwrap() {
148            RaftRpc::ExecuteRequest(r) => r,
149            other => panic!("expected ExecuteRequest, got {other:?}"),
150        }
151    }
152
153    fn roundtrip_resp(resp: ExecuteResponse) -> ExecuteResponse {
154        let rpc = RaftRpc::ExecuteResponse(resp);
155        let encoded = super::super::encode(&rpc).unwrap();
156        match super::super::decode(&encoded).unwrap() {
157            RaftRpc::ExecuteResponse(r) => r,
158            other => panic!("expected ExecuteResponse, got {other:?}"),
159        }
160    }
161
162    #[test]
163    fn roundtrip_execute_request_basic() {
164        let req = ExecuteRequest {
165            plan_bytes: b"msgpack-plan-bytes".to_vec(),
166            tenant_id: 7,
167            database_id: 0,
168            deadline_remaining_ms: 5000,
169            trace_id: [
170                0xDE, 0xAD, 0xBE, 0xEF, 0x12, 0x34, 0x56, 0x78, 0xDE, 0xAD, 0xBE, 0xEF, 0x12, 0x34,
171                0x56, 0x78,
172            ],
173            descriptor_versions: vec![
174                DescriptorVersionEntry {
175                    collection: "orders".into(),
176                    version: 42,
177                },
178                DescriptorVersionEntry {
179                    collection: "users".into(),
180                    version: 1,
181                },
182            ],
183        };
184        let decoded = roundtrip_req(req.clone());
185        assert_eq!(decoded.plan_bytes, req.plan_bytes);
186        assert_eq!(decoded.tenant_id, 7);
187        assert_eq!(decoded.deadline_remaining_ms, 5000);
188        assert_eq!(
189            decoded.trace_id, req.trace_id,
190            "trace_id roundtrips correctly"
191        );
192        assert_eq!(decoded.descriptor_versions.len(), 2);
193        assert_eq!(decoded.descriptor_versions[0].collection, "orders");
194        assert_eq!(decoded.descriptor_versions[0].version, 42);
195    }
196
197    #[test]
198    fn roundtrip_execute_request_empty_descriptors() {
199        let req = ExecuteRequest {
200            plan_bytes: vec![0xAB, 0xCD],
201            tenant_id: 0,
202            database_id: 0,
203            deadline_remaining_ms: 1000,
204            trace_id: [0u8; 16],
205            descriptor_versions: vec![],
206        };
207        let decoded = roundtrip_req(req);
208        assert!(decoded.descriptor_versions.is_empty());
209    }
210
211    #[test]
212    fn roundtrip_execute_response_success() {
213        let resp = ExecuteResponse::ok(vec![b"row1".to_vec(), b"row2".to_vec()]);
214        let decoded = roundtrip_resp(resp);
215        assert!(decoded.success);
216        assert_eq!(decoded.payloads.len(), 2);
217        assert_eq!(decoded.payloads[0], b"row1");
218        assert!(decoded.error.is_none());
219    }
220
221    #[test]
222    fn roundtrip_execute_response_not_leader() {
223        let resp = ExecuteResponse::err(TypedClusterError::NotLeader {
224            group_id: 3,
225            leader_node_id: Some(1),
226            leader_addr: Some("10.0.0.1:9400".into()),
227            term: 7,
228        });
229        let decoded = roundtrip_resp(resp);
230        assert!(!decoded.success);
231        match decoded.error {
232            Some(TypedClusterError::NotLeader {
233                group_id,
234                leader_node_id,
235                leader_addr,
236                term,
237            }) => {
238                assert_eq!(group_id, 3);
239                assert_eq!(leader_node_id, Some(1));
240                assert_eq!(leader_addr.as_deref(), Some("10.0.0.1:9400"));
241                assert_eq!(term, 7);
242            }
243            other => panic!("expected NotLeader, got {other:?}"),
244        }
245    }
246
247    #[test]
248    fn roundtrip_execute_response_descriptor_mismatch() {
249        let resp = ExecuteResponse::err(TypedClusterError::DescriptorMismatch {
250            collection: "orders".into(),
251            expected_version: 5,
252            actual_version: 6,
253        });
254        let decoded = roundtrip_resp(resp);
255        match decoded.error {
256            Some(TypedClusterError::DescriptorMismatch {
257                collection,
258                expected_version,
259                actual_version,
260            }) => {
261                assert_eq!(collection, "orders");
262                assert_eq!(expected_version, 5);
263                assert_eq!(actual_version, 6);
264            }
265            other => panic!("expected DescriptorMismatch, got {other:?}"),
266        }
267    }
268
269    #[test]
270    fn roundtrip_execute_response_deadline_exceeded() {
271        let resp = ExecuteResponse::err(TypedClusterError::DeadlineExceeded { elapsed_ms: 3000 });
272        let decoded = roundtrip_resp(resp);
273        match decoded.error {
274            Some(TypedClusterError::DeadlineExceeded { elapsed_ms }) => {
275                assert_eq!(elapsed_ms, 3000)
276            }
277            other => panic!("expected DeadlineExceeded, got {other:?}"),
278        }
279    }
280
281    #[test]
282    fn roundtrip_execute_response_internal_error() {
283        let resp = ExecuteResponse::err(TypedClusterError::Internal {
284            code: PLAN_DECODE_FAILED,
285            message: "failed to decode plan".into(),
286        });
287        let decoded = roundtrip_resp(resp);
288        match decoded.error {
289            Some(TypedClusterError::Internal { code, message }) => {
290                assert_eq!(code, PLAN_DECODE_FAILED);
291                assert!(message.contains("plan"));
292            }
293            other => panic!("expected Internal, got {other:?}"),
294        }
295    }
296
297    #[test]
298    fn roundtrip_execute_response_not_leader_no_hint() {
299        let resp = ExecuteResponse::err(TypedClusterError::NotLeader {
300            group_id: 0,
301            leader_node_id: None,
302            leader_addr: None,
303            term: 0,
304        });
305        let decoded = roundtrip_resp(resp);
306        match decoded.error {
307            Some(TypedClusterError::NotLeader {
308                leader_node_id,
309                leader_addr,
310                ..
311            }) => {
312                assert!(leader_node_id.is_none());
313                assert!(leader_addr.is_none());
314            }
315            other => panic!("expected NotLeader, got {other:?}"),
316        }
317    }
318}