ckb_db/
db_with_ttl.rs

1//! DB with ttl support wrapper
2
3use crate::{Result, internal_error};
4use rocksdb::ops::{DropCF, GetColumnFamilys, GetPinnedCF, GetPropertyCF, OpenCF, PutCF};
5use rocksdb::{
6    ColumnFamilyDescriptor, DBPinnableSlice, DBWithTTL as RawDBWithTTL, Options, TTLOpenDescriptor,
7};
8use std::path::Path;
9
10const PROPERTY_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
11const DB_LOG_KEEP_NUM: usize = 10;
12
13/// DB with ttl support wrapper
14///
15/// TTL is accepted in seconds
16/// If TTL is non positive or not provided, the behaviour is TTL = infinity
17/// (int32_t)Timestamp(creation) is suffixed to values in Put internally
18/// Expired TTL values are deleted in compaction only:(Timestamp+ttl<time_now)
19/// Get/Iterator may return expired entries(compaction not run on them yet)
20/// Different TTL may be used during different Opens
21/// Example: Open1 at t=0 with ttl=4 and insert k1,k2, close at t=2. Open2 at t=3 with ttl=5. Now k1,k2 should be deleted at t>=5
22/// read_only=true opens in the usual read-only mode. Compactions will not be triggered(neither manual nor automatic), so no expired entries removed
23#[derive(Debug)]
24pub struct DBWithTTL {
25    pub(crate) inner: RawDBWithTTL,
26}
27
28impl DBWithTTL {
29    /// Open a database with ttl support.
30    pub fn open_cf<P, I, N>(path: P, cf_names: I, ttl: i32) -> Result<Self>
31    where
32        P: AsRef<Path>,
33        I: IntoIterator<Item = N>,
34        N: Into<String>,
35    {
36        let mut opts = Options::default();
37        opts.create_if_missing(true);
38        opts.create_missing_column_families(true);
39        opts.set_keep_log_file_num(DB_LOG_KEEP_NUM);
40
41        let cf_descriptors: Vec<_> = cf_names
42            .into_iter()
43            .map(|name| ColumnFamilyDescriptor::new(name, Options::default()))
44            .collect();
45
46        let descriptor = TTLOpenDescriptor::by_default(ttl);
47        let inner = RawDBWithTTL::open_cf_descriptors_with_descriptor(
48            &opts,
49            path,
50            cf_descriptors,
51            descriptor,
52        )
53        .map_err(|err| internal_error(format!("failed to open database: {err}")))?;
54        Ok(DBWithTTL { inner })
55    }
56
57    /// Return the value associated with a key using RocksDB's PinnableSlice from the given column
58    /// so as to avoid unnecessary memory copy.
59    pub fn get_pinned(&self, col: &str, key: &[u8]) -> Result<Option<DBPinnableSlice>> {
60        let cf = self
61            .inner
62            .cf_handle(col)
63            .ok_or_else(|| internal_error(format!("column {col} not found")))?;
64        self.inner.get_pinned_cf(cf, key).map_err(internal_error)
65    }
66
67    /// Insert a value into the database under the given key.
68    pub fn put<K, V>(&self, col: &str, key: K, value: V) -> Result<()>
69    where
70        K: AsRef<[u8]>,
71        V: AsRef<[u8]>,
72    {
73        let cf = self
74            .inner
75            .cf_handle(col)
76            .ok_or_else(|| internal_error(format!("column {col} not found")))?;
77        self.inner.put_cf(cf, key, value).map_err(internal_error)
78    }
79
80    /// Create a new column family for the database.
81    pub fn create_cf_with_ttl(&mut self, col: &str, ttl: i32) -> Result<()> {
82        let opts = Options::default();
83        self.inner
84            .create_cf_with_ttl(col, &opts, ttl)
85            .map_err(internal_error)
86    }
87
88    /// Delete column family.
89    pub fn drop_cf(&mut self, col: &str) -> Result<()> {
90        self.inner.drop_cf(col).map_err(internal_error)
91    }
92
93    /// "rocksdb.estimate-num-keys" - returns estimated number of total keys in
94    /// the active and unflushed immutable memtables and storage.
95    pub fn estimate_num_keys_cf(&self, col: &str) -> Result<Option<u64>> {
96        let cf = self
97            .inner
98            .cf_handle(col)
99            .ok_or_else(|| internal_error(format!("column {col} not found")))?;
100        self.inner
101            .property_int_value_cf(cf, PROPERTY_NUM_KEYS)
102            .map_err(internal_error)
103    }
104}