1use libc::{
2 c_uint,
3 c_void,
4 size_t,
5};
6use std::marker::PhantomData;
7use std::{
8 fmt,
9 mem,
10 ptr,
11 result,
12 slice,
13};
14
15use crate::ffi;
16
17use crate::cursor::{
18 RoCursor,
19 RwCursor,
20};
21use crate::database::Database;
22use crate::environment::{
23 Environment,
24 Stat,
25};
26use crate::error::{
27 lmdb_result,
28 Error,
29 Result,
30};
31use crate::flags::{
32 DatabaseFlags,
33 EnvironmentFlags,
34 WriteFlags,
35};
36
37pub trait Transaction: Sized {
41 fn txn(&self) -> *mut ffi::MDB_txn;
46
47 fn commit(self) -> Result<()> {
51 unsafe {
52 let result = lmdb_result(ffi::mdb_txn_commit(self.txn()));
53 mem::forget(self);
54 result
55 }
56 }
57
58 fn abort(self) {
62 }
64
65 unsafe fn open_db(&self, name: Option<&str>) -> Result<Database> {
83 Database::new(self.txn(), name, 0)
84 }
85
86 fn get<'txn, K>(&'txn self, database: Database, key: &K) -> Result<&'txn [u8]>
95 where
96 K: AsRef<[u8]>,
97 {
98 let key = key.as_ref();
99 let mut key_val: ffi::MDB_val = ffi::MDB_val {
100 mv_size: key.len() as size_t,
101 mv_data: key.as_ptr() as *mut c_void,
102 };
103 let mut data_val: ffi::MDB_val = ffi::MDB_val {
104 mv_size: 0,
105 mv_data: ptr::null_mut(),
106 };
107 unsafe {
108 match ffi::mdb_get(self.txn(), database.dbi(), &mut key_val, &mut data_val) {
109 ffi::MDB_SUCCESS => Ok(slice::from_raw_parts(data_val.mv_data as *const u8, data_val.mv_size as usize)),
110 err_code => Err(Error::from_err_code(err_code)),
111 }
112 }
113 }
114
115 fn open_ro_cursor<'txn>(&'txn self, db: Database) -> Result<RoCursor<'txn>> {
117 RoCursor::new(self, db)
118 }
119
120 fn db_flags(&self, db: Database) -> Result<DatabaseFlags> {
122 let mut flags: c_uint = 0;
123 unsafe {
124 lmdb_result(ffi::mdb_dbi_flags(self.txn(), db.dbi(), &mut flags))?;
125 }
126 Ok(DatabaseFlags::from_bits_truncate(flags))
127 }
128
129 fn stat(&self, db: Database) -> Result<Stat> {
131 unsafe {
132 let mut stat = Stat::new();
133 lmdb_try!(ffi::mdb_stat(self.txn(), db.dbi(), stat.mdb_stat()));
134 Ok(stat)
135 }
136 }
137}
138
139pub struct RoTransaction<'env> {
141 txn: *mut ffi::MDB_txn,
142 _marker: PhantomData<&'env ()>,
143}
144
145impl<'env> fmt::Debug for RoTransaction<'env> {
146 fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
147 f.debug_struct("RoTransaction").finish()
148 }
149}
150
151impl<'env> Drop for RoTransaction<'env> {
152 fn drop(&mut self) {
153 unsafe { ffi::mdb_txn_abort(self.txn) }
154 }
155}
156
157impl<'env> RoTransaction<'env> {
158 pub(crate) fn new(env: &'env Environment) -> Result<RoTransaction<'env>> {
161 let mut txn: *mut ffi::MDB_txn = ptr::null_mut();
162 unsafe {
163 lmdb_result(ffi::mdb_txn_begin(env.env(), ptr::null_mut(), ffi::MDB_RDONLY, &mut txn))?;
164 Ok(RoTransaction {
165 txn,
166 _marker: PhantomData,
167 })
168 }
169 }
170
171 pub fn reset(self) -> InactiveTransaction<'env> {
184 let txn = self.txn;
185 unsafe {
186 mem::forget(self);
187 ffi::mdb_txn_reset(txn)
188 };
189 InactiveTransaction {
190 txn,
191 _marker: PhantomData,
192 }
193 }
194}
195
196impl<'env> Transaction for RoTransaction<'env> {
197 fn txn(&self) -> *mut ffi::MDB_txn {
198 self.txn
199 }
200}
201
202pub struct InactiveTransaction<'env> {
204 txn: *mut ffi::MDB_txn,
205 _marker: PhantomData<&'env ()>,
206}
207
208impl<'env> fmt::Debug for InactiveTransaction<'env> {
209 fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
210 f.debug_struct("InactiveTransaction").finish()
211 }
212}
213
214impl<'env> Drop for InactiveTransaction<'env> {
215 fn drop(&mut self) {
216 unsafe { ffi::mdb_txn_abort(self.txn) }
217 }
218}
219
220impl<'env> InactiveTransaction<'env> {
221 pub fn renew(self) -> Result<RoTransaction<'env>> {
227 let txn = self.txn;
228 unsafe {
229 mem::forget(self);
230 lmdb_result(ffi::mdb_txn_renew(txn))?
231 };
232 Ok(RoTransaction {
233 txn,
234 _marker: PhantomData,
235 })
236 }
237}
238
239pub struct RwTransaction<'env> {
241 txn: *mut ffi::MDB_txn,
242 _marker: PhantomData<&'env ()>,
243}
244
245impl<'env> fmt::Debug for RwTransaction<'env> {
246 fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
247 f.debug_struct("RwTransaction").finish()
248 }
249}
250
251impl<'env> Drop for RwTransaction<'env> {
252 fn drop(&mut self) {
253 unsafe { ffi::mdb_txn_abort(self.txn) }
254 }
255}
256
257impl<'env> RwTransaction<'env> {
258 pub(crate) fn new(env: &'env Environment) -> Result<RwTransaction<'env>> {
261 let mut txn: *mut ffi::MDB_txn = ptr::null_mut();
262 unsafe {
263 lmdb_result(ffi::mdb_txn_begin(env.env(), ptr::null_mut(), EnvironmentFlags::empty().bits(), &mut txn))?;
264 Ok(RwTransaction {
265 txn,
266 _marker: PhantomData,
267 })
268 }
269 }
270
271 pub unsafe fn create_db(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> {
289 Database::new(self.txn(), name, flags.bits() | ffi::MDB_CREATE)
290 }
291
292 pub fn open_rw_cursor<'txn>(&'txn mut self, db: Database) -> Result<RwCursor<'txn>> {
294 RwCursor::new(self, db)
295 }
296
297 pub fn put<K, D>(&mut self, database: Database, key: &K, data: &D, flags: WriteFlags) -> Result<()>
304 where
305 K: AsRef<[u8]>,
306 D: AsRef<[u8]>,
307 {
308 let key = key.as_ref();
309 let data = data.as_ref();
310 let mut key_val: ffi::MDB_val = ffi::MDB_val {
311 mv_size: key.len() as size_t,
312 mv_data: key.as_ptr() as *mut c_void,
313 };
314 let mut data_val: ffi::MDB_val = ffi::MDB_val {
315 mv_size: data.len() as size_t,
316 mv_data: data.as_ptr() as *mut c_void,
317 };
318 unsafe { lmdb_result(ffi::mdb_put(self.txn(), database.dbi(), &mut key_val, &mut data_val, flags.bits())) }
319 }
320
321 pub fn reserve<'txn, K>(
325 &'txn mut self,
326 database: Database,
327 key: &K,
328 len: size_t,
329 flags: WriteFlags,
330 ) -> Result<&'txn mut [u8]>
331 where
332 K: AsRef<[u8]>,
333 {
334 let key = key.as_ref();
335 let mut key_val: ffi::MDB_val = ffi::MDB_val {
336 mv_size: key.len() as size_t,
337 mv_data: key.as_ptr() as *mut c_void,
338 };
339 let mut data_val: ffi::MDB_val = ffi::MDB_val {
340 mv_size: len,
341 mv_data: ptr::null_mut::<c_void>(),
342 };
343 unsafe {
344 lmdb_result(ffi::mdb_put(
345 self.txn(),
346 database.dbi(),
347 &mut key_val,
348 &mut data_val,
349 flags.bits() | ffi::MDB_RESERVE,
350 ))?;
351 Ok(slice::from_raw_parts_mut(data_val.mv_data as *mut u8, data_val.mv_size as usize))
352 }
353 }
354
355 pub fn del<K>(&mut self, database: Database, key: &K, data: Option<&[u8]>) -> Result<()>
366 where
367 K: AsRef<[u8]>,
368 {
369 let key = key.as_ref();
370 let mut key_val: ffi::MDB_val = ffi::MDB_val {
371 mv_size: key.len() as size_t,
372 mv_data: key.as_ptr() as *mut c_void,
373 };
374 let data_val: Option<ffi::MDB_val> = data.map(|data| ffi::MDB_val {
375 mv_size: data.len() as size_t,
376 mv_data: data.as_ptr() as *mut c_void,
377 });
378
379 if let Some(mut d) = data_val {
380 unsafe { lmdb_result(ffi::mdb_del(self.txn(), database.dbi(), &mut key_val, &mut d)) }
381 } else {
382 unsafe { lmdb_result(ffi::mdb_del(self.txn(), database.dbi(), &mut key_val, ptr::null_mut())) }
383 }
384 }
385
386 pub fn clear_db(&mut self, db: Database) -> Result<()> {
388 unsafe { lmdb_result(ffi::mdb_drop(self.txn(), db.dbi(), 0)) }
389 }
390
391 pub unsafe fn drop_db(&mut self, db: Database) -> Result<()> {
398 lmdb_result(ffi::mdb_drop(self.txn, db.dbi(), 1))
399 }
400
401 pub fn begin_nested_txn<'txn>(&'txn mut self) -> Result<RwTransaction<'txn>> {
403 let mut nested: *mut ffi::MDB_txn = ptr::null_mut();
404 unsafe {
405 let env: *mut ffi::MDB_env = ffi::mdb_txn_env(self.txn());
406 ffi::mdb_txn_begin(env, self.txn(), 0, &mut nested);
407 }
408 Ok(RwTransaction {
409 txn: nested,
410 _marker: PhantomData,
411 })
412 }
413}
414
415impl<'env> Transaction for RwTransaction<'env> {
416 fn txn(&self) -> *mut ffi::MDB_txn {
417 self.txn
418 }
419}
420
421#[cfg(test)]
422mod test {
423
424 use std::io::Write;
425 use std::sync::{
426 Arc,
427 Barrier,
428 };
429 use std::thread::{
430 self,
431 JoinHandle,
432 };
433
434
435 use super::*;
436 use crate::cursor::Cursor;
437 use crate::error::*;
438 use crate::flags::*;
439
440 #[test]
441 fn test_put_get_del() {
442 let dir = tempfile::tempdir().unwrap();
443 let env = Environment::new().open(dir.path()).unwrap();
444 let db = env.open_db(None).unwrap();
445
446 let mut txn = env.begin_rw_txn().unwrap();
447 txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap();
448 txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap();
449 txn.put(db, b"key3", b"val3", WriteFlags::empty()).unwrap();
450 txn.commit().unwrap();
451
452 let mut txn = env.begin_rw_txn().unwrap();
453 assert_eq!(b"val1", txn.get(db, b"key1").unwrap());
454 assert_eq!(b"val2", txn.get(db, b"key2").unwrap());
455 assert_eq!(b"val3", txn.get(db, b"key3").unwrap());
456 assert_eq!(txn.get(db, b"key"), Err(Error::NotFound));
457
458 txn.del(db, b"key1", None).unwrap();
459 assert_eq!(txn.get(db, b"key1"), Err(Error::NotFound));
460 }
461
462 #[test]
463 fn test_put_get_del_multi() {
464 let dir = tempfile::tempdir().unwrap();
465 let env = Environment::new().open(dir.path()).unwrap();
466 let db = env.create_db(None, DatabaseFlags::DUP_SORT).unwrap();
467
468 let mut txn = env.begin_rw_txn().unwrap();
469 txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap();
470 txn.put(db, b"key1", b"val2", WriteFlags::empty()).unwrap();
471 txn.put(db, b"key1", b"val3", WriteFlags::empty()).unwrap();
472 txn.put(db, b"key2", b"val1", WriteFlags::empty()).unwrap();
473 txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap();
474 txn.put(db, b"key2", b"val3", WriteFlags::empty()).unwrap();
475 txn.put(db, b"key3", b"val1", WriteFlags::empty()).unwrap();
476 txn.put(db, b"key3", b"val2", WriteFlags::empty()).unwrap();
477 txn.put(db, b"key3", b"val3", WriteFlags::empty()).unwrap();
478 txn.commit().unwrap();
479
480 let txn = env.begin_rw_txn().unwrap();
481 {
482 let mut cur = txn.open_ro_cursor(db).unwrap();
483 let iter = cur.iter_dup_of(b"key1");
484 let vals = iter.map(|x| x.unwrap()).map(|(_, x)| x).collect::<Vec<_>>();
485 assert_eq!(vals, vec![b"val1", b"val2", b"val3"]);
486 }
487 txn.commit().unwrap();
488
489 let mut txn = env.begin_rw_txn().unwrap();
490 txn.del(db, b"key1", Some(b"val2")).unwrap();
491 txn.del(db, b"key2", None).unwrap();
492 txn.commit().unwrap();
493
494 let txn = env.begin_rw_txn().unwrap();
495 {
496 let mut cur = txn.open_ro_cursor(db).unwrap();
497 let iter = cur.iter_dup_of(b"key1");
498 let vals = iter.map(|x| x.unwrap()).map(|(_, x)| x).collect::<Vec<_>>();
499 assert_eq!(vals, vec![b"val1", b"val3"]);
500
501 let iter = cur.iter_dup_of(b"key2");
502 assert_eq!(0, iter.count());
503 }
504 txn.commit().unwrap();
505 }
506
507 #[test]
508 fn test_reserve() {
509 let dir = tempfile::tempdir().unwrap();
510 let env = Environment::new().open(dir.path()).unwrap();
511 let db = env.open_db(None).unwrap();
512
513 let mut txn = env.begin_rw_txn().unwrap();
514 {
515 let mut writer = txn.reserve(db, b"key1", 4, WriteFlags::empty()).unwrap();
516 writer.write_all(b"val1").unwrap();
517 }
518 txn.commit().unwrap();
519
520 let mut txn = env.begin_rw_txn().unwrap();
521 assert_eq!(b"val1", txn.get(db, b"key1").unwrap());
522 assert_eq!(txn.get(db, b"key"), Err(Error::NotFound));
523
524 txn.del(db, b"key1", None).unwrap();
525 assert_eq!(txn.get(db, b"key1"), Err(Error::NotFound));
526 }
527
528 #[test]
529 fn test_inactive_txn() {
530 let dir = tempfile::tempdir().unwrap();
531 let env = Environment::new().open(dir.path()).unwrap();
532 let db = env.open_db(None).unwrap();
533
534 {
535 let mut txn = env.begin_rw_txn().unwrap();
536 txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap();
537 txn.commit().unwrap();
538 }
539
540 let txn = env.begin_ro_txn().unwrap();
541 let inactive = txn.reset();
542 let active = inactive.renew().unwrap();
543 assert!(active.get(db, b"key").is_ok());
544 }
545
546 #[test]
547 fn test_nested_txn() {
548 let dir = tempfile::tempdir().unwrap();
549 let env = Environment::new().open(dir.path()).unwrap();
550 let db = env.open_db(None).unwrap();
551
552 let mut txn = env.begin_rw_txn().unwrap();
553 txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap();
554
555 {
556 let mut nested = txn.begin_nested_txn().unwrap();
557 nested.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap();
558 assert_eq!(nested.get(db, b"key1").unwrap(), b"val1");
559 assert_eq!(nested.get(db, b"key2").unwrap(), b"val2");
560 }
561
562 assert_eq!(txn.get(db, b"key1").unwrap(), b"val1");
563 assert_eq!(txn.get(db, b"key2"), Err(Error::NotFound));
564 }
565
566 #[test]
567 fn test_clear_db() {
568 let dir = tempfile::tempdir().unwrap();
569 let env = Environment::new().open(dir.path()).unwrap();
570 let db = env.open_db(None).unwrap();
571
572 {
573 let mut txn = env.begin_rw_txn().unwrap();
574 txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap();
575 txn.commit().unwrap();
576 }
577
578 {
579 let mut txn = env.begin_rw_txn().unwrap();
580 txn.clear_db(db).unwrap();
581 txn.commit().unwrap();
582 }
583
584 let txn = env.begin_ro_txn().unwrap();
585 assert_eq!(txn.get(db, b"key"), Err(Error::NotFound));
586 }
587
588 #[test]
589 fn test_drop_db() {
590 let dir = tempfile::tempdir().unwrap();
591 let env = Environment::new().set_max_dbs(2).open(dir.path()).unwrap();
592 let db = env.create_db(Some("test"), DatabaseFlags::empty()).unwrap();
593
594 {
595 let mut txn = env.begin_rw_txn().unwrap();
596 txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap();
597 txn.commit().unwrap();
598 }
599 {
600 let mut txn = env.begin_rw_txn().unwrap();
601 unsafe {
602 txn.drop_db(db).unwrap();
603 }
604 txn.commit().unwrap();
605 }
606
607 assert_eq!(env.open_db(Some("test")), Err(Error::NotFound));
608 }
609
610 #[test]
611 fn test_concurrent_readers_single_writer() {
612 let dir = tempfile::tempdir().unwrap();
613 let env: Arc<Environment> = Arc::new(Environment::new().open(dir.path()).unwrap());
614
615 let n = 10usize; let barrier = Arc::new(Barrier::new(n + 1));
617 let mut threads: Vec<JoinHandle<bool>> = Vec::with_capacity(n);
618
619 let key = b"key";
620 let val = b"val";
621
622 for _ in 0..n {
623 let reader_env = env.clone();
624 let reader_barrier = barrier.clone();
625
626 threads.push(thread::spawn(move || {
627 let db = reader_env.open_db(None).unwrap();
628 {
629 let txn = reader_env.begin_ro_txn().unwrap();
630 assert_eq!(txn.get(db, key), Err(Error::NotFound));
631 txn.abort();
632 }
633 reader_barrier.wait();
634 reader_barrier.wait();
635 {
636 let txn = reader_env.begin_ro_txn().unwrap();
637 txn.get(db, key).unwrap() == val
638 }
639 }));
640 }
641
642 let db = env.open_db(None).unwrap();
643 let mut txn = env.begin_rw_txn().unwrap();
644 barrier.wait();
645 txn.put(db, key, val, WriteFlags::empty()).unwrap();
646 txn.commit().unwrap();
647 barrier.wait();
648
649 assert!(threads.into_iter().all(|b| b.join().unwrap()))
650 }
651
652 #[test]
653 fn test_concurrent_writers() {
654 let dir = tempfile::tempdir().unwrap();
655 let env = Arc::new(Environment::new().open(dir.path()).unwrap());
656
657 let n = 10usize; let mut threads: Vec<JoinHandle<bool>> = Vec::with_capacity(n);
659
660 let key = "key";
661 let val = "val";
662
663 for i in 0..n {
664 let writer_env = env.clone();
665
666 threads.push(thread::spawn(move || {
667 let db = writer_env.open_db(None).unwrap();
668 let mut txn = writer_env.begin_rw_txn().unwrap();
669 txn.put(db, &format!("{}{}", key, i), &format!("{}{}", val, i), WriteFlags::empty()).unwrap();
670 txn.commit().is_ok()
671 }));
672 }
673 assert!(threads.into_iter().all(|b| b.join().unwrap()));
674
675 let db = env.open_db(None).unwrap();
676 let txn = env.begin_ro_txn().unwrap();
677
678 for i in 0..n {
679 assert_eq!(format!("{}{}", val, i).as_bytes(), txn.get(db, &format!("{}{}", key, i)).unwrap());
680 }
681 }
682
683 #[test]
684 fn test_stat() {
685 let dir = tempfile::tempdir().unwrap();
686 let env = Environment::new().open(dir.path()).unwrap();
687 let db = env.create_db(None, DatabaseFlags::empty()).unwrap();
688
689 let mut txn = env.begin_rw_txn().unwrap();
690 txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap();
691 txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap();
692 txn.put(db, b"key3", b"val3", WriteFlags::empty()).unwrap();
693 txn.commit().unwrap();
694
695 {
696 let txn = env.begin_ro_txn().unwrap();
697 let stat = txn.stat(db).unwrap();
698 assert_eq!(stat.entries(), 3);
699 }
700
701 let mut txn = env.begin_rw_txn().unwrap();
702 txn.del(db, b"key1", None).unwrap();
703 txn.del(db, b"key2", None).unwrap();
704 txn.commit().unwrap();
705
706 {
707 let txn = env.begin_ro_txn().unwrap();
708 let stat = txn.stat(db).unwrap();
709 assert_eq!(stat.entries(), 1);
710 }
711
712 let mut txn = env.begin_rw_txn().unwrap();
713 txn.put(db, b"key4", b"val4", WriteFlags::empty()).unwrap();
714 txn.put(db, b"key5", b"val5", WriteFlags::empty()).unwrap();
715 txn.put(db, b"key6", b"val6", WriteFlags::empty()).unwrap();
716 txn.commit().unwrap();
717
718 {
719 let txn = env.begin_ro_txn().unwrap();
720 let stat = txn.stat(db).unwrap();
721 assert_eq!(stat.entries(), 4);
722 }
723 }
724
725 #[test]
726 fn test_stat_dupsort() {
727 let dir = tempfile::tempdir().unwrap();
728 let env = Environment::new().open(dir.path()).unwrap();
729 let db = env.create_db(None, DatabaseFlags::DUP_SORT).unwrap();
730
731 let mut txn = env.begin_rw_txn().unwrap();
732 txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap();
733 txn.put(db, b"key1", b"val2", WriteFlags::empty()).unwrap();
734 txn.put(db, b"key1", b"val3", WriteFlags::empty()).unwrap();
735 txn.put(db, b"key2", b"val1", WriteFlags::empty()).unwrap();
736 txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap();
737 txn.put(db, b"key2", b"val3", WriteFlags::empty()).unwrap();
738 txn.put(db, b"key3", b"val1", WriteFlags::empty()).unwrap();
739 txn.put(db, b"key3", b"val2", WriteFlags::empty()).unwrap();
740 txn.put(db, b"key3", b"val3", WriteFlags::empty()).unwrap();
741 txn.commit().unwrap();
742
743 {
744 let txn = env.begin_ro_txn().unwrap();
745 let stat = txn.stat(db).unwrap();
746 assert_eq!(stat.entries(), 9);
747 }
748
749 let mut txn = env.begin_rw_txn().unwrap();
750 txn.del(db, b"key1", Some(b"val2")).unwrap();
751 txn.del(db, b"key2", None).unwrap();
752 txn.commit().unwrap();
753
754 {
755 let txn = env.begin_ro_txn().unwrap();
756 let stat = txn.stat(db).unwrap();
757 assert_eq!(stat.entries(), 5);
758 }
759
760 let mut txn = env.begin_rw_txn().unwrap();
761 txn.put(db, b"key4", b"val1", WriteFlags::empty()).unwrap();
762 txn.put(db, b"key4", b"val2", WriteFlags::empty()).unwrap();
763 txn.put(db, b"key4", b"val3", WriteFlags::empty()).unwrap();
764 txn.commit().unwrap();
765
766 {
767 let txn = env.begin_ro_txn().unwrap();
768 let stat = txn.stat(db).unwrap();
769 assert_eq!(stat.entries(), 8);
770 }
771 }
772}