ckb_db/
snapshot.rs

1//! RocksDB snapshot wrapper
2use crate::db::cf_handle;
3use crate::{Result, internal_error};
4use ckb_db_schema::Col;
5use libc::{self, c_char, size_t};
6use rocksdb::ops::{GetPinnedCF, Iterate, IterateCF, Read};
7use rocksdb::{
8    ColumnFamily, ConstHandle, DBPinnableSlice, DBRawIterator, Error, Handle,
9    OptimisticTransactionDB, ReadOptions, ffi, ffi_util,
10};
11use std::sync::Arc;
12
13/// A snapshot captures a point-in-time view of the DB at the time it's created
14pub struct RocksDBSnapshot {
15    pub(crate) db: Arc<OptimisticTransactionDB>,
16    pub(crate) inner: *const ffi::rocksdb_snapshot_t,
17}
18
19unsafe impl Sync for RocksDBSnapshot {}
20unsafe impl Send for RocksDBSnapshot {}
21
22impl RocksDBSnapshot {
23    /// # Safety
24    ///
25    /// This function is unsafe because it take raw pointer as arguments
26    pub unsafe fn new(
27        db: &Arc<OptimisticTransactionDB>,
28        ptr: *const ffi::rocksdb_snapshot_t,
29    ) -> RocksDBSnapshot {
30        RocksDBSnapshot {
31            db: Arc::clone(db),
32            inner: ptr,
33        }
34    }
35
36    /// Return the value associated with a key using RocksDB's PinnableSlice from the given column
37    /// so as to avoid unnecessary memory copy.
38    pub fn get_pinned(&self, col: Col, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
39        let cf = cf_handle(&self.db, col)?;
40        self.get_pinned_cf_full(Some(cf), key, None)
41            .map_err(internal_error)
42    }
43}
44
45impl Read for RocksDBSnapshot {}
46
47impl ConstHandle<ffi::rocksdb_snapshot_t> for RocksDBSnapshot {
48    fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
49        self.inner
50    }
51}
52
53impl<'a> GetPinnedCF<'a> for RocksDBSnapshot {
54    type ColumnFamily = &'a ColumnFamily;
55    type ReadOptions = &'a ReadOptions;
56
57    fn get_pinned_cf_full<K: AsRef<[u8]>>(
58        &'a self,
59        cf: Option<Self::ColumnFamily>,
60        key: K,
61        readopts: Option<Self::ReadOptions>,
62    ) -> ::std::result::Result<Option<DBPinnableSlice<'a>>, Error> {
63        let mut ro = readopts.cloned().unwrap_or_default();
64        ro.set_snapshot(self);
65
66        let key = key.as_ref();
67        let key_ptr = key.as_ptr() as *const c_char;
68        let key_len = key.len() as size_t;
69
70        unsafe {
71            let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
72            let val = match cf {
73                Some(cf) => ffi::rocksdb_get_pinned_cf(
74                    self.db.handle(),
75                    ro.handle(),
76                    cf.handle(),
77                    key_ptr,
78                    key_len,
79                    &mut err,
80                ),
81                None => ffi::rocksdb_get_pinned(
82                    self.db.handle(),
83                    ro.handle(),
84                    key_ptr,
85                    key_len,
86                    &mut err,
87                ),
88            };
89
90            if !err.is_null() {
91                return Err(Error::new(ffi_util::error_message(err)));
92            }
93
94            if val.is_null() {
95                Ok(None)
96            } else {
97                Ok(Some(DBPinnableSlice::from_c(val)))
98            }
99        }
100    }
101}
102
103impl Drop for RocksDBSnapshot {
104    fn drop(&mut self) {
105        unsafe {
106            ffi::rocksdb_release_snapshot(self.db.base_db_ptr(), self.inner);
107        }
108    }
109}
110
111impl Iterate for RocksDBSnapshot {
112    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
113        let mut ro = readopts.to_owned();
114        ro.set_snapshot(self);
115        self.db.get_raw_iter(&ro)
116    }
117}
118
119impl IterateCF for RocksDBSnapshot {
120    fn get_raw_iter_cf<'a: 'b, 'b>(
121        &'a self,
122        cf_handle: &ColumnFamily,
123        readopts: &ReadOptions,
124    ) -> ::std::result::Result<DBRawIterator<'b>, Error> {
125        let mut ro = readopts.to_owned();
126        ro.set_snapshot(self);
127        self.db.get_raw_iter_cf(cf_handle, &ro)
128    }
129}