hashiverse_lib/protocol/rpc/
rpc_request.rs1use crate::protocol::payload::payload::PayloadRequestKind;
21use crate::tools::pow_generator::pow_generator::PowGenerator;
22use crate::tools::server_id::ServerId;
23use crate::tools::time::{TimeMillis, TimeMillisBytes, TIME_MILLIS_BYTES};
24use crate::tools::time_provider::time_provider::TimeProvider;
25use crate::tools::types::{Hash, Id, PQCommitmentBytes, Pow, Salt, VerificationKeyBytes, HASH_BYTES, ID_BYTES, SALT_BYTES};
26use crate::tools::{compression, config, hashing};
27use bitflags::bitflags;
28use bytes::{Buf, BufMut, Bytes, BytesMut};
29
30bitflags! {
31 pub struct RpcRequestPacketTxFlags: u8 {
32 const COMPRESSED = 1 << 0;
33 const SERVER_KNOWN = 1 << 1;
34 }
35}
36
37pub struct RpcRequestPacketTx {
50 pub pow_content_hash: Hash,
51 pub bytes: Bytes,
52}
53impl RpcRequestPacketTx {
54 #[allow(clippy::too_many_arguments)] pub async fn encode(
56 time_provider: &dyn TimeProvider,
57 pow_minimum_per_rpc: Pow,
58 flags: RpcRequestPacketTxFlags,
59 payload_request_kind: PayloadRequestKind,
60 pow_sponsor_id: &Id,
61 destination_verification_key_bytes: &VerificationKeyBytes,
62 destination_pq_commitment_bytes: &PQCommitmentBytes,
63 payload_uncompressed: Bytes,
64 pow_generator: &dyn PowGenerator,
65 ) -> anyhow::Result<Self> {
66 let payload_compressed = match flags.contains(RpcRequestPacketTxFlags::COMPRESSED) {
68 true => compression::compress_for_speed(payload_uncompressed.as_ref())?.to_bytes(),
69 false => payload_uncompressed,
70 };
71
72 if payload_compressed.len() > config::PROTOCOL_MAX_BLOB_SIZE_REQUEST {
74 anyhow::bail!("request payload size exceeds maximum allowed size: {} > {}", payload_compressed.len(), config::PROTOCOL_MAX_BLOB_SIZE_REQUEST);
75 }
76
77 let pow_content_hash: Hash = hashing::hash(payload_compressed.as_ref());
79
80 let pow_label = format!("rpc:{}", payload_request_kind);
82 let (pow_timestamp, pow_salt, _, _) = ServerId::pow_generate(&pow_label, time_provider, pow_minimum_per_rpc, pow_sponsor_id, destination_verification_key_bytes, destination_pq_commitment_bytes, &pow_content_hash, pow_generator).await?;
83
84 let mut bytes_mut = BytesMut::with_capacity(size_of::<u8>() + size_of::<u8>() + size_of::<u16>() + ID_BYTES + TIME_MILLIS_BYTES + HASH_BYTES + SALT_BYTES + size_of::<u32>() + payload_compressed.len());
86 bytes_mut.put_u8(1);
87 bytes_mut.put_u8(flags.bits());
88 bytes_mut.put_u16_le(payload_request_kind as u16);
89 bytes_mut.put_slice(pow_sponsor_id.as_ref());
90 bytes_mut.put_slice(pow_timestamp.encode_be().as_ref());
91 bytes_mut.put_slice(pow_content_hash.as_ref());
92 bytes_mut.put_slice(pow_salt.as_ref());
93 bytes_mut.put_u32_le(payload_compressed.len() as u32);
94 bytes_mut.put_slice(&payload_compressed);
95
96 let bytes = bytes_mut.freeze();
97
98 Ok(Self { pow_content_hash, bytes })
99 }
100}
101
102#[derive(Debug)]
112pub struct RpcRequestPacketRx {
113 pub payload_request_kind: PayloadRequestKind,
114 pub pow_sponsor_id: Id,
115 pub pow_server_known: bool,
116 pub pow: Pow,
117 pub pow_timestamp: TimeMillis,
118 pub pow_content_hash: Hash,
119 pub pow_salt: Salt,
120 pub bytes: Bytes,
121}
122impl RpcRequestPacketRx {
123 pub fn decode(current_time_millis: &TimeMillis, verification_key_bytes: &VerificationKeyBytes, pq_commitment_bytes: &PQCommitmentBytes, mut response_bytes: Bytes) -> anyhow::Result<Self> {
124 if response_bytes.len() < size_of::<u8>() + size_of::<u8>() + size_of::<u16>() + ID_BYTES + TIME_MILLIS_BYTES + HASH_BYTES + SALT_BYTES + size_of::<u32>() {
126 anyhow::bail!("RpcRequestPacket is too short for header");
127 }
128
129 let version = response_bytes.get_u8();
130 if 1 != version {
131 anyhow::bail!("Unsupported RpcRequestPacket version: {}", version);
132 }
133
134 let flags = RpcRequestPacketTxFlags::from_bits(response_bytes.get_u8()).ok_or_else(|| anyhow::anyhow!("Invalid RpcRequestPacket flags"))?;
135 let payload_request_kind = PayloadRequestKind::from_u16(response_bytes.get_u16_le())?;
136 let pow_sponsor_id = Id::from_slice(response_bytes.slice(..ID_BYTES).as_ref())?;
137 response_bytes.advance(ID_BYTES);
138 let pow_timestamp_bytes: TimeMillisBytes = TimeMillisBytes::from_bytes(response_bytes.slice(..TIME_MILLIS_BYTES).as_ref())?;
139 response_bytes.advance(TIME_MILLIS_BYTES);
140 let pow_content_hash: Hash = Hash::from_slice(response_bytes.slice(..HASH_BYTES).as_ref())?;
141 response_bytes.advance(HASH_BYTES);
142 let pow_salt = Salt::from_slice(response_bytes.slice(..SALT_BYTES).as_ref())?;
143 response_bytes.advance(SALT_BYTES);
144
145 let pow_timestamp = TimeMillis::timestamp_decode_be(&pow_timestamp_bytes);
147 {
148 let delta_millis = (*current_time_millis - pow_timestamp).abs();
149 if delta_millis.0 > config::POW_MAX_CLOCK_DRIFT_MILLIS.0 {
150 anyhow::bail!("Client pow clock drift too large: us={} them={}", current_time_millis, pow_timestamp);
151 }
152 }
153
154 let (pow, pow_server_known) = match flags.contains(RpcRequestPacketTxFlags::SERVER_KNOWN) {
156 true => {
157 let (pow, _) = ServerId::pow_measure(&pow_sponsor_id, verification_key_bytes, pq_commitment_bytes, &pow_timestamp_bytes, &pow_content_hash, &pow_salt)?;
158 if pow < config::POW_MINIMUM_PER_RPC_SERVER_KNOWN {
159 anyhow::bail!("Client has not done enough known pow: {} < {}", pow, config::POW_MINIMUM_PER_RPC_SERVER_KNOWN);
160 }
161 (pow, true)
162 }
163 false => {
164 let (pow, _) = ServerId::pow_measure(&pow_sponsor_id, &VerificationKeyBytes::zero(), &PQCommitmentBytes::zero(), &pow_timestamp_bytes, &pow_content_hash, &pow_salt)?;
165 if pow < config::POW_MINIMUM_PER_RPC_SERVER_UNKNOWN {
166 anyhow::bail!("Client has not done enough unknown pow: {} < {}", pow, config::POW_MINIMUM_PER_RPC_SERVER_UNKNOWN);
167 }
168 (pow, false)
169 }
170 };
171
172 let request_payload_len = response_bytes.get_u32_le() as usize;
173
174 if request_payload_len > config::PROTOCOL_MAX_BLOB_SIZE_REQUEST {
175 anyhow::bail!("RpcRequestPacket payload too large: {} > {}", request_payload_len, config::PROTOCOL_MAX_BLOB_SIZE_REQUEST);
176 }
177
178 if response_bytes.len() < request_payload_len {
180 anyhow::bail!("RpcRequestPacket is too short for payload");
181 }
182
183 let response_payload = response_bytes.slice(..request_payload_len);
184 response_bytes.advance(request_payload_len);
185
186 if !response_bytes.is_empty() {
188 anyhow::bail!("RpcRequestPacket is too long");
189 }
190
191 let response_payload_decompressed = match flags.contains(RpcRequestPacketTxFlags::COMPRESSED) {
193 true => compression::decompress(response_payload.as_ref())?.to_bytes(),
194 false => response_payload,
195 };
196
197 let selfie = Self {
198 payload_request_kind,
199 pow_sponsor_id,
200 pow_server_known,
201 pow,
202 pow_timestamp,
203 pow_content_hash,
204 pow_salt,
205 bytes: response_payload_decompressed,
206 };
207 Ok(selfie)
210 }
211}
212