Skip to main content

tycho_rpc/state/
storage.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Instant;
4
5use anyhow::{Context, Result};
6use arc_swap::{ArcSwap, ArcSwapOption};
7use tycho_block_util::block::BlockStuff;
8use tycho_block_util::state::ShardStateStuff;
9use tycho_storage::kv::InstanceId;
10use tycho_types::cell::Lazy;
11use tycho_types::models::*;
12use tycho_types::prelude::*;
13use tycho_util::metrics::HistogramGuard;
14use tycho_util::sync::CancellationFlag;
15use tycho_util::{FastHashMap, FastHashSet};
16use weedb::rocksdb;
17
18use super::db::RpcDb;
19use super::tables::{self, Transactions};
20
21#[derive(Default, Clone)]
22pub struct BlacklistedAccounts {
23    inner: Arc<BlacklistedAccountsInner>,
24}
25
26impl BlacklistedAccounts {
27    pub fn update<I: IntoIterator<Item = StdAddr>>(&self, items: I) {
28        let items = items
29            .into_iter()
30            .map(|item| {
31                let mut key = [0; 33];
32                key[0] = item.workchain as u8;
33                key[1..33].copy_from_slice(item.address.as_array());
34                key
35            })
36            .collect::<FastHashSet<_>>();
37
38        self.inner.accounts.store(Arc::new(items));
39    }
40
41    pub fn load(&self) -> Arc<FastHashSet<AddressKey>> {
42        self.inner.accounts.load_full()
43    }
44}
45
46#[derive(Default)]
47struct BlacklistedAccountsInner {
48    accounts: ArcSwap<FastHashSet<AddressKey>>,
49}
50
51pub struct RpcStorage {
52    db: RpcDb,
53    min_tx_lt: AtomicU64,
54    min_tx_lt_guard: tokio::sync::Mutex<()>,
55    snapshot: ArcSwapOption<weedb::OwnedSnapshot>,
56}
57
58impl RpcStorage {
59    pub fn new(db: RpcDb) -> Self {
60        let this = Self {
61            db,
62            min_tx_lt: AtomicU64::new(u64::MAX),
63            min_tx_lt_guard: Default::default(),
64            snapshot: Default::default(),
65        };
66
67        let state = &this.db.state;
68        if state.get(INSTANCE_ID).unwrap().is_none() {
69            state
70                .insert(INSTANCE_ID, rand::random::<InstanceId>())
71                .unwrap();
72        }
73
74        let min_lt = match state.get(TX_MIN_LT).unwrap() {
75            Some(value) if value.is_empty() => None,
76            Some(value) => Some(u64::from_le_bytes(value.as_ref().try_into().unwrap())),
77            None => None,
78        };
79
80        this.min_tx_lt
81            .store(min_lt.unwrap_or(u64::MAX), Ordering::Release);
82
83        tracing::debug!(?min_lt, "rpc storage initialized");
84
85        this
86    }
87
88    pub fn db(&self) -> &RpcDb {
89        &self.db
90    }
91
92    pub fn min_tx_lt(&self) -> u64 {
93        self.min_tx_lt.load(Ordering::Acquire)
94    }
95
96    pub fn update_snapshot(&self) {
97        let snapshot = Arc::new(self.db.owned_snapshot());
98        self.snapshot.store(Some(snapshot));
99    }
100
101    pub fn load_snapshot(&self) -> Option<RpcSnapshot> {
102        self.snapshot.load_full().map(RpcSnapshot)
103    }
104
105    pub fn store_instance_id(&self, id: InstanceId) {
106        let rpc_states = &self.db.state;
107        rpc_states.insert(INSTANCE_ID, id).unwrap();
108    }
109
110    pub fn load_instance_id(&self) -> InstanceId {
111        let id = self.db.state.get(INSTANCE_ID).unwrap().unwrap();
112        InstanceId::from_slice(id.as_ref())
113    }
114
115    pub fn get_known_mc_blocks_range(
116        &self,
117        snapshot: Option<&RpcSnapshot>,
118    ) -> Result<Option<(u32, u32)>> {
119        let mut snapshot = snapshot.cloned();
120        if snapshot.is_none() {
121            snapshot = self.snapshot.load_full().map(RpcSnapshot);
122        }
123
124        let table = &self.db.known_blocks;
125
126        let mut range_from = [0x00; tables::KnownBlocks::KEY_LEN];
127        range_from[0] = -1i8 as u8;
128        range_from[1..9].copy_from_slice(&ShardIdent::PREFIX_FULL.to_be_bytes());
129        let mut range_to = [0xff; tables::KnownBlocks::KEY_LEN];
130        range_to[0..9].clone_from_slice(&range_from[0..9]);
131
132        let mut readopts = table.new_read_config();
133        if let Some(snapshot) = &snapshot {
134            readopts.set_snapshot(snapshot);
135        }
136        readopts.set_iterate_lower_bound(range_from.as_slice());
137        readopts.set_iterate_upper_bound(range_to.as_slice());
138        let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&table.cf(), readopts);
139
140        iter.seek(range_from.as_slice());
141        Ok(if let Some(key) = iter.key() {
142            let from_seqno = u32::from_be_bytes(key[9..13].try_into().unwrap());
143            let mut to_seqno = from_seqno;
144
145            iter.seek_for_prev(range_to.as_slice());
146            if let Some(key) = iter.key() {
147                let seqno = u32::from_be_bytes(key[9..13].try_into().unwrap());
148                to_seqno = std::cmp::max(to_seqno, seqno);
149            }
150
151            Some((from_seqno, to_seqno))
152        } else {
153            iter.status()?;
154            None
155        })
156    }
157
158    pub fn get_blocks_by_mc_seqno(
159        &self,
160        mc_seqno: u32,
161        mut snapshot: Option<RpcSnapshot>,
162    ) -> Result<Option<BlocksByMcSeqnoIter>> {
163        let mut key = [0; tables::KnownBlocks::KEY_LEN];
164        key[0] = -1i8 as u8;
165        key[1..9].copy_from_slice(&ShardIdent::PREFIX_FULL.to_be_bytes());
166        key[9..13].copy_from_slice(&mc_seqno.to_be_bytes());
167
168        if snapshot.is_none() {
169            snapshot = self.snapshot.load_full().map(RpcSnapshot);
170        }
171        let Some(snapshot) = snapshot else {
172            // TODO: Somehow always use snapshot.
173            anyhow::bail!("No snapshot available");
174        };
175
176        let table = &self.db.known_blocks;
177        if table.get_ext(key, Some(&snapshot))?.is_none() {
178            return Ok(None);
179        };
180
181        let mut range_from = [0x00; tables::BlocksByMcSeqno::KEY_LEN];
182        range_from[0..4].clone_from_slice(&mc_seqno.to_be_bytes());
183        let mut range_to = [0xff; tables::BlocksByMcSeqno::KEY_LEN];
184        range_to[0..4].clone_from_slice(&mc_seqno.to_be_bytes());
185
186        let table = &self.db.blocks_by_mc_seqno;
187        let mut readopts = table.new_read_config();
188        readopts.set_snapshot(&snapshot);
189        readopts.set_iterate_lower_bound(range_from.as_slice());
190        readopts.set_iterate_upper_bound(range_to.as_slice());
191
192        let rocksdb = self.db.rocksdb();
193        let mut iter = rocksdb.raw_iterator_cf_opt(&table.cf(), readopts);
194        iter.seek(range_from.as_slice());
195
196        Ok(Some(BlocksByMcSeqnoIter {
197            mc_seqno,
198            // SAFETY: Iterator was created from the same DB instance.
199            inner: unsafe { weedb::OwnedRawIterator::new(rocksdb.clone(), iter) },
200            snapshot,
201        }))
202    }
203
204    pub fn get_brief_block_info(
205        &self,
206        block_id: &BlockIdShort,
207        snapshot: Option<&RpcSnapshot>,
208    ) -> Result<Option<(BlockId, u32, BriefBlockInfo)>> {
209        let Ok(workchain) = i8::try_from(block_id.shard.workchain()) else {
210            return Ok(None);
211        };
212        let mut key = [0; tables::KnownBlocks::KEY_LEN];
213        key[0] = workchain as u8;
214        key[1..9].copy_from_slice(&block_id.shard.prefix().to_be_bytes());
215        key[9..13].copy_from_slice(&block_id.seqno.to_be_bytes());
216
217        let table = &self.db.known_blocks;
218        let Some(value) = table.get_ext(key, snapshot)? else {
219            return Ok(None);
220        };
221        let value = value.as_ref();
222
223        let brief_info = BriefBlockInfo::load_from_bytes(workchain as i32, &value[68..])
224            .context("invalid brief info")?;
225
226        let block_id = BlockId {
227            shard: block_id.shard,
228            seqno: block_id.seqno,
229            root_hash: HashBytes::from_slice(&value[0..32]),
230            file_hash: HashBytes::from_slice(&value[32..64]),
231        };
232        let mc_seqno = u32::from_le_bytes(value[64..68].try_into().unwrap());
233
234        Ok(Some((block_id, mc_seqno, brief_info)))
235    }
236
237    pub fn get_brief_shards_descr(
238        &self,
239        mc_seqno: u32,
240        snapshot: Option<&RpcSnapshot>,
241    ) -> Result<Option<Vec<BriefShardDescr>>> {
242        let mut key = [0x00; tables::BlocksByMcSeqno::KEY_LEN];
243        key[0..4].copy_from_slice(&mc_seqno.to_be_bytes());
244        key[4] = -1i8 as u8;
245        key[5..13].copy_from_slice(&ShardIdent::PREFIX_FULL.to_be_bytes());
246        key[13..17].copy_from_slice(&mc_seqno.to_be_bytes());
247
248        let table = &self.db.blocks_by_mc_seqno;
249        let Some(value) = table.get_ext(key, snapshot)? else {
250            return Ok(None);
251        };
252        let value = value.as_ref();
253
254        let shard_count = u32::from_le_bytes(
255            value[tables::BlocksByMcSeqno::VALUE_LEN..tables::BlocksByMcSeqno::VALUE_LEN + 4]
256                .try_into()
257                .unwrap(),
258        ) as usize;
259
260        let mut result = Vec::with_capacity(shard_count);
261        for i in 0..shard_count {
262            let offset =
263                tables::BlocksByMcSeqno::DESCR_OFFSET + i * tables::BlocksByMcSeqno::DESCR_LEN;
264            let descr = &value[offset..offset + tables::BlocksByMcSeqno::DESCR_LEN];
265
266            result.push(BriefShardDescr {
267                shard_ident: ShardIdent::new(
268                    descr[0] as i8 as i32,
269                    u64::from_le_bytes(descr[1..9].try_into().unwrap()),
270                )
271                .context("invalid top shard ident")?,
272                seqno: u32::from_le_bytes(descr[9..13].try_into().unwrap()),
273                root_hash: HashBytes::from_slice(&descr[13..45]),
274                file_hash: HashBytes::from_slice(&descr[45..77]),
275                start_lt: u64::from_le_bytes(descr[77..85].try_into().unwrap()),
276                end_lt: u64::from_le_bytes(descr[85..93].try_into().unwrap()),
277            });
278        }
279
280        Ok(Some(result))
281    }
282
283    pub fn get_accounts_by_code_hash(
284        &self,
285        code_hash: &HashBytes,
286        continuation: Option<&StdAddr>,
287        mut snapshot: Option<RpcSnapshot>,
288    ) -> Result<CodeHashesIter<'_>> {
289        let mut key = [0u8; tables::CodeHashes::KEY_LEN];
290        key[0..32].copy_from_slice(code_hash.as_ref());
291        if let Some(continuation) = continuation {
292            key[32] = continuation.workchain as u8;
293            key[33..65].copy_from_slice(continuation.address.as_ref());
294        }
295
296        let mut upper_bound = Vec::with_capacity(tables::CodeHashes::KEY_LEN);
297        upper_bound.extend_from_slice(&key[..32]);
298        upper_bound.extend_from_slice(&[0xff; 33]);
299
300        let mut readopts = self.db.code_hashes.new_read_config();
301        // TODO: somehow make the range inclusive since
302        // upper_bound is not included in the range
303        readopts.set_iterate_upper_bound(upper_bound);
304
305        if snapshot.is_none() {
306            snapshot = self.snapshot.load_full().map(RpcSnapshot);
307        }
308        let snapshot = snapshot.unwrap_or_else(|| RpcSnapshot(Arc::new(self.db.owned_snapshot())));
309        readopts.set_snapshot(&snapshot);
310
311        let rocksdb = self.db.rocksdb();
312        let code_hashes_cf = self.db.code_hashes.cf();
313        let mut iter = rocksdb.raw_iterator_cf_opt(&code_hashes_cf, readopts);
314
315        iter.seek(key);
316        if continuation.is_some() {
317            iter.next();
318        }
319
320        Ok(CodeHashesIter {
321            inner: iter,
322            snapshot,
323        })
324    }
325
326    pub fn get_block_transactions(
327        &self,
328        block_id: &BlockIdShort,
329        reverse: bool,
330        cursor: Option<&BlockTransactionsCursor>,
331        snapshot: Option<RpcSnapshot>,
332    ) -> Result<Option<BlockTransactionsIterBuilder>> {
333        let Some(ids) = self.get_block_transaction_ids(block_id, reverse, cursor, snapshot)? else {
334            return Ok(None);
335        };
336
337        Ok(Some(BlockTransactionsIterBuilder {
338            ids,
339            transactions_cf: self.db.transactions.get_unbounded_cf(),
340        }))
341    }
342
343    pub fn get_block_transaction_ids(
344        &self,
345        block_id: &BlockIdShort,
346        reverse: bool,
347        cursor: Option<&BlockTransactionsCursor>,
348        mut snapshot: Option<RpcSnapshot>,
349    ) -> Result<Option<BlockTransactionIdsIter>> {
350        let Ok(workchain) = i8::try_from(block_id.shard.workchain()) else {
351            return Ok(None);
352        };
353
354        if snapshot.is_none() {
355            snapshot = self.snapshot.load_full().map(RpcSnapshot);
356        }
357        let Some(snapshot) = snapshot else {
358            // TODO: Somehow always use snapshot.
359            anyhow::bail!("No snapshot available");
360        };
361
362        let mut range_from = [0x00; tables::BlockTransactions::KEY_LEN];
363        range_from[0] = workchain as u8;
364        range_from[1..9].copy_from_slice(&block_id.shard.prefix().to_be_bytes());
365        range_from[9..13].copy_from_slice(&block_id.seqno.to_be_bytes());
366
367        if let Some(cursor) = cursor {
368            range_from[13..45].copy_from_slice(cursor.hash.as_slice());
369            range_from[45..53].copy_from_slice(&cursor.lt.to_be_bytes());
370        }
371
372        let table = &self.db.known_blocks;
373        let ref_by_mc_seqno;
374        let block_id = match table.get_ext(&range_from[0..13], Some(&snapshot))? {
375            Some(value) => {
376                let value = value.as_ref();
377                ref_by_mc_seqno = u32::from_le_bytes(value[64..68].try_into().unwrap());
378                BlockId {
379                    shard: block_id.shard,
380                    seqno: block_id.seqno,
381                    root_hash: HashBytes::from_slice(&value[0..32]),
382                    file_hash: HashBytes::from_slice(&value[32..64]),
383                }
384            }
385            None => return Ok(None),
386        };
387
388        let mut range_to = [0xff; tables::BlockTransactions::KEY_LEN];
389        range_to[0..13].copy_from_slice(&range_from[0..13]);
390
391        let mut readopts = self.db.block_transactions.new_read_config();
392        readopts.set_iterate_lower_bound(range_from.as_slice());
393        readopts.set_iterate_upper_bound(range_to.as_slice());
394        readopts.set_snapshot(&snapshot);
395
396        let rocksdb = self.db.rocksdb();
397        let block_transactions_cf = self.db.block_transactions.cf();
398        let mut iter = rocksdb.raw_iterator_cf_opt(&block_transactions_cf, readopts);
399
400        if reverse {
401            iter.seek_for_prev(range_to);
402        } else {
403            iter.seek(range_from);
404        }
405
406        if cursor.is_some()
407            && let Some(key) = iter.key()
408            && key == range_from.as_slice()
409        {
410            if reverse {
411                iter.prev();
412            } else {
413                iter.next();
414            }
415        }
416
417        Ok(Some(BlockTransactionIdsIter {
418            block_id,
419            ref_by_mc_seqno,
420            is_reversed: reverse,
421            // SAFETY: Iterator was created from the same DB instance.
422            inner: unsafe { weedb::OwnedRawIterator::new(rocksdb.clone(), iter) },
423            snapshot,
424        }))
425    }
426
427    pub fn get_transactions(
428        &self,
429        account: &StdAddr,
430        start_lt: Option<u64>,
431        end_lt: Option<u64>,
432        reverse: bool,
433        mut snapshot: Option<RpcSnapshot>,
434    ) -> Result<TransactionsIterBuilder> {
435        let mut start_lt = start_lt.unwrap_or_default();
436        let mut end_lt = end_lt.unwrap_or(u64::MAX);
437        if end_lt < start_lt {
438            // Make empty iterator if `end_lt < start_lt`.
439            start_lt = u64::MAX - 1;
440            end_lt = u64::MAX;
441        }
442
443        if snapshot.is_none() {
444            snapshot = self.snapshot.load_full().map(RpcSnapshot);
445        }
446        let snapshot = snapshot.unwrap_or_else(|| RpcSnapshot(Arc::new(self.db.owned_snapshot())));
447
448        let mut range_from = [0u8; tables::Transactions::KEY_LEN];
449        range_from[0] = account.workchain as u8;
450        range_from[1..33].copy_from_slice(account.address.as_ref());
451        range_from[33..41].copy_from_slice(&start_lt.to_be_bytes());
452        let mut range_to = range_from;
453        // NOTE: Compute upper bound as `end_lt + 1` since it will
454        // not be included in the iteration result.
455        range_to[33..41].copy_from_slice(&end_lt.saturating_add(1).to_be_bytes());
456
457        let mut readopts = self.db.transactions.new_read_config();
458        readopts.set_snapshot(&snapshot);
459        readopts.set_iterate_lower_bound(range_from.as_slice());
460        readopts.set_iterate_upper_bound(range_to.as_slice());
461
462        let rocksdb = self.db.rocksdb();
463        let transactions_cf = self.db.transactions.cf();
464        let mut iter = rocksdb.raw_iterator_cf_opt(&transactions_cf, readopts);
465        if reverse {
466            iter.seek_for_prev(range_to.as_slice());
467        } else {
468            iter.seek(range_from.as_slice());
469        }
470        iter.status()?;
471
472        Ok(TransactionsIterBuilder {
473            is_reversed: reverse,
474            // SAFETY: Iterator was created from the same DB instance.
475            inner: unsafe { weedb::OwnedRawIterator::new(rocksdb.clone(), iter) },
476            snapshot,
477        })
478    }
479
480    pub fn get_transaction(
481        &self,
482        hash: &HashBytes,
483        snapshot: Option<&RpcSnapshot>,
484    ) -> Result<Option<TransactionData<'_>>> {
485        let table = &self.db.transactions_by_hash;
486        let Some(tx_info) = table.get_ext(hash, snapshot)? else {
487            return Ok(None);
488        };
489        let key = &tx_info.as_ref()[..Transactions::KEY_LEN];
490
491        let table = &self.db.transactions;
492        let tx = table.get_ext(key, snapshot)?;
493        Ok(tx.map(TransactionData::new))
494    }
495
496    pub fn get_transaction_ext<'db>(
497        &'db self,
498        hash: &HashBytes,
499        snapshot: Option<&RpcSnapshot>,
500    ) -> Result<Option<TransactionDataExt<'db>>> {
501        let table = &self.db.transactions_by_hash;
502        let Some(tx_info) = table.get_ext(hash, snapshot)? else {
503            return Ok(None);
504        };
505        let tx_info = tx_info.as_ref();
506        let Some(info) = TransactionInfo::from_bytes(tx_info) else {
507            return Ok(None);
508        };
509
510        let table = &self.db.transactions;
511        let tx = table.get_ext(&tx_info[..Transactions::KEY_LEN], snapshot)?;
512
513        Ok(tx.map(move |data| TransactionDataExt {
514            info,
515            data: TransactionData::new(data),
516        }))
517    }
518
519    pub fn get_transaction_info(
520        &self,
521        hash: &HashBytes,
522        snapshot: Option<&RpcSnapshot>,
523    ) -> Result<Option<TransactionInfo>> {
524        let table = &self.db.transactions_by_hash;
525        let Some(tx_info) = table.get_ext(hash, snapshot)? else {
526            return Ok(None);
527        };
528        Ok(TransactionInfo::from_bytes(&tx_info))
529    }
530
531    pub fn get_src_transaction<'db>(
532        &'db self,
533        account: &StdAddr,
534        message_lt: u64,
535        snapshot: Option<&RpcSnapshot>,
536    ) -> Result<Option<TransactionData<'db>>> {
537        let table = &self.db.transactions;
538
539        let owned_snapshot;
540        let snapshot = match snapshot {
541            Some(snapshot) => snapshot,
542            None => {
543                owned_snapshot = self
544                    .load_snapshot()
545                    .unwrap_or_else(|| RpcSnapshot(Arc::new(self.db.owned_snapshot())));
546                &owned_snapshot
547            }
548        };
549
550        let mut key = [0u8; tables::Transactions::KEY_LEN];
551        key[0] = account.workchain as u8;
552        key[1..33].copy_from_slice(account.address.as_slice());
553
554        let lower_bound = key;
555        key[33..41].copy_from_slice(&message_lt.to_be_bytes());
556
557        let mut readopts = table.new_read_config();
558        readopts.set_iterate_lower_bound(lower_bound);
559        readopts.set_iterate_upper_bound(key);
560        readopts.set_snapshot(snapshot);
561        let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&table.cf(), readopts);
562        iter.seek_for_prev(key.as_slice());
563
564        // TODO: Allow TransactionData to store iterator/data itself.
565        let Some(tx_key) = iter.key() else {
566            iter.status()?;
567            return Ok(None);
568        };
569        if tx_key[0..33] != key[0..33] {
570            return Ok(None);
571        }
572
573        let tx = table.get_ext(tx_key, Some(snapshot))?;
574
575        Ok(tx.map(TransactionData::new))
576    }
577
578    pub fn get_dst_transaction<'db>(
579        &'db self,
580        in_msg_hash: &HashBytes,
581        snapshot: Option<&RpcSnapshot>,
582    ) -> Result<Option<TransactionData<'db>>> {
583        let table = &self.db.transactions_by_in_msg;
584        let Some(key) = table.get_ext(in_msg_hash, snapshot)? else {
585            return Ok(None);
586        };
587
588        let table = &self.db.transactions;
589        let tx = table.get_ext(key, snapshot)?;
590        Ok(tx.map(TransactionData::new))
591    }
592
593    #[tracing::instrument(
594        level = "info",
595        name = "reset_accounts",
596        skip_all,
597        fields(shard = %shard_state.block_id().shard)
598    )]
599    pub async fn reset_accounts(
600        &self,
601        shard_state: ShardStateStuff,
602        split_depth: u8,
603    ) -> Result<()> {
604        let shard_ident = shard_state.block_id().shard;
605        let Ok(workchain) = i8::try_from(shard_ident.workchain()) else {
606            return Ok(());
607        };
608
609        tracing::info!("clearing old code hash indices");
610        let started_at = Instant::now();
611        self.remove_code_hashes(&shard_ident).await?;
612        tracing::info!(
613            elapsed = %humantime::format_duration(started_at.elapsed()),
614            "cleared old code hash indices"
615        );
616
617        // Split on virtual shards
618        let split = {
619            let guard = shard_state.ref_mc_state_handle().clone();
620
621            let mut virtual_shards = FastHashMap::default();
622            split_shard(
623                &shard_ident,
624                shard_state.state().load_accounts()?.dict(),
625                split_depth,
626                &mut virtual_shards,
627            )
628            .context("failed to split shard state into virtual shards")?;
629
630            // NOTE: Ensure that the root cell is dropped.
631            drop(shard_state);
632            (guard, virtual_shards)
633        };
634
635        let cancelled = CancellationFlag::new();
636        scopeguard::defer! {
637            cancelled.cancel();
638        }
639
640        // Rebuild code hashes
641        let db = self.db.clone();
642        let mut cancelled = cancelled.debounce(10000);
643        let span = tracing::Span::current();
644
645        // NOTE: `spawn_blocking` is used here instead of `rayon_run` as it is IO-bound task.
646        tokio::task::spawn_blocking(move || {
647            let _span = span.enter();
648
649            // NOTE: Ensure that guard is captured by the spawned thread.
650            let (_state_guard, virtual_shards) = split;
651
652            let guard = scopeguard::guard((), |_| {
653                tracing::warn!("cancelled");
654            });
655
656            tracing::info!(split_depth, "started building new code hash indices");
657            let started_at = Instant::now();
658
659            let raw = db.rocksdb().as_ref();
660            let code_hashes_cf = &db.code_hashes.cf();
661            let code_hashes_by_address_cf = &db.code_hashes_by_address.cf();
662
663            let mut non_empty_batch = false;
664            let mut write_batch = rocksdb::WriteBatch::default();
665
666            // Prepare buffer for code hashes ids
667            let mut code_hashes_key = [0u8; { tables::CodeHashes::KEY_LEN }];
668            code_hashes_key[32] = workchain as u8;
669
670            let mut code_hashes_by_address_key = [0u8; { tables::CodeHashesByAddress::KEY_LEN }];
671            code_hashes_by_address_key[0] = workchain as u8;
672
673            // Iterate all accounts
674            for (virtual_shard, accounts) in virtual_shards {
675                tracing::info!(%virtual_shard, "started collecting code hashes");
676                let started_at = Instant::now();
677
678                for entry in accounts.iter() {
679                    if cancelled.check() {
680                        anyhow::bail!("accounts reset cancelled");
681                    }
682
683                    let (id, (_, account)) = entry?;
684
685                    let code_hash = match extract_code_hash(&account)? {
686                        ExtractedCodeHash::Exact(Some(code_hash)) => code_hash,
687                        ExtractedCodeHash::Exact(None) => continue,
688                        ExtractedCodeHash::Skip => anyhow::bail!("code in account state is pruned"),
689                    };
690
691                    non_empty_batch |= true;
692
693                    // Fill account address in the key buffer
694                    code_hashes_key[..32].copy_from_slice(code_hash.as_slice());
695                    code_hashes_key[33..65].copy_from_slice(id.as_slice());
696
697                    code_hashes_by_address_key[1..33].copy_from_slice(id.as_slice());
698
699                    // Write tx data and indices
700                    write_batch.put_cf(code_hashes_cf, code_hashes_key.as_slice(), []);
701                    write_batch.put_cf(
702                        code_hashes_by_address_cf,
703                        code_hashes_by_address_key.as_slice(),
704                        code_hash.as_slice(),
705                    );
706                }
707
708                tracing::info!(
709                    %virtual_shard,
710                    elapsed = %humantime::format_duration(started_at.elapsed()),
711                    "finished collecting code hashes",
712                );
713            }
714
715            if non_empty_batch {
716                raw.write_opt(write_batch, db.code_hashes.write_config())?;
717            }
718
719            tracing::info!(
720                elapsed = %humantime::format_duration(started_at.elapsed()),
721                "finished building new code hash indices"
722            );
723
724            // Flush indices after delete/insert
725            tracing::info!("started flushing code hash indices");
726            let started_at = Instant::now();
727
728            let bound = Option::<[u8; 0]>::None;
729            raw.compact_range_cf(code_hashes_cf, bound, bound);
730            raw.compact_range_cf(code_hashes_by_address_cf, bound, bound);
731
732            // Done
733            scopeguard::ScopeGuard::into_inner(guard);
734            tracing::info!(
735                elapsed = %humantime::format_duration(started_at.elapsed()),
736                "finished flushing code hash indices"
737            );
738            Ok(())
739        })
740        .await?
741    }
742
743    #[tracing::instrument(level = "info", name = "remove_old_transactions", skip(self))]
744    pub async fn remove_old_transactions(
745        &self,
746        mc_seqno: u32,
747        min_lt: u64,
748        keep_tx_per_account: usize,
749    ) -> Result<()> {
750        const ITEMS_PER_BATCH: usize = 100000;
751
752        type TxKey = [u8; tables::Transactions::KEY_LEN];
753
754        enum PendingDelete {
755            Single,
756            Range,
757        }
758
759        struct GcState<'a> {
760            raw: &'a rocksdb::DB,
761            writeopt: &'a rocksdb::WriteOptions,
762            tx_cf: weedb::BoundedCfHandle<'a>,
763            tx_by_hash: weedb::BoundedCfHandle<'a>,
764            tx_by_in_msg: weedb::BoundedCfHandle<'a>,
765            key_range_begin: TxKey,
766            key_range_end: TxKey,
767            pending_delete: Option<PendingDelete>,
768            batch: rocksdb::WriteBatch,
769            total_tx: usize,
770            total_tx_by_hash: usize,
771            total_tx_by_in_msg: usize,
772        }
773
774        impl<'a> GcState<'a> {
775            fn new(db: &'a RpcDb) -> Self {
776                Self {
777                    raw: db.rocksdb(),
778                    writeopt: db.transactions.write_config(),
779                    tx_cf: db.transactions.cf(),
780                    tx_by_hash: db.transactions_by_hash.cf(),
781                    tx_by_in_msg: db.transactions_by_in_msg.cf(),
782                    key_range_begin: [0u8; tables::Transactions::KEY_LEN],
783                    key_range_end: [0u8; tables::Transactions::KEY_LEN],
784                    pending_delete: None,
785                    batch: Default::default(),
786                    total_tx: 0,
787                    total_tx_by_hash: 0,
788                    total_tx_by_in_msg: 0,
789                }
790            }
791
792            fn delete_tx(&mut self, key: &TxKey, value: &[u8]) {
793                // Batch multiple deletes for the primary table
794                self.pending_delete = Some(if self.pending_delete.is_none() {
795                    self.key_range_end.copy_from_slice(key);
796                    PendingDelete::Single
797                } else {
798                    self.key_range_begin.copy_from_slice(key);
799                    PendingDelete::Range
800                });
801                self.total_tx += 1;
802
803                // Must contain at least mask and tx hash
804                assert!(value.len() >= 33);
805
806                let mask = TransactionMask::from_bits_retain(value[0]);
807
808                // Delete transaction by hash index entry
809                let tx_hash = &value[1..33];
810                self.batch.delete_cf(&self.tx_by_hash, tx_hash);
811                self.total_tx_by_hash += 1;
812
813                // Delete transaction by incoming message hash index entry
814                if mask.has_msg_hash() {
815                    assert!(value.len() >= 65);
816
817                    let in_msg_hash = &value[33..65];
818                    self.batch.delete_cf(&self.tx_by_in_msg, in_msg_hash);
819                    self.total_tx_by_in_msg += 1;
820                }
821            }
822
823            fn end_account(&mut self) {
824                // Flush pending batch
825                if let Some(pending) = self.pending_delete.take() {
826                    match pending {
827                        PendingDelete::Single => self
828                            .batch
829                            .delete_cf(&self.tx_cf, self.key_range_end.as_slice()),
830                        PendingDelete::Range => {
831                            // Remove `[begin; end)`
832                            self.batch.delete_range_cf(
833                                &self.tx_cf,
834                                self.key_range_begin.as_slice(),
835                                self.key_range_end.as_slice(),
836                            );
837                            // Remove `end`
838                            self.batch
839                                .delete_cf(&self.tx_cf, self.key_range_end.as_slice());
840                        }
841                    }
842                }
843            }
844
845            fn flush(&mut self) -> Result<()> {
846                self.raw
847                    .write_opt(std::mem::take(&mut self.batch), self.writeopt)?;
848                Ok(())
849            }
850        }
851
852        if let Some(known_min_lt) = self.db.state.get(TX_MIN_LT)? {
853            let known_min_lt = u64::from_le_bytes(known_min_lt.as_ref().try_into().unwrap());
854            let was_running = matches!(
855                self.db.state.get(TX_GC_RUNNING)?,
856                Some(status) if !status.is_empty()
857            );
858
859            if !was_running && min_lt <= known_min_lt {
860                tracing::info!(known_min_lt, "skipping removal of old transactions");
861                return Ok(());
862            }
863        }
864
865        let cancelled = CancellationFlag::new();
866        scopeguard::defer! {
867            cancelled.cancel();
868        }
869
870        // Force update min lt and gc flag
871        self.min_tx_lt.store(min_lt, Ordering::Release);
872
873        let db = self.db.clone();
874        let mut cancelled = cancelled.debounce(10000);
875        let span = tracing::Span::current();
876
877        // NOTE: `spawn_blocking` is used here instead of `rayon_run` as it is IO-bound task.
878        tokio::task::spawn_blocking(move || {
879            let _span = span.enter();
880
881            let guard = scopeguard::guard((), |_| {
882                tracing::warn!("cancelled");
883            });
884
885            let raw = db.rocksdb().as_ref();
886
887            tracing::info!("started removing old transactions");
888            let started_at = Instant::now();
889
890            // Prepare snapshot and iterator
891            let snapshot = raw.snapshot();
892
893            // Delete block transactions.
894            'block: {
895                let mc_seqno = match mc_seqno.checked_sub(1) {
896                    None | Some(0) => break 'block,
897                    Some(seqno) => seqno,
898                };
899
900                let known_blocks = &db.known_blocks;
901                let blocks_by_mc_seqno = &db.blocks_by_mc_seqno;
902                let block_transactions = &db.block_transactions;
903
904                // Get masterchain block entry.
905                let mut key = [0u8; tables::BlocksByMcSeqno::KEY_LEN];
906                key[0..4].copy_from_slice(&mc_seqno.to_be_bytes());
907                key[4] = -1i8 as u8;
908                key[5..13].copy_from_slice(&ShardIdent::PREFIX_FULL.to_be_bytes());
909                key[13..17].copy_from_slice(&mc_seqno.to_be_bytes());
910
911                let Some(value) = snapshot.get_pinned_cf_opt(
912                    &blocks_by_mc_seqno.cf(),
913                    key,
914                    blocks_by_mc_seqno.new_read_config(),
915                )?
916                else {
917                    break 'block;
918                };
919                let value = value.as_ref();
920                debug_assert!(value.len() >= tables::BlocksByMcSeqno::VALUE_LEN + 4);
921
922                // Parse top shard block ids (short).
923                let shard_count = u32::from_le_bytes(
924                    value[tables::BlocksByMcSeqno::VALUE_LEN
925                        ..tables::BlocksByMcSeqno::VALUE_LEN + 4]
926                        .try_into()
927                        .unwrap(),
928                ) as usize;
929                let mut top_block_ids = Vec::with_capacity(1 + shard_count);
930                top_block_ids.push(BlockIdShort {
931                    shard: ShardIdent::MASTERCHAIN,
932                    seqno: mc_seqno,
933                });
934                for i in 0..shard_count {
935                    let offset = tables::BlocksByMcSeqno::DESCR_OFFSET
936                        + i * tables::BlocksByMcSeqno::DESCR_LEN;
937                    let descr = &value[offset..offset + tables::BlocksByMcSeqno::DESCR_LEN];
938                    top_block_ids.push(BlockIdShort {
939                        shard: ShardIdent::new(
940                            descr[0] as i8 as i32,
941                            u64::from_le_bytes(descr[1..9].try_into().unwrap()),
942                        )
943                        .context("invalid top shard ident")?,
944                        seqno: u32::from_le_bytes(descr[9..13].try_into().unwrap()),
945                    });
946                }
947
948                // Prepare batch.
949                let mut batch = rocksdb::WriteBatch::new();
950
951                // Delete `blocks_by_mc_seqno` range before the mc block.
952                let range_from = [0x00; tables::BlocksByMcSeqno::KEY_LEN];
953                let mut range_to = [0xff; tables::BlocksByMcSeqno::KEY_LEN];
954                range_to[0..4].copy_from_slice(&mc_seqno.to_be_bytes());
955                batch.delete_range_cf(&blocks_by_mc_seqno.cf(), range_from, range_to);
956                batch.delete_cf(&blocks_by_mc_seqno.cf(), range_to);
957
958                // Delete `known_blocks` and `block_transactions` ranges for each shard
959                // (including masterchain).
960                let mut range_from = [0x00; tables::BlockTransactions::KEY_LEN];
961                let mut range_to = [0xff; tables::BlockTransactions::KEY_LEN];
962                for block_id in top_block_ids {
963                    range_from[0] = block_id.shard.workchain() as i8 as u8;
964                    range_from[1..9].copy_from_slice(&block_id.shard.prefix().to_be_bytes());
965                    range_from[9..13].copy_from_slice(&block_id.seqno.to_be_bytes());
966                    range_to[0..13].copy_from_slice(&range_from[0..13]);
967                    batch.delete_range_cf(&block_transactions.cf(), range_from, range_to);
968                    batch.delete_cf(&block_transactions.cf(), range_to);
969
970                    let range_from = &range_from[0..tables::KnownBlocks::KEY_LEN];
971                    let range_to = &range_to[0..tables::KnownBlocks::KEY_LEN];
972                    batch.delete_range_cf(&known_blocks.cf(), range_from, range_to);
973                    batch.delete_cf(&known_blocks.cf(), range_to);
974                }
975
976                // Apply batch.
977                db.rocksdb()
978                    .write(batch)
979                    .context("failed to remove block transactions")?;
980            }
981
982            // Remove transactions
983            let mut readopts = db.transactions.new_read_config();
984            readopts.set_snapshot(&snapshot);
985            let mut iter = raw.raw_iterator_cf_opt(&db.transactions.cf(), readopts);
986            iter.seek_to_last();
987
988            // Prepare GC state
989            let mut gc = GcState::new(&db);
990
991            // `last_account` buffer is used to track the last processed account.
992            //
993            // The buffer is also used to seek to the beginning of the tx range.
994            // Its last 8 bytes are `min_lt`. It forces the `seek_prev` method
995            // to jump right to the last tx that is needed to be deleted.
996            let mut last_account: TxKey = [0u8; tables::Transactions::KEY_LEN];
997            last_account[33..41].copy_from_slice(&min_lt.to_be_bytes());
998
999            let mut items = 0usize;
1000            let mut total_invalid = 0usize;
1001            let mut iteration = 0usize;
1002            let mut tx_count = 0usize;
1003            loop {
1004                let Some((key, value)) = iter.item() else {
1005                    break iter.status()?;
1006                };
1007                iteration += 1;
1008
1009                if cancelled.check() {
1010                    anyhow::bail!("transactions GC cancelled");
1011                }
1012
1013                let Ok::<&TxKey, _>(key) = key.try_into() else {
1014                    // Remove invalid entires from the primary index only
1015                    items += 1;
1016                    total_invalid += 1;
1017                    gc.batch.delete_cf(&gc.tx_cf, key);
1018                    iter.prev();
1019                    continue;
1020                };
1021
1022                // Check whether the prev account is processed
1023                let item_account = &key[..33];
1024                let is_prev_account = item_account != &last_account[..33];
1025                if is_prev_account {
1026                    // Update last account address
1027                    last_account[..33].copy_from_slice(item_account);
1028
1029                    // Add pending delete into batch
1030                    gc.end_account();
1031
1032                    tx_count = 0;
1033                }
1034
1035                // Get lt from the key
1036                let lt = u64::from_be_bytes(key[33..41].try_into().unwrap());
1037
1038                if tx_count < keep_tx_per_account {
1039                    // Keep last `keep_tx_per_account` transactions for account
1040                    tx_count += 1;
1041                    iter.prev();
1042                } else if lt < min_lt {
1043                    // Add tx and its secondary indices into the batch
1044                    items += 1;
1045                    gc.delete_tx(key, value);
1046                    iter.prev();
1047                } else if lt > 0 {
1048                    // Seek to the end of the removed range
1049                    // (to start removing it backwards).
1050                    iter.seek_for_prev(last_account.as_slice());
1051                } else {
1052                    // Just seek to the previous account.
1053                    iter.prev();
1054                }
1055
1056                // Write batch
1057                if items >= ITEMS_PER_BATCH {
1058                    tracing::info!(iteration, "flushing batch");
1059                    gc.flush()?;
1060                    items = 0;
1061                }
1062            }
1063
1064            // Add final pending delete into batch
1065            gc.end_account();
1066
1067            // Write remaining batch
1068            if items != 0 {
1069                gc.flush()?;
1070            }
1071
1072            // Reset gc flag
1073            raw.put(TX_GC_RUNNING, [])?;
1074
1075            // Done
1076            scopeguard::ScopeGuard::into_inner(guard);
1077            tracing::info!(
1078                elapsed = %humantime::format_duration(started_at.elapsed()),
1079                total_invalid,
1080                total_tx = gc.total_tx,
1081                total_tx_by_hash = gc.total_tx_by_hash,
1082                total_tx_by_in_msg = gc.total_tx_by_in_msg,
1083                "finished removing old transactions"
1084            );
1085            Ok(())
1086        })
1087        .await?
1088    }
1089
1090    #[tracing::instrument(level = "info", name = "update", skip_all, fields(block_id = %block.id()))]
1091    pub async fn update(
1092        &self,
1093        mc_block_id: &BlockId,
1094        block: BlockStuff,
1095        rpc_blacklist: Option<&BlacklistedAccounts>,
1096        subscriptions: &super::subscriptions::RpcSubscriptions,
1097    ) -> Result<()> {
1098        let Ok(workchain) = i8::try_from(block.id().shard.workchain()) else {
1099            return Ok(());
1100        };
1101
1102        let is_masterchain = block.id().is_masterchain();
1103        let mc_seqno = mc_block_id.seqno;
1104
1105        let shard_hashes = is_masterchain
1106            .then(|| {
1107                let custom = block.load_custom()?;
1108                Ok::<_, anyhow::Error>(custom.shards.clone())
1109            })
1110            .transpose()?;
1111
1112        let span = tracing::Span::current();
1113        let db = self.db.clone();
1114
1115        let rpc_blacklist = rpc_blacklist.map(|x| x.load());
1116
1117        // NOTE: `spawn_blocking` is used here instead of `rayon_run` as it is IO-bound task.
1118        let (start_lt, updates) = tokio::task::spawn_blocking(move || {
1119            let prepare_batch_histogram =
1120                HistogramGuard::begin("tycho_storage_rpc_prepare_batch_time");
1121
1122            let _span = span.enter();
1123
1124            let info = block.load_info()?;
1125            let extra = block.load_extra()?;
1126
1127            let mut updates = Some(FastHashMap::default());
1128
1129            let account_blocks = extra.account_blocks.load()?;
1130
1131            let accounts = if account_blocks.is_empty() {
1132                Dict::new()
1133            } else {
1134                let merkle_update = block.as_ref().state_update.load()?;
1135
1136                // Accounts dict is stored in the second cell.
1137                let get_accounts = |cell: Cell| {
1138                    let mut cs = cell.as_slice()?;
1139                    cs.skip_first(0, 1)?;
1140                    cs.load_reference_cloned().map(Cell::virtualize)
1141                };
1142
1143                let old_accounts = get_accounts(merkle_update.old)?;
1144                let new_accounts = get_accounts(merkle_update.new)?;
1145
1146                if old_accounts.repr_hash() == new_accounts.repr_hash() {
1147                    Dict::new()
1148                } else {
1149                    let accounts = Lazy::<ShardAccounts>::from_raw(new_accounts)?.load()?;
1150                    let (accounts, _) = accounts.into_parts();
1151                    accounts
1152                }
1153            };
1154
1155            let mut write_batch = rocksdb::WriteBatch::default();
1156            let tx_cf = &db.transactions.cf();
1157            let tx_by_hash_cf = &db.transactions_by_hash.cf();
1158            let tx_by_in_msg_cf = &db.transactions_by_in_msg.cf();
1159            let block_txs_cf = &db.block_transactions.cf();
1160
1161            // Prepare buffer for full tx id
1162            let mut tx_info = [0u8; tables::TransactionsByHash::VALUE_FULL_LEN];
1163            tx_info[0] = workchain as u8;
1164
1165            let block_id = block.id();
1166            tx_info[41] = block_id.shard.prefix_len() as u8;
1167            tx_info[42..46].copy_from_slice(&block_id.seqno.to_le_bytes());
1168            tx_info[46..78].copy_from_slice(block_id.root_hash.as_slice());
1169            tx_info[78..110].copy_from_slice(block_id.file_hash.as_slice());
1170            tx_info[110..114].copy_from_slice(&mc_seqno.to_le_bytes());
1171
1172            let mut block_tx = [0u8; tables::BlockTransactions::KEY_LEN];
1173            block_tx[0] = workchain as u8;
1174            block_tx[1..9].copy_from_slice(&block_id.shard.prefix().to_be_bytes());
1175            block_tx[9..13].copy_from_slice(&block_id.seqno.to_be_bytes());
1176
1177            // Prepare buffer.
1178            let mut buffer = Vec::with_capacity(64 + BriefBlockInfo::MIN_BYTE_LEN);
1179
1180            // Write block info.
1181            {
1182                let mut key = [0u8; tables::BlocksByMcSeqno::KEY_LEN];
1183                key[0..4].copy_from_slice(&mc_seqno.to_be_bytes());
1184                key[4] = workchain as u8;
1185                key[5..13].copy_from_slice(&block_id.shard.prefix().to_be_bytes());
1186                key[13..17].copy_from_slice(&block_id.seqno.to_be_bytes());
1187
1188                buffer.clear();
1189                buffer.extend_from_slice(block_id.root_hash.as_slice()); // 0..32
1190                buffer.extend_from_slice(block_id.file_hash.as_slice()); // 32..64
1191                buffer.extend_from_slice(&info.start_lt.to_le_bytes()); // 64..72
1192                buffer.extend_from_slice(&info.end_lt.to_le_bytes()); // 72..80
1193                if let Some(shard_hashes) = shard_hashes {
1194                    let shards = shard_hashes
1195                        .iter()
1196                        .filter_map(|item| {
1197                            let (shard_ident, descr) = match item {
1198                                Ok(item) => item,
1199                                Err(e) => return Some(Err(e)),
1200                            };
1201                            if i8::try_from(shard_ident.workchain()).is_err() {
1202                                return None;
1203                            }
1204
1205                            Some(Ok(BriefShardDescr {
1206                                shard_ident,
1207                                seqno: descr.seqno,
1208                                root_hash: descr.root_hash,
1209                                file_hash: descr.file_hash,
1210                                start_lt: descr.start_lt,
1211                                end_lt: descr.end_lt,
1212                            }))
1213                        })
1214                        .collect::<Result<Vec<_>, _>>()?;
1215
1216                    buffer.reserve(4 + tables::BlocksByMcSeqno::DESCR_LEN * shards.len());
1217                    buffer.extend_from_slice(&(shards.len() as u32).to_le_bytes());
1218                    for shard in shards {
1219                        buffer.push(shard.shard_ident.workchain() as i8 as u8);
1220                        buffer.extend_from_slice(&shard.shard_ident.prefix().to_le_bytes());
1221                        buffer.extend_from_slice(&shard.seqno.to_le_bytes());
1222                        buffer.extend_from_slice(shard.root_hash.as_slice());
1223                        buffer.extend_from_slice(shard.file_hash.as_slice());
1224                        buffer.extend_from_slice(&shard.start_lt.to_le_bytes());
1225                        buffer.extend_from_slice(&shard.end_lt.to_le_bytes());
1226                    }
1227                }
1228
1229                write_batch.put_cf(&db.blocks_by_mc_seqno.cf(), key, buffer.as_slice());
1230            }
1231
1232            let rpc_blacklist = rpc_blacklist.as_deref();
1233
1234            // Iterate through all changed accounts in the block.
1235            let mut block_tx_count = 0usize;
1236            for item in account_blocks.iter() {
1237                let (account, _, account_block) = item?;
1238
1239                // Fill account address in the key buffer
1240                tx_info[1..33].copy_from_slice(account.as_slice());
1241                block_tx[13..45].copy_from_slice(account.as_slice());
1242
1243                // Flag to update code hash
1244                let mut has_special_actions = false;
1245                let mut was_active = false;
1246                let mut is_active = false;
1247
1248                // Process account transactions
1249                let mut first_tx = true;
1250                for item in account_block.transactions.values() {
1251                    let (_, tx_cell) = item?;
1252
1253                    // TODO: Should we increase this counter only for non-blacklisted accounts?
1254                    block_tx_count += 1;
1255
1256                    let tx = tx_cell.load()?;
1257
1258                    tx_info[33..41].copy_from_slice(&tx.lt.to_be_bytes());
1259                    block_tx[45..53].copy_from_slice(&tx.lt.to_be_bytes());
1260
1261                    // Update flags
1262                    if first_tx {
1263                        // Remember the original status from the first transaction
1264                        was_active = tx.orig_status == AccountStatus::Active;
1265                        first_tx = false;
1266                    }
1267                    if was_active && tx.orig_status != AccountStatus::Active {
1268                        // Handle the case when an account (with some updated code) was deleted,
1269                        // and then deployed with the initial code (end status).
1270                        // Treat this situation as a special action.
1271                        has_special_actions = true;
1272                    }
1273                    is_active = tx.end_status == AccountStatus::Active;
1274
1275                    if !has_special_actions {
1276                        // Search for special actions (might be code hash update)
1277                        let info = tx.load_info()?;
1278                        let action_phase = match &info {
1279                            TxInfo::Ordinary(info) => info.action_phase.as_ref(),
1280                            TxInfo::TickTock(info) => info.action_phase.as_ref(),
1281                        };
1282                        if let Some(action_phase) = action_phase {
1283                            has_special_actions |= action_phase.special_actions > 0;
1284                        }
1285                    }
1286
1287                    // Don't write tx for account from blacklist
1288                    if let Some(blacklist) = &rpc_blacklist
1289                        && blacklist.contains(&tx_info[..33])
1290                    {
1291                        continue;
1292                    }
1293
1294                    if let Some(ref mut map) = updates {
1295                        let entry = map.entry(account).or_insert(tx.lt);
1296                        if tx.lt > *entry {
1297                            *entry = tx.lt;
1298                        }
1299                    }
1300
1301                    let tx_hash = tx_cell.inner().repr_hash();
1302                    let (tx_mask, msg_hash) = match &tx.in_msg {
1303                        Some(in_msg) => {
1304                            let hash = Some(in_msg.repr_hash());
1305                            let mask = TransactionMask::HAS_MSG_HASH;
1306                            (mask, hash)
1307                        }
1308                        None => (TransactionMask::empty(), None),
1309                    };
1310
1311                    // Collect transaction data to `tx_buffer`
1312                    buffer.clear();
1313                    buffer.push(tx_mask.bits());
1314                    buffer.extend_from_slice(tx_hash.as_slice());
1315                    if let Some(msg_hash) = msg_hash {
1316                        buffer.extend_from_slice(msg_hash.as_slice());
1317                    }
1318                    tycho_types::boc::ser::BocHeader::<ahash::RandomState>::with_root(
1319                        tx_cell.inner().as_ref(),
1320                    )
1321                    .encode(&mut buffer);
1322
1323                    // Write tx data and indices
1324                    write_batch.put_cf(tx_by_hash_cf, tx_hash.as_slice(), tx_info.as_slice());
1325                    write_batch.put_cf(block_txs_cf, block_tx.as_slice(), tx_hash.as_slice());
1326
1327                    if let Some(msg_hash) = msg_hash {
1328                        write_batch.put_cf(
1329                            tx_by_in_msg_cf,
1330                            msg_hash,
1331                            &tx_info[..tables::Transactions::KEY_LEN],
1332                        );
1333                    }
1334
1335                    write_batch.put_cf(tx_cf, &tx_info[..tables::Transactions::KEY_LEN], &buffer);
1336                }
1337
1338                // Update code hash
1339                let update = if is_active && (!was_active || has_special_actions) {
1340                    // Account is active after this block and this is either a new account,
1341                    // or it was an existing account which possibly changed its code.
1342                    // Update: just store the code hash.
1343                    Some(false)
1344                } else if was_active && !is_active {
1345                    // Account was active before this block and is not active after the block.
1346                    // Update: remove the code hash.
1347                    Some(true)
1348                } else {
1349                    // No update for other cases
1350                    None
1351                };
1352
1353                // Apply the update if any
1354                if let Some(remove) = update {
1355                    Self::update_code_hash(
1356                        &db,
1357                        workchain,
1358                        &account,
1359                        &accounts,
1360                        remove,
1361                        &mut write_batch,
1362                    )?;
1363                }
1364            }
1365
1366            // Write block info.
1367            let brief_block_info =
1368                BriefBlockInfo::new(block.as_ref(), info, extra, block_tx_count)?;
1369            buffer.clear();
1370            buffer.extend_from_slice(&tx_info[46..114]); // root_hash + file_hash + mc_seqno
1371            brief_block_info.write_to_bytes(&mut buffer); // everything else
1372
1373            write_batch.put_cf(
1374                &db.known_blocks.cf(),
1375                &block_tx[0..tables::KnownBlocks::KEY_LEN],
1376                buffer.as_slice(),
1377            );
1378
1379            drop(prepare_batch_histogram);
1380
1381            let _execute_batch_histogram =
1382                HistogramGuard::begin("tycho_storage_rpc_execute_batch_time");
1383
1384            db.rocksdb()
1385                .write_opt(write_batch, db.transactions.write_config())?;
1386
1387            let updates = updates
1388                .map(|map| {
1389                    map.into_iter()
1390                        .map(|(address, max_lt)| super::subscriptions::AccountUpdate {
1391                            address: StdAddr::new(workchain, address),
1392                            max_lt,
1393                            gen_utime: info.gen_utime,
1394                        })
1395                        .collect::<Vec<_>>()
1396                })
1397                .unwrap_or_default();
1398
1399            Ok::<_, anyhow::Error>((info.start_lt, updates))
1400        })
1401        .await??;
1402
1403        // Update min lt after a successful block processing.
1404        'min_lt: {
1405            // Update the runtime value first. Load is relaxed since we just need
1406            // to know that the value was updated.
1407            if start_lt < self.min_tx_lt.fetch_min(start_lt, Ordering::Release) {
1408                // Acquire the operation guard to ensure that there is only one writer.
1409                let _guard = self.min_tx_lt_guard.lock().await;
1410
1411                // Do nothing if the value was already updated while we were waiting.
1412                // Load is Acquire since we need to see the most recent value.
1413                if start_lt > self.min_tx_lt.load(Ordering::Acquire) {
1414                    break 'min_lt;
1415                }
1416
1417                // Update the value in the database.
1418                self.db.state.insert(TX_MIN_LT, start_lt.to_le_bytes())?;
1419            }
1420        }
1421
1422        if !updates.is_empty() {
1423            subscriptions.fanout_updates(updates).await;
1424        }
1425
1426        Ok(())
1427    }
1428
1429    fn update_code_hash(
1430        db: &RpcDb,
1431        workchain: i8,
1432        account: &HashBytes,
1433        accounts: &ShardAccountsDict,
1434        remove: bool,
1435        write_batch: &mut rocksdb::WriteBatch,
1436    ) -> Result<()> {
1437        // Find the new code hash
1438        let new_code_hash = 'code_hash: {
1439            if !remove && let Some((_, account)) = accounts.get(account)? {
1440                match extract_code_hash(&account)? {
1441                    ExtractedCodeHash::Exact(hash) => break 'code_hash hash,
1442                    ExtractedCodeHash::Skip => return Ok(()),
1443                }
1444            }
1445            None
1446        };
1447
1448        // Prepare column families
1449        let code_hashes_cf = &db.code_hashes.cf();
1450        let code_hashes_by_address_cf = &db.code_hashes_by_address.cf();
1451
1452        // Check the secondary index first
1453        let mut code_hashes_by_address_id = [0u8; tables::CodeHashesByAddress::KEY_LEN];
1454        code_hashes_by_address_id[0] = workchain as u8;
1455        code_hashes_by_address_id[1..33].copy_from_slice(account.as_slice());
1456
1457        // Find the old code hash
1458        let old_code_hash = db
1459            .code_hashes_by_address
1460            .get(code_hashes_by_address_id.as_slice())?;
1461
1462        if remove && old_code_hash.is_none()
1463            || matches!(
1464                (&old_code_hash, &new_code_hash),
1465                (Some(old), Some(new)) if old.as_ref() == new.as_slice()
1466            )
1467        {
1468            // Code hash should not be changed.
1469            return Ok(());
1470        }
1471
1472        let mut code_hashes_id = [0u8; tables::CodeHashes::KEY_LEN];
1473        code_hashes_id[32] = workchain as u8;
1474        code_hashes_id[33..65].copy_from_slice(account.as_slice());
1475
1476        // Remove entry from the primary index
1477        if let Some(old_code_hash) = old_code_hash {
1478            code_hashes_id[..32].copy_from_slice(&old_code_hash);
1479            write_batch.delete_cf(code_hashes_cf, code_hashes_id.as_slice());
1480        }
1481
1482        match new_code_hash {
1483            Some(new_code_hash) => {
1484                // Update primary index
1485                code_hashes_id[..32].copy_from_slice(new_code_hash.as_slice());
1486                write_batch.put_cf(
1487                    code_hashes_cf,
1488                    code_hashes_id.as_slice(),
1489                    new_code_hash.as_slice(),
1490                );
1491
1492                // Update secondary index
1493                write_batch.put_cf(
1494                    code_hashes_by_address_cf,
1495                    code_hashes_by_address_id.as_slice(),
1496                    new_code_hash.as_slice(),
1497                );
1498            }
1499            None => {
1500                // Remove entry from the secondary index
1501                write_batch.delete_cf(
1502                    code_hashes_by_address_cf,
1503                    code_hashes_by_address_id.as_slice(),
1504                );
1505            }
1506        }
1507
1508        Ok(())
1509    }
1510
    /// Removes every code-hash index entry that belongs to `shard`.
    ///
    /// Works in two phases:
    /// 1. the `code_hashes_by_address` table is cleared with a range delete
    ///    over the account ids covered by the shard;
    /// 2. the `code_hashes` table is scanned from its first key on a blocking
    ///    thread and every matching key is deleted individually.
    ///
    /// # Errors
    ///
    /// Fails if a RocksDB operation fails, if the blocking task cannot be
    /// joined, or if the scan notices that the cancellation flag was raised.
    async fn remove_code_hashes(&self, shard: &ShardIdent) -> Result<()> {
        let workchain = shard.workchain() as u8;

        // Remove from the secondary index first
        {
            // Lower bound: workchain byte followed by the smallest account id of the shard.
            let mut from = [0u8; { tables::CodeHashesByAddress::KEY_LEN }];
            from[0] = workchain;

            {
                let [_, from @ ..] = &mut from;
                extend_account_prefix(shard, false, from);
            }

            // Upper bound: same workchain byte, largest account id of the shard.
            let mut to = from;
            {
                let [_, to @ ..] = &mut to;
                extend_account_prefix(shard, true, to);
            }

            let raw = self.db.rocksdb();
            let cf = &self.db.code_hashes_by_address.cf();
            let writeopts = self.db.code_hashes_by_address.write_config();

            // Remove `[from; to)`
            raw.delete_range_cf_opt(cf, &from, &to, writeopts)?;
            // Remove `to`, (-1:ffff..ffff might be a valid existing address)
            raw.delete_cf_opt(cf, to, writeopts)?;
        }

        // Raise the flag whenever this scope is left, so that the blocking
        // task below stops scanning once nobody is waiting for it anymore.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        // Full scan the main code hashes index and remove all entries for the shard
        let db = self.db.clone();
        // NOTE(review): presumably `debounce(1000)` makes `check()` look at the
        // flag only once per 1000 calls — confirm against `CancellationFlag`.
        let mut cancelled = cancelled.debounce(1000);
        let shard = *shard;
        let span = tracing::Span::current();

        // NOTE: `spawn_blocking` is used here instead of `rayon_run` as it is IO-bound task.
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            // Logs a warning if the closure exits before the guard is defused
            // at the end (i.e. on any early return with an error).
            let guard = scopeguard::guard((), |_| {
                tracing::warn!("cancelled");
            });

            let cf = &db.code_hashes.cf();

            // Iterate over a snapshot so that the deletes issued below do not
            // affect the view the iterator is reading from.
            let raw = db.rocksdb().as_ref();
            let snapshot = raw.snapshot();
            let mut readopts = db.code_hashes.new_read_config();
            readopts.set_snapshot(&snapshot);

            let writeopts = db.code_hashes.write_config();

            let mut iter = raw.raw_iterator_cf_opt(cf, readopts);
            iter.seek_to_first();

            let mut prefix = shard.prefix();
            let tag = extract_tag(&shard);
            prefix -= tag; // Remove tag from the prefix

            // For the prefix 1010000 the mask is 1100000
            let prefix_mask = !(tag | (tag - 1));

            loop {
                let key = match iter.key() {
                    Some(key) => key,
                    // End of data or iterator error: `status()` tells which one.
                    None => break iter.status()?,
                };

                if cancelled.check() {
                    anyhow::bail!("remove_code_hashes cancelled");
                }

                // Delete the entry when the key has an unexpected length, or when
                // it belongs to this workchain (byte 32) and, for a non-full shard,
                // the first 8 bytes of its account id (bytes 33..41) match the
                // shard prefix. Note that `&&` binds tighter than `||`.
                if key.len() != tables::CodeHashes::KEY_LEN
                    || key[32] == workchain
                        && (shard.is_full() || {
                            // Filter only the keys with the same prefix
                            let key = u64::from_be_bytes(key[33..41].try_into().unwrap());
                            (key ^ prefix) & prefix_mask == 0
                        })
                {
                    raw.delete_cf_opt(cf, key, writeopts)?;
                }

                iter.next();
            }

            // Finished normally: defuse the guard so that nothing is logged.
            scopeguard::ScopeGuard::into_inner(guard);
            Ok(())
        })
        .await?
    }
