Skip to main content

signet_hot/db/
consistent.rs

1use crate::{
2    db::{HistoryError, UnsafeDbWrite, UnsafeHistoryWrite},
3    tables,
4};
5use ahash::AHashSet;
6use alloy::{
7    consensus::Sealable,
8    genesis::{Genesis, GenesisAccount},
9    primitives::{Address, B256, BlockNumber, U256, address},
10};
11use signet_storage_types::{Account, BlockNumberList, EthereumHardfork, SealedHeader};
12use trevm::revm::{database::BundleState, state::Bytecode};
13
/// Largest possible [`Address`]: every byte is `0xff` (all bits set to 1).
///
/// Used as the inclusive upper bound of the address key component when
/// deleting change-set ranges in `HistoryWrite::unwind_above`.
const ADDRESS_MAX: Address = address!("0xffffffffffffffffffffffffffffffffffffffff");
16
17/// Trait for database write operations on hot history tables. This trait
18/// maintains a consistent state of the database.
19pub trait HistoryWrite: UnsafeDbWrite + UnsafeHistoryWrite {
20    /// Validate that a range of headers forms a valid chain extension.
21    ///
22    /// Headers must be in order and each must extend the previous.
23    /// The first header must extend the current database tip (or be the first
24    /// block if the database is empty).
25    ///
26    /// Returns `Ok(())` if valid, or an error describing the inconsistency.
27    fn validate_chain_extension<'a, I>(&self, headers: I) -> Result<(), HistoryError<Self::Error>>
28    where
29        I: IntoIterator<Item = &'a SealedHeader>,
30    {
31        let mut iter = headers.into_iter();
32        let first = iter.next().ok_or(HistoryError::EmptyRange)?;
33
34        // Validate first header against current DB tip
35        match self.get_chain_tip().map_err(HistoryError::Db)? {
36            None => {
37                // Empty DB - first block is valid as genesis
38            }
39            Some((tip_number, tip_hash)) => {
40                let expected_number = tip_number + 1;
41                if first.number != expected_number {
42                    return Err(HistoryError::NonContiguousBlock {
43                        expected: expected_number,
44                        got: first.number,
45                    });
46                }
47                if first.parent_hash != tip_hash {
48                    return Err(HistoryError::ParentHashMismatch {
49                        expected: tip_hash,
50                        got: first.parent_hash,
51                    });
52                }
53            }
54        }
55
56        // Validate each subsequent header extends the previous using fold
57        iter.try_fold(first, |prev, curr| {
58            let expected_number = prev.number + 1;
59            if curr.number != expected_number {
60                return Err(HistoryError::NonContiguousBlock {
61                    expected: expected_number,
62                    got: curr.number,
63                });
64            }
65
66            let expected_hash = prev.hash();
67            if curr.parent_hash != expected_hash {
68                return Err(HistoryError::ParentHashMismatch {
69                    expected: expected_hash,
70                    got: curr.parent_hash,
71                });
72            }
73
74            Ok(curr)
75        })?;
76
77        Ok(())
78    }
79
80    /// Append a range of blocks and their associated state to the database.
81    fn append_blocks<'a>(
82        &self,
83        blocks: impl IntoIterator<Item = (&'a SealedHeader, &'a BundleState)>,
84    ) -> Result<(), HistoryError<Self::Error>> {
85        let mut iter = blocks.into_iter();
86
87        let Some((first_header, first_bundle)) = iter.next() else {
88            return Err(HistoryError::EmptyRange);
89        };
90
91        // Validate first header against DB tip
92        match self.get_chain_tip().map_err(HistoryError::Db)? {
93            None => { /* Empty DB - first block is valid as genesis */ }
94            Some((tip_number, tip_hash)) => {
95                let expected_number = tip_number + 1;
96                if first_header.number != expected_number {
97                    return Err(HistoryError::NonContiguousBlock {
98                        expected: expected_number,
99                        got: first_header.number,
100                    });
101                }
102                if first_header.parent_hash != tip_hash {
103                    return Err(HistoryError::ParentHashMismatch {
104                        expected: tip_hash,
105                        got: first_header.parent_hash,
106                    });
107                }
108            }
109        }
110
111        // Write first block and track range
112        self.append_block_inconsistent(first_header, first_bundle)?;
113        let first_num = first_header.number;
114        let mut last_num = first_num;
115        let mut prev = first_header;
116
117        // Process remaining: validate chain continuity and write in one pass
118        for (header, bundle) in iter {
119            let expected_number = prev.number + 1;
120            if header.number != expected_number {
121                return Err(HistoryError::NonContiguousBlock {
122                    expected: expected_number,
123                    got: header.number,
124                });
125            }
126            let expected_hash = prev.hash();
127            if header.parent_hash != expected_hash {
128                return Err(HistoryError::ParentHashMismatch {
129                    expected: expected_hash,
130                    got: header.parent_hash,
131                });
132            }
133
134            self.append_block_inconsistent(header, bundle)?;
135            last_num = header.number;
136            prev = header;
137        }
138
139        self.update_history_indices_inconsistent(first_num..=last_num)
140    }
141
142    /// Unwind all data above the given block number.
143    ///
144    /// This completely reverts the database state to what it was at block
145    /// `block`, including:
146    /// - Plain account state
147    /// - Plain storage state
148    /// - Headers and header number mappings
149    /// - Account and storage change sets
150    /// - Account and storage history indices
151    fn unwind_above(&self, block: BlockNumber) -> Result<(), HistoryError<Self::Error>> {
152        let first_block = block + 1;
153        let Some(last_block) = self.last_block_number()? else {
154            return Ok(());
155        };
156
157        if first_block > last_block {
158            return Ok(());
159        }
160
161        // ═══════════════════════════════════════════════════════════════════
162        // 1. STREAM AccountChangeSets → restore + filter history in one pass
163        // ═══════════════════════════════════════════════════════════════════
164        // TODO: estimate capacity from block range size for better allocation
165        let mut seen_accounts: AHashSet<Address> = AHashSet::new();
166        let mut account_cursor = self.traverse_dual::<tables::AccountChangeSets>()?;
167
168        // Position at first entry
169        let mut current = account_cursor.next_dual_above(&first_block, &Address::ZERO)?;
170
171        while let Some((block_num, address, old_account)) = current {
172            if block_num > last_block {
173                break;
174            }
175
176            // First occurrence = process both plain state and history
177            if seen_accounts.insert(address) {
178                // Restore plain state
179                if old_account.is_empty() {
180                    self.queue_delete::<tables::PlainAccountState>(&address)?;
181                } else {
182                    self.put_account(&address, &old_account)?;
183                }
184
185                // Filter history index
186                if let Some((shard_key, list)) = self.last_account_history(address)? {
187                    self.queue_delete_dual::<tables::AccountsHistory>(&address, &shard_key)?;
188                    let mut filtered = list.iter().take_while(|&bn| bn <= block).peekable();
189                    if filtered.peek().is_some() {
190                        self.write_account_history(
191                            &address,
192                            u64::MAX,
193                            &BlockNumberList::new_pre_sorted(filtered),
194                        )?;
195                    }
196                }
197            }
198
199            current = account_cursor.read_next()?;
200        }
201
202        // ═══════════════════════════════════════════════════════════════════
203        // 2. STREAM StorageChangeSets → restore + filter history in one pass
204        // ═══════════════════════════════════════════════════════════════════
205        // TODO: estimate capacity from block range size for better allocation
206        let mut seen_storage: AHashSet<(Address, U256)> = AHashSet::new();
207        let mut storage_cursor = self.traverse_dual::<tables::StorageChangeSets>()?;
208
209        // Position at first entry
210        let mut current_storage =
211            storage_cursor.next_dual_above(&(first_block, Address::ZERO), &U256::ZERO)?;
212
213        while let Some(((block_num, address), slot, old_value)) = current_storage {
214            if block_num > last_block {
215                break;
216            }
217
218            if seen_storage.insert((address, slot)) {
219                // Restore plain state
220                if old_value.is_zero() {
221                    self.queue_delete_dual::<tables::PlainStorageState>(&address, &slot)?;
222                } else {
223                    self.put_storage(&address, &slot, &old_value)?;
224                }
225
226                // Filter history index
227                if let Some((shard_key, list)) = self.last_storage_history(&address, &slot)? {
228                    self.queue_delete_dual::<tables::StorageHistory>(&address, &shard_key)?;
229                    let mut filtered = list.iter().take_while(|&bn| bn <= block).peekable();
230                    if filtered.peek().is_some() {
231                        self.write_storage_history(
232                            &address,
233                            slot,
234                            u64::MAX,
235                            &BlockNumberList::new_pre_sorted(filtered),
236                        )?;
237                    }
238                }
239            }
240
241            current_storage = storage_cursor.read_next()?;
242        }
243
244        // ═══════════════════════════════════════════════════════════════════
245        // 3. DELETE changeset ranges
246        // ═══════════════════════════════════════════════════════════════════
247        self.traverse_dual_mut::<tables::AccountChangeSets>()?
248            .delete_range((first_block, Address::ZERO)..=(last_block, ADDRESS_MAX))?;
249        self.traverse_dual_mut::<tables::StorageChangeSets>()?.delete_range(
250            ((first_block, Address::ZERO), U256::ZERO)..=((last_block, ADDRESS_MAX), U256::MAX),
251        )?;
252
253        // ═══════════════════════════════════════════════════════════════════
254        // 4. STREAM Headers → delete HeaderNumbers, then clear Headers
255        // ═══════════════════════════════════════════════════════════════════
256        let mut header_cursor = self.traverse::<tables::Headers>()?;
257
258        // Position at first entry and process it
259        let first_entry = header_cursor.lower_bound(&first_block)?;
260        if let Some((block_num, header)) = first_entry
261            && block_num <= last_block
262        {
263            self.delete_header_number(&header.hash())?;
264
265            // Continue with remaining entries
266            while let Some((block_num, header)) = header_cursor.read_next()? {
267                if block_num > last_block {
268                    break;
269                }
270                self.delete_header_number(&header.hash())?;
271            }
272        }
273        self.traverse_mut::<tables::Headers>()?.delete_range_inclusive(first_block..=last_block)?;
274
275        Ok(())
276    }
277
278    /// Load genesis data into the database.
279    ///
280    /// This operation is only valid on an empty database.
281    fn load_genesis(
282        &self,
283        genesis: &Genesis,
284        genesis_hardforks: &EthereumHardfork,
285    ) -> Result<(), HistoryError<Self::Error>> {
286        // Check that the database is empty
287        if self.get_chain_tip().map_err(HistoryError::Db)?.is_some() {
288            return Err(HistoryError::DbNotEmpty);
289        }
290
291        // Seal the genesis header, record its number, and create a blocknumber
292        // list.
293        let header = signet_storage_types::genesis_header(genesis, genesis_hardforks).seal_slow();
294        let genesis_number = header.number;
295        let genesis_history = BlockNumberList::new_pre_sorted([genesis_number]);
296
297        // Append the header, with empty state
298        self.append_blocks([(&header, &BundleState::default())])?;
299
300        // Keep track of written bytecode hashes to avoid duplicates.
301        let mut written_bytecode_hashes: AHashSet<B256> = AHashSet::new();
302
303        // For each account in the genesis allocation, append account.
304        // The accounts are pre-sorted by the BTreeMap in Genesis.
305        genesis.alloc.iter().try_for_each(|(address, account)| {
306            let GenesisAccount { nonce, balance, code, storage, .. } = account;
307
308            // Insert bytecode if present. Check against the set to avoid
309            // duplicate writes. We still have to compute the hash though.
310            let bytecode_hash = code
311                .as_ref()
312                .map(|code_bytes| -> Result<_, HistoryError<Self::Error>> {
313                    let hash = alloy::primitives::keccak256(code_bytes);
314                    // Short-circuit if already written
315                    if !written_bytecode_hashes.insert(hash) {
316                        return Ok(hash);
317                    }
318                    self.put_bytecode(&hash, &Bytecode::new_raw(code_bytes.clone()))?;
319                    Ok(hash)
320                })
321                .transpose()?;
322
323            // Append the account.
324            self.append_account(
325                address,
326                &Account { nonce: nonce.unwrap_or_default(), balance: *balance, bytecode_hash },
327            )?;
328
329            // Record account history at genesis
330            self.write_account_history(address, u64::MAX, &genesis_history)?;
331
332            // Insert storage entries and history
333            storage.iter().flatten().try_for_each(|(slot, value)| {
334                let slot = U256::from_be_bytes(**slot);
335                // We can append directly since the slots are sorted and the
336                // db is empty.
337                self.append_storage(address, &slot, &U256::from_be_bytes(**value))?;
338                // Record storage history at genesis
339                self.write_storage_history(address, slot, u64::MAX, &genesis_history)?;
340                Ok::<(), HistoryError<Self::Error>>(())
341            })?;
342            Ok(())
343        })
344    }
345}
346
347impl<T> HistoryWrite for T where T: UnsafeDbWrite + UnsafeHistoryWrite {}