rust_rocksdb/transactions/
optimistic_transaction_db.rs1use std::{collections::BTreeMap, ffi::CString, fs, iter, marker::PhantomData, path::Path, ptr};
17
18use libc::{c_char, c_int, size_t};
19
20use crate::column_family::ColumnFamilyTtl;
21use crate::{
22 AsColumnFamilyRef, ColumnFamilyDescriptor, DEFAULT_COLUMN_FAMILY_NAME, Error,
23 OptimisticTransactionOptions, Options, ThreadMode, Transaction, WriteOptions,
24 db::{DBCommon, DBInner},
25 ffi,
26 ffi_util::to_cpath,
27 write_batch::WriteBatchWithTransaction,
28};
29
30thread_local! { static DEFAULT_WRITE_OPTS: WriteOptions = WriteOptions::default(); }
37
38#[cfg(not(feature = "multi-threaded-cf"))]
73pub type OptimisticTransactionDB<T = crate::SingleThreaded> =
74 DBCommon<T, OptimisticTransactionDBInner>;
75#[cfg(feature = "multi-threaded-cf")]
76pub type OptimisticTransactionDB<T = crate::MultiThreaded> =
77 DBCommon<T, OptimisticTransactionDBInner>;
78
79pub struct OptimisticTransactionDBInner {
80 base: *mut ffi::rocksdb_t,
81 db: *mut ffi::rocksdb_optimistictransactiondb_t,
82}
83
84impl DBInner for OptimisticTransactionDBInner {
85 fn inner(&self) -> *mut ffi::rocksdb_t {
86 self.base
87 }
88}
89
90impl Drop for OptimisticTransactionDBInner {
91 fn drop(&mut self) {
92 unsafe {
93 ffi::rocksdb_optimistictransactiondb_close_base_db(self.base);
94 ffi::rocksdb_optimistictransactiondb_close(self.db);
95 }
96 }
97}
98
99impl<T: ThreadMode> OptimisticTransactionDB<T> {
101 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
103 let mut opts = Options::default();
104 opts.create_if_missing(true);
105 Self::open(&opts, path)
106 }
107
108 pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
110 Self::open_cf(opts, path, None::<&str>)
111 }
112
113 pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
120 where
121 P: AsRef<Path>,
122 I: IntoIterator<Item = N>,
123 N: AsRef<str>,
124 {
125 let cfs = cfs
126 .into_iter()
127 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
128
129 Self::open_cf_descriptors_internal(opts, path, cfs)
130 }
131
132 pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
134 where
135 P: AsRef<Path>,
136 I: IntoIterator<Item = ColumnFamilyDescriptor>,
137 {
138 Self::open_cf_descriptors_internal(opts, path, cfs)
139 }
140
141 fn open_cf_descriptors_internal<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
143 where
144 P: AsRef<Path>,
145 I: IntoIterator<Item = ColumnFamilyDescriptor>,
146 {
147 let cfs: Vec<_> = cfs.into_iter().collect();
148 let outlive = iter::once(opts.outlive.clone())
149 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
150 .collect();
151
152 let cpath = to_cpath(&path)?;
153
154 if let Err(e) = fs::create_dir_all(&path) {
155 return Err(Error::new(format!(
156 "Failed to create RocksDB directory: `{e:?}`."
157 )));
158 }
159
160 let db: *mut ffi::rocksdb_optimistictransactiondb_t;
161 let mut cf_map = BTreeMap::new();
162
163 if cfs.is_empty() {
164 db = Self::open_raw(opts, &cpath)?;
165 } else {
166 let mut cfs_v = cfs;
167 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
169 cfs_v.push(ColumnFamilyDescriptor {
170 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
171 options: Options::default(),
172 ttl: ColumnFamilyTtl::SameAsDb,
173 });
174 }
175 let c_cfs: Vec<CString> = cfs_v
178 .iter()
179 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
180 .collect();
181
182 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
183
184 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
186
187 let cfopts: Vec<_> = cfs_v
188 .iter()
189 .map(|cf| cf.options.inner.cast_const())
190 .collect();
191
192 db = Self::open_cf_raw(opts, &cpath, &cfs_v, &cfnames, &cfopts, &mut cfhandles)?;
193
194 for handle in &cfhandles {
195 if handle.is_null() {
196 return Err(Error::new(
197 "Received null column family handle from DB.".to_owned(),
198 ));
199 }
200 }
201
202 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
203 cf_map.insert(cf_desc.name.clone(), inner);
204 }
205 }
206
207 if db.is_null() {
208 return Err(Error::new("Could not initialize database.".to_owned()));
209 }
210
211 let base = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(db) };
212 if base.is_null() {
213 unsafe {
214 ffi::rocksdb_optimistictransactiondb_close(db);
215 }
216 return Err(Error::new("Could not initialize database.".to_owned()));
217 }
218 let inner = OptimisticTransactionDBInner { base, db };
219
220 Ok(Self::new(
221 inner,
222 T::new_cf_map_internal(cf_map),
223 path.as_ref().to_path_buf(),
224 outlive,
225 ))
226 }
227
228 fn open_raw(
229 opts: &Options,
230 cpath: &CString,
231 ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
232 unsafe {
233 let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open(
234 opts.inner,
235 cpath.as_ptr()
236 ));
237 Ok(db)
238 }
239 }
240
241 fn open_cf_raw(
242 opts: &Options,
243 cpath: &CString,
244 cfs_v: &[ColumnFamilyDescriptor],
245 cfnames: &[*const c_char],
246 cfopts: &[*const ffi::rocksdb_options_t],
247 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
248 ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
249 unsafe {
250 let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families(
251 opts.inner,
252 cpath.as_ptr(),
253 cfs_v.len() as c_int,
254 cfnames.as_ptr(),
255 cfopts.as_ptr(),
256 cfhandles.as_mut_ptr(),
257 ));
258 Ok(db)
259 }
260 }
261
262 pub fn transaction(&'_ self) -> Transaction<'_, Self> {
264 DEFAULT_WRITE_OPTS
265 .with(|opts| self.transaction_opt(opts, &OptimisticTransactionOptions::default()))
266 }
267
268 pub fn transaction_opt(
270 &'_ self,
271 writeopts: &WriteOptions,
272 otxn_opts: &OptimisticTransactionOptions,
273 ) -> Transaction<'_, Self> {
274 Transaction {
275 inner: unsafe {
276 ffi::rocksdb_optimistictransaction_begin(
277 self.inner.db,
278 writeopts.inner,
279 otxn_opts.inner,
280 std::ptr::null_mut(),
281 )
282 },
283 _marker: PhantomData,
284 }
285 }
286
287 pub fn write_opt(
288 &self,
289 batch: &WriteBatchWithTransaction<true>,
290 writeopts: &WriteOptions,
291 ) -> Result<(), Error> {
292 unsafe {
293 ffi_try!(ffi::rocksdb_optimistictransactiondb_write(
294 self.inner.db,
295 writeopts.inner,
296 batch.inner
297 ));
298 }
299 Ok(())
300 }
301
302 pub fn write(&self, batch: &WriteBatchWithTransaction<true>) -> Result<(), Error> {
303 DEFAULT_WRITE_OPTS.with(|opts| self.write_opt(batch, opts))
304 }
305
306 pub fn write_without_wal(&self, batch: &WriteBatchWithTransaction<true>) -> Result<(), Error> {
307 let mut wo = WriteOptions::new();
308 wo.disable_wal(true);
309 self.write_opt(batch, &wo)
310 }
311
312 pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
314 &self,
315 cf: &impl AsColumnFamilyRef,
316 from: K,
317 to: K,
318 writeopts: &WriteOptions,
319 ) -> Result<(), Error> {
320 let from = from.as_ref();
321 let to = to.as_ref();
322
323 unsafe {
324 ffi_try!(ffi::rocksdb_delete_range_cf(
325 self.inner.inner(),
326 writeopts.inner,
327 cf.inner(),
328 from.as_ptr() as *const c_char,
329 from.len() as size_t,
330 to.as_ptr() as *const c_char,
331 to.len() as size_t,
332 ));
333 Ok(())
334 }
335 }
336
337 pub fn delete_range_cf<K: AsRef<[u8]>>(
339 &self,
340 cf: &impl AsColumnFamilyRef,
341 from: K,
342 to: K,
343 ) -> Result<(), Error> {
344 DEFAULT_WRITE_OPTS.with(|opts| self.delete_range_cf_opt(cf, from, to, opts))
345 }
346}