1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Instant;
4
5use anyhow::{Context, Result};
6use arc_swap::{ArcSwap, ArcSwapOption};
7use tycho_block_util::block::BlockStuff;
8use tycho_block_util::state::ShardStateStuff;
9use tycho_storage::kv::InstanceId;
10use tycho_types::cell::Lazy;
11use tycho_types::models::*;
12use tycho_types::prelude::*;
13use tycho_util::metrics::HistogramGuard;
14use tycho_util::sync::CancellationFlag;
15use tycho_util::{FastHashMap, FastHashSet};
16use weedb::rocksdb;
17
18use super::db::RpcDb;
19use super::tables::{self, Transactions};
20
21#[derive(Default, Clone)]
22pub struct BlacklistedAccounts {
23 inner: Arc<BlacklistedAccountsInner>,
24}
25
26impl BlacklistedAccounts {
27 pub fn update<I: IntoIterator<Item = StdAddr>>(&self, items: I) {
28 let items = items
29 .into_iter()
30 .map(|item| {
31 let mut key = [0; 33];
32 key[0] = item.workchain as u8;
33 key[1..33].copy_from_slice(item.address.as_array());
34 key
35 })
36 .collect::<FastHashSet<_>>();
37
38 self.inner.accounts.store(Arc::new(items));
39 }
40
41 pub fn load(&self) -> Arc<FastHashSet<AddressKey>> {
42 self.inner.accounts.load_full()
43 }
44}
45
46#[derive(Default)]
47struct BlacklistedAccountsInner {
48 accounts: ArcSwap<FastHashSet<AddressKey>>,
49}
50
51pub struct RpcStorage {
52 db: RpcDb,
53 min_tx_lt: AtomicU64,
54 min_tx_lt_guard: tokio::sync::Mutex<()>,
55 snapshot: ArcSwapOption<weedb::OwnedSnapshot>,
56}
57
58impl RpcStorage {
59 pub fn new(db: RpcDb) -> Self {
60 let this = Self {
61 db,
62 min_tx_lt: AtomicU64::new(u64::MAX),
63 min_tx_lt_guard: Default::default(),
64 snapshot: Default::default(),
65 };
66
67 let state = &this.db.state;
68 if state.get(INSTANCE_ID).unwrap().is_none() {
69 state
70 .insert(INSTANCE_ID, rand::random::<InstanceId>())
71 .unwrap();
72 }
73
74 let min_lt = match state.get(TX_MIN_LT).unwrap() {
75 Some(value) if value.is_empty() => None,
76 Some(value) => Some(u64::from_le_bytes(value.as_ref().try_into().unwrap())),
77 None => None,
78 };
79
80 this.min_tx_lt
81 .store(min_lt.unwrap_or(u64::MAX), Ordering::Release);
82
83 tracing::debug!(?min_lt, "rpc storage initialized");
84
85 this
86 }
87
88 pub fn db(&self) -> &RpcDb {
89 &self.db
90 }
91
92 pub fn min_tx_lt(&self) -> u64 {
93 self.min_tx_lt.load(Ordering::Acquire)
94 }
95
96 pub fn update_snapshot(&self) {
97 let snapshot = Arc::new(self.db.owned_snapshot());
98 self.snapshot.store(Some(snapshot));
99 }
100
101 pub fn load_snapshot(&self) -> Option<RpcSnapshot> {
102 self.snapshot.load_full().map(RpcSnapshot)
103 }
104
105 pub fn store_instance_id(&self, id: InstanceId) {
106 let rpc_states = &self.db.state;
107 rpc_states.insert(INSTANCE_ID, id).unwrap();
108 }
109
110 pub fn load_instance_id(&self) -> InstanceId {
111 let id = self.db.state.get(INSTANCE_ID).unwrap().unwrap();
112 InstanceId::from_slice(id.as_ref())
113 }
114
115 pub fn get_known_mc_blocks_range(
116 &self,
117 snapshot: Option<&RpcSnapshot>,
118 ) -> Result<Option<(u32, u32)>> {
119 let mut snapshot = snapshot.cloned();
120 if snapshot.is_none() {
121 snapshot = self.snapshot.load_full().map(RpcSnapshot);
122 }
123
124 let table = &self.db.known_blocks;
125
126 let mut range_from = [0x00; tables::KnownBlocks::KEY_LEN];
127 range_from[0] = -1i8 as u8;
128 range_from[1..9].copy_from_slice(&ShardIdent::PREFIX_FULL.to_be_bytes());
129 let mut range_to = [0xff; tables::KnownBlocks::KEY_LEN];
130 range_to[0..9].clone_from_slice(&range_from[0..9]);
131
132 let mut readopts = table.new_read_config();
133 if let Some(snapshot) = &snapshot {
134 readopts.set_snapshot(snapshot);
135 }
136 readopts.set_iterate_lower_bound(range_from.as_slice());
137 readopts.set_iterate_upper_bound(range_to.as_slice());
138 let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&table.cf(), readopts);
139
140 iter.seek(range_from.as_slice());
141 Ok(if let Some(key) = iter.key() {
142 let from_seqno = u32::from_be_bytes(key[9..13].try_into().unwrap());
143 let mut to_seqno = from_seqno;
144
145 iter.seek_for_prev(range_to.as_slice());
146 if let Some(key) = iter.key() {
147 let seqno = u32::from_be_bytes(key[9..13].try_into().unwrap());
148 to_seqno = std::cmp::max(to_seqno, seqno);
149 }
150
151 Some((from_seqno, to_seqno))
152 } else {
153 iter.status()?;
154 None
155 })
156 }
157
158 pub fn get_blocks_by_mc_seqno(
159 &self,
160 mc_seqno: u32,
161 mut snapshot: Option<RpcSnapshot>,
162 ) -> Result<Option<BlocksByMcSeqnoIter>> {
163 let mut key = [0; tables::KnownBlocks::KEY_LEN];
164 key[0] = -1i8 as u8;
165 key[1..9].copy_from_slice(&ShardIdent::PREFIX_FULL.to_be_bytes());
166 key[9..13].copy_from_slice(&mc_seqno.to_be_bytes());
167
168 if snapshot.is_none() {
169 snapshot = self.snapshot.load_full().map(RpcSnapshot);
170 }
171 let Some(snapshot) = snapshot else {
172 anyhow::bail!("No snapshot available");
174 };
175
176 let table = &self.db.known_blocks;
177 if table.get_ext(key, Some(&snapshot))?.is_none() {
178 return Ok(None);
179 };
180
181 let mut range_from = [0x00; tables::BlocksByMcSeqno::KEY_LEN];
182 range_from[0..4].clone_from_slice(&mc_seqno.to_be_bytes());
183 let mut range_to = [0xff; tables::BlocksByMcSeqno::KEY_LEN];
184 range_to[0..4].clone_from_slice(&mc_seqno.to_be_bytes());
185
186 let table = &self.db.blocks_by_mc_seqno;
187 let mut readopts = table.new_read_config();
188 readopts.set_snapshot(&snapshot);
189 readopts.set_iterate_lower_bound(range_from.as_slice());
190 readopts.set_iterate_upper_bound(range_to.as_slice());
191
192 let rocksdb = self.db.rocksdb();
193 let mut iter = rocksdb.raw_iterator_cf_opt(&table.cf(), readopts);
194 iter.seek(range_from.as_slice());
195
196 Ok(Some(BlocksByMcSeqnoIter {
197 mc_seqno,
198 inner: unsafe { weedb::OwnedRawIterator::new(rocksdb.clone(), iter) },
200 snapshot,
201 }))
202 }
203
204 pub fn get_brief_block_info(
205 &self,
206 block_id: &BlockIdShort,
207 snapshot: Option<&RpcSnapshot>,
208 ) -> Result<Option<(BlockId, u32, BriefBlockInfo)>> {
209 let Ok(workchain) = i8::try_from(block_id.shard.workchain()) else {
210 return Ok(None);
211 };
212 let mut key = [0; tables::KnownBlocks::KEY_LEN];
213 key[0] = workchain as u8;
214 key[1..9].copy_from_slice(&block_id.shard.prefix().to_be_bytes());
215 key[9..13].copy_from_slice(&block_id.seqno.to_be_bytes());
216
217 let table = &self.db.known_blocks;
218 let Some(value) = table.get_ext(key, snapshot)? else {
219 return Ok(None);
220 };
221 let value = value.as_ref();
222
223 let brief_info = BriefBlockInfo::load_from_bytes(workchain as i32, &value[68..])
224 .context("invalid brief info")?;
225
226 let block_id = BlockId {
227 shard: block_id.shard,
228 seqno: block_id.seqno,
229 root_hash: HashBytes::from_slice(&value[0..32]),
230 file_hash: HashBytes::from_slice(&value[32..64]),
231 };
232 let mc_seqno = u32::from_le_bytes(value[64..68].try_into().unwrap());
233
234 Ok(Some((block_id, mc_seqno, brief_info)))
235 }
236
237 pub fn get_brief_shards_descr(
238 &self,
239 mc_seqno: u32,
240 snapshot: Option<&RpcSnapshot>,
241 ) -> Result<Option<Vec<BriefShardDescr>>> {
242 let mut key = [0x00; tables::BlocksByMcSeqno::KEY_LEN];
243 key[0..4].copy_from_slice(&mc_seqno.to_be_bytes());
244 key[4] = -1i8 as u8;
245 key[5..13].copy_from_slice(&ShardIdent::PREFIX_FULL.to_be_bytes());
246 key[13..17].copy_from_slice(&mc_seqno.to_be_bytes());
247
248 let table = &self.db.blocks_by_mc_seqno;
249 let Some(value) = table.get_ext(key, snapshot)? else {
250 return Ok(None);
251 };
252 let value = value.as_ref();
253
254 let shard_count = u32::from_le_bytes(
255 value[tables::BlocksByMcSeqno::VALUE_LEN..tables::BlocksByMcSeqno::VALUE_LEN + 4]
256 .try_into()
257 .unwrap(),
258 ) as usize;
259
260 let mut result = Vec::with_capacity(shard_count);
261 for i in 0..shard_count {
262 let offset =
263 tables::BlocksByMcSeqno::DESCR_OFFSET + i * tables::BlocksByMcSeqno::DESCR_LEN;
264 let descr = &value[offset..offset + tables::BlocksByMcSeqno::DESCR_LEN];
265
266 result.push(BriefShardDescr {
267 shard_ident: ShardIdent::new(
268 descr[0] as i8 as i32,
269 u64::from_le_bytes(descr[1..9].try_into().unwrap()),
270 )
271 .context("invalid top shard ident")?,
272 seqno: u32::from_le_bytes(descr[9..13].try_into().unwrap()),
273 root_hash: HashBytes::from_slice(&descr[13..45]),
274 file_hash: HashBytes::from_slice(&descr[45..77]),
275 start_lt: u64::from_le_bytes(descr[77..85].try_into().unwrap()),
276 end_lt: u64::from_le_bytes(descr[85..93].try_into().unwrap()),
277 });
278 }
279
280 Ok(Some(result))
281 }
282
283 pub fn get_accounts_by_code_hash(
284 &self,
285 code_hash: &HashBytes,
286 continuation: Option<&StdAddr>,
287 mut snapshot: Option<RpcSnapshot>,
288 ) -> Result<CodeHashesIter<'_>> {
289 let mut key = [0u8; tables::CodeHashes::KEY_LEN];
290 key[0..32].copy_from_slice(code_hash.as_ref());
291 if let Some(continuation) = continuation {
292 key[32] = continuation.workchain as u8;
293 key[33..65].copy_from_slice(continuation.address.as_ref());
294 }
295
296 let mut upper_bound = Vec::with_capacity(tables::CodeHashes::KEY_LEN);
297 upper_bound.extend_from_slice(&key[..32]);
298 upper_bound.extend_from_slice(&[0xff; 33]);
299
300 let mut readopts = self.db.code_hashes.new_read_config();
301 readopts.set_iterate_upper_bound(upper_bound);
304
305 if snapshot.is_none() {
306 snapshot = self.snapshot.load_full().map(RpcSnapshot);
307 }
308 let snapshot = snapshot.unwrap_or_else(|| RpcSnapshot(Arc::new(self.db.owned_snapshot())));
309 readopts.set_snapshot(&snapshot);
310
311 let rocksdb = self.db.rocksdb();
312 let code_hashes_cf = self.db.code_hashes.cf();
313 let mut iter = rocksdb.raw_iterator_cf_opt(&code_hashes_cf, readopts);
314
315 iter.seek(key);
316 if continuation.is_some() {
317 iter.next();
318 }
319
320 Ok(CodeHashesIter {
321 inner: iter,
322 snapshot,
323 })
324 }
325
326 pub fn get_block_transactions(
327 &self,
328 block_id: &BlockIdShort,
329 reverse: bool,
330 cursor: Option<&BlockTransactionsCursor>,
331 snapshot: Option<RpcSnapshot>,
332 ) -> Result<Option<BlockTransactionsIterBuilder>> {
333 let Some(ids) = self.get_block_transaction_ids(block_id, reverse, cursor, snapshot)? else {
334 return Ok(None);
335 };
336
337 Ok(Some(BlockTransactionsIterBuilder {
338 ids,
339 transactions_cf: self.db.transactions.get_unbounded_cf(),
340 }))
341 }
342
343 pub fn get_block_transaction_ids(
344 &self,
345 block_id: &BlockIdShort,
346 reverse: bool,
347 cursor: Option<&BlockTransactionsCursor>,
348 mut snapshot: Option<RpcSnapshot>,
349 ) -> Result<Option<BlockTransactionIdsIter>> {
350 let Ok(workchain) = i8::try_from(block_id.shard.workchain()) else {
351 return Ok(None);
352 };
353
354 if snapshot.is_none() {
355 snapshot = self.snapshot.load_full().map(RpcSnapshot);
356 }
357 let Some(snapshot) = snapshot else {
358 anyhow::bail!("No snapshot available");
360 };
361
362 let mut range_from = [0x00; tables::BlockTransactions::KEY_LEN];
363 range_from[0] = workchain as u8;
364 range_from[1..9].copy_from_slice(&block_id.shard.prefix().to_be_bytes());
365 range_from[9..13].copy_from_slice(&block_id.seqno.to_be_bytes());
366
367 if let Some(cursor) = cursor {
368 range_from[13..45].copy_from_slice(cursor.hash.as_slice());
369 range_from[45..53].copy_from_slice(&cursor.lt.to_be_bytes());
370 }
371
372 let table = &self.db.known_blocks;
373 let ref_by_mc_seqno;
374 let block_id = match table.get_ext(&range_from[0..13], Some(&snapshot))? {
375 Some(value) => {
376 let value = value.as_ref();
377 ref_by_mc_seqno = u32::from_le_bytes(value[64..68].try_into().unwrap());
378 BlockId {
379 shard: block_id.shard,
380 seqno: block_id.seqno,
381 root_hash: HashBytes::from_slice(&value[0..32]),
382 file_hash: HashBytes::from_slice(&value[32..64]),
383 }
384 }
385 None => return Ok(None),
386 };
387
388 let mut range_to = [0xff; tables::BlockTransactions::KEY_LEN];
389 range_to[0..13].copy_from_slice(&range_from[0..13]);
390
391 let mut readopts = self.db.block_transactions.new_read_config();
392 readopts.set_iterate_lower_bound(range_from.as_slice());
393 readopts.set_iterate_upper_bound(range_to.as_slice());
394 readopts.set_snapshot(&snapshot);
395
396 let rocksdb = self.db.rocksdb();
397 let block_transactions_cf = self.db.block_transactions.cf();
398 let mut iter = rocksdb.raw_iterator_cf_opt(&block_transactions_cf, readopts);
399
400 if reverse {
401 iter.seek_for_prev(range_to);
402 } else {
403 iter.seek(range_from);
404 }
405
406 if cursor.is_some()
407 && let Some(key) = iter.key()
408 && key == range_from.as_slice()
409 {
410 if reverse {
411 iter.prev();
412 } else {
413 iter.next();
414 }
415 }
416
417 Ok(Some(BlockTransactionIdsIter {
418 block_id,
419 ref_by_mc_seqno,
420 is_reversed: reverse,
421 inner: unsafe { weedb::OwnedRawIterator::new(rocksdb.clone(), iter) },
423 snapshot,
424 }))
425 }
426
427 pub fn get_transactions(
428 &self,
429 account: &StdAddr,
430 start_lt: Option<u64>,
431 end_lt: Option<u64>,
432 reverse: bool,
433 mut snapshot: Option<RpcSnapshot>,
434 ) -> Result<TransactionsIterBuilder> {
435 let mut start_lt = start_lt.unwrap_or_default();
436 let mut end_lt = end_lt.unwrap_or(u64::MAX);
437 if end_lt < start_lt {
438 start_lt = u64::MAX - 1;
440 end_lt = u64::MAX;
441 }
442
443 if snapshot.is_none() {
444 snapshot = self.snapshot.load_full().map(RpcSnapshot);
445 }
446 let snapshot = snapshot.unwrap_or_else(|| RpcSnapshot(Arc::new(self.db.owned_snapshot())));
447
448 let mut range_from = [0u8; tables::Transactions::KEY_LEN];
449 range_from[0] = account.workchain as u8;
450 range_from[1..33].copy_from_slice(account.address.as_ref());
451 range_from[33..41].copy_from_slice(&start_lt.to_be_bytes());
452 let mut range_to = range_from;
453 range_to[33..41].copy_from_slice(&end_lt.saturating_add(1).to_be_bytes());
456
457 let mut readopts = self.db.transactions.new_read_config();
458 readopts.set_snapshot(&snapshot);
459 readopts.set_iterate_lower_bound(range_from.as_slice());
460 readopts.set_iterate_upper_bound(range_to.as_slice());
461
462 let rocksdb = self.db.rocksdb();
463 let transactions_cf = self.db.transactions.cf();
464 let mut iter = rocksdb.raw_iterator_cf_opt(&transactions_cf, readopts);
465 if reverse {
466 iter.seek_for_prev(range_to.as_slice());
467 } else {
468 iter.seek(range_from.as_slice());
469 }
470 iter.status()?;
471
472 Ok(TransactionsIterBuilder {
473 is_reversed: reverse,
474 inner: unsafe { weedb::OwnedRawIterator::new(rocksdb.clone(), iter) },
476 snapshot,
477 })
478 }
479
480 pub fn get_transaction(
481 &self,
482 hash: &HashBytes,
483 snapshot: Option<&RpcSnapshot>,
484 ) -> Result<Option<TransactionData<'_>>> {
485 let table = &self.db.transactions_by_hash;
486 let Some(tx_info) = table.get_ext(hash, snapshot)? else {
487 return Ok(None);
488 };
489 let key = &tx_info.as_ref()[..Transactions::KEY_LEN];
490
491 let table = &self.db.transactions;
492 let tx = table.get_ext(key, snapshot)?;
493 Ok(tx.map(TransactionData::new))
494 }
495
496 pub fn get_transaction_ext<'db>(
497 &'db self,
498 hash: &HashBytes,
499 snapshot: Option<&RpcSnapshot>,
500 ) -> Result<Option<TransactionDataExt<'db>>> {
501 let table = &self.db.transactions_by_hash;
502 let Some(tx_info) = table.get_ext(hash, snapshot)? else {
503 return Ok(None);
504 };
505 let tx_info = tx_info.as_ref();
506 let Some(info) = TransactionInfo::from_bytes(tx_info) else {
507 return Ok(None);
508 };
509
510 let table = &self.db.transactions;
511 let tx = table.get_ext(&tx_info[..Transactions::KEY_LEN], snapshot)?;
512
513 Ok(tx.map(move |data| TransactionDataExt {
514 info,
515 data: TransactionData::new(data),
516 }))
517 }
518
519 pub fn get_transaction_info(
520 &self,
521 hash: &HashBytes,
522 snapshot: Option<&RpcSnapshot>,
523 ) -> Result<Option<TransactionInfo>> {
524 let table = &self.db.transactions_by_hash;
525 let Some(tx_info) = table.get_ext(hash, snapshot)? else {
526 return Ok(None);
527 };
528 Ok(TransactionInfo::from_bytes(&tx_info))
529 }
530
531 pub fn get_src_transaction<'db>(
532 &'db self,
533 account: &StdAddr,
534 message_lt: u64,
535 snapshot: Option<&RpcSnapshot>,
536 ) -> Result<Option<TransactionData<'db>>> {
537 let table = &self.db.transactions;
538
539 let owned_snapshot;
540 let snapshot = match snapshot {
541 Some(snapshot) => snapshot,
542 None => {
543 owned_snapshot = self
544 .load_snapshot()
545 .unwrap_or_else(|| RpcSnapshot(Arc::new(self.db.owned_snapshot())));
546 &owned_snapshot
547 }
548 };
549
550 let mut key = [0u8; tables::Transactions::KEY_LEN];
551 key[0] = account.workchain as u8;
552 key[1..33].copy_from_slice(account.address.as_slice());
553
554 let lower_bound = key;
555 key[33..41].copy_from_slice(&message_lt.to_be_bytes());
556
557 let mut readopts = table.new_read_config();
558 readopts.set_iterate_lower_bound(lower_bound);
559 readopts.set_iterate_upper_bound(key);
560 readopts.set_snapshot(snapshot);
561 let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&table.cf(), readopts);
562 iter.seek_for_prev(key.as_slice());
563
564 let Some(tx_key) = iter.key() else {
566 iter.status()?;
567 return Ok(None);
568 };
569 if tx_key[0..33] != key[0..33] {
570 return Ok(None);
571 }
572
573 let tx = table.get_ext(tx_key, Some(snapshot))?;
574
575 Ok(tx.map(TransactionData::new))
576 }
577
578 pub fn get_dst_transaction<'db>(
579 &'db self,
580 in_msg_hash: &HashBytes,
581 snapshot: Option<&RpcSnapshot>,
582 ) -> Result<Option<TransactionData<'db>>> {
583 let table = &self.db.transactions_by_in_msg;
584 let Some(key) = table.get_ext(in_msg_hash, snapshot)? else {
585 return Ok(None);
586 };
587
588 let table = &self.db.transactions;
589 let tx = table.get_ext(key, snapshot)?;
590 Ok(tx.map(TransactionData::new))
591 }
592
593 #[tracing::instrument(
594 level = "info",
595 name = "reset_accounts",
596 skip_all,
597 fields(shard = %shard_state.block_id().shard)
598 )]
599 pub async fn reset_accounts(
600 &self,
601 shard_state: ShardStateStuff,
602 split_depth: u8,
603 ) -> Result<()> {
604 let shard_ident = shard_state.block_id().shard;
605 let Ok(workchain) = i8::try_from(shard_ident.workchain()) else {
606 return Ok(());
607 };
608
609 tracing::info!("clearing old code hash indices");
610 let started_at = Instant::now();
611 self.remove_code_hashes(&shard_ident).await?;
612 tracing::info!(
613 elapsed = %humantime::format_duration(started_at.elapsed()),
614 "cleared old code hash indices"
615 );
616
617 let split = {
619 let guard = shard_state.ref_mc_state_handle().clone();
620
621 let mut virtual_shards = FastHashMap::default();
622 split_shard(
623 &shard_ident,
624 shard_state.state().load_accounts()?.dict(),
625 split_depth,
626 &mut virtual_shards,
627 )
628 .context("failed to split shard state into virtual shards")?;
629
630 drop(shard_state);
632 (guard, virtual_shards)
633 };
634
635 let cancelled = CancellationFlag::new();
636 scopeguard::defer! {
637 cancelled.cancel();
638 }
639
640 let db = self.db.clone();
642 let mut cancelled = cancelled.debounce(10000);
643 let span = tracing::Span::current();
644
645 tokio::task::spawn_blocking(move || {
647 let _span = span.enter();
648
649 let (_state_guard, virtual_shards) = split;
651
652 let guard = scopeguard::guard((), |_| {
653 tracing::warn!("cancelled");
654 });
655
656 tracing::info!(split_depth, "started building new code hash indices");
657 let started_at = Instant::now();
658
659 let raw = db.rocksdb().as_ref();
660 let code_hashes_cf = &db.code_hashes.cf();
661 let code_hashes_by_address_cf = &db.code_hashes_by_address.cf();
662
663 let mut non_empty_batch = false;
664 let mut write_batch = rocksdb::WriteBatch::default();
665
666 let mut code_hashes_key = [0u8; { tables::CodeHashes::KEY_LEN }];
668 code_hashes_key[32] = workchain as u8;
669
670 let mut code_hashes_by_address_key = [0u8; { tables::CodeHashesByAddress::KEY_LEN }];
671 code_hashes_by_address_key[0] = workchain as u8;
672
673 for (virtual_shard, accounts) in virtual_shards {
675 tracing::info!(%virtual_shard, "started collecting code hashes");
676 let started_at = Instant::now();
677
678 for entry in accounts.iter() {
679 if cancelled.check() {
680 anyhow::bail!("accounts reset cancelled");
681 }
682
683 let (id, (_, account)) = entry?;
684
685 let code_hash = match extract_code_hash(&account)? {
686 ExtractedCodeHash::Exact(Some(code_hash)) => code_hash,
687 ExtractedCodeHash::Exact(None) => continue,
688 ExtractedCodeHash::Skip => anyhow::bail!("code in account state is pruned"),
689 };
690
691 non_empty_batch |= true;
692
693 code_hashes_key[..32].copy_from_slice(code_hash.as_slice());
695 code_hashes_key[33..65].copy_from_slice(id.as_slice());
696
697 code_hashes_by_address_key[1..33].copy_from_slice(id.as_slice());
698
699 write_batch.put_cf(code_hashes_cf, code_hashes_key.as_slice(), []);
701 write_batch.put_cf(
702 code_hashes_by_address_cf,
703 code_hashes_by_address_key.as_slice(),
704 code_hash.as_slice(),
705 );
706 }
707
708 tracing::info!(
709 %virtual_shard,
710 elapsed = %humantime::format_duration(started_at.elapsed()),
711 "finished collecting code hashes",
712 );
713 }
714
715 if non_empty_batch {
716 raw.write_opt(write_batch, db.code_hashes.write_config())?;
717 }
718
719 tracing::info!(
720 elapsed = %humantime::format_duration(started_at.elapsed()),
721 "finished building new code hash indices"
722 );
723
724 tracing::info!("started flushing code hash indices");
726 let started_at = Instant::now();
727
728 let bound = Option::<[u8; 0]>::None;
729 raw.compact_range_cf(code_hashes_cf, bound, bound);
730 raw.compact_range_cf(code_hashes_by_address_cf, bound, bound);
731
732 scopeguard::ScopeGuard::into_inner(guard);
734 tracing::info!(
735 elapsed = %humantime::format_duration(started_at.elapsed()),
736 "finished flushing code hash indices"
737 );
738 Ok(())
739 })
740 .await?
741 }
742
743 #[tracing::instrument(level = "info", name = "remove_old_transactions", skip(self))]
744 pub async fn remove_old_transactions(
745 &self,
746 mc_seqno: u32,
747 min_lt: u64,
748 keep_tx_per_account: usize,
749 ) -> Result<()> {
750 const ITEMS_PER_BATCH: usize = 100000;
751
752 type TxKey = [u8; tables::Transactions::KEY_LEN];
753
754 enum PendingDelete {
755 Single,
756 Range,
757 }
758
759 struct GcState<'a> {
760 raw: &'a rocksdb::DB,
761 writeopt: &'a rocksdb::WriteOptions,
762 tx_cf: weedb::BoundedCfHandle<'a>,
763 tx_by_hash: weedb::BoundedCfHandle<'a>,
764 tx_by_in_msg: weedb::BoundedCfHandle<'a>,
765 key_range_begin: TxKey,
766 key_range_end: TxKey,
767 pending_delete: Option<PendingDelete>,
768 batch: rocksdb::WriteBatch,
769 total_tx: usize,
770 total_tx_by_hash: usize,
771 total_tx_by_in_msg: usize,
772 }
773
774 impl<'a> GcState<'a> {
775 fn new(db: &'a RpcDb) -> Self {
776 Self {
777 raw: db.rocksdb(),
778 writeopt: db.transactions.write_config(),
779 tx_cf: db.transactions.cf(),
780 tx_by_hash: db.transactions_by_hash.cf(),
781 tx_by_in_msg: db.transactions_by_in_msg.cf(),
782 key_range_begin: [0u8; tables::Transactions::KEY_LEN],
783 key_range_end: [0u8; tables::Transactions::KEY_LEN],
784 pending_delete: None,
785 batch: Default::default(),
786 total_tx: 0,
787 total_tx_by_hash: 0,
788 total_tx_by_in_msg: 0,
789 }
790 }
791
792 fn delete_tx(&mut self, key: &TxKey, value: &[u8]) {
793 self.pending_delete = Some(if self.pending_delete.is_none() {
795 self.key_range_end.copy_from_slice(key);
796 PendingDelete::Single
797 } else {
798 self.key_range_begin.copy_from_slice(key);
799 PendingDelete::Range
800 });
801 self.total_tx += 1;
802
803 assert!(value.len() >= 33);
805
806 let mask = TransactionMask::from_bits_retain(value[0]);
807
808 let tx_hash = &value[1..33];
810 self.batch.delete_cf(&self.tx_by_hash, tx_hash);
811 self.total_tx_by_hash += 1;
812
813 if mask.has_msg_hash() {
815 assert!(value.len() >= 65);
816
817 let in_msg_hash = &value[33..65];
818 self.batch.delete_cf(&self.tx_by_in_msg, in_msg_hash);
819 self.total_tx_by_in_msg += 1;
820 }
821 }
822
823 fn end_account(&mut self) {
824 if let Some(pending) = self.pending_delete.take() {
826 match pending {
827 PendingDelete::Single => self
828 .batch
829 .delete_cf(&self.tx_cf, self.key_range_end.as_slice()),
830 PendingDelete::Range => {
831 self.batch.delete_range_cf(
833 &self.tx_cf,
834 self.key_range_begin.as_slice(),
835 self.key_range_end.as_slice(),
836 );
837 self.batch
839 .delete_cf(&self.tx_cf, self.key_range_end.as_slice());
840 }
841 }
842 }
843 }
844
845 fn flush(&mut self) -> Result<()> {
846 self.raw
847 .write_opt(std::mem::take(&mut self.batch), self.writeopt)?;
848 Ok(())
849 }
850 }
851
852 if let Some(known_min_lt) = self.db.state.get(TX_MIN_LT)? {
853 let known_min_lt = u64::from_le_bytes(known_min_lt.as_ref().try_into().unwrap());
854 let was_running = matches!(
855 self.db.state.get(TX_GC_RUNNING)?,
856 Some(status) if !status.is_empty()
857 );
858
859 if !was_running && min_lt <= known_min_lt {
860 tracing::info!(known_min_lt, "skipping removal of old transactions");
861 return Ok(());
862 }
863 }
864
865 let cancelled = CancellationFlag::new();
866 scopeguard::defer! {
867 cancelled.cancel();
868 }
869
870 self.min_tx_lt.store(min_lt, Ordering::Release);
872
873 let db = self.db.clone();
874 let mut cancelled = cancelled.debounce(10000);
875 let span = tracing::Span::current();
876
877 tokio::task::spawn_blocking(move || {
879 let _span = span.enter();
880
881 let guard = scopeguard::guard((), |_| {
882 tracing::warn!("cancelled");
883 });
884
885 let raw = db.rocksdb().as_ref();
886
887 tracing::info!("started removing old transactions");
888 let started_at = Instant::now();
889
890 let snapshot = raw.snapshot();
892
893 'block: {
895 let mc_seqno = match mc_seqno.checked_sub(1) {
896 None | Some(0) => break 'block,
897 Some(seqno) => seqno,
898 };
899
900 let known_blocks = &db.known_blocks;
901 let blocks_by_mc_seqno = &db.blocks_by_mc_seqno;
902 let block_transactions = &db.block_transactions;
903
904 let mut key = [0u8; tables::BlocksByMcSeqno::KEY_LEN];
906 key[0..4].copy_from_slice(&mc_seqno.to_be_bytes());
907 key[4] = -1i8 as u8;
908 key[5..13].copy_from_slice(&ShardIdent::PREFIX_FULL.to_be_bytes());
909 key[13..17].copy_from_slice(&mc_seqno.to_be_bytes());
910
911 let Some(value) = snapshot.get_pinned_cf_opt(
912 &blocks_by_mc_seqno.cf(),
913 key,
914 blocks_by_mc_seqno.new_read_config(),
915 )?
916 else {
917 break 'block;
918 };
919 let value = value.as_ref();
920 debug_assert!(value.len() >= tables::BlocksByMcSeqno::VALUE_LEN + 4);
921
922 let shard_count = u32::from_le_bytes(
924 value[tables::BlocksByMcSeqno::VALUE_LEN
925 ..tables::BlocksByMcSeqno::VALUE_LEN + 4]
926 .try_into()
927 .unwrap(),
928 ) as usize;
929 let mut top_block_ids = Vec::with_capacity(1 + shard_count);
930 top_block_ids.push(BlockIdShort {
931 shard: ShardIdent::MASTERCHAIN,
932 seqno: mc_seqno,
933 });
934 for i in 0..shard_count {
935 let offset = tables::BlocksByMcSeqno::DESCR_OFFSET
936 + i * tables::BlocksByMcSeqno::DESCR_LEN;
937 let descr = &value[offset..offset + tables::BlocksByMcSeqno::DESCR_LEN];
938 top_block_ids.push(BlockIdShort {
939 shard: ShardIdent::new(
940 descr[0] as i8 as i32,
941 u64::from_le_bytes(descr[1..9].try_into().unwrap()),
942 )
943 .context("invalid top shard ident")?,
944 seqno: u32::from_le_bytes(descr[9..13].try_into().unwrap()),
945 });
946 }
947
948 let mut batch = rocksdb::WriteBatch::new();
950
951 let range_from = [0x00; tables::BlocksByMcSeqno::KEY_LEN];
953 let mut range_to = [0xff; tables::BlocksByMcSeqno::KEY_LEN];
954 range_to[0..4].copy_from_slice(&mc_seqno.to_be_bytes());
955 batch.delete_range_cf(&blocks_by_mc_seqno.cf(), range_from, range_to);
956 batch.delete_cf(&blocks_by_mc_seqno.cf(), range_to);
957
958 let mut range_from = [0x00; tables::BlockTransactions::KEY_LEN];
961 let mut range_to = [0xff; tables::BlockTransactions::KEY_LEN];
962 for block_id in top_block_ids {
963 range_from[0] = block_id.shard.workchain() as i8 as u8;
964 range_from[1..9].copy_from_slice(&block_id.shard.prefix().to_be_bytes());
965 range_from[9..13].copy_from_slice(&block_id.seqno.to_be_bytes());
966 range_to[0..13].copy_from_slice(&range_from[0..13]);
967 batch.delete_range_cf(&block_transactions.cf(), range_from, range_to);
968 batch.delete_cf(&block_transactions.cf(), range_to);
969
970 let range_from = &range_from[0..tables::KnownBlocks::KEY_LEN];
971 let range_to = &range_to[0..tables::KnownBlocks::KEY_LEN];
972 batch.delete_range_cf(&known_blocks.cf(), range_from, range_to);
973 batch.delete_cf(&known_blocks.cf(), range_to);
974 }
975
976 db.rocksdb()
978 .write(batch)
979 .context("failed to remove block transactions")?;
980 }
981
982 let mut readopts = db.transactions.new_read_config();
984 readopts.set_snapshot(&snapshot);
985 let mut iter = raw.raw_iterator_cf_opt(&db.transactions.cf(), readopts);
986 iter.seek_to_last();
987
988 let mut gc = GcState::new(&db);
990
991 let mut last_account: TxKey = [0u8; tables::Transactions::KEY_LEN];
997 last_account[33..41].copy_from_slice(&min_lt.to_be_bytes());
998
999 let mut items = 0usize;
1000 let mut total_invalid = 0usize;
1001 let mut iteration = 0usize;
1002 let mut tx_count = 0usize;
1003 loop {
1004 let Some((key, value)) = iter.item() else {
1005 break iter.status()?;
1006 };
1007 iteration += 1;
1008
1009 if cancelled.check() {
1010 anyhow::bail!("transactions GC cancelled");
1011 }
1012
1013 let Ok::<&TxKey, _>(key) = key.try_into() else {
1014 items += 1;
1016 total_invalid += 1;
1017 gc.batch.delete_cf(&gc.tx_cf, key);
1018 iter.prev();
1019 continue;
1020 };
1021
1022 let item_account = &key[..33];
1024 let is_prev_account = item_account != &last_account[..33];
1025 if is_prev_account {
1026 last_account[..33].copy_from_slice(item_account);
1028
1029 gc.end_account();
1031
1032 tx_count = 0;
1033 }
1034
1035 let lt = u64::from_be_bytes(key[33..41].try_into().unwrap());
1037
1038 if tx_count < keep_tx_per_account {
1039 tx_count += 1;
1041 iter.prev();
1042 } else if lt < min_lt {
1043 items += 1;
1045 gc.delete_tx(key, value);
1046 iter.prev();
1047 } else if lt > 0 {
1048 iter.seek_for_prev(last_account.as_slice());
1051 } else {
1052 iter.prev();
1054 }
1055
1056 if items >= ITEMS_PER_BATCH {
1058 tracing::info!(iteration, "flushing batch");
1059 gc.flush()?;
1060 items = 0;
1061 }
1062 }
1063
1064 gc.end_account();
1066
1067 if items != 0 {
1069 gc.flush()?;
1070 }
1071
1072 raw.put(TX_GC_RUNNING, [])?;
1074
1075 scopeguard::ScopeGuard::into_inner(guard);
1077 tracing::info!(
1078 elapsed = %humantime::format_duration(started_at.elapsed()),
1079 total_invalid,
1080 total_tx = gc.total_tx,
1081 total_tx_by_hash = gc.total_tx_by_hash,
1082 total_tx_by_in_msg = gc.total_tx_by_in_msg,
1083 "finished removing old transactions"
1084 );
1085 Ok(())
1086 })
1087 .await?
1088 }
1089
    /// Indexes a single block into the RPC storage and notifies subscribers.
    ///
    /// The block is decoded and turned into one RocksDB write batch on the
    /// blocking thread pool. The batch contains:
    /// - a `blocks_by_mc_seqno` entry for the block (followed by brief shard
    ///   descriptions when the block is a masterchain block);
    /// - `transactions`, `transactions_by_hash`, `transactions_by_in_msg` and
    ///   `block_transactions` entries for every transaction whose account is
    ///   not in `rpc_blacklist`;
    /// - code hash index changes for accounts that changed their active status
    ///   or executed special actions;
    /// - a `known_blocks` entry with a serialized [`BriefBlockInfo`].
    ///
    /// After the batch is written, the stored minimal transaction LT is lowered
    /// if this block starts earlier, and the collected per-account updates are
    /// passed to `subscriptions`.
    ///
    /// Blocks whose workchain id does not fit into `i8` are ignored (`Ok(())`).
    ///
    /// # Errors
    ///
    /// Returns an error if the block cannot be decoded, if the blocking task
    /// fails to join, or if a database write fails.
    #[tracing::instrument(level = "info", name = "update", skip_all, fields(block_id = %block.id()))]
    pub async fn update(
        &self,
        mc_block_id: &BlockId,
        block: BlockStuff,
        rpc_blacklist: Option<&BlacklistedAccounts>,
        subscriptions: &super::subscriptions::RpcSubscriptions,
    ) -> Result<()> {
        // All keys below store the workchain as a single byte.
        let Ok(workchain) = i8::try_from(block.id().shard.workchain()) else {
            return Ok(());
        };

        let is_masterchain = block.id().is_masterchain();
        let mc_seqno = mc_block_id.seqno;

        // Shard descriptions are only loaded for masterchain blocks.
        let shard_hashes = is_masterchain
            .then(|| {
                let custom = block.load_custom()?;
                Ok::<_, anyhow::Error>(custom.shards.clone())
            })
            .transpose()?;

        let span = tracing::Span::current();
        let db = self.db.clone();

        // Take the current blacklist once for the whole block.
        let rpc_blacklist = rpc_blacklist.map(|x| x.load());

        let (start_lt, updates) = tokio::task::spawn_blocking(move || {
            // Measures everything up to the point where the batch is ready.
            let prepare_batch_histogram =
                HistogramGuard::begin("tycho_storage_rpc_prepare_batch_time");

            let _span = span.enter();

            let info = block.load_info()?;
            let extra = block.load_extra()?;

            // Maps each touched (non-blacklisted) account to its max transaction LT.
            let mut updates = Some(FastHashMap::default());

            let account_blocks = extra.account_blocks.load()?;

            // New shard accounts, used later to look up code hashes.
            // Left empty when the block has no account blocks or the accounts
            // cell did not change.
            let accounts = if account_blocks.is_empty() {
                Dict::new()
            } else {
                let merkle_update = block.as_ref().state_update.load()?;

                // Skips the first child reference of the given cell and returns
                // the next one as a virtualized cell.
                // NOTE(review): presumably this is the shard accounts root of
                // the state — confirm against the state layout.
                let get_accounts = |cell: Cell| {
                    let mut cs = cell.as_slice()?;
                    cs.skip_first(0, 1)?;
                    cs.load_reference_cloned().map(Cell::virtualize)
                };

                let old_accounts = get_accounts(merkle_update.old)?;
                let new_accounts = get_accounts(merkle_update.new)?;

                if old_accounts.repr_hash() == new_accounts.repr_hash() {
                    Dict::new()
                } else {
                    let accounts = Lazy::<ShardAccounts>::from_raw(new_accounts)?.load()?;
                    let (accounts, _) = accounts.into_parts();
                    accounts
                }
            };

            let mut write_batch = rocksdb::WriteBatch::default();
            let tx_cf = &db.transactions.cf();
            let tx_by_hash_cf = &db.transactions_by_hash.cf();
            let tx_by_in_msg_cf = &db.transactions_by_in_msg.cf();
            let block_txs_cf = &db.block_transactions.cf();

            // Reusable `transactions_by_hash` value. Layout:
            //   [0]        workchain
            //   [1..33]    account address (set per account)
            //   [33..41]   transaction LT, big-endian (set per transaction)
            //   [41]       shard prefix length
            //   [42..46]   block seqno, little-endian
            //   [46..78]   block root hash
            //   [78..110]  block file hash
            //   [110..114] masterchain seqno, little-endian
            // Its first `Transactions::KEY_LEN` bytes double as the `transactions` key.
            let mut tx_info = [0u8; tables::TransactionsByHash::VALUE_FULL_LEN];
            tx_info[0] = workchain as u8;

            let block_id = block.id();
            tx_info[41] = block_id.shard.prefix_len() as u8;
            tx_info[42..46].copy_from_slice(&block_id.seqno.to_le_bytes());
            tx_info[46..78].copy_from_slice(block_id.root_hash.as_slice());
            tx_info[78..110].copy_from_slice(block_id.file_hash.as_slice());
            tx_info[110..114].copy_from_slice(&mc_seqno.to_le_bytes());

            // Reusable `block_transactions` key. Layout:
            //   [0]       workchain
            //   [1..9]    shard prefix, big-endian
            //   [9..13]   block seqno, big-endian
            //   [13..45]  account address (set per account)
            //   [45..53]  transaction LT, big-endian (set per transaction)
            // Its first `KnownBlocks::KEY_LEN` bytes double as the `known_blocks` key.
            let mut block_tx = [0u8; tables::BlockTransactions::KEY_LEN];
            block_tx[0] = workchain as u8;
            block_tx[1..9].copy_from_slice(&block_id.shard.prefix().to_be_bytes());
            block_tx[9..13].copy_from_slice(&block_id.seqno.to_be_bytes());

            // Scratch buffer reused for every value written below.
            let mut buffer = Vec::with_capacity(64 + BriefBlockInfo::MIN_BYTE_LEN);

            // Write the `blocks_by_mc_seqno` entry.
            {
                // Key: mc seqno (BE) | workchain | shard prefix (BE) | block seqno (BE).
                let mut key = [0u8; tables::BlocksByMcSeqno::KEY_LEN];
                key[0..4].copy_from_slice(&mc_seqno.to_be_bytes());
                key[4] = workchain as u8;
                key[5..13].copy_from_slice(&block_id.shard.prefix().to_be_bytes());
                key[13..17].copy_from_slice(&block_id.seqno.to_be_bytes());

                // Value: root hash | file hash | start LT (LE) | end LT (LE),
                // optionally followed by the shard descriptions.
                buffer.clear();
                buffer.extend_from_slice(block_id.root_hash.as_slice());
                buffer.extend_from_slice(block_id.file_hash.as_slice());
                buffer.extend_from_slice(&info.start_lt.to_le_bytes());
                buffer.extend_from_slice(&info.end_lt.to_le_bytes());
                if let Some(shard_hashes) = shard_hashes {
                    // Collect shard descriptions, dropping shards whose
                    // workchain does not fit into `i8`.
                    let shards = shard_hashes
                        .iter()
                        .filter_map(|item| {
                            let (shard_ident, descr) = match item {
                                Ok(item) => item,
                                Err(e) => return Some(Err(e)),
                            };
                            if i8::try_from(shard_ident.workchain()).is_err() {
                                return None;
                            }

                            Some(Ok(BriefShardDescr {
                                shard_ident,
                                seqno: descr.seqno,
                                root_hash: descr.root_hash,
                                file_hash: descr.file_hash,
                                start_lt: descr.start_lt,
                                end_lt: descr.end_lt,
                            }))
                        })
                        .collect::<Result<Vec<_>, _>>()?;

                    // Shard count (u32 LE) followed by fixed-size descriptions.
                    buffer.reserve(4 + tables::BlocksByMcSeqno::DESCR_LEN * shards.len());
                    buffer.extend_from_slice(&(shards.len() as u32).to_le_bytes());
                    for shard in shards {
                        buffer.push(shard.shard_ident.workchain() as i8 as u8);
                        buffer.extend_from_slice(&shard.shard_ident.prefix().to_le_bytes());
                        buffer.extend_from_slice(&shard.seqno.to_le_bytes());
                        buffer.extend_from_slice(shard.root_hash.as_slice());
                        buffer.extend_from_slice(shard.file_hash.as_slice());
                        buffer.extend_from_slice(&shard.start_lt.to_le_bytes());
                        buffer.extend_from_slice(&shard.end_lt.to_le_bytes());
                    }
                }

                write_batch.put_cf(&db.blocks_by_mc_seqno.cf(), key, buffer.as_slice());
            }

            let rpc_blacklist = rpc_blacklist.as_deref();

            // Counts every transaction in the block, including blacklisted ones.
            let mut block_tx_count = 0usize;
            for item in account_blocks.iter() {
                let (account, _, account_block) = item?;

                // Fill the account part of both reusable keys.
                tx_info[1..33].copy_from_slice(account.as_slice());
                block_tx[13..45].copy_from_slice(account.as_slice());

                // Per-account flags used to decide whether the code hash index
                // must be touched after all transactions are processed.
                let mut has_special_actions = false;
                let mut was_active = false;
                let mut is_active = false;

                let mut first_tx = true;
                for item in account_block.transactions.values() {
                    let (_, tx_cell) = item?;

                    block_tx_count += 1;

                    let tx = tx_cell.load()?;

                    tx_info[33..41].copy_from_slice(&tx.lt.to_be_bytes());
                    block_tx[45..53].copy_from_slice(&tx.lt.to_be_bytes());

                    if first_tx {
                        // The original status of the first transaction is the
                        // account status before this block.
                        was_active = tx.orig_status == AccountStatus::Active;
                        first_tx = false;
                    }
                    if was_active && tx.orig_status != AccountStatus::Active {
                        // The account stopped being active somewhere inside the
                        // block; treat it like a special action so that the
                        // code hash is re-checked if it ends up active again.
                        has_special_actions = true;
                    }
                    // After the loop this holds the status after the last transaction.
                    is_active = tx.end_status == AccountStatus::Active;

                    if !has_special_actions {
                        // Look for special actions reported by the action phase.
                        let info = tx.load_info()?;
                        let action_phase = match &info {
                            TxInfo::Ordinary(info) => info.action_phase.as_ref(),
                            TxInfo::TickTock(info) => info.action_phase.as_ref(),
                        };
                        if let Some(action_phase) = action_phase {
                            has_special_actions |= action_phase.special_actions > 0;
                        }
                    }

                    // Transactions of blacklisted accounts are not stored and
                    // produce no subscription update (the status tracking above
                    // still runs for them).
                    if let Some(blacklist) = &rpc_blacklist
                        && blacklist.contains(&tx_info[..33])
                    {
                        continue;
                    }

                    // Remember the max LT seen for this account.
                    if let Some(ref mut map) = updates {
                        let entry = map.entry(account).or_insert(tx.lt);
                        if tx.lt > *entry {
                            *entry = tx.lt;
                        }
                    }

                    let tx_hash = tx_cell.inner().repr_hash();
                    let (tx_mask, msg_hash) = match &tx.in_msg {
                        Some(in_msg) => {
                            let hash = Some(in_msg.repr_hash());
                            let mask = TransactionMask::HAS_MSG_HASH;
                            (mask, hash)
                        }
                        None => (TransactionMask::empty(), None),
                    };

                    // `transactions` value:
                    // mask | tx hash | [in msg hash] | transaction BOC.
                    buffer.clear();
                    buffer.push(tx_mask.bits());
                    buffer.extend_from_slice(tx_hash.as_slice());
                    if let Some(msg_hash) = msg_hash {
                        buffer.extend_from_slice(msg_hash.as_slice());
                    }
                    tycho_types::boc::ser::BocHeader::<ahash::RandomState>::with_root(
                        tx_cell.inner().as_ref(),
                    )
                    .encode(&mut buffer);

                    // tx hash -> full transaction info.
                    write_batch.put_cf(tx_by_hash_cf, tx_hash.as_slice(), tx_info.as_slice());
                    // (block, account, lt) -> tx hash.
                    write_batch.put_cf(block_txs_cf, block_tx.as_slice(), tx_hash.as_slice());

                    // in msg hash -> `transactions` key.
                    if let Some(msg_hash) = msg_hash {
                        write_batch.put_cf(
                            tx_by_in_msg_cf,
                            msg_hash,
                            &tx_info[..tables::Transactions::KEY_LEN],
                        );
                    }

                    // (account, lt) -> transaction data.
                    write_batch.put_cf(tx_cf, &tx_info[..tables::Transactions::KEY_LEN], &buffer);
                }

                // Decide what to do with the code hash index for this account;
                // the value is the `remove` flag passed to `update_code_hash`.
                let update = if is_active && (!was_active || has_special_actions) {
                    // Active after the block and either newly active or
                    // possibly changed: store the current code hash.
                    Some(false)
                } else if was_active && !is_active {
                    // Was active before the block and is not active after it:
                    // remove the code hash.
                    Some(true)
                } else {
                    None
                };

                if let Some(remove) = update {
                    Self::update_code_hash(
                        &db,
                        workchain,
                        &account,
                        &accounts,
                        remove,
                        &mut write_batch,
                    )?;
                }
            }

            // Write the `known_blocks` entry.
            let brief_block_info =
                BriefBlockInfo::new(block.as_ref(), info, extra, block_tx_count)?;
            buffer.clear();
            // Bytes 46..114 of `tx_info`: root hash | file hash | mc seqno.
            buffer.extend_from_slice(&tx_info[46..114]);
            brief_block_info.write_to_bytes(&mut buffer);
            write_batch.put_cf(
                &db.known_blocks.cf(),
                &block_tx[0..tables::KnownBlocks::KEY_LEN],
                buffer.as_slice(),
            );

            drop(prepare_batch_histogram);

            let _execute_batch_histogram =
                HistogramGuard::begin("tycho_storage_rpc_execute_batch_time");

            // Apply everything collected above in a single write.
            db.rocksdb()
                .write_opt(write_batch, db.transactions.write_config())?;

            // Convert the per-account map into the subscription update list.
            let updates = updates
                .map(|map| {
                    map.into_iter()
                        .map(|(address, max_lt)| super::subscriptions::AccountUpdate {
                            address: StdAddr::new(workchain, address),
                            max_lt,
                            gen_utime: info.gen_utime,
                        })
                        .collect::<Vec<_>>()
                })
                .unwrap_or_default();

            Ok::<_, anyhow::Error>((info.start_lt, updates))
        })
        .await??;

        'min_lt: {
            // Lower the in-memory value first; `fetch_min` returns the previous
            // value, so the branch is taken only when this block lowered it.
            if start_lt < self.min_tx_lt.fetch_min(start_lt, Ordering::Release) {
                // Serialize writers of the persisted value.
                let _guard = self.min_tx_lt_guard.lock().await;

                // Someone stored an even smaller value while we were waiting
                // for the guard; nothing to persist.
                if start_lt > self.min_tx_lt.load(Ordering::Acquire) {
                    break 'min_lt;
                }

                // Persist the new minimum.
                self.db.state.insert(TX_MIN_LT, start_lt.to_le_bytes())?;
            }
        }

        if !updates.is_empty() {
            subscriptions.fanout_updates(updates).await;
        }

        Ok(())
    }