1607}
1608
/// Extension for tables that adds a point lookup with an optional snapshot.
trait TableExt {
    /// Reads the value stored under `key`.
    ///
    /// When `snapshot` is `Some`, the read is performed against that snapshot;
    /// otherwise the table is read directly.
    fn get_ext<'db, K: AsRef<[u8]>>(
        &'db self,
        key: K,
        snapshot: Option<&RpcSnapshot>,
    ) -> Result<Option<rocksdb::DBPinnableSlice<'db>>>;
}
1616
1617impl<T: weedb::ColumnFamily> TableExt for weedb::Table<T> {
1618    fn get_ext<'db, K: AsRef<[u8]>>(
1619        &'db self,
1620        key: K,
1621        snapshot: Option<&RpcSnapshot>,
1622    ) -> Result<Option<rocksdb::DBPinnableSlice<'db>>> {
1623        match snapshot {
1624            None => self.get(key),
1625            Some(snapshot) => {
1626                anyhow::ensure!(
1627                    Arc::ptr_eq(snapshot.db(), self.db()),
1628                    "snapshot must be made for the same DB instance"
1629                );
1630
1631                let mut readopts = self.new_read_config();
1632                readopts.set_snapshot(snapshot);
1633                self.db().get_pinned_cf_opt(&self.cf(), key, &readopts)
1634            }
1635        }
1636        .map_err(Into::into)
1637    }
1638}
1639
/// Short description of a shard block.
///
/// NOTE(review): presumably describes the latest known block of a shard
/// (its id parts and logical time range) — confirm against the producer.
#[derive(Debug)]
pub struct BriefShardDescr {
    pub shard_ident: ShardIdent,
    pub seqno: u32,
    pub root_hash: HashBytes,
    pub file_hash: HashBytes,
    pub start_lt: u64,
    pub end_lt: u64,
}
1649
/// Compact subset of block header data.
///
/// Most fields are copied as is from `Block`, `BlockInfo` and `BlockExtra`
/// by `BriefBlockInfo::new`.
#[derive(Debug, Clone)]
pub struct BriefBlockInfo {
    pub global_id: i32,
    pub version: u32,
    pub flags: u8,
    pub after_merge: bool,
    pub after_split: bool,
    pub before_split: bool,
    pub want_merge: bool,
    pub want_split: bool,
    pub validator_list_hash_short: u32,
    pub catchain_seqno: u32,
    pub min_ref_mc_seqno: u32,
    pub is_key_block: bool,
    pub prev_key_block_seqno: u32,
    pub start_lt: u64,
    pub end_lt: u64,
    pub gen_utime: u32,
    pub vert_seqno: u32,
    pub rand_seed: HashBytes,
    // Saturates at `u32::MAX` when the original count does not fit.
    pub tx_count: u32,
    // Referenced masterchain block, if the block info has one.
    pub master_ref: Option<BlockId>,
    // One entry for a single previous block, two entries after a merge.
    pub prev_blocks: Vec<BlockId>,
}
1674
1675impl BriefBlockInfo {
1676    const VERSION: u8 = 0;
1677    const MIN_BYTE_LEN: usize = 256;
1678
1679    fn new(
1680        block: &Block,
1681        info: &BlockInfo,
1682        extra: &BlockExtra,
1683        tx_count: usize,
1684    ) -> Result<Self, tycho_types::error::Error> {
1685        let shard_ident = info.shard;
1686        let prev_blocks = match info.load_prev_ref()? {
1687            PrevBlockRef::Single(block_ref) => vec![block_ref.as_block_id(shard_ident)],
1688            PrevBlockRef::AfterMerge { left, right } => vec![
1689                left.as_block_id(shard_ident),
1690                right.as_block_id(shard_ident),
1691            ],
1692        };
1693
1694        Ok(Self {
1695            global_id: block.global_id,
1696            version: info.version,
1697            flags: info.flags,
1698            after_merge: info.after_merge,
1699            after_split: info.after_split,
1700            before_split: info.before_split,
1701            want_merge: info.want_merge,
1702            want_split: info.want_split,
1703            validator_list_hash_short: info.gen_validator_list_hash_short,
1704            catchain_seqno: info.gen_catchain_seqno,
1705            min_ref_mc_seqno: info.min_ref_mc_seqno,
1706            is_key_block: info.key_block,
1707            prev_key_block_seqno: info.prev_key_block_seqno,
1708            start_lt: info.start_lt,
1709            end_lt: info.end_lt,
1710            gen_utime: info.gen_utime,
1711            vert_seqno: info.vert_seqno,
1712            rand_seed: extra.rand_seed,
1713            tx_count: tx_count.try_into().unwrap_or(u32::MAX),
1714            master_ref: info
1715                .load_master_ref()?
1716                .map(|r| r.as_block_id(ShardIdent::MASTERCHAIN)),
1717            prev_blocks,
1718        })
1719    }
1720
1721    fn write_to_bytes(&self, target: &mut Vec<u8>) {
1722        // NOTE: Bit 0 is reserved for future.
1723        let packed_flags = ((self.master_ref.is_some() as u8) << 7)
1724            | ((self.after_merge as u8) << 6)
1725            | ((self.before_split as u8) << 5)
1726            | ((self.after_split as u8) << 4)
1727            | ((self.want_split as u8) << 3)
1728            | ((self.want_merge as u8) << 2)
1729            | ((self.is_key_block as u8) << 1);
1730
1731        target.reserve(Self::MIN_BYTE_LEN);
1732        target.push(Self::VERSION);
1733        target.extend_from_slice(&self.global_id.to_le_bytes());
1734        target.extend_from_slice(&self.version.to_le_bytes());
1735        target.push(self.flags);
1736        target.push(packed_flags);
1737        target.extend_from_slice(&self.validator_list_hash_short.to_le_bytes());
1738        target.extend_from_slice(&self.catchain_seqno.to_le_bytes());
1739        target.extend_from_slice(&self.min_ref_mc_seqno.to_le_bytes());
1740        target.extend_from_slice(&self.prev_key_block_seqno.to_le_bytes());
1741        target.extend_from_slice(&self.start_lt.to_le_bytes());
1742        target.extend_from_slice(&self.end_lt.to_le_bytes());
1743        target.extend_from_slice(&self.gen_utime.to_le_bytes());
1744        target.extend_from_slice(&self.vert_seqno.to_le_bytes());
1745        target.extend_from_slice(self.rand_seed.as_slice());
1746        target.extend_from_slice(&self.tx_count.to_le_bytes());
1747        if let Some(block_id) = &self.master_ref {
1748            target.extend_from_slice(&block_id.seqno.to_le_bytes());
1749            target.extend_from_slice(block_id.root_hash.as_slice());
1750            target.extend_from_slice(block_id.file_hash.as_slice());
1751        }
1752        target.push(self.prev_blocks.len() as u8);
1753        for block_id in &self.prev_blocks {
1754            target.extend_from_slice(&block_id.shard.prefix().to_le_bytes());
1755            target.extend_from_slice(&block_id.seqno.to_le_bytes());
1756            target.extend_from_slice(block_id.root_hash.as_slice());
1757            target.extend_from_slice(block_id.file_hash.as_slice());
1758        }
1759    }
1760
1761    fn load_from_bytes(workchain: i32, mut bytes: &[u8]) -> Option<Self> {
1762        use bytes::Buf;
1763
1764        if bytes.get_u8() != Self::VERSION {
1765            return None;
1766        }
1767
1768        let global_id = bytes.get_i32_le();
1769        let version = bytes.get_u32_le();
1770        let [flags, packed_flags] = bytes.get_u16().to_be_bytes();
1771        let validator_list_hash_short = bytes.get_u32_le();
1772        let catchain_seqno = bytes.get_u32_le();
1773        let min_ref_mc_seqno = bytes.get_u32_le();
1774        let prev_key_block_seqno = bytes.get_u32_le();
1775        let start_lt = bytes.get_u64_le();
1776        let end_lt = bytes.get_u64_le();
1777        let gen_utime = bytes.get_u32_le();
1778        let vert_seqno = bytes.get_u32_le();
1779        let rand_seed = HashBytes::from_slice(&bytes[..32]);
1780        bytes = &bytes[32..];
1781        let tx_count = bytes.get_u32_le();
1782
1783        let master_ref = if packed_flags & 0b10000000 != 0 {
1784            let seqno = bytes.get_u32_le();
1785            let root_hash = HashBytes::from_slice(&bytes[0..32]);
1786            let file_hash = HashBytes::from_slice(&bytes[32..64]);
1787            bytes = &bytes[64..];
1788            Some(BlockId {
1789                shard: ShardIdent::MASTERCHAIN,
1790                seqno,
1791                root_hash,
1792                file_hash,
1793            })
1794        } else {
1795            None
1796        };
1797
1798        let prev_block_count = bytes.get_u8();
1799        let mut prev_blocks = Vec::with_capacity(prev_block_count as _);
1800        for _ in 0..prev_block_count {
1801            prev_blocks.push(BlockId {
1802                shard: ShardIdent::new(
1803                    workchain,
1804                    u64::from_le_bytes(bytes[0..8].try_into().unwrap()),
1805                )
1806                .unwrap(),
1807                seqno: u32::from_le_bytes(bytes[8..12].try_into().unwrap()),
1808                root_hash: HashBytes::from_slice(&bytes[12..44]),
1809                file_hash: HashBytes::from_slice(&bytes[44..76]),
1810            });
1811            bytes = &bytes[76..];
1812        }
1813
1814        Some(Self {
1815            global_id,
1816            version,
1817            flags,
1818            after_merge: packed_flags & 0b01000000 != 0,
1819            after_split: packed_flags & 0b00010000 != 0,
1820            before_split: packed_flags & 0b00100000 != 0,
1821            want_merge: packed_flags & 0b00000100 != 0,
1822            want_split: packed_flags & 0b00001000 != 0,
1823            validator_list_hash_short,
1824            catchain_seqno,
1825            min_ref_mc_seqno,
1826            is_key_block: packed_flags & 0b00000010 != 0,
1827            prev_key_block_seqno,
1828            start_lt,
1829            end_lt,
1830            gen_utime,
1831            vert_seqno,
1832            rand_seed,
1833            tx_count,
1834            master_ref,
1835            prev_blocks,
1836        })
1837    }
1838}
1839
1840#[derive(Clone)]
1841#[repr(transparent)]
1842pub struct RpcSnapshot(Arc<weedb::OwnedSnapshot>);
1843
1844impl std::ops::Deref for RpcSnapshot {
1845    type Target = weedb::OwnedSnapshot;
1846
1847    #[inline]
1848    fn deref(&self) -> &Self::Target {
1849        self.0.as_ref()
1850    }
1851}
1852
/// Iterator over block ids stored in the DB.
///
/// NOTE(review): presumably yields the blocks referenced by the masterchain
/// block with seqno `mc_seqno` — confirm against the code that builds it.
pub struct BlocksByMcSeqnoIter {
    mc_seqno: u32,
    inner: weedb::OwnedRawIterator,
    snapshot: RpcSnapshot,
}

