Skip to main content

ethrex_p2p/snap/
server.rs

1use bytes::Bytes;
2use ethrex_rlp::encode::RLPEncode;
3use ethrex_storage::Store;
4
5use crate::rlpx::snap::{
6    AccountRange, AccountRangeUnit, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges,
7    GetTrieNodes, StorageRanges, StorageSlot, TrieNodes,
8};
9use ethrex_common::types::AccountStateSlimCodec;
10
11use super::error::SnapError;
12use super::proof_to_encodable;
13
14// Request Processing
15
16pub async fn process_account_range_request(
17    request: GetAccountRange,
18    store: Store,
19) -> Result<AccountRange, SnapError> {
20    tokio::task::spawn_blocking(move || {
21        let mut accounts = vec![];
22        let mut bytes_used = 0;
23        for (hash, account) in store.iter_accounts_from(request.root_hash, request.starting_hash)? {
24            debug_assert!(hash >= request.starting_hash);
25            bytes_used += 32 + AccountStateSlimCodec(account).length() as u64;
26            accounts.push(AccountRangeUnit { hash, account });
27            if hash >= request.limit_hash || bytes_used >= request.response_bytes {
28                break;
29            }
30        }
31        let proof = proof_to_encodable(store.get_account_range_proof(
32            request.root_hash,
33            request.starting_hash,
34            accounts.last().map(|acc| acc.hash),
35        )?);
36        Ok(AccountRange {
37            id: request.id,
38            accounts,
39            proof,
40        })
41    })
42    .await
43    .map_err(|e| SnapError::TaskPanic(e.to_string()))?
44}
45
46pub async fn process_storage_ranges_request(
47    request: GetStorageRanges,
48    store: Store,
49) -> Result<StorageRanges, SnapError> {
50    tokio::task::spawn_blocking(move || {
51        let mut slots = vec![];
52        let mut proof = vec![];
53        let mut bytes_used = 0;
54
55        for hashed_address in request.account_hashes {
56            let mut account_slots = vec![];
57            let mut res_capped = false;
58
59            if let Some(storage_iter) =
60                store.iter_storage_from(request.root_hash, hashed_address, request.starting_hash)?
61            {
62                for (hash, data) in storage_iter {
63                    debug_assert!(hash >= request.starting_hash);
64                    bytes_used += 64_u64; // slot size
65                    account_slots.push(StorageSlot { hash, data });
66                    if hash >= request.limit_hash || bytes_used >= request.response_bytes {
67                        if bytes_used >= request.response_bytes {
68                            res_capped = true;
69                        }
70                        break;
71                    }
72                }
73            }
74
75            // Generate proofs only if the response doesn't contain the full storage range for the account
76            // Aka if the starting hash is not zero or if the response was capped due to byte limit
77            if !request.starting_hash.is_zero() || res_capped && !account_slots.is_empty() {
78                proof.extend(proof_to_encodable(
79                    store
80                        .get_storage_range_proof(
81                            request.root_hash,
82                            hashed_address,
83                            request.starting_hash,
84                            account_slots.last().map(|acc| acc.hash),
85                        )?
86                        .unwrap_or_default(),
87                ));
88            }
89
90            if !account_slots.is_empty() {
91                slots.push(account_slots);
92            }
93
94            if bytes_used >= request.response_bytes {
95                break;
96            }
97        }
98        Ok(StorageRanges {
99            id: request.id,
100            slots,
101            proof,
102        })
103    })
104    .await
105    .map_err(|e| SnapError::TaskPanic(e.to_string()))?
106}
107
108pub async fn process_byte_codes_request(
109    request: GetByteCodes,
110    store: Store,
111) -> Result<ByteCodes, SnapError> {
112    tokio::task::spawn_blocking(move || {
113        let mut codes = vec![];
114        let mut bytes_used = 0;
115        for code_hash in request.hashes {
116            if let Some(code) = store.get_account_code(code_hash)?.map(|c| c.code_bytes()) {
117                bytes_used += code.len() as u64;
118                codes.push(code);
119            }
120            if bytes_used >= request.bytes {
121                break;
122            }
123        }
124        Ok(ByteCodes {
125            id: request.id,
126            codes,
127        })
128    })
129    .await
130    .map_err(|e| SnapError::TaskPanic(e.to_string()))?
131}
132
133pub async fn process_trie_nodes_request(
134    request: GetTrieNodes,
135    store: Store,
136) -> Result<TrieNodes, SnapError> {
137    tokio::task::spawn_blocking(move || {
138        let mut nodes = vec![];
139        let mut remaining_bytes = request.bytes;
140        for paths in request.paths {
141            if paths.is_empty() {
142                return Err(SnapError::BadRequest(
143                    "zero-item pathset requested".to_string(),
144                ));
145            }
146            let trie_nodes = store.get_trie_nodes(
147                request.root_hash,
148                paths.into_iter().map(|bytes| bytes.to_vec()).collect(),
149                remaining_bytes,
150            )?;
151            nodes.extend(trie_nodes.iter().map(|nodes| Bytes::copy_from_slice(nodes)));
152            remaining_bytes = remaining_bytes
153                .saturating_sub(trie_nodes.iter().fold(0, |acc, nodes| acc + nodes.len()) as u64);
154            if remaining_bytes == 0 {
155                break;
156            }
157        }
158
159        Ok(TrieNodes {
160            id: request.id,
161            nodes,
162        })
163    })
164    .await
165    .map_err(|e| SnapError::TaskPanic(e.to_string()))?
166}