Skip to main content

ethrex_p2p/snap/
client.rs

1//! Snap sync client - functions for requesting snap protocol data from peers
2//!
3//! This module contains all the client-side snap protocol request functions.
4
5use crate::rlpx::message::Message as RLPxMessage;
6use crate::{
7    metrics::{CurrentStepValue, METRICS},
8    peer_handler::PeerHandler,
9    peer_table::{PeerTableServerProtocol as _, RequestPermit},
10    rlpx::{
11        connection::server::PeerConnection,
12        error::PeerConnectionError,
13        p2p::SUPPORTED_SNAP_CAPABILITIES,
14        snap::{
15            AccountRange, AccountRangeUnit, ByteCodes, GetAccountRange, GetByteCodes,
16            GetStorageRanges, GetTrieNodes, StorageRanges, TrieNodes,
17        },
18    },
19    snap::{async_fs, constants::*, encodable_to_proof, error::SnapError},
20    sync::{AccountStorageRoots, SnapBlockSyncState, block_is_stale, update_pivot},
21    utils::{
22        AccountsWithStorage, dump_accounts_to_file, dump_storages_to_file,
23        get_account_state_snapshot_file, get_account_storages_snapshot_file,
24    },
25};
26use bytes::Bytes;
27use ethrex_common::{
28    BigEndianHash, H256, U256,
29    types::{AccountState, BlockHeader},
30};
31use ethrex_crypto::NativeCrypto;
32use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode};
33use ethrex_storage::Store;
34use ethrex_trie::Nibbles;
35use ethrex_trie::{Node, verify_range};
36use std::{
37    collections::{BTreeMap, HashMap, VecDeque},
38    path::Path,
39    sync::atomic::Ordering,
40    time::{Duration, SystemTime},
41};
42use tracing::{debug, error, trace, warn};
43
44// Re-export DumpError from error module
45pub use super::error::DumpError;
46
/// Metadata for requesting trie nodes
///
/// Bundles the identifying information of a single trie node that is going to
/// be requested from a peer.
#[derive(Debug, Clone)]
pub struct RequestMetadata {
    /// Hash associated with the requested node.
    // NOTE(review): presumably the expected hash of the node, used to validate
    // the peer's response — confirm against the callers.
    pub hash: H256,
    /// Path of the requested node, expressed as nibbles.
    // NOTE(review): presumably relative to the root of the trie being healed —
    // confirm against the callers.
    pub path: Nibbles,
    /// What node is the parent of this node
    pub parent_path: Nibbles,
}
55
/// Error type for storage trie node requests (includes request ID for tracking)
///
/// Displays as `Storage trie node request {request_id} failed: {source}`.
#[derive(Debug, thiserror::Error)]
#[error("Storage trie node request {request_id} failed: {source}")]
pub struct RequestStorageTrieNodesError {
    /// Identifier of the request that failed.
    pub request_id: u64,
    /// Underlying snap error; exposed as the error's `source`.
    #[source]
    pub source: SnapError,
}
64
/// Outcome of a single storage-range task, sent from a worker back to the
/// `request_storage_ranges` loop over its result channel.
struct StorageTaskResult {
    /// Index into `accounts_by_root_hash` of the first group the task covered.
    start_index: usize,
    /// Downloaded storage entries.
    // NOTE(review): presumably one inner vector of (hashed slot key, value)
    // pairs per account group, in group order — confirm against the worker.
    account_storages: Vec<Vec<(H256, U256)>>,
    /// Peer that served (or failed to serve) the request.
    peer_id: H256,
    /// Start of the group index range still left to download. The response
    /// handler treats `start_index..remaining_start` as covered.
    remaining_start: usize,
    /// End of the group index range still left to download. The handler
    /// re-queues work only when `remaining_start < remaining_end`.
    remaining_end: usize,
    /// Hash range still left to download for the account at `remaining_start`.
    /// A zero start hash is treated by the handler as a common (bulk) request.
    remaining_hash_range: (H256, Option<H256>),
    // The start_hash of the original task. Distinct from remaining_hash_range.0,
    // which is the worker's advancing pointer (zero on full completion). Needed
    // by the response handler to match the completed interval unambiguously.
    task_start_hash: H256,
}
77
/// A unit of storage download work queued by `request_storage_ranges`.
///
/// Covers the groups `accounts_by_root_hash[start_index..end_index]`. Bulk
/// tasks are created with `start_hash == H256::zero()` and `end_hash == None`;
/// per-interval tasks cover a single group (`end_index == start_index + 1`) and
/// carry the interval bounds in `start_hash` / `end_hash`.
#[derive(Debug)]
struct StorageTask {
    /// Index of the first group covered by the task (inclusive).
    start_index: usize,
    /// Index one past the last group covered by the task (exclusive).
    end_index: usize,
    /// Hash from which to start downloading; zero for bulk tasks.
    start_hash: H256,
    // end_hash is None if the task is for the first big storage request
    /// Upper bound of the interval for per-interval tasks, `None` for bulk tasks.
    end_hash: Option<H256>,
}
86
87/// Removes the completed interval `(start_hash, end_hash)` from whichever
88/// account in the group at `accounts_by_root_hash[start_index]` currently
89/// holds the interval list, and when the list empties marks every account in
90/// the group as done and healed.
91///
92/// Returns `true` if an interval was found and removed, `false` if no account
93/// in the group has any live intervals (a sibling task already finalized the
94/// account earlier in this call).
95///
96/// Within a group sharing the same storage root, the split path stores
97/// intervals under one canonical account, so only that account's entry holds
98/// the live list. We scan the group rather than relying on `accounts.first()`
99/// because the canonical account can shift across calls if the set of tracked
100/// accounts changes between iterations of `accounts_with_storage_root`.
101fn clear_completed_interval(
102    account_storage_roots: &mut AccountStorageRoots,
103    accounts_by_root_hash: &[(H256, Vec<H256>)],
104    accounts_done: &mut HashMap<H256, Vec<(H256, H256)>>,
105    start_index: usize,
106    interval: (H256, H256),
107) -> Result<bool, SnapError> {
108    let accounts = &accounts_by_root_hash[start_index].1;
109    let acc_hash = accounts.iter().copied().find(|account| {
110        account_storage_roots
111            .accounts_with_storage_root
112            .get(account)
113            .is_some_and(|(_, ivs)| !ivs.is_empty())
114    });
115    let Some(acc_hash) = acc_hash else {
116        return Ok(false);
117    };
118    let (_, old_intervals) = account_storage_roots
119        .accounts_with_storage_root
120        .get_mut(&acc_hash)
121        .ok_or_else(|| {
122            SnapError::InternalError(
123                "Tried to get the old download intervals for an account but did not find them"
124                    .to_owned(),
125            )
126        })?;
127    let pos = old_intervals
128        .iter()
129        .position(|iv| *iv == interval)
130        .ok_or_else(|| {
131            SnapError::InternalError(
132                "Could not find an old interval that we were tracking".to_owned(),
133            )
134        })?;
135    old_intervals.remove(pos);
136    if old_intervals.is_empty() {
137        for account in accounts {
138            accounts_done.insert(*account, vec![]);
139            account_storage_roots.healed_accounts.insert(*account);
140        }
141    }
142    Ok(true)
143}
144
145/// Requests an account range from any suitable peer given the state trie's root and the starting hash and the limit hash.
146/// Will also return a boolean indicating if there is more state to be fetched towards the right of the trie
147/// (Note that the boolean will be true even if the remaining state is ouside the boundary set by the limit hash)
148///
149/// # Returns
150///
151/// The account range or `None` if:
152///
153/// - There are no available peers (the node just started up or was rejected by all other nodes)
154/// - No peer returned a valid response in the given time and retry limits
155pub async fn request_account_range(
156    peers: &mut PeerHandler,
157    start: H256,
158    limit: H256,
159    account_state_snapshots_dir: &Path,
160    pivot_header: &mut BlockHeader,
161    block_sync_state: &mut SnapBlockSyncState,
162    diagnostics: &std::sync::Arc<tokio::sync::RwLock<crate::sync::SyncDiagnostics>>,
163) -> Result<(), SnapError> {
164    METRICS
165        .current_step
166        .set(CurrentStepValue::RequestingAccountRanges);
167    // 1) split the range in chunks of same length
168    let start_u256 = U256::from_big_endian(&start.0);
169    let limit_u256 = U256::from_big_endian(&limit.0);
170
171    let range = limit_u256 - start_u256;
172    // Bounded by ACCOUNT_RANGE_CHUNK_COUNT (800), always fits in usize.
173    let chunk_count =
174        usize::try_from(U256::from(ACCOUNT_RANGE_CHUNK_COUNT).min(range.max(U256::one())))
175            .expect("chunk_count bounded by ACCOUNT_RANGE_CHUNK_COUNT");
176    let chunk_size = range / chunk_count;
177
178    // list of tasks to be executed
179    let mut tasks_queue_not_started = VecDeque::<(H256, H256)>::new();
180    for i in 0..(chunk_count as u64) {
181        let chunk_start_u256 = chunk_size * i + start_u256;
182        // We subtract one because ranges are inclusive
183        let chunk_end_u256 = chunk_start_u256 + chunk_size - 1u64;
184        let chunk_start = H256::from_uint(&(chunk_start_u256));
185        let chunk_end = H256::from_uint(&(chunk_end_u256));
186        tasks_queue_not_started.push_back((chunk_start, chunk_end));
187    }
188    // Modify the last chunk to include the limit
189    let last_task = tasks_queue_not_started
190        .back_mut()
191        .ok_or(SnapError::NoTasks)?;
192    last_task.1 = limit;
193
194    // 2) request the chunks from peers
195
196    let mut downloaded_count = 0_u64;
197    let mut all_account_hashes = Vec::new();
198    let mut all_accounts_state = Vec::new();
199
200    // channel to send the tasks to the peers
201    let (task_sender, mut task_receiver) =
202        tokio::sync::mpsc::channel::<(Vec<AccountRangeUnit>, H256, Option<(H256, H256)>)>(1000);
203
204    debug!("Starting to download account ranges from peers");
205
206    *METRICS.account_tries_download_start_time.lock().await = Some(SystemTime::now());
207
208    let mut completed_tasks = 0;
209    let mut chunk_file = 0;
210    let mut last_update: SystemTime = SystemTime::now();
211    let mut write_set = tokio::task::JoinSet::new();
212
213    let mut logged_no_free_peers_count = 0;
214
215    loop {
216        if all_accounts_state.len() * size_of::<AccountState>() >= RANGE_FILE_CHUNK_SIZE {
217            let current_account_hashes = std::mem::take(&mut all_account_hashes);
218            let current_account_states = std::mem::take(&mut all_accounts_state);
219
220            let account_state_chunk = current_account_hashes
221                .into_iter()
222                .zip(current_account_states)
223                .collect::<Vec<(H256, AccountState)>>();
224
225            async_fs::ensure_dir_exists(account_state_snapshots_dir).await?;
226
227            let account_state_snapshots_dir_cloned = account_state_snapshots_dir.to_path_buf();
228            write_set.spawn(async move {
229                let path = get_account_state_snapshot_file(
230                    &account_state_snapshots_dir_cloned,
231                    chunk_file,
232                );
233                // TODO: check the error type and handle it properly
234                dump_accounts_to_file(&path, account_state_chunk)
235            });
236
237            chunk_file += 1;
238        }
239
240        if last_update
241            .elapsed()
242            .expect("Time shouldn't be in the past")
243            >= Duration::from_secs(1)
244        {
245            METRICS
246                .downloaded_account_tries
247                .store(downloaded_count, Ordering::Relaxed);
248            last_update = SystemTime::now();
249        }
250
251        if let Ok((accounts, peer_id, chunk_start_end)) = task_receiver.try_recv() {
252            if let Some((chunk_start, chunk_end)) = chunk_start_end {
253                if chunk_start <= chunk_end {
254                    tasks_queue_not_started.push_back((chunk_start, chunk_end));
255                } else {
256                    completed_tasks += 1;
257                }
258            }
259            if chunk_start_end.is_none() {
260                completed_tasks += 1;
261            }
262            if accounts.is_empty() {
263                peers.peer_table.record_failure(peer_id)?;
264                continue;
265            }
266            peers.peer_table.record_success(peer_id)?;
267
268            downloaded_count += accounts.len() as u64;
269
270            debug!(
271                "Downloaded {} accounts from peer {} (current count: {downloaded_count})",
272                accounts.len(),
273                peer_id
274            );
275            all_account_hashes.extend(accounts.iter().map(|unit| unit.hash));
276            all_accounts_state.extend(accounts.iter().map(|unit| unit.account));
277        }
278
279        let Some((peer_id, connection, permit)) = peers
280            .peer_table
281            .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec())
282            .await
283            .inspect_err(|err| warn!(%err, "Error requesting a peer for account range"))
284            .unwrap_or(None)
285        else {
286            // Log ~ once every 10 seconds
287            if logged_no_free_peers_count == 0 {
288                trace!("We are missing peers in request_account_range");
289                logged_no_free_peers_count = 1000;
290            }
291            logged_no_free_peers_count -= 1;
292            // Sleep a bit to avoid busy polling
293            tokio::time::sleep(Duration::from_millis(10)).await;
294            continue;
295        };
296
297        let Some((chunk_start, chunk_end)) = tasks_queue_not_started.pop_front() else {
298            if completed_tasks >= chunk_count {
299                debug!("All account ranges downloaded successfully");
300                break;
301            }
302            continue;
303        };
304
305        let tx = task_sender.clone();
306
307        if block_is_stale(pivot_header) {
308            debug!("Pivot became stale during account range download, updating pivot");
309            *pivot_header = update_pivot(
310                pivot_header.number,
311                pivot_header.timestamp,
312                peers,
313                block_sync_state,
314                diagnostics,
315            )
316            .await
317            .expect("Should be able to update pivot")
318        }
319
320        tokio::spawn(request_account_range_worker(
321            peer_id,
322            connection,
323            chunk_start,
324            chunk_end,
325            pivot_header.state_root,
326            tx,
327            permit,
328        ));
329    }
330
331    write_set
332        .join_all()
333        .await
334        .into_iter()
335        .collect::<Result<Vec<()>, DumpError>>()
336        .map_err(SnapError::from)?;
337
338    // TODO: This is repeated code, consider refactoring
339    {
340        let current_account_hashes = std::mem::take(&mut all_account_hashes);
341        let current_account_states = std::mem::take(&mut all_accounts_state);
342
343        let account_state_chunk = current_account_hashes
344            .into_iter()
345            .zip(current_account_states)
346            .collect::<Vec<(H256, AccountState)>>();
347
348        async_fs::ensure_dir_exists(account_state_snapshots_dir).await?;
349
350        let path = get_account_state_snapshot_file(account_state_snapshots_dir, chunk_file);
351        dump_accounts_to_file(&path, account_state_chunk)
352            .inspect_err(|err| error!("Failed to dump remaining accounts to disk: {}", err.error))
353            .map_err(|_| {
354                SnapError::SnapshotDir(format!(
355                    "Failed to write state snapshot chunk {}",
356                    chunk_file
357                ))
358            })?;
359    }
360
361    METRICS
362        .downloaded_account_tries
363        .store(downloaded_count, Ordering::Relaxed);
364    *METRICS.account_tries_download_end_time.lock().await = Some(SystemTime::now());
365
366    Ok(())
367}
368
369/// Requests bytecodes for the given code hashes
370/// Returns the bytecodes or None if:
371/// - There are no available peers (the node just started up or was rejected by all other nodes)
372/// - No peer returned a valid response in the given time and retry limits
373pub async fn request_bytecodes(
374    peers: &mut PeerHandler,
375    all_bytecode_hashes: &[H256],
376) -> Result<Option<Vec<Bytes>>, SnapError> {
377    METRICS
378        .current_step
379        .set(CurrentStepValue::RequestingBytecodes);
380    if all_bytecode_hashes.is_empty() {
381        return Ok(Some(Vec::new()));
382    }
383    const MAX_BYTECODES_REQUEST_SIZE: usize = 100;
384    // 1) split the range in chunks of same length
385    let chunk_count = 800;
386    let chunk_count = chunk_count.min(all_bytecode_hashes.len());
387    let chunk_size = all_bytecode_hashes.len() / chunk_count;
388
389    // list of tasks to be executed
390    // Types are (start_index, end_index, starting_hash)
391    // NOTE: end_index is NOT inclusive
392    let mut tasks_queue_not_started = VecDeque::<(usize, usize)>::new();
393    for i in 0..chunk_count {
394        let chunk_start = chunk_size * i;
395        let chunk_end = chunk_start + chunk_size;
396        tasks_queue_not_started.push_back((chunk_start, chunk_end));
397    }
398    // Modify the last chunk to include the limit
399    let last_task = tasks_queue_not_started
400        .back_mut()
401        .ok_or(SnapError::NoTasks)?;
402    last_task.1 = all_bytecode_hashes.len();
403
404    // 2) request the chunks from peers
405    let mut downloaded_count = 0_u64;
406    let mut all_bytecodes = vec![Bytes::new(); all_bytecode_hashes.len()];
407
408    // channel to send the tasks to the peers
409    struct TaskResult {
410        start_index: usize,
411        bytecodes: Vec<Bytes>,
412        peer_id: H256,
413        remaining_start: usize,
414        remaining_end: usize,
415    }
416    let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::<TaskResult>(1000);
417
418    debug!("Starting to download bytecodes from peers");
419
420    METRICS
421        .bytecodes_to_download
422        .fetch_add(all_bytecode_hashes.len() as u64, Ordering::Relaxed);
423
424    let mut completed_tasks = 0;
425
426    let mut logged_no_free_peers_count = 0;
427
428    loop {
429        if let Ok(result) = task_receiver.try_recv() {
430            let TaskResult {
431                start_index,
432                bytecodes,
433                peer_id,
434                remaining_start,
435                remaining_end,
436            } = result;
437
438            debug!(
439                "Downloaded {} bytecodes from peer {peer_id} (current count: {downloaded_count})",
440                bytecodes.len(),
441            );
442
443            if remaining_start < remaining_end {
444                tasks_queue_not_started.push_back((remaining_start, remaining_end));
445            } else {
446                completed_tasks += 1;
447            }
448            if bytecodes.is_empty() {
449                peers.peer_table.record_failure(peer_id)?;
450                continue;
451            }
452
453            downloaded_count += bytecodes.len() as u64;
454
455            peers.peer_table.record_success(peer_id)?;
456            for (i, bytecode) in bytecodes.into_iter().enumerate() {
457                all_bytecodes[start_index + i] = bytecode;
458            }
459        }
460
461        let Some((peer_id, mut connection, permit)) = peers
462            .peer_table
463            .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec())
464            .await
465            .inspect_err(|err| warn!(%err, "Error requesting a peer for bytecodes"))
466            .unwrap_or(None)
467        else {
468            // Log ~ once every 10 seconds
469            if logged_no_free_peers_count == 0 {
470                trace!("We are missing peers in request_bytecodes");
471                logged_no_free_peers_count = 1000;
472            }
473            logged_no_free_peers_count -= 1;
474            // Sleep a bit to avoid busy polling
475            tokio::time::sleep(Duration::from_millis(10)).await;
476            continue;
477        };
478
479        let Some((chunk_start, chunk_end)) = tasks_queue_not_started.pop_front() else {
480            if completed_tasks >= chunk_count {
481                debug!("All bytecodes downloaded successfully");
482                break;
483            }
484            continue;
485        };
486
487        let tx = task_sender.clone();
488
489        let hashes_to_request: Vec<_> = all_bytecode_hashes
490            .iter()
491            .skip(chunk_start)
492            .take((chunk_end - chunk_start).min(MAX_BYTECODES_REQUEST_SIZE))
493            .copied()
494            .collect();
495
496        tokio::spawn(async move {
497            debug!(
498                "Requesting bytecode from peer {peer_id}, chunk: {chunk_start:?} - {chunk_end:?}"
499            );
500            let request_id = rand::random();
501            let request = RLPxMessage::GetByteCodes(GetByteCodes {
502                id: request_id,
503                hashes: hashes_to_request.clone(),
504                bytes: MAX_RESPONSE_BYTES,
505            });
506
507            let response = connection
508                .outgoing_request(request, PEER_REPLY_TIMEOUT)
509                .await;
510            drop(permit);
511
512            let (bytecodes, remaining_start) = match response {
513                Ok(RLPxMessage::ByteCodes(ByteCodes { id: _, codes })) if !codes.is_empty() => {
514                    let validated_codes: Vec<Bytes> = codes
515                        .into_iter()
516                        .zip(hashes_to_request)
517                        .take_while(|(b, hash)| ethrex_common::utils::keccak(b) == *hash)
518                        .map(|(b, _hash)| b)
519                        .collect();
520                    let new_remaining_start = chunk_start + validated_codes.len();
521                    (validated_codes, new_remaining_start)
522                }
523                Ok(RLPxMessage::ByteCodes(_)) => {
524                    // Empty response; retry the full chunk.
525                    (Vec::new(), chunk_start)
526                }
527                _ => {
528                    tracing::debug!("Failed to get bytecode");
529                    (Vec::new(), chunk_start)
530                }
531            };
532
533            let result = TaskResult {
534                start_index: chunk_start,
535                bytecodes,
536                peer_id,
537                remaining_start,
538                remaining_end: chunk_end,
539            };
540            tx.send(result).await.ok();
541        });
542    }
543
544    METRICS
545        .downloaded_bytecodes
546        .fetch_add(downloaded_count, Ordering::Relaxed);
547    debug!(
548        "Finished downloading bytecodes, total bytecodes: {}",
549        all_bytecode_hashes.len()
550    );
551
552    Ok(Some(all_bytecodes))
553}
554
555/// Requests storage ranges for accounts given their hashed address and storage roots, and the root of their state trie
556/// account_hashes & storage_roots must have the same length
557/// storage_roots must not contain empty trie hashes, we will treat empty ranges as invalid responses
558/// Returns true if the last account's storage was not completely fetched by the request
559/// Returns the list of hashed storage keys and values for each account's storage or None if:
560/// - There are no available peers (the node just started up or was rejected by all other nodes)
561/// - No peer returned a valid response in the given time and retry limits
562pub async fn request_storage_ranges(
563    peers: &mut PeerHandler,
564    account_storage_roots: &mut AccountStorageRoots,
565    account_storages_snapshots_dir: &Path,
566    mut chunk_index: u64,
567    pivot_header: &mut BlockHeader,
568    store: Store,
569) -> Result<u64, SnapError> {
570    METRICS
571        .current_step
572        .set(CurrentStepValue::RequestingStorageRanges);
573    debug!("Starting request_storage_ranges function");
574    // 1) split the range in chunks of same length
575    let mut accounts_by_root_hash: BTreeMap<_, Vec<_>> = BTreeMap::new();
576    for (account, (maybe_root_hash, _)) in &account_storage_roots.accounts_with_storage_root {
577        match maybe_root_hash {
578            Some(root) => {
579                accounts_by_root_hash
580                    .entry(*root)
581                    .or_default()
582                    .push(*account);
583            }
584            None => {
585                let root = store
586                    .get_account_state_by_acc_hash(pivot_header.hash(), *account)?
587                    .ok_or_else(|| {
588                        SnapError::InternalError(
589                            "Could not find account that should have been downloaded or healed"
590                                .to_string(),
591                        )
592                    })?
593                    .storage_root;
594                accounts_by_root_hash
595                    .entry(root)
596                    .or_default()
597                    .push(*account);
598            }
599        }
600    }
601    // Invariant: within a group sharing the same storage root, the split path
602    // stores intervals under one canonical account, so at most one account's
603    // entry in `accounts_with_storage_root` holds a non-empty interval list.
604    // Scheduling and completion code below scan the group to find that account
605    // rather than relying on `accounts.first()`, because iteration order of
606    // `accounts_with_storage_root` can shift between calls when the tracked
607    // set changes.
608    let mut accounts_by_root_hash = Vec::from_iter(accounts_by_root_hash);
609    // TODO: Turn this into a stable sort for binary search.
610    accounts_by_root_hash.sort_unstable_by_key(|(_, accounts)| !accounts.len());
611    let chunk_size = STORAGE_BATCH_SIZE;
612
613    // Partition into bulk-path tasks (fresh accounts with empty intervals) and
614    // per-interval tasks (big accounts marked in a prior call). The previous
615    // implementation queued every account from `start_hash: zero` and relied
616    // on the response handler's bulk-task big-account split path to re-queue
617    // per-interval tasks each call. That fails when peers cover a big account
618    // fully without hitting their response limit on it: the split path doesn't
619    // fire, no per-interval tasks get queued, intervals never drain, the
620    // account is stuck pending forever even after its data is on disk.
621    let mut tasks_queue_not_started = VecDeque::<StorageTask>::new();
622    let mut bulk_chunk_start: Option<usize> = None;
623    for (i, (_, accounts)) in accounts_by_root_hash.iter().enumerate() {
624        let intervals = accounts.iter().find_map(|acc| {
625            account_storage_roots
626                .accounts_with_storage_root
627                .get(acc)
628                .and_then(|(_, ivs)| (!ivs.is_empty()).then_some(ivs))
629        });
630        if let Some(intervals) = intervals {
631            if let Some(start) = bulk_chunk_start.take() {
632                tasks_queue_not_started.push_back(StorageTask {
633                    start_index: start,
634                    end_index: i,
635                    start_hash: H256::zero(),
636                    end_hash: None,
637                });
638            }
639            for &(start_hash, end_hash) in intervals.iter() {
640                tasks_queue_not_started.push_back(StorageTask {
641                    start_index: i,
642                    end_index: i + 1,
643                    start_hash,
644                    end_hash: Some(end_hash),
645                });
646            }
647        } else {
648            let chunk_start = *bulk_chunk_start.get_or_insert(i);
649            if i + 1 - chunk_start >= chunk_size {
650                tasks_queue_not_started.push_back(StorageTask {
651                    start_index: chunk_start,
652                    end_index: i + 1,
653                    start_hash: H256::zero(),
654                    end_hash: None,
655                });
656                bulk_chunk_start = None;
657            }
658        }
659    }
660    if let Some(start) = bulk_chunk_start {
661        tasks_queue_not_started.push_back(StorageTask {
662            start_index: start,
663            end_index: accounts_by_root_hash.len(),
664            start_hash: H256::zero(),
665            end_hash: None,
666        });
667    }
668
669    // channel to send the tasks to the peers
670    let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::<StorageTaskResult>(1000);
671
672    // channel to send the result of dumping storages
673    let mut disk_joinset: tokio::task::JoinSet<Result<(), DumpError>> = tokio::task::JoinSet::new();
674
675    let mut task_count = tasks_queue_not_started.len();
676    let mut completed_tasks = 0;
677
678    // TODO: in a refactor, delete this replace with a structure that can handle removes
679    let mut accounts_done: HashMap<H256, Vec<(H256, H256)>> = HashMap::new();
680    // Maps storage root to vector of hashed addresses matching that root and
681    // vector of hashed storage keys and storage values.
682    let mut current_account_storages: BTreeMap<H256, AccountsWithStorage> = BTreeMap::new();
683
684    let mut logged_no_free_peers_count = 0;
685
686    debug!("Starting request_storage_ranges loop");
687    loop {
688        if current_account_storages
689            .values()
690            .map(|accounts| 32 * accounts.accounts.len() + 64 * accounts.storages.len())
691            .sum::<usize>()
692            > RANGE_FILE_CHUNK_SIZE
693        {
694            let current_account_storages = std::mem::take(&mut current_account_storages);
695            let snapshot = current_account_storages.into_values().collect::<Vec<_>>();
696
697            async_fs::ensure_dir_exists(account_storages_snapshots_dir).await?;
698
699            let account_storages_snapshots_dir_cloned =
700                account_storages_snapshots_dir.to_path_buf();
701            if !disk_joinset.is_empty() {
702                debug!("Writing to disk");
703                disk_joinset
704                    .join_next()
705                    .await
706                    .expect("Shouldn't be empty")
707                    .expect("Shouldn't have a join error")
708                    .inspect_err(|err| error!("Failed to dump storage snapshot to file: {err:?}"))
709                    .map_err(SnapError::from)?;
710            }
711            disk_joinset.spawn(async move {
712                let path = get_account_storages_snapshot_file(
713                    &account_storages_snapshots_dir_cloned,
714                    chunk_index,
715                );
716                dump_storages_to_file(&path, snapshot)
717            });
718
719            chunk_index += 1;
720        }
721
722        if let Ok(result) = task_receiver.try_recv() {
723            let StorageTaskResult {
724                start_index,
725                mut account_storages,
726                peer_id,
727                remaining_start,
728                remaining_end,
729                remaining_hash_range: (hash_start, hash_end),
730                task_start_hash,
731            } = result;
732            completed_tasks += 1;
733
734            for (_, accounts) in accounts_by_root_hash[start_index..remaining_start].iter() {
735                for account in accounts {
736                    if !accounts_done.contains_key(account) {
737                        let (_, old_intervals) = account_storage_roots
738                                .accounts_with_storage_root
739                                .get_mut(account)
740                                .ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
741
742                        if old_intervals.is_empty() {
743                            accounts_done.insert(*account, vec![]);
744                        }
745                    }
746                }
747            }
748
749            if remaining_start < remaining_end {
750                debug!("Failed to download entire chunk from peer {peer_id}");
751                if hash_start.is_zero() {
752                    // Task is common storage range request
753                    let task = StorageTask {
754                        start_index: remaining_start,
755                        end_index: remaining_end,
756                        start_hash: H256::zero(),
757                        end_hash: None,
758                    };
759                    tasks_queue_not_started.push_back(task);
760                    task_count += 1;
761                } else if let Some(hash_end) = hash_end {
762                    // Task was a big storage account result
763                    if hash_start <= hash_end {
764                        let task = StorageTask {
765                            start_index: remaining_start,
766                            end_index: remaining_end,
767                            start_hash: hash_start,
768                            end_hash: Some(hash_end),
769                        };
770                        tasks_queue_not_started.push_back(task);
771                        task_count += 1;
772
773                        let acc_hash = *accounts_by_root_hash[remaining_start]
774                            .1
775                            .first()
776                            .ok_or(SnapError::InternalError("Empty accounts vector".to_owned()))?;
777                        let (_, old_intervals) = account_storage_roots
778                                .accounts_with_storage_root
779                                .get_mut(&acc_hash).ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
780                        for (old_start, end) in old_intervals {
781                            if *old_start == task_start_hash && *end == hash_end {
782                                *old_start = hash_start;
783                                break;
784                            }
785                        }
786                        account_storage_roots
787                            .healed_accounts
788                            .extend(accounts_by_root_hash[start_index].1.iter().copied());
789                    } else {
790                        // Peer overran the original interval limit; the original
791                        // task's interval is fully covered. Remaining work in
792                        // this chunk still exists, so no sibling task has
793                        // drained the account yet — a missing acc_hash here
794                        // indicates corruption.
795                        let found = clear_completed_interval(
796                            account_storage_roots,
797                            &accounts_by_root_hash,
798                            &mut accounts_done,
799                            remaining_start,
800                            (task_start_hash, hash_end),
801                        )?;
802                        if !found {
803                            panic!("Should have found the account hash");
804                        }
805                    }
806                } else {
807                    if remaining_start + 1 < remaining_end {
808                        let task = StorageTask {
809                            start_index: remaining_start + 1,
810                            end_index: remaining_end,
811                            start_hash: H256::zero(),
812                            end_hash: None,
813                        };
814                        tasks_queue_not_started.push_back(task);
815                        task_count += 1;
816                    }
817                    // Task found a big storage account, so we split the chunk into multiple chunks
818                    let start_hash_u256 = U256::from_big_endian(&hash_start.0);
819                    let missing_storage_range = U256::MAX - start_hash_u256;
820
821                    // Big accounts need to be marked for storage healing unconditionally
822                    for account in accounts_by_root_hash[remaining_start].1.iter() {
823                        account_storage_roots.healed_accounts.insert(*account);
824                    }
825
826                    let slot_count = account_storages
827                        .last()
828                        .map(|v| v.len())
829                        .ok_or(SnapError::NoAccountStorages)?
830                        .max(1);
831                    let storage_density = start_hash_u256 / slot_count;
832
833                    let slots_per_chunk = U256::from(10000);
834                    let chunk_size = storage_density
835                        .checked_mul(slots_per_chunk)
836                        .unwrap_or(U256::MAX);
837
838                    let chunk_count = usize::try_from(missing_storage_range / chunk_size)
839                        .unwrap_or(ACCOUNT_RANGE_CHUNK_COUNT)
840                        .max(1);
841
842                    let first_acc_hash = *accounts_by_root_hash[remaining_start]
843                        .1
844                        .first()
845                        .ok_or(SnapError::InternalError("Empty accounts vector".to_owned()))?;
846
847                    let maybe_old_intervals = account_storage_roots
848                        .accounts_with_storage_root
849                        .get(&first_acc_hash);
850
851                    if let Some((_, old_intervals)) = maybe_old_intervals {
852                        if !old_intervals.is_empty() {
853                            for (start_hash, end_hash) in old_intervals {
854                                let task = StorageTask {
855                                    start_index: remaining_start,
856                                    end_index: remaining_start + 1,
857                                    start_hash: *start_hash,
858                                    end_hash: Some(*end_hash),
859                                };
860
861                                tasks_queue_not_started.push_back(task);
862                                task_count += 1;
863                            }
864                        } else {
865                            // TODO: DRY
866                            account_storage_roots
867                                .accounts_with_storage_root
868                                .insert(first_acc_hash, (None, vec![]));
869                            let (_, intervals) = account_storage_roots
870                                    .accounts_with_storage_root
871                                    .get_mut(&first_acc_hash)
872                                    .ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
873
874                            for i in 0..chunk_count {
875                                let start_hash_u256 = start_hash_u256 + chunk_size * i;
876                                let start_hash = H256::from_uint(&start_hash_u256);
877                                let end_hash = if i == chunk_count - 1 {
878                                    HASH_MAX
879                                } else {
880                                    let end_hash_u256 = start_hash_u256
881                                        .checked_add(chunk_size)
882                                        .unwrap_or(U256::MAX);
883                                    H256::from_uint(&end_hash_u256)
884                                };
885
886                                let task = StorageTask {
887                                    start_index: remaining_start,
888                                    end_index: remaining_start + 1,
889                                    start_hash,
890                                    end_hash: Some(end_hash),
891                                };
892
893                                intervals.push((start_hash, end_hash));
894
895                                tasks_queue_not_started.push_back(task);
896                                task_count += 1;
897                            }
898                            debug!("Split big storage account into {chunk_count} chunks.");
899                        }
900                    } else {
901                        account_storage_roots
902                            .accounts_with_storage_root
903                            .insert(first_acc_hash, (None, vec![]));
904                        let (_, intervals) = account_storage_roots
905                                .accounts_with_storage_root
906                                .get_mut(&first_acc_hash)
907                                .ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
908
909                        for i in 0..chunk_count {
910                            let start_hash_u256 = start_hash_u256 + chunk_size * i;
911                            let start_hash = H256::from_uint(&start_hash_u256);
912                            let end_hash = if i == chunk_count - 1 {
913                                HASH_MAX
914                            } else {
915                                let end_hash_u256 =
916                                    start_hash_u256.checked_add(chunk_size).unwrap_or(U256::MAX);
917                                H256::from_uint(&end_hash_u256)
918                            };
919
920                            let task = StorageTask {
921                                start_index: remaining_start,
922                                end_index: remaining_start + 1,
923                                start_hash,
924                                end_hash: Some(end_hash),
925                            };
926
927                            intervals.push((start_hash, end_hash));
928
929                            tasks_queue_not_started.push_back(task);
930                            task_count += 1;
931                        }
932                        debug!("Split big storage account into {chunk_count} chunks.");
933                    }
934                }
935            } else if let Some(hash_end) = hash_end {
936                // Per-interval task completed: the peer covered
937                // [task_start_hash, hash_end] fully and verify_range reported
938                // should_continue=false, so the worker returns
939                // remaining_start == remaining_end and the guard above does
940                // not fire. Drop the matching interval here so the account
941                // can finalize across calls; otherwise the partition logic
942                // at function entry would re-queue the same range forever.
943                //
944                // The helper returns false when no live interval is found —
945                // that happens when a sibling per-interval task for the same
946                // account already drained the last interval and finalized it
947                // earlier in this call's loop. Unlike the partial-completion
948                // path above (which panics on a missing acc_hash because no
949                // sibling can have drained while work still remains in the
950                // chunk), here we silently skip.
951                clear_completed_interval(
952                    account_storage_roots,
953                    &accounts_by_root_hash,
954                    &mut accounts_done,
955                    start_index,
956                    (task_start_hash, hash_end),
957                )?;
958            }
959
960            if account_storages.is_empty() {
961                peers.peer_table.record_failure(peer_id)?;
962                continue;
963            }
964            if let Some(hash_end) = hash_end {
965                // This is a big storage account, and the range might be empty
966                if account_storages[0].len() == 1 && account_storages[0][0].0 > hash_end {
967                    continue;
968                }
969            }
970
971            peers.peer_table.record_success(peer_id)?;
972
973            let n_storages = account_storages.len();
974            let n_slots = account_storages
975                .iter()
976                .map(|storage| storage.len())
977                .sum::<usize>();
978
979            // These take into account we downloaded the same thing for different accounts
980            let effective_slots: usize = account_storages
981                .iter()
982                .enumerate()
983                .map(|(i, storages)| {
984                    accounts_by_root_hash[start_index + i].1.len() * storages.len()
985                })
986                .sum();
987
988            METRICS
989                .storage_leaves_downloaded
990                .inc_by(effective_slots as u64);
991
992            debug!("Downloaded {n_storages} storages ({n_slots} slots) from peer {peer_id}");
993            debug!(
994                "Total tasks: {task_count}, completed tasks: {completed_tasks}, queued tasks: {}",
995                tasks_queue_not_started.len()
996            );
997            // THEN: update insert to read with the correct structure and reuse
998            // tries, only changing the prefix for insertion.
999            if account_storages.len() == 1 {
1000                let (root_hash, accounts) = &accounts_by_root_hash[start_index];
1001                // We downloaded a big storage account
1002                current_account_storages
1003                    .entry(*root_hash)
1004                    .or_insert_with(|| AccountsWithStorage {
1005                        accounts: accounts.clone(),
1006                        storages: Vec::new(),
1007                    })
1008                    .storages
1009                    .extend(account_storages.remove(0));
1010            } else {
1011                for (i, storages) in account_storages.into_iter().enumerate() {
1012                    let (root_hash, accounts) = &accounts_by_root_hash[start_index + i];
1013                    current_account_storages.insert(
1014                        *root_hash,
1015                        AccountsWithStorage {
1016                            accounts: accounts.clone(),
1017                            storages,
1018                        },
1019                    );
1020                }
1021            }
1022        }
1023
1024        if block_is_stale(pivot_header) {
1025            debug!("Pivot became stale during storage range download, stopping this round");
1026            break;
1027        }
1028
1029        let Some((peer_id, connection, permit)) = peers
1030            .peer_table
1031            .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec())
1032            .await
1033            .inspect_err(|err| warn!(%err, "Error requesting a peer for storage ranges"))
1034            .unwrap_or(None)
1035        else {
1036            // Log ~ once every 10 seconds
1037            if logged_no_free_peers_count == 0 {
1038                trace!("We are missing peers in request_storage_ranges");
1039                logged_no_free_peers_count = 1000;
1040            }
1041            logged_no_free_peers_count -= 1;
1042            // Sleep a bit to avoid busy polling
1043            tokio::time::sleep(Duration::from_millis(10)).await;
1044            continue;
1045        };
1046
1047        let Some(task) = tasks_queue_not_started.pop_front() else {
1048            if completed_tasks >= task_count {
1049                break;
1050            }
1051            continue;
1052        };
1053
1054        let tx = task_sender.clone();
1055
1056        // FIXME: this unzip is probably pointless and takes up unnecessary memory.
1057        let (chunk_account_hashes, chunk_storage_roots): (Vec<_>, Vec<_>) = accounts_by_root_hash
1058            [task.start_index..task.end_index]
1059            .iter()
1060            .map(|(root, storages)| (*storages.first().unwrap_or(&H256::zero()), *root))
1061            .unzip();
1062
1063        if task_count - completed_tasks < 30 {
1064            debug!(
1065                "Assigning task: {task:?}, account_hash: {}, storage_root: {}",
1066                chunk_account_hashes.first().unwrap_or(&H256::zero()),
1067                chunk_storage_roots.first().unwrap_or(&H256::zero()),
1068            );
1069        }
1070        tokio::spawn(request_storage_ranges_worker(
1071            task,
1072            peer_id,
1073            connection,
1074            pivot_header.state_root,
1075            chunk_account_hashes,
1076            chunk_storage_roots,
1077            tx,
1078            permit,
1079        ));
1080    }
1081
1082    {
1083        let snapshot = current_account_storages.into_values().collect::<Vec<_>>();
1084
1085        async_fs::ensure_dir_exists(account_storages_snapshots_dir).await?;
1086
1087        let path = get_account_storages_snapshot_file(account_storages_snapshots_dir, chunk_index);
1088        dump_storages_to_file(&path, snapshot).map_err(|_| {
1089            SnapError::SnapshotDir(format!(
1090                "Failed to write storage snapshot chunk {}",
1091                chunk_index
1092            ))
1093        })?;
1094    }
1095    disk_joinset
1096        .join_all()
1097        .await
1098        .into_iter()
1099        .map(|result| {
1100            result.inspect_err(|err| error!("Failed to dump storage snapshot to file: {err:?}"))
1101        })
1102        .collect::<Result<Vec<()>, DumpError>>()
1103        .map_err(SnapError::from)?;
1104
1105    for (account_done, intervals) in accounts_done {
1106        if intervals.is_empty() {
1107            account_storage_roots
1108                .accounts_with_storage_root
1109                .remove(&account_done);
1110        }
1111    }
1112
1113    // Dropping the task sender so that the recv returns None
1114    drop(task_sender);
1115
1116    Ok(chunk_index + 1)
1117}
1118
1119/// Requests state trie nodes at the given paths from an already-selected peer.
1120/// Releases the peer slot as soon as the wire response is in; hash
1121/// verification below is pure computation.
1122/// Returns `SnapError::InvalidHash` if any returned node's hash does not match
1123/// the requested path, and `SnapError::InvalidData` on an empty or oversized
1124/// response.
1125pub async fn request_state_trienodes(
1126    mut connection: PeerConnection,
1127    permit: RequestPermit,
1128    state_root: H256,
1129    paths: Vec<RequestMetadata>,
1130) -> Result<Vec<Node>, SnapError> {
1131    let expected_nodes = paths.len();
1132
1133    let request_id = rand::random();
1134    let request = RLPxMessage::GetTrieNodes(GetTrieNodes {
1135        id: request_id,
1136        root_hash: state_root,
1137        // [acc_path, acc_path,...] -> [[acc_path], [acc_path]]
1138        paths: paths
1139            .iter()
1140            .map(|vec| vec![Bytes::from(vec.path.encode_compact())])
1141            .collect(),
1142        bytes: MAX_RESPONSE_BYTES,
1143    });
1144    let response = connection
1145        .outgoing_request(request, PEER_REPLY_TIMEOUT)
1146        .await;
1147    drop(permit);
1148    let nodes = match response {
1149        Ok(RLPxMessage::TrieNodes(trie_nodes)) => trie_nodes
1150            .nodes
1151            .iter()
1152            .map(|node| Node::decode(node))
1153            .collect::<Result<Vec<_>, _>>()
1154            .map_err(SnapError::from),
1155        Ok(other_msg) => Err(SnapError::Protocol(
1156            PeerConnectionError::UnexpectedResponse("TrieNodes".to_string(), other_msg.to_string()),
1157        )),
1158        Err(other_err) => Err(SnapError::Protocol(other_err)),
1159    }?;
1160
1161    if nodes.is_empty() || nodes.len() > expected_nodes {
1162        return Err(SnapError::InvalidData);
1163    }
1164
1165    for (index, node) in nodes.iter().enumerate() {
1166        if node.compute_hash(&NativeCrypto).finalize(&NativeCrypto) != paths[index].hash {
1167            debug!(
1168                "A peer is sending wrong data for the state trie node {:?}",
1169                paths[index].path
1170            );
1171            return Err(SnapError::InvalidHash);
1172        }
1173    }
1174
1175    Ok(nodes)
1176}
1177
1178/// Requests storage trie nodes from an already-selected peer. The `GetTrieNodes`
1179/// payload carries the state root and the per-account paths (hashed address
1180/// prefix followed by storage-trie paths, which may be full or partial).
1181/// Consumes a `RequestPermit` reserved by the caller at peer selection time;
1182/// the permit drops when this function returns, releasing the slot.
1183/// Errors are returned as `RequestStorageTrieNodesError` carrying the
1184/// request ID so the caller can reconcile it with its in-flight map.
1185pub async fn request_storage_trienodes(
1186    mut connection: PeerConnection,
1187    _permit: RequestPermit,
1188    get_trie_nodes: GetTrieNodes,
1189) -> Result<TrieNodes, RequestStorageTrieNodesError> {
1190    let request_id = get_trie_nodes.id;
1191    let request = RLPxMessage::GetTrieNodes(get_trie_nodes);
1192    match connection
1193        .outgoing_request(request, PEER_REPLY_TIMEOUT)
1194        .await
1195    {
1196        Ok(RLPxMessage::TrieNodes(trie_nodes)) => Ok(trie_nodes),
1197        Ok(other_msg) => Err(RequestStorageTrieNodesError {
1198            request_id,
1199            source: SnapError::Protocol(PeerConnectionError::UnexpectedResponse(
1200                "TrieNodes".to_string(),
1201                other_msg.to_string(),
1202            )),
1203        }),
1204        Err(e) => Err(RequestStorageTrieNodesError {
1205            request_id,
1206            source: SnapError::Protocol(e),
1207        }),
1208    }
1209}
1210
1211#[allow(clippy::type_complexity)]
1212async fn request_account_range_worker(
1213    peer_id: H256,
1214    mut connection: PeerConnection,
1215    chunk_start: H256,
1216    chunk_end: H256,
1217    state_root: H256,
1218    tx: tokio::sync::mpsc::Sender<(Vec<AccountRangeUnit>, H256, Option<(H256, H256)>)>,
1219    permit: RequestPermit,
1220) -> Result<(), SnapError> {
1221    debug!("Requesting account range from peer {peer_id}, chunk: {chunk_start:?} - {chunk_end:?}");
1222    let request_id = rand::random();
1223    let request = RLPxMessage::GetAccountRange(GetAccountRange {
1224        id: request_id,
1225        root_hash: state_root,
1226        starting_hash: chunk_start,
1227        limit_hash: chunk_end,
1228        response_bytes: MAX_RESPONSE_BYTES,
1229    });
1230
1231    // Perform the wire request and release the peer slot as soon as the
1232    // response (or error) is in — processing below is pure computation.
1233    let response = connection
1234        .outgoing_request(request, PEER_REPLY_TIMEOUT)
1235        .await;
1236    drop(permit);
1237
1238    let retry = || {
1239        (
1240            Vec::<AccountRangeUnit>::new(),
1241            Some((chunk_start, chunk_end)),
1242        )
1243    };
1244    let (accounts_out, chunk_left) = if let Ok(RLPxMessage::AccountRange(AccountRange {
1245        id: _,
1246        accounts,
1247        proof,
1248    })) = response
1249    {
1250        if accounts.is_empty() {
1251            retry()
1252        } else {
1253            // Validate response — build the verification inputs by borrowing
1254            // `accounts` so we can still consume it for the filtered output.
1255            let proof = encodable_to_proof(&proof);
1256            let account_hashes: Vec<H256> = accounts.iter().map(|u| u.hash).collect();
1257            let encoded_accounts: Vec<_> =
1258                accounts.iter().map(|u| u.account.encode_to_vec()).collect();
1259
1260            match verify_range(
1261                state_root,
1262                &chunk_start,
1263                &account_hashes,
1264                &encoded_accounts,
1265                &proof,
1266            ) {
1267                Ok(should_continue) => {
1268                    let chunk_left = if should_continue {
1269                        match account_hashes.last() {
1270                            Some(last_hash) => {
1271                                let new_start_u256 = U256::from_big_endian(&last_hash.0) + 1;
1272                                let new_start = H256::from_uint(&new_start_u256);
1273                                Some((new_start, chunk_end))
1274                            }
1275                            None => {
1276                                // Unreachable: accounts is non-empty here.
1277                                error!("Account hashes last failed, this shouldn't happen");
1278                                Some((chunk_start, chunk_end))
1279                            }
1280                        }
1281                    } else {
1282                        None
1283                    };
1284                    let filtered = accounts
1285                        .into_iter()
1286                        .filter(|unit| unit.hash <= chunk_end)
1287                        .collect::<Vec<_>>();
1288                    (filtered, chunk_left)
1289                }
1290                Err(_) => {
1291                    tracing::debug!("Received invalid account range");
1292                    retry()
1293                }
1294            }
1295        }
1296    } else {
1297        tracing::debug!("Failed to get account range");
1298        retry()
1299    };
1300
1301    tx.send((accounts_out, peer_id, chunk_left)).await.ok();
1302    Ok::<(), SnapError>(())
1303}
1304
1305#[allow(clippy::too_many_arguments)]
1306async fn request_storage_ranges_worker(
1307    task: StorageTask,
1308    peer_id: H256,
1309    mut connection: PeerConnection,
1310    state_root: H256,
1311    chunk_account_hashes: Vec<H256>,
1312    chunk_storage_roots: Vec<H256>,
1313    tx: tokio::sync::mpsc::Sender<StorageTaskResult>,
1314    permit: RequestPermit,
1315) -> Result<(), SnapError> {
1316    let start = task.start_index;
1317    let end = task.end_index;
1318    let start_hash = task.start_hash;
1319
1320    // Defaults for the "retry this same range" outcome used by every failure
1321    // branch below.
1322    let retry_outcome = || {
1323        (
1324            Vec::<Vec<(H256, U256)>>::new(),
1325            task.start_index,
1326            task.end_index,
1327            (start_hash, task.end_hash),
1328        )
1329    };
1330
1331    let request_id = rand::random();
1332    let request = RLPxMessage::GetStorageRanges(GetStorageRanges {
1333        id: request_id,
1334        root_hash: state_root,
1335        account_hashes: chunk_account_hashes,
1336        starting_hash: start_hash,
1337        limit_hash: task.end_hash.unwrap_or(HASH_MAX),
1338        response_bytes: MAX_RESPONSE_BYTES,
1339    });
1340    tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", "Sending storage range request");
1341
1342    // Perform the wire request and release the peer slot as soon as the
1343    // response (or error) is in — validation below is pure computation.
1344    let response = connection
1345        .outgoing_request(request, PEER_REPLY_TIMEOUT)
1346        .await;
1347    drop(permit);
1348
1349    let (account_storages, remaining_start, remaining_end, remaining_hash_range) = 'outcome: {
1350        let Ok(RLPxMessage::StorageRanges(StorageRanges {
1351            id: _,
1352            slots,
1353            proof,
1354        })) = response
1355        else {
1356            #[cfg(feature = "metrics")]
1357            ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("timeout");
1358            tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", outcome = "timeout", "Storage range request failed");
1359            tracing::debug!("Failed to get storage range");
1360            break 'outcome retry_outcome();
1361        };
1362        if slots.is_empty() && proof.is_empty() {
1363            #[cfg(feature = "metrics")]
1364            ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("empty");
1365            tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "empty", "Storage range response empty");
1366            tracing::debug!("Received empty storage range");
1367            break 'outcome retry_outcome();
1368        }
1369        if slots.len() > chunk_storage_roots.len() || slots.is_empty() {
1370            break 'outcome retry_outcome();
1371        }
1372        let proof = encodable_to_proof(&proof);
1373        let mut account_storages: Vec<Vec<(H256, U256)>> = vec![];
1374        let mut should_continue = false;
1375        let mut validation_failed = false;
1376        let mut storage_roots = chunk_storage_roots.into_iter();
1377        let last_slot_index = slots.len() - 1;
1378        for (i, next_account_slots) in slots.into_iter().enumerate() {
1379            if next_account_slots.is_empty() {
1380                debug!("Received empty storage range, skipping");
1381                validation_failed = true;
1382                break;
1383            }
1384            let encoded_values = next_account_slots
1385                .iter()
1386                .map(|slot| slot.data.encode_to_vec())
1387                .collect::<Vec<_>>();
1388            let hashed_keys: Vec<_> = next_account_slots.iter().map(|slot| slot.hash).collect();
1389
1390            let storage_root = match storage_roots.next() {
1391                Some(root) => root,
1392                None => {
1393                    debug!("No storage root for account {i}");
1394                    break 'outcome retry_outcome();
1395                }
1396            };
1397
1398            // The proof corresponds to the last slot, for the previous ones the slot must be the full range without edge proofs
1399            if i == last_slot_index && !proof.is_empty() {
1400                let Ok(sc) = verify_range(
1401                    storage_root,
1402                    &start_hash,
1403                    &hashed_keys,
1404                    &encoded_values,
1405                    &proof,
1406                ) else {
1407                    validation_failed = true;
1408                    break;
1409                };
1410                should_continue = sc;
1411            } else if verify_range(
1412                storage_root,
1413                &start_hash,
1414                &hashed_keys,
1415                &encoded_values,
1416                &[],
1417            )
1418            .is_err()
1419            {
1420                validation_failed = true;
1421                break;
1422            }
1423
1424            account_storages.push(
1425                next_account_slots
1426                    .iter()
1427                    .map(|slot| (slot.hash, slot.data))
1428                    .collect(),
1429            );
1430        }
1431
1432        if validation_failed {
1433            break 'outcome retry_outcome();
1434        }
1435
1436        let (remaining_start, remaining_end, remaining_start_hash) = if should_continue {
1437            let last_account_storage = match account_storages.last() {
1438                Some(storage) => storage,
1439                None => {
1440                    error!("No account storage found, this shouldn't happen");
1441                    break 'outcome retry_outcome();
1442                }
1443            };
1444            let (last_hash, _) = match last_account_storage.last() {
1445                Some(last_hash) => last_hash,
1446                None => {
1447                    error!("No last hash found, this shouldn't happen");
1448                    break 'outcome retry_outcome();
1449                }
1450            };
1451            let next_hash_u256 = U256::from_big_endian(&last_hash.0).saturating_add(1.into());
1452            let next_hash = H256::from_uint(&next_hash_u256);
1453            (start + account_storages.len() - 1, end, next_hash)
1454        } else {
1455            (start + account_storages.len(), end, H256::zero())
1456        };
1457        let slot_count: usize = account_storages.iter().map(|s| s.len()).sum();
1458        #[cfg(feature = "metrics")]
1459        ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("success");
1460        tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "success", slots = slot_count, "Storage range response received");
1461        (
1462            account_storages,
1463            remaining_start,
1464            remaining_end,
1465            (remaining_start_hash, task.end_hash),
1466        )
1467    };
1468
1469    let task_result = StorageTaskResult {
1470        start_index: start,
1471        account_storages,
1472        peer_id,
1473        remaining_start,
1474        remaining_end,
1475        remaining_hash_range,
1476        task_start_hash: start_hash,
1477    };
1478    tx.send(task_result).await.ok();
1479    Ok::<(), SnapError>(())
1480}