impl BlocksByMcSeqnoIter {
    /// Returns the masterchain seqno stored in this iterator.
    pub fn mc_seqno(&self) -> u32 {
        self.mc_seqno
    }

    /// Returns the snapshot stored alongside the raw iterator.
    pub fn snapshot(&self) -> &RpcSnapshot {
        &self.snapshot
    }
}
1868
1869impl Iterator for BlocksByMcSeqnoIter {
1870    // TODO: Extend with LT range?
1871    type Item = BlockId;
1872
1873    fn next(&mut self) -> Option<Self::Item> {
1874        let (key, value) = self.inner.item()?;
1875        let shard = ShardIdent::new(
1876            key[4] as i8 as i32,
1877            u64::from_be_bytes(key[5..13].try_into().unwrap()),
1878        )
1879        .expect("stored shard must have a valid prefix");
1880        let seqno = u32::from_be_bytes(key[13..17].try_into().unwrap());
1881
1882        let block_id = BlockId {
1883            shard,
1884            seqno,
1885            root_hash: HashBytes::from_slice(&value[0..32]),
1886            file_hash: HashBytes::from_slice(&value[32..64]),
1887        };
1888        self.inner.next();
1889
1890        Some(block_id)
1891    }
1892}
1893
/// Iterator over keys of the code hashes table that yields account addresses.
pub struct CodeHashesIter<'a> {
    inner: rocksdb::DBRawIterator<'a>,
    snapshot: RpcSnapshot,
}
1898
1899impl<'a> CodeHashesIter<'a> {
1900    pub fn snapshot(&self) -> &RpcSnapshot {
1901        &self.snapshot
1902    }
1903
1904    pub fn into_raw(self) -> RawCodeHashesIter<'a> {
1905        RawCodeHashesIter {
1906            inner: self.inner,
1907            snapshot: self.snapshot,
1908        }
1909    }
1910}
1911
1912impl Iterator for CodeHashesIter<'_> {
1913    type Item = StdAddr;
1914
1915    fn next(&mut self) -> Option<Self::Item> {
1916        let value = self.inner.key()?;
1917        debug_assert!(value.len() == tables::CodeHashes::KEY_LEN);
1918
1919        let result = Some(StdAddr {
1920            anycast: None,
1921            workchain: value[32] as i8,
1922            address: HashBytes(value[33..65].try_into().unwrap()),
1923        });
1924        self.inner.next();
1925        result
1926    }
1927}
1928
/// Iterator over keys of the code hashes table that yields the raw
/// 33-byte address part of each key.
pub struct RawCodeHashesIter<'a> {
    inner: rocksdb::DBRawIterator<'a>,
    snapshot: RpcSnapshot,
}