1428
1429 fn update_code_hash(
1430 db: &RpcDb,
1431 workchain: i8,
1432 account: &HashBytes,
1433 accounts: &ShardAccountsDict,
1434 remove: bool,
1435 write_batch: &mut rocksdb::WriteBatch,
1436 ) -> Result<()> {
1437 let new_code_hash = 'code_hash: {
1439 if !remove && let Some((_, account)) = accounts.get(account)? {
1440 match extract_code_hash(&account)? {
1441 ExtractedCodeHash::Exact(hash) => break 'code_hash hash,
1442 ExtractedCodeHash::Skip => return Ok(()),
1443 }
1444 }
1445 None
1446 };
1447
1448 let code_hashes_cf = &db.code_hashes.cf();
1450 let code_hashes_by_address_cf = &db.code_hashes_by_address.cf();
1451
1452 let mut code_hashes_by_address_id = [0u8; tables::CodeHashesByAddress::KEY_LEN];
1454 code_hashes_by_address_id[0] = workchain as u8;
1455 code_hashes_by_address_id[1..33].copy_from_slice(account.as_slice());
1456
1457 let old_code_hash = db
1459 .code_hashes_by_address
1460 .get(code_hashes_by_address_id.as_slice())?;
1461
1462 if remove && old_code_hash.is_none()
1463 || matches!(
1464 (&old_code_hash, &new_code_hash),
1465 (Some(old), Some(new)) if old.as_ref() == new.as_slice()
1466 )
1467 {
1468 return Ok(());
1470 }
1471
1472 let mut code_hashes_id = [0u8; tables::CodeHashes::KEY_LEN];
1473 code_hashes_id[32] = workchain as u8;
1474 code_hashes_id[33..65].copy_from_slice(account.as_slice());
1475
1476 if let Some(old_code_hash) = old_code_hash {
1478 code_hashes_id[..32].copy_from_slice(&old_code_hash);
1479 write_batch.delete_cf(code_hashes_cf, code_hashes_id.as_slice());
1480 }
1481
1482 match new_code_hash {
1483 Some(new_code_hash) => {
1484 code_hashes_id[..32].copy_from_slice(new_code_hash.as_slice());
1486 write_batch.put_cf(
1487 code_hashes_cf,
1488 code_hashes_id.as_slice(),
1489 new_code_hash.as_slice(),
1490 );
1491
1492 write_batch.put_cf(
1494 code_hashes_by_address_cf,
1495 code_hashes_by_address_id.as_slice(),
1496 new_code_hash.as_slice(),
1497 );
1498 }
1499 None => {
1500 write_batch.delete_cf(
1502 code_hashes_by_address_cf,
1503 code_hashes_by_address_id.as_slice(),
1504 );
1505 }
1506 }
1507
1508 Ok(())
1509 }
1510
    /// Removes all code hash index entries for accounts of the given shard.
    ///
    /// The `code_hashes_by_address` index is cleared with a range delete over
    /// the shard's address range. The `code_hashes` index is keyed by code hash
    /// first, so it is scanned in full on the blocking thread pool and matching
    /// entries are deleted one by one.
    ///
    /// The scan is stopped with an error if this future is dropped before it
    /// finishes.
    async fn remove_code_hashes(&self, shard: &ShardIdent) -> Result<()> {
        let workchain = shard.workchain() as u8;

        // Clear the `workchain | address` index first.
        {
            // Lower bound: workchain | minimal address of the shard.
            let mut from = [0u8; { tables::CodeHashesByAddress::KEY_LEN }];
            from[0] = workchain;

            {
                let [_, from @ ..] = &mut from;
                extend_account_prefix(shard, false, from);
            }

            // Upper bound: workchain | maximal address of the shard.
            let mut to = from;
            {
                let [_, to @ ..] = &mut to;
                extend_account_prefix(shard, true, to);
            }

            let raw = self.db.rocksdb();
            let cf = &self.db.code_hashes_by_address.cf();
            let writeopts = self.db.code_hashes_by_address.write_config();

            // The range delete excludes its end key (`[from; to)`) ...
            raw.delete_range_cf_opt(cf, &from, &to, writeopts)?;
            // ... so `to` itself is removed separately.
            raw.delete_cf_opt(cf, to, writeopts)?;
        }

        // Signals the blocking task to stop when this future is dropped.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        let db = self.db.clone();
        // NOTE(review): presumably the debounced flag only performs a real
        // check once per 1000 calls — confirm in `CancellationFlag`.
        let mut cancelled = cancelled.debounce(1000);
        let shard = *shard;
        let span = tracing::Span::current();

        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            // Logs a warning unless defused at the end of a successful scan.
            let guard = scopeguard::guard((), |_| {
                tracing::warn!("cancelled");
            });

            let cf = &db.code_hashes.cf();

            // Iterate over a snapshot so that the deletes below do not affect
            // the iteration.
            let raw = db.rocksdb().as_ref();
            let snapshot = raw.snapshot();
            let mut readopts = db.code_hashes.new_read_config();
            readopts.set_snapshot(&snapshot);

            let writeopts = db.code_hashes.write_config();

            let mut iter = raw.raw_iterator_cf_opt(cf, readopts);
            iter.seek_to_first();

            let mut prefix = shard.prefix();
            let tag = extract_tag(&shard);
            // Clear the tag bit so that only the shard prefix bits remain.
            prefix -= tag;
            // Mask selecting the bits above the tag bit.
            let prefix_mask = !(tag | (tag - 1));

            loop {
                let key = match iter.key() {
                    Some(key) => key,
                    // End of iteration: finish with the iterator status.
                    None => break iter.status()?,
                };

                if cancelled.check() {
                    anyhow::bail!("remove_code_hashes cancelled");
                }

                // Delete malformed keys and keys whose address belongs to the shard.
                if key.len() != tables::CodeHashes::KEY_LEN
                    || key[32] == workchain
                        && (shard.is_full() || {
                            // First 8 bytes of the address, compared against
                            // the shard prefix under the mask.
                            let key = u64::from_be_bytes(key[33..41].try_into().unwrap());
                            (key ^ prefix) & prefix_mask == 0
                        })
                {
                    raw.delete_cf_opt(cf, key, writeopts)?;
                }

                iter.next();
            }

            // Finished normally: do not log the "cancelled" warning.
            scopeguard::ScopeGuard::into_inner(guard);
            Ok(())
        })
        .await?
    }
