ckb_rocksdb/
db_with_ttl.rs

1use crate::ffi;
2use crate::ffi_util::to_cstring;
3use crate::ops::GetColumnFamilys;
4use crate::{
5    ColumnFamily, Error, Options,
6    db_iterator::DBRawIterator,
7    db_options::{OptionsMustOutliveDB, ReadOptions},
8    handle::Handle,
9    open_raw::{OpenRaw, OpenRawFFI},
10    ops,
11};
12use std::collections::BTreeMap;
13use std::fmt;
14use std::marker::PhantomData;
15use std::path::{Path, PathBuf};
16
/// A RocksDB database handle opened through the TTL-aware entry points.
///
/// Holds the raw database pointer together with the column family handles
/// registered for it; the `Drop` implementation destroys the handles and
/// closes the database.
pub struct DBWithTTL {
    // Raw database pointer handed to every FFI call made by this type.
    pub(crate) inner: *mut ffi::rocksdb_t,
    // Column family handles, keyed by column family name.
    cfs: BTreeMap<String, ColumnFamily>,
    // Path stored when the value was built; exposed through `path()`.
    path: PathBuf,
    // Never read in this file; presumably kept only so the options live at
    // least as long as the database (going by the type name) — TODO confirm.
    _outlive: Vec<OptionsMustOutliveDB>,
}
23
24impl DBWithTTL {
25    pub fn path(&self) -> &Path {
26        self.path.as_path()
27    }
28
29    pub fn create_cf_with_ttl<N: AsRef<str>>(
30        &mut self,
31        name: N,
32        opts: &Options,
33        ttl: i32,
34    ) -> Result<(), Error> {
35        let cname = to_cstring(
36            name.as_ref(),
37            "Failed to convert path to CString when opening rocksdb",
38        )?;
39        unsafe {
40            let cf_handle = ffi_try!(ffi::rocksdb_create_column_family_with_ttl(
41                self.handle(),
42                opts.inner,
43                cname.as_ptr(),
44                ttl as libc::c_int,
45            ));
46
47            self.get_mut_cfs()
48                .insert(name.as_ref().to_string(), ColumnFamily::new(cf_handle));
49        };
50        Ok(())
51    }
52}
53
54impl Default for TTLOpenDescriptor {
55    fn default() -> Self {
56        TTLOpenDescriptor {
57            ttls: TTLs::Default(-1),
58        }
59    }
60}
61
/// TTL selection used when opening a `DBWithTTL`.
pub enum TTLs {
    /// A single TTL: used for the database itself when no column families are
    /// opened, or repeated for every column family otherwise.
    Default(i32),
    /// One TTL per column family; the length must equal the number of column
    /// families being opened, otherwise opening fails.
    Columns(Vec<i32>),
}
66
/// Describes the TTL configuration used when opening a `DBWithTTL`.
///
/// Notes on TTL behaviour:
///
/// - TTL is accepted in seconds.
/// - If the TTL is non-positive or not provided, the behaviour is
///   TTL = infinity.
/// - `(int32_t)Timestamp(creation)` is suffixed to values in `Put` internally.
/// - Expired TTL values are deleted in compaction only
///   (`Timestamp + ttl < time_now`).
/// - `Get`/`Iterator` may return expired entries (compaction has not run on
///   them yet).
/// - A different TTL may be used during different opens. Example: Open1 at
///   t=0 with ttl=4 inserts k1, k2 and closes at t=2; Open2 at t=3 uses ttl=5.
///   Now k1, k2 should be deleted at t>=5.
/// - `read_only=true` opens in the usual read-only mode. Compactions will not
///   be triggered (neither manual nor automatic), so no expired entries are
///   removed.
pub struct TTLOpenDescriptor {
    // Either one TTL for everything or one TTL per column family.
    ttls: TTLs,
}
78
79impl TTLOpenDescriptor {
80    pub fn by_columns(ttls: Vec<i32>) -> Self {
81        TTLOpenDescriptor {
82            ttls: TTLs::Columns(ttls),
83        }
84    }
85
86    pub fn by_default(ttl: i32) -> Self {
87        TTLOpenDescriptor {
88            ttls: TTLs::Default(ttl),
89        }
90    }
91}
92
// Opening with and without column families relies entirely on the methods the
// `ops` traits already provide; nothing is overridden here.
impl ops::Open for DBWithTTL {}
impl ops::OpenCF for DBWithTTL {}
95
96impl OpenRaw for DBWithTTL {
97    type Pointer = ffi::rocksdb_t;
98    type Descriptor = TTLOpenDescriptor;
99
100    fn open_ffi(input: OpenRawFFI<'_, Self::Descriptor>) -> Result<*mut Self::Pointer, Error> {
101        let pointer = unsafe {
102            if input.num_column_families <= 0 {
103                let ttl = match input.open_descriptor.ttls {
104                    TTLs::Default(ttl) => ttl as libc::c_int,
105                    TTLs::Columns(_) => {
106                        return Err(Error::new(
107                            "Ttls size has to be the same as number of column families".to_owned(),
108                        ));
109                    }
110                };
111
112                ffi_try!(ffi::rocksdb_open_with_ttl(input.options, input.path, ttl,))
113            } else {
114                let ttls = match input.open_descriptor.ttls {
115                    TTLs::Default(ttl) => (0..input.num_column_families)
116                        .map(|_| ttl as libc::c_int)
117                        .collect::<Vec<_>>(),
118                    TTLs::Columns(ref ttls) => {
119                        let ttls: Vec<_> = ttls.iter().map(|t| *t as libc::c_int).collect();
120
121                        let is_ttls_match = if input.num_column_families <= 0 {
122                            ttls.len() as i32 == 1
123                        } else {
124                            ttls.len() as i32 == input.num_column_families
125                        };
126
127                        if !is_ttls_match {
128                            return Err(Error::new(
129                                "Ttls size has to be the same as number of column families"
130                                    .to_owned(),
131                            ));
132                        }
133
134                        ttls
135                    }
136                };
137
138                ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
139                    input.options,
140                    input.path,
141                    input.num_column_families,
142                    input.column_family_names,
143                    input.column_family_options,
144                    input.column_family_handles,
145                    ttls.as_ptr(),
146                ))
147            }
148        };
149
150        Ok(pointer)
151    }
152
153    fn build<I>(
154        path: PathBuf,
155        _open_descriptor: Self::Descriptor,
156        pointer: *mut Self::Pointer,
157        column_families: I,
158        outlive: Vec<OptionsMustOutliveDB>,
159    ) -> Result<Self, Error>
160    where
161        I: IntoIterator<Item = (String, *mut ffi::rocksdb_column_family_handle_t)>,
162    {
163        let cfs: BTreeMap<_, _> = column_families
164            .into_iter()
165            .map(|(k, h)| (k, ColumnFamily::new(h)))
166            .collect();
167        Ok(DBWithTTL {
168            inner: pointer,
169            cfs,
170            path,
171            _outlive: outlive,
172        })
173    }
174}
175
impl Handle<ffi::rocksdb_t> for DBWithTTL {
    /// Returns the raw database pointer stored in this value.
    fn handle(&self) -> *mut ffi::rocksdb_t {
        self.inner
    }
}
181
182impl ops::Iterate for DBWithTTL {
183    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
184        unsafe {
185            DBRawIterator {
186                inner: ffi::rocksdb_create_iterator(self.inner, readopts.handle()),
187                db: PhantomData,
188            }
189        }
190    }
191}
192
193impl ops::IterateCF for DBWithTTL {
194    fn get_raw_iter_cf<'a: 'b, 'b>(
195        &'a self,
196        cf_handle: &ColumnFamily,
197        readopts: &ReadOptions,
198    ) -> Result<DBRawIterator<'b>, Error> {
199        unsafe {
200            Ok(DBRawIterator {
201                inner: ffi::rocksdb_create_iterator_cf(
202                    self.inner,
203                    readopts.handle(),
204                    cf_handle.inner,
205                ),
206                db: PhantomData,
207            })
208        }
209    }
210}
211
impl ops::GetColumnFamilys for DBWithTTL {
    /// Returns the column family handles registered for this database, keyed
    /// by column family name.
    fn get_cfs(&self) -> &BTreeMap<String, ColumnFamily> {
        &self.cfs
    }
    /// Returns the column family map mutably so handles can be added or
    /// removed by the caller.
    fn get_mut_cfs(&mut self) -> &mut BTreeMap<String, ColumnFamily> {
        &mut self.cfs
    }
}
220
// Reads and writes rely entirely on the methods the `ops` traits already
// provide; nothing is overridden here.
impl ops::Read for DBWithTTL {}
impl ops::Write for DBWithTTL {}
223
// The raw pointer field prevents these from being derived automatically.
// NOTE(review): soundness rests on the underlying RocksDB handle being safe to
// move between and share across threads — confirm against RocksDB's threading
// guarantees.
unsafe impl Send for DBWithTTL {}
unsafe impl Sync for DBWithTTL {}
226
227impl Drop for DBWithTTL {
228    fn drop(&mut self) {
229        unsafe {
230            for cf in self.cfs.values() {
231                ffi::rocksdb_column_family_handle_destroy(cf.inner);
232            }
233            ffi::rocksdb_close(self.inner);
234        }
235    }
236}
237
238impl fmt::Debug for DBWithTTL {
239    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240        write!(f, "Read-only RocksDB {{ path: {:?} }}", self.path())
241    }
242}