1#[cfg(feature = "rocksdb")]
2use crate::backend::rocksdb::RocksDBBackend;
3use crate::{
4 STORE_METADATA_FILENAME, STORE_SCHEMA_VERSION,
5 api::{
6 StorageBackend, StorageReadView,
7 tables::{
8 ACCOUNT_CODE_METADATA, ACCOUNT_CODES, ACCOUNT_FLATKEYVALUE, ACCOUNT_TRIE_NODES,
9 BLOCK_ACCESS_LISTS, BLOCK_NUMBERS, BODIES, CANONICAL_BLOCK_HASHES, CHAIN_DATA,
10 EXECUTION_WITNESSES, FULLSYNC_HEADERS, HEADERS, INVALID_CHAINS, MISC_VALUES,
11 PENDING_BLOCKS, RECEIPTS_V2, SNAP_STATE, STORAGE_FLATKEYVALUE, STORAGE_TRIE_NODES,
12 TRANSACTION_LOCATIONS,
13 },
14 },
15 apply_prefix,
16 backend::in_memory::InMemoryBackend,
17 error::StoreError,
18 layering::{TrieLayerCache, TrieWrapper},
19 rlp::{BlockBodyRLP, BlockHeaderRLP, BlockRLP},
20 trie::{BackendTrieDB, BackendTrieDBLocked},
21 utils::{ChainDataIndex, SnapStateIndex},
22};
23
24use ethrex_common::{
25 Address, H256, U256,
26 types::{
27 AccountInfo, AccountState, AccountUpdate, Block, BlockBody, BlockHash, BlockHeader,
28 BlockNumber, ChainConfig, Code, CodeMetadata, ForkId, Genesis, GenesisAccount, Index,
29 Receipt, Transaction,
30 block_access_list::BlockAccessList,
31 block_execution_witness::{ExecutionWitness, RpcExecutionWitness},
32 },
33 utils::keccak,
34};
35use ethrex_crypto::{NativeCrypto, keccak::keccak_hash};
36use ethrex_rlp::{
37 decode::{RLPDecode, decode_bytes},
38 encode::RLPEncode,
39};
40use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Trie, TrieLogger, TrieNode, TrieWitness};
41use ethrex_trie::{Node, NodeRLP};
42use lru::LruCache;
43use rustc_hash::FxBuildHasher;
44use serde::{Deserialize, Serialize};
45use std::{
46 collections::{BTreeMap, HashMap, hash_map::Entry},
47 fmt::Debug,
48 io::Write,
49 path::{Path, PathBuf},
50 sync::{
51 Arc, Mutex, RwLock,
52 mpsc::{SyncSender, TryRecvError, sync_channel},
53 },
54 thread::JoinHandle,
55};
56#[cfg(feature = "rocksdb")]
57use tracing::warn;
58use tracing::{debug, error, info};
59
60pub const MAX_WITNESSES: u64 = 128;
62
63#[allow(unused)]
67const DB_COMMIT_THRESHOLD: usize = 128;
68const IN_MEMORY_COMMIT_THRESHOLD: usize = 10000;
69
70const BATCH_COMMIT_THRESHOLD: usize = 4;
73
74pub const DEFAULT_ROCKSDB_BLOCK_CACHE_SIZE_BYTES: usize = 12 * 1024 * 1024 * 1024;
84
85#[derive(Debug, Clone, Copy)]
90pub struct StoreConfig {
91 pub rocksdb_block_cache_size: usize,
100}
101
102impl Default for StoreConfig {
103 fn default() -> Self {
104 Self {
105 rocksdb_block_cache_size: DEFAULT_ROCKSDB_BLOCK_CACHE_SIZE_BYTES,
106 }
107 }
108}
109
110#[derive(Debug, PartialEq)]
112enum FKVGeneratorControlMessage {
113 Stop,
114 Continue,
115}
116
117const CODE_CACHE_MAX_SIZE: u64 = 64 * 1024 * 1024;
119
120#[derive(Debug)]
121struct CodeCache {
122 inner_cache: LruCache<H256, Code, FxBuildHasher>,
123 cache_size: u64,
124}
125
126impl Default for CodeCache {
127 fn default() -> Self {
128 Self {
129 inner_cache: LruCache::unbounded_with_hasher(FxBuildHasher),
130 cache_size: 0,
131 }
132 }
133}
134
135impl CodeCache {
136 fn get(&mut self, code_hash: &H256) -> Result<Option<Code>, StoreError> {
137 Ok(self.inner_cache.get(code_hash).cloned())
138 }
139
140 fn insert(&mut self, code: &Code) -> Result<(), StoreError> {
141 let code_size = code.size();
142 let cache_len = self.inner_cache.len() + 1;
143 self.cache_size += code_size as u64;
144 let current_size = self.cache_size;
145 debug!(
146 "[ACCOUNT CODE CACHE] cache elements (): {cache_len}, total size: {current_size} bytes"
147 );
148
149 while self.cache_size > CODE_CACHE_MAX_SIZE {
150 if let Some((_, code)) = self.inner_cache.pop_lru() {
151 self.cache_size -= code.size() as u64;
152 } else {
153 break;
154 }
155 }
156
157 self.inner_cache.get_or_insert(code.hash, || code.clone());
158 Ok(())
159 }
160}
161
162#[derive(Debug, Clone)]
195pub struct Store {
196 db_path: PathBuf,
198 backend: Arc<dyn StorageBackend>,
200 chain_config: ChainConfig,
202 trie_cache: Arc<RwLock<Arc<TrieLayerCache>>>,
204 flatkeyvalue_control_tx: std::sync::mpsc::SyncSender<FKVGeneratorControlMessage>,
206 trie_update_worker_tx: std::sync::mpsc::SyncSender<TrieMessage>,
208 latest_block_header: LatestBlockHeaderCache,
216 last_computed_flatkeyvalue: Arc<RwLock<Vec<u8>>>,
218
219 account_code_cache: Arc<Mutex<CodeCache>>,
224
225 code_metadata_cache: Arc<Mutex<rustc_hash::FxHashMap<H256, CodeMetadata>>>,
228
229 fcu_lock: Arc<tokio::sync::Mutex<()>>,
232
233 background_threads: Arc<ThreadList>,
234}
235
236#[derive(Debug, Default)]
237struct ThreadList {
238 list: Vec<JoinHandle<()>>,
239}
240
241impl Drop for ThreadList {
242 fn drop(&mut self) {
243 for handle in self.list.drain(..) {
244 let _ = handle.join();
245 }
246 }
247}
248
249pub type StorageTrieNodes = Vec<(H256, Vec<(Nibbles, Vec<u8>)>)>;
254type StorageTries = HashMap<Address, (TrieWitness, Trie)>;
255
256#[derive(Debug, Clone, Copy, PartialEq, Eq)]
260pub enum EngineType {
261 InMemory,
263 #[cfg(feature = "rocksdb")]
265 RocksDB,
266}
267
268pub struct UpdateBatch {
273 pub account_updates: Vec<TrieNode>,
275 pub storage_updates: Vec<(H256, Vec<TrieNode>)>,
277 pub blocks: Vec<Block>,
279 pub receipts: Vec<(H256, Vec<Receipt>)>,
281 pub code_updates: Vec<(H256, Code)>,
283 pub batch_mode: bool,
287}
288
289pub type StorageUpdates = Vec<(H256, Vec<(Nibbles, Vec<u8>)>)>;
291
292pub struct AccountUpdatesList {
297 pub state_trie_hash: H256,
299 pub state_updates: Vec<(Nibbles, Vec<u8>)>,
301 pub storage_updates: StorageUpdates,
303 pub code_updates: Vec<(H256, Code)>,
305}
306
307pub(crate) fn encode_tx_location_operand(
318 block_number: BlockNumber,
319 block_hash: BlockHash,
320 index: Index,
321) -> Vec<u8> {
322 vec![(block_number, block_hash, index)].encode_to_vec()
323}
324
325pub fn tx_locations_merge(
359 existing: Option<&[u8]>,
360 operands: impl IntoIterator<Item = impl AsRef<[u8]>>,
361) -> Option<Vec<u8>> {
362 fn fold_chunk(
366 list: &mut Vec<(BlockNumber, BlockHash, Index)>,
367 bytes: &[u8],
368 what: &str,
369 ) -> bool {
370 match <Vec<(BlockNumber, BlockHash, Index)>>::decode(bytes) {
371 Ok(entries) => {
372 for (bn, bh, idx) in entries {
373 list.retain(|(_, existing_bh, _)| *existing_bh != bh);
374 list.push((bn, bh, idx));
375 }
376 true
377 }
378 Err(e) => {
379 error!(
380 "tx_locations_merge: failed to decode {what} ({} bytes): {e}; \
381 aborting merge to avoid silent data loss",
382 bytes.len()
383 );
384 false
385 }
386 }
387 }
388
389 let mut list: Vec<(BlockNumber, BlockHash, Index)> = Vec::new();
390
391 if let Some(bytes) = existing
393 && !fold_chunk(&mut list, bytes, "existing value")
394 {
395 return None;
396 }
397 for op in operands {
398 if !fold_chunk(&mut list, op.as_ref(), "operand") {
399 return None;
400 }
401 }
402 Some(list.encode_to_vec())
403}
404
405impl Store {
406 pub async fn wait_for_persistence_idle(&self) -> Result<(), StoreError> {
422 let tx = self.trie_update_worker_tx.clone();
423 tokio::task::spawn_blocking(move || tx.send(TrieMessage::Ping))
424 .await
425 .map_err(|e| StoreError::Custom(format!("wait_for_persistence_idle join: {e}")))?
426 .map_err(|e| StoreError::Custom(format!("wait_for_persistence_idle send: {e}")))
427 }
428
429 pub async fn add_block(&self, block: Block) -> Result<(), StoreError> {
432 self.add_blocks(vec![block]).await
433 }
434
435 pub async fn add_blocks(&self, blocks: Vec<Block>) -> Result<(), StoreError> {
438 let db = self.backend.clone();
439 tokio::task::spawn_blocking(move || {
440 let mut tx = db.begin_write()?;
441
442 for block in blocks {
444 let block_number = block.header.number;
445 let block_hash = block.hash();
446 let hash_key = block_hash.encode_to_vec();
447
448 let header_value_rlp = BlockHeaderRLP::from(block.header.clone());
449 tx.put(HEADERS, &hash_key, header_value_rlp.bytes())?;
450
451 let body_value = BlockBodyRLP::from_bytes(block.body.encode_to_vec());
452 tx.put(BODIES, &hash_key, body_value.bytes())?;
453
454 tx.put(BLOCK_NUMBERS, &hash_key, &block_number.to_le_bytes())?;
455
456 for (index, transaction) in block.body.transactions.iter().enumerate() {
457 tx.merge(
458 TRANSACTION_LOCATIONS,
459 transaction.hash().as_bytes(),
460 &encode_tx_location_operand(block_number, block_hash, index as u64),
461 )?;
462 }
463 }
464
465 tx.commit()
466 })
467 .await
468 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
469 }
470
471 pub async fn add_block_header(
473 &self,
474 block_hash: BlockHash,
475 block_header: BlockHeader,
476 ) -> Result<(), StoreError> {
477 let hash_key = block_hash.encode_to_vec();
478 let header_value = BlockHeaderRLP::from(block_header).into_vec();
479 self.write_async(HEADERS, hash_key, header_value).await
480 }
481
482 pub async fn add_block_headers(
484 &self,
485 block_headers: Vec<BlockHeader>,
486 ) -> Result<(), StoreError> {
487 let mut txn = self.backend.begin_write()?;
488
489 for header in block_headers {
490 let block_hash = header.hash();
491 let block_number = header.number;
492 let hash_key = block_hash.encode_to_vec();
493 let header_value = BlockHeaderRLP::from(header).into_vec();
494
495 txn.put(HEADERS, &hash_key, &header_value)?;
496
497 let number_key = block_number.to_le_bytes().to_vec();
498 txn.put(BLOCK_NUMBERS, &hash_key, &number_key)?;
499 }
500 txn.commit()?;
501 Ok(())
502 }
503
504 pub fn get_block_header(
506 &self,
507 block_number: BlockNumber,
508 ) -> Result<Option<BlockHeader>, StoreError> {
509 let latest = self.latest_block_header.get();
510 if block_number == latest.number {
511 return Ok(Some((*latest).clone()));
512 }
513 self.load_block_header(block_number)
514 }
515
516 pub async fn add_block_body(
518 &self,
519 block_hash: BlockHash,
520 block_body: BlockBody,
521 ) -> Result<(), StoreError> {
522 let hash_key = block_hash.encode_to_vec();
523 let body_value = BlockBodyRLP::from(block_body).into_vec();
524 self.write_async(BODIES, hash_key, body_value).await
525 }
526
527 pub async fn get_block_body(
529 &self,
530 block_number: BlockNumber,
531 ) -> Result<Option<BlockBody>, StoreError> {
532 let Some(block_hash) = self.get_canonical_block_hash_sync(block_number)? else {
533 return Ok(None);
534 };
535
536 self.get_block_body_by_hash(block_hash).await
537 }
538
539 pub async fn remove_block(&self, block_number: BlockNumber) -> Result<(), StoreError> {
541 let Some(hash) = self.get_canonical_block_hash_sync(block_number)? else {
542 return Ok(());
543 };
544
545 let backend = self.backend.clone();
546 tokio::task::spawn_blocking(move || {
547 let hash_key = hash.encode_to_vec();
548
549 let mut txn = backend.begin_write()?;
550 txn.delete(
551 CANONICAL_BLOCK_HASHES,
552 block_number.to_le_bytes().as_slice(),
553 )?;
554 txn.delete(BODIES, &hash_key)?;
555 txn.delete(HEADERS, &hash_key)?;
556 txn.delete(BLOCK_NUMBERS, &hash_key)?;
557 txn.commit()
558 })
559 .await
560 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
561 }
562
563 pub async fn get_block_bodies(
565 &self,
566 from: BlockNumber,
567 to: BlockNumber,
568 ) -> Result<Vec<Option<BlockBody>>, StoreError> {
569 let backend = self.backend.clone();
571 tokio::task::spawn_blocking(move || {
572 let numbers: Vec<BlockNumber> = (from..=to).collect();
573 let mut block_bodies = Vec::new();
574
575 let txn = backend.begin_read()?;
576 for number in numbers {
577 let Some(hash) = txn
578 .get(CANONICAL_BLOCK_HASHES, number.to_le_bytes().as_slice())?
579 .map(|bytes| H256::decode(bytes.as_slice()))
580 .transpose()?
581 else {
582 block_bodies.push(None);
583 continue;
584 };
585 let hash_key = hash.encode_to_vec();
586 let block_body_opt = txn
587 .get(BODIES, &hash_key)?
588 .map(|bytes| BlockBodyRLP::from_bytes(bytes).to())
589 .transpose()
590 .map_err(StoreError::from)?;
591
592 block_bodies.push(block_body_opt);
593 }
594
595 Ok(block_bodies)
596 })
597 .await
598 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
599 }
600
601 pub async fn get_block_bodies_by_hash(
603 &self,
604 hashes: Vec<BlockHash>,
605 ) -> Result<Vec<BlockBody>, StoreError> {
606 let backend = self.backend.clone();
607 tokio::task::spawn_blocking(move || {
609 let txn = backend.begin_read()?;
610 let mut block_bodies = Vec::new();
611 for hash in hashes {
612 let hash_key = hash.encode_to_vec();
613
614 let Some(block_body) = txn
615 .get(BODIES, &hash_key)?
616 .map(|bytes| BlockBodyRLP::from_bytes(bytes).to())
617 .transpose()
618 .map_err(StoreError::from)?
619 else {
620 return Err(StoreError::Custom(format!(
621 "Block body not found for hash: {hash}"
622 )));
623 };
624 block_bodies.push(block_body);
625 }
626 Ok(block_bodies)
627 })
628 .await
629 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
630 }
631
632 pub async fn get_block_body_by_hash(
634 &self,
635 block_hash: BlockHash,
636 ) -> Result<Option<BlockBody>, StoreError> {
637 self.read_async(BODIES, block_hash.encode_to_vec())
638 .await?
639 .map(|bytes| BlockBodyRLP::from_bytes(bytes).to())
640 .transpose()
641 .map_err(StoreError::from)
642 }
643
644 pub fn get_block_header_by_hash(
645 &self,
646 block_hash: BlockHash,
647 ) -> Result<Option<BlockHeader>, StoreError> {
648 let latest = self.latest_block_header.get();
649 if block_hash == latest.hash() {
650 return Ok(Some((*latest).clone()));
651 }
652 self.load_block_header_by_hash(block_hash)
653 }
654
655 pub fn add_pending_block(&self, block: Block) -> Result<(), StoreError> {
656 let block_hash = block.hash();
657 let block_value = BlockRLP::from(block).into_vec();
658 self.write(PENDING_BLOCKS, block_hash.as_bytes().to_vec(), block_value)
659 }
660
661 pub async fn get_pending_block(
662 &self,
663 block_hash: BlockHash,
664 ) -> Result<Option<Block>, StoreError> {
665 self.read_async(PENDING_BLOCKS, block_hash.as_bytes().to_vec())
666 .await?
667 .map(|bytes| BlockRLP::from_bytes(bytes).to())
668 .transpose()
669 .map_err(StoreError::from)
670 }
671
672 pub async fn add_block_number(
674 &self,
675 block_hash: BlockHash,
676 block_number: BlockNumber,
677 ) -> Result<(), StoreError> {
678 let number_value = block_number.to_le_bytes().to_vec();
679 self.write_async(BLOCK_NUMBERS, block_hash.encode_to_vec(), number_value)
680 .await
681 }
682
683 pub async fn get_block_number(
685 &self,
686 block_hash: BlockHash,
687 ) -> Result<Option<BlockNumber>, StoreError> {
688 self.read_async(BLOCK_NUMBERS, block_hash.encode_to_vec())
689 .await?
690 .map(|bytes| -> Result<BlockNumber, StoreError> {
691 let array: [u8; 8] = bytes
692 .try_into()
693 .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
694 Ok(BlockNumber::from_le_bytes(array))
695 })
696 .transpose()
697 }
698
699 pub async fn add_transaction_location(
701 &self,
702 transaction_hash: H256,
703 block_number: BlockNumber,
704 block_hash: BlockHash,
705 index: Index,
706 ) -> Result<(), StoreError> {
707 self.add_transaction_locations(vec![(transaction_hash, block_number, block_hash, index)])
708 .await
709 }
710
711 pub async fn add_transaction_locations(
713 &self,
714 locations: Vec<(H256, BlockNumber, BlockHash, Index)>,
715 ) -> Result<(), StoreError> {
716 let db = self.backend.clone();
717 tokio::task::spawn_blocking(move || {
718 let mut tx = db.begin_write()?;
719 for (tx_hash, block_number, block_hash, index) in locations {
720 tx.merge(
721 TRANSACTION_LOCATIONS,
722 tx_hash.as_bytes(),
723 &encode_tx_location_operand(block_number, block_hash, index),
724 )?;
725 }
726 tx.commit()
727 })
728 .await
729 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
730 }
731
732 pub async fn get_transaction_location(
734 &self,
735 transaction_hash: H256,
736 ) -> Result<Option<(BlockNumber, BlockHash, Index)>, StoreError> {
737 let db = self.backend.clone();
738 tokio::task::spawn_blocking(move || {
739 let tx = db.begin_read()?;
740 let Some(bytes) = tx.get(TRANSACTION_LOCATIONS, transaction_hash.as_bytes())? else {
741 return Ok(None);
742 };
743 let locations = <Vec<(BlockNumber, BlockHash, Index)>>::decode(&bytes)?;
744
745 for (block_number, block_hash, index) in locations {
748 let canonical_hash = tx
749 .get(
750 CANONICAL_BLOCK_HASHES,
751 block_number.to_le_bytes().as_slice(),
752 )?
753 .map(|bytes| H256::decode(bytes.as_slice()))
754 .transpose()?;
755
756 if canonical_hash == Some(block_hash) {
757 return Ok(Some((block_number, block_hash, index)));
758 }
759 }
760
761 Ok(None)
762 })
763 .await
764 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
765 }
766
767 pub async fn add_receipt(
769 &self,
770 block_hash: BlockHash,
771 index: Index,
772 receipt: Receipt,
773 ) -> Result<(), StoreError> {
774 let key = receipt_key(&block_hash, index);
775 let value = receipt.encode_to_vec();
776 self.write_async(RECEIPTS_V2, key, value).await
777 }
778
779 pub async fn add_receipts(
781 &self,
782 block_hash: BlockHash,
783 receipts: Vec<Receipt>,
784 ) -> Result<(), StoreError> {
785 let batch_items: Vec<_> = receipts
786 .into_iter()
787 .enumerate()
788 .map(|(index, receipt)| {
789 let key = receipt_key(&block_hash, index as u64);
790 let value = receipt.encode_to_vec();
791 (key, value)
792 })
793 .collect();
794 self.write_batch_async(RECEIPTS_V2, batch_items).await
795 }
796
797 pub async fn get_receipt(
799 &self,
800 block_number: BlockNumber,
801 index: Index,
802 ) -> Result<Option<Receipt>, StoreError> {
803 let Some(block_hash) = self.get_canonical_block_hash(block_number).await? else {
805 return Ok(None);
806 };
807 self.get_receipt_by_block_hash(block_hash, index).await
808 }
809
810 async fn get_receipt_by_block_hash(
812 &self,
813 block_hash: BlockHash,
814 index: Index,
815 ) -> Result<Option<Receipt>, StoreError> {
816 let key = receipt_key(&block_hash, index);
817 self.read_async(RECEIPTS_V2, key)
818 .await?
819 .map(|bytes| Receipt::decode(bytes.as_slice()))
820 .transpose()
821 .map_err(StoreError::from)
822 }
823
824 pub fn get_account_code(&self, code_hash: H256) -> Result<Option<Code>, StoreError> {
829 if let Some(code) = self
831 .account_code_cache
832 .lock()
833 .map_err(|_| StoreError::LockError)?
834 .get(&code_hash)?
835 {
836 return Ok(Some(code));
837 }
838
839 let Some(bytes) = self
840 .backend
841 .begin_read()?
842 .get(ACCOUNT_CODES, code_hash.as_bytes())?
843 else {
844 return Ok(None);
845 };
846 let (bytecode_slice, targets) = decode_bytes(&bytes)?;
847 let code = Code::from_parts_unchecked(
848 code_hash,
849 bytecode_slice,
850 <Vec<u32>>::decode(targets)?.into(),
851 );
852
853 self.account_code_cache
855 .lock()
856 .map_err(|_| StoreError::LockError)?
857 .insert(&code)?;
858
859 Ok(Some(code))
860 }
861
862 pub fn code_exists(&self, code_hash: H256) -> Result<bool, StoreError> {
867 if self
869 .account_code_cache
870 .lock()
871 .map_err(|_| StoreError::LockError)?
872 .get(&code_hash)?
873 .is_some()
874 {
875 return Ok(true);
876 }
877 Ok(self
879 .backend
880 .begin_read()?
881 .get(ACCOUNT_CODES, code_hash.as_bytes())?
882 .is_some())
883 }
884
885 pub fn get_code_metadata(&self, code_hash: H256) -> Result<Option<CodeMetadata>, StoreError> {
890 use ethrex_common::constants::EMPTY_KECCAK_HASH;
891
892 if code_hash == *EMPTY_KECCAK_HASH {
894 return Ok(Some(CodeMetadata { length: 0 }));
895 }
896
897 if let Some(metadata) = self
899 .code_metadata_cache
900 .lock()
901 .map_err(|_| StoreError::LockError)?
902 .get(&code_hash)
903 .copied()
904 {
905 return Ok(Some(metadata));
906 }
907
908 let metadata = if let Some(bytes) = self
910 .backend
911 .begin_read()?
912 .get(ACCOUNT_CODE_METADATA, code_hash.as_bytes())?
913 {
914 let length =
915 u64::from_be_bytes(bytes.try_into().map_err(|_| {
916 StoreError::Custom("Invalid metadata length encoding".to_string())
917 })?);
918 CodeMetadata { length }
919 } else {
920 let Some(code) = self.get_account_code(code_hash)? else {
922 return Ok(None);
923 };
924 let metadata = CodeMetadata {
925 length: code.len() as u64,
926 };
927
928 let metadata_buf = metadata.length.to_be_bytes().to_vec();
930 let hash_key = code_hash.0.to_vec();
931 let backend = self.backend.clone();
932 tokio::task::spawn(async move {
933 if let Err(e) = async {
934 let mut tx = backend.begin_write()?;
935 tx.put(ACCOUNT_CODE_METADATA, &hash_key, &metadata_buf)?;
936 tx.commit()
937 }
938 .await
939 {
940 tracing::warn!("Failed to write code metadata during auto-migration: {}", e);
941 }
942 });
943
944 metadata
945 };
946
947 self.code_metadata_cache
949 .lock()
950 .map_err(|_| StoreError::LockError)?
951 .insert(code_hash, metadata);
952
953 Ok(Some(metadata))
954 }
955
956 pub async fn add_account_code(&self, code: Code) -> Result<(), StoreError> {
958 let hash_key = code.hash.0.to_vec();
959 let buf = encode_code(&code);
960 let metadata_buf = (code.len() as u64).to_be_bytes();
961
962 let backend = self.backend.clone();
964 tokio::task::spawn_blocking(move || {
965 let mut tx = backend.begin_write()?;
966 tx.put(ACCOUNT_CODES, &hash_key, &buf)?;
967 tx.put(ACCOUNT_CODE_METADATA, &hash_key, &metadata_buf)?;
968 tx.commit()
969 })
970 .await
971 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
972 }
973
974 pub async fn clear_snap_state(&self) -> Result<(), StoreError> {
976 let db = self.backend.clone();
977 tokio::task::spawn_blocking(move || db.clear_table(SNAP_STATE))
978 .await
979 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
980 }
981
982 pub async fn get_transaction_by_hash(
983 &self,
984 transaction_hash: H256,
985 ) -> Result<Option<Transaction>, StoreError> {
986 let (_block_number, block_hash, index) =
987 match self.get_transaction_location(transaction_hash).await? {
988 Some(location) => location,
989 None => return Ok(None),
990 };
991 self.get_transaction_by_location(block_hash, index).await
992 }
993
994 pub async fn get_transaction_by_location(
995 &self,
996 block_hash: H256,
997 index: u64,
998 ) -> Result<Option<Transaction>, StoreError> {
999 let block_body = match self.get_block_body_by_hash(block_hash).await? {
1000 Some(body) => body,
1001 None => return Ok(None),
1002 };
1003 let index: usize = index.try_into()?;
1004 Ok(block_body.transactions.get(index).cloned())
1005 }
1006
1007 pub async fn get_block_by_hash(
1008 &self,
1009 block_hash: BlockHash,
1010 ) -> Result<Option<Block>, StoreError> {
1011 let header = match self.get_block_header_by_hash(block_hash)? {
1012 Some(header) => header,
1013 None => return Ok(None),
1014 };
1015 let body = match self.get_block_body_by_hash(block_hash).await? {
1016 Some(body) => body,
1017 None => return Ok(None),
1018 };
1019 Ok(Some(Block::new(header, body)))
1020 }
1021
1022 pub async fn get_block_by_number(
1023 &self,
1024 block_number: BlockNumber,
1025 ) -> Result<Option<Block>, StoreError> {
1026 let Some(block_hash) = self.get_canonical_block_hash(block_number).await? else {
1027 return Ok(None);
1028 };
1029 self.get_block_by_hash(block_hash).await
1030 }
1031
1032 pub async fn get_canonical_block_hash(
1034 &self,
1035 block_number: BlockNumber,
1036 ) -> Result<Option<BlockHash>, StoreError> {
1037 let last = self.latest_block_header.get();
1038 if last.number == block_number {
1039 return Ok(Some(last.hash()));
1040 }
1041 let backend = self.backend.clone();
1042 tokio::task::spawn_blocking(move || {
1043 backend
1044 .begin_read()?
1045 .get(
1046 CANONICAL_BLOCK_HASHES,
1047 block_number.to_le_bytes().as_slice(),
1048 )?
1049 .map(|bytes| H256::decode(bytes.as_slice()))
1050 .transpose()
1051 .map_err(StoreError::from)
1052 })
1053 .await
1054 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
1055 }
1056
1057 pub async fn set_chain_config(&mut self, chain_config: &ChainConfig) -> Result<(), StoreError> {
1060 self.chain_config = *chain_config;
1061 let key = chain_data_key(ChainDataIndex::ChainConfig);
1062 let value = serde_json::to_string(chain_config)
1063 .map_err(|_| StoreError::Custom("Failed to serialize chain config".to_string()))?
1064 .into_bytes();
1065 self.write_async(CHAIN_DATA, key, value).await
1066 }
1067
1068 pub async fn update_earliest_block_number(
1070 &self,
1071 block_number: BlockNumber,
1072 ) -> Result<(), StoreError> {
1073 let key = chain_data_key(ChainDataIndex::EarliestBlockNumber);
1074 let value = block_number.to_le_bytes().to_vec();
1075 self.write_async(CHAIN_DATA, key, value).await
1076 }
1077
1078 pub async fn get_earliest_block_number(&self) -> Result<BlockNumber, StoreError> {
1080 let key = chain_data_key(ChainDataIndex::EarliestBlockNumber);
1081 self.read_async(CHAIN_DATA, key)
1082 .await?
1083 .map(|bytes| -> Result<BlockNumber, StoreError> {
1084 let array: [u8; 8] = bytes
1085 .try_into()
1086 .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
1087 Ok(BlockNumber::from_le_bytes(array))
1088 })
1089 .ok_or(StoreError::MissingEarliestBlockNumber)?
1090 }
1091
1092 pub async fn get_finalized_block_number(&self) -> Result<Option<BlockNumber>, StoreError> {
1094 let key = chain_data_key(ChainDataIndex::FinalizedBlockNumber);
1095 self.read_async(CHAIN_DATA, key)
1096 .await?
1097 .map(|bytes| -> Result<BlockNumber, StoreError> {
1098 let array: [u8; 8] = bytes
1099 .try_into()
1100 .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
1101 Ok(BlockNumber::from_le_bytes(array))
1102 })
1103 .transpose()
1104 }
1105
1106 pub async fn get_safe_block_number(&self) -> Result<Option<BlockNumber>, StoreError> {
1108 let key = chain_data_key(ChainDataIndex::SafeBlockNumber);
1109 self.read_async(CHAIN_DATA, key)
1110 .await?
1111 .map(|bytes| -> Result<BlockNumber, StoreError> {
1112 let array: [u8; 8] = bytes
1113 .try_into()
1114 .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
1115 Ok(BlockNumber::from_le_bytes(array))
1116 })
1117 .transpose()
1118 }
1119
1120 pub async fn get_latest_block_number(&self) -> Result<BlockNumber, StoreError> {
1122 Ok(self.latest_block_header.get().number)
1123 }
1124
1125 pub async fn update_pending_block_number(
1127 &self,
1128 block_number: BlockNumber,
1129 ) -> Result<(), StoreError> {
1130 let key = chain_data_key(ChainDataIndex::PendingBlockNumber);
1131 let value = block_number.to_le_bytes().to_vec();
1132 self.write_async(CHAIN_DATA, key, value).await
1133 }
1134
1135 pub async fn get_pending_block_number(&self) -> Result<Option<BlockNumber>, StoreError> {
1137 let key = chain_data_key(ChainDataIndex::PendingBlockNumber);
1138 self.read_async(CHAIN_DATA, key)
1139 .await?
1140 .map(|bytes| -> Result<BlockNumber, StoreError> {
1141 let array: [u8; 8] = bytes
1142 .try_into()
1143 .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
1144 Ok(BlockNumber::from_le_bytes(array))
1145 })
1146 .transpose()
1147 }
1148
1149 async fn forkchoice_update_inner(
1155 &self,
1156 new_canonical_blocks: Vec<(BlockNumber, BlockHash)>,
1157 head_number: BlockNumber,
1158 head_hash: BlockHash,
1159 safe: Option<BlockNumber>,
1160 finalized: Option<BlockNumber>,
1161 ) -> Result<(), StoreError> {
1162 let latest = self.load_latest_block_number().await?.unwrap_or(0);
1163 let db = self.backend.clone();
1164 tokio::task::spawn_blocking(move || {
1165 let mut txn = db.begin_write()?;
1166
1167 for (block_number, block_hash) in new_canonical_blocks {
1168 let head_key = block_number.to_le_bytes();
1169 let head_value = block_hash.encode_to_vec();
1170 txn.put(CANONICAL_BLOCK_HASHES, &head_key, &head_value)?;
1171 }
1172
1173 for number in (head_number + 1)..=(latest) {
1180 txn.delete(CANONICAL_BLOCK_HASHES, number.to_le_bytes().as_slice())?;
1181 }
1182
1183 let head_key = head_number.to_le_bytes();
1185 let head_value = head_hash.encode_to_vec();
1186 txn.put(CANONICAL_BLOCK_HASHES, &head_key, &head_value)?;
1187
1188 let latest_key = chain_data_key(ChainDataIndex::LatestBlockNumber);
1190 txn.put(CHAIN_DATA, &latest_key, &head_number.to_le_bytes())?;
1191
1192 if let Some(safe) = safe {
1193 let safe_key = chain_data_key(ChainDataIndex::SafeBlockNumber);
1194 txn.put(CHAIN_DATA, &safe_key, &safe.to_le_bytes())?;
1195 }
1196
1197 if let Some(finalized) = finalized {
1198 let finalized_key = chain_data_key(ChainDataIndex::FinalizedBlockNumber);
1199 txn.put(CHAIN_DATA, &finalized_key, &finalized.to_le_bytes())?;
1200 }
1201
1202 txn.commit()
1203 })
1204 .await
1205 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
1206 }
1207
1208 pub async fn get_receipts_for_block(
1209 &self,
1210 block_hash: &BlockHash,
1211 ) -> Result<Vec<Receipt>, StoreError> {
1212 self.get_receipts_for_block_from_index(block_hash, 0, None)
1213 .await
1214 }
1215
1216 pub async fn get_receipts_for_block_from_index(
1225 &self,
1226 block_hash: &BlockHash,
1227 start_index: u64,
1228 max_count: Option<usize>,
1229 ) -> Result<Vec<Receipt>, StoreError> {
1230 let backend = self.backend.clone();
1231 let block_hash = *block_hash;
1232
1233 tokio::task::spawn_blocking(move || {
1234 let txn = backend.begin_read()?;
1235 let prefix = block_hash.as_bytes().to_vec();
1236 let mut seek_key = prefix.clone();
1239 seek_key.extend_from_slice(&start_index.to_be_bytes());
1240 let iter = txn.prefix_iterator(RECEIPTS_V2, &seek_key)?;
1241 let mut receipts = Vec::new();
1242 for result in iter {
1243 let (k, v) = result?;
1244 if !k.starts_with(&prefix) {
1245 break;
1246 }
1247 if k.len() != 40 {
1248 continue;
1249 }
1250 receipts.push(Receipt::decode(v.as_ref())?);
1251 if let Some(max) = max_count
1252 && receipts.len() >= max
1253 {
1254 break;
1255 }
1256 }
1257 Ok(receipts)
1258 })
1259 .await
1260 .map_err(|e| StoreError::Custom(format!("Task panicked: {e}")))?
1261 }
1262
1263 pub async fn set_header_download_checkpoint(
1267 &self,
1268 block_hash: BlockHash,
1269 ) -> Result<(), StoreError> {
1270 let key = snap_state_key(SnapStateIndex::HeaderDownloadCheckpoint);
1271 let value = block_hash.encode_to_vec();
1272 self.write_async(SNAP_STATE, key, value).await
1273 }
1274
1275 pub async fn get_header_download_checkpoint(&self) -> Result<Option<BlockHash>, StoreError> {
1277 let key = snap_state_key(SnapStateIndex::HeaderDownloadCheckpoint);
1278 self.backend
1279 .begin_read()?
1280 .get(SNAP_STATE, &key)?
1281 .map(|bytes| H256::decode(bytes.as_slice()))
1282 .transpose()
1283 .map_err(StoreError::from)
1284 }
1285
1286 pub async fn set_latest_valid_ancestor(
1292 &self,
1293 bad_block: BlockHash,
1294 latest_valid: BlockHash,
1295 ) -> Result<(), StoreError> {
1296 let value = latest_valid.encode_to_vec();
1297 self.write_async(INVALID_CHAINS, bad_block.as_bytes().to_vec(), value)
1298 .await
1299 }
1300
1301 pub async fn get_latest_valid_ancestor(
1304 &self,
1305 block: BlockHash,
1306 ) -> Result<Option<BlockHash>, StoreError> {
1307 self.read_async(INVALID_CHAINS, block.as_bytes().to_vec())
1308 .await?
1309 .map(|bytes| H256::decode(bytes.as_slice()))
1310 .transpose()
1311 .map_err(StoreError::from)
1312 }
1313
1314 pub fn get_block_number_sync(
1316 &self,
1317 block_hash: BlockHash,
1318 ) -> Result<Option<BlockNumber>, StoreError> {
1319 let txn = self.backend.begin_read()?;
1320 txn.get(BLOCK_NUMBERS, &block_hash.encode_to_vec())?
1321 .map(|bytes| -> Result<BlockNumber, StoreError> {
1322 let array: [u8; 8] = bytes
1323 .try_into()
1324 .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
1325 Ok(BlockNumber::from_le_bytes(array))
1326 })
1327 .transpose()
1328 }
1329
1330 pub fn get_canonical_block_hash_sync(
1332 &self,
1333 block_number: BlockNumber,
1334 ) -> Result<Option<BlockHash>, StoreError> {
1335 let last = self.latest_block_header.get();
1336 if last.number == block_number {
1337 return Ok(Some(last.hash()));
1338 }
1339 let txn = self.backend.begin_read()?;
1340 txn.get(
1341 CANONICAL_BLOCK_HASHES,
1342 block_number.to_le_bytes().as_slice(),
1343 )?
1344 .map(|bytes| H256::decode(bytes.as_slice()))
1345 .transpose()
1346 .map_err(StoreError::from)
1347 }
1348
1349 pub async fn write_storage_trie_nodes_batch(
1352 &self,
1353 storage_trie_nodes: StorageUpdates,
1354 ) -> Result<(), StoreError> {
1355 let mut txn = self.backend.begin_write()?;
1356 tokio::task::spawn_blocking(move || {
1357 for (address_hash, nodes) in storage_trie_nodes {
1358 for (node_path, node_data) in nodes {
1359 let key = apply_prefix(Some(address_hash), node_path);
1360 if node_data.is_empty() {
1361 txn.delete(STORAGE_TRIE_NODES, key.as_ref())?;
1362 } else {
1363 txn.put(STORAGE_TRIE_NODES, key.as_ref(), &node_data)?;
1364 }
1365 }
1366 }
1367 txn.commit()
1368 })
1369 .await
1370 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
1371 }
1372
1373 pub async fn write_account_code_batch(
1376 &self,
1377 account_codes: Vec<(H256, Code)>,
1378 ) -> Result<(), StoreError> {
1379 let mut code_batch_items = Vec::new();
1380 let mut metadata_batch_items = Vec::new();
1381
1382 for (code_hash, code) in account_codes {
1383 let buf = encode_code(&code);
1384 let metadata_buf = (code.len() as u64).to_be_bytes().to_vec();
1385 code_batch_items.push((code_hash.as_bytes().to_vec(), buf));
1386 metadata_batch_items.push((code_hash.as_bytes().to_vec(), metadata_buf));
1387 }
1388
1389 self.write_batch_async(ACCOUNT_CODES, code_batch_items)
1391 .await?;
1392 self.write_batch_async(ACCOUNT_CODE_METADATA, metadata_batch_items)
1393 .await
1394 }
1395
1396 pub fn write(
1402 &self,
1403 table: &'static str,
1404 key: Vec<u8>,
1405 value: Vec<u8>,
1406 ) -> Result<(), StoreError> {
1407 let backend = self.backend.clone();
1408 let mut txn = backend.begin_write()?;
1409 txn.put(table, &key, &value)?;
1410 txn.commit()
1411 }
1412
1413 async fn write_async(
1416 &self,
1417 table: &'static str,
1418 key: Vec<u8>,
1419 value: Vec<u8>,
1420 ) -> Result<(), StoreError> {
1421 let backend = self.backend.clone();
1422
1423 tokio::task::spawn_blocking(move || {
1424 let mut txn = backend.begin_write()?;
1425 txn.put(table, &key, &value)?;
1426 txn.commit()
1427 })
1428 .await
1429 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
1430 }
1431
1432 pub async fn read_async(
1435 &self,
1436 table: &'static str,
1437 key: Vec<u8>,
1438 ) -> Result<Option<Vec<u8>>, StoreError> {
1439 let backend = self.backend.clone();
1440
1441 tokio::task::spawn_blocking(move || {
1442 let txn = backend.begin_read()?;
1443 txn.get(table, &key)
1444 })
1445 .await
1446 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
1447 }
1448
1449 pub fn read(&self, table: &'static str, key: Vec<u8>) -> Result<Option<Vec<u8>>, StoreError> {
1452 let backend = self.backend.clone();
1453 let txn = backend.begin_read()?;
1454 txn.get(table, &key)
1455 }
1456
1457 pub async fn write_batch_async(
1461 &self,
1462 table: &'static str,
1463 batch_ops: Vec<(Vec<u8>, Vec<u8>)>,
1464 ) -> Result<(), StoreError> {
1465 let backend = self.backend.clone();
1466
1467 tokio::task::spawn_blocking(move || {
1468 let mut txn = backend.begin_write()?;
1469 txn.put_batch(table, batch_ops)?;
1470 txn.commit()
1471 })
1472 .await
1473 .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
1474 }
1475
1476 pub fn write_batch(
1478 &self,
1479 table: &'static str,
1480 batch_ops: Vec<(Vec<u8>, Vec<u8>)>,
1481 ) -> Result<(), StoreError> {
1482 let backend = self.backend.clone();
1483 let mut txn = backend.begin_write()?;
1484 txn.put_batch(table, batch_ops)?;
1485 txn.commit()
1486 }
1487
1488 pub async fn add_fullsync_batch(&self, headers: Vec<BlockHeader>) -> Result<(), StoreError> {
1489 self.write_batch_async(
1490 FULLSYNC_HEADERS,
1491 headers
1492 .into_iter()
1493 .map(|header| (header.number.to_le_bytes().to_vec(), header.encode_to_vec()))
1494 .collect(),
1495 )
1496 .await
1497 }
1498
1499 pub async fn read_fullsync_batch(
1500 &self,
1501 start: BlockNumber,
1502 limit: u64,
1503 ) -> Result<Vec<Option<BlockHeader>>, StoreError> {
1504 let mut res = vec![];
1505 let read_tx = self.backend.begin_read()?;
1506 for key in start..start + limit {
1508 let header_opt = read_tx
1509 .get(FULLSYNC_HEADERS, &key.to_le_bytes())?
1510 .map(|header| BlockHeader::decode(&header))
1511 .transpose()?;
1512 res.push(header_opt);
1513 }
1514 Ok(res)
1515 }
1516
1517 pub async fn clear_fullsync_headers(&self) -> Result<(), StoreError> {
1518 self.backend.clear_table(FULLSYNC_HEADERS)
1519 }
1520
1521 pub fn delete(&self, table: &'static str, key: Vec<u8>) -> Result<(), StoreError> {
1523 let mut txn = self.backend.begin_write()?;
1524 txn.delete(table, &key)?;
1525 txn.commit()
1526 }
1527
1528 pub fn store_block_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> {
1529 self.apply_updates(update_batch)
1530 }
1531
1532 fn apply_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> {
1533 let db = self.backend.clone();
1534 let parent_state_root = self
1535 .get_block_header_by_hash(
1536 update_batch
1537 .blocks
1538 .first()
1539 .ok_or(StoreError::UpdateBatchNoBlocks)?
1540 .header
1541 .parent_hash,
1542 )?
1543 .map(|header| header.state_root)
1544 .unwrap_or_default();
1545 let last_state_root = update_batch
1546 .blocks
1547 .last()
1548 .ok_or(StoreError::UpdateBatchNoBlocks)?
1549 .header
1550 .state_root;
1551 let trie_upd_worker_tx = self.trie_update_worker_tx.clone();
1552
1553 let is_batch = update_batch.batch_mode;
1554
1555 let UpdateBatch {
1556 account_updates,
1557 storage_updates,
1558 ..
1559 } = update_batch;
1560
1561 let (notify_tx, notify_rx) = sync_channel(1);
1563 let wait_for_new_layer = notify_rx;
1564 let trie_update = TrieUpdate {
1565 parent_state_root,
1566 account_updates,
1567 storage_updates,
1568 result_sender: notify_tx,
1569 child_state_root: last_state_root,
1570 is_batch,
1571 };
1572 trie_upd_worker_tx
1573 .send(TrieMessage::Update(trie_update))
1574 .map_err(|e| {
1575 StoreError::Custom(format!("failed to read new trie layer notification: {e}"))
1576 })?;
1577 let mut tx = db.begin_write()?;
1578
1579 for block in update_batch.blocks {
1580 let block_number = block.header.number;
1581 let block_hash = block.hash();
1582 let hash_key = block_hash.encode_to_vec();
1583
1584 let header_value_rlp = BlockHeaderRLP::from(block.header.clone());
1585 tx.put(HEADERS, &hash_key, header_value_rlp.bytes())?;
1586
1587 let body_value = BlockBodyRLP::from_bytes(block.body.encode_to_vec());
1588 tx.put(BODIES, &hash_key, body_value.bytes())?;
1589
1590 tx.put(BLOCK_NUMBERS, &hash_key, &block_number.to_le_bytes())?;
1591
1592 for (index, transaction) in block.body.transactions.iter().enumerate() {
1593 tx.merge(
1594 TRANSACTION_LOCATIONS,
1595 transaction.hash().as_bytes(),
1596 &encode_tx_location_operand(block_number, block_hash, index as u64),
1597 )?;
1598 }
1599 }
1600
1601 for (block_hash, receipts) in update_batch.receipts {
1602 for (index, receipt) in receipts.into_iter().enumerate() {
1603 let key = receipt_key(&block_hash, index as u64);
1604 let value = receipt.encode_to_vec();
1605 tx.put(RECEIPTS_V2, &key, &value)?;
1606 }
1607 }
1608
1609 for (code_hash, code) in update_batch.code_updates {
1610 let buf = encode_code(&code);
1611 let metadata_buf = (code.len() as u64).to_be_bytes();
1612 tx.put(ACCOUNT_CODES, code_hash.as_ref(), &buf)?;
1613 tx.put(ACCOUNT_CODE_METADATA, code_hash.as_ref(), &metadata_buf)?;
1614 }
1615
1616 wait_for_new_layer
1619 .recv()
1620 .map_err(|e| StoreError::Custom(format!("recv failed: {e}")))??;
1621 tx.commit()?;
1623
1624 Ok(())
1625 }
1626
1627 pub fn new(path: impl AsRef<Path>, engine_type: EngineType) -> Result<Self, StoreError> {
1632 Self::new_with_config(path, engine_type, StoreConfig::default())
1633 }
1634
1635 pub fn new_with_config(
1637 path: impl AsRef<Path>,
1638 engine_type: EngineType,
1639 #[cfg_attr(not(feature = "rocksdb"), allow(unused_variables))] config: StoreConfig,
1641 ) -> Result<Self, StoreError> {
1642 let db_path = path.as_ref().to_path_buf();
1643
1644 if engine_type != EngineType::InMemory {
1645 let version = read_store_schema_version(&db_path)?;
1646
1647 match version {
1648 None if db_path.exists() && dir_contains_legacy_db(&db_path)? => {
1649 return Err(StoreError::NotFoundDBVersion);
1651 }
1652 None => {
1653 init_metadata_file(&db_path)?;
1659 }
1660 Some(v) if v < 1 => {
1661 return Err(StoreError::MigrationFailed {
1662 from: v,
1663 to: STORE_SCHEMA_VERSION,
1664 reason: format!("DB version v{v} is invalid (predates migrations)"),
1665 });
1666 }
1667 Some(v) if v > STORE_SCHEMA_VERSION => {
1668 return Err(StoreError::MigrationFailed {
1669 from: v,
1670 to: STORE_SCHEMA_VERSION,
1671 reason: format!(
1672 "DB version v{v} is more recent than the client expects (v{STORE_SCHEMA_VERSION}). Rolling back is not supported"
1673 ),
1674 });
1675 }
1676 #[cfg(feature = "rocksdb")]
1677 Some(v) if v < STORE_SCHEMA_VERSION => {
1678 let rocksdb = Arc::new(RocksDBBackend::open(
1682 &path,
1683 config.rocksdb_block_cache_size,
1684 )?);
1685 crate::migrations::run_pending_migrations(rocksdb.as_ref(), &db_path, v)?;
1686 rocksdb.drop_obsolete_cfs(&path);
1687 let backend: Arc<dyn crate::api::StorageBackend> = rocksdb;
1688 return Self::from_backend(backend, db_path, DB_COMMIT_THRESHOLD);
1689 }
1690 Some(_) => {
1691 }
1696 }
1697 }
1698
1699 match engine_type {
1700 #[cfg(feature = "rocksdb")]
1701 EngineType::RocksDB => {
1702 let rocksdb = RocksDBBackend::open(&path, config.rocksdb_block_cache_size)?;
1703 rocksdb.drop_obsolete_cfs(&path);
1704 let backend: Arc<dyn StorageBackend> = Arc::new(rocksdb);
1705 Self::from_backend(backend, db_path, DB_COMMIT_THRESHOLD)
1706 }
1707 EngineType::InMemory => {
1708 let backend = Arc::new(InMemoryBackend::open()?);
1709 Self::from_backend(backend, db_path, IN_MEMORY_COMMIT_THRESHOLD)
1710 }
1711 }
1712 }
1713
1714 fn from_backend(
1715 backend: Arc<dyn StorageBackend>,
1716 db_path: PathBuf,
1717 commit_threshold: usize,
1718 ) -> Result<Self, StoreError> {
1719 debug!("Initializing Store with {commit_threshold} in-memory diff-layers");
1720 let (fkv_tx, fkv_rx) = std::sync::mpsc::sync_channel(0);
1721 let (trie_upd_tx, trie_upd_rx) = std::sync::mpsc::sync_channel(0);
1722
1723 let last_written = {
1724 let tx = backend.begin_read()?;
1725 let last_written = tx
1726 .get(MISC_VALUES, "last_written".as_bytes())?
1727 .unwrap_or_else(|| vec![0u8; 64]);
1728 if last_written == [0xff] {
1729 vec![0xff; 64]
1730 } else {
1731 last_written
1732 }
1733 };
1734 let mut background_threads = Vec::new();
1735 let mut store = Self {
1736 db_path,
1737 backend,
1738 chain_config: Default::default(),
1739 latest_block_header: Default::default(),
1740 trie_cache: Arc::new(RwLock::new(Arc::new(TrieLayerCache::new(commit_threshold)))),
1741 flatkeyvalue_control_tx: fkv_tx,
1742 trie_update_worker_tx: trie_upd_tx,
1743 last_computed_flatkeyvalue: Arc::new(RwLock::new(last_written)),
1744 account_code_cache: Arc::new(Mutex::new(CodeCache::default())),
1745 code_metadata_cache: Arc::new(Mutex::new(rustc_hash::FxHashMap::default())),
1746 fcu_lock: Arc::new(tokio::sync::Mutex::new(())),
1747 background_threads: Default::default(),
1748 };
1749 let backend_clone = store.backend.clone();
1750 let last_computed_fkv = store.last_computed_flatkeyvalue.clone();
1751 background_threads.push(std::thread::spawn(move || {
1752 let rx = fkv_rx;
1753 loop {
1755 match rx.recv() {
1756 Ok(FKVGeneratorControlMessage::Continue) => break,
1757 Ok(FKVGeneratorControlMessage::Stop) => {}
1758 Err(std::sync::mpsc::RecvError) => {
1759 debug!("Closing FlatKeyValue generator.");
1760 return;
1761 }
1762 }
1763 }
1764
1765 let _ = flatkeyvalue_generator(&backend_clone, &last_computed_fkv, &rx)
1766 .inspect_err(|err| error!("Error while generating FlatKeyValue: {err}"));
1767 }));
1768 let backend = store.backend.clone();
1769 let flatkeyvalue_control_tx = store.flatkeyvalue_control_tx.clone();
1770 let trie_cache = store.trie_cache.clone();
1771 background_threads.push(std::thread::spawn(move || {
1792 let rx = trie_upd_rx;
1793 loop {
1794 match rx.recv() {
1795 Ok(TrieMessage::Update(trie_update)) => {
1796 let _ = apply_trie_updates(
1798 backend.as_ref(),
1799 &flatkeyvalue_control_tx,
1800 &trie_cache,
1801 trie_update,
1802 )
1803 .inspect_err(|err| error!("apply_trie_updates failed: {err}"));
1804 }
1805 Ok(TrieMessage::Ping) => {
1806 }
1809 Err(err) => {
1810 debug!("Trie update sender disconnected: {err}");
1811 return;
1812 }
1813 }
1814 }
1815 }));
1816 store.background_threads = Arc::new(ThreadList {
1817 list: background_threads,
1818 });
1819 Ok(store)
1820 }
1821
1822 pub async fn new_from_genesis(
1825 store_path: &Path,
1826 engine_type: EngineType,
1827 genesis_path: &str,
1828 ) -> Result<Self, StoreError> {
1829 Self::new_from_genesis_with_config(
1830 store_path,
1831 engine_type,
1832 genesis_path,
1833 StoreConfig::default(),
1834 )
1835 .await
1836 }
1837
1838 pub async fn new_from_genesis_with_config(
1841 store_path: &Path,
1842 engine_type: EngineType,
1843 genesis_path: &str,
1844 config: StoreConfig,
1845 ) -> Result<Self, StoreError> {
1846 let file = std::fs::File::open(genesis_path)
1847 .map_err(|error| StoreError::Custom(format!("Failed to open genesis file: {error}")))?;
1848 let reader = std::io::BufReader::new(file);
1849 let genesis: Genesis = serde_json::from_reader(reader)
1850 .map_err(|e| StoreError::Custom(format!("Failed to deserialize genesis file: {e}")))?;
1851 let mut store = Self::new_with_config(store_path, engine_type, config)?;
1852 store.add_initial_state(genesis).await?;
1853 Ok(store)
1854 }
1855
1856 pub async fn get_account_info(
1857 &self,
1858 block_number: BlockNumber,
1859 address: Address,
1860 ) -> Result<Option<AccountInfo>, StoreError> {
1861 match self.get_canonical_block_hash(block_number).await? {
1862 Some(block_hash) => self.get_account_info_by_hash(block_hash, address),
1863 None => Ok(None),
1864 }
1865 }
1866
1867 pub fn get_account_info_by_hash(
1868 &self,
1869 block_hash: BlockHash,
1870 address: Address,
1871 ) -> Result<Option<AccountInfo>, StoreError> {
1872 let Some(state_trie) = self.state_trie(block_hash)? else {
1873 return Ok(None);
1874 };
1875 let hashed_address = hash_address_fixed(&address);
1876
1877 let Some(encoded_state) = state_trie.get(hashed_address.as_bytes())? else {
1878 return Ok(None);
1879 };
1880
1881 let account_state = AccountState::decode(&encoded_state)?;
1882 Ok(Some(AccountInfo {
1883 code_hash: account_state.code_hash,
1884 balance: account_state.balance,
1885 nonce: account_state.nonce,
1886 }))
1887 }
1888
1889 pub fn get_account_state_by_acc_hash(
1890 &self,
1891 block_hash: BlockHash,
1892 account_hash: H256,
1893 ) -> Result<Option<AccountState>, StoreError> {
1894 let Some(state_trie) = self.state_trie(block_hash)? else {
1895 return Ok(None);
1896 };
1897 let Some(encoded_state) = state_trie.get(account_hash.as_bytes())? else {
1898 return Ok(None);
1899 };
1900 let account_state = AccountState::decode(&encoded_state)?;
1901 Ok(Some(account_state))
1902 }
1903
1904 pub async fn get_fork_id(&self) -> Result<ForkId, StoreError> {
1905 let chain_config = self.get_chain_config();
1906 let genesis_header = self
1907 .load_block_header(0)?
1908 .ok_or(StoreError::MissingEarliestBlockNumber)?;
1909 let block_header = self.latest_block_header.get();
1910
1911 Ok(ForkId::new(
1912 chain_config,
1913 genesis_header,
1914 block_header.timestamp,
1915 block_header.number,
1916 ))
1917 }
1918
1919 pub async fn get_code_by_account_address(
1920 &self,
1921 block_number: BlockNumber,
1922 address: Address,
1923 ) -> Result<Option<Code>, StoreError> {
1924 let Some(block_hash) = self.get_canonical_block_hash(block_number).await? else {
1925 return Ok(None);
1926 };
1927 let Some(state_trie) = self.state_trie(block_hash)? else {
1928 return Ok(None);
1929 };
1930 let hashed_address = hash_address_fixed(&address);
1931 let Some(encoded_state) = state_trie.get(hashed_address.as_bytes())? else {
1932 return Ok(None);
1933 };
1934 let account_state = AccountState::decode(&encoded_state)?;
1935 self.get_account_code(account_state.code_hash)
1936 }
1937
1938 pub async fn get_nonce_by_account_address(
1939 &self,
1940 block_number: BlockNumber,
1941 address: Address,
1942 ) -> Result<Option<u64>, StoreError> {
1943 let Some(block_hash) = self.get_canonical_block_hash(block_number).await? else {
1944 return Ok(None);
1945 };
1946 let Some(state_trie) = self.state_trie(block_hash)? else {
1947 return Ok(None);
1948 };
1949 let hashed_address = hash_address_fixed(&address);
1950 let Some(encoded_state) = state_trie.get(hashed_address.as_bytes())? else {
1951 return Ok(None);
1952 };
1953 let account_state = AccountState::decode(&encoded_state)?;
1954 Ok(Some(account_state.nonce))
1955 }
1956
1957 pub fn apply_account_updates_batch(
1960 &self,
1961 block_hash: BlockHash,
1962 account_updates: &[AccountUpdate],
1963 ) -> Result<Option<AccountUpdatesList>, StoreError> {
1964 let Some(mut state_trie) = self.state_trie(block_hash)? else {
1965 return Ok(None);
1966 };
1967
1968 Ok(Some(self.apply_account_updates_from_trie_batch(
1969 &mut state_trie,
1970 account_updates,
1971 )?))
1972 }
1973
1974 pub fn apply_account_updates_from_trie_batch<'a>(
1975 &self,
1976 state_trie: &mut Trie,
1977 account_updates: impl IntoIterator<Item = &'a AccountUpdate>,
1978 ) -> Result<AccountUpdatesList, StoreError> {
1979 let mut ret_storage_updates = Vec::new();
1980 let mut code_updates = Vec::new();
1981 let state_root = state_trie.hash_no_commit(&NativeCrypto);
1982 for update in account_updates {
1983 let hashed_address = hash_address_fixed(&update.address);
1984 if update.removed {
1985 state_trie.remove(hashed_address.as_bytes())?;
1987 continue;
1988 }
1989 let mut account_state = match state_trie.get(hashed_address.as_bytes())? {
1992 Some(encoded_state) => AccountState::decode(&encoded_state)?,
1993 None => AccountState::default(),
1994 };
1995 if update.removed_storage {
1996 account_state.storage_root = *EMPTY_TRIE_HASH;
1997 }
1998 if let Some(info) = &update.info {
1999 account_state.nonce = info.nonce;
2000 account_state.balance = info.balance;
2001 account_state.code_hash = info.code_hash;
2002 if let Some(code) = &update.code {
2004 code_updates.push((info.code_hash, code.clone()));
2005 }
2006 }
2007 if !update.added_storage.is_empty() {
2009 let mut storage_trie =
2010 self.open_storage_trie(hashed_address, state_root, account_state.storage_root)?;
2011 for (storage_key, storage_value) in &update.added_storage {
2012 let hashed_key = hash_key(storage_key);
2013 if storage_value.is_zero() {
2014 storage_trie.remove(&hashed_key)?;
2015 } else {
2016 storage_trie.insert(hashed_key, storage_value.encode_to_vec())?;
2017 }
2018 }
2019 let (storage_hash, storage_updates) =
2020 storage_trie.collect_changes_since_last_hash(&NativeCrypto);
2021 account_state.storage_root = storage_hash;
2022 ret_storage_updates.push((hashed_address, storage_updates));
2023 }
2024 state_trie.insert(
2025 hashed_address.as_bytes().to_vec(),
2026 account_state.encode_to_vec(),
2027 )?;
2028 }
2029 let (state_trie_hash, state_updates) =
2030 state_trie.collect_changes_since_last_hash(&NativeCrypto);
2031
2032 Ok(AccountUpdatesList {
2033 state_trie_hash,
2034 state_updates,
2035 storage_updates: ret_storage_updates,
2036 code_updates,
2037 })
2038 }
2039
2040 pub fn apply_account_updates_from_trie_with_witness(
2043 &self,
2044 mut state_trie: Trie,
2045 account_updates: &[AccountUpdate],
2046 mut storage_tries: StorageTries,
2047 ) -> Result<(StorageTries, AccountUpdatesList), StoreError> {
2048 let mut ret_storage_updates = Vec::new();
2049
2050 let mut code_updates = Vec::new();
2051
2052 let state_root = state_trie.hash_no_commit(&NativeCrypto);
2053
2054 for update in account_updates.iter() {
2055 let hashed_address = hash_address(&update.address);
2056
2057 if update.removed {
2058 state_trie.remove(&hashed_address)?;
2060
2061 continue;
2062 }
2063
2064 let mut account_state = match state_trie.get(&hashed_address)? {
2067 Some(encoded_state) => AccountState::decode(&encoded_state)?,
2068 None => AccountState::default(),
2069 };
2070
2071 if update.removed_storage {
2072 account_state.storage_root = *EMPTY_TRIE_HASH;
2073 }
2074
2075 if let Some(info) = &update.info {
2076 account_state.nonce = info.nonce;
2077
2078 account_state.balance = info.balance;
2079
2080 account_state.code_hash = info.code_hash;
2081
2082 if let Some(code) = &update.code {
2084 code_updates.push((info.code_hash, code.clone()));
2085 }
2086 }
2087
2088 if !update.added_storage.is_empty() {
2090 let (_witness, storage_trie) = match storage_tries.entry(update.address) {
2091 Entry::Occupied(value) => value.into_mut(),
2092 Entry::Vacant(vacant) => {
2093 let trie = self.open_storage_trie(
2094 H256::from_slice(&hashed_address),
2095 state_root,
2096 account_state.storage_root,
2097 )?;
2098 vacant.insert(TrieLogger::open_trie(trie))
2099 }
2100 };
2101
2102 for (storage_key, storage_value) in &update.added_storage {
2103 let hashed_key = hash_key(storage_key);
2104
2105 if storage_value.is_zero() {
2106 storage_trie.remove(&hashed_key)?;
2107 } else {
2108 storage_trie.insert(hashed_key, storage_value.encode_to_vec())?;
2109 }
2110 }
2111
2112 let (storage_hash, storage_updates) =
2113 storage_trie.collect_changes_since_last_hash(&NativeCrypto);
2114
2115 account_state.storage_root = storage_hash;
2116
2117 ret_storage_updates.push((H256::from_slice(&hashed_address), storage_updates));
2118 }
2119
2120 state_trie.insert(hashed_address, account_state.encode_to_vec())?;
2121 }
2122
2123 let (state_trie_hash, state_updates) =
2124 state_trie.collect_changes_since_last_hash(&NativeCrypto);
2125
2126 let account_updates_list = AccountUpdatesList {
2127 state_trie_hash,
2128 state_updates,
2129 storage_updates: ret_storage_updates,
2130 code_updates,
2131 };
2132
2133 Ok((storage_tries, account_updates_list))
2134 }
2135
2136 pub async fn setup_genesis_state_trie(
2138 &self,
2139 genesis_accounts: BTreeMap<Address, GenesisAccount>,
2140 ) -> Result<H256, StoreError> {
2141 let mut storage_trie_nodes = vec![];
2142 let mut genesis_state_trie = self.open_direct_state_trie(*EMPTY_TRIE_HASH)?;
2143 for (address, account) in genesis_accounts {
2144 let hashed_address = hash_address(&address);
2145 let h256_hashed_address = H256::from_slice(&hashed_address);
2146
2147 let code = Code::from_bytecode(account.code, &NativeCrypto);
2149 let code_hash = code.hash;
2150 self.add_account_code(code).await?;
2151
2152 let mut storage_trie =
2154 self.open_direct_storage_trie(h256_hashed_address, *EMPTY_TRIE_HASH)?;
2155 for (storage_key, storage_value) in account.storage {
2156 if !storage_value.is_zero() {
2157 let hashed_key = hash_key(&H256(storage_key.to_big_endian()));
2158 storage_trie.insert(hashed_key, storage_value.encode_to_vec())?;
2159 }
2160 }
2161
2162 let (storage_root, storage_nodes) =
2163 storage_trie.collect_changes_since_last_hash(&NativeCrypto);
2164
2165 storage_trie_nodes.extend(
2166 storage_nodes
2167 .into_iter()
2168 .map(|(path, n)| (apply_prefix(Some(h256_hashed_address), path).into_vec(), n)),
2169 );
2170
2171 let account_state = AccountState {
2173 nonce: account.nonce,
2174 balance: account.balance,
2175 storage_root,
2176 code_hash,
2177 };
2178 genesis_state_trie.insert(hashed_address, account_state.encode_to_vec())?;
2179 }
2180
2181 let (state_root, account_trie_nodes) =
2182 genesis_state_trie.collect_changes_since_last_hash(&NativeCrypto);
2183 let account_trie_nodes = account_trie_nodes
2184 .into_iter()
2185 .map(|(path, n)| (apply_prefix(None, path).into_vec(), n))
2186 .collect::<Vec<_>>();
2187
2188 let mut tx = self.backend.begin_write()?;
2189 tx.put_batch(ACCOUNT_TRIE_NODES, account_trie_nodes)?;
2190 tx.put_batch(STORAGE_TRIE_NODES, storage_trie_nodes)?;
2191 tx.commit()?;
2192
2193 Ok(state_root)
2194 }
2195
2196 fn make_witness_key(block_number: u64, block_hash: &BlockHash) -> Vec<u8> {
2198 let mut composite_key = Vec::with_capacity(8 + 32);
2199 composite_key.extend_from_slice(&block_number.to_be_bytes());
2200 composite_key.extend_from_slice(block_hash.as_bytes());
2201 composite_key
2202 }
2203
2204 pub fn store_witness(
2210 &self,
2211 block_hash: BlockHash,
2212 block_number: u64,
2213 witness: ExecutionWitness,
2214 ) -> Result<(), StoreError> {
2215 let rpc_witness = RpcExecutionWitness::try_from(witness)?;
2217 let key = Self::make_witness_key(block_number, &block_hash);
2218 let value = serde_json::to_vec(&rpc_witness)?;
2219 self.write(EXECUTION_WITNESSES, key, value)?;
2220 self.cleanup_old_witnesses(block_number)
2222 }
2223
2224 fn cleanup_old_witnesses(&self, latest_block_number: u64) -> Result<(), StoreError> {
2225 if latest_block_number <= MAX_WITNESSES {
2227 return Ok(());
2228 }
2229
2230 let threshold = latest_block_number - MAX_WITNESSES;
2231
2232 if let Some(oldest_block_number) = self.get_oldest_witness_number()? {
2233 let prefix = oldest_block_number.to_be_bytes();
2234 let mut to_delete = Vec::new();
2235
2236 {
2237 let read_txn = self.backend.begin_read()?;
2238 let iter = read_txn.prefix_iterator(EXECUTION_WITNESSES, &prefix)?;
2239
2240 for item in iter {
2242 let (key, _value) = item?;
2243 let mut block_number_bytes = [0u8; 8];
2244 block_number_bytes.copy_from_slice(&key[0..8]);
2245 let block_number = u64::from_be_bytes(block_number_bytes);
2246 if block_number > threshold {
2247 break;
2248 }
2249 to_delete.push(key.to_vec());
2250 }
2251 }
2252
2253 for key in to_delete {
2254 self.delete(EXECUTION_WITNESSES, key)?;
2255 }
2256 };
2257
2258 self.update_oldest_witness_number(threshold + 1)?;
2259
2260 Ok(())
2261 }
2262
2263 fn update_oldest_witness_number(&self, oldest_block_number: u64) -> Result<(), StoreError> {
2264 self.write(
2265 MISC_VALUES,
2266 b"oldest_witness_block_number".to_vec(),
2267 oldest_block_number.to_le_bytes().to_vec(),
2268 )?;
2269 Ok(())
2270 }
2271
2272 fn get_oldest_witness_number(&self) -> Result<Option<u64>, StoreError> {
2273 let Some(value) = self.read(MISC_VALUES, b"oldest_witness_block_number".to_vec())? else {
2274 return Ok(None);
2275 };
2276
2277 let array: [u8; 8] = value.as_slice().try_into().map_err(|_| {
2278 StoreError::Custom("Invalid oldest witness block number bytes".to_string())
2279 })?;
2280 Ok(Some(u64::from_le_bytes(array)))
2281 }
2282
2283 pub fn get_witness_json_bytes(
2289 &self,
2290 block_number: u64,
2291 block_hash: BlockHash,
2292 ) -> Result<Option<Vec<u8>>, StoreError> {
2293 let key = Self::make_witness_key(block_number, &block_hash);
2294 self.read(EXECUTION_WITNESSES, key)
2295 }
2296
2297 pub fn get_witness_by_number_and_hash(
2302 &self,
2303 block_number: u64,
2304 block_hash: BlockHash,
2305 ) -> Result<Option<RpcExecutionWitness>, StoreError> {
2306 let key = Self::make_witness_key(block_number, &block_hash);
2307 match self.read(EXECUTION_WITNESSES, key)? {
2308 Some(value) => {
2309 let witness: RpcExecutionWitness = serde_json::from_slice(&value)?;
2310 Ok(Some(witness))
2311 }
2312 None => Ok(None),
2313 }
2314 }
2315
2316 pub fn store_block_access_list(
2318 &self,
2319 block_hash: BlockHash,
2320 bal: &BlockAccessList,
2321 ) -> Result<(), StoreError> {
2322 let key = block_hash.as_bytes().to_vec();
2323 let mut value = vec![];
2324 bal.encode(&mut value);
2325 self.write(BLOCK_ACCESS_LISTS, key, value)
2326 }
2327
2328 pub fn get_block_access_list(
2330 &self,
2331 block_hash: BlockHash,
2332 ) -> Result<Option<BlockAccessList>, StoreError> {
2333 let key = block_hash.as_bytes().to_vec();
2334 match self.read(BLOCK_ACCESS_LISTS, key)? {
2335 Some(value) => {
2336 let bal = BlockAccessList::decode(&value)
2337 .map_err(|e| StoreError::Custom(format!("Failed to decode BAL: {e}")))?;
2338 Ok(Some(bal))
2339 }
2340 None => Ok(None),
2341 }
2342 }
2343
2344 pub async fn add_initial_state(&mut self, genesis: Genesis) -> Result<(), StoreError> {
2345 self.add_initial_state_inner(genesis, false).await
2346 }
2347
2348 pub async fn add_initial_state_skip_validation(
2359 &mut self,
2360 genesis: Genesis,
2361 ) -> Result<(), StoreError> {
2362 self.add_initial_state_inner(genesis, true).await
2363 }
2364
2365 async fn add_initial_state_inner(
2366 &mut self,
2367 genesis: Genesis,
2368 skip_genesis_validation: bool,
2369 ) -> Result<(), StoreError> {
2370 debug!("Storing initial state from genesis");
2371
2372 let genesis_block = genesis.get_block();
2374 let genesis_block_number = genesis_block.header.number;
2375
2376 let genesis_hash = genesis_block.hash();
2377
2378 let stored_genesis_header = self.load_block_header(genesis_block_number)?;
2379
2380 self.set_chain_config(&genesis.config).await?;
2387
2388 if let Some(number) = self.load_latest_block_number().await? {
2390 let latest_block_header = self
2391 .load_block_header(number)?
2392 .ok_or_else(|| StoreError::MissingLatestBlockNumber)?;
2393 self.latest_block_header.update(latest_block_header);
2394 }
2395
2396 match stored_genesis_header {
2397 Some(header) if skip_genesis_validation => {
2398 info!(
2399 stored_genesis = %header.hash(),
2400 "Skipping genesis state validation; trusting the genesis header and state already stored in the datadir"
2401 );
2402 return Ok(());
2403 }
2404 Some(header) if header.hash() == genesis_hash => {
2405 info!("Received genesis file matching a previously stored one, nothing to do");
2406 return Ok(());
2407 }
2408 Some(_) => {
2409 error!(
2410 "The chain configuration stored in the database is incompatible with the provided configuration. If you intended to switch networks, choose another datadir or clear the database (e.g., run `ethrex removedb`) and try again."
2411 );
2412 return Err(StoreError::IncompatibleChainConfig);
2413 }
2414 None => {
2415 self.add_block_header(genesis_hash, genesis_block.header.clone())
2416 .await?
2417 }
2418 }
2419 let genesis_state_root = self.setup_genesis_state_trie(genesis.alloc).await?;
2422 debug_assert_eq!(genesis_state_root, genesis_block.header.state_root);
2423
2424 info!(hash = %genesis_hash, "Storing genesis block");
2426
2427 self.add_block(genesis_block).await?;
2428 self.update_earliest_block_number(genesis_block_number)
2429 .await?;
2430 self.forkchoice_update(vec![], genesis_block_number, genesis_hash, None, None)
2431 .await?;
2432 Ok(())
2433 }
2434
2435 pub async fn load_initial_state(&self) -> Result<(), StoreError> {
2436 info!("Loading initial state from DB");
2437 let Some(number) = self.load_latest_block_number().await? else {
2438 return Err(StoreError::MissingLatestBlockNumber);
2439 };
2440 let latest_block_header = self
2441 .load_block_header(number)?
2442 .ok_or_else(|| StoreError::Custom("latest block header is missing".to_string()))?;
2443 self.latest_block_header.update(latest_block_header);
2444 Ok(())
2445 }
2446
2447 pub fn get_storage_at(
2448 &self,
2449 block_number: BlockNumber,
2450 address: Address,
2451 storage_key: H256,
2452 ) -> Result<Option<U256>, StoreError> {
2453 match self.get_block_header(block_number)? {
2454 Some(header) => self.get_storage_at_root(header.state_root, address, storage_key),
2455 None => Ok(None),
2456 }
2457 }
2458
2459 pub fn get_storage_at_root(
2460 &self,
2461 state_root: H256,
2462 address: Address,
2463 storage_key: H256,
2464 ) -> Result<Option<U256>, StoreError> {
2465 let account_hash = hash_address_fixed(&address);
2466
2467 let read_view = self.backend.begin_read()?;
2469 let cache = self
2470 .trie_cache
2471 .read()
2472 .map_err(|_| StoreError::LockError)?
2473 .clone();
2474 let last_written = self.last_written()?;
2475 let use_fkv = Self::flatkeyvalue_computed_with_last_written(account_hash, &last_written);
2476
2477 let storage_root = if use_fkv {
2478 *EMPTY_TRIE_HASH
2480 } else {
2481 let state_trie = self.open_state_trie_shared(
2482 state_root,
2483 read_view.clone(),
2484 cache.clone(),
2485 last_written.clone(),
2486 )?;
2487 let Some(encoded_account) = state_trie.get(account_hash.as_bytes())? else {
2488 return Ok(None);
2489 };
2490 let account = AccountState::decode(&encoded_account)?;
2491 account.storage_root
2492 };
2493 let storage_trie = self.open_storage_trie_shared(
2494 account_hash,
2495 state_root,
2496 storage_root,
2497 read_view,
2498 cache,
2499 last_written,
2500 )?;
2501
2502 let hashed_key = hash_key_fixed(&storage_key);
2503 storage_trie
2504 .get(&hashed_key)?
2505 .map(|rlp| U256::decode(&rlp).map_err(StoreError::RLPDecode))
2506 .transpose()
2507 }
2508
2509 pub fn get_storage_at_root_with_known_storage_root(
2514 &self,
2515 state_root: H256,
2516 account_hash: H256,
2517 storage_root: H256,
2518 storage_key: H256,
2519 ) -> Result<Option<U256>, StoreError> {
2520 let read_view = self.backend.begin_read()?;
2521 let cache = self
2522 .trie_cache
2523 .read()
2524 .map_err(|_| StoreError::LockError)?
2525 .clone();
2526 let last_written = self.last_written()?;
2527 let storage_root =
2531 if Self::flatkeyvalue_computed_with_last_written(account_hash, &last_written) {
2532 *EMPTY_TRIE_HASH
2533 } else {
2534 storage_root
2535 };
2536 let storage_trie = self.open_storage_trie_shared(
2537 account_hash,
2538 state_root,
2539 storage_root,
2540 read_view,
2541 cache,
2542 last_written,
2543 )?;
2544
2545 let hashed_key = hash_key_fixed(&storage_key);
2546 storage_trie
2547 .get(&hashed_key)?
2548 .map(|rlp| U256::decode(&rlp).map_err(StoreError::RLPDecode))
2549 .transpose()
2550 }
2551
2552 pub fn get_chain_config(&self) -> ChainConfig {
2553 self.chain_config
2554 }
2555
2556 pub async fn get_latest_canonical_block_hash(&self) -> Result<Option<BlockHash>, StoreError> {
2557 Ok(Some(self.latest_block_header.get().hash()))
2558 }
2559
2560 pub async fn forkchoice_update(
2565 &self,
2566 new_canonical_blocks: Vec<(BlockNumber, BlockHash)>,
2567 head_number: BlockNumber,
2568 head_hash: BlockHash,
2569 safe: Option<BlockNumber>,
2570 finalized: Option<BlockNumber>,
2571 ) -> Result<(), StoreError> {
2572 let _guard = self.fcu_lock.lock().await;
2578
2579 let previous_head = self.latest_block_header.get();
2584 let new_head = self
2585 .load_block_header_by_hash(head_hash)?
2586 .ok_or_else(|| StoreError::MissingLatestBlockNumber)?;
2587 self.latest_block_header.update(new_head);
2588 if let Err(err) = self
2589 .forkchoice_update_inner(
2590 new_canonical_blocks,
2591 head_number,
2592 head_hash,
2593 safe,
2594 finalized,
2595 )
2596 .await
2597 {
2598 self.latest_block_header.update((*previous_head).clone());
2599 return Err(err);
2600 }
2601
2602 Ok(())
2603 }
2604
2605 pub fn state_trie(&self, block_hash: BlockHash) -> Result<Option<Trie>, StoreError> {
2607 let Some(header) = self.get_block_header_by_hash(block_hash)? else {
2608 return Ok(None);
2609 };
2610 Ok(Some(self.open_state_trie(header.state_root)?))
2611 }
2612
2613 pub fn storage_trie(
2615 &self,
2616 block_hash: BlockHash,
2617 address: Address,
2618 ) -> Result<Option<Trie>, StoreError> {
2619 let Some(header) = self.get_block_header_by_hash(block_hash)? else {
2620 return Ok(None);
2621 };
2622 let Some(state_trie) = self.state_trie(block_hash)? else {
2624 return Ok(None);
2625 };
2626 let hashed_address = hash_address_fixed(&address);
2627 let Some(encoded_account) = state_trie.get(hashed_address.as_bytes())? else {
2628 return Ok(None);
2629 };
2630 let account = AccountState::decode(&encoded_account)?;
2631 let storage_root = account.storage_root;
2633 Ok(Some(self.open_storage_trie(
2634 hashed_address,
2635 header.state_root,
2636 storage_root,
2637 )?))
2638 }
2639
2640 pub async fn get_account_state(
2641 &self,
2642 block_number: BlockNumber,
2643 address: Address,
2644 ) -> Result<Option<AccountState>, StoreError> {
2645 let Some(block_hash) = self.get_canonical_block_hash(block_number).await? else {
2646 return Ok(None);
2647 };
2648 let Some(state_trie) = self.state_trie(block_hash)? else {
2649 return Ok(None);
2650 };
2651 self.get_account_state_from_trie(&state_trie, address)
2652 }
2653
2654 pub fn get_account_state_by_root(
2655 &self,
2656 state_root: H256,
2657 address: Address,
2658 ) -> Result<Option<AccountState>, StoreError> {
2659 let state_trie = self.open_state_trie(state_root)?;
2660 self.get_account_state_from_trie(&state_trie, address)
2661 }
2662
2663 pub fn get_account_state_from_trie(
2664 &self,
2665 state_trie: &Trie,
2666 address: Address,
2667 ) -> Result<Option<AccountState>, StoreError> {
2668 let hashed_address = hash_address_fixed(&address);
2669 let Some(encoded_state) = state_trie.get(hashed_address.as_bytes())? else {
2670 return Ok(None);
2671 };
2672 Ok(Some(AccountState::decode(&encoded_state)?))
2673 }
2674
2675 pub async fn get_account_proof(
2680 &self,
2681 state_root: H256,
2682 address: Address,
2683 storage_keys: &[H256],
2684 ) -> Result<Option<AccountProof>, StoreError> {
2685 let state_trie = self.open_state_trie(state_root)?;
2690 let address_path = hash_address_fixed(&address);
2691 let proof = state_trie.get_proof(address_path.as_bytes())?;
2692 let account_opt = state_trie
2693 .get(address_path.as_bytes())?
2694 .map(|encoded_state| AccountState::decode(&encoded_state))
2695 .transpose()?;
2696
2697 let mut storage_proof = Vec::with_capacity(storage_keys.len());
2698
2699 if let Some(account) = &account_opt {
2700 let storage_trie =
2701 self.open_storage_trie(address_path, state_root, account.storage_root)?;
2702
2703 for key in storage_keys {
2704 let hashed_key = hash_key(key);
2705 let proof = storage_trie.get_proof(&hashed_key)?;
2706 let value = storage_trie
2707 .get(&hashed_key)?
2708 .map(|rlp| U256::decode(&rlp).map_err(StoreError::RLPDecode))
2709 .transpose()?
2710 .unwrap_or_default();
2711
2712 let slot_proof = StorageSlotProof {
2713 proof,
2714 key: *key,
2715 value,
2716 };
2717 storage_proof.push(slot_proof);
2718 }
2719 } else {
2720 storage_proof.extend(storage_keys.iter().map(|key| StorageSlotProof {
2721 proof: Vec::new(),
2722 key: *key,
2723 value: U256::zero(),
2724 }));
2725 }
2726 let account = account_opt.unwrap_or_default();
2727 let account_proof = AccountProof {
2728 proof,
2729 account,
2730 storage_proof,
2731 };
2732 Ok(Some(account_proof))
2733 }
2734
2735 pub fn iter_accounts_from(
2738 &self,
2739 state_root: H256,
2740 starting_address: H256,
2741 ) -> Result<impl Iterator<Item = (H256, AccountState)>, StoreError> {
2742 let mut iter = self.open_locked_state_trie(state_root)?.into_iter();
2743 iter.advance(starting_address.0.to_vec())?;
2744 Ok(iter.content().map_while(|(path, value)| {
2745 Some((H256::from_slice(&path), AccountState::decode(&value).ok()?))
2746 }))
2747 }
2748
2749 pub fn iter_accounts(
2752 &self,
2753 state_root: H256,
2754 ) -> Result<impl Iterator<Item = (H256, AccountState)>, StoreError> {
2755 self.iter_accounts_from(state_root, H256::zero())
2756 }
2757
2758 pub fn iter_storage_from(
2761 &self,
2762 state_root: H256,
2763 hashed_address: H256,
2764 starting_slot: H256,
2765 ) -> Result<Option<impl Iterator<Item = (H256, U256)>>, StoreError> {
2766 let state_trie = self.open_locked_state_trie(state_root)?;
2767 let Some(account_rlp) = state_trie.get(hashed_address.as_bytes())? else {
2768 return Ok(None);
2769 };
2770 let storage_root = AccountState::decode(&account_rlp)?.storage_root;
2771 let mut iter = self
2772 .open_locked_storage_trie(hashed_address, state_root, storage_root)?
2773 .into_iter();
2774 iter.advance(starting_slot.0.to_vec())?;
2775 Ok(Some(iter.content().map_while(|(path, value)| {
2776 Some((H256::from_slice(&path), U256::decode(&value).ok()?))
2777 })))
2778 }
2779
2780 pub fn iter_storage(
2783 &self,
2784 state_root: H256,
2785 hashed_address: H256,
2786 ) -> Result<Option<impl Iterator<Item = (H256, U256)>>, StoreError> {
2787 self.iter_storage_from(state_root, hashed_address, H256::zero())
2788 }
2789
2790 pub fn get_account_range_proof(
2791 &self,
2792 state_root: H256,
2793 starting_hash: H256,
2794 last_hash: Option<H256>,
2795 ) -> Result<Vec<Vec<u8>>, StoreError> {
2796 let state_trie = self.open_state_trie(state_root)?;
2797 let mut proof = state_trie.get_proof(starting_hash.as_bytes())?;
2798 if let Some(last_hash) = last_hash {
2799 proof.extend_from_slice(&state_trie.get_proof(last_hash.as_bytes())?);
2800 }
2801 Ok(proof)
2802 }
2803
2804 pub fn get_storage_range_proof(
2805 &self,
2806 state_root: H256,
2807 hashed_address: H256,
2808 starting_hash: H256,
2809 last_hash: Option<H256>,
2810 ) -> Result<Option<Vec<Vec<u8>>>, StoreError> {
2811 let state_trie = self.open_state_trie(state_root)?;
2812 let Some(account_rlp) = state_trie.get(hashed_address.as_bytes())? else {
2813 return Ok(None);
2814 };
2815 let storage_root = AccountState::decode(&account_rlp)?.storage_root;
2816 let storage_trie = self.open_storage_trie(hashed_address, state_root, storage_root)?;
2817 let mut proof = storage_trie.get_proof(starting_hash.as_bytes())?;
2818 if let Some(last_hash) = last_hash {
2819 proof.extend_from_slice(&storage_trie.get_proof(last_hash.as_bytes())?);
2820 }
2821 Ok(Some(proof))
2822 }
2823
2824 pub fn get_trie_nodes(
2831 &self,
2832 state_root: H256,
2833 paths: Vec<Vec<u8>>,
2834 byte_limit: u64,
2835 ) -> Result<Vec<Vec<u8>>, StoreError> {
2836 let Some(account_path) = paths.first() else {
2837 return Ok(vec![]);
2838 };
2839 let state_trie = self.open_state_trie(state_root)?;
2840 if paths.len() == 1 {
2842 let node = state_trie.get_node(account_path)?;
2844 return Ok(vec![node]);
2845 }
2846 let Some(account_state) = state_trie
2848 .get(account_path)?
2849 .map(|ref rlp| AccountState::decode(rlp))
2850 .transpose()?
2851 else {
2852 return Ok(vec![]);
2853 };
2854 let Ok(hashed_address) = account_path.clone().try_into().map(H256) else {
2856 return Ok(vec![]);
2857 };
2858 let storage_trie =
2859 self.open_storage_trie(hashed_address, state_root, account_state.storage_root)?;
2860 let mut nodes = vec![];
2862 let mut bytes_used = 0;
2863 for path in paths.iter().skip(1) {
2864 if bytes_used >= byte_limit {
2865 break;
2866 }
2867 let node = storage_trie.get_node(path)?;
2868 bytes_used += node.len() as u64;
2869 nodes.push(node);
2870 }
2871 Ok(nodes)
2872 }
2873
2874 pub fn new_state_trie_for_test(&self) -> Result<Trie, StoreError> {
2876 self.open_state_trie(*EMPTY_TRIE_HASH)
2877 }
2878
2879 pub fn open_state_trie(&self, state_root: H256) -> Result<Trie, StoreError> {
2885 let trie_db = TrieWrapper::new(
2886 state_root,
2887 self.trie_cache
2888 .read()
2889 .map_err(|_| StoreError::LockError)?
2890 .clone(),
2891 Box::new(BackendTrieDB::new_for_accounts(
2892 self.backend.clone(),
2893 self.last_written()?,
2894 )?),
2895 None,
2896 );
2897 Ok(Trie::open(Box::new(trie_db), state_root))
2898 }
2899
2900 pub fn open_direct_state_trie(&self, state_root: H256) -> Result<Trie, StoreError> {
2904 Ok(Trie::open(
2905 Box::new(BackendTrieDB::new_for_accounts(
2906 self.backend.clone(),
2907 self.last_written()?,
2908 )?),
2909 state_root,
2910 ))
2911 }
2912
2913 pub fn open_locked_state_trie(&self, state_root: H256) -> Result<Trie, StoreError> {
2917 let trie_db = TrieWrapper::new(
2918 state_root,
2919 self.trie_cache
2920 .read()
2921 .map_err(|_| StoreError::LockError)?
2922 .clone(),
2923 Box::new(state_trie_locked_backend(
2924 self.backend.as_ref(),
2925 self.last_written()?,
2926 )?),
2927 None,
2928 );
2929 Ok(Trie::open(Box::new(trie_db), state_root))
2930 }
2931
2932 pub fn open_storage_trie(
2935 &self,
2936 account_hash: H256,
2937 state_root: H256,
2938 storage_root: H256,
2939 ) -> Result<Trie, StoreError> {
2940 let trie_db = TrieWrapper::new(
2941 state_root,
2942 self.trie_cache
2943 .read()
2944 .map_err(|_| StoreError::LockError)?
2945 .clone(),
2946 Box::new(BackendTrieDB::new_for_storages(
2947 self.backend.clone(),
2948 self.last_written()?,
2949 )?),
2950 Some(account_hash),
2951 );
2952 Ok(Trie::open(Box::new(trie_db), storage_root))
2953 }
2954
2955 fn open_state_trie_shared(
2959 &self,
2960 state_root: H256,
2961 read_view: Arc<dyn StorageReadView>,
2962 cache: Arc<TrieLayerCache>,
2963 last_written: Vec<u8>,
2964 ) -> Result<Trie, StoreError> {
2965 let trie_db = TrieWrapper::new(
2966 state_root,
2967 cache,
2968 Box::new(BackendTrieDB::new_for_accounts_with_view(
2969 self.backend.clone(),
2970 read_view,
2971 last_written,
2972 )?),
2973 None,
2974 );
2975 Ok(Trie::open(Box::new(trie_db), state_root))
2976 }
2977
2978 fn open_storage_trie_shared(
2980 &self,
2981 account_hash: H256,
2982 state_root: H256,
2983 storage_root: H256,
2984 read_view: Arc<dyn StorageReadView>,
2985 cache: Arc<TrieLayerCache>,
2986 last_written: Vec<u8>,
2987 ) -> Result<Trie, StoreError> {
2988 let trie_db = TrieWrapper::new(
2989 state_root,
2990 cache,
2991 Box::new(BackendTrieDB::new_for_storages_with_view(
2992 self.backend.clone(),
2993 read_view,
2994 last_written,
2995 )?),
2996 Some(account_hash),
2997 );
2998 Ok(Trie::open(Box::new(trie_db), storage_root))
2999 }
3000
3001 pub fn open_direct_storage_trie(
3004 &self,
3005 account_hash: H256,
3006 storage_root: H256,
3007 ) -> Result<Trie, StoreError> {
3008 Ok(Trie::open(
3009 Box::new(BackendTrieDB::new_for_account_storage(
3010 self.backend.clone(),
3011 account_hash,
3012 self.last_written()?,
3013 )?),
3014 storage_root,
3015 ))
3016 }
3017
3018 pub fn open_locked_storage_trie(
3021 &self,
3022 account_hash: H256,
3023 state_root: H256,
3024 storage_root: H256,
3025 ) -> Result<Trie, StoreError> {
3026 let trie_db = TrieWrapper::new(
3027 state_root,
3028 self.trie_cache
3029 .read()
3030 .map_err(|_| StoreError::LockError)?
3031 .clone(),
3032 Box::new(state_trie_locked_backend(
3033 self.backend.as_ref(),
3034 self.last_written()?,
3035 )?),
3036 Some(account_hash),
3037 );
3038 Ok(Trie::open(Box::new(trie_db), storage_root))
3039 }
3040
3041 pub fn has_state_root(&self, state_root: H256) -> Result<bool, StoreError> {
3042 if state_root == *EMPTY_TRIE_HASH {
3044 return Ok(true);
3045 }
3046 let trie = self.open_state_trie(state_root)?;
3047 let Some(root) = trie.db().get(Nibbles::default())? else {
3049 return Ok(false);
3050 };
3051 let root_hash = ethrex_trie::Node::decode(&root)?
3052 .compute_hash(&NativeCrypto)
3053 .finalize(&NativeCrypto);
3054 Ok(state_root == root_hash)
3055 }
3056
3057 pub fn ancestors(&self, block_hash: BlockHash) -> AncestorIterator {
3060 AncestorIterator {
3061 store: self.clone(),
3062 next_hash: block_hash,
3063 }
3064 }
3065
3066 pub fn is_canonical_sync(&self, block_hash: BlockHash) -> Result<bool, StoreError> {
3068 let Some(block_number) = self.get_block_number_sync(block_hash)? else {
3069 return Ok(false);
3070 };
3071 Ok(self
3072 .get_canonical_block_hash_sync(block_number)?
3073 .is_some_and(|h| h == block_hash))
3074 }
3075
3076 pub fn generate_flatkeyvalue(&self) -> Result<(), StoreError> {
3077 self.flatkeyvalue_control_tx
3078 .send(FKVGeneratorControlMessage::Continue)
3079 .map_err(|_| StoreError::Custom("FlatKeyValue thread disconnected.".to_string()))
3080 }
3081
3082 pub fn create_checkpoint(&self, path: impl AsRef<Path>) -> Result<(), StoreError> {
3083 self.backend.create_checkpoint(path.as_ref())?;
3084 init_metadata_file(path.as_ref())?;
3085 Ok(())
3086 }
3087
3088 pub fn get_store_directory(&self) -> Result<PathBuf, StoreError> {
3089 Ok(self.db_path.clone())
3090 }
3091
3092 async fn load_latest_block_number(&self) -> Result<Option<BlockNumber>, StoreError> {
3094 let key = chain_data_key(ChainDataIndex::LatestBlockNumber);
3095 self.read_async(CHAIN_DATA, key)
3096 .await?
3097 .map(|bytes| -> Result<BlockNumber, StoreError> {
3098 let array: [u8; 8] = bytes
3099 .try_into()
3100 .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
3101 Ok(BlockNumber::from_le_bytes(array))
3102 })
3103 .transpose()
3104 }
3105
3106 fn load_canonical_block_hash(
3107 &self,
3108 block_number: BlockNumber,
3109 ) -> Result<Option<BlockHash>, StoreError> {
3110 let txn = self.backend.begin_read()?;
3111 txn.get(
3112 CANONICAL_BLOCK_HASHES,
3113 block_number.to_le_bytes().as_slice(),
3114 )?
3115 .map(|bytes| H256::decode(bytes.as_slice()))
3116 .transpose()
3117 .map_err(StoreError::from)
3118 }
3119
3120 fn load_block_header(
3121 &self,
3122 block_number: BlockNumber,
3123 ) -> Result<Option<BlockHeader>, StoreError> {
3124 let Some(block_hash) = self.load_canonical_block_hash(block_number)? else {
3125 return Ok(None);
3126 };
3127 self.load_block_header_by_hash(block_hash)
3128 }
3129
3130 fn load_block_header_by_hash(
3132 &self,
3133 block_hash: BlockHash,
3134 ) -> Result<Option<BlockHeader>, StoreError> {
3135 let txn = self.backend.begin_read()?;
3136 let hash_key = block_hash.encode_to_vec();
3137 let header_value = txn.get(HEADERS, hash_key.as_slice())?;
3138 let mut header = header_value
3139 .map(|bytes| BlockHeaderRLP::from_bytes(bytes).to())
3140 .transpose()
3141 .map_err(StoreError::from)?;
3142 header.as_mut().inspect(|h| {
3143 let _ = h.hash.set(block_hash);
3145 });
3146 Ok(header)
3147 }
3148
3149 pub fn last_written(&self) -> Result<Vec<u8>, StoreError> {
3150 let last_computed_flatkeyvalue = self
3151 .last_computed_flatkeyvalue
3152 .read()
3153 .map_err(|_| StoreError::LockError)?;
3154 Ok(last_computed_flatkeyvalue.clone())
3155 }
3156
3157 fn flatkeyvalue_computed_with_last_written(account: H256, last_written: &[u8]) -> bool {
3158 let account_nibbles = Nibbles::from_bytes(account.as_bytes());
3159 &last_written[0..64] > account_nibbles.as_ref()
3160 }
3161}
3162
3163type TrieNodesUpdate = Vec<(Nibbles, Vec<u8>)>;
3164
3165struct TrieUpdate {
3166 result_sender: std::sync::mpsc::SyncSender<Result<(), StoreError>>,
3167 parent_state_root: H256,
3168 child_state_root: H256,
3169 account_updates: TrieNodesUpdate,
3170 storage_updates: Vec<(H256, TrieNodesUpdate)>,
3171 is_batch: bool,
3172}
3173
3174enum TrieMessage {
3182 Update(TrieUpdate),
3183 Ping,
3184}
3185
3186fn apply_trie_updates(
3189 backend: &dyn StorageBackend,
3190 fkv_ctl: &SyncSender<FKVGeneratorControlMessage>,
3191 trie_cache: &Arc<RwLock<Arc<TrieLayerCache>>>,
3192 trie_update: TrieUpdate,
3193) -> Result<(), StoreError> {
3194 let TrieUpdate {
3195 result_sender,
3196 parent_state_root,
3197 child_state_root,
3198 account_updates,
3199 storage_updates,
3200 is_batch,
3201 } = trie_update;
3202
3203 let new_layer = storage_updates
3205 .into_iter()
3206 .flat_map(|(account_hash, nodes)| {
3207 nodes
3208 .into_iter()
3209 .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node))
3210 })
3211 .chain(account_updates)
3212 .collect();
3213 let trie = trie_cache
3215 .read()
3216 .map_err(|_| StoreError::LockError)?
3217 .clone();
3218 let mut trie_mut = (*trie).clone();
3219 trie_mut.put_batch(parent_state_root, child_state_root, new_layer);
3220 let trie = Arc::new(trie_mut);
3221 *trie_cache.write().map_err(|_| StoreError::LockError)? = trie.clone();
3222 result_sender
3224 .send(Ok(()))
3225 .map_err(|_| StoreError::LockError)?;
3226
3227 let commitable = if is_batch {
3229 trie.get_commitable_with_threshold(parent_state_root, BATCH_COMMIT_THRESHOLD)
3230 } else {
3231 trie.get_commitable(parent_state_root)
3232 };
3233 let Some(root) = commitable else {
3234 return Ok(());
3236 };
3237 let _ = fkv_ctl.send(FKVGeneratorControlMessage::Stop);
3240
3241 let mut trie_mut = (*trie).clone();
3243
3244 let last_written = backend
3245 .begin_read()?
3246 .get(MISC_VALUES, "last_written".as_bytes())?
3247 .unwrap_or_default();
3248
3249 let mut write_tx = backend.begin_write()?;
3250
3251 let nodes = trie_mut.commit(root).unwrap_or_default();
3256 let mut result = Ok(());
3257 for (key, value) in nodes {
3258 let is_leaf = key.len() == 65 || key.len() == 131;
3259 let is_account = key.len() <= 65;
3260
3261 if is_leaf && key > last_written {
3262 continue;
3263 }
3264 let table = if is_leaf {
3265 if is_account {
3266 &ACCOUNT_FLATKEYVALUE
3267 } else {
3268 &STORAGE_FLATKEYVALUE
3269 }
3270 } else if is_account {
3271 &ACCOUNT_TRIE_NODES
3272 } else {
3273 &STORAGE_TRIE_NODES
3274 };
3275 if value.is_empty() {
3276 result = write_tx.delete(table, &key);
3277 } else {
3278 result = write_tx.put(table, &key, &value);
3279 }
3280 if result.is_err() {
3281 break;
3282 }
3283 }
3284 if result.is_ok() {
3285 result = write_tx.commit();
3286 }
3287 let _ = fkv_ctl.send(FKVGeneratorControlMessage::Continue);
3289 result?;
3290 *trie_cache.write().map_err(|_| StoreError::LockError)? = Arc::new(trie_mut);
3292 Ok(())
3293}
3294
3295fn flatkeyvalue_generator(
3298 backend: &Arc<dyn StorageBackend>,
3299 last_computed_fkv: &RwLock<Vec<u8>>,
3300 control_rx: &std::sync::mpsc::Receiver<FKVGeneratorControlMessage>,
3301) -> Result<(), StoreError> {
3302 info!("Generation of FlatKeyValue started.");
3303 let initial_last_written = backend
3304 .begin_read()?
3305 .get(MISC_VALUES, "last_written".as_bytes())?
3306 .unwrap_or_default();
3307
3308 if initial_last_written.is_empty() {
3309 backend.clear_table(ACCOUNT_FLATKEYVALUE)?;
3311 backend.clear_table(STORAGE_FLATKEYVALUE)?;
3312 } else if initial_last_written == [0xff] {
3313 info!("FlatKeyValue already generated. Skipping.");
3315 return Ok(());
3316 }
3317
3318 loop {
3319 let read_tx = backend.begin_read()?;
3322 let root = read_tx
3323 .get(ACCOUNT_TRIE_NODES, &[])?
3324 .ok_or(StoreError::MissingLatestBlockNumber)?;
3325 let root: Node = ethrex_trie::Node::decode(&root)?;
3326 let state_root = root.compute_hash(&NativeCrypto).finalize(&NativeCrypto);
3327
3328 let last_written = read_tx
3329 .get(MISC_VALUES, "last_written".as_bytes())?
3330 .unwrap_or_default();
3331 let last_written_account = last_written
3332 .get(0..64)
3333 .map(|v| Nibbles::from_hex(v.to_vec()))
3334 .unwrap_or_default();
3335 let mut last_written_storage = last_written
3336 .get(66..130)
3337 .map(|v| Nibbles::from_hex(v.to_vec()))
3338 .unwrap_or_default();
3339
3340 debug!("Starting FlatKeyValue loop pivot={last_written:?} SR={state_root:x}");
3341
3342 let mut ctr = 0;
3343 let mut write_txn = backend.begin_write()?;
3344 let mut iter = Trie::open(
3345 Box::new(BackendTrieDB::new_for_accounts_with_view(
3346 backend.clone(),
3347 read_tx.clone(),
3348 last_written.clone(),
3349 )?),
3350 state_root,
3351 )
3352 .into_iter();
3353 if last_written_account > Nibbles::default() {
3354 iter.advance(last_written_account.to_bytes())?;
3355 }
3356 let res = iter.try_for_each(|(path, node)| -> Result<(), StoreError> {
3357 let Node::Leaf(node) = node else {
3358 return Ok(());
3359 };
3360 let account_state = AccountState::decode(&node.value)?;
3361 let account_hash = H256::from_slice(&path.to_bytes());
3362 write_txn.put(MISC_VALUES, "last_written".as_bytes(), path.as_ref())?;
3363 write_txn.put(ACCOUNT_FLATKEYVALUE, path.as_ref(), &node.value)?;
3364 ctr += 1;
3365 if ctr > 10_000 {
3366 write_txn.commit()?;
3367 write_txn = backend.begin_write()?;
3368 *last_computed_fkv
3369 .write()
3370 .map_err(|_| StoreError::LockError)? = path.as_ref().to_vec();
3371 ctr = 0;
3372 }
3373
3374 let mut iter_inner = Trie::open(
3375 Box::new(BackendTrieDB::new_for_account_storage_with_view(
3376 backend.clone(),
3377 read_tx.clone(),
3378 account_hash,
3379 path.as_ref().to_vec(),
3380 )?),
3381 account_state.storage_root,
3382 )
3383 .into_iter();
3384 if last_written_storage > Nibbles::default() {
3385 iter_inner.advance(last_written_storage.to_bytes())?;
3386 last_written_storage = Nibbles::default();
3387 }
3388 iter_inner.try_for_each(|(path, node)| -> Result<(), StoreError> {
3389 let Node::Leaf(node) = node else {
3390 return Ok(());
3391 };
3392 let key = apply_prefix(Some(account_hash), path);
3393 write_txn.put(MISC_VALUES, "last_written".as_bytes(), key.as_ref())?;
3394 write_txn.put(STORAGE_FLATKEYVALUE, key.as_ref(), &node.value)?;
3395 ctr += 1;
3396 if ctr > 10_000 {
3397 write_txn.commit()?;
3398 write_txn = backend.begin_write()?;
3399 *last_computed_fkv
3400 .write()
3401 .map_err(|_| StoreError::LockError)? = key.into_vec();
3402 ctr = 0;
3403 }
3404 fkv_check_for_stop_msg(control_rx)?;
3405 Ok(())
3406 })?;
3407 fkv_check_for_stop_msg(control_rx)?;
3408 Ok(())
3409 });
3410 match res {
3411 Err(StoreError::PivotChanged) => {
3412 match control_rx.recv() {
3413 Ok(FKVGeneratorControlMessage::Continue) => {}
3414 Ok(FKVGeneratorControlMessage::Stop) => {
3415 return Err(StoreError::Custom("Unexpected Stop message".to_string()));
3416 }
3417 Err(std::sync::mpsc::RecvError) => {
3419 info!("Store closed, stopping FlatKeyValue generation.");
3420 return Ok(());
3421 }
3422 }
3423 }
3424 Err(err) => return Err(err),
3425 Ok(()) => {
3426 write_txn.put(MISC_VALUES, "last_written".as_bytes(), &[0xff])?;
3427 write_txn.commit()?;
3428 *last_computed_fkv
3429 .write()
3430 .map_err(|_| StoreError::LockError)? = vec![0xff; 131];
3431 info!("FlatKeyValue generation finished.");
3432 return Ok(());
3433 }
3434 };
3435 }
3436}
3437
3438fn fkv_check_for_stop_msg(
3439 control_rx: &std::sync::mpsc::Receiver<FKVGeneratorControlMessage>,
3440) -> Result<(), StoreError> {
3441 match control_rx.try_recv() {
3442 Ok(FKVGeneratorControlMessage::Stop) | Err(TryRecvError::Disconnected) => {
3443 return Err(StoreError::PivotChanged);
3444 }
3445 Ok(FKVGeneratorControlMessage::Continue) => {
3446 return Err(StoreError::Custom(
3447 "Unexpected Continue message".to_string(),
3448 ));
3449 }
3450 Err(TryRecvError::Empty) => {}
3451 }
3452 Ok(())
3453}
3454
3455fn state_trie_locked_backend(
3456 backend: &dyn StorageBackend,
3457 last_written: Vec<u8>,
3458) -> Result<BackendTrieDBLocked, StoreError> {
3459 BackendTrieDBLocked::new(backend, last_written)
3461}
3462
3463pub struct AccountProof {
3464 pub proof: Vec<NodeRLP>,
3465 pub account: AccountState,
3466 pub storage_proof: Vec<StorageSlotProof>,
3467}
3468
3469pub struct StorageSlotProof {
3470 pub proof: Vec<NodeRLP>,
3471 pub key: H256,
3472 pub value: U256,
3473}
3474
3475pub struct AncestorIterator {
3476 store: Store,
3477 next_hash: BlockHash,
3478}
3479
3480impl Iterator for AncestorIterator {
3481 type Item = Result<(BlockHash, BlockHeader), StoreError>;
3482
3483 fn next(&mut self) -> Option<Self::Item> {
3484 let next_hash = self.next_hash;
3485 match self.store.load_block_header_by_hash(next_hash) {
3486 Ok(Some(header)) => {
3487 let ret_hash = self.next_hash;
3488 self.next_hash = header.parent_hash;
3489 Some(Ok((ret_hash, header)))
3490 }
3491 Ok(None) => None,
3492 Err(e) => Some(Err(e)),
3493 }
3494 }
3495}
3496
3497pub fn hash_address(address: &Address) -> Vec<u8> {
3498 keccak_hash(address.to_fixed_bytes()).to_vec()
3499}
3500
3501fn hash_address_fixed(address: &Address) -> H256 {
3502 keccak(address.to_fixed_bytes())
3503}
3504
3505pub fn hash_key(key: &H256) -> Vec<u8> {
3506 keccak_hash(key.to_fixed_bytes()).to_vec()
3507}
3508
3509pub fn hash_key_fixed(key: &H256) -> [u8; 32] {
3510 keccak_hash(key.to_fixed_bytes())
3511}
3512
3513fn chain_data_key(index: ChainDataIndex) -> Vec<u8> {
3514 (index as u8).encode_to_vec()
3515}
3516
3517fn snap_state_key(index: SnapStateIndex) -> Vec<u8> {
3518 (index as u8).encode_to_vec()
3519}
3520
3521pub fn receipt_key(block_hash: &BlockHash, index: u64) -> Vec<u8> {
3523 let mut key = Vec::with_capacity(40);
3524 key.extend_from_slice(block_hash.as_bytes());
3525 key.extend_from_slice(&index.to_be_bytes());
3526 key
3527}
3528
3529fn encode_code(code: &Code) -> Vec<u8> {
3530 let mut buf =
3531 Vec::with_capacity(6 + code.len() + std::mem::size_of_val::<[u32]>(&code.jump_targets));
3532 code.code().encode(&mut buf);
3533 code.jump_targets.to_vec().encode(&mut buf);
3536 buf
3537}
3538
3539#[derive(Debug, Default, Clone)]
3540struct LatestBlockHeaderCache {
3541 current: Arc<Mutex<Arc<BlockHeader>>>,
3542}
3543
3544impl LatestBlockHeaderCache {
3545 pub fn get(&self) -> Arc<BlockHeader> {
3546 self.current.lock().expect("poisoned mutex").clone()
3547 }
3548
3549 pub fn update(&self, header: BlockHeader) {
3550 let new = Arc::new(header);
3551 *self.current.lock().expect("poisoned mutex") = new;
3552 }
3553}
3554
3555#[derive(Debug, Serialize, Deserialize)]
3556pub struct StoreMetadata {
3557 pub schema_version: u64,
3558}
3559
3560impl StoreMetadata {
3561 pub fn new(schema_version: u64) -> Self {
3562 Self { schema_version }
3563 }
3564}
3565
3566fn read_store_schema_version(path: &Path) -> Result<Option<u64>, StoreError> {
3571 let metadata_path = path.join(STORE_METADATA_FILENAME);
3572 if !metadata_path.exists() {
3573 return Ok(None);
3574 }
3575 if !metadata_path.is_file() {
3576 return Err(StoreError::Custom(
3577 "store schema path exists but is not a file".to_string(),
3578 ));
3579 }
3580 let file_contents = std::fs::read_to_string(metadata_path)?;
3581 let metadata: StoreMetadata = serde_json::from_str(&file_contents)?;
3582 Ok(Some(metadata.schema_version))
3583}
3584
3585fn init_metadata_file(parent_path: &Path) -> Result<(), StoreError> {
3586 std::fs::create_dir_all(parent_path)?;
3587
3588 let metadata_path = parent_path.join(STORE_METADATA_FILENAME);
3589 let metadata = StoreMetadata::new(STORE_SCHEMA_VERSION);
3590 let serialized_metadata = serde_json::to_string_pretty(&metadata)?;
3591 let mut new_file = std::fs::File::create_new(metadata_path)?;
3592 new_file.write_all(serialized_metadata.as_bytes())?;
3593 Ok(())
3594}
3595
3596fn dir_contains_legacy_db(path: &Path) -> Result<bool, StoreError> {
3608 if path.join("CURRENT").is_file() {
3612 return Ok(true);
3613 }
3614 for entry in std::fs::read_dir(path)? {
3618 let entry = entry?;
3619 if !entry.file_type()?.is_file() {
3620 continue;
3621 }
3622 if entry.file_name().to_string_lossy().starts_with("MANIFEST-") {
3623 return Ok(true);
3624 }
3625 }
3626 Ok(false)
3627}
3628
3629pub fn has_valid_db(path: &Path) -> bool {
3633 let metadata_path = path.join(STORE_METADATA_FILENAME);
3634 if !metadata_path.is_file() {
3635 return false;
3636 }
3637 let Ok(contents) = std::fs::read_to_string(&metadata_path) else {
3638 return false;
3639 };
3640 let Ok(metadata) = serde_json::from_str::<StoreMetadata>(&contents) else {
3641 return false;
3642 };
3643 metadata.schema_version >= 1 && metadata.schema_version <= STORE_SCHEMA_VERSION
3644}
3645
3646pub fn read_chain_id_from_db(path: &Path) -> Option<u64> {
3655 if !has_valid_db(path) {
3656 return None;
3657 }
3658 #[cfg(feature = "rocksdb")]
3659 {
3660 let backend = match RocksDBBackend::open(path, DEFAULT_ROCKSDB_BLOCK_CACHE_SIZE_BYTES) {
3663 Ok(backend) => backend,
3664 Err(e) => {
3665 warn!("Failed to open RocksDB at {path:?} to read chain ID: {e}");
3666 return None;
3667 }
3668 };
3669 let read = match backend.begin_read() {
3670 Ok(read) => read,
3671 Err(e) => {
3672 warn!("Failed to begin read transaction at {path:?}: {e}");
3673 return None;
3674 }
3675 };
3676 let key = chain_data_key(ChainDataIndex::ChainConfig);
3677 let bytes = match read.get(CHAIN_DATA, &key) {
3678 Ok(Some(bytes)) => bytes,
3679 Ok(None) => {
3680 warn!("Chain config entry not found in database at {path:?}");
3681 return None;
3682 }
3683 Err(e) => {
3684 warn!("Failed to read chain config from database at {path:?}: {e}");
3685 return None;
3686 }
3687 };
3688 #[derive(serde::Deserialize)]
3693 #[serde(rename_all = "camelCase")]
3694 struct ChainIdOnly {
3695 chain_id: u64,
3696 }
3697 match serde_json::from_slice::<ChainIdOnly>(&bytes) {
3698 Ok(partial) => Some(partial.chain_id),
3699 Err(e) => {
3700 warn!("Failed to deserialize chain ID from database at {path:?}: {e}");
3701 None
3702 }
3703 }
3704 }
3705 #[cfg(not(feature = "rocksdb"))]
3706 {
3707 let _ = path;
3708 None
3709 }
3710}
3711
3712#[cfg(test)]
3713mod merge_tests {
3714 use super::*;
3715
3716 fn h256(b: u8) -> H256 {
3717 H256::from_low_u64_be(b as u64)
3718 }
3719
3720 fn op(bn: BlockNumber, bh: H256, idx: Index) -> Vec<u8> {
3721 encode_tx_location_operand(bn, bh, idx)
3722 }
3723
3724 fn decode(v: &[u8]) -> Vec<(BlockNumber, BlockHash, Index)> {
3725 <Vec<(BlockNumber, BlockHash, Index)>>::decode(v).unwrap()
3726 }
3727
3728 #[test]
3729 fn single_operand_on_empty_base() {
3730 let out = tx_locations_merge(None, vec![op(100, h256(0x10), 0)]).unwrap();
3731 assert_eq!(decode(&out), vec![(100, h256(0x10), 0)]);
3732 }
3733
3734 #[test]
3735 fn operand_appended_to_existing_base() {
3736 let base = vec![(100u64, h256(0x10), 0u64)].encode_to_vec();
3737 let out = tx_locations_merge(Some(&base), vec![op(101, h256(0x11), 5)]).unwrap();
3738 let mut got = decode(&out);
3739 got.sort();
3740 let mut want = vec![(100, h256(0x10), 0), (101, h256(0x11), 5)];
3741 want.sort();
3742 assert_eq!(got, want);
3743 }
3744
3745 #[test]
3746 fn multiple_operands_combined() {
3747 let out = tx_locations_merge(
3748 None,
3749 vec![
3750 op(100, h256(0x10), 0),
3751 op(100, h256(0x11), 1),
3752 op(101, h256(0x12), 2),
3753 ],
3754 )
3755 .unwrap();
3756 assert_eq!(decode(&out).len(), 3);
3757 }
3758
3759 #[test]
3760 fn same_block_hash_is_deduped() {
3761 let out =
3763 tx_locations_merge(None, vec![op(100, h256(0x10), 0), op(100, h256(0x10), 7)]).unwrap();
3764 assert_eq!(decode(&out), vec![(100, h256(0x10), 7)]);
3765 }
3766
3767 #[test]
3768 fn malformed_operand_aborts_merge() {
3769 let out = tx_locations_merge(None, vec![vec![0xff, 0xff], op(100, h256(0x10), 0)]);
3772 assert!(out.is_none(), "merge must abort on a malformed operand");
3773 }
3774
3775 #[test]
3776 fn malformed_base_value_aborts_merge() {
3777 let out = tx_locations_merge(Some(&[0xff, 0xff]), vec![op(100, h256(0x10), 0)]);
3778 assert!(out.is_none(), "merge must abort on a corrupt base value");
3779 }
3780
3781 #[test]
3788 fn partial_merge_result_is_a_valid_operand() {
3789 let partial =
3791 tx_locations_merge(None, vec![op(100, h256(0x10), 0), op(101, h256(0x11), 1)]).unwrap();
3792
3793 let base = vec![(99u64, h256(0x09), 9u64)].encode_to_vec();
3797 let out = tx_locations_merge(Some(&base), vec![partial]).unwrap();
3798
3799 let mut got = decode(&out);
3800 got.sort();
3801 let mut want = vec![
3802 (99, h256(0x09), 9),
3803 (100, h256(0x10), 0),
3804 (101, h256(0x11), 1),
3805 ];
3806 want.sort();
3807 assert_eq!(
3808 got, want,
3809 "no entries may be lost when re-merging a partial result"
3810 );
3811 }
3812
3813 #[test]
3816 fn operand_encoding_matches_value_encoding() {
3817 let operand = op(100, h256(0x10), 3);
3818 assert_eq!(decode(&operand), vec![(100, h256(0x10), 3)]);
3820 }
3821
3822 #[test]
3824 fn chained_partial_merges() {
3825 let p1 = tx_locations_merge(None, vec![op(1, h256(0x01), 0)]).unwrap();
3826 let p2 = tx_locations_merge(None, vec![p1, op(2, h256(0x02), 0)]).unwrap();
3827 let p3 = tx_locations_merge(None, vec![p2, op(3, h256(0x03), 0)]).unwrap();
3828 let out = tx_locations_merge(None, vec![p3]).unwrap();
3829 assert_eq!(decode(&out).len(), 3);
3830 }
3831}
3832
3833#[cfg(test)]
3834mod datadir_tests {
3835 use super::*;
3836 use std::fs;
3837
3838 #[test]
3839 fn empty_dir_has_no_existing_db() {
3840 let dir = tempfile::tempdir().unwrap();
3841 assert!(!dir_contains_legacy_db(dir.path()).unwrap());
3842 }
3843
3844 #[test]
3845 fn dir_with_only_unrelated_files_has_no_existing_db() {
3846 let dir = tempfile::tempdir().unwrap();
3849 fs::write(dir.path().join("jwt.hex"), "0xdeadbeef").unwrap();
3850 fs::write(dir.path().join("LOG"), "noise").unwrap();
3851 assert!(!dir_contains_legacy_db(dir.path()).unwrap());
3852 }
3853
3854 #[test]
3855 fn dir_with_rocksdb_markers_has_existing_db() {
3856 let dir = tempfile::tempdir().unwrap();
3858 fs::write(dir.path().join("CURRENT"), "MANIFEST-000001\n").unwrap();
3859 assert!(dir_contains_legacy_db(dir.path()).unwrap());
3860
3861 let dir2 = tempfile::tempdir().unwrap();
3862 fs::write(dir2.path().join("MANIFEST-000007"), "x").unwrap();
3863 assert!(dir_contains_legacy_db(dir2.path()).unwrap());
3864 }
3865
3866 #[test]
3867 fn dir_with_marker_named_subdirectories_has_no_existing_db() {
3868 let dir = tempfile::tempdir().unwrap();
3871 fs::create_dir(dir.path().join("CURRENT")).unwrap();
3872 fs::create_dir(dir.path().join("MANIFEST-000001")).unwrap();
3873 assert!(!dir_contains_legacy_db(dir.path()).unwrap());
3874 }
3875}