impl RawCodeHashesIter<'_> {
    /// Returns the snapshot stored alongside the raw iterator.
    pub fn snapshot(&self) -> &RpcSnapshot {
        &self.snapshot
    }
}
1939
1940impl Iterator for RawCodeHashesIter<'_> {
1941    type Item = [u8; 33];
1942
1943    fn next(&mut self) -> Option<Self::Item> {
1944        let value = self.inner.key()?;
1945        debug_assert!(value.len() == tables::CodeHashes::KEY_LEN);
1946
1947        let result = Some(value[32..65].try_into().unwrap());
1948        self.inner.next();
1949        result
1950    }
1951}
1952
/// Position inside a list of block transactions.
///
/// The derived ordering compares `hash` first and `lt` second (field order).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct BlockTransactionsCursor {
    pub hash: HashBytes,
    pub lt: u64,
}
1958
/// Iterator over ids of transactions, moving forward or backward
/// depending on `is_reversed`.
pub struct BlockTransactionIdsIter {
    block_id: BlockId,
    ref_by_mc_seqno: u32,
    is_reversed: bool,
    inner: weedb::OwnedRawIterator,
    snapshot: RpcSnapshot,
}

impl BlockTransactionIdsIter {
    /// Returns `true` if the iterator moves backward over the entries.
    pub fn is_reversed(&self) -> bool {
        self.is_reversed
    }

