Skip to main content

signet_hot/db/
inconsistent.rs

1use crate::{
2    db::{HistoryError, HistoryRead},
3    model::HotKvWrite,
4    tables,
5};
6use ahash::AHashMap;
7use alloy::primitives::{Address, B256, BlockNumber, U256};
8use itertools::Itertools;
9use signet_storage_types::{Account, BlockNumberList, SealedHeader, ShardedKey};
10use std::ops::RangeInclusive;
11use trevm::revm::{
12    bytecode::Bytecode,
13    database::{
14        BundleState, OriginalValuesKnown,
15        states::{PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset},
16    },
17    state::AccountInfo,
18};
19
20/// Bundle state initialization type.
21/// Maps address -> (old_account, new_account, storage_changes)
22/// where storage_changes maps slot (B256) -> (old_value, new_value)
23pub type BundleInit =
24    AHashMap<Address, (Option<Account>, Option<Account>, AHashMap<B256, (U256, U256)>)>;
25
26/// Trait for database write operations on standard hot tables.
27///
28/// This trait is low-level, and usage may leave the database in an
29/// inconsistent state if not used carefully. Users should prefer
30/// [`HotHistoryWrite`] or higher-level abstractions when possible.
31///
32/// [`HotHistoryWrite`]: crate::db::HistoryWrite
33pub trait UnsafeDbWrite: HotKvWrite + super::sealed::Sealed {
34    /// Write a block header. This will leave the DB in an inconsistent state
35    /// until the corresponding header number is also written. Users should
36    /// prefer [`Self::put_header`] instead.
37    fn put_header_inconsistent(&self, header: &SealedHeader) -> Result<(), Self::Error> {
38        self.queue_put::<tables::Headers>(&header.number, header)
39    }
40
41    /// Append a block header. Block number must be > all existing block numbers.
42    ///
43    /// This will leave the DB in an inconsistent state until the corresponding
44    /// header number is also written. Users should prefer [`Self::put_header`]
45    /// instead.
46    fn append_header(&self, header: &SealedHeader) -> Result<(), Self::Error> {
47        self.queue_append::<tables::Headers>(&header.number, header)
48    }
49
50    /// Write a block number by its hash. This will leave the DB in an
51    /// inconsistent state until the corresponding header is also written.
52    /// Users should prefer [`Self::put_header`] instead.
53    fn put_header_number_inconsistent(&self, hash: &B256, number: u64) -> Result<(), Self::Error> {
54        self.queue_put::<tables::HeaderNumbers>(hash, &number)
55    }
56
57    /// Write contract Bytecode by its hash.
58    fn put_bytecode(&self, code_hash: &B256, bytecode: &Bytecode) -> Result<(), Self::Error> {
59        self.queue_put::<tables::Bytecodes>(code_hash, bytecode)
60    }
61
62    /// Write an account by its address.
63    fn put_account(&self, address: &Address, account: &Account) -> Result<(), Self::Error> {
64        self.queue_put::<tables::PlainAccountState>(address, account)
65    }
66
67    /// Append an account by its address. This should generally only be used
68    /// when initializing the database (e.g., from genesis).
69    fn append_account(&self, address: &Address, account: &Account) -> Result<(), Self::Error> {
70        self.queue_append::<tables::PlainAccountState>(address, account)
71    }
72
73    /// Write a storage entry by its address and key.
74    fn put_storage(&self, address: &Address, key: &U256, entry: &U256) -> Result<(), Self::Error> {
75        self.queue_put_dual::<tables::PlainStorageState>(address, key, entry)
76    }
77
78    /// Append a storage entry by its address and key. This should generally
79    /// only be used when initializing the database (e.g., from genesis).
80    fn append_storage(
81        &self,
82        address: &Address,
83        key: &U256,
84        entry: &U256,
85    ) -> Result<(), Self::Error> {
86        self.queue_append_dual::<tables::PlainStorageState>(address, key, entry)
87    }
88
89    /// Write a sealed block header (header + number).
90    fn put_header(&self, header: &SealedHeader) -> Result<(), Self::Error> {
91        self.put_header_inconsistent(header)
92            .and_then(|_| self.put_header_number_inconsistent(&header.hash(), header.number))
93    }
94
95    /// Delete a header by block number.
96    fn delete_header(&self, number: u64) -> Result<(), Self::Error> {
97        self.queue_delete::<tables::Headers>(&number)
98    }
99
100    /// Delete a header number mapping by hash.
101    fn delete_header_number(&self, hash: &B256) -> Result<(), Self::Error> {
102        self.queue_delete::<tables::HeaderNumbers>(hash)
103    }
104
105    /// Commit the write transaction.
106    fn commit(self) -> Result<(), Self::Error>
107    where
108        Self: Sized,
109    {
110        HotKvWrite::raw_commit(self)
111    }
112}
113
114impl<T> UnsafeDbWrite for T where T: HotKvWrite {}
115
116/// Trait for history write operations.
117///
118/// These tables maintain historical information about accounts and storage
119/// changes, and their contents can be used to reconstruct past states or
120/// roll back changes.
121pub trait UnsafeHistoryWrite: UnsafeDbWrite + HistoryRead {
122    /// Maintain a list of block numbers where an account was touched.
123    ///
124    /// Accounts are keyed
125    fn write_account_history(
126        &self,
127        address: &Address,
128        latest_height: u64,
129        touched: &BlockNumberList,
130    ) -> Result<(), Self::Error> {
131        self.queue_put_dual::<tables::AccountsHistory>(address, &latest_height, touched)
132    }
133
134    /// Write an account change (pre-state) for an account at a specific block.
135    fn write_account_prestate(
136        &self,
137        block_number: u64,
138        address: Address,
139        pre_state: &Account,
140    ) -> Result<(), Self::Error> {
141        self.queue_put_dual::<tables::AccountChangeSets>(&block_number, &address, pre_state)
142    }
143
144    /// Append an account prestate entry.
145    ///
146    /// Entries must be appended in sorted order by (block_number, address).
147    /// Within a single block, addresses must be sorted.
148    fn append_account_prestate(
149        &self,
150        block_number: u64,
151        address: Address,
152        pre_state: &Account,
153    ) -> Result<(), Self::Error> {
154        self.queue_append_dual::<tables::AccountChangeSets>(&block_number, &address, pre_state)
155    }
156
157    /// Write storage history, by highest block number and touched block
158    /// numbers.
159    fn write_storage_history(
160        &self,
161        address: &Address,
162        slot: U256,
163        highest_block_number: u64,
164        touched: &BlockNumberList,
165    ) -> Result<(), Self::Error> {
166        let sharded_key = ShardedKey::new(slot, highest_block_number);
167        self.queue_put_dual::<tables::StorageHistory>(address, &sharded_key, touched)
168    }
169
170    /// Write a storage change (before state) for an account at a specific block.
171    fn write_storage_prestate(
172        &self,
173        block_number: u64,
174        address: Address,
175        slot: &U256,
176        prestate: &U256,
177    ) -> Result<(), Self::Error> {
178        self.queue_put_dual::<tables::StorageChangeSets>(&(block_number, address), slot, prestate)
179    }
180
181    /// Append a storage prestate entry.
182    ///
183    /// Entries must be appended in sorted order by ((block_number, address), slot).
184    /// Within a single (block, address), slots must be sorted.
185    fn append_storage_prestate(
186        &self,
187        block_number: u64,
188        address: Address,
189        slot: &U256,
190        prestate: &U256,
191    ) -> Result<(), Self::Error> {
192        self.queue_append_dual::<tables::StorageChangeSets>(
193            &(block_number, address),
194            slot,
195            prestate,
196        )
197    }
198
199    /// Write a pre-state for every storage key that exists for an account at a
200    /// specific block.
201    ///
202    /// Note: This uses `write_storage_prestate` (regular put) instead of
203    /// `append_storage_prestate` because the slots may interleave with other
204    /// writes to the same K1 from different code paths.
205    fn write_wipe(&self, block_number: u64, address: &Address) -> Result<(), Self::Error> {
206        let mut cursor = self.traverse_dual::<tables::PlainStorageState>()?;
207
208        for entry in cursor.iter_k2(address)? {
209            let (slot, value) = entry?;
210            self.write_storage_prestate(block_number, *address, &slot, &value)?;
211        }
212        Ok(())
213    }
214
215    /// Write pre-sorted revert data for a single block.
216    ///
217    /// # Panics (debug builds only)
218    ///
219    /// Panics if `accounts` is not sorted by address or `storage` is not sorted
220    /// by address.
221    fn write_plain_revert_sorted(
222        &self,
223        block_number: u64,
224        accounts: &[&(Address, Option<AccountInfo>)],
225        storage: &[&PlainStorageRevert],
226    ) -> Result<(), Self::Error> {
227        #[cfg(debug_assertions)]
228        {
229            debug_assert!(
230                accounts.windows(2).all(|w| w[0].0 <= w[1].0),
231                "accounts must be sorted by address"
232            );
233            debug_assert!(
234                storage.windows(2).all(|w| w[0].address <= w[1].address),
235                "storage must be sorted by address"
236            );
237        }
238
239        for (address, info) in accounts {
240            let account = info.as_ref().map(Account::from).unwrap_or_default();
241
242            // bytecode_hash is None when code_hash == KECCAK256_EMPTY,
243            // which doesn't need to be stored.
244            if let Some((bytecode, code_hash)) =
245                info.as_ref().and_then(|info| info.code.clone()).zip(account.bytecode_hash)
246            {
247                self.put_bytecode(&code_hash, &bytecode)?;
248            }
249
250            self.append_account_prestate(block_number, *address, &account)?;
251        }
252
253        for entry in storage {
254            if entry.wiped {
255                self.write_wipe(block_number, &entry.address)?;
256                continue;
257            }
258            // Use write (put) instead of append because storage_revert slots
259            // are not guaranteed to be sorted.
260            for (key, old_value) in entry.storage_revert.iter() {
261                self.write_storage_prestate(
262                    block_number,
263                    entry.address,
264                    key,
265                    &old_value.to_previous_value(),
266                )?;
267            }
268        }
269
270        Ok(())
271    }
272
273    /// Write multiple blocks' plain state revert information.
274    ///
275    /// Sorts accounts and storage in parallel before writing to enable
276    /// efficient append operations.
277    fn write_plain_reverts(
278        &self,
279        first_block_number: u64,
280        PlainStateReverts { accounts, storage }: &PlainStateReverts,
281    ) -> Result<(), Self::Error> {
282        use rayon::prelude::*;
283
284        // Sort accounts and storage in parallel using rayon::join
285        let (sorted_accounts, sorted_storage) = rayon::join(
286            || {
287                accounts
288                    .par_iter()
289                    .map(|block_accounts| {
290                        let mut sorted: Vec<_> = block_accounts.iter().collect();
291                        sorted.sort_by_key(|(addr, _)| *addr);
292                        sorted
293                    })
294                    .collect::<Vec<_>>()
295            },
296            || {
297                storage
298                    .par_iter()
299                    .map(|block_storage| {
300                        let mut sorted: Vec<_> = block_storage.iter().collect();
301                        sorted.sort_by_key(|entry| entry.address);
302                        sorted
303                    })
304                    .collect::<Vec<_>>()
305            },
306        );
307
308        // Write sequentially (DB writes must be ordered)
309        sorted_accounts.iter().zip(sorted_storage.iter()).enumerate().try_for_each(
310            |(idx, (acc, sto))| {
311                self.write_plain_revert_sorted(first_block_number + idx as u64, acc, sto)
312            },
313        )
314    }
315
316    /// Write changed accounts from a [`StateChangeset`].
317    fn write_changed_account(
318        &self,
319        address: &Address,
320        account: &Option<AccountInfo>,
321    ) -> Result<(), Self::Error> {
322        let Some(info) = account.as_ref() else {
323            // Account removal
324            return self.queue_delete::<tables::PlainAccountState>(address);
325        };
326
327        let account = Account::from(info.clone());
328        // bytecode_hash is None when code_hash == KECCAK256_EMPTY,
329        // which doesn't need to be stored.
330        if let Some((bytecode, code_hash)) = info.code.clone().zip(account.bytecode_hash) {
331            self.put_bytecode(&code_hash, &bytecode)?;
332        }
333        self.put_account(address, &account)
334    }
335
336    /// Write changed storage from a [`StateChangeset`].
337    fn write_changed_storage(
338        &self,
339        PlainStorageChangeset { address, wipe_storage, storage }: &PlainStorageChangeset,
340    ) -> Result<(), Self::Error> {
341        if *wipe_storage {
342            return self.clear_k1_for::<tables::PlainStorageState>(address);
343        }
344
345        storage.iter().try_for_each(|(key, value)| self.put_storage(address, key, value))
346    }
347
348    /// Write changed contract bytecode from a [`StateChangeset`].
349    fn write_changed_contracts(
350        &self,
351        code_hash: &B256,
352        bytecode: &Bytecode,
353    ) -> Result<(), Self::Error> {
354        self.put_bytecode(code_hash, bytecode)
355    }
356
357    /// Write a state changeset for a specific block.
358    fn write_state_changes(
359        &self,
360        StateChangeset { accounts, storage, contracts }: &StateChangeset,
361    ) -> Result<(), Self::Error> {
362        contracts.iter().try_for_each(|(code_hash, bytecode)| {
363            self.write_changed_contracts(code_hash, bytecode)
364        })?;
365        accounts
366            .iter()
367            .try_for_each(|(address, account)| self.write_changed_account(address, account))?;
368        storage
369            .iter()
370            .try_for_each(|storage_changeset| self.write_changed_storage(storage_changeset))?;
371        Ok(())
372    }
373
374    /// Get all changed accounts with the list of block numbers in the given
375    /// range.
376    ///
377    /// Iterates over entries starting from the first block in the range,
378    /// collecting changes while the block number remains in range.
379    // TODO: estimate capacity from block range size for better allocation
380    fn changed_accounts_with_range(
381        &self,
382        range: RangeInclusive<BlockNumber>,
383    ) -> Result<AHashMap<Address, Vec<u64>>, Self::Error> {
384        self.traverse_dual::<tables::AccountChangeSets>()?
385            .iter_from(range.start(), &Address::ZERO)?
386            .process_results(|iter| {
387                iter.take_while(|(num, _, _)| range.contains(num))
388                    .map(|(num, addr, _)| (addr, num))
389                    .into_group_map_by(|(addr, _)| *addr)
390                    .into_iter()
391                    .map(|(addr, pairs)| (addr, pairs.into_iter().map(|(_, num)| num).collect()))
392                    .collect()
393            })
394    }
395
396    /// Append account history indices for multiple accounts.
397    fn append_account_history_index(
398        &self,
399        index_updates: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
400    ) -> Result<(), HistoryError<Self::Error>> {
401        for (acct, indices) in index_updates {
402            let existing = self.last_account_history(acct)?;
403            append_to_sharded_history(
404                existing,
405                indices,
406                |key| self.queue_delete_dual::<tables::AccountsHistory>(&acct, &key),
407                |height, list| self.write_account_history(&acct, height, list),
408            )?;
409        }
410        Ok(())
411    }
412
413    /// Get all changed storages with the list of block numbers in the given
414    /// range.
415    ///
416    /// Iterates over entries starting from the first block in the range,
417    /// collecting changes while the block number remains in range.
418    // TODO: estimate capacity from block range size for better allocation
419    #[allow(clippy::type_complexity)]
420    fn changed_storages_with_range(
421        &self,
422        range: RangeInclusive<BlockNumber>,
423    ) -> Result<AHashMap<(Address, U256), Vec<u64>>, Self::Error> {
424        self.traverse_dual::<tables::StorageChangeSets>()?
425            .iter_from(&(*range.start(), Address::ZERO), &U256::ZERO)?
426            .process_results(|iter| {
427                iter.take_while(|(num_addr, _, _)| range.contains(&num_addr.0))
428                    .map(|(num_addr, slot, _)| ((num_addr.1, slot), num_addr.0))
429                    .into_group_map_by(|(key, _)| *key)
430                    .into_iter()
431                    .map(|(key, pairs)| (key, pairs.into_iter().map(|(_, num)| num).collect()))
432                    .collect()
433            })
434    }
435
436    /// Append storage history indices for multiple (address, slot) pairs.
437    fn append_storage_history_index(
438        &self,
439        index_updates: impl IntoIterator<Item = ((Address, U256), impl IntoIterator<Item = u64>)>,
440    ) -> Result<(), HistoryError<Self::Error>> {
441        for ((addr, slot), indices) in index_updates {
442            let existing = self.last_storage_history(&addr, &slot)?;
443            append_to_sharded_history(
444                existing,
445                indices,
446                |key| self.queue_delete_dual::<tables::StorageHistory>(&addr, &key),
447                |height, list| self.write_storage_history(&addr, slot, height, list),
448            )?;
449        }
450        Ok(())
451    }
452
453    /// Update the history indices for accounts and storage in the given block
454    /// range.
455    fn update_history_indices_inconsistent(
456        &self,
457        range: RangeInclusive<BlockNumber>,
458    ) -> Result<(), HistoryError<Self::Error>> {
459        // account history stage
460        {
461            let indices = self.changed_accounts_with_range(range.clone())?;
462            self.append_account_history_index(indices)?;
463        }
464
465        // storage history stage
466        {
467            let indices = self.changed_storages_with_range(range)?;
468            self.append_storage_history_index(indices)?;
469        }
470
471        Ok(())
472    }
473
474    /// Append a block's header and state changes in an inconsistent manner.
475    ///
476    /// This may leave the database in an inconsistent state. Users should
477    /// prefer higher-level abstractions when possible.
478    ///
479    /// 1. It MUST be checked that the header is the child of the current chain
480    ///    tip before calling this method.
481    /// 2. After calling this method, the caller MUST call
482    ///    `update_history_indices`.
483    fn append_block_inconsistent(
484        &self,
485        header: &SealedHeader,
486        state_changes: &BundleState,
487    ) -> Result<(), Self::Error> {
488        self.append_header(header)?;
489        self.put_header_number_inconsistent(&header.hash(), header.number)?;
490
491        let (state_changes, reverts) =
492            state_changes.to_plain_state_and_reverts(OriginalValuesKnown::No);
493
494        self.write_state_changes(&state_changes)?;
495        self.write_plain_reverts(header.number, &reverts)
496    }
497
498    /// Append multiple blocks' headers and state changes in an inconsistent
499    /// manner.
500    ///
501    /// This may leave the database in an inconsistent state. Users should
502    /// prefer higher-level abstractions when possible.
503    /// 1. It MUST be checked that the first header is the child of the current
504    ///    chain tip before calling this method.
505    /// 2. After calling this method, the caller MUST call
506    ///    `update_history_indices`.
507    fn append_blocks_inconsistent<'a>(
508        &self,
509        blocks: impl IntoIterator<Item = (&'a SealedHeader, &'a BundleState)>,
510    ) -> Result<(), Self::Error> {
511        blocks
512            .into_iter()
513            .try_for_each(|(header, state)| self.append_block_inconsistent(header, state))
514    }
515}
516
517impl<T> UnsafeHistoryWrite for T where T: UnsafeDbWrite + HotKvWrite {}
518
519/// Append indices to a sharded history entry, handling shard splitting.
520///
521/// This helper handles the common pattern of:
522/// 1. Appending new block numbers to an existing shard
523/// 2. Deleting the old shard if it exists
524/// 3. Splitting into multiple shards if the result exceeds the shard size
525///
526/// # Arguments
527/// - `existing`: The current last shard (key, list) if any
528/// - `indices`: New block numbers to append
529/// - `delete_old`: Called to delete the old shard key before writing new ones
530/// - `write_shard`: Called for each resulting shard (highest_block, list)
531fn append_to_sharded_history<K, E, D, W>(
532    existing: Option<(K, BlockNumberList)>,
533    indices: impl IntoIterator<Item = u64>,
534    mut delete_old: D,
535    mut write_shard: W,
536) -> Result<(), HistoryError<E>>
537where
538    E: std::error::Error,
539    D: FnMut(K) -> Result<(), E>,
540    W: FnMut(u64, &BlockNumberList) -> Result<(), E>,
541{
542    let (old_key, last_shard) =
543        existing.map_or_else(|| (None, BlockNumberList::default()), |(k, list)| (Some(k), list));
544    let mut last_shard = last_shard;
545
546    last_shard.append(indices).map_err(HistoryError::IntList)?;
547
548    // Delete the existing shard before writing new ones to avoid duplicates
549    if let Some(key) = old_key {
550        delete_old(key).map_err(HistoryError::Db)?;
551    }
552
553    // Fast path: all indices fit in one shard
554    if last_shard.len() <= ShardedKey::SHARD_COUNT as u64 {
555        return write_shard(u64::MAX, &last_shard).map_err(HistoryError::Db);
556    }
557
558    // Slow path: rechunk into multiple shards
559    // Reuse a single buffer to avoid allocating a new Vec per chunk
560    let mut chunk_buf = Vec::with_capacity(ShardedKey::SHARD_COUNT);
561    let mut iter = last_shard.iter().peekable();
562
563    while iter.peek().is_some() {
564        chunk_buf.clear();
565        chunk_buf.extend(iter.by_ref().take(ShardedKey::SHARD_COUNT));
566
567        let highest = if iter.peek().is_some() {
568            *chunk_buf.last().expect("chunk_buf is non-empty")
569        } else {
570            // Insert last list with `u64::MAX`
571            u64::MAX
572        };
573
574        let shard = BlockNumberList::new_pre_sorted(chunk_buf.iter().copied());
575        write_shard(highest, &shard).map_err(HistoryError::Db)?;
576    }
577    Ok(())
578}