1use crate::rlpx::message::Message as RLPxMessage;
6use crate::{
7 metrics::{CurrentStepValue, METRICS},
8 peer_handler::PeerHandler,
9 peer_table::{PeerTableServerProtocol as _, RequestPermit},
10 rlpx::{
11 connection::server::PeerConnection,
12 error::PeerConnectionError,
13 p2p::SUPPORTED_SNAP_CAPABILITIES,
14 snap::{
15 AccountRange, AccountRangeUnit, ByteCodes, GetAccountRange, GetByteCodes,
16 GetStorageRanges, GetTrieNodes, StorageRanges, TrieNodes,
17 },
18 },
19 snap::{async_fs, constants::*, encodable_to_proof, error::SnapError},
20 sync::{AccountStorageRoots, SnapBlockSyncState, block_is_stale, update_pivot},
21 utils::{
22 AccountsWithStorage, dump_accounts_to_file, dump_storages_to_file,
23 get_account_state_snapshot_file, get_account_storages_snapshot_file,
24 },
25};
26use bytes::Bytes;
27use ethrex_common::{
28 BigEndianHash, H256, U256,
29 types::{AccountState, BlockHeader},
30};
31use ethrex_crypto::NativeCrypto;
32use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode};
33use ethrex_storage::Store;
34use ethrex_trie::Nibbles;
35use ethrex_trie::{Node, verify_range};
36use std::{
37 collections::{BTreeMap, HashMap, VecDeque},
38 path::Path,
39 sync::atomic::Ordering,
40 time::{Duration, SystemTime},
41};
42use tracing::{debug, error, trace, warn};
43
44pub use super::error::DumpError;
46
/// Describes one trie node to request with `GetTrieNodes` and to verify once it arrives.
#[derive(Debug, Clone)]
pub struct RequestMetadata {
    /// Expected hash of the node; `request_state_trienodes` rejects a node whose computed
    /// hash differs from this value.
    pub hash: H256,
    /// Path of the node inside the trie; sent to the peer in compact encoding.
    pub path: Nibbles,
    /// Path of the node's parent.
    /// NOTE(review): not read in this part of the file; presumably used by the healing code
    /// to attach the node to its parent — confirm.
    pub parent_path: Nibbles,
}
55
/// Error returned by `request_storage_trienodes`, tagged with the id of the `GetTrieNodes`
/// request that failed.
/// NOTE(review): the id is presumably there so callers can match the failure to their
/// outstanding request — confirm against the caller.
#[derive(Debug, thiserror::Error)]
#[error("Storage trie node request {request_id} failed: {source}")]
pub struct RequestStorageTrieNodesError {
    /// The `id` field of the `GetTrieNodes` request that failed.
    pub request_id: u64,
    /// Underlying cause: a connection error or an unexpected reply message.
    #[source]
    pub source: SnapError,
}
64
/// Outcome of one storage-range request.
///
/// Received by `request_storage_ranges` over the channel it hands to
/// `request_storage_ranges_worker`.
struct StorageTaskResult {
    /// Index in `accounts_by_root_hash` of the first storage-root group the task covered.
    start_index: usize,
    /// Downloaded `(slot hash, value)` pairs, one list per group starting at `start_index`.
    account_storages: Vec<Vec<(H256, U256)>>,
    /// Peer that served the request.
    peer_id: H256,
    /// Start of the half-open group range `[remaining_start, remaining_end)` that was not
    /// fully downloaded.
    remaining_start: usize,
    /// End (exclusive) of the group range that was not fully downloaded.
    remaining_end: usize,
    /// For a partially downloaded trie: the slot hash to resume from, plus the interval end.
    /// The end is `None` when the task was a whole-trie (bulk) task.
    remaining_hash_range: (H256, Option<H256>),
    /// `start_hash` of the task that produced this result; used to locate the tracked interval.
    task_start_hash: H256,
}
77
/// A unit of storage-range work over a run of storage-root groups in `accounts_by_root_hash`.
#[derive(Debug)]
struct StorageTask {
    /// First group covered (inclusive).
    start_index: usize,
    /// End of the covered group range (exclusive).
    end_index: usize,
    /// First slot hash to request; `H256::zero()` for whole-trie (bulk) tasks.
    start_hash: H256,
    /// End of the slot-hash interval for interval tasks; `None` for whole-trie (bulk) tasks.
    end_hash: Option<H256>,
}
86
87fn clear_completed_interval(
102 account_storage_roots: &mut AccountStorageRoots,
103 accounts_by_root_hash: &[(H256, Vec<H256>)],
104 accounts_done: &mut HashMap<H256, Vec<(H256, H256)>>,
105 start_index: usize,
106 interval: (H256, H256),
107) -> Result<bool, SnapError> {
108 let accounts = &accounts_by_root_hash[start_index].1;
109 let acc_hash = accounts.iter().copied().find(|account| {
110 account_storage_roots
111 .accounts_with_storage_root
112 .get(account)
113 .is_some_and(|(_, ivs)| !ivs.is_empty())
114 });
115 let Some(acc_hash) = acc_hash else {
116 return Ok(false);
117 };
118 let (_, old_intervals) = account_storage_roots
119 .accounts_with_storage_root
120 .get_mut(&acc_hash)
121 .ok_or_else(|| {
122 SnapError::InternalError(
123 "Tried to get the old download intervals for an account but did not find them"
124 .to_owned(),
125 )
126 })?;
127 let pos = old_intervals
128 .iter()
129 .position(|iv| *iv == interval)
130 .ok_or_else(|| {
131 SnapError::InternalError(
132 "Could not find an old interval that we were tracking".to_owned(),
133 )
134 })?;
135 old_intervals.remove(pos);
136 if old_intervals.is_empty() {
137 for account in accounts {
138 accounts_done.insert(*account, vec![]);
139 account_storage_roots.healed_accounts.insert(*account);
140 }
141 }
142 Ok(true)
143}
144
145pub async fn request_account_range(
156 peers: &mut PeerHandler,
157 start: H256,
158 limit: H256,
159 account_state_snapshots_dir: &Path,
160 pivot_header: &mut BlockHeader,
161 block_sync_state: &mut SnapBlockSyncState,
162 diagnostics: &std::sync::Arc<tokio::sync::RwLock<crate::sync::SyncDiagnostics>>,
163) -> Result<(), SnapError> {
164 METRICS
165 .current_step
166 .set(CurrentStepValue::RequestingAccountRanges);
167 let start_u256 = U256::from_big_endian(&start.0);
169 let limit_u256 = U256::from_big_endian(&limit.0);
170
171 let range = limit_u256 - start_u256;
172 let chunk_count =
174 usize::try_from(U256::from(ACCOUNT_RANGE_CHUNK_COUNT).min(range.max(U256::one())))
175 .expect("chunk_count bounded by ACCOUNT_RANGE_CHUNK_COUNT");
176 let chunk_size = range / chunk_count;
177
178 let mut tasks_queue_not_started = VecDeque::<(H256, H256)>::new();
180 for i in 0..(chunk_count as u64) {
181 let chunk_start_u256 = chunk_size * i + start_u256;
182 let chunk_end_u256 = chunk_start_u256 + chunk_size - 1u64;
184 let chunk_start = H256::from_uint(&(chunk_start_u256));
185 let chunk_end = H256::from_uint(&(chunk_end_u256));
186 tasks_queue_not_started.push_back((chunk_start, chunk_end));
187 }
188 let last_task = tasks_queue_not_started
190 .back_mut()
191 .ok_or(SnapError::NoTasks)?;
192 last_task.1 = limit;
193
194 let mut downloaded_count = 0_u64;
197 let mut all_account_hashes = Vec::new();
198 let mut all_accounts_state = Vec::new();
199
200 let (task_sender, mut task_receiver) =
202 tokio::sync::mpsc::channel::<(Vec<AccountRangeUnit>, H256, Option<(H256, H256)>)>(1000);
203
204 debug!("Starting to download account ranges from peers");
205
206 *METRICS.account_tries_download_start_time.lock().await = Some(SystemTime::now());
207
208 let mut completed_tasks = 0;
209 let mut chunk_file = 0;
210 let mut last_update: SystemTime = SystemTime::now();
211 let mut write_set = tokio::task::JoinSet::new();
212
213 let mut logged_no_free_peers_count = 0;
214
215 loop {
216 if all_accounts_state.len() * size_of::<AccountState>() >= RANGE_FILE_CHUNK_SIZE {
217 let current_account_hashes = std::mem::take(&mut all_account_hashes);
218 let current_account_states = std::mem::take(&mut all_accounts_state);
219
220 let account_state_chunk = current_account_hashes
221 .into_iter()
222 .zip(current_account_states)
223 .collect::<Vec<(H256, AccountState)>>();
224
225 async_fs::ensure_dir_exists(account_state_snapshots_dir).await?;
226
227 let account_state_snapshots_dir_cloned = account_state_snapshots_dir.to_path_buf();
228 write_set.spawn(async move {
229 let path = get_account_state_snapshot_file(
230 &account_state_snapshots_dir_cloned,
231 chunk_file,
232 );
233 dump_accounts_to_file(&path, account_state_chunk)
235 });
236
237 chunk_file += 1;
238 }
239
240 if last_update
241 .elapsed()
242 .expect("Time shouldn't be in the past")
243 >= Duration::from_secs(1)
244 {
245 METRICS
246 .downloaded_account_tries
247 .store(downloaded_count, Ordering::Relaxed);
248 last_update = SystemTime::now();
249 }
250
251 if let Ok((accounts, peer_id, chunk_start_end)) = task_receiver.try_recv() {
252 if let Some((chunk_start, chunk_end)) = chunk_start_end {
253 if chunk_start <= chunk_end {
254 tasks_queue_not_started.push_back((chunk_start, chunk_end));
255 } else {
256 completed_tasks += 1;
257 }
258 }
259 if chunk_start_end.is_none() {
260 completed_tasks += 1;
261 }
262 if accounts.is_empty() {
263 peers.peer_table.record_failure(peer_id)?;
264 continue;
265 }
266 peers.peer_table.record_success(peer_id)?;
267
268 downloaded_count += accounts.len() as u64;
269
270 debug!(
271 "Downloaded {} accounts from peer {} (current count: {downloaded_count})",
272 accounts.len(),
273 peer_id
274 );
275 all_account_hashes.extend(accounts.iter().map(|unit| unit.hash));
276 all_accounts_state.extend(accounts.iter().map(|unit| unit.account));
277 }
278
279 let Some((peer_id, connection, permit)) = peers
280 .peer_table
281 .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec())
282 .await
283 .inspect_err(|err| warn!(%err, "Error requesting a peer for account range"))
284 .unwrap_or(None)
285 else {
286 if logged_no_free_peers_count == 0 {
288 trace!("We are missing peers in request_account_range");
289 logged_no_free_peers_count = 1000;
290 }
291 logged_no_free_peers_count -= 1;
292 tokio::time::sleep(Duration::from_millis(10)).await;
294 continue;
295 };
296
297 let Some((chunk_start, chunk_end)) = tasks_queue_not_started.pop_front() else {
298 if completed_tasks >= chunk_count {
299 debug!("All account ranges downloaded successfully");
300 break;
301 }
302 continue;
303 };
304
305 let tx = task_sender.clone();
306
307 if block_is_stale(pivot_header) {
308 debug!("Pivot became stale during account range download, updating pivot");
309 *pivot_header = update_pivot(
310 pivot_header.number,
311 pivot_header.timestamp,
312 peers,
313 block_sync_state,
314 diagnostics,
315 )
316 .await
317 .expect("Should be able to update pivot")
318 }
319
320 tokio::spawn(request_account_range_worker(
321 peer_id,
322 connection,
323 chunk_start,
324 chunk_end,
325 pivot_header.state_root,
326 tx,
327 permit,
328 ));
329 }
330
331 write_set
332 .join_all()
333 .await
334 .into_iter()
335 .collect::<Result<Vec<()>, DumpError>>()
336 .map_err(SnapError::from)?;
337
338 {
340 let current_account_hashes = std::mem::take(&mut all_account_hashes);
341 let current_account_states = std::mem::take(&mut all_accounts_state);
342
343 let account_state_chunk = current_account_hashes
344 .into_iter()
345 .zip(current_account_states)
346 .collect::<Vec<(H256, AccountState)>>();
347
348 async_fs::ensure_dir_exists(account_state_snapshots_dir).await?;
349
350 let path = get_account_state_snapshot_file(account_state_snapshots_dir, chunk_file);
351 dump_accounts_to_file(&path, account_state_chunk)
352 .inspect_err(|err| error!("Failed to dump remaining accounts to disk: {}", err.error))
353 .map_err(|_| {
354 SnapError::SnapshotDir(format!(
355 "Failed to write state snapshot chunk {}",
356 chunk_file
357 ))
358 })?;
359 }
360
361 METRICS
362 .downloaded_account_tries
363 .store(downloaded_count, Ordering::Relaxed);
364 *METRICS.account_tries_download_end_time.lock().await = Some(SystemTime::now());
365
366 Ok(())
367}
368
369pub async fn request_bytecodes(
374 peers: &mut PeerHandler,
375 all_bytecode_hashes: &[H256],
376) -> Result<Option<Vec<Bytes>>, SnapError> {
377 METRICS
378 .current_step
379 .set(CurrentStepValue::RequestingBytecodes);
380 if all_bytecode_hashes.is_empty() {
381 return Ok(Some(Vec::new()));
382 }
383 const MAX_BYTECODES_REQUEST_SIZE: usize = 100;
384 let chunk_count = 800;
386 let chunk_count = chunk_count.min(all_bytecode_hashes.len());
387 let chunk_size = all_bytecode_hashes.len() / chunk_count;
388
389 let mut tasks_queue_not_started = VecDeque::<(usize, usize)>::new();
393 for i in 0..chunk_count {
394 let chunk_start = chunk_size * i;
395 let chunk_end = chunk_start + chunk_size;
396 tasks_queue_not_started.push_back((chunk_start, chunk_end));
397 }
398 let last_task = tasks_queue_not_started
400 .back_mut()
401 .ok_or(SnapError::NoTasks)?;
402 last_task.1 = all_bytecode_hashes.len();
403
404 let mut downloaded_count = 0_u64;
406 let mut all_bytecodes = vec![Bytes::new(); all_bytecode_hashes.len()];
407
408 struct TaskResult {
410 start_index: usize,
411 bytecodes: Vec<Bytes>,
412 peer_id: H256,
413 remaining_start: usize,
414 remaining_end: usize,
415 }
416 let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::<TaskResult>(1000);
417
418 debug!("Starting to download bytecodes from peers");
419
420 METRICS
421 .bytecodes_to_download
422 .fetch_add(all_bytecode_hashes.len() as u64, Ordering::Relaxed);
423
424 let mut completed_tasks = 0;
425
426 let mut logged_no_free_peers_count = 0;
427
428 loop {
429 if let Ok(result) = task_receiver.try_recv() {
430 let TaskResult {
431 start_index,
432 bytecodes,
433 peer_id,
434 remaining_start,
435 remaining_end,
436 } = result;
437
438 debug!(
439 "Downloaded {} bytecodes from peer {peer_id} (current count: {downloaded_count})",
440 bytecodes.len(),
441 );
442
443 if remaining_start < remaining_end {
444 tasks_queue_not_started.push_back((remaining_start, remaining_end));
445 } else {
446 completed_tasks += 1;
447 }
448 if bytecodes.is_empty() {
449 peers.peer_table.record_failure(peer_id)?;
450 continue;
451 }
452
453 downloaded_count += bytecodes.len() as u64;
454
455 peers.peer_table.record_success(peer_id)?;
456 for (i, bytecode) in bytecodes.into_iter().enumerate() {
457 all_bytecodes[start_index + i] = bytecode;
458 }
459 }
460
461 let Some((peer_id, mut connection, permit)) = peers
462 .peer_table
463 .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec())
464 .await
465 .inspect_err(|err| warn!(%err, "Error requesting a peer for bytecodes"))
466 .unwrap_or(None)
467 else {
468 if logged_no_free_peers_count == 0 {
470 trace!("We are missing peers in request_bytecodes");
471 logged_no_free_peers_count = 1000;
472 }
473 logged_no_free_peers_count -= 1;
474 tokio::time::sleep(Duration::from_millis(10)).await;
476 continue;
477 };
478
479 let Some((chunk_start, chunk_end)) = tasks_queue_not_started.pop_front() else {
480 if completed_tasks >= chunk_count {
481 debug!("All bytecodes downloaded successfully");
482 break;
483 }
484 continue;
485 };
486
487 let tx = task_sender.clone();
488
489 let hashes_to_request: Vec<_> = all_bytecode_hashes
490 .iter()
491 .skip(chunk_start)
492 .take((chunk_end - chunk_start).min(MAX_BYTECODES_REQUEST_SIZE))
493 .copied()
494 .collect();
495
496 tokio::spawn(async move {
497 debug!(
498 "Requesting bytecode from peer {peer_id}, chunk: {chunk_start:?} - {chunk_end:?}"
499 );
500 let request_id = rand::random();
501 let request = RLPxMessage::GetByteCodes(GetByteCodes {
502 id: request_id,
503 hashes: hashes_to_request.clone(),
504 bytes: MAX_RESPONSE_BYTES,
505 });
506
507 let response = connection
508 .outgoing_request(request, PEER_REPLY_TIMEOUT)
509 .await;
510 drop(permit);
511
512 let (bytecodes, remaining_start) = match response {
513 Ok(RLPxMessage::ByteCodes(ByteCodes { id: _, codes })) if !codes.is_empty() => {
514 let validated_codes: Vec<Bytes> = codes
515 .into_iter()
516 .zip(hashes_to_request)
517 .take_while(|(b, hash)| ethrex_common::utils::keccak(b) == *hash)
518 .map(|(b, _hash)| b)
519 .collect();
520 let new_remaining_start = chunk_start + validated_codes.len();
521 (validated_codes, new_remaining_start)
522 }
523 Ok(RLPxMessage::ByteCodes(_)) => {
524 (Vec::new(), chunk_start)
526 }
527 _ => {
528 tracing::debug!("Failed to get bytecode");
529 (Vec::new(), chunk_start)
530 }
531 };
532
533 let result = TaskResult {
534 start_index: chunk_start,
535 bytecodes,
536 peer_id,
537 remaining_start,
538 remaining_end: chunk_end,
539 };
540 tx.send(result).await.ok();
541 });
542 }
543
544 METRICS
545 .downloaded_bytecodes
546 .fetch_add(downloaded_count, Ordering::Relaxed);
547 debug!(
548 "Finished downloading bytecodes, total bytecodes: {}",
549 all_bytecode_hashes.len()
550 );
551
552 Ok(Some(all_bytecodes))
553}
554
/// Downloads the storage tries of every account tracked in
/// `account_storage_roots.accounts_with_storage_root`. The slots are written to snapshot files
/// in `account_storages_snapshots_dir`, starting at file index `chunk_index`.
///
/// Accounts are grouped by storage root, so a trie shared by several accounts is requested once.
/// Groups without pending hash intervals are batched into bulk (whole-trie) tasks of up to
/// `STORAGE_BATCH_SIZE` groups. Groups that already have pending intervals get one task per
/// interval.
///
/// When a bulk task stops partway through a large trie, the remaining hash space of that trie
/// is split into interval tasks. Those intervals are recorded on the group's first account.
///
/// The loop ends when every task has completed, or early when the pivot becomes stale.
///
/// Returns the next unused snapshot file index (`chunk_index + 1` after the final write).
///
/// # Errors
/// Propagates store, peer-table, filesystem and snapshot-dump errors. Returns
/// `SnapError::InternalError` / `SnapError::NoAccountStorages` when the interval bookkeeping is
/// inconsistent.
///
/// # Panics
/// Panics with "Should have found the account hash" if `clear_completed_interval` finds no
/// tracked interval for a finished partial task, and if a background dump task panics.
pub async fn request_storage_ranges(
    peers: &mut PeerHandler,
    account_storage_roots: &mut AccountStorageRoots,
    account_storages_snapshots_dir: &Path,
    mut chunk_index: u64,
    pivot_header: &mut BlockHeader,
    store: Store,
) -> Result<u64, SnapError> {
    METRICS
        .current_step
        .set(CurrentStepValue::RequestingStorageRanges);
    debug!("Starting request_storage_ranges function");
    // Group accounts by storage root: accounts sharing a root share one storage trie.
    let mut accounts_by_root_hash: BTreeMap<_, Vec<_>> = BTreeMap::new();
    for (account, (maybe_root_hash, _)) in &account_storage_roots.accounts_with_storage_root {
        match maybe_root_hash {
            Some(root) => {
                accounts_by_root_hash
                    .entry(*root)
                    .or_default()
                    .push(*account);
            }
            None => {
                // No root recorded: read it from the stored account state at the pivot block.
                let root = store
                    .get_account_state_by_acc_hash(pivot_header.hash(), *account)?
                    .ok_or_else(|| {
                        SnapError::InternalError(
                            "Could not find account that should have been downloaded or healed"
                                .to_string(),
                        )
                    })?
                    .storage_root;
                accounts_by_root_hash
                    .entry(root)
                    .or_default()
                    .push(*account);
            }
        }
    }
    let mut accounts_by_root_hash = Vec::from_iter(accounts_by_root_hash);
    // `!len` flips all bits, so this sorts groups by *descending* number of accounts.
    accounts_by_root_hash.sort_unstable_by_key(|(_, accounts)| !accounts.len());
    let chunk_size = STORAGE_BATCH_SIZE;

    let mut tasks_queue_not_started = VecDeque::<StorageTask>::new();
    // Start index of the bulk task currently being accumulated, if any.
    let mut bulk_chunk_start: Option<usize> = None;
    for (i, (_, accounts)) in accounts_by_root_hash.iter().enumerate() {
        let intervals = accounts.iter().find_map(|acc| {
            account_storage_roots
                .accounts_with_storage_root
                .get(acc)
                .and_then(|(_, ivs)| (!ivs.is_empty()).then_some(ivs))
        });
        if let Some(intervals) = intervals {
            // Close the pending bulk task before emitting per-interval tasks for this group.
            if let Some(start) = bulk_chunk_start.take() {
                tasks_queue_not_started.push_back(StorageTask {
                    start_index: start,
                    end_index: i,
                    start_hash: H256::zero(),
                    end_hash: None,
                });
            }
            for &(start_hash, end_hash) in intervals.iter() {
                tasks_queue_not_started.push_back(StorageTask {
                    start_index: i,
                    end_index: i + 1,
                    start_hash,
                    end_hash: Some(end_hash),
                });
            }
        } else {
            let chunk_start = *bulk_chunk_start.get_or_insert(i);
            if i + 1 - chunk_start >= chunk_size {
                tasks_queue_not_started.push_back(StorageTask {
                    start_index: chunk_start,
                    end_index: i + 1,
                    start_hash: H256::zero(),
                    end_hash: None,
                });
                bulk_chunk_start = None;
            }
        }
    }
    if let Some(start) = bulk_chunk_start {
        tasks_queue_not_started.push_back(StorageTask {
            start_index: start,
            end_index: accounts_by_root_hash.len(),
            start_hash: H256::zero(),
            end_hash: None,
        });
    }

    let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::<StorageTaskResult>(1000);

    let mut disk_joinset: tokio::task::JoinSet<Result<(), DumpError>> = tokio::task::JoinSet::new();

    let mut task_count = tasks_queue_not_started.len();
    let mut completed_tasks = 0;

    // Accounts considered fully downloaded; those with no intervals are untracked at the end.
    let mut accounts_done: HashMap<H256, Vec<(H256, H256)>> = HashMap::new();
    // Downloaded slots not yet written to disk, keyed by storage root.
    let mut current_account_storages: BTreeMap<H256, AccountsWithStorage> = BTreeMap::new();

    let mut logged_no_free_peers_count = 0;

    debug!("Starting request_storage_ranges loop");
    loop {
        // Flush buffered storages once their estimated size exceeds RANGE_FILE_CHUNK_SIZE
        // (32 bytes per account hash, 64 bytes per slot).
        if current_account_storages
            .values()
            .map(|accounts| 32 * accounts.accounts.len() + 64 * accounts.storages.len())
            .sum::<usize>()
            > RANGE_FILE_CHUNK_SIZE
        {
            let current_account_storages = std::mem::take(&mut current_account_storages);
            let snapshot = current_account_storages.into_values().collect::<Vec<_>>();

            async_fs::ensure_dir_exists(account_storages_snapshots_dir).await?;

            let account_storages_snapshots_dir_cloned =
                account_storages_snapshots_dir.to_path_buf();
            // Wait for the previous write first, so at most one background dump is in flight.
            if !disk_joinset.is_empty() {
                debug!("Writing to disk");
                disk_joinset
                    .join_next()
                    .await
                    .expect("Shouldn't be empty")
                    .expect("Shouldn't have a join error")
                    .inspect_err(|err| error!("Failed to dump storage snapshot to file: {err:?}"))
                    .map_err(SnapError::from)?;
            }
            disk_joinset.spawn(async move {
                let path = get_account_storages_snapshot_file(
                    &account_storages_snapshots_dir_cloned,
                    chunk_index,
                );
                dump_storages_to_file(&path, snapshot)
            });

            chunk_index += 1;
        }

        if let Ok(result) = task_receiver.try_recv() {
            let StorageTaskResult {
                start_index,
                mut account_storages,
                peer_id,
                remaining_start,
                remaining_end,
                remaining_hash_range: (hash_start, hash_end),
                task_start_hash,
            } = result;
            completed_tasks += 1;

            // Groups in `[start_index, remaining_start)` are treated as fully served: mark
            // their accounts done unless they still have pending intervals.
            for (_, accounts) in accounts_by_root_hash[start_index..remaining_start].iter() {
                for account in accounts {
                    if !accounts_done.contains_key(account) {
                        let (_, old_intervals) = account_storage_roots
                            .accounts_with_storage_root
                            .get_mut(account)
                            .ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;

                        if old_intervals.is_empty() {
                            accounts_done.insert(*account, vec![]);
                        }
                    }
                }
            }

            if remaining_start < remaining_end {
                debug!("Failed to download entire chunk from peer {peer_id}");
                if hash_start.is_zero() {
                    // Nothing of the group at `remaining_start` was downloaded: re-queue the
                    // rest as a bulk task.
                    let task = StorageTask {
                        start_index: remaining_start,
                        end_index: remaining_end,
                        start_hash: H256::zero(),
                        end_hash: None,
                    };
                    tasks_queue_not_started.push_back(task);
                    task_count += 1;
                } else if let Some(hash_end) = hash_end {
                    // An interval task was only partially served.
                    if hash_start <= hash_end {
                        // Re-queue what is left of the interval and advance the tracked start.
                        let task = StorageTask {
                            start_index: remaining_start,
                            end_index: remaining_end,
                            start_hash: hash_start,
                            end_hash: Some(hash_end),
                        };
                        tasks_queue_not_started.push_back(task);
                        task_count += 1;

                        let acc_hash = *accounts_by_root_hash[remaining_start]
                            .1
                            .first()
                            .ok_or(SnapError::InternalError("Empty accounts vector".to_owned()))?;
                        let (_, old_intervals) = account_storage_roots
                            .accounts_with_storage_root
                            .get_mut(&acc_hash).ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
                        for (old_start, end) in old_intervals {
                            if *old_start == task_start_hash && *end == hash_end {
                                *old_start = hash_start;
                                break;
                            }
                        }
                        account_storage_roots
                            .healed_accounts
                            .extend(accounts_by_root_hash[start_index].1.iter().copied());
                    } else {
                        // The resume point is past the interval end: the interval is complete.
                        let found = clear_completed_interval(
                            account_storage_roots,
                            &accounts_by_root_hash,
                            &mut accounts_done,
                            remaining_start,
                            (task_start_hash, hash_end),
                        )?;
                        if !found {
                            panic!("Should have found the account hash");
                        }
                    }
                } else {
                    // A bulk task stopped partway through a large trie: re-queue the groups after
                    // it and split the trie's remaining hash space into interval tasks.
                    if remaining_start + 1 < remaining_end {
                        let task = StorageTask {
                            start_index: remaining_start + 1,
                            end_index: remaining_end,
                            start_hash: H256::zero(),
                            end_hash: None,
                        };
                        tasks_queue_not_started.push_back(task);
                        task_count += 1;
                    }
                    let start_hash_u256 = U256::from_big_endian(&hash_start.0);
                    let missing_storage_range = U256::MAX - start_hash_u256;

                    for account in accounts_by_root_hash[remaining_start].1.iter() {
                        account_storage_roots.healed_accounts.insert(*account);
                    }

                    let slot_count = account_storages
                        .last()
                        .map(|v| v.len())
                        .ok_or(SnapError::NoAccountStorages)?
                        .max(1);
                    // Approximate hash-space width per slot downloaded so far (bulk tasks start
                    // at hash zero, so `[0, hash_start)` held `slot_count` slots).
                    let storage_density = start_hash_u256 / slot_count;

                    let slots_per_chunk = U256::from(10000);
                    let chunk_size = storage_density
                        .checked_mul(slots_per_chunk)
                        .unwrap_or(U256::MAX);

                    // NOTE(review): if `start_hash_u256 < slot_count`, `storage_density` and
                    // therefore `chunk_size` are zero and this division panics; consider
                    // clamping `chunk_size` to at least one.
                    let chunk_count = usize::try_from(missing_storage_range / chunk_size)
                        .unwrap_or(ACCOUNT_RANGE_CHUNK_COUNT)
                        .max(1);

                    let first_acc_hash = *accounts_by_root_hash[remaining_start]
                        .1
                        .first()
                        .ok_or(SnapError::InternalError("Empty accounts vector".to_owned()))?;

                    let maybe_old_intervals = account_storage_roots
                        .accounts_with_storage_root
                        .get(&first_acc_hash);

                    if let Some((_, old_intervals)) = maybe_old_intervals {
                        if !old_intervals.is_empty() {
                            // Intervals are already tracked for this trie: re-queue them as-is.
                            for (start_hash, end_hash) in old_intervals {
                                let task = StorageTask {
                                    start_index: remaining_start,
                                    end_index: remaining_start + 1,
                                    start_hash: *start_hash,
                                    end_hash: Some(*end_hash),
                                };

                                tasks_queue_not_started.push_back(task);
                                task_count += 1;
                            }
                        } else {
                            // NOTE(review): this overwrites the entry, resetting its stored root
                            // to `None`; this branch duplicates the outer `else` below.
                            account_storage_roots
                                .accounts_with_storage_root
                                .insert(first_acc_hash, (None, vec![]));
                            let (_, intervals) = account_storage_roots
                                .accounts_with_storage_root
                                .get_mut(&first_acc_hash)
                                .ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;

                            for i in 0..chunk_count {
                                let start_hash_u256 = start_hash_u256 + chunk_size * i;
                                let start_hash = H256::from_uint(&start_hash_u256);
                                // The last interval always extends to the end of the hash space.
                                let end_hash = if i == chunk_count - 1 {
                                    HASH_MAX
                                } else {
                                    let end_hash_u256 = start_hash_u256
                                        .checked_add(chunk_size)
                                        .unwrap_or(U256::MAX);
                                    H256::from_uint(&end_hash_u256)
                                };

                                let task = StorageTask {
                                    start_index: remaining_start,
                                    end_index: remaining_start + 1,
                                    start_hash,
                                    end_hash: Some(end_hash),
                                };

                                intervals.push((start_hash, end_hash));

                                tasks_queue_not_started.push_back(task);
                                task_count += 1;
                            }
                            debug!("Split big storage account into {chunk_count} chunks.");
                        }
                    } else {
                        account_storage_roots
                            .accounts_with_storage_root
                            .insert(first_acc_hash, (None, vec![]));
                        let (_, intervals) = account_storage_roots
                            .accounts_with_storage_root
                            .get_mut(&first_acc_hash)
                            .ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;

                        for i in 0..chunk_count {
                            let start_hash_u256 = start_hash_u256 + chunk_size * i;
                            let start_hash = H256::from_uint(&start_hash_u256);
                            // The last interval always extends to the end of the hash space.
                            let end_hash = if i == chunk_count - 1 {
                                HASH_MAX
                            } else {
                                let end_hash_u256 =
                                    start_hash_u256.checked_add(chunk_size).unwrap_or(U256::MAX);
                                H256::from_uint(&end_hash_u256)
                            };

                            let task = StorageTask {
                                start_index: remaining_start,
                                end_index: remaining_start + 1,
                                start_hash,
                                end_hash: Some(end_hash),
                            };

                            intervals.push((start_hash, end_hash));

                            tasks_queue_not_started.push_back(task);
                            task_count += 1;
                        }
                        debug!("Split big storage account into {chunk_count} chunks.");
                    }
                }
            } else if let Some(hash_end) = hash_end {
                // Whole task served and it was an interval task: drop the finished interval.
                clear_completed_interval(
                    account_storage_roots,
                    &accounts_by_root_hash,
                    &mut accounts_done,
                    start_index,
                    (task_start_hash, hash_end),
                )?;
            }

            if account_storages.is_empty() {
                peers.peer_table.record_failure(peer_id)?;
                continue;
            }
            if let Some(hash_end) = hash_end {
                // A lone slot beyond the requested interval end is not stored.
                // NOTE(review): presumably the boundary slot the peer includes for the proof — confirm.
                if account_storages[0].len() == 1 && account_storages[0][0].0 > hash_end {
                    continue;
                }
            }

            peers.peer_table.record_success(peer_id)?;

            let n_storages = account_storages.len();
            let n_slots = account_storages
                .iter()
                .map(|storage| storage.len())
                .sum::<usize>();

            // Count each slot once per account that shares the storage root.
            let effective_slots: usize = account_storages
                .iter()
                .enumerate()
                .map(|(i, storages)| {
                    accounts_by_root_hash[start_index + i].1.len() * storages.len()
                })
                .sum();

            METRICS
                .storage_leaves_downloaded
                .inc_by(effective_slots as u64);

            debug!("Downloaded {n_storages} storages ({n_slots} slots) from peer {peer_id}");
            debug!(
                "Total tasks: {task_count}, completed tasks: {completed_tasks}, queued tasks: {}",
                tasks_queue_not_started.len()
            );
            if account_storages.len() == 1 {
                // A single trie may arrive over several responses: append to buffered slots.
                let (root_hash, accounts) = &accounts_by_root_hash[start_index];
                current_account_storages
                    .entry(*root_hash)
                    .or_insert_with(|| AccountsWithStorage {
                        accounts: accounts.clone(),
                        storages: Vec::new(),
                    })
                    .storages
                    .extend(account_storages.remove(0));
            } else {
                for (i, storages) in account_storages.into_iter().enumerate() {
                    let (root_hash, accounts) = &accounts_by_root_hash[start_index + i];
                    current_account_storages.insert(
                        *root_hash,
                        AccountsWithStorage {
                            accounts: accounts.clone(),
                            storages,
                        },
                    );
                }
            }
        }

        if block_is_stale(pivot_header) {
            debug!("Pivot became stale during storage range download, stopping this round");
            break;
        }

        let Some((peer_id, connection, permit)) = peers
            .peer_table
            .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec())
            .await
            .inspect_err(|err| warn!(%err, "Error requesting a peer for storage ranges"))
            .unwrap_or(None)
        else {
            // Rate-limit the trace: log once every 1000 consecutive misses.
            if logged_no_free_peers_count == 0 {
                trace!("We are missing peers in request_storage_ranges");
                logged_no_free_peers_count = 1000;
            }
            logged_no_free_peers_count -= 1;
            tokio::time::sleep(Duration::from_millis(10)).await;
            continue;
        };

        // NOTE(review): completion is only checked after a peer has been obtained. With no free
        // peer, the loop keeps waiting even when every task is done.
        let Some(task) = tasks_queue_not_started.pop_front() else {
            if completed_tasks >= task_count {
                break;
            }
            continue;
        };

        let tx = task_sender.clone();

        // First account of each group (zero if the group is empty), paired with its root.
        let (chunk_account_hashes, chunk_storage_roots): (Vec<_>, Vec<_>) = accounts_by_root_hash
            [task.start_index..task.end_index]
            .iter()
            .map(|(root, storages)| (*storages.first().unwrap_or(&H256::zero()), *root))
            .unzip();

        if task_count - completed_tasks < 30 {
            debug!(
                "Assigning task: {task:?}, account_hash: {}, storage_root: {}",
                chunk_account_hashes.first().unwrap_or(&H256::zero()),
                chunk_storage_roots.first().unwrap_or(&H256::zero()),
            );
        }
        tokio::spawn(request_storage_ranges_worker(
            task,
            peer_id,
            connection,
            pivot_header.state_root,
            chunk_account_hashes,
            chunk_storage_roots,
            tx,
            permit,
        ));
    }

    // Write whatever is still buffered as the final snapshot chunk.
    {
        let snapshot = current_account_storages.into_values().collect::<Vec<_>>();

        async_fs::ensure_dir_exists(account_storages_snapshots_dir).await?;

        let path = get_account_storages_snapshot_file(account_storages_snapshots_dir, chunk_index);
        dump_storages_to_file(&path, snapshot).map_err(|_| {
            SnapError::SnapshotDir(format!(
                "Failed to write storage snapshot chunk {}",
                chunk_index
            ))
        })?;
    }
    disk_joinset
        .join_all()
        .await
        .into_iter()
        .map(|result| {
            result.inspect_err(|err| error!("Failed to dump storage snapshot to file: {err:?}"))
        })
        .collect::<Result<Vec<()>, DumpError>>()
        .map_err(SnapError::from)?;

    // Stop tracking accounts whose storage is fully downloaded.
    for (account_done, intervals) in accounts_done {
        if intervals.is_empty() {
            account_storage_roots
                .accounts_with_storage_root
                .remove(&account_done);
        }
    }

    drop(task_sender);

    Ok(chunk_index + 1)
}
1118
1119pub async fn request_state_trienodes(
1126 mut connection: PeerConnection,
1127 permit: RequestPermit,
1128 state_root: H256,
1129 paths: Vec<RequestMetadata>,
1130) -> Result<Vec<Node>, SnapError> {
1131 let expected_nodes = paths.len();
1132
1133 let request_id = rand::random();
1134 let request = RLPxMessage::GetTrieNodes(GetTrieNodes {
1135 id: request_id,
1136 root_hash: state_root,
1137 paths: paths
1139 .iter()
1140 .map(|vec| vec![Bytes::from(vec.path.encode_compact())])
1141 .collect(),
1142 bytes: MAX_RESPONSE_BYTES,
1143 });
1144 let response = connection
1145 .outgoing_request(request, PEER_REPLY_TIMEOUT)
1146 .await;
1147 drop(permit);
1148 let nodes = match response {
1149 Ok(RLPxMessage::TrieNodes(trie_nodes)) => trie_nodes
1150 .nodes
1151 .iter()
1152 .map(|node| Node::decode(node))
1153 .collect::<Result<Vec<_>, _>>()
1154 .map_err(SnapError::from),
1155 Ok(other_msg) => Err(SnapError::Protocol(
1156 PeerConnectionError::UnexpectedResponse("TrieNodes".to_string(), other_msg.to_string()),
1157 )),
1158 Err(other_err) => Err(SnapError::Protocol(other_err)),
1159 }?;
1160
1161 if nodes.is_empty() || nodes.len() > expected_nodes {
1162 return Err(SnapError::InvalidData);
1163 }
1164
1165 for (index, node) in nodes.iter().enumerate() {
1166 if node.compute_hash(&NativeCrypto).finalize(&NativeCrypto) != paths[index].hash {
1167 debug!(
1168 "A peer is sending wrong data for the state trie node {:?}",
1169 paths[index].path
1170 );
1171 return Err(SnapError::InvalidHash);
1172 }
1173 }
1174
1175 Ok(nodes)
1176}
1177
1178pub async fn request_storage_trienodes(
1186 mut connection: PeerConnection,
1187 _permit: RequestPermit,
1188 get_trie_nodes: GetTrieNodes,
1189) -> Result<TrieNodes, RequestStorageTrieNodesError> {
1190 let request_id = get_trie_nodes.id;
1191 let request = RLPxMessage::GetTrieNodes(get_trie_nodes);
1192 match connection
1193 .outgoing_request(request, PEER_REPLY_TIMEOUT)
1194 .await
1195 {
1196 Ok(RLPxMessage::TrieNodes(trie_nodes)) => Ok(trie_nodes),
1197 Ok(other_msg) => Err(RequestStorageTrieNodesError {
1198 request_id,
1199 source: SnapError::Protocol(PeerConnectionError::UnexpectedResponse(
1200 "TrieNodes".to_string(),
1201 other_msg.to_string(),
1202 )),
1203 }),
1204 Err(e) => Err(RequestStorageTrieNodesError {
1205 request_id,
1206 source: SnapError::Protocol(e),
1207 }),
1208 }
1209}
1210
1211#[allow(clippy::type_complexity)]
1212async fn request_account_range_worker(
1213 peer_id: H256,
1214 mut connection: PeerConnection,
1215 chunk_start: H256,
1216 chunk_end: H256,
1217 state_root: H256,
1218 tx: tokio::sync::mpsc::Sender<(Vec<AccountRangeUnit>, H256, Option<(H256, H256)>)>,
1219 permit: RequestPermit,
1220) -> Result<(), SnapError> {
1221 debug!("Requesting account range from peer {peer_id}, chunk: {chunk_start:?} - {chunk_end:?}");
1222 let request_id = rand::random();
1223 let request = RLPxMessage::GetAccountRange(GetAccountRange {
1224 id: request_id,
1225 root_hash: state_root,
1226 starting_hash: chunk_start,
1227 limit_hash: chunk_end,
1228 response_bytes: MAX_RESPONSE_BYTES,
1229 });
1230
1231 let response = connection
1234 .outgoing_request(request, PEER_REPLY_TIMEOUT)
1235 .await;
1236 drop(permit);
1237
1238 let retry = || {
1239 (
1240 Vec::<AccountRangeUnit>::new(),
1241 Some((chunk_start, chunk_end)),
1242 )
1243 };
1244 let (accounts_out, chunk_left) = if let Ok(RLPxMessage::AccountRange(AccountRange {
1245 id: _,
1246 accounts,
1247 proof,
1248 })) = response
1249 {
1250 if accounts.is_empty() {
1251 retry()
1252 } else {
1253 let proof = encodable_to_proof(&proof);
1256 let account_hashes: Vec<H256> = accounts.iter().map(|u| u.hash).collect();
1257 let encoded_accounts: Vec<_> =
1258 accounts.iter().map(|u| u.account.encode_to_vec()).collect();
1259
1260 match verify_range(
1261 state_root,
1262 &chunk_start,
1263 &account_hashes,
1264 &encoded_accounts,
1265 &proof,
1266 ) {
1267 Ok(should_continue) => {
1268 let chunk_left = if should_continue {
1269 match account_hashes.last() {
1270 Some(last_hash) => {
1271 let new_start_u256 = U256::from_big_endian(&last_hash.0) + 1;
1272 let new_start = H256::from_uint(&new_start_u256);
1273 Some((new_start, chunk_end))
1274 }
1275 None => {
1276 error!("Account hashes last failed, this shouldn't happen");
1278 Some((chunk_start, chunk_end))
1279 }
1280 }
1281 } else {
1282 None
1283 };
1284 let filtered = accounts
1285 .into_iter()
1286 .filter(|unit| unit.hash <= chunk_end)
1287 .collect::<Vec<_>>();
1288 (filtered, chunk_left)
1289 }
1290 Err(_) => {
1291 tracing::debug!("Received invalid account range");
1292 retry()
1293 }
1294 }
1295 }
1296 } else {
1297 tracing::debug!("Failed to get account range");
1298 retry()
1299 };
1300
1301 tx.send((accounts_out, peer_id, chunk_left)).await.ok();
1302 Ok::<(), SnapError>(())
1303}
1304
/// Requests storage slots for a batch of accounts from a peer and reports the outcome on `tx`.
///
/// Inputs:
/// - `chunk_account_hashes` and `chunk_storage_roots` are the accounts this `task` covers and
///   their storage roots, in matching order. (Presumably these are the accounts at
///   `task.start_index..task.end_index` in the caller's list — TODO confirm.)
/// - The request asks for slots in `[task.start_hash, task.end_hash.unwrap_or(HASH_MAX)]`.
/// - `permit` is released as soon as the reply (or error) arrives.
///
/// Verification, one slot list per account, using [`verify_range`]:
/// - The last list is checked against the response proof when a proof is present. Its `bool`
///   result becomes `should_continue`.
/// - Every other list is checked with an empty proof.
///
/// Exactly one [`StorageTaskResult`] is sent on `tx`.
///
/// On failure, no slots are kept and the remaining range is the whole task, so it can be
/// retried. Failure means any of:
/// - no reply, or a reply other than `StorageRanges`;
/// - no slot lists at all;
/// - more slot lists than storage roots;
/// - an empty slot list;
/// - a missing storage root;
/// - a failed verification.
///
/// On success the verified slots are returned, and the remaining range starts at:
/// - the last (partial) account, from the slot after its last returned one, when
///   `should_continue` is set;
/// - otherwise, the first account that received no slots, from `H256::zero()`.
///
/// A failed send on `tx` is ignored, so this always returns `Ok(())`.
#[allow(clippy::too_many_arguments)]
async fn request_storage_ranges_worker(
    task: StorageTask,
    peer_id: H256,
    mut connection: PeerConnection,
    state_root: H256,
    chunk_account_hashes: Vec<H256>,
    chunk_storage_roots: Vec<H256>,
    tx: tokio::sync::mpsc::Sender<StorageTaskResult>,
    permit: RequestPermit,
) -> Result<(), SnapError> {
    let start = task.start_index;
    let end = task.end_index;
    let start_hash = task.start_hash;

    // Outcome for any unusable response: keep no slots and hand the whole task range back.
    let retry_outcome = || {
        (
            Vec::<Vec<(H256, U256)>>::new(),
            task.start_index,
            task.end_index,
            (start_hash, task.end_hash),
        )
    };

    let request_id = rand::random();
    let request = RLPxMessage::GetStorageRanges(GetStorageRanges {
        id: request_id,
        root_hash: state_root,
        account_hashes: chunk_account_hashes,
        starting_hash: start_hash,
        limit_hash: task.end_hash.unwrap_or(HASH_MAX),
        response_bytes: MAX_RESPONSE_BYTES,
    });
    tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", "Sending storage range request");

    let response = connection
        .outgoing_request(request, PEER_REPLY_TIMEOUT)
        .await;
    // Free the request slot as soon as the peer has answered (or failed to).
    drop(permit);

    // Every early exit inside this labeled block yields `retry_outcome()` via `break 'outcome`.
    let (account_storages, remaining_start, remaining_end, remaining_hash_range) = 'outcome: {
        let Ok(RLPxMessage::StorageRanges(StorageRanges {
            id: _,
            slots,
            proof,
        })) = response
        else {
            // Any non-`StorageRanges` reply is recorded under the "timeout" label.
            // That includes a connection error or an unexpected message type.
            #[cfg(feature = "metrics")]
            ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("timeout");
            tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", outcome = "timeout", "Storage range request failed");
            tracing::debug!("Failed to get storage range");
            break 'outcome retry_outcome();
        };
        if slots.is_empty() && proof.is_empty() {
            #[cfg(feature = "metrics")]
            ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("empty");
            tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "empty", "Storage range response empty");
            tracing::debug!("Received empty storage range");
            break 'outcome retry_outcome();
        }
        // Retry if the peer sent more slot lists than accounts were requested, or no slot
        // lists at all (a proof without slots).
        if slots.len() > chunk_storage_roots.len() || slots.is_empty() {
            break 'outcome retry_outcome();
        }
        let proof = encodable_to_proof(&proof);
        let mut account_storages: Vec<Vec<(H256, U256)>> = vec![];
        let mut should_continue = false;
        let mut validation_failed = false;
        let mut storage_roots = chunk_storage_roots.into_iter();
        // Cannot underflow: `slots` is non-empty at this point.
        let last_slot_index = slots.len() - 1;
        for (i, next_account_slots) in slots.into_iter().enumerate() {
            if next_account_slots.is_empty() {
                debug!("Received empty storage range, skipping");
                validation_failed = true;
                break;
            }
            let encoded_values = next_account_slots
                .iter()
                .map(|slot| slot.data.encode_to_vec())
                .collect::<Vec<_>>();
            let hashed_keys: Vec<_> = next_account_slots.iter().map(|slot| slot.hash).collect();

            let storage_root = match storage_roots.next() {
                Some(root) => root,
                None => {
                    debug!("No storage root for account {i}");
                    break 'outcome retry_outcome();
                }
            };

            // NOTE(review): `start_hash` is passed as the origin for every account, not only
            // the first. Confirm that multi-account responses only occur when it is zero.
            if i == last_slot_index && !proof.is_empty() {
                // Only the last slot list is checked against the response proof. The returned
                // bool decides whether this account still has slots to fetch.
                let Ok(sc) = verify_range(
                    storage_root,
                    &start_hash,
                    &hashed_keys,
                    &encoded_values,
                    &proof,
                ) else {
                    validation_failed = true;
                    break;
                };
                should_continue = sc;
            } else if verify_range(
                storage_root,
                &start_hash,
                &hashed_keys,
                &encoded_values,
                &[],
            )
            .is_err()
            {
                // Verified with an empty proof. This presumably requires the list to be the
                // account's complete storage — TODO confirm against `verify_range`.
                validation_failed = true;
                break;
            }

            account_storages.push(
                next_account_slots
                    .iter()
                    .map(|slot| (slot.hash, slot.data))
                    .collect(),
            );
        }

        // A single bad slot list discards the whole response.
        if validation_failed {
            break 'outcome retry_outcome();
        }

        let (remaining_start, remaining_end, remaining_start_hash) = if should_continue {
            // The last account is only partially fetched. Keep it in the remaining range
            // (hence the `- 1`) and resume just after its last returned slot hash.
            let last_account_storage = match account_storages.last() {
                Some(storage) => storage,
                None => {
                    error!("No account storage found, this shouldn't happen");
                    break 'outcome retry_outcome();
                }
            };
            let (last_hash, _) = match last_account_storage.last() {
                Some(last_hash) => last_hash,
                None => {
                    error!("No last hash found, this shouldn't happen");
                    break 'outcome retry_outcome();
                }
            };
            let next_hash_u256 = U256::from_big_endian(&last_hash.0).saturating_add(1.into());
            let next_hash = H256::from_uint(&next_hash_u256);
            (start + account_storages.len() - 1, end, next_hash)
        } else {
            // Every returned account is complete. Resume at the first account that got no
            // slots, from the start of its storage.
            (start + account_storages.len(), end, H256::zero())
        };
        let slot_count: usize = account_storages.iter().map(|s| s.len()).sum();
        #[cfg(feature = "metrics")]
        ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("success");
        tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "success", slots = slot_count, "Storage range response received");
        (
            account_storages,
            remaining_start,
            remaining_end,
            (remaining_start_hash, task.end_hash),
        )
    };

    let task_result = StorageTaskResult {
        start_index: start,
        account_storages,
        peer_id,
        remaining_start,
        remaining_end,
        remaining_hash_range,
        task_start_hash: start_hash,
    };
    tx.send(task_result).await.ok();
    Ok::<(), SnapError>(())
}