1607}
1608
/// Extension for tables that allows reading a value either from the live
/// database or from a specific snapshot.
trait TableExt {
    /// Reads the value stored under `key`.
    ///
    /// When `snapshot` is `Some`, the read is performed against that snapshot;
    /// otherwise the current database state is used.
    fn get_ext<'db, K: AsRef<[u8]>>(
        &'db self,
        key: K,
        snapshot: Option<&RpcSnapshot>,
    ) -> Result<Option<rocksdb::DBPinnableSlice<'db>>>;
}
1616
1617impl<T: weedb::ColumnFamily> TableExt for weedb::Table<T> {
1618 fn get_ext<'db, K: AsRef<[u8]>>(
1619 &'db self,
1620 key: K,
1621 snapshot: Option<&RpcSnapshot>,
1622 ) -> Result<Option<rocksdb::DBPinnableSlice<'db>>> {
1623 match snapshot {
1624 None => self.get(key),
1625 Some(snapshot) => {
1626 anyhow::ensure!(
1627 Arc::ptr_eq(snapshot.db(), self.db()),
1628 "snapshot must be made for the same DB instance"
1629 );
1630
1631 let mut readopts = self.new_read_config();
1632 readopts.set_snapshot(snapshot);
1633 self.db().get_pinned_cf_opt(&self.cf(), key, &readopts)
1634 }
1635 }
1636 .map_err(Into::into)
1637 }
1638}
1639
/// Short description of a shard's latest block, as referenced by a
/// masterchain block.
#[derive(Debug)]
pub struct BriefShardDescr {
    /// Shard identifier.
    pub shard_ident: ShardIdent,
    /// Sequence number of the shard block.
    pub seqno: u32,
    /// Root hash of the shard block.
    pub root_hash: HashBytes,
    /// File hash of the shard block.
    pub file_hash: HashBytes,
    /// Start logical time taken from the shard description.
    pub start_lt: u64,
    /// End logical time taken from the shard description.
    pub end_lt: u64,
}
1649
/// Compact block summary stored alongside each known block.
///
/// Most fields are copied as-is from the block info of the same name.
#[derive(Debug, Clone)]
pub struct BriefBlockInfo {
    /// Global id of the block.
    pub global_id: i32,
    /// Block info version.
    pub version: u32,
    /// Raw block info flags.
    pub flags: u8,
    pub after_merge: bool,
    pub after_split: bool,
    pub before_split: bool,
    pub want_merge: bool,
    pub want_split: bool,
    pub validator_list_hash_short: u32,
    pub catchain_seqno: u32,
    pub min_ref_mc_seqno: u32,
    pub is_key_block: bool,
    pub prev_key_block_seqno: u32,
    pub start_lt: u64,
    pub end_lt: u64,
    pub gen_utime: u32,
    pub vert_seqno: u32,
    /// Random seed from the block extra.
    pub rand_seed: HashBytes,
    /// Number of transactions in the block (saturated at `u32::MAX`).
    pub tx_count: u32,
    /// Referenced masterchain block, if the block info has one.
    pub master_ref: Option<BlockId>,
    /// Previous block ids: one entry, or two after a merge.
    pub prev_blocks: Vec<BlockId>,
}
1674
1675impl BriefBlockInfo {
1676 const VERSION: u8 = 0;
1677 const MIN_BYTE_LEN: usize = 256;
1678
1679 fn new(
1680 block: &Block,
1681 info: &BlockInfo,
1682 extra: &BlockExtra,
1683 tx_count: usize,
1684 ) -> Result<Self, tycho_types::error::Error> {
1685 let shard_ident = info.shard;
1686 let prev_blocks = match info.load_prev_ref()? {
1687 PrevBlockRef::Single(block_ref) => vec![block_ref.as_block_id(shard_ident)],
1688 PrevBlockRef::AfterMerge { left, right } => vec![
1689 left.as_block_id(shard_ident),
1690 right.as_block_id(shard_ident),
1691 ],
1692 };
1693
1694 Ok(Self {
1695 global_id: block.global_id,
1696 version: info.version,
1697 flags: info.flags,
1698 after_merge: info.after_merge,
1699 after_split: info.after_split,
1700 before_split: info.before_split,
1701 want_merge: info.want_merge,
1702 want_split: info.want_split,
1703 validator_list_hash_short: info.gen_validator_list_hash_short,
1704 catchain_seqno: info.gen_catchain_seqno,
1705 min_ref_mc_seqno: info.min_ref_mc_seqno,
1706 is_key_block: info.key_block,
1707 prev_key_block_seqno: info.prev_key_block_seqno,
1708 start_lt: info.start_lt,
1709 end_lt: info.end_lt,
1710 gen_utime: info.gen_utime,
1711 vert_seqno: info.vert_seqno,
1712 rand_seed: extra.rand_seed,
1713 tx_count: tx_count.try_into().unwrap_or(u32::MAX),
1714 master_ref: info
1715 .load_master_ref()?
1716 .map(|r| r.as_block_id(ShardIdent::MASTERCHAIN)),
1717 prev_blocks,
1718 })
1719 }
1720
1721 fn write_to_bytes(&self, target: &mut Vec<u8>) {
1722 let packed_flags = ((self.master_ref.is_some() as u8) << 7)
1724 | ((self.after_merge as u8) << 6)
1725 | ((self.before_split as u8) << 5)
1726 | ((self.after_split as u8) << 4)
1727 | ((self.want_split as u8) << 3)
1728 | ((self.want_merge as u8) << 2)
1729 | ((self.is_key_block as u8) << 1);
1730
1731 target.reserve(Self::MIN_BYTE_LEN);
1732 target.push(Self::VERSION);
1733 target.extend_from_slice(&self.global_id.to_le_bytes());
1734 target.extend_from_slice(&self.version.to_le_bytes());
1735 target.push(self.flags);
1736 target.push(packed_flags);
1737 target.extend_from_slice(&self.validator_list_hash_short.to_le_bytes());
1738 target.extend_from_slice(&self.catchain_seqno.to_le_bytes());
1739 target.extend_from_slice(&self.min_ref_mc_seqno.to_le_bytes());
1740 target.extend_from_slice(&self.prev_key_block_seqno.to_le_bytes());
1741 target.extend_from_slice(&self.start_lt.to_le_bytes());
1742 target.extend_from_slice(&self.end_lt.to_le_bytes());
1743 target.extend_from_slice(&self.gen_utime.to_le_bytes());
1744 target.extend_from_slice(&self.vert_seqno.to_le_bytes());
1745 target.extend_from_slice(self.rand_seed.as_slice());
1746 target.extend_from_slice(&self.tx_count.to_le_bytes());
1747 if let Some(block_id) = &self.master_ref {
1748 target.extend_from_slice(&block_id.seqno.to_le_bytes());
1749 target.extend_from_slice(block_id.root_hash.as_slice());
1750 target.extend_from_slice(block_id.file_hash.as_slice());
1751 }
1752 target.push(self.prev_blocks.len() as u8);
1753 for block_id in &self.prev_blocks {
1754 target.extend_from_slice(&block_id.shard.prefix().to_le_bytes());
1755 target.extend_from_slice(&block_id.seqno.to_le_bytes());
1756 target.extend_from_slice(block_id.root_hash.as_slice());
1757 target.extend_from_slice(block_id.file_hash.as_slice());
1758 }
1759 }
1760
1761 fn load_from_bytes(workchain: i32, mut bytes: &[u8]) -> Option<Self> {
1762 use bytes::Buf;
1763
1764 if bytes.get_u8() != Self::VERSION {
1765 return None;
1766 }
1767
1768 let global_id = bytes.get_i32_le();
1769 let version = bytes.get_u32_le();
1770 let [flags, packed_flags] = bytes.get_u16().to_be_bytes();
1771 let validator_list_hash_short = bytes.get_u32_le();
1772 let catchain_seqno = bytes.get_u32_le();
1773 let min_ref_mc_seqno = bytes.get_u32_le();
1774 let prev_key_block_seqno = bytes.get_u32_le();
1775 let start_lt = bytes.get_u64_le();
1776 let end_lt = bytes.get_u64_le();
1777 let gen_utime = bytes.get_u32_le();
1778 let vert_seqno = bytes.get_u32_le();
1779 let rand_seed = HashBytes::from_slice(&bytes[..32]);
1780 bytes = &bytes[32..];
1781 let tx_count = bytes.get_u32_le();
1782
1783 let master_ref = if packed_flags & 0b10000000 != 0 {
1784 let seqno = bytes.get_u32_le();
1785 let root_hash = HashBytes::from_slice(&bytes[0..32]);
1786 let file_hash = HashBytes::from_slice(&bytes[32..64]);
1787 bytes = &bytes[64..];
1788 Some(BlockId {
1789 shard: ShardIdent::MASTERCHAIN,
1790 seqno,
1791 root_hash,
1792 file_hash,
1793 })
1794 } else {
1795 None
1796 };
1797
1798 let prev_block_count = bytes.get_u8();
1799 let mut prev_blocks = Vec::with_capacity(prev_block_count as _);
1800 for _ in 0..prev_block_count {
1801 prev_blocks.push(BlockId {
1802 shard: ShardIdent::new(
1803 workchain,
1804 u64::from_le_bytes(bytes[0..8].try_into().unwrap()),
1805 )
1806 .unwrap(),
1807 seqno: u32::from_le_bytes(bytes[8..12].try_into().unwrap()),
1808 root_hash: HashBytes::from_slice(&bytes[12..44]),
1809 file_hash: HashBytes::from_slice(&bytes[44..76]),
1810 });
1811 bytes = &bytes[76..];
1812 }
1813
1814 Some(Self {
1815 global_id,
1816 version,
1817 flags,
1818 after_merge: packed_flags & 0b01000000 != 0,
1819 after_split: packed_flags & 0b00010000 != 0,
1820 before_split: packed_flags & 0b00100000 != 0,
1821 want_merge: packed_flags & 0b00000100 != 0,
1822 want_split: packed_flags & 0b00001000 != 0,
1823 validator_list_hash_short,
1824 catchain_seqno,
1825 min_ref_mc_seqno,
1826 is_key_block: packed_flags & 0b00000010 != 0,
1827 prev_key_block_seqno,
1828 start_lt,
1829 end_lt,
1830 gen_utime,
1831 vert_seqno,
1832 rand_seed,
1833 tx_count,
1834 master_ref,
1835 prev_blocks,
1836 })
1837 }
1838}
1839
/// Cheaply clonable handle to an owned database snapshot.
#[derive(Clone)]
#[repr(transparent)]
pub struct RpcSnapshot(Arc<weedb::OwnedSnapshot>);
1843
1844impl std::ops::Deref for RpcSnapshot {
1845 type Target = weedb::OwnedSnapshot;
1846
1847 #[inline]
1848 fn deref(&self) -> &Self::Target {
1849 self.0.as_ref()
1850 }
1851}
1852
/// Iterator over ids of blocks stored in the `blocks_by_mc_seqno` index.
pub struct BlocksByMcSeqnoIter {
    /// Masterchain seqno this iterator was created for.
    mc_seqno: u32,
    /// Raw iterator positioned at the next entry to yield.
    inner: weedb::OwnedRawIterator,
    /// Snapshot exposed to the caller via `snapshot()`.
    snapshot: RpcSnapshot,
}
1858
1859impl BlocksByMcSeqnoIter {
1860 pub fn mc_seqno(&self) -> u32 {
1861 self.mc_seqno
1862 }
1863
1864 pub fn snapshot(&self) -> &RpcSnapshot {
1865 &self.snapshot
1866 }
1867}
1868
1869impl Iterator for BlocksByMcSeqnoIter {
1870 type Item = BlockId;
1872
1873 fn next(&mut self) -> Option<Self::Item> {
1874 let (key, value) = self.inner.item()?;
1875 let shard = ShardIdent::new(
1876 key[4] as i8 as i32,
1877 u64::from_be_bytes(key[5..13].try_into().unwrap()),
1878 )
1879 .expect("stored shard must have a valid prefix");
1880 let seqno = u32::from_be_bytes(key[13..17].try_into().unwrap());
1881
1882 let block_id = BlockId {
1883 shard,
1884 seqno,
1885 root_hash: HashBytes::from_slice(&value[0..32]),
1886 file_hash: HashBytes::from_slice(&value[32..64]),
1887 };
1888 self.inner.next();
1889
1890 Some(block_id)
1891 }
1892}
1893
/// Iterator over account addresses stored in the `code_hashes` index.
pub struct CodeHashesIter<'a> {
    /// Raw iterator positioned at the next entry to yield.
    inner: rocksdb::DBRawIterator<'a>,
    /// Snapshot exposed to the caller via `snapshot()`.
    snapshot: RpcSnapshot,
}
1898
1899impl<'a> CodeHashesIter<'a> {
1900 pub fn snapshot(&self) -> &RpcSnapshot {
1901 &self.snapshot
1902 }
1903
1904 pub fn into_raw(self) -> RawCodeHashesIter<'a> {
1905 RawCodeHashesIter {
1906 inner: self.inner,
1907 snapshot: self.snapshot,
1908 }
1909 }
1910}
1911
1912impl Iterator for CodeHashesIter<'_> {
1913 type Item = StdAddr;
1914
1915 fn next(&mut self) -> Option<Self::Item> {
1916 let value = self.inner.key()?;
1917 debug_assert!(value.len() == tables::CodeHashes::KEY_LEN);
1918
1919 let result = Some(StdAddr {
1920 anycast: None,
1921 workchain: value[32] as i8,
1922 address: HashBytes(value[33..65].try_into().unwrap()),
1923 });
1924 self.inner.next();
1925 result
1926 }
1927}
1928
/// Same as [`CodeHashesIter`], but yields raw `workchain | address` bytes.
pub struct RawCodeHashesIter<'a> {
    /// Raw iterator positioned at the next entry to yield.
    inner: rocksdb::DBRawIterator<'a>,
    /// Snapshot exposed to the caller via `snapshot()`.
    snapshot: RpcSnapshot,
}
1933
1934impl RawCodeHashesIter<'_> {
1935 pub fn snapshot(&self) -> &RpcSnapshot {
1936 &self.snapshot
1937 }
1938}
1939
1940impl Iterator for RawCodeHashesIter<'_> {
1941 type Item = [u8; 33];
1942
1943 fn next(&mut self) -> Option<Self::Item> {
1944 let value = self.inner.key()?;
1945 debug_assert!(value.len() == tables::CodeHashes::KEY_LEN);
1946
1947 let result = Some(value[32..65].try_into().unwrap());
1948 self.inner.next();
1949 result
1950 }
1951}
1952
/// Position inside the list of block transactions.
// NOTE(review): presumably `hash` is the account address and `lt` is the
// transaction LT to continue from — confirm against the code that uses it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct BlockTransactionsCursor {
    pub hash: HashBytes,
    pub lt: u64,
}
1958
/// Iterator over transaction ids stored in the `block_transactions` index.
pub struct BlockTransactionIdsIter {
    /// Block id exposed via `block_id()`.
    block_id: BlockId,
    /// Masterchain seqno exposed via `ref_by_mc_seqno()`.
    ref_by_mc_seqno: u32,
    /// Whether the raw iterator is moved backwards on each step.
    is_reversed: bool,
    /// Raw iterator positioned at the next entry to yield.
    inner: weedb::OwnedRawIterator,
    /// Snapshot exposed to the caller via `snapshot()`.
    snapshot: RpcSnapshot,
}
1966
1967impl BlockTransactionIdsIter {
1968 pub fn is_reversed(&self) -> bool {
1969 self.is_reversed
1970 }
1971
1972 pub fn block_id(&self) -> &BlockId {
1973 &self.block_id
1974 }
1975
1976 pub fn ref_by_mc_seqno(&self) -> u32 {
1977 self.ref_by_mc_seqno
1978 }
1979
1980 pub fn snapshot(&self) -> &RpcSnapshot {
1981 &self.snapshot
1982 }
1983}
1984
1985impl Iterator for BlockTransactionIdsIter {
1986 type Item = FullTransactionId;
1987
1988 fn next(&mut self) -> Option<Self::Item> {
1989 let (key, value) = self.inner.item()?;
1990 let res = Some(FullTransactionId {
1991 account: StdAddr::new(key[0] as i8, HashBytes::from_slice(&key[13..45])),
1992 lt: u64::from_be_bytes(key[45..53].try_into().unwrap()),
1993 hash: HashBytes::from_slice(&value[0..32]),
1994 });
1995 if self.is_reversed {
1996 self.inner.prev();
1997 } else {
1998 self.inner.next();
1999 }
2000 res
2001 }
2002}
2003
/// Intermediate state for building a [`BlockTransactionsIter`]: the id
/// iterator plus a handle to the `transactions` column family.
pub struct BlockTransactionsIterBuilder {
    /// Source of transaction ids.
    ids: BlockTransactionIdsIter,
    /// Column family the transaction data is read from.
    transactions_cf: weedb::UnboundedCfHandle,
}
2008
2009impl BlockTransactionsIterBuilder {
2010 #[inline]
2011 pub fn is_reversed(&self) -> bool {
2012 self.ids.is_reversed()
2013 }
2014
2015 #[inline]
2016 pub fn block_id(&self) -> &BlockId {
2017 self.ids.block_id()
2018 }
2019
2020 #[inline]
2021 pub fn ref_by_mc_seqno(&self) -> u32 {
2022 self.ids.ref_by_mc_seqno()
2023 }
2024
2025 #[inline]
2026 pub fn snapshot(&self) -> &RpcSnapshot {
2027 self.ids.snapshot()
2028 }
2029
2030 #[inline]
2031 pub fn into_ids(self) -> BlockTransactionIdsIter {
2032 self.ids
2033 }
2034
2035 pub fn map<F, R>(self, map: F) -> BlockTransactionsIter<F>
2036 where
2037 for<'a> F: FnMut(&'a StdAddr, u64, &'a [u8]) -> R,
2038 {
2039 BlockTransactionsIter {
2040 ids: self.ids,
2041 transactions_cf: self.transactions_cf,
2042 map,
2043 }
2044 }
2045}
2046
/// Iterator over block transactions which loads the data of each transaction
/// and passes it through a user-provided closure.
pub struct BlockTransactionsIter<F> {
    /// Source of transaction ids.
    ids: BlockTransactionIdsIter,
    /// Column family the transaction data is read from.
    transactions_cf: weedb::UnboundedCfHandle,
    /// Closure applied to each loaded transaction.
    map: F,
}
2052
2053impl<F> BlockTransactionsIter<F> {
2054 #[inline]
2055 pub fn is_reversed(&self) -> bool {
2056 self.ids.is_reversed()
2057 }
2058
2059 #[inline]
2060 pub fn block_id(&self) -> &BlockId {
2061 self.ids.block_id()
2062 }
2063
2064 #[inline]
2065 pub fn snapshot(&self) -> &RpcSnapshot {
2066 self.ids.snapshot()
2067 }
2068
2069 #[inline]
2070 pub fn into_ids(self) -> BlockTransactionIdsIter {
2071 self.ids
2072 }
2073}
2074
2075impl<F, R> Iterator for BlockTransactionsIter<F>
2076where
2077 for<'a> F: FnMut(&'a StdAddr, u64, &'a [u8]) -> Option<R>,
2078{
2079 type Item = R;
2080
2081 fn next(&mut self) -> Option<Self::Item> {
2082 loop {
2083 let id = self.ids.next()?;
2084
2085 let mut key = [0; tables::Transactions::KEY_LEN];
2086 key[0] = id.account.workchain as u8;
2087 key[1..33].copy_from_slice(id.account.address.as_slice());
2088 key[33..41].copy_from_slice(&id.lt.to_be_bytes());
2089
2090 let cf = self.transactions_cf.bound();
2091 let value = match self.ids.snapshot.get_pinned_cf(&cf, key) {
2092 Ok(Some(value)) => value,
2093 Ok(None) => continue,
2095 Err(_) => return None,
2097 };
2098 break (self.map)(
2099 &id.account,
2100 id.lt,
2101 TransactionData::read_transaction(&value),
2102 );
2103 }
2104 }
2105}
2106
/// Full identifier of a transaction.
#[derive(Debug, Clone)]
pub struct FullTransactionId {
    /// Account the transaction belongs to.
    pub account: StdAddr,
    /// Logical time of the transaction.
    pub lt: u64,
    /// Transaction hash.
    pub hash: HashBytes,
}
2113
/// Intermediate state for building a [`TransactionsIter`].
pub struct TransactionsIterBuilder {
    /// Whether the raw iterator is moved backwards on each step.
    is_reversed: bool,
    /// Raw iterator positioned at the next entry to yield.
    inner: weedb::OwnedRawIterator,
    /// Snapshot exposed to the caller via `snapshot()`.
    snapshot: RpcSnapshot,
}
2120
2121impl TransactionsIterBuilder {
2122 #[inline]
2123 pub fn is_reversed(&self) -> bool {
2124 self.is_reversed
2125 }
2126
2127 #[inline]
2128 pub fn snapshot(&self) -> &RpcSnapshot {
2129 &self.snapshot
2130 }
2131
2132 pub fn map<F, R>(self, map: F) -> TransactionsIter<F, false>
2133 where
2134 for<'a> F: FnMut(&'a [u8]) -> R,
2135 {
2136 TransactionsIter {
2137 is_reversed: self.is_reversed,
2138 inner: self.inner,
2139 map,
2140 snapshot: self.snapshot,
2141 }
2142 }
2143
2144 pub fn map_ext<F, R>(self, map: F) -> TransactionsIter<F, true>
2145 where
2146 for<'a> F: FnMut(u64, &'a HashBytes, &'a [u8]) -> R,
2147 {
2148 TransactionsIter {
2149 is_reversed: self.is_reversed,
2150 inner: self.inner,
2151 map,
2152 snapshot: self.snapshot,
2153 }
2154 }
2155}
2156
/// Iterator over stored transactions which passes each entry through a
/// user-provided closure.
///
/// `EXT` selects the closure signature: `false` passes only the transaction
/// bytes, `true` additionally passes the LT and the transaction hash.
pub struct TransactionsIter<F, const EXT: bool> {
    /// Whether the raw iterator is moved backwards on each step.
    is_reversed: bool,
    /// Raw iterator positioned at the next entry to yield.
    inner: weedb::OwnedRawIterator,
    /// Closure applied to each entry.
    map: F,
    /// Snapshot exposed to the caller via `snapshot()`.
    snapshot: RpcSnapshot,
}

