1use crate::{
2 db::{HistoryError, HistoryRead},
3 model::HotKvWrite,
4 tables,
5};
6use ahash::AHashMap;
7use alloy::primitives::{Address, B256, BlockNumber, U256};
8use itertools::Itertools;
9use signet_storage_types::{Account, BlockNumberList, SealedHeader, ShardedKey};
10use std::ops::RangeInclusive;
11use trevm::revm::{
12 bytecode::Bytecode,
13 database::{
14 BundleState, OriginalValuesKnown,
15 states::{PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset},
16 },
17 state::AccountInfo,
18};
19
20pub type BundleInit =
24 AHashMap<Address, (Option<Account>, Option<Account>, AHashMap<B256, (U256, U256)>)>;
25
26pub trait UnsafeDbWrite: HotKvWrite + super::sealed::Sealed {
34 fn put_header_inconsistent(&self, header: &SealedHeader) -> Result<(), Self::Error> {
38 self.queue_put::<tables::Headers>(&header.number, header)
39 }
40
41 fn append_header(&self, header: &SealedHeader) -> Result<(), Self::Error> {
47 self.queue_append::<tables::Headers>(&header.number, header)
48 }
49
50 fn put_header_number_inconsistent(&self, hash: &B256, number: u64) -> Result<(), Self::Error> {
54 self.queue_put::<tables::HeaderNumbers>(hash, &number)
55 }
56
57 fn put_bytecode(&self, code_hash: &B256, bytecode: &Bytecode) -> Result<(), Self::Error> {
59 self.queue_put::<tables::Bytecodes>(code_hash, bytecode)
60 }
61
62 fn put_account(&self, address: &Address, account: &Account) -> Result<(), Self::Error> {
64 self.queue_put::<tables::PlainAccountState>(address, account)
65 }
66
67 fn append_account(&self, address: &Address, account: &Account) -> Result<(), Self::Error> {
70 self.queue_append::<tables::PlainAccountState>(address, account)
71 }
72
73 fn put_storage(&self, address: &Address, key: &U256, entry: &U256) -> Result<(), Self::Error> {
75 self.queue_put_dual::<tables::PlainStorageState>(address, key, entry)
76 }
77
78 fn append_storage(
81 &self,
82 address: &Address,
83 key: &U256,
84 entry: &U256,
85 ) -> Result<(), Self::Error> {
86 self.queue_append_dual::<tables::PlainStorageState>(address, key, entry)
87 }
88
89 fn put_header(&self, header: &SealedHeader) -> Result<(), Self::Error> {
91 self.put_header_inconsistent(header)
92 .and_then(|_| self.put_header_number_inconsistent(&header.hash(), header.number))
93 }
94
95 fn delete_header(&self, number: u64) -> Result<(), Self::Error> {
97 self.queue_delete::<tables::Headers>(&number)
98 }
99
100 fn delete_header_number(&self, hash: &B256) -> Result<(), Self::Error> {
102 self.queue_delete::<tables::HeaderNumbers>(hash)
103 }
104
105 fn commit(self) -> Result<(), Self::Error>
107 where
108 Self: Sized,
109 {
110 HotKvWrite::raw_commit(self)
111 }
112}
113
114impl<T> UnsafeDbWrite for T where T: HotKvWrite {}
115
116pub trait UnsafeHistoryWrite: UnsafeDbWrite + HistoryRead {
122 fn write_account_history(
126 &self,
127 address: &Address,
128 latest_height: u64,
129 touched: &BlockNumberList,
130 ) -> Result<(), Self::Error> {
131 self.queue_put_dual::<tables::AccountsHistory>(address, &latest_height, touched)
132 }
133
134 fn write_account_prestate(
136 &self,
137 block_number: u64,
138 address: Address,
139 pre_state: &Account,
140 ) -> Result<(), Self::Error> {
141 self.queue_put_dual::<tables::AccountChangeSets>(&block_number, &address, pre_state)
142 }
143
144 fn append_account_prestate(
149 &self,
150 block_number: u64,
151 address: Address,
152 pre_state: &Account,
153 ) -> Result<(), Self::Error> {
154 self.queue_append_dual::<tables::AccountChangeSets>(&block_number, &address, pre_state)
155 }
156
157 fn write_storage_history(
160 &self,
161 address: &Address,
162 slot: U256,
163 highest_block_number: u64,
164 touched: &BlockNumberList,
165 ) -> Result<(), Self::Error> {
166 let sharded_key = ShardedKey::new(slot, highest_block_number);
167 self.queue_put_dual::<tables::StorageHistory>(address, &sharded_key, touched)
168 }
169
170 fn write_storage_prestate(
172 &self,
173 block_number: u64,
174 address: Address,
175 slot: &U256,
176 prestate: &U256,
177 ) -> Result<(), Self::Error> {
178 self.queue_put_dual::<tables::StorageChangeSets>(&(block_number, address), slot, prestate)
179 }
180
181 fn append_storage_prestate(
186 &self,
187 block_number: u64,
188 address: Address,
189 slot: &U256,
190 prestate: &U256,
191 ) -> Result<(), Self::Error> {
192 self.queue_append_dual::<tables::StorageChangeSets>(
193 &(block_number, address),
194 slot,
195 prestate,
196 )
197 }
198
199 fn write_wipe(&self, block_number: u64, address: &Address) -> Result<(), Self::Error> {
206 let mut cursor = self.traverse_dual::<tables::PlainStorageState>()?;
207
208 for entry in cursor.iter_k2(address)? {
209 let (slot, value) = entry?;
210 self.write_storage_prestate(block_number, *address, &slot, &value)?;
211 }
212 Ok(())
213 }
214
215 fn write_plain_revert_sorted(
222 &self,
223 block_number: u64,
224 accounts: &[&(Address, Option<AccountInfo>)],
225 storage: &[&PlainStorageRevert],
226 ) -> Result<(), Self::Error> {
227 #[cfg(debug_assertions)]
228 {
229 debug_assert!(
230 accounts.windows(2).all(|w| w[0].0 <= w[1].0),
231 "accounts must be sorted by address"
232 );
233 debug_assert!(
234 storage.windows(2).all(|w| w[0].address <= w[1].address),
235 "storage must be sorted by address"
236 );
237 }
238
239 for (address, info) in accounts {
240 let account = info.as_ref().map(Account::from).unwrap_or_default();
241
242 if let Some((bytecode, code_hash)) =
245 info.as_ref().and_then(|info| info.code.clone()).zip(account.bytecode_hash)
246 {
247 self.put_bytecode(&code_hash, &bytecode)?;
248 }
249
250 self.append_account_prestate(block_number, *address, &account)?;
251 }
252
253 for entry in storage {
254 if entry.wiped {
255 self.write_wipe(block_number, &entry.address)?;
256 continue;
257 }
258 for (key, old_value) in entry.storage_revert.iter() {
261 self.write_storage_prestate(
262 block_number,
263 entry.address,
264 key,
265 &old_value.to_previous_value(),
266 )?;
267 }
268 }
269
270 Ok(())
271 }
272
273 fn write_plain_reverts(
278 &self,
279 first_block_number: u64,
280 PlainStateReverts { accounts, storage }: &PlainStateReverts,
281 ) -> Result<(), Self::Error> {
282 use rayon::prelude::*;
283
284 let (sorted_accounts, sorted_storage) = rayon::join(
286 || {
287 accounts
288 .par_iter()
289 .map(|block_accounts| {
290 let mut sorted: Vec<_> = block_accounts.iter().collect();
291 sorted.sort_by_key(|(addr, _)| *addr);
292 sorted
293 })
294 .collect::<Vec<_>>()
295 },
296 || {
297 storage
298 .par_iter()
299 .map(|block_storage| {
300 let mut sorted: Vec<_> = block_storage.iter().collect();
301 sorted.sort_by_key(|entry| entry.address);
302 sorted
303 })
304 .collect::<Vec<_>>()
305 },
306 );
307
308 sorted_accounts.iter().zip(sorted_storage.iter()).enumerate().try_for_each(
310 |(idx, (acc, sto))| {
311 self.write_plain_revert_sorted(first_block_number + idx as u64, acc, sto)
312 },
313 )
314 }
315
316 fn write_changed_account(
318 &self,
319 address: &Address,
320 account: &Option<AccountInfo>,
321 ) -> Result<(), Self::Error> {
322 let Some(info) = account.as_ref() else {
323 return self.queue_delete::<tables::PlainAccountState>(address);
325 };
326
327 let account = Account::from(info.clone());
328 if let Some((bytecode, code_hash)) = info.code.clone().zip(account.bytecode_hash) {
331 self.put_bytecode(&code_hash, &bytecode)?;
332 }
333 self.put_account(address, &account)
334 }
335
336 fn write_changed_storage(
338 &self,
339 PlainStorageChangeset { address, wipe_storage, storage }: &PlainStorageChangeset,
340 ) -> Result<(), Self::Error> {
341 if *wipe_storage {
342 return self.clear_k1_for::<tables::PlainStorageState>(address);
343 }
344
345 storage.iter().try_for_each(|(key, value)| self.put_storage(address, key, value))
346 }
347
348 fn write_changed_contracts(
350 &self,
351 code_hash: &B256,
352 bytecode: &Bytecode,
353 ) -> Result<(), Self::Error> {
354 self.put_bytecode(code_hash, bytecode)
355 }
356
357 fn write_state_changes(
359 &self,
360 StateChangeset { accounts, storage, contracts }: &StateChangeset,
361 ) -> Result<(), Self::Error> {
362 contracts.iter().try_for_each(|(code_hash, bytecode)| {
363 self.write_changed_contracts(code_hash, bytecode)
364 })?;
365 accounts
366 .iter()
367 .try_for_each(|(address, account)| self.write_changed_account(address, account))?;
368 storage
369 .iter()
370 .try_for_each(|storage_changeset| self.write_changed_storage(storage_changeset))?;
371 Ok(())
372 }
373
374 fn changed_accounts_with_range(
381 &self,
382 range: RangeInclusive<BlockNumber>,
383 ) -> Result<AHashMap<Address, Vec<u64>>, Self::Error> {
384 self.traverse_dual::<tables::AccountChangeSets>()?
385 .iter_from(range.start(), &Address::ZERO)?
386 .process_results(|iter| {
387 iter.take_while(|(num, _, _)| range.contains(num))
388 .map(|(num, addr, _)| (addr, num))
389 .into_group_map_by(|(addr, _)| *addr)
390 .into_iter()
391 .map(|(addr, pairs)| (addr, pairs.into_iter().map(|(_, num)| num).collect()))
392 .collect()
393 })
394 }
395
396 fn append_account_history_index(
398 &self,
399 index_updates: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
400 ) -> Result<(), HistoryError<Self::Error>> {
401 for (acct, indices) in index_updates {
402 let existing = self.last_account_history(acct)?;
403 append_to_sharded_history(
404 existing,
405 indices,
406 |key| self.queue_delete_dual::<tables::AccountsHistory>(&acct, &key),
407 |height, list| self.write_account_history(&acct, height, list),
408 )?;
409 }
410 Ok(())
411 }
412
413 #[allow(clippy::type_complexity)]
420 fn changed_storages_with_range(
421 &self,
422 range: RangeInclusive<BlockNumber>,
423 ) -> Result<AHashMap<(Address, U256), Vec<u64>>, Self::Error> {
424 self.traverse_dual::<tables::StorageChangeSets>()?
425 .iter_from(&(*range.start(), Address::ZERO), &U256::ZERO)?
426 .process_results(|iter| {
427 iter.take_while(|(num_addr, _, _)| range.contains(&num_addr.0))
428 .map(|(num_addr, slot, _)| ((num_addr.1, slot), num_addr.0))
429 .into_group_map_by(|(key, _)| *key)
430 .into_iter()
431 .map(|(key, pairs)| (key, pairs.into_iter().map(|(_, num)| num).collect()))
432 .collect()
433 })
434 }
435
436 fn append_storage_history_index(
438 &self,
439 index_updates: impl IntoIterator<Item = ((Address, U256), impl IntoIterator<Item = u64>)>,
440 ) -> Result<(), HistoryError<Self::Error>> {
441 for ((addr, slot), indices) in index_updates {
442 let existing = self.last_storage_history(&addr, &slot)?;
443 append_to_sharded_history(
444 existing,
445 indices,
446 |key| self.queue_delete_dual::<tables::StorageHistory>(&addr, &key),
447 |height, list| self.write_storage_history(&addr, slot, height, list),
448 )?;
449 }
450 Ok(())
451 }
452
453 fn update_history_indices_inconsistent(
456 &self,
457 range: RangeInclusive<BlockNumber>,
458 ) -> Result<(), HistoryError<Self::Error>> {
459 {
461 let indices = self.changed_accounts_with_range(range.clone())?;
462 self.append_account_history_index(indices)?;
463 }
464
465 {
467 let indices = self.changed_storages_with_range(range)?;
468 self.append_storage_history_index(indices)?;
469 }
470
471 Ok(())
472 }
473
474 fn append_block_inconsistent(
484 &self,
485 header: &SealedHeader,
486 state_changes: &BundleState,
487 ) -> Result<(), Self::Error> {
488 self.append_header(header)?;
489 self.put_header_number_inconsistent(&header.hash(), header.number)?;
490
491 let (state_changes, reverts) =
492 state_changes.to_plain_state_and_reverts(OriginalValuesKnown::No);
493
494 self.write_state_changes(&state_changes)?;
495 self.write_plain_reverts(header.number, &reverts)
496 }
497
498 fn append_blocks_inconsistent<'a>(
508 &self,
509 blocks: impl IntoIterator<Item = (&'a SealedHeader, &'a BundleState)>,
510 ) -> Result<(), Self::Error> {
511 blocks
512 .into_iter()
513 .try_for_each(|(header, state)| self.append_block_inconsistent(header, state))
514 }
515}
516
517impl<T> UnsafeHistoryWrite for T where T: UnsafeDbWrite + HotKvWrite {}
518
519fn append_to_sharded_history<K, E, D, W>(
532 existing: Option<(K, BlockNumberList)>,
533 indices: impl IntoIterator<Item = u64>,
534 mut delete_old: D,
535 mut write_shard: W,
536) -> Result<(), HistoryError<E>>
537where
538 E: std::error::Error,
539 D: FnMut(K) -> Result<(), E>,
540 W: FnMut(u64, &BlockNumberList) -> Result<(), E>,
541{
542 let (old_key, last_shard) =
543 existing.map_or_else(|| (None, BlockNumberList::default()), |(k, list)| (Some(k), list));
544 let mut last_shard = last_shard;
545
546 last_shard.append(indices).map_err(HistoryError::IntList)?;
547
548 if let Some(key) = old_key {
550 delete_old(key).map_err(HistoryError::Db)?;
551 }
552
553 if last_shard.len() <= ShardedKey::SHARD_COUNT as u64 {
555 return write_shard(u64::MAX, &last_shard).map_err(HistoryError::Db);
556 }
557
558 let mut chunk_buf = Vec::with_capacity(ShardedKey::SHARD_COUNT);
561 let mut iter = last_shard.iter().peekable();
562
563 while iter.peek().is_some() {
564 chunk_buf.clear();
565 chunk_buf.extend(iter.by_ref().take(ShardedKey::SHARD_COUNT));
566
567 let highest = if iter.peek().is_some() {
568 *chunk_buf.last().expect("chunk_buf is non-empty")
569 } else {
570 u64::MAX
572 };
573
574 let shard = BlockNumberList::new_pre_sorted(chunk_buf.iter().copied());
575 write_shard(highest, &shard).map_err(HistoryError::Db)?;
576 }
577 Ok(())
578}