Skip to main content

nodedb_cluster/rpc_codec/
data_propose.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! DataProposeRequest / DataProposeResponse wire types and codecs.
4//!
5//! Used to forward data-group (non-metadata) Raft proposals from a follower
6//! node to the group leader. The leader applies the proposal locally and
7//! returns `(group_id, log_index)` so the forwarder can register a
8//! `ProposeTracker` waiter and await commit.
9
10use super::discriminants::*;
11use super::header::write_frame;
12use super::raft_rpc::RaftRpc;
13use crate::error::{ClusterError, Result};
14
15/// Forward an opaque data-group proposal payload to the data-group leader.
16///
17/// `vshard_id` identifies the vShard (and thus the Raft group) the entry
18/// belongs to. `bytes` is the serialized `ReplicatedEntry`.
19#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
20pub struct DataProposeRequest {
21    pub vshard_id: u32,
22    pub bytes: Vec<u8>,
23}
24
25/// Response to a forwarded data-group proposal.
26#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
27pub struct DataProposeResponse {
28    pub success: bool,
29    pub group_id: u64,
30    pub log_index: u64,
31    pub leader_hint: Option<u64>,
32    pub error_message: String,
33}
34
35impl DataProposeResponse {
36    pub fn ok(group_id: u64, log_index: u64) -> Self {
37        Self {
38            success: true,
39            group_id,
40            log_index,
41            leader_hint: None,
42            error_message: String::new(),
43        }
44    }
45
46    pub fn err(message: impl Into<String>, leader_hint: Option<u64>) -> Self {
47        Self {
48            success: false,
49            group_id: 0,
50            log_index: 0,
51            leader_hint,
52            error_message: message.into(),
53        }
54    }
55}
56
// Serializes `$msg` with rkyv and yields `Result<Vec<u8>, ClusterError>`.
//
// The serialized bytes are copied into a plain `Vec<u8>`; an rkyv failure is
// reported as `ClusterError::Codec`.
macro_rules! to_bytes {
    ($msg:expr) => {
        match rkyv::to_bytes::<rkyv::rancor::Error>($msg) {
            Ok(serialized) => Ok(serialized.to_vec()),
            Err(e) => Err(ClusterError::Codec {
                detail: format!("rkyv serialize: {e}"),
            }),
        }
    };
}
66
// Deserializes `$payload` into a `$T` with rkyv, yielding
// `Result<$T, ClusterError>`.
//
// The payload is first copied into a 16-byte-aligned buffer before being
// handed to rkyv. `$name` only appears in the error text, and an rkyv failure
// is reported as `ClusterError::Codec`.
macro_rules! from_bytes {
    ($payload:expr, $T:ty, $name:expr) => {{
        let mut scratch = rkyv::util::AlignedVec::<16>::with_capacity($payload.len());
        scratch.extend_from_slice($payload);
        match rkyv::from_bytes::<$T, rkyv::rancor::Error>(&scratch) {
            Ok(decoded) => Ok(decoded),
            Err(e) => Err(ClusterError::Codec {
                detail: format!("rkyv deserialize {}: {e}", $name),
            }),
        }
    }};
}
76
77pub(super) fn encode_data_propose_req(msg: &DataProposeRequest, out: &mut Vec<u8>) -> Result<()> {
78    write_frame(RPC_DATA_PROPOSE_REQ, &to_bytes!(msg)?, out)
79}
80pub(super) fn encode_data_propose_resp(msg: &DataProposeResponse, out: &mut Vec<u8>) -> Result<()> {
81    write_frame(RPC_DATA_PROPOSE_RESP, &to_bytes!(msg)?, out)
82}
83
84pub(super) fn decode_data_propose_req(payload: &[u8]) -> Result<RaftRpc> {
85    Ok(RaftRpc::DataProposeRequest(from_bytes!(
86        payload,
87        DataProposeRequest,
88        "DataProposeRequest"
89    )?))
90}
91pub(super) fn decode_data_propose_resp(payload: &[u8]) -> Result<RaftRpc> {
92    Ok(RaftRpc::DataProposeResponse(from_bytes!(
93        payload,
94        DataProposeResponse,
95        "DataProposeResponse"
96    )?))
97}