    /// Returns the block id stored in this iterator.
    pub fn block_id(&self) -> &BlockId {
        &self.block_id
    }

    /// Returns the stored `ref_by_mc_seqno` value.
    // NOTE(review): presumably the seqno of the masterchain block that
    // references `block_id` — confirm against the code that builds it.
    pub fn ref_by_mc_seqno(&self) -> u32 {
        self.ref_by_mc_seqno
    }

    /// Returns the snapshot stored alongside the raw iterator.
    pub fn snapshot(&self) -> &RpcSnapshot {
        &self.snapshot
    }
}
1984
1985impl Iterator for BlockTransactionIdsIter {
1986    type Item = FullTransactionId;
1987
1988    fn next(&mut self) -> Option<Self::Item> {
1989        let (key, value) = self.inner.item()?;
1990        let res = Some(FullTransactionId {
1991            account: StdAddr::new(key[0] as i8, HashBytes::from_slice(&key[13..45])),
1992            lt: u64::from_be_bytes(key[45..53].try_into().unwrap()),
1993            hash: HashBytes::from_slice(&value[0..32]),
1994        });
1995        if self.is_reversed {
1996            self.inner.prev();
1997        } else {
1998            self.inner.next();
1999        }
2000        res
2001    }
2002}
2003
/// Intermediate state for building a [`BlockTransactionsIter`]: an iterator
/// over transaction ids plus the column family the transactions are read from.
pub struct BlockTransactionsIterBuilder {
    ids: BlockTransactionIdsIter,
    transactions_cf: weedb::UnboundedCfHandle,
}
2008
2009impl BlockTransactionsIterBuilder {
2010    #[inline]
2011    pub fn is_reversed(&self) -> bool {
2012        self.ids.is_reversed()
2013    }
2014
2015    #[inline]
2016    pub fn block_id(&self) -> &BlockId {
2017        self.ids.block_id()
2018    }
2019
2020    #[inline]
2021    pub fn ref_by_mc_seqno(&self) -> u32 {
2022        self.ids.ref_by_mc_seqno()
2023    }
2024
2025    #[inline]
2026    pub fn snapshot(&self) -> &RpcSnapshot {
2027        self.ids.snapshot()
2028    }
2029
2030    #[inline]
2031    pub fn into_ids(self) -> BlockTransactionIdsIter {
2032        self.ids
2033    }
2034
2035    pub fn map<F, R>(self, map: F) -> BlockTransactionsIter<F>
2036    where
2037        for<'a> F: FnMut(&'a StdAddr, u64, &'a [u8]) -> R,
2038    {
2039        BlockTransactionsIter {
2040            ids: self.ids,
2041            transactions_cf: self.transactions_cf,
2042            map,
2043        }
2044    }
2045}
2046
/// Iterator that resolves transaction ids into stored transactions and
/// passes each one through the `map` closure.
pub struct BlockTransactionsIter<F> {
    ids: BlockTransactionIdsIter,
    transactions_cf: weedb::UnboundedCfHandle,
    map: F,
}

