//! ethrex_p2p/rlpx/eth/blocks.rs
//!
//! `eth` protocol messages for requesting and serving block headers and block bodies.

1use crate::rlpx::{
2    message::RLPxMessage,
3    utils::{snappy_compress, snappy_decompress},
4};
5use bytes::BufMut;
6use ethrex_common::types::{BlockBody, BlockHash, BlockHeader, BlockNumber};
7use ethrex_rlp::{
8    decode::RLPDecode,
9    encode::RLPEncode,
10    error::{RLPDecodeError, RLPEncodeError},
11    structs::{Decoder, Encoder},
12};
13use ethrex_storage::{Store, error::StoreError};
14use tracing::{error, trace};
15
/// RLP prefix byte of a 32-byte string: `0x80 + 0x20 = 0xa0` (160 in decimal).
///
/// A payload that starts with this byte is decoded as a block hash.
pub const HASH_FIRST_BYTE_DECODER: u8 = 0x80 + 0x20;
17
18#[derive(Debug, PartialEq, Eq, Clone, Copy)]
19pub enum HashOrNumber {
20    Hash(BlockHash),
21    Number(BlockNumber),
22}
23
24impl core::fmt::Display for HashOrNumber {
25    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
26        match self {
27            HashOrNumber::Hash(hash) => write!(f, "{hash:#x}"),
28            HashOrNumber::Number(number) => write!(f, "{number}"),
29        }
30    }
31}
32
33impl RLPEncode for HashOrNumber {
34    fn encode(&self, buf: &mut dyn BufMut) {
35        match self {
36            HashOrNumber::Hash(hash) => hash.encode(buf),
37            HashOrNumber::Number(number) => number.encode(buf),
38        }
39    }
40
41    fn length(&self) -> usize {
42        match self {
43            HashOrNumber::Hash(hash) => hash.length(),
44            HashOrNumber::Number(number) => number.length(),
45        }
46    }
47}
48
49impl From<BlockHash> for HashOrNumber {
50    fn from(value: BlockHash) -> Self {
51        Self::Hash(value)
52    }
53}
54
55impl RLPDecode for HashOrNumber {
56    fn decode_unfinished(buf: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
57        let first_byte = buf.first().ok_or(RLPDecodeError::InvalidLength)?;
58        // https://ethereum.org/en/developers/docs/data-structures-and-encoding/rlp/
59        // hashes are 32 bytes long, so they enter in the 0-55 bytes range for rlp. This means the first byte
60        // is the value 0x80 + len, where len = 32 (0x20). so we get the result of 0xa0 which is 160 in decimal
61        if *first_byte == HASH_FIRST_BYTE_DECODER {
62            let (hash, rest) = BlockHash::decode_unfinished(buf)?;
63            return Ok((Self::Hash(hash), rest));
64        }
65
66        let (number, rest) = u64::decode_unfinished(buf)?;
67        Ok((Self::Number(number), rest))
68    }
69}
70
71// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getblockheaders-0x03
72#[derive(Debug, Clone)]
73pub struct GetBlockHeaders {
74    // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response
75    // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages
76    pub id: u64,
77    pub startblock: HashOrNumber,
78    pub limit: u64,
79    pub skip: u64,
80    pub reverse: bool,
81}
82
/// Upper bound on the number of headers served for a single request.
///
/// Limit taken from here: https://github.com/ethereum/go-ethereum/blob/20bf543a64d7c2a590b18a1e1d907cae65707013/eth/protocols/eth/handler.go#L40
pub const BLOCK_HEADER_LIMIT: u64 = 1_024;
85
86impl GetBlockHeaders {
87    pub fn new(id: u64, startblock: HashOrNumber, limit: u64, skip: u64, reverse: bool) -> Self {
88        Self {
89            id,
90            startblock,
91            limit,
92            skip,
93            reverse,
94        }
95    }
96
97    pub async fn fetch_headers(&self, storage: &Store) -> Vec<BlockHeader> {
98        // According to the spec, we don't need to service non-canonical headers,
99        // but geth does, and it helps in reorg scenarios, so we handle that case.
100        let start_block = match self.startblock {
101            // Try to fetch the block number from the hash
102            // Otherwise keep the hash
103            HashOrNumber::Hash(block_hash) => {
104                if let Ok(Some(block_number)) = storage.get_block_number(block_hash).await {
105                    HashOrNumber::Number(block_number)
106                } else {
107                    self.startblock
108                }
109            }
110            // Don't check if the block number is available
111            // because if it it's not, the loop below will
112            // break early and return an empty vec.
113            HashOrNumber::Number(_) => self.startblock,
114        };
115
116        let mut headers = vec![];
117
118        let mut current_block = start_block;
119
120        let block_skip = if self.reverse {
121            -((self.skip + 1) as i64)
122        } else {
123            (self.skip + 1) as i64
124        };
125
126        let limit = if self.limit > BLOCK_HEADER_LIMIT {
127            BLOCK_HEADER_LIMIT
128        } else {
129            self.limit
130        };
131
132        for _ in 0..limit {
133            let block_header_opt = match get_block_header(storage, current_block) {
134                Ok(block_header) => block_header,
135                Err(err) => {
136                    error!(%err, block_ref=%current_block, "Error accessing DB while building header response for peer");
137                    break;
138                }
139            };
140            let Some(block_header) = block_header_opt else {
141                trace!(block_ref=%current_block, "Block header not found");
142                break;
143            };
144            headers.push(block_header);
145
146            // Update to next block to fetch
147            match current_block {
148                // We don't support fetching multiple headers by hash, unless it's
149                // part of the canonical chain, so we break here.
150                // TODO: we could support fetching by hash in descending order,
151                // by fetching the parent of each block.
152                HashOrNumber::Hash(_) => break,
153                HashOrNumber::Number(number) => {
154                    let Ok(new_number) = (number as i64 + block_skip).try_into() else {
155                        break;
156                    };
157                    current_block = HashOrNumber::Number(new_number)
158                }
159            }
160        }
161        headers
162    }
163}
164
165fn get_block_header(
166    storage: &Store,
167    block_ref: HashOrNumber,
168) -> Result<Option<BlockHeader>, StoreError> {
169    match block_ref {
170        HashOrNumber::Hash(block_hash) => storage.get_block_header_by_hash(block_hash),
171        HashOrNumber::Number(block_number) => storage.get_block_header(block_number),
172    }
173}
174
175impl RLPxMessage for GetBlockHeaders {
176    const CODE: u8 = 0x03;
177    fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> {
178        let mut encoded_data = vec![];
179        let limit = self.limit;
180        let skip = self.skip;
181        let reverse = self.reverse as u8;
182        Encoder::new(&mut encoded_data)
183            .encode_field(&self.id)
184            .encode_field(&(self.startblock, limit, skip, reverse))
185            .finish();
186        let msg_data = snappy_compress(encoded_data)?;
187        buf.put_slice(&msg_data);
188        Ok(())
189    }
190
191    fn decode(msg_data: &[u8]) -> Result<Self, RLPDecodeError> {
192        let decompressed_data = snappy_decompress(msg_data)?;
193        let decoder = Decoder::new(&decompressed_data)?;
194        let (id, decoder): (u64, _) = decoder.decode_field("request-id")?;
195        let ((start_block, limit, skip, reverse), _): ((HashOrNumber, u64, u64, bool), _) =
196            decoder.decode_field("get headers request params")?;
197        Ok(Self::new(id, start_block, limit, skip, reverse))
198    }
199}
200
201// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#blockheaders-0x04
202#[derive(Debug, Clone)]
203pub struct BlockHeaders {
204    // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response
205    // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages
206    pub id: u64,
207    pub block_headers: Vec<BlockHeader>,
208}
209
210impl BlockHeaders {
211    pub fn new(id: u64, block_headers: Vec<BlockHeader>) -> Self {
212        Self { block_headers, id }
213    }
214}
215
216impl RLPxMessage for BlockHeaders {
217    const CODE: u8 = 0x04;
218    fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> {
219        let mut encoded_data = vec![];
220        // Each message is encoded with its own
221        // message identifier (code).
222        // Go ethereum reference: https://github.com/ethereum/go-ethereum/blob/20bf543a64d7c2a590b18a1e1d907cae65707013/p2p/transport.go#L94
223        Encoder::new(&mut encoded_data)
224            .encode_field(&self.id)
225            .encode_field(&self.block_headers)
226            .finish();
227        let msg_data = snappy_compress(encoded_data)?;
228        buf.put_slice(&msg_data);
229        Ok(())
230    }
231
232    fn decode(msg_data: &[u8]) -> Result<Self, RLPDecodeError> {
233        let decompressed_data = snappy_decompress(msg_data)?;
234        let decoder = Decoder::new(&decompressed_data)?;
235        let (id, decoder): (u64, _) = decoder.decode_field("request-id")?;
236        let (block_headers, _): (Vec<BlockHeader>, _) = decoder.decode_field("headers")?;
237
238        Ok(Self::new(id, block_headers))
239    }
240}
241
242// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getblockbodies-0x05
243#[derive(Debug, Clone)]
244pub struct GetBlockBodies {
245    // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response
246    // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages
247    pub id: u64,
248    pub block_hashes: Vec<BlockHash>,
249}
250
/// Upper bound on the number of block bodies served for a single request.
///
/// Limit taken from here:
/// https://github.com/ethereum/go-ethereum/blob/a1093d98eb3260f2abf340903c2d968b2b891c11/eth/protocols/eth/handler.go#L45
pub const BLOCK_BODY_LIMIT: usize = 1_024;
254
255impl GetBlockBodies {
256    pub fn new(id: u64, block_hashes: Vec<BlockHash>) -> Self {
257        Self { block_hashes, id }
258    }
259    pub async fn fetch_blocks(&self, storage: &Store) -> Vec<BlockBody> {
260        let mut block_bodies = vec![];
261        for block_hash in &self.block_hashes {
262            match storage.get_block_body_by_hash(*block_hash).await {
263                Ok(Some(block)) => {
264                    block_bodies.push(block);
265                    if block_bodies.len() >= BLOCK_BODY_LIMIT {
266                        break;
267                    }
268                }
269                Ok(None) => {
270                    continue;
271                }
272                Err(err) => {
273                    error!(
274                        "Error accessing DB while building block bodies response for peer: {err}"
275                    );
276                    return vec![];
277                }
278            }
279        }
280        block_bodies
281    }
282}
283
284impl RLPxMessage for GetBlockBodies {
285    const CODE: u8 = 0x05;
286    fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> {
287        let mut encoded_data = vec![];
288        Encoder::new(&mut encoded_data)
289            .encode_field(&self.id)
290            .encode_field(&self.block_hashes)
291            .finish();
292
293        let msg_data = snappy_compress(encoded_data)?;
294        buf.put_slice(&msg_data);
295        Ok(())
296    }
297
298    fn decode(msg_data: &[u8]) -> Result<Self, RLPDecodeError> {
299        let decompressed_data = snappy_decompress(msg_data)?;
300        let decoder = Decoder::new(&decompressed_data)?;
301        let (id, decoder): (u64, _) = decoder.decode_field("request-id")?;
302        let (block_hashes, _): (Vec<BlockHash>, _) = decoder.decode_field("blockHashes")?;
303
304        Ok(Self::new(id, block_hashes))
305    }
306}
307
308// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#blockbodies-0x06
309#[derive(Debug, Clone)]
310pub struct BlockBodies {
311    // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response
312    // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages
313    pub id: u64,
314    pub block_bodies: Vec<BlockBody>,
315}
316
317impl BlockBodies {
318    pub fn new(id: u64, block_bodies: Vec<BlockBody>) -> Self {
319        Self { block_bodies, id }
320    }
321}
322
323impl RLPxMessage for BlockBodies {
324    const CODE: u8 = 0x06;
325    fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> {
326        let mut encoded_data = vec![];
327        Encoder::new(&mut encoded_data)
328            .encode_field(&self.id)
329            .encode_field(&self.block_bodies)
330            .finish();
331
332        let msg_data = snappy_compress(encoded_data)?;
333        buf.put_slice(&msg_data);
334        Ok(())
335    }
336
337    fn decode(msg_data: &[u8]) -> Result<Self, RLPDecodeError> {
338        let decompressed_data = snappy_decompress(msg_data)?;
339        let decoder = Decoder::new(&decompressed_data)?;
340        let (id, decoder): (u64, _) = decoder.decode_field("request-id")?;
341        let (block_bodies, _): (Vec<BlockBody>, _) = decoder.decode_field("blockBodies")?;
342
343        Ok(Self::new(id, block_bodies))
344    }
345}