1use crate::rlpx::{
2 message::RLPxMessage,
3 utils::{snappy_compress, snappy_decompress},
4};
5use bytes::BufMut;
6use ethrex_common::types::{BlockBody, BlockHash, BlockHeader, BlockNumber};
7use ethrex_rlp::{
8 decode::RLPDecode,
9 encode::RLPEncode,
10 error::{RLPDecodeError, RLPEncodeError},
11 structs::{Decoder, Encoder},
12};
13use ethrex_storage::{Store, error::StoreError};
14use tracing::{error, trace};
15
16pub const HASH_FIRST_BYTE_DECODER: u8 = 160;
17
18#[derive(Debug, PartialEq, Eq, Clone, Copy)]
19pub enum HashOrNumber {
20 Hash(BlockHash),
21 Number(BlockNumber),
22}
23
24impl core::fmt::Display for HashOrNumber {
25 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
26 match self {
27 HashOrNumber::Hash(hash) => write!(f, "{hash:#x}"),
28 HashOrNumber::Number(number) => write!(f, "{number}"),
29 }
30 }
31}
32
33impl RLPEncode for HashOrNumber {
34 fn encode(&self, buf: &mut dyn BufMut) {
35 match self {
36 HashOrNumber::Hash(hash) => hash.encode(buf),
37 HashOrNumber::Number(number) => number.encode(buf),
38 }
39 }
40
41 fn length(&self) -> usize {
42 match self {
43 HashOrNumber::Hash(hash) => hash.length(),
44 HashOrNumber::Number(number) => number.length(),
45 }
46 }
47}
48
49impl From<BlockHash> for HashOrNumber {
50 fn from(value: BlockHash) -> Self {
51 Self::Hash(value)
52 }
53}
54
55impl RLPDecode for HashOrNumber {
56 fn decode_unfinished(buf: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
57 let first_byte = buf.first().ok_or(RLPDecodeError::InvalidLength)?;
58 if *first_byte == HASH_FIRST_BYTE_DECODER {
62 let (hash, rest) = BlockHash::decode_unfinished(buf)?;
63 return Ok((Self::Hash(hash), rest));
64 }
65
66 let (number, rest) = u64::decode_unfinished(buf)?;
67 Ok((Self::Number(number), rest))
68 }
69}
70
71#[derive(Debug, Clone)]
73pub struct GetBlockHeaders {
74 pub id: u64,
77 pub startblock: HashOrNumber,
78 pub limit: u64,
79 pub skip: u64,
80 pub reverse: bool,
81}
82
83pub const BLOCK_HEADER_LIMIT: u64 = 1024;
85
86impl GetBlockHeaders {
87 pub fn new(id: u64, startblock: HashOrNumber, limit: u64, skip: u64, reverse: bool) -> Self {
88 Self {
89 id,
90 startblock,
91 limit,
92 skip,
93 reverse,
94 }
95 }
96
97 pub async fn fetch_headers(&self, storage: &Store) -> Vec<BlockHeader> {
98 let start_block = match self.startblock {
101 HashOrNumber::Hash(block_hash) => {
104 if let Ok(Some(block_number)) = storage.get_block_number(block_hash).await {
105 HashOrNumber::Number(block_number)
106 } else {
107 self.startblock
108 }
109 }
110 HashOrNumber::Number(_) => self.startblock,
114 };
115
116 let mut headers = vec![];
117
118 let mut current_block = start_block;
119
120 let block_skip = if self.reverse {
121 -((self.skip + 1) as i64)
122 } else {
123 (self.skip + 1) as i64
124 };
125
126 let limit = if self.limit > BLOCK_HEADER_LIMIT {
127 BLOCK_HEADER_LIMIT
128 } else {
129 self.limit
130 };
131
132 for _ in 0..limit {
133 let block_header_opt = match get_block_header(storage, current_block) {
134 Ok(block_header) => block_header,
135 Err(err) => {
136 error!(%err, block_ref=%current_block, "Error accessing DB while building header response for peer");
137 break;
138 }
139 };
140 let Some(block_header) = block_header_opt else {
141 trace!(block_ref=%current_block, "Block header not found");
142 break;
143 };
144 headers.push(block_header);
145
146 match current_block {
148 HashOrNumber::Hash(_) => break,
153 HashOrNumber::Number(number) => {
154 let Ok(new_number) = (number as i64 + block_skip).try_into() else {
155 break;
156 };
157 current_block = HashOrNumber::Number(new_number)
158 }
159 }
160 }
161 headers
162 }
163}
164
165fn get_block_header(
166 storage: &Store,
167 block_ref: HashOrNumber,
168) -> Result<Option<BlockHeader>, StoreError> {
169 match block_ref {
170 HashOrNumber::Hash(block_hash) => storage.get_block_header_by_hash(block_hash),
171 HashOrNumber::Number(block_number) => storage.get_block_header(block_number),
172 }
173}
174
175impl RLPxMessage for GetBlockHeaders {
176 const CODE: u8 = 0x03;
177 fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> {
178 let mut encoded_data = vec![];
179 let limit = self.limit;
180 let skip = self.skip;
181 let reverse = self.reverse as u8;
182 Encoder::new(&mut encoded_data)
183 .encode_field(&self.id)
184 .encode_field(&(self.startblock, limit, skip, reverse))
185 .finish();
186 let msg_data = snappy_compress(encoded_data)?;
187 buf.put_slice(&msg_data);
188 Ok(())
189 }
190
191 fn decode(msg_data: &[u8]) -> Result<Self, RLPDecodeError> {
192 let decompressed_data = snappy_decompress(msg_data)?;
193 let decoder = Decoder::new(&decompressed_data)?;
194 let (id, decoder): (u64, _) = decoder.decode_field("request-id")?;
195 let ((start_block, limit, skip, reverse), _): ((HashOrNumber, u64, u64, bool), _) =
196 decoder.decode_field("get headers request params")?;
197 Ok(Self::new(id, start_block, limit, skip, reverse))
198 }
199}
200
201#[derive(Debug, Clone)]
203pub struct BlockHeaders {
204 pub id: u64,
207 pub block_headers: Vec<BlockHeader>,
208}
209
210impl BlockHeaders {
211 pub fn new(id: u64, block_headers: Vec<BlockHeader>) -> Self {
212 Self { block_headers, id }
213 }
214}
215
216impl RLPxMessage for BlockHeaders {
217 const CODE: u8 = 0x04;
218 fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> {
219 let mut encoded_data = vec![];
220 Encoder::new(&mut encoded_data)
224 .encode_field(&self.id)
225 .encode_field(&self.block_headers)
226 .finish();
227 let msg_data = snappy_compress(encoded_data)?;
228 buf.put_slice(&msg_data);
229 Ok(())
230 }
231
232 fn decode(msg_data: &[u8]) -> Result<Self, RLPDecodeError> {
233 let decompressed_data = snappy_decompress(msg_data)?;
234 let decoder = Decoder::new(&decompressed_data)?;
235 let (id, decoder): (u64, _) = decoder.decode_field("request-id")?;
236 let (block_headers, _): (Vec<BlockHeader>, _) = decoder.decode_field("headers")?;
237
238 Ok(Self::new(id, block_headers))
239 }
240}
241
242#[derive(Debug, Clone)]
244pub struct GetBlockBodies {
245 pub id: u64,
248 pub block_hashes: Vec<BlockHash>,
249}
250
251pub const BLOCK_BODY_LIMIT: usize = 1024;
254
255impl GetBlockBodies {
256 pub fn new(id: u64, block_hashes: Vec<BlockHash>) -> Self {
257 Self { block_hashes, id }
258 }
259 pub async fn fetch_blocks(&self, storage: &Store) -> Vec<BlockBody> {
260 let mut block_bodies = vec![];
261 for block_hash in &self.block_hashes {
262 match storage.get_block_body_by_hash(*block_hash).await {
263 Ok(Some(block)) => {
264 block_bodies.push(block);
265 if block_bodies.len() >= BLOCK_BODY_LIMIT {
266 break;
267 }
268 }
269 Ok(None) => {
270 continue;
271 }
272 Err(err) => {
273 error!(
274 "Error accessing DB while building block bodies response for peer: {err}"
275 );
276 return vec![];
277 }
278 }
279 }
280 block_bodies
281 }
282}
283
284impl RLPxMessage for GetBlockBodies {
285 const CODE: u8 = 0x05;
286 fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> {
287 let mut encoded_data = vec![];
288 Encoder::new(&mut encoded_data)
289 .encode_field(&self.id)
290 .encode_field(&self.block_hashes)
291 .finish();
292
293 let msg_data = snappy_compress(encoded_data)?;
294 buf.put_slice(&msg_data);
295 Ok(())
296 }
297
298 fn decode(msg_data: &[u8]) -> Result<Self, RLPDecodeError> {
299 let decompressed_data = snappy_decompress(msg_data)?;
300 let decoder = Decoder::new(&decompressed_data)?;
301 let (id, decoder): (u64, _) = decoder.decode_field("request-id")?;
302 let (block_hashes, _): (Vec<BlockHash>, _) = decoder.decode_field("blockHashes")?;
303
304 Ok(Self::new(id, block_hashes))
305 }
306}
307
308#[derive(Debug, Clone)]
310pub struct BlockBodies {
311 pub id: u64,
314 pub block_bodies: Vec<BlockBody>,
315}
316
317impl BlockBodies {
318 pub fn new(id: u64, block_bodies: Vec<BlockBody>) -> Self {
319 Self { block_bodies, id }
320 }
321}
322
323impl RLPxMessage for BlockBodies {
324 const CODE: u8 = 0x06;
325 fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> {
326 let mut encoded_data = vec![];
327 Encoder::new(&mut encoded_data)
328 .encode_field(&self.id)
329 .encode_field(&self.block_bodies)
330 .finish();
331
332 let msg_data = snappy_compress(encoded_data)?;
333 buf.put_slice(&msg_data);
334 Ok(())
335 }
336
337 fn decode(msg_data: &[u8]) -> Result<Self, RLPDecodeError> {
338 let decompressed_data = snappy_decompress(msg_data)?;
339 let decoder = Decoder::new(&decompressed_data)?;
340 let (id, decoder): (u64, _) = decoder.decode_field("request-id")?;
341 let (block_bodies, _): (Vec<BlockBody>, _) = decoder.decode_field("blockBodies")?;
342
343 Ok(Self::new(id, block_bodies))
344 }
345}