rust_rocksdb/transactions/
optimistic_transaction_db.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::{collections::BTreeMap, ffi::CString, fs, iter, marker::PhantomData, path::Path, ptr};
17
18use libc::{c_char, c_int, size_t};
19
20use crate::column_family::ColumnFamilyTtl;
21use crate::{
22    AsColumnFamilyRef, ColumnFamilyDescriptor, DEFAULT_COLUMN_FAMILY_NAME, Error,
23    OptimisticTransactionOptions, Options, ThreadMode, Transaction, WriteOptions,
24    db::{DBCommon, DBInner},
25    ffi,
26    ffi_util::to_cpath,
27    write_batch::WriteBatchWithTransaction,
28};
29
30// Default options are kept per-thread to avoid re-allocating on every call while
31// also preventing cross-thread sharing. Some RocksDB option wrappers hold
32// pointers into internal buffers and are not safe to share across threads.
33// Using thread_local allows cheap reuse in the common "default options" path
34// without synchronization overhead. Callers who need non-defaults must pass
35// explicit options.
36thread_local! { static DEFAULT_WRITE_OPTS: WriteOptions = WriteOptions::default(); }
37
38/// A type alias to RocksDB Optimistic Transaction DB.
39///
40/// Please read the official
41/// [guide](https://github.com/facebook/rocksdb/wiki/Transactions#optimistictransactiondb)
42/// to learn more about RocksDB OptimisticTransactionDB.
43///
44/// The default thread mode for [`OptimisticTransactionDB`] is [`SingleThreaded`]
45/// if feature `multi-threaded-cf` is not enabled.
46///
47/// See [`DBCommon`] for full list of methods.
48///
49/// # Examples
50///
51/// ```
52/// use rust_rocksdb::{DB, Options, OptimisticTransactionDB, SingleThreaded};
53/// let tempdir = tempfile::Builder::new()
54///     .prefix("_path_for_optimistic_transaction_db")
55///     .tempdir()
56///     .expect("Failed to create temporary path for the _path_for_optimistic_transaction_db");
57/// let path = tempdir.path();
58/// {
59///     let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(path).unwrap();
60///     db.put(b"my key", b"my value").unwrap();
61///
62///     // create transaction
63///     let txn = db.transaction();
64///     txn.put(b"key2", b"value2");
65///     txn.put(b"key3", b"value3");
66///     txn.commit().unwrap();
67/// }
68/// let _ = DB::destroy(&Options::default(), path);
69/// ```
70///
71/// [`SingleThreaded`]: crate::SingleThreaded
72#[cfg(not(feature = "multi-threaded-cf"))]
73pub type OptimisticTransactionDB<T = crate::SingleThreaded> =
74    DBCommon<T, OptimisticTransactionDBInner>;
75#[cfg(feature = "multi-threaded-cf")]
76pub type OptimisticTransactionDB<T = crate::MultiThreaded> =
77    DBCommon<T, OptimisticTransactionDBInner>;
78
79pub struct OptimisticTransactionDBInner {
80    base: *mut ffi::rocksdb_t,
81    db: *mut ffi::rocksdb_optimistictransactiondb_t,
82}
83
84impl DBInner for OptimisticTransactionDBInner {
85    fn inner(&self) -> *mut ffi::rocksdb_t {
86        self.base
87    }
88}
89
90impl Drop for OptimisticTransactionDBInner {
91    fn drop(&mut self) {
92        unsafe {
93            ffi::rocksdb_optimistictransactiondb_close_base_db(self.base);
94            ffi::rocksdb_optimistictransactiondb_close(self.db);
95        }
96    }
97}
98
99/// Methods of `OptimisticTransactionDB`.
100impl<T: ThreadMode> OptimisticTransactionDB<T> {
101    /// Opens a database with default options.
102    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
103        let mut opts = Options::default();
104        opts.create_if_missing(true);
105        Self::open(&opts, path)
106    }
107
108    /// Opens the database with the specified options.
109    pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
110        Self::open_cf(opts, path, None::<&str>)
111    }
112
113    /// Opens a database with the given database options and column family names.
114    ///
115    /// Column families opened using this function will be created with default `Options`.
116    /// *NOTE*: `default` column family will be opened with the `Options::default()`.
117    /// If you want to open `default` column family with custom options, use `open_cf_descriptors` and
118    /// provide a `ColumnFamilyDescriptor` with the desired options.
119    pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
120    where
121        P: AsRef<Path>,
122        I: IntoIterator<Item = N>,
123        N: AsRef<str>,
124    {
125        let cfs = cfs
126            .into_iter()
127            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
128
129        Self::open_cf_descriptors_internal(opts, path, cfs)
130    }
131
132    /// Opens a database with the given database options and column family descriptors.
133    pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
134    where
135        P: AsRef<Path>,
136        I: IntoIterator<Item = ColumnFamilyDescriptor>,
137    {
138        Self::open_cf_descriptors_internal(opts, path, cfs)
139    }
140
141    /// Internal implementation for opening RocksDB.
142    fn open_cf_descriptors_internal<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
143    where
144        P: AsRef<Path>,
145        I: IntoIterator<Item = ColumnFamilyDescriptor>,
146    {
147        let cfs: Vec<_> = cfs.into_iter().collect();
148        let outlive = iter::once(opts.outlive.clone())
149            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
150            .collect();
151
152        let cpath = to_cpath(&path)?;
153
154        if let Err(e) = fs::create_dir_all(&path) {
155            return Err(Error::new(format!(
156                "Failed to create RocksDB directory: `{e:?}`."
157            )));
158        }
159
160        let db: *mut ffi::rocksdb_optimistictransactiondb_t;
161        let mut cf_map = BTreeMap::new();
162
163        if cfs.is_empty() {
164            db = Self::open_raw(opts, &cpath)?;
165        } else {
166            let mut cfs_v = cfs;
167            // Always open the default column family.
168            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
169                cfs_v.push(ColumnFamilyDescriptor {
170                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
171                    options: Options::default(),
172                    ttl: ColumnFamilyTtl::SameAsDb,
173                });
174            }
175            // We need to store our CStrings in an intermediate vector
176            // so that their pointers remain valid.
177            let c_cfs: Vec<CString> = cfs_v
178                .iter()
179                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
180                .collect();
181
182            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
183
184            // These handles will be populated by DB.
185            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
186
187            let cfopts: Vec<_> = cfs_v
188                .iter()
189                .map(|cf| cf.options.inner.cast_const())
190                .collect();
191
192            db = Self::open_cf_raw(opts, &cpath, &cfs_v, &cfnames, &cfopts, &mut cfhandles)?;
193
194            for handle in &cfhandles {
195                if handle.is_null() {
196                    return Err(Error::new(
197                        "Received null column family handle from DB.".to_owned(),
198                    ));
199                }
200            }
201
202            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
203                cf_map.insert(cf_desc.name.clone(), inner);
204            }
205        }
206
207        if db.is_null() {
208            return Err(Error::new("Could not initialize database.".to_owned()));
209        }
210
211        let base = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(db) };
212        if base.is_null() {
213            unsafe {
214                ffi::rocksdb_optimistictransactiondb_close(db);
215            }
216            return Err(Error::new("Could not initialize database.".to_owned()));
217        }
218        let inner = OptimisticTransactionDBInner { base, db };
219
220        Ok(Self::new(
221            inner,
222            T::new_cf_map_internal(cf_map),
223            path.as_ref().to_path_buf(),
224            outlive,
225        ))
226    }
227
228    fn open_raw(
229        opts: &Options,
230        cpath: &CString,
231    ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
232        unsafe {
233            let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open(
234                opts.inner,
235                cpath.as_ptr()
236            ));
237            Ok(db)
238        }
239    }
240
241    fn open_cf_raw(
242        opts: &Options,
243        cpath: &CString,
244        cfs_v: &[ColumnFamilyDescriptor],
245        cfnames: &[*const c_char],
246        cfopts: &[*const ffi::rocksdb_options_t],
247        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
248    ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
249        unsafe {
250            let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families(
251                opts.inner,
252                cpath.as_ptr(),
253                cfs_v.len() as c_int,
254                cfnames.as_ptr(),
255                cfopts.as_ptr(),
256                cfhandles.as_mut_ptr(),
257            ));
258            Ok(db)
259        }
260    }
261
262    /// Creates a transaction with default options.
263    pub fn transaction(&'_ self) -> Transaction<'_, Self> {
264        DEFAULT_WRITE_OPTS
265            .with(|opts| self.transaction_opt(opts, &OptimisticTransactionOptions::default()))
266    }
267
268    /// Creates a transaction with default options.
269    pub fn transaction_opt(
270        &'_ self,
271        writeopts: &WriteOptions,
272        otxn_opts: &OptimisticTransactionOptions,
273    ) -> Transaction<'_, Self> {
274        Transaction {
275            inner: unsafe {
276                ffi::rocksdb_optimistictransaction_begin(
277                    self.inner.db,
278                    writeopts.inner,
279                    otxn_opts.inner,
280                    std::ptr::null_mut(),
281                )
282            },
283            _marker: PhantomData,
284        }
285    }
286
287    pub fn write_opt(
288        &self,
289        batch: &WriteBatchWithTransaction<true>,
290        writeopts: &WriteOptions,
291    ) -> Result<(), Error> {
292        unsafe {
293            ffi_try!(ffi::rocksdb_optimistictransactiondb_write(
294                self.inner.db,
295                writeopts.inner,
296                batch.inner
297            ));
298        }
299        Ok(())
300    }
301
302    pub fn write(&self, batch: &WriteBatchWithTransaction<true>) -> Result<(), Error> {
303        DEFAULT_WRITE_OPTS.with(|opts| self.write_opt(batch, opts))
304    }
305
306    pub fn write_without_wal(&self, batch: &WriteBatchWithTransaction<true>) -> Result<(), Error> {
307        let mut wo = WriteOptions::new();
308        wo.disable_wal(true);
309        self.write_opt(batch, &wo)
310    }
311
312    /// Removes the database entries in the range `["from", "to")` using given write options.
313    pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
314        &self,
315        cf: &impl AsColumnFamilyRef,
316        from: K,
317        to: K,
318        writeopts: &WriteOptions,
319    ) -> Result<(), Error> {
320        let from = from.as_ref();
321        let to = to.as_ref();
322
323        unsafe {
324            ffi_try!(ffi::rocksdb_delete_range_cf(
325                self.inner.inner(),
326                writeopts.inner,
327                cf.inner(),
328                from.as_ptr() as *const c_char,
329                from.len() as size_t,
330                to.as_ptr() as *const c_char,
331                to.len() as size_t,
332            ));
333            Ok(())
334        }
335    }
336
337    /// Removes the database entries in the range `["from", "to")` using default write options.
338    pub fn delete_range_cf<K: AsRef<[u8]>>(
339        &self,
340        cf: &impl AsColumnFamilyRef,
341        from: K,
342        to: K,
343    ) -> Result<(), Error> {
344        DEFAULT_WRITE_OPTS.with(|opts| self.delete_range_cf_opt(cf, from, to, opts))
345    }
346}