impl<F> BlockTransactionsIter<F> {
    /// Returns `true` if the underlying ids iterator moves backward.
    #[inline]
    pub fn is_reversed(&self) -> bool {
        self.ids.is_reversed()
    }

    /// Returns the block id of the underlying ids iterator.
    #[inline]
    pub fn block_id(&self) -> &BlockId {
        self.ids.block_id()
    }

    /// Returns the snapshot of the underlying ids iterator.
    #[inline]
    pub fn snapshot(&self) -> &RpcSnapshot {
        self.ids.snapshot()
    }

    /// Discards the mapping closure and keeps only the ids iterator.
    #[inline]
    pub fn into_ids(self) -> BlockTransactionIdsIter {
        self.ids
    }
}
2074
2075impl<F, R> Iterator for BlockTransactionsIter<F>
2076where
2077    for<'a> F: FnMut(&'a StdAddr, u64, &'a [u8]) -> Option<R>,
2078{
2079    type Item = R;
2080
2081    fn next(&mut self) -> Option<Self::Item> {
2082        loop {
2083            let id = self.ids.next()?;
2084
2085            let mut key = [0; tables::Transactions::KEY_LEN];
2086            key[0] = id.account.workchain as u8;
2087            key[1..33].copy_from_slice(id.account.address.as_slice());
2088            key[33..41].copy_from_slice(&id.lt.to_be_bytes());
2089
2090            let cf = self.transactions_cf.bound();
2091            let value = match self.ids.snapshot.get_pinned_cf(&cf, key) {
2092                Ok(Some(value)) => value,
2093                // TODO: Maybe return error here?
2094                Ok(None) => continue,
2095                // TODO: Maybe return error here?
2096                Err(_) => return None,
2097            };
2098            break (self.map)(
2099                &id.account,
2100                id.lt,
2101                TransactionData::read_transaction(&value),
2102            );
2103        }
2104    }
2105}
2106
/// Transaction identifier: account address, logical time and hash.
#[derive(Debug, Clone)]
pub struct FullTransactionId {
    pub account: StdAddr,
    pub lt: u64,
    pub hash: HashBytes,
}
2113
/// Intermediate state for building a [`TransactionsIter`]; turned into the
/// final iterator with `map` or `map_ext`.
pub struct TransactionsIterBuilder {
    is_reversed: bool,
    inner: weedb::OwnedRawIterator,
    // NOTE: We must store the snapshot for as long as iterator is alive.
    snapshot: RpcSnapshot,
}
2120
2121impl TransactionsIterBuilder {
2122    #[inline]
2123    pub fn is_reversed(&self) -> bool {
2124        self.is_reversed
2125    }
2126
2127    #[inline]
2128    pub fn snapshot(&self) -> &RpcSnapshot {
2129        &self.snapshot
2130    }
2131
2132    pub fn map<F, R>(self, map: F) -> TransactionsIter<F, false>
2133    where
2134        for<'a> F: FnMut(&'a [u8]) -> R,
2135    {
2136        TransactionsIter {
2137            is_reversed: self.is_reversed,
2138            inner: self.inner,
2139            map,
2140            snapshot: self.snapshot,
2141        }
2142    }
2143
2144    pub fn map_ext<F, R>(self, map: F) -> TransactionsIter<F, true>
2145    where
2146        for<'a> F: FnMut(u64, &'a HashBytes, &'a [u8]) -> R,
2147    {
2148        TransactionsIter {
2149            is_reversed: self.is_reversed,
2150            inner: self.inner,
2151            map,
2152            snapshot: self.snapshot,
2153        }
2154    }
2155}
2156
/// Iterator over stored transactions that passes each entry through `map`.
///
/// `EXT` selects the closure signature: with `false` the closure gets only
/// the transaction bytes, with `true` it also gets the logical time and the
/// transaction hash.
pub struct TransactionsIter<F, const EXT: bool> {
    is_reversed: bool,
    inner: weedb::OwnedRawIterator,
    map: F,
    snapshot: RpcSnapshot,
}

