electrs_rocksdb/transactions/
optimistic_transaction_db.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::{collections::BTreeMap, ffi::CString, fs, iter, marker::PhantomData, path::Path, ptr};
17
18use libc::{c_char, c_int};
19
20use crate::{
21    db::DBCommon, db::DBInner, ffi, ffi_util::to_cpath, write_batch::WriteBatchWithTransaction,
22    ColumnFamilyDescriptor, Error, OptimisticTransactionOptions, Options, ThreadMode, Transaction,
23    WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
24};
25
26/// A type alias to RocksDB Optimistic Transaction DB.
27///
28/// Please read the official
29/// [guide](https://github.com/facebook/rocksdb/wiki/Transactions#optimistictransactiondb)
30/// to learn more about RocksDB OptimisticTransactionDB.
31///
32/// The default thread mode for [`OptimisticTransactionDB`] is [`SingleThreaded`]
33/// if feature `multi-threaded-cf` is not enabled.
34///
35/// See [`DBCommon`] for full list of methods.
36///
37/// # Examples
38///
39/// ```
40/// use rocksdb::{DB, Options, OptimisticTransactionDB, SingleThreaded};
41/// let path = "_path_for_optimistic_transaction_db";
42/// {
43///     let db: OptimisticTransactionDB = OptimisticTransactionDB::open_default(path).unwrap();
44///     db.put(b"my key", b"my value").unwrap();
45///     
46///     // create transaction
47///     let txn = db.transaction();
48///     txn.put(b"key2", b"value2");
49///     txn.put(b"key3", b"value3");
50///     txn.commit().unwrap();
51/// }
52/// let _ = DB::destroy(&Options::default(), path);
53/// ```
54///
55/// [`SingleThreaded`]: crate::SingleThreaded
56#[cfg(not(feature = "multi-threaded-cf"))]
57pub type OptimisticTransactionDB<T = crate::SingleThreaded> =
58    DBCommon<T, OptimisticTransactionDBInner>;
59#[cfg(feature = "multi-threaded-cf")]
60pub type OptimisticTransactionDB<T = crate::MultiThreaded> =
61    DBCommon<T, OptimisticTransactionDBInner>;
62
63pub struct OptimisticTransactionDBInner {
64    base: *mut ffi::rocksdb_t,
65    db: *mut ffi::rocksdb_optimistictransactiondb_t,
66}
67
68impl DBInner for OptimisticTransactionDBInner {
69    fn inner(&self) -> *mut ffi::rocksdb_t {
70        self.base
71    }
72}
73
74impl Drop for OptimisticTransactionDBInner {
75    fn drop(&mut self) {
76        unsafe {
77            ffi::rocksdb_optimistictransactiondb_close_base_db(self.base);
78            ffi::rocksdb_optimistictransactiondb_close(self.db);
79        }
80    }
81}
82
83/// Methods of `OptimisticTransactionDB`.
84impl<T: ThreadMode> OptimisticTransactionDB<T> {
85    /// Opens a database with default options.
86    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
87        let mut opts = Options::default();
88        opts.create_if_missing(true);
89        Self::open(&opts, path)
90    }
91
92    /// Opens the database with the specified options.
93    pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
94        Self::open_cf(opts, path, None::<&str>)
95    }
96
97    /// Opens a database with the given database options and column family names.
98    ///
99    /// Column families opened using this function will be created with default `Options`.
100    pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
101    where
102        P: AsRef<Path>,
103        I: IntoIterator<Item = N>,
104        N: AsRef<str>,
105    {
106        let cfs = cfs
107            .into_iter()
108            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
109
110        Self::open_cf_descriptors_internal(opts, path, cfs)
111    }
112
113    /// Opens a database with the given database options and column family descriptors.
114    pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
115    where
116        P: AsRef<Path>,
117        I: IntoIterator<Item = ColumnFamilyDescriptor>,
118    {
119        Self::open_cf_descriptors_internal(opts, path, cfs)
120    }
121
122    /// Internal implementation for opening RocksDB.
123    fn open_cf_descriptors_internal<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
124    where
125        P: AsRef<Path>,
126        I: IntoIterator<Item = ColumnFamilyDescriptor>,
127    {
128        let cfs: Vec<_> = cfs.into_iter().collect();
129        let outlive = iter::once(opts.outlive.clone())
130            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
131            .collect();
132
133        let cpath = to_cpath(&path)?;
134
135        if let Err(e) = fs::create_dir_all(&path) {
136            return Err(Error::new(format!(
137                "Failed to create RocksDB directory: `{:?}`.",
138                e
139            )));
140        }
141
142        let db: *mut ffi::rocksdb_optimistictransactiondb_t;
143        let mut cf_map = BTreeMap::new();
144
145        if cfs.is_empty() {
146            db = Self::open_raw(opts, &cpath)?;
147        } else {
148            let mut cfs_v = cfs;
149            // Always open the default column family.
150            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
151                cfs_v.push(ColumnFamilyDescriptor {
152                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
153                    options: Options::default(),
154                });
155            }
156            // We need to store our CStrings in an intermediate vector
157            // so that their pointers remain valid.
158            let c_cfs: Vec<CString> = cfs_v
159                .iter()
160                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
161                .collect();
162
163            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
164
165            // These handles will be populated by DB.
166            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
167
168            let cfopts: Vec<_> = cfs_v
169                .iter()
170                .map(|cf| cf.options.inner as *const _)
171                .collect();
172
173            db = Self::open_cf_raw(opts, &cpath, &cfs_v, &cfnames, &cfopts, &mut cfhandles)?;
174
175            for handle in &cfhandles {
176                if handle.is_null() {
177                    return Err(Error::new(
178                        "Received null column family handle from DB.".to_owned(),
179                    ));
180                }
181            }
182
183            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
184                cf_map.insert(cf_desc.name.clone(), inner);
185            }
186        }
187
188        if db.is_null() {
189            return Err(Error::new("Could not initialize database.".to_owned()));
190        }
191
192        let base = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(db) };
193        if base.is_null() {
194            unsafe {
195                ffi::rocksdb_optimistictransactiondb_close(db);
196            }
197            return Err(Error::new("Could not initialize database.".to_owned()));
198        }
199        let inner = OptimisticTransactionDBInner { base, db };
200
201        Ok(Self::new(
202            inner,
203            T::new_cf_map_internal(cf_map),
204            path.as_ref().to_path_buf(),
205            outlive,
206        ))
207    }
208
209    fn open_raw(
210        opts: &Options,
211        cpath: &CString,
212    ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
213        unsafe {
214            let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open(
215                opts.inner,
216                cpath.as_ptr()
217            ));
218            Ok(db)
219        }
220    }
221
222    fn open_cf_raw(
223        opts: &Options,
224        cpath: &CString,
225        cfs_v: &[ColumnFamilyDescriptor],
226        cfnames: &[*const c_char],
227        cfopts: &[*const ffi::rocksdb_options_t],
228        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
229    ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
230        unsafe {
231            let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families(
232                opts.inner,
233                cpath.as_ptr(),
234                cfs_v.len() as c_int,
235                cfnames.as_ptr(),
236                cfopts.as_ptr(),
237                cfhandles.as_mut_ptr(),
238            ));
239            Ok(db)
240        }
241    }
242
243    /// Creates a transaction with default options.
244    pub fn transaction(&self) -> Transaction<Self> {
245        self.transaction_opt(
246            &WriteOptions::default(),
247            &OptimisticTransactionOptions::default(),
248        )
249    }
250
251    /// Creates a transaction with default options.
252    pub fn transaction_opt(
253        &self,
254        writeopts: &WriteOptions,
255        otxn_opts: &OptimisticTransactionOptions,
256    ) -> Transaction<Self> {
257        Transaction {
258            inner: unsafe {
259                ffi::rocksdb_optimistictransaction_begin(
260                    self.inner.db,
261                    writeopts.inner,
262                    otxn_opts.inner,
263                    std::ptr::null_mut(),
264                )
265            },
266            _marker: PhantomData::default(),
267        }
268    }
269
270    pub fn write_opt(
271        &self,
272        batch: WriteBatchWithTransaction<true>,
273        writeopts: &WriteOptions,
274    ) -> Result<(), Error> {
275        unsafe {
276            ffi_try!(ffi::rocksdb_optimistictransactiondb_write(
277                self.inner.db,
278                writeopts.inner,
279                batch.inner
280            ));
281        }
282        Ok(())
283    }
284
285    pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
286        self.write_opt(batch, &WriteOptions::default())
287    }
288
289    pub fn write_without_wal(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
290        let mut wo = WriteOptions::new();
291        wo.disable_wal(true);
292        self.write_opt(batch, &wo)
293    }
294}