/// [`TransactionsIter`] with the extended closure signature.
pub type TransactionsExtIter<F> = TransactionsIter<F, true>;
2165
2166impl<F, const EXT: bool> TransactionsIter<F, EXT> {
2167 #[inline]
2168 pub fn is_reversed(&self) -> bool {
2169 self.is_reversed
2170 }
2171
2172 #[inline]
2173 pub fn snapshot(&self) -> &RpcSnapshot {
2174 &self.snapshot
2175 }
2176}
2177
2178impl<F, R> Iterator for TransactionsIter<F, false>
2179where
2180 for<'a> F: FnMut(&'a [u8]) -> Option<R>,
2181{
2182 type Item = R;
2183
2184 fn next(&mut self) -> Option<Self::Item> {
2185 let value = self.inner.value()?;
2186 let result = (self.map)(TransactionData::read_transaction(value))?;
2187 if self.is_reversed {
2188 self.inner.prev();
2189 } else {
2190 self.inner.next();
2191 }
2192 Some(result)
2193 }
2194}
2195
2196impl<F, R> Iterator for TransactionsIter<F, true>
2197where
2198 for<'a> F: FnMut(u64, &'a HashBytes, &'a [u8]) -> Option<R>,
2199{
2200 type Item = R;
2201
2202 fn next(&mut self) -> Option<Self::Item> {
2203 let (key, value) = self.inner.item()?;
2204 let result = (self.map)(
2205 u64::from_be_bytes(key[33..41].try_into().unwrap()),
2206 &TransactionData::read_tx_hash(value),
2207 TransactionData::read_transaction(value),
2208 )?;
2209 if self.is_reversed {
2210 self.inner.prev();
2211 } else {
2212 self.inner.next();
2213 }
2214 Some(result)
2215 }
2216}
2217
/// Location of a transaction: its account, LT and the block that contains it.
#[derive(Debug, Clone)]
pub struct TransactionInfo {
    /// Account the transaction belongs to.
    pub account: StdAddr,
    /// Logical time of the transaction.
    pub lt: u64,
    /// Id of the block that contains the transaction.
    pub block_id: BlockId,
    /// Masterchain seqno stored together with the transaction.
    pub mc_seqno: u32,
}
2225
2226impl TransactionInfo {
2227 fn from_bytes(bytes: &[u8]) -> Option<Self> {
2228 if bytes.len() < tables::TransactionsByHash::VALUE_FULL_LEN {
2229 return None;
2230 }
2231
2232 let account = StdAddr::new(bytes[0] as i8, HashBytes::from_slice(&bytes[1..33]));
2233 let lt = u64::from_be_bytes(bytes[33..41].try_into().unwrap());
2234 let prefix_len = bytes[41];
2235 debug_assert!(prefix_len < 64);
2236
2237 let tail_mask = 1u64 << (63 - prefix_len);
2238
2239 let Some(shard) = ShardIdent::new(
2241 bytes[0] as i8 as i32,
2242 (account.prefix() | tail_mask) & !(tail_mask - 1),
2243 ) else {
2244 return None;
2246 };
2247
2248 let block_id = BlockId {
2249 shard,
2250 seqno: u32::from_le_bytes(bytes[42..46].try_into().unwrap()),
2251 root_hash: HashBytes::from_slice(&bytes[46..78]),
2252 file_hash: HashBytes::from_slice(&bytes[78..110]),
2253 };
2254 let mc_seqno = u32::from_le_bytes(bytes[110..114].try_into().unwrap());
2255
2256 Some(Self {
2257 account,
2258 lt,
2259 block_id,
2260 mc_seqno,
2261 })
2262 }
2263}
2264
/// Transaction data together with its location info.
pub struct TransactionDataExt<'a> {
    /// Where the transaction is located.
    pub info: TransactionInfo,
    /// Stored transaction data.
    pub data: TransactionData<'a>,
}
2269
/// Raw stored transaction value.
///
/// Layout: mask byte | transaction hash (32 bytes) | optional inbound message
/// hash (32 bytes, present when the mask has `HAS_MSG_HASH`) | transaction BOC.
pub struct TransactionData<'a> {
    /// Pinned database value.
    data: rocksdb::DBPinnableSlice<'a>,
}
2273
2274impl<'a> TransactionData<'a> {
2275 pub fn new(data: rocksdb::DBPinnableSlice<'a>) -> Self {
2276 Self { data }
2277 }
2278
2279 pub fn tx_hash(&self) -> HashBytes {
2280 let value = self.data.as_ref();
2281 assert!(!value.is_empty());
2282 HashBytes::from_slice(&value[1..33])
2283 }
2284
2285 pub fn in_msg_hash(&self) -> Option<HashBytes> {
2286 let value = self.data.as_ref();
2287 assert!(!value.is_empty());
2288
2289 let mask = TransactionMask::from_bits_retain(value[0]);
2290 mask.has_msg_hash()
2291 .then(|| HashBytes::from_slice(&value[33..65]))
2292 }
2293
2294 fn read_tx_hash(value: &[u8]) -> HashBytes {
2295 HashBytes::from_slice(&value[1..33])
2296 }
2297
2298 fn read_transaction<T: AsRef<[u8]> + ?Sized>(value: &T) -> &[u8] {
2299 let value = value.as_ref();
2300 assert!(!value.is_empty());
2301
2302 let mask = TransactionMask::from_bits_retain(value[0]);
2303 let boc_start = if mask.has_msg_hash() { 65 } else { 33 }; assert!(boc_start < value.len());
2306
2307 value[boc_start..].as_ref()
2308 }
2309}
2310
2311impl AsRef<[u8]> for TransactionData<'_> {
2312 fn as_ref(&self) -> &[u8] {
2313 Self::read_transaction(self.data.as_ref())
2314 }
2315}
2316
/// Result of extracting a code hash from an account.
enum ExtractedCodeHash {
    /// The account was inspected; `None` means it has no code hash to index.
    Exact(Option<HashBytes>),
    /// The account cell is a pruned branch, so the code hash is unknown.
    Skip,
}
2321
2322fn extract_code_hash(account: &ShardAccount) -> Result<ExtractedCodeHash> {
2323 if account.account.inner().descriptor().is_pruned_branch() {
2324 return Ok(ExtractedCodeHash::Skip);
2325 }
2326
2327 if let Some(account) = account.load_account()?
2328 && let AccountState::Active(state_init) = &account.state
2329 && let Some(code) = &state_init.code
2330 {
2331 return Ok(ExtractedCodeHash::Exact(Some(*code.repr_hash())));
2332 }
2333
2334 Ok(ExtractedCodeHash::Exact(None))
2335}
2336
2337fn split_shard(
2338 shard: &ShardIdent,
2339 accounts: &ShardAccountsDict,
2340 depth: u8,
2341 shards: &mut FastHashMap<ShardIdent, ShardAccountsDict>,
2342) -> Result<()> {
2343 fn split_shard_impl(
2344 shard: &ShardIdent,
2345 accounts: &ShardAccountsDict,
2346 depth: u8,
2347 shards: &mut FastHashMap<ShardIdent, ShardAccountsDict>,
2348 builder: &mut CellBuilder,
2349 ) -> Result<()> {
2350 let (left_shard_ident, right_shard_ident) = 'split: {
2351 if depth > 0
2352 && let Some((left, right)) = shard.split()
2353 {
2354 break 'split (left, right);
2355 }
2356 shards.insert(*shard, accounts.clone());
2357 return Ok(());
2358 };
2359
2360 let (left_accounts, right_accounts) = {
2361 builder.clear_bits();
2362 let prefix_len = shard.prefix_len();
2363 if prefix_len > 0 {
2364 builder.store_uint(shard.prefix() >> (64 - prefix_len), prefix_len)?;
2365 }
2366 accounts.split_by_prefix(&builder.as_data_slice())?
2367 };
2368
2369 split_shard_impl(
2370 &left_shard_ident,
2371 &left_accounts,
2372 depth - 1,
2373 shards,
2374 builder,
2375 )?;
2376 split_shard_impl(
2377 &right_shard_ident,
2378 &right_accounts,
2379 depth - 1,
2380 shards,
2381 builder,
2382 )
2383 }
2384
2385 split_shard_impl(shard, accounts, depth, shards, &mut CellBuilder::new())
2386}
2387
/// Dictionary with 32-byte hash keys whose values pair a `DepthBalanceInfo`
/// with a `ShardAccount`.
/// NOTE(review): keys are presumably account addresses — confirm.
type ShardAccountsDict = Dict<HashBytes, (DepthBalanceInfo, ShardAccount)>;
2389
2390fn extend_account_prefix(shard: &ShardIdent, max: bool, target: &mut [u8; 32]) {
2391 let mut prefix = shard.prefix();
2392 if max {
2393 prefix |= prefix - 1;
2397 } else {
2398 prefix -= extract_tag(shard);
2403 };
2404 target[..8].copy_from_slice(&prefix.to_be_bytes());
2405 target[8..].fill(0xff * max as u8);
2406}
2407
2408const fn extract_tag(shard: &ShardIdent) -> u64 {
2409 let prefix = shard.prefix();
2410 prefix & (!prefix).wrapping_add(1)
2411}
2412
bitflags::bitflags! {
    /// Flags stored in the first byte of a serialized transaction record.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct TransactionMask: u8 {
        /// A 32-byte message hash follows the transaction hash
        /// (bytes `33..65` of the record).
        const HAS_MSG_HASH = 1 << 0;
    }
}
2419
2420impl TransactionMask {
2421 pub fn has_msg_hash(&self) -> bool {
2422 self.contains(TransactionMask::HAS_MSG_HASH)
2423 }
2424}
2425
/// Raw account key: one workchain byte followed by the 32-byte account address.
type AddressKey = [u8; 33];