/// [`TransactionsIter`] with the extended closure signature.
pub type TransactionsExtIter<F> = TransactionsIter<F, true>;

impl<F, const EXT: bool> TransactionsIter<F, EXT> {
    /// Returns `true` if the iterator moves backward over the entries.
    #[inline]
    pub fn is_reversed(&self) -> bool {
        self.is_reversed
    }

    /// Returns the snapshot stored alongside the raw iterator.
    #[inline]
    pub fn snapshot(&self) -> &RpcSnapshot {
        &self.snapshot
    }
}
2177
2178impl<F, R> Iterator for TransactionsIter<F, false>
2179where
2180    for<'a> F: FnMut(&'a [u8]) -> Option<R>,
2181{
2182    type Item = R;
2183
2184    fn next(&mut self) -> Option<Self::Item> {
2185        let value = self.inner.value()?;
2186        let result = (self.map)(TransactionData::read_transaction(value))?;
2187        if self.is_reversed {
2188            self.inner.prev();
2189        } else {
2190            self.inner.next();
2191        }
2192        Some(result)
2193    }
2194}
2195
2196impl<F, R> Iterator for TransactionsIter<F, true>
2197where
2198    for<'a> F: FnMut(u64, &'a HashBytes, &'a [u8]) -> Option<R>,
2199{
2200    type Item = R;
2201
2202    fn next(&mut self) -> Option<Self::Item> {
2203        let (key, value) = self.inner.item()?;
2204        let result = (self.map)(
2205            u64::from_be_bytes(key[33..41].try_into().unwrap()),
2206            &TransactionData::read_tx_hash(value),
2207            TransactionData::read_transaction(value),
2208        )?;
2209        if self.is_reversed {
2210            self.inner.prev();
2211        } else {
2212            self.inner.next();
2213        }
2214        Some(result)
2215    }
2216}
2217
/// Location of a transaction: its account and logical time, the block that
/// contains it, and a masterchain seqno stored with it.
#[derive(Debug, Clone)]
pub struct TransactionInfo {
    pub account: StdAddr,
    pub lt: u64,
    pub block_id: BlockId,
    // NOTE(review): presumably the seqno of the masterchain block that
    // references `block_id` — confirm against the writer of this record.
    pub mc_seqno: u32,
}
2225
2226impl TransactionInfo {
2227    fn from_bytes(bytes: &[u8]) -> Option<Self> {
2228        if bytes.len() < tables::TransactionsByHash::VALUE_FULL_LEN {
2229            return None;
2230        }
2231
2232        let account = StdAddr::new(bytes[0] as i8, HashBytes::from_slice(&bytes[1..33]));
2233        let lt = u64::from_be_bytes(bytes[33..41].try_into().unwrap());
2234        let prefix_len = bytes[41];
2235        debug_assert!(prefix_len < 64);
2236
2237        let tail_mask = 1u64 << (63 - prefix_len);
2238
2239        // TODO: Move into types?
2240        let Some(shard) = ShardIdent::new(
2241            bytes[0] as i8 as i32,
2242            (account.prefix() | tail_mask) & !(tail_mask - 1),
2243        ) else {
2244            // TODO: unwrap?
2245            return None;
2246        };
2247
2248        let block_id = BlockId {
2249            shard,
2250            seqno: u32::from_le_bytes(bytes[42..46].try_into().unwrap()),
2251            root_hash: HashBytes::from_slice(&bytes[46..78]),
2252            file_hash: HashBytes::from_slice(&bytes[78..110]),
2253        };
2254        let mc_seqno = u32::from_le_bytes(bytes[110..114].try_into().unwrap());
2255
2256        Some(Self {
2257            account,
2258            lt,
2259            block_id,
2260            mc_seqno,
2261        })
2262    }
2263}
2264
/// Stored transaction data together with its [`TransactionInfo`].
pub struct TransactionDataExt<'a> {
    pub info: TransactionInfo,
    pub data: TransactionData<'a>,
}
2269
/// Raw stored transaction value.
///
/// The value starts with a [`TransactionMask`] byte, followed by the
/// transaction hash, an optional message hash and the transaction bytes.
pub struct TransactionData<'a> {
    data: rocksdb::DBPinnableSlice<'a>,
}
2273
2274impl<'a> TransactionData<'a> {
2275    pub fn new(data: rocksdb::DBPinnableSlice<'a>) -> Self {
2276        Self { data }
2277    }
2278
2279    pub fn tx_hash(&self) -> HashBytes {
2280        let value = self.data.as_ref();
2281        assert!(!value.is_empty());
2282        HashBytes::from_slice(&value[1..33])
2283    }
2284
2285    pub fn in_msg_hash(&self) -> Option<HashBytes> {
2286        let value = self.data.as_ref();
2287        assert!(!value.is_empty());
2288
2289        let mask = TransactionMask::from_bits_retain(value[0]);
2290        mask.has_msg_hash()
2291            .then(|| HashBytes::from_slice(&value[33..65]))
2292    }
2293
2294    fn read_tx_hash(value: &[u8]) -> HashBytes {
2295        HashBytes::from_slice(&value[1..33])
2296    }
2297
2298    fn read_transaction<T: AsRef<[u8]> + ?Sized>(value: &T) -> &[u8] {
2299        let value = value.as_ref();
2300        assert!(!value.is_empty());
2301
2302        let mask = TransactionMask::from_bits_retain(value[0]);
2303        let boc_start = if mask.has_msg_hash() { 65 } else { 33 }; // 1 + 32 + (32)
2304
2305        assert!(boc_start < value.len());
2306
2307        value[boc_start..].as_ref()
2308    }
2309}
2310
2311impl AsRef<[u8]> for TransactionData<'_> {
2312    fn as_ref(&self) -> &[u8] {
2313        Self::read_transaction(self.data.as_ref())
2314    }
2315}
2316
/// Result of extracting a code hash from a shard account.
enum ExtractedCodeHash {
    /// The account was inspected; holds its code hash, or `None` when it has no code.
    Exact(Option<HashBytes>),
    /// The account data is a pruned branch and could not be inspected.
    Skip,
}
2321
2322fn extract_code_hash(account: &ShardAccount) -> Result<ExtractedCodeHash> {
2323    if account.account.inner().descriptor().is_pruned_branch() {
2324        return Ok(ExtractedCodeHash::Skip);
2325    }
2326
2327    if let Some(account) = account.load_account()?
2328        && let AccountState::Active(state_init) = &account.state
2329        && let Some(code) = &state_init.code
2330    {
2331        return Ok(ExtractedCodeHash::Exact(Some(*code.repr_hash())));
2332    }
2333
2334    Ok(ExtractedCodeHash::Exact(None))
2335}
2336
2337fn split_shard(
2338    shard: &ShardIdent,
2339    accounts: &ShardAccountsDict,
2340    depth: u8,
2341    shards: &mut FastHashMap<ShardIdent, ShardAccountsDict>,
2342) -> Result<()> {
2343    fn split_shard_impl(
2344        shard: &ShardIdent,
2345        accounts: &ShardAccountsDict,
2346        depth: u8,
2347        shards: &mut FastHashMap<ShardIdent, ShardAccountsDict>,
2348        builder: &mut CellBuilder,
2349    ) -> Result<()> {
2350        let (left_shard_ident, right_shard_ident) = 'split: {
2351            if depth > 0
2352                && let Some((left, right)) = shard.split()
2353            {
2354                break 'split (left, right);
2355            }
2356            shards.insert(*shard, accounts.clone());
2357            return Ok(());
2358        };
2359
2360        let (left_accounts, right_accounts) = {
2361            builder.clear_bits();
2362            let prefix_len = shard.prefix_len();
2363            if prefix_len > 0 {
2364                builder.store_uint(shard.prefix() >> (64 - prefix_len), prefix_len)?;
2365            }
2366            accounts.split_by_prefix(&builder.as_data_slice())?
2367        };
2368
2369        split_shard_impl(
2370            &left_shard_ident,
2371            &left_accounts,
2372            depth - 1,
2373            shards,
2374            builder,
2375        )?;
2376        split_shard_impl(
2377            &right_shard_ident,
2378            &right_accounts,
2379            depth - 1,
2380            shards,
2381            builder,
2382        )
2383    }
2384
2385    split_shard_impl(shard, accounts, depth, shards, &mut CellBuilder::new())
2386}
2387
/// Dictionary of shard accounts keyed by account id.
type ShardAccountsDict = Dict<HashBytes, (DepthBalanceInfo, ShardAccount)>;
2389
2390fn extend_account_prefix(shard: &ShardIdent, max: bool, target: &mut [u8; 32]) {
2391    let mut prefix = shard.prefix();
2392    if max {
2393        // Fill remaining bits after the trailing bit
2394        // 1010000:
2395        // 1010000 | (1010000 - 1) = 1010000 | 1001111 = 1011111
2396        prefix |= prefix - 1;
2397    } else {
2398        // Remove the trailing bit
2399        // 1010000:
2400        // (!1010000 + 1) = 0101111 + 1 = 0110000
2401        // 1010000 & 0110000 = 0010000 // only trailing bit
2402        prefix -= extract_tag(shard);
2403    };
2404    target[..8].copy_from_slice(&prefix.to_be_bytes());
2405    target[8..].fill(0xff * max as u8);
2406}
2407
2408const fn extract_tag(shard: &ShardIdent) -> u64 {
2409    let prefix = shard.prefix();
2410    prefix & (!prefix).wrapping_add(1)
2411}
2412
bitflags::bitflags! {
    /// Flags stored in the first byte of a transaction value.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct TransactionMask: u8 {
        /// A 32-byte message hash follows the transaction hash.
        const HAS_MSG_HASH = 1 << 0;
    }
}
2419
2420impl TransactionMask {
2421    pub fn has_msg_hash(&self) -> bool {
2422        self.contains(TransactionMask::HAS_MSG_HASH)
2423    }
2424}
2425
/// Raw address key: workchain byte followed by the 32-byte account id.
type AddressKey = [u8; 33];

// Byte-string keys (`INSTANCE_ID` is looked up in the DB state in `RpcStorage::new`).
const TX_MIN_LT: &[u8] = b"tx_min_lt";
const TX_GC_RUNNING: &[u8] = b"tx_gc_running";
const INSTANCE_ID: &[u8] = b"instance_id";
2431
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the formula that rebuilds a shard prefix from an account
    /// prefix and a prefix length.
    #[test]
    fn shard_prefix() {
        let prefix_len = 10;
        let account_prefix = 0xabccdeadaaaaaaaa;

        // Tag bit goes right after the first `prefix_len` bits.
        let tail_mask = 1u64 << (63 - prefix_len);
        let shard_prefix = (account_prefix | tail_mask) & !(tail_mask - 1);

        // SAFETY: the constant is the first 10 bits of the account prefix
        // followed by a single tag bit and zeros.
        // NOTE(review): presumably that is all `new_unchecked` requires — confirm.
        let expected = unsafe { ShardIdent::new_unchecked(0, 0xabe0000000000000) };

        let shard = ShardIdent::new(0, shard_prefix).unwrap();
        assert_eq!(shard, expected);
    }
}