Skip to main content

hashiverse_lib/protocol/rpc/
rpc_request.rs

1//! # RPC request packet encode / decode
2//!
3//! The wire format of every outgoing and incoming RPC request. Split into a strict
4//! Tx / Rx pair so the compiler enforces the asymmetry between "I'm building a request"
5//! and "I'm parsing one":
6//!
7//! - [`RpcRequestPacketTx`] — build a request. `encode` optionally compresses the
8//!   payload, runs the PoW search (via a
9//!   [`crate::tools::pow_generator::pow_generator::PowGenerator`]) targeting the
10//!   destination server's identity, and produces a header containing the discriminator,
11//!   sponsor id, PoW timestamp, content hash, salt, and payload length.
12//! - [`RpcRequestPacketRx`] — parse a request on the receive side. Extracts the header,
13//!   verifies PoW against the receiving server's own identity, rejects under-powered
14//!   or stale requests (via [`crate::tools::config::POW_MAX_CLOCK_DRIFT_MILLIS`]) before
15//!   any payload work happens.
16//!
17//! The PoW sits in the *header* and binds to the server's identity, so a request valid
18//! for server A can't be forwarded to server B and still authenticate.
19
20use crate::protocol::payload::payload::PayloadRequestKind;
21use crate::tools::pow_generator::pow_generator::PowGenerator;
22use crate::tools::server_id::ServerId;
23use crate::tools::time::{TimeMillis, TimeMillisBytes, TIME_MILLIS_BYTES};
24use crate::tools::time_provider::time_provider::TimeProvider;
25use crate::tools::types::{Hash, Id, PQCommitmentBytes, Pow, Salt, VerificationKeyBytes, HASH_BYTES, ID_BYTES, SALT_BYTES};
26use crate::tools::{compression, config, hashing};
27use bitflags::bitflags;
28use bytes::{Buf, BufMut, Bytes, BytesMut};
29
30bitflags! {
31    pub struct RpcRequestPacketTxFlags: u8 {
32        const COMPRESSED = 1 << 0;
33        const SERVER_KNOWN = 1 << 1;
34    }
35}
36
37/// An outbound RPC request, freshly encoded and ready to be sent over a [`crate::transport::TransportFactory`].
38///
39/// The "Tx" (transmit) side owns the encode path. Constructing one is expensive: in addition
40/// to compressing the payload and prefixing the header, `encode` performs a proof-of-work
41/// search against the destination server's identity via [`PowGenerator`]. The `pow`
42/// sits in the wire header so the server can reject under-powered requests cheaply before
43/// doing any payload work. `pow_content_hash` is retained on the struct so the caller can
44/// later verify that the response was produced for *this* specific request (see
45/// [`RpcResponsePacketRx`]).
46///
47/// This type and its "Rx" counterpart [`RpcRequestPacketRx`] are intentionally separate so
48/// the encode side and the decode side cannot be accidentally mixed up.
49pub struct RpcRequestPacketTx {
50    pub pow_content_hash: Hash,
51    pub bytes: Bytes,
52}
53impl RpcRequestPacketTx {
54    #[allow(clippy::too_many_arguments)] // protocol layer — each arg is a wire field
55    pub async fn encode(
56        time_provider: &dyn TimeProvider,
57        pow_minimum_per_rpc: Pow,
58        flags: RpcRequestPacketTxFlags,
59        payload_request_kind: PayloadRequestKind,
60        pow_sponsor_id: &Id,
61        destination_verification_key_bytes: &VerificationKeyBytes,
62        destination_pq_commitment_bytes: &PQCommitmentBytes,
63        payload_uncompressed: Bytes,
64        pow_generator: &dyn PowGenerator,
65    ) -> anyhow::Result<Self> {
66        // Do we actually need to compress this?
67        let payload_compressed = match flags.contains(RpcRequestPacketTxFlags::COMPRESSED) {
68            true => compression::compress_for_speed(payload_uncompressed.as_ref())?.to_bytes(),
69            false => payload_uncompressed,
70        };
71
72        // Check that it is not too large...
73        if payload_compressed.len() > config::PROTOCOL_MAX_BLOB_SIZE_REQUEST {
74            anyhow::bail!("request payload size exceeds maximum allowed size: {} > {}", payload_compressed.len(), config::PROTOCOL_MAX_BLOB_SIZE_REQUEST);
75        }
76
77        // This will be needed when we verify that the server processed our request
78        let pow_content_hash: Hash = hashing::hash(payload_compressed.as_ref());
79
80        // Do some proof of work on behalf of the destination server, sponsored by the caller
81        let pow_label = format!("rpc:{}", payload_request_kind);
82        let (pow_timestamp, pow_salt, _, _) = ServerId::pow_generate(&pow_label, time_provider, pow_minimum_per_rpc, pow_sponsor_id, destination_verification_key_bytes, destination_pq_commitment_bytes, &pow_content_hash, pow_generator).await?;
83
84        // Write the header
85        let mut bytes_mut = BytesMut::with_capacity(size_of::<u8>() + size_of::<u8>() + size_of::<u16>() + ID_BYTES + TIME_MILLIS_BYTES + HASH_BYTES + SALT_BYTES + size_of::<u32>() + payload_compressed.len());
86        bytes_mut.put_u8(1);
87        bytes_mut.put_u8(flags.bits());
88        bytes_mut.put_u16_le(payload_request_kind as u16);
89        bytes_mut.put_slice(pow_sponsor_id.as_ref());
90        bytes_mut.put_slice(pow_timestamp.encode_be().as_ref());
91        bytes_mut.put_slice(pow_content_hash.as_ref());
92        bytes_mut.put_slice(pow_salt.as_ref());
93        bytes_mut.put_u32_le(payload_compressed.len() as u32);
94        bytes_mut.put_slice(&payload_compressed);
95
96        let bytes = bytes_mut.freeze();
97
98        Ok(Self { pow_content_hash, bytes })
99    }
100}
101
102/// The server-side view of an inbound RPC request after header parsing and PoW verification.
103///
104/// `RpcRequestPacketRx` is what falls out of `decode` on the receiving end. It exposes the
105/// routing discriminator ([`PayloadRequestKind`]) so the dispatcher knows which handler to
106/// invoke, plus all the PoW fields needed to decide how much trust to assign to the call.
107/// The `bytes` field still contains the compressed payload — payload decoding happens later,
108/// after the dispatcher has picked the right handler.
109///
110/// Paired with [`RpcRequestPacketTx`] as the decode side of the same wire format.
111#[derive(Debug)]
112pub struct RpcRequestPacketRx {
113    pub payload_request_kind: PayloadRequestKind,
114    pub pow_sponsor_id: Id,
115    pub pow_server_known: bool,
116    pub pow: Pow,
117    pub pow_timestamp: TimeMillis,
118    pub pow_content_hash: Hash,
119    pub pow_salt: Salt,
120    pub bytes: Bytes,
121}
122impl RpcRequestPacketRx {
123    pub fn decode(current_time_millis: &TimeMillis, verification_key_bytes: &VerificationKeyBytes, pq_commitment_bytes: &PQCommitmentBytes, mut response_bytes: Bytes) -> anyhow::Result<Self> {
124        // Do we have enough bytes for the header?
125        if response_bytes.len() < size_of::<u8>() + size_of::<u8>() + size_of::<u16>() + ID_BYTES + TIME_MILLIS_BYTES + HASH_BYTES + SALT_BYTES + size_of::<u32>() {
126            anyhow::bail!("RpcRequestPacket is too short for header");
127        }
128
129        let version = response_bytes.get_u8();
130        if 1 != version {
131            anyhow::bail!("Unsupported RpcRequestPacket version: {}", version);
132        }
133
134        let flags = RpcRequestPacketTxFlags::from_bits(response_bytes.get_u8()).ok_or_else(|| anyhow::anyhow!("Invalid RpcRequestPacket flags"))?;
135        let payload_request_kind = PayloadRequestKind::from_u16(response_bytes.get_u16_le())?;
136        let pow_sponsor_id = Id::from_slice(response_bytes.slice(..ID_BYTES).as_ref())?;
137        response_bytes.advance(ID_BYTES);
138        let pow_timestamp_bytes: TimeMillisBytes = TimeMillisBytes::from_bytes(response_bytes.slice(..TIME_MILLIS_BYTES).as_ref())?;
139        response_bytes.advance(TIME_MILLIS_BYTES);
140        let pow_content_hash: Hash = Hash::from_slice(response_bytes.slice(..HASH_BYTES).as_ref())?;
141        response_bytes.advance(HASH_BYTES);
142        let pow_salt = Salt::from_slice(response_bytes.slice(..SALT_BYTES).as_ref())?;
143        response_bytes.advance(SALT_BYTES);
144
145        // Is the packet timestamp close enough to ours?
146        let pow_timestamp = TimeMillis::timestamp_decode_be(&pow_timestamp_bytes);
147        {
148            let delta_millis = (*current_time_millis - pow_timestamp).abs();
149            if delta_millis.0 > config::POW_MAX_CLOCK_DRIFT_MILLIS.0 {
150                anyhow::bail!("Client pow clock drift too large: us={} them={}", current_time_millis, pow_timestamp);
151            }
152        }
153
154        // Has the client done enough pow for us?
155        let (pow, pow_server_known) = match flags.contains(RpcRequestPacketTxFlags::SERVER_KNOWN) {
156            true => {
157                let (pow, _) = ServerId::pow_measure(&pow_sponsor_id, verification_key_bytes, pq_commitment_bytes, &pow_timestamp_bytes, &pow_content_hash, &pow_salt)?;
158                if pow < config::POW_MINIMUM_PER_RPC_SERVER_KNOWN {
159                    anyhow::bail!("Client has not done enough known pow: {} < {}", pow, config::POW_MINIMUM_PER_RPC_SERVER_KNOWN);
160                }
161                (pow, true)
162            }
163            false => {
164                let (pow, _) = ServerId::pow_measure(&pow_sponsor_id, &VerificationKeyBytes::zero(), &PQCommitmentBytes::zero(), &pow_timestamp_bytes, &pow_content_hash, &pow_salt)?;
165                if pow < config::POW_MINIMUM_PER_RPC_SERVER_UNKNOWN {
166                    anyhow::bail!("Client has not done enough unknown pow: {} < {}", pow, config::POW_MINIMUM_PER_RPC_SERVER_UNKNOWN);
167                }
168                (pow, false)
169            }
170        };
171
172        let request_payload_len = response_bytes.get_u32_le() as usize;
173
174        if request_payload_len > config::PROTOCOL_MAX_BLOB_SIZE_REQUEST {
175            anyhow::bail!("RpcRequestPacket payload too large: {} > {}", request_payload_len, config::PROTOCOL_MAX_BLOB_SIZE_REQUEST);
176        }
177
178        // Do we have enough bytes for the payload?
179        if response_bytes.len() < request_payload_len {
180            anyhow::bail!("RpcRequestPacket is too short for payload");
181        }
182
183        let response_payload = response_bytes.slice(..request_payload_len);
184        response_bytes.advance(request_payload_len);
185
186        // Sanity check - are we done?
187        if !response_bytes.is_empty() {
188            anyhow::bail!("RpcRequestPacket is too long");
189        }
190
191        // Do we need to decompress?
192        let response_payload_decompressed = match flags.contains(RpcRequestPacketTxFlags::COMPRESSED) {
193            true => compression::decompress(response_payload.as_ref())?.to_bytes(),
194            false => response_payload,
195        };
196
197        let selfie = Self {
198            payload_request_kind,
199            pow_sponsor_id,
200            pow_server_known,
201            pow,
202            pow_timestamp,
203            pow_content_hash,
204            pow_salt,
205            bytes: response_payload_decompressed,
206        };
207        // trace!("Decoded RpcRequestPacketRx={:?}", selfie);
208
209        Ok(selfie)
210    }
211}
212