/// State key holding the minimum transaction LT as a little-endian `u64`;
/// an empty value is treated as "not set".
const TX_MIN_LT: &[u8] = b"tx_min_lt";
/// NOTE(review): presumably marks a transactions GC run in progress — its
/// usage is not visible in this part of the file; confirm.
const TX_GC_RUNNING: &[u8] = b"tx_gc_running";
/// State key holding the storage `InstanceId`, generated randomly when missing.
const INSTANCE_ID: &[u8] = b"instance_id";
2431
#[cfg(test)]
mod tests {
    use super::*;

    /// Truncating an account prefix to `PREFIX_LEN` bits and appending the
    /// tag bit must produce the expected shard prefix.
    #[test]
    fn shard_prefix() {
        const PREFIX_LEN: u32 = 10;
        const ACCOUNT_PREFIX: u64 = 0xabccdeadaaaaaaaa;

        // Tag bit sits right after the `PREFIX_LEN` most significant bits.
        let tag = 1u64 << (63 - PREFIX_LEN);
        let raw_prefix = (ACCOUNT_PREFIX | tag) & !(tag - 1);

        let shard = ShardIdent::new(0, raw_prefix).unwrap();

        // SAFETY: NOTE(review): presumably `new_unchecked` only requires a
        // well-formed prefix; the literal has a single trailing tag bit — confirm.
        let expected = unsafe { ShardIdent::new_unchecked(0, 0xabe0000000000000) };
        assert_eq!(shard, expected);
    }
}