ckb_db/
transaction.rs

1//! RocksDB optimistic transaction wrapper
2use crate::db::cf_handle;
3use crate::{Result, internal_error};
4use ckb_db_schema::Col;
5use rocksdb::ops::{DeleteCF, GetPinnedCF, PutCF};
6pub use rocksdb::{DBPinnableSlice, DBVector};
7use rocksdb::{
8    OptimisticTransaction, OptimisticTransactionDB, OptimisticTransactionSnapshot, ReadOptions,
9};
10use std::sync::Arc;
11
12/// An optimistic transaction database.
13pub struct RocksDBTransaction {
14    pub(crate) db: Arc<OptimisticTransactionDB>,
15    pub(crate) inner: OptimisticTransaction,
16}
17
18impl RocksDBTransaction {
19    /// Return the bytes associated with the given key and given column.
20    pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
21        let cf = cf_handle(&self.db, col)?;
22        self.inner.get_pinned_cf(cf, key).map_err(internal_error)
23    }
24
25    /// Write the bytes into the given column with associated key.
26    pub fn put(&self, col: Col, key: &[u8], value: &[u8]) -> Result<()> {
27        let cf = cf_handle(&self.db, col)?;
28        self.inner.put_cf(cf, key, value).map_err(internal_error)
29    }
30
31    /// Delete the data associated with the given key and given column.
32    pub fn delete(&self, col: Col, key: &[u8]) -> Result<()> {
33        let cf = cf_handle(&self.db, col)?;
34        self.inner.delete_cf(cf, key).map_err(internal_error)
35    }
36
37    /// Read a key and make the read value a precondition for transaction commit.
38    pub fn get_for_update(
39        &self,
40        col: Col,
41        key: &[u8],
42        snapshot: &RocksDBTransactionSnapshot<'_>,
43    ) -> Result<Option<DBVector>> {
44        let cf = cf_handle(&self.db, col)?;
45        let mut opts = ReadOptions::default();
46        opts.set_snapshot(&snapshot.inner);
47        self.inner
48            .get_for_update_cf_opt(cf, key, &opts, true)
49            .map_err(internal_error)
50    }
51
52    /// Commit the transaction.
53    pub fn commit(&self) -> Result<()> {
54        self.inner.commit().map_err(internal_error)
55    }
56
57    /// Rollback the transaction.
58    pub fn rollback(&self) -> Result<()> {
59        self.inner.rollback().map_err(internal_error)
60    }
61
62    /// Return `RocksDBTransactionSnapshot`
63    pub fn get_snapshot(&self) -> RocksDBTransactionSnapshot<'_> {
64        RocksDBTransactionSnapshot {
65            db: Arc::clone(&self.db),
66            inner: self.inner.snapshot(),
67        }
68    }
69
70    /// Set savepoint for transaction.
71    pub fn set_savepoint(&self) {
72        self.inner.set_savepoint()
73    }
74
75    /// Rollback the transaction to savepoint.
76    pub fn rollback_to_savepoint(&self) -> Result<()> {
77        self.inner.rollback_to_savepoint().map_err(internal_error)
78    }
79}
80
81/// A snapshot captures a point-in-time view of the transaction at the time it's created
82pub struct RocksDBTransactionSnapshot<'a> {
83    pub(crate) db: Arc<OptimisticTransactionDB>,
84    pub(crate) inner: OptimisticTransactionSnapshot<'a>,
85}
86
87impl<'a> RocksDBTransactionSnapshot<'a> {
88    /// Return the bytes associated with the given key and given column.
89    pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
90        let cf = cf_handle(&self.db, col)?;
91        self.inner.get_pinned_cf(cf, key).map_err(internal_error)
92    }
93}