electrs_rocksdb/transactions/
optimistic_transaction_db.rs1use std::{collections::BTreeMap, ffi::CString, fs, iter, marker::PhantomData, path::Path, ptr};
17
18use libc::{c_char, c_int};
19
20use crate::{
21 db::DBCommon, db::DBInner, ffi, ffi_util::to_cpath, write_batch::WriteBatchWithTransaction,
22 ColumnFamilyDescriptor, Error, OptimisticTransactionOptions, Options, ThreadMode, Transaction,
23 WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
24};
25
26#[cfg(not(feature = "multi-threaded-cf"))]
57pub type OptimisticTransactionDB<T = crate::SingleThreaded> =
58 DBCommon<T, OptimisticTransactionDBInner>;
59#[cfg(feature = "multi-threaded-cf")]
60pub type OptimisticTransactionDB<T = crate::MultiThreaded> =
61 DBCommon<T, OptimisticTransactionDBInner>;
62
63pub struct OptimisticTransactionDBInner {
64 base: *mut ffi::rocksdb_t,
65 db: *mut ffi::rocksdb_optimistictransactiondb_t,
66}
67
68impl DBInner for OptimisticTransactionDBInner {
69 fn inner(&self) -> *mut ffi::rocksdb_t {
70 self.base
71 }
72}
73
74impl Drop for OptimisticTransactionDBInner {
75 fn drop(&mut self) {
76 unsafe {
77 ffi::rocksdb_optimistictransactiondb_close_base_db(self.base);
78 ffi::rocksdb_optimistictransactiondb_close(self.db);
79 }
80 }
81}
82
83impl<T: ThreadMode> OptimisticTransactionDB<T> {
85 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
87 let mut opts = Options::default();
88 opts.create_if_missing(true);
89 Self::open(&opts, path)
90 }
91
92 pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
94 Self::open_cf(opts, path, None::<&str>)
95 }
96
97 pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
101 where
102 P: AsRef<Path>,
103 I: IntoIterator<Item = N>,
104 N: AsRef<str>,
105 {
106 let cfs = cfs
107 .into_iter()
108 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
109
110 Self::open_cf_descriptors_internal(opts, path, cfs)
111 }
112
113 pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
115 where
116 P: AsRef<Path>,
117 I: IntoIterator<Item = ColumnFamilyDescriptor>,
118 {
119 Self::open_cf_descriptors_internal(opts, path, cfs)
120 }
121
122 fn open_cf_descriptors_internal<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
124 where
125 P: AsRef<Path>,
126 I: IntoIterator<Item = ColumnFamilyDescriptor>,
127 {
128 let cfs: Vec<_> = cfs.into_iter().collect();
129 let outlive = iter::once(opts.outlive.clone())
130 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
131 .collect();
132
133 let cpath = to_cpath(&path)?;
134
135 if let Err(e) = fs::create_dir_all(&path) {
136 return Err(Error::new(format!(
137 "Failed to create RocksDB directory: `{:?}`.",
138 e
139 )));
140 }
141
142 let db: *mut ffi::rocksdb_optimistictransactiondb_t;
143 let mut cf_map = BTreeMap::new();
144
145 if cfs.is_empty() {
146 db = Self::open_raw(opts, &cpath)?;
147 } else {
148 let mut cfs_v = cfs;
149 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
151 cfs_v.push(ColumnFamilyDescriptor {
152 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
153 options: Options::default(),
154 });
155 }
156 let c_cfs: Vec<CString> = cfs_v
159 .iter()
160 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
161 .collect();
162
163 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
164
165 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
167
168 let cfopts: Vec<_> = cfs_v
169 .iter()
170 .map(|cf| cf.options.inner as *const _)
171 .collect();
172
173 db = Self::open_cf_raw(opts, &cpath, &cfs_v, &cfnames, &cfopts, &mut cfhandles)?;
174
175 for handle in &cfhandles {
176 if handle.is_null() {
177 return Err(Error::new(
178 "Received null column family handle from DB.".to_owned(),
179 ));
180 }
181 }
182
183 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
184 cf_map.insert(cf_desc.name.clone(), inner);
185 }
186 }
187
188 if db.is_null() {
189 return Err(Error::new("Could not initialize database.".to_owned()));
190 }
191
192 let base = unsafe { ffi::rocksdb_optimistictransactiondb_get_base_db(db) };
193 if base.is_null() {
194 unsafe {
195 ffi::rocksdb_optimistictransactiondb_close(db);
196 }
197 return Err(Error::new("Could not initialize database.".to_owned()));
198 }
199 let inner = OptimisticTransactionDBInner { base, db };
200
201 Ok(Self::new(
202 inner,
203 T::new_cf_map_internal(cf_map),
204 path.as_ref().to_path_buf(),
205 outlive,
206 ))
207 }
208
209 fn open_raw(
210 opts: &Options,
211 cpath: &CString,
212 ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
213 unsafe {
214 let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open(
215 opts.inner,
216 cpath.as_ptr()
217 ));
218 Ok(db)
219 }
220 }
221
222 fn open_cf_raw(
223 opts: &Options,
224 cpath: &CString,
225 cfs_v: &[ColumnFamilyDescriptor],
226 cfnames: &[*const c_char],
227 cfopts: &[*const ffi::rocksdb_options_t],
228 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
229 ) -> Result<*mut ffi::rocksdb_optimistictransactiondb_t, Error> {
230 unsafe {
231 let db = ffi_try!(ffi::rocksdb_optimistictransactiondb_open_column_families(
232 opts.inner,
233 cpath.as_ptr(),
234 cfs_v.len() as c_int,
235 cfnames.as_ptr(),
236 cfopts.as_ptr(),
237 cfhandles.as_mut_ptr(),
238 ));
239 Ok(db)
240 }
241 }
242
243 pub fn transaction(&self) -> Transaction<Self> {
245 self.transaction_opt(
246 &WriteOptions::default(),
247 &OptimisticTransactionOptions::default(),
248 )
249 }
250
251 pub fn transaction_opt(
253 &self,
254 writeopts: &WriteOptions,
255 otxn_opts: &OptimisticTransactionOptions,
256 ) -> Transaction<Self> {
257 Transaction {
258 inner: unsafe {
259 ffi::rocksdb_optimistictransaction_begin(
260 self.inner.db,
261 writeopts.inner,
262 otxn_opts.inner,
263 std::ptr::null_mut(),
264 )
265 },
266 _marker: PhantomData::default(),
267 }
268 }
269
270 pub fn write_opt(
271 &self,
272 batch: WriteBatchWithTransaction<true>,
273 writeopts: &WriteOptions,
274 ) -> Result<(), Error> {
275 unsafe {
276 ffi_try!(ffi::rocksdb_optimistictransactiondb_write(
277 self.inner.db,
278 writeopts.inner,
279 batch.inner
280 ));
281 }
282 Ok(())
283 }
284
285 pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
286 self.write_opt(batch, &WriteOptions::default())
287 }
288
289 pub fn write_without_wal(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
290 let mut wo = WriteOptions::new();
291 wo.disable_wal(true);
292 self.write_opt(batch, &wo)
293 }
294}