ckb_store/
transaction.rs

1use crate::cache::StoreCache;
2use crate::store::ChainStore;
3use ckb_chain_spec::versionbits::VersionbitsIndexer;
4use ckb_db::{
5    DBPinnableSlice, RocksDBTransaction, RocksDBTransactionSnapshot,
6    iter::{DBIter, DBIterator, IteratorMode},
7};
8use ckb_db_schema::{
9    COLUMN_BLOCK_BODY, COLUMN_BLOCK_EPOCH, COLUMN_BLOCK_EXT, COLUMN_BLOCK_EXTENSION,
10    COLUMN_BLOCK_FILTER, COLUMN_BLOCK_FILTER_HASH, COLUMN_BLOCK_HEADER, COLUMN_BLOCK_PROPOSAL_IDS,
11    COLUMN_BLOCK_UNCLE, COLUMN_CELL, COLUMN_CELL_DATA, COLUMN_CELL_DATA_HASH,
12    COLUMN_CHAIN_ROOT_MMR, COLUMN_EPOCH, COLUMN_INDEX, COLUMN_META, COLUMN_NUMBER_HASH,
13    COLUMN_TRANSACTION_INFO, COLUMN_UNCLES, Col, META_CURRENT_EPOCH_KEY,
14    META_LATEST_BUILT_FILTER_DATA_KEY, META_TIP_HEADER_KEY,
15};
16use ckb_error::Error;
17use ckb_freezer::Freezer;
18use ckb_merkle_mountain_range::{Error as MMRError, MMRStore, Result as MMRResult};
19use ckb_types::{
20    core::{
21        BlockExt, BlockView, EpochExt, HeaderView, TransactionView,
22        cell::{CellChecker, CellProvider, CellStatus},
23    },
24    packed::{self, Byte32, OutPoint},
25    prelude::*,
26    utilities::calc_filter_hash,
27};
28use std::sync::Arc;
29
30/// A Transaction DB
31pub struct StoreTransaction {
32    pub(crate) inner: RocksDBTransaction,
33    pub(crate) freezer: Option<Freezer>,
34    pub(crate) cache: Arc<StoreCache>,
35}
36
37impl ChainStore for StoreTransaction {
38    fn cache(&self) -> Option<&StoreCache> {
39        Some(&self.cache)
40    }
41
42    fn freezer(&self) -> Option<&Freezer> {
43        self.freezer.as_ref()
44    }
45
46    fn get(&self, col: Col, key: &[u8]) -> Option<DBPinnableSlice<'_>> {
47        self.inner
48            .get_pinned(col, key)
49            .expect("db operation should be ok")
50    }
51
52    fn get_iter(&self, col: Col, mode: IteratorMode) -> DBIter {
53        self.inner
54            .iter(col, mode)
55            .expect("db operation should be ok")
56    }
57}
58
59impl VersionbitsIndexer for StoreTransaction {
60    fn block_epoch_index(&self, block_hash: &Byte32) -> Option<Byte32> {
61        ChainStore::get_block_epoch_index(self, block_hash)
62    }
63
64    fn epoch_ext(&self, index: &Byte32) -> Option<EpochExt> {
65        ChainStore::get_epoch_ext(self, index)
66    }
67
68    fn block_header(&self, block_hash: &Byte32) -> Option<HeaderView> {
69        ChainStore::get_block_header(self, block_hash)
70    }
71
72    fn cellbase(&self, block_hash: &Byte32) -> Option<TransactionView> {
73        ChainStore::get_cellbase(self, block_hash)
74    }
75}
76
77impl CellProvider for StoreTransaction {
78    fn cell(&self, out_point: &OutPoint, eager_load: bool) -> CellStatus {
79        match self.get_cell(out_point) {
80            Some(mut cell_meta) => {
81                if eager_load {
82                    if let Some((data, data_hash)) = self.get_cell_data(out_point) {
83                        cell_meta.mem_cell_data = Some(data);
84                        cell_meta.mem_cell_data_hash = Some(data_hash);
85                    }
86                }
87                CellStatus::live_cell(cell_meta)
88            }
89            None => CellStatus::Unknown,
90        }
91    }
92}
93
94impl CellChecker for StoreTransaction {
95    fn is_live(&self, out_point: &OutPoint) -> Option<bool> {
96        if self.have_cell(out_point) {
97            Some(true)
98        } else {
99            None
100        }
101    }
102}
103
104pub struct StoreTransactionSnapshot<'a> {
105    pub(crate) inner: RocksDBTransactionSnapshot<'a>,
106    pub(crate) freezer: Option<Freezer>,
107    pub(crate) cache: Arc<StoreCache>,
108}
109
110impl<'a> ChainStore for StoreTransactionSnapshot<'a> {
111    fn cache(&self) -> Option<&StoreCache> {
112        Some(&self.cache)
113    }
114
115    fn freezer(&self) -> Option<&Freezer> {
116        self.freezer.as_ref()
117    }
118
119    fn get(&self, col: Col, key: &[u8]) -> Option<DBPinnableSlice> {
120        self.inner
121            .get_pinned(col, key)
122            .expect("db operation should be ok")
123    }
124
125    fn get_iter(&self, col: Col, mode: IteratorMode) -> DBIter {
126        self.inner
127            .iter(col, mode)
128            .expect("db operation should be ok")
129    }
130}
131
132impl StoreTransaction {
133    /// Inserts a raw key-value pair into the specified column.
134    pub fn insert_raw(&self, col: Col, key: &[u8], value: &[u8]) -> Result<(), Error> {
135        self.inner.put(col, key, value)
136    }
137
138    /// Deletes a key from the specified column.
139    pub fn delete(&self, col: Col, key: &[u8]) -> Result<(), Error> {
140        self.inner.delete(col, key)
141    }
142
143    /// Commits the transaction, writing all changes to the database.
144    pub fn commit(&self) -> Result<(), Error> {
145        self.inner.commit()
146    }
147
148    /// Returns a snapshot of the transaction's current state.
149    pub fn get_snapshot(&self) -> StoreTransactionSnapshot<'_> {
150        StoreTransactionSnapshot {
151            inner: self.inner.get_snapshot(),
152            freezer: self.freezer.clone(),
153            cache: Arc::clone(&self.cache),
154        }
155    }
156
157    /// Gets the tip header hash for update, locking the row in the transaction.
158    pub fn get_update_for_tip_hash(
159        &self,
160        snapshot: &StoreTransactionSnapshot<'_>,
161    ) -> Option<packed::Byte32> {
162        self.inner
163            .get_for_update(COLUMN_META, META_TIP_HEADER_KEY, &snapshot.inner)
164            .expect("db operation should be ok")
165            .map(|slice| packed::Byte32Reader::from_slice_should_be_ok(slice.as_ref()).to_entity())
166    }
167
168    /// Inserts or updates the tip header hash in the store.
169    pub fn insert_tip_header(&self, h: &HeaderView) -> Result<(), Error> {
170        self.insert_raw(COLUMN_META, META_TIP_HEADER_KEY, h.hash().as_slice())
171    }
172
173    /// Inserts a block into the store.
174    pub fn insert_block(&self, block: &BlockView) -> Result<(), Error> {
175        let hash = block.hash();
176        let header = Into::<packed::HeaderView>::into(block.header());
177        let uncles = Into::<packed::UncleBlockVecView>::into(block.uncles());
178        let proposals = block.data().proposals();
179        let txs_len: packed::Uint32 = (block.transactions().len() as u32).into();
180        self.insert_raw(COLUMN_BLOCK_HEADER, hash.as_slice(), header.as_slice())?;
181        self.insert_raw(COLUMN_BLOCK_UNCLE, hash.as_slice(), uncles.as_slice())?;
182        if let Some(extension) = block.extension() {
183            self.insert_raw(
184                COLUMN_BLOCK_EXTENSION,
185                hash.as_slice(),
186                extension.as_slice(),
187            )?;
188        }
189        self.insert_raw(
190            COLUMN_NUMBER_HASH,
191            packed::NumberHash::new_builder()
192                .number(block.number())
193                .block_hash(hash.clone())
194                .build()
195                .as_slice(),
196            txs_len.as_slice(),
197        )?;
198        self.insert_raw(
199            COLUMN_BLOCK_PROPOSAL_IDS,
200            hash.as_slice(),
201            proposals.as_slice(),
202        )?;
203        for (index, tx) in block.transactions().into_iter().enumerate() {
204            let key = packed::TransactionKey::new_builder()
205                .block_hash(hash.clone())
206                .index(index)
207                .build();
208            let tx_data = Into::<packed::TransactionView>::into(tx);
209            self.insert_raw(COLUMN_BLOCK_BODY, key.as_slice(), tx_data.as_slice())?;
210        }
211        Ok(())
212    }
213
214    /// Deletes a block from the store.
215    pub fn delete_block(&self, block: &BlockView) -> Result<(), Error> {
216        let hash = block.hash();
217        let txs_len = block.transactions().len();
218        self.delete(COLUMN_BLOCK_HEADER, hash.as_slice())?;
219        self.delete(COLUMN_BLOCK_UNCLE, hash.as_slice())?;
220        self.delete(COLUMN_BLOCK_EXTENSION, hash.as_slice())?;
221        self.delete(COLUMN_BLOCK_PROPOSAL_IDS, hash.as_slice())?;
222        self.delete(
223            COLUMN_NUMBER_HASH,
224            packed::NumberHash::new_builder()
225                .number(block.number())
226                .block_hash(hash.clone())
227                .build()
228                .as_slice(),
229        )?;
230        // currently rocksdb transaction do not support `DeleteRange`
231        // https://github.com/facebook/rocksdb/issues/4812
232        for index in 0..txs_len {
233            let key = packed::TransactionKey::new_builder()
234                .block_hash(hash.clone())
235                .index(index)
236                .build();
237            self.delete(COLUMN_BLOCK_BODY, key.as_slice())?;
238        }
239        Ok(())
240    }
241
242    /// Inserts block extension data.
243    pub fn insert_block_ext(
244        &self,
245        block_hash: &packed::Byte32,
246        ext: &BlockExt,
247    ) -> Result<(), Error> {
248        let packed_ext: packed::BlockExtV1 = ext.into();
249        self.insert_raw(
250            COLUMN_BLOCK_EXT,
251            block_hash.as_slice(),
252            packed_ext.as_slice(),
253        )
254    }
255
256    /// Attaches a block to the main chain, indexing its transactions and uncles.
257    pub fn attach_block(&self, block: &BlockView) -> Result<(), Error> {
258        let header = block.data().header();
259        let block_hash = block.hash();
260        for (index, tx_hash) in block.tx_hashes().iter().enumerate() {
261            let key = packed::TransactionKey::new_builder()
262                .block_hash(block_hash.clone())
263                .index(index)
264                .build();
265            let info = packed::TransactionInfo::new_builder()
266                .key(key)
267                .block_number(header.raw().number())
268                .block_epoch(header.raw().epoch())
269                .build();
270            self.insert_raw(COLUMN_TRANSACTION_INFO, tx_hash.as_slice(), info.as_slice())?;
271        }
272        let block_number: packed::Uint64 = block.number().into();
273        self.insert_raw(COLUMN_INDEX, block_number.as_slice(), block_hash.as_slice())?;
274        for uncle in block.uncles().into_iter() {
275            self.insert_raw(
276                COLUMN_UNCLES,
277                uncle.hash().as_slice(),
278                Into::<packed::HeaderView>::into(uncle.header()).as_slice(),
279            )?;
280        }
281        self.insert_raw(COLUMN_INDEX, block_hash.as_slice(), block_number.as_slice())
282    }
283
284    /// Detaches a block from the main chain, removing its transaction and uncle indices.
285    pub fn detach_block(&self, block: &BlockView) -> Result<(), Error> {
286        for tx_hash in block.tx_hashes().iter() {
287            self.delete(COLUMN_TRANSACTION_INFO, tx_hash.as_slice())?;
288        }
289        for uncle in block.uncles().into_iter() {
290            self.delete(COLUMN_UNCLES, uncle.hash().as_slice())?;
291        }
292        let block_number = block.data().header().raw().number();
293        self.delete(COLUMN_INDEX, block_number.as_slice())?;
294        self.delete(COLUMN_INDEX, block.hash().as_slice())
295    }
296
297    /// Inserts the block-to-epoch index mapping.
298    pub fn insert_block_epoch_index(
299        &self,
300        block_hash: &packed::Byte32,
301        epoch_hash: &packed::Byte32,
302    ) -> Result<(), Error> {
303        self.insert_raw(
304            COLUMN_BLOCK_EPOCH,
305            block_hash.as_slice(),
306            epoch_hash.as_slice(),
307        )
308    }
309
310    /// Inserts epoch extension data.
311    pub fn insert_epoch_ext(&self, hash: &packed::Byte32, epoch: &EpochExt) -> Result<(), Error> {
312        self.insert_raw(
313            COLUMN_EPOCH,
314            hash.as_slice(),
315            Into::<packed::EpochExt>::into(epoch).as_slice(),
316        )?;
317        let epoch_number: packed::Uint64 = epoch.number().into();
318        self.insert_raw(COLUMN_EPOCH, epoch_number.as_slice(), hash.as_slice())
319    }
320
321    /// Inserts the current epoch extension data.
322    pub fn insert_current_epoch_ext(&self, epoch: &EpochExt) -> Result<(), Error> {
323        self.insert_raw(
324            COLUMN_META,
325            META_CURRENT_EPOCH_KEY,
326            Into::<packed::EpochExt>::into(epoch).as_slice(),
327        )
328    }
329
330    /// Inserts multiple cells into the store.
331    pub fn insert_cells(
332        &self,
333        cells: impl Iterator<
334            Item = (
335                packed::OutPoint,
336                packed::CellEntry,
337                Option<packed::CellDataEntry>,
338            ),
339        >,
340    ) -> Result<(), Error> {
341        for (out_point, cell, cell_data) in cells {
342            let key = out_point.to_cell_key();
343            self.insert_raw(COLUMN_CELL, &key, cell.as_slice())?;
344            if let Some(data) = cell_data {
345                self.insert_raw(COLUMN_CELL_DATA, &key, data.as_slice())?;
346                self.insert_raw(
347                    COLUMN_CELL_DATA_HASH,
348                    &key,
349                    data.output_data_hash().as_slice(),
350                )?;
351            } else {
352                self.insert_raw(COLUMN_CELL_DATA, &key, &[])?;
353                self.insert_raw(COLUMN_CELL_DATA_HASH, &key, &[])?;
354            }
355        }
356        Ok(())
357    }
358
359    /// Deletes multiple cells from the store.
360    pub fn delete_cells(
361        &self,
362        out_points: impl Iterator<Item = packed::OutPoint>,
363    ) -> Result<(), Error> {
364        for out_point in out_points {
365            let key = out_point.to_cell_key();
366            self.delete(COLUMN_CELL, &key)?;
367            self.delete(COLUMN_CELL_DATA, &key)?;
368            self.delete(COLUMN_CELL_DATA_HASH, &key)?;
369        }
370        Ok(())
371    }
372
373    /// Inserts a header digest.
374    pub fn insert_header_digest(
375        &self,
376        position_u64: u64,
377        header_digest: &packed::HeaderDigest,
378    ) -> Result<(), Error> {
379        let position: packed::Uint64 = position_u64.into();
380        self.insert_raw(
381            COLUMN_CHAIN_ROOT_MMR,
382            position.as_slice(),
383            header_digest.as_slice(),
384        )
385    }
386
387    /// Deletes a header digest.
388    pub fn delete_header_digest(&self, position_u64: u64) -> Result<(), Error> {
389        let position: packed::Uint64 = position_u64.into();
390        self.delete(COLUMN_CHAIN_ROOT_MMR, position.as_slice())
391    }
392
393    /// insert block filter data
394    pub fn insert_block_filter(
395        &self,
396        block_hash: &packed::Byte32,
397        filter_data: &packed::Bytes,
398        parent_block_filter_hash: &packed::Byte32,
399    ) -> Result<(), Error> {
400        self.insert_raw(
401            COLUMN_BLOCK_FILTER,
402            block_hash.as_slice(),
403            filter_data.as_slice(),
404        )?;
405        let current_block_filter_hash = calc_filter_hash(parent_block_filter_hash, filter_data);
406        self.insert_raw(
407            COLUMN_BLOCK_FILTER_HASH,
408            block_hash.as_slice(),
409            current_block_filter_hash.as_slice(),
410        )?;
411        self.insert_raw(
412            COLUMN_META,
413            META_LATEST_BUILT_FILTER_DATA_KEY,
414            block_hash.as_slice(),
415        )
416    }
417}
418
419impl MMRStore<packed::HeaderDigest> for &StoreTransaction {
420    fn get_elem(&self, pos: u64) -> MMRResult<Option<packed::HeaderDigest>> {
421        Ok(self.get_header_digest(pos))
422    }
423
424    fn append(&mut self, pos: u64, elems: Vec<packed::HeaderDigest>) -> MMRResult<()> {
425        for (offset, elem) in elems.iter().enumerate() {
426            let pos: u64 = pos + (offset as u64);
427            self.insert_header_digest(pos, elem).map_err(|err| {
428                MMRError::StoreError(format!("Failed to append to MMR, DB error {err}"))
429            })?;
430        }
431        Ok(())
432    }
433}