1#![allow(clippy::module_name_repetitions)]
76use std::{cell::RefCell, fmt, rc::Rc};
77
78#[cfg(not(feature = "testing"))]
79use std::collections::HashMap as Map;
80
81#[cfg(feature = "testing")]
84use std::collections::BTreeMap as Map;
85
86use crate::{
87 concurrency_control, pin, Batch, Error, Guard, IVec, Protector, Result,
88 Tree,
89};
90
/// A transactional handle over a single `Tree`.
///
/// Writes are staged in `writes` and values fetched from the underlying tree
/// are remembered in `read_cache`. Every piece of mutable state sits behind
/// an `Rc`, so clones of this handle share the same staged data.
#[derive(Clone)]
pub struct TransactionalTree {
    /// The underlying tree that reads fall through to and that staged writes
    /// are applied to by `commit`.
    pub(super) tree: Tree,
    /// Staged writes: `Some(value)` records an insert, `None` a removal.
    pub(super) writes: Rc<RefCell<Map<IVec, Option<IVec>>>>,
    /// Results of lookups already performed against `tree` by `get`
    /// (`None` is stored for keys that were absent).
    pub(super) read_cache: Rc<RefCell<Map<IVec, Option<IVec>>>>,
    /// Set to `true` by `flush`; read by
    /// `TransactionalTrees::flush_if_configured`.
    pub(super) flush_on_commit: Rc<RefCell<bool>>,
}
101
/// The error type returned by the operations on `TransactionalTree`.
///
/// It has no user-abort variant; it converts into
/// `ConflictableTransactionError` through the `From` impl in this file.
#[derive(Debug, Clone, PartialEq)]
pub enum UnabortableTransactionError {
    /// A conflict occurred during the transaction. Converts into
    /// `ConflictableTransactionError::Conflict`.
    Conflict,
    /// A crate-level `Error` was returned by the underlying tree.
    Storage(Error),
}
116
117impl fmt::Display for UnabortableTransactionError {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 use UnabortableTransactionError::*;
120 match self {
121 Conflict => write!(f, "Conflict during transaction"),
122 Storage(e) => e.fmt(f),
123 }
124 }
125}
126
127impl std::error::Error for UnabortableTransactionError {
128 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
129 match self {
130 UnabortableTransactionError::Storage(ref e) => Some(e),
131 _ => None,
132 }
133 }
134}
135
/// Shorthand for a `Result` whose error is `UnabortableTransactionError`;
/// used as the return type of the `TransactionalTree` operations.
pub(crate) type UnabortableTransactionResult<T> =
    std::result::Result<T, UnabortableTransactionError>;
138
139impl From<Error> for UnabortableTransactionError {
140 fn from(error: Error) -> Self {
141 UnabortableTransactionError::Storage(error)
142 }
143}
144
145impl<E> From<UnabortableTransactionError> for ConflictableTransactionError<E> {
146 fn from(error: UnabortableTransactionError) -> Self {
147 match error {
148 UnabortableTransactionError::Conflict => {
149 ConflictableTransactionError::Conflict
150 }
151 UnabortableTransactionError::Storage(error) => {
152 ConflictableTransactionError::Storage(error)
153 }
154 }
155 }
156}
157
/// The error type returned from the closure passed to
/// `Transactional::transaction`.
#[derive(Debug, Clone, PartialEq)]
pub enum ConflictableTransactionError<T = Error> {
    /// A user-provided abort value. `Transactional::transaction` returns it
    /// to the caller as `TransactionError::Abort` without retrying.
    Abort(T),
    /// A conflict occurred; `Transactional::transaction` reacts by running
    /// the closure again.
    #[doc(hidden)]
    Conflict,
    /// A crate-level `Error`; `Transactional::transaction` returns it as
    /// `TransactionError::Storage`.
    Storage(Error),
}
178
179impl<E: fmt::Display> fmt::Display for ConflictableTransactionError<E> {
180 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181 use ConflictableTransactionError::*;
182 match self {
183 Abort(e) => e.fmt(f),
184 Conflict => write!(f, "Conflict during transaction"),
185 Storage(e) => e.fmt(f),
186 }
187 }
188}
189
190impl<E: std::error::Error> std::error::Error
191 for ConflictableTransactionError<E>
192{
193 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
194 match self {
195 ConflictableTransactionError::Storage(ref e) => Some(e),
196 _ => None,
197 }
198 }
199}
200
/// The error type returned by `Transactional::transaction`.
#[derive(Debug, Clone, PartialEq)]
pub enum TransactionError<T = Error> {
    /// The closure aborted the transaction with this user-provided value.
    Abort(T),
    /// A crate-level `Error` was encountered while running the transaction.
    Storage(Error),
}
215
216impl<E: fmt::Display> fmt::Display for TransactionError<E> {
217 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218 use TransactionError::*;
219 match self {
220 Abort(e) => e.fmt(f),
221 Storage(e) => e.fmt(f),
222 }
223 }
224}
225
226impl<E: std::error::Error> std::error::Error for TransactionError<E> {
227 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
228 match self {
229 TransactionError::Storage(ref e) => Some(e),
230 _ => None,
231 }
232 }
233}
234
/// The result type returned by the closure passed to
/// `Transactional::transaction`. The abort payload type `E` defaults to `()`.
pub type ConflictableTransactionResult<T, E = ()> =
    std::result::Result<T, ConflictableTransactionError<E>>;
239
240impl<T> From<Error> for ConflictableTransactionError<T> {
241 fn from(error: Error) -> Self {
242 ConflictableTransactionError::Storage(error)
243 }
244}
245
/// The result type returned by `Transactional::transaction`. The abort
/// payload type `E` defaults to `()`.
pub type TransactionResult<T, E = ()> =
    std::result::Result<T, TransactionError<E>>;
251
252impl<T> From<Error> for TransactionError<T> {
253 fn from(error: Error) -> Self {
254 TransactionError::Storage(error)
255 }
256}
257
258impl TransactionalTree {
259 pub fn insert<K, V>(
261 &self,
262 key: K,
263 value: V,
264 ) -> UnabortableTransactionResult<Option<IVec>>
265 where
266 K: AsRef<[u8]> + Into<IVec>,
267 V: Into<IVec>,
268 {
269 let old = self.get(key.as_ref())?;
270 let mut writes = self.writes.borrow_mut();
271 let _last_write =
272 writes.insert(key.into(), Some(value.into()));
273 Ok(old)
274 }
275
276 pub fn remove<K>(
278 &self,
279 key: K,
280 ) -> UnabortableTransactionResult<Option<IVec>>
281 where
282 K: AsRef<[u8]> + Into<IVec>,
283 {
284 let old = self.get(key.as_ref());
285 let mut writes = self.writes.borrow_mut();
286 let _last_write = writes.insert(key.into(), None);
287 old
288 }
289
290 pub fn get<K: AsRef<[u8]>>(
292 &self,
293 key: K,
294 ) -> UnabortableTransactionResult<Option<IVec>> {
295 let writes = self.writes.borrow();
296 if let Some(first_try) = writes.get(key.as_ref()) {
297 return Ok(first_try.clone());
298 }
299 let mut reads = self.read_cache.borrow_mut();
300 if let Some(second_try) = reads.get(key.as_ref()) {
301 return Ok(second_try.clone());
302 }
303
304 let mut guard = pin();
306 let get = loop {
307 if let Ok(get) = self.tree.get_inner(key.as_ref(), &mut guard)? {
308 break get;
309 }
310 };
311 let last = reads.insert(key.as_ref().into(), get.clone());
312 assert!(last.is_none());
313
314 Ok(get)
315 }
316
317 pub fn apply_batch(
319 &self,
320 batch: &Batch,
321 ) -> UnabortableTransactionResult<()> {
322 for (k, v_opt) in &batch.writes {
323 if let Some(v) = v_opt {
324 let _old = self.insert(k, v)?;
325 } else {
326 let _old = self.remove(k)?;
327 }
328 }
329 Ok(())
330 }
331
332 pub fn flush(&self) {
334 *self.flush_on_commit.borrow_mut() = true;
335 }
336
337 pub fn generate_id(&self) -> Result<u64> {
348 self.tree.context.pagecache.generate_id_inner()
349 }
350
351 fn unstage(&self) {
352 unimplemented!()
353 }
354
355 const fn validate(&self) -> bool {
356 true
357 }
358
359 fn commit(&self) -> Result<()> {
360 let writes = self.writes.borrow();
361 let mut guard = pin();
362 for (k, v_opt) in &*writes {
363 while self.tree.insert_inner(k, v_opt.clone(), &mut guard)?.is_err()
364 {
365 }
366 }
367 Ok(())
368 }
369 fn from_tree(tree: &Tree) -> Self {
370 Self {
371 tree: tree.clone(),
372 writes: Default::default(),
373 read_cache: Default::default(),
374 flush_on_commit: Default::default(),
375 }
376 }
377}
378
/// The set of `TransactionalTree` handles that take part in one transaction
/// attempt, built by `Transactional::make_overlay`.
pub struct TransactionalTrees {
    /// One handle per participating tree, in the order they were supplied.
    inner: Vec<TransactionalTree>,
}
383
384impl TransactionalTrees {
385 fn stage(&self) -> UnabortableTransactionResult<Protector<'_>> {
386 Ok(concurrency_control::write())
387 }
388
389 fn unstage(&self) {
390 for tree in &self.inner {
391 tree.unstage();
392 }
393 }
394
395 fn validate(&self) -> bool {
396 for tree in &self.inner {
397 if !tree.validate() {
398 return false;
399 }
400 }
401 true
402 }
403
404 fn commit(&self, guard: &Guard) -> Result<()> {
405 let peg = self.inner[0].tree.context.pin_log(guard)?;
406 for tree in &self.inner {
407 tree.commit()?;
408 }
409
410 peg.seal_batch()
414 }
415
416 fn flush_if_configured(&self) -> Result<()> {
417 let mut should_flush = None;
418
419 for tree in &self.inner {
420 if *tree.flush_on_commit.borrow() {
421 should_flush = Some(tree);
422 break;
423 }
424 }
425
426 if let Some(tree) = should_flush {
427 tree.tree.flush()?;
428 }
429 Ok(())
430 }
431}
432
433pub fn abort<A, T>(t: T) -> ConflictableTransactionResult<A, T> {
435 Err(ConflictableTransactionError::Abort(t))
436}
437
438pub trait Transactional<E = ()> {
440 type View;
443
444 fn make_overlay(&self) -> Result<TransactionalTrees>;
447
448 fn view_overlay(overlay: &TransactionalTrees) -> Self::View;
452
453 fn transaction<F, A>(&self, f: F) -> TransactionResult<A, E>
458 where
459 F: Fn(&Self::View) -> ConflictableTransactionResult<A, E>,
460 {
461 loop {
462 let tt = self.make_overlay()?;
463 let view = Self::view_overlay(&tt);
464
465 let locks = if let Ok(l) = tt.stage() {
467 l
468 } else {
469 tt.unstage();
470 continue;
471 };
472 let ret = f(&view);
473 if !tt.validate() {
474 tt.unstage();
475 continue;
476 }
477 match ret {
478 Ok(r) => {
479 let guard = pin();
480 tt.commit(&guard)?;
481 drop(locks);
482 tt.flush_if_configured()?;
483 return Ok(r);
484 }
485 Err(ConflictableTransactionError::Abort(e)) => {
486 return Err(TransactionError::Abort(e));
487 }
488 Err(ConflictableTransactionError::Conflict) => continue,
489 Err(ConflictableTransactionError::Storage(other)) => {
490 return Err(TransactionError::Storage(other));
491 }
492 }
493 }
494 }
495}
496
497impl<E> Transactional<E> for &Tree {
498 type View = TransactionalTree;
499
500 fn make_overlay(&self) -> Result<TransactionalTrees> {
501 Ok(TransactionalTrees {
502 inner: vec![TransactionalTree::from_tree(self)],
503 })
504 }
505
506 fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
507 overlay.inner[0].clone()
508 }
509}
510
511impl<E> Transactional<E> for &&Tree {
512 type View = TransactionalTree;
513
514 fn make_overlay(&self) -> Result<TransactionalTrees> {
515 Ok(TransactionalTrees {
516 inner: vec![TransactionalTree::from_tree(*self)],
517 })
518 }
519
520 fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
521 overlay.inner[0].clone()
522 }
523}
524
525impl<E> Transactional<E> for Tree {
526 type View = TransactionalTree;
527
528 fn make_overlay(&self) -> Result<TransactionalTrees> {
529 Ok(TransactionalTrees {
530 inner: vec![TransactionalTree::from_tree(self)],
531 })
532 }
533
534 fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
535 overlay.inner[0].clone()
536 }
537}
538
539impl<E> Transactional<E> for [Tree] {
540 type View = Vec<TransactionalTree>;
541
542 fn make_overlay(&self) -> Result<TransactionalTrees> {
543 let same_db = self.windows(2).all(|w| {
544 let path_1 = w[0].context.get_path();
545 let path_2 = w[1].context.get_path();
546 path_1 == path_2
547 });
548 if !same_db {
549 return Err(Error::Unsupported(
550 "cannot use trees from multiple databases in the same transaction".into(),
551 ));
552 }
553
554 Ok(TransactionalTrees {
555 inner: self
556 .iter()
557 .map(|t| TransactionalTree::from_tree(t))
558 .collect(),
559 })
560 }
561
562 fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
563 overlay.inner.clone()
564 }
565}
566
567impl<E> Transactional<E> for [&Tree] {
568 type View = Vec<TransactionalTree>;
569
570 fn make_overlay(&self) -> Result<TransactionalTrees> {
571 let same_db = self.windows(2).all(|w| {
572 let path_1 = w[0].context.get_path();
573 let path_2 = w[1].context.get_path();
574 path_1 == path_2
575 });
576 if !same_db {
577 return Err(Error::Unsupported(
578 "cannot use trees from multiple databases in the same transaction".into(),
579 ));
580 }
581
582 Ok(TransactionalTrees {
583 inner: self
584 .iter()
585 .map(|&t| TransactionalTree::from_tree(t))
586 .collect(),
587 })
588 }
589
590 fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
591 overlay.inner.clone()
592 }
593}
594
// Expands to a tuple type that repeats `$elem` once per literal in the
// parenthesized list, e.g. `repeat_type!(T, (0, 1, 2))` becomes `(T, T, T)`.
// The literals only count positions; their values are never used. Rule order
// matters: the `IMPL` rules are internal accumulator steps.
macro_rules! repeat_type {
    // Exactly one position: a one-element tuple.
    ($elem:ty, ($only:literal)) => {
        ($elem,)
    };
    // Two or more positions: start accumulating.
    ($elem:ty, ($($positions:literal),+)) => {
        repeat_type!(IMPL $elem, (), ($($positions),*))
    };
    // Empty accumulator: consume one position and seed it.
    (IMPL $elem:ty, (), ($head:literal, $($tail:literal),*)) => {
        repeat_type!(IMPL $elem, ($elem), ($($tail),*))
    };
    // Non-empty accumulator with more than one position left: grow by one.
    (IMPL $elem:ty, ($($acc:tt),*), ($head:literal, $($tail:literal),*)) => {
        repeat_type!(IMPL $elem, ($elem, $($acc),*), ($($tail),*))
    };
    // Final position: emit the accumulated elements plus one more.
    (IMPL $elem:ty, ($($acc:tt),*), ($final_pos:literal)) => {
        ($($acc),*, $elem)
    };
}
612
613macro_rules! impl_transactional_tuple_trees {
614 ($($indices:tt),+) => {
615 impl<E> Transactional<E> for repeat_type!(&Tree, ($($indices),+)) {
616 type View = repeat_type!(TransactionalTree, ($($indices),+));
617
618 fn make_overlay(&self) -> Result<TransactionalTrees> {
619 let mut paths = vec![];
620 $(
621 paths.push(self.$indices.context.get_path());
622 )+
623 if !paths.windows(2).all(|w| {
624 w[0] == w[1]
625 }) {
626 return Err(Error::Unsupported(
627 "cannot use trees from multiple databases in the same transaction".into(),
628 ));
629 }
630
631 Ok(TransactionalTrees {
632 inner: vec![
633 $(
634 TransactionalTree::from_tree(self.$indices)
635 ),+
636 ],
637 })
638 }
639
640 fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
641 (
642 $(
643 overlay.inner[$indices].clone()
644 ),+,
645 )
646 }
647 }
648 };
649}
650
651impl_transactional_tuple_trees!(0);
652impl_transactional_tuple_trees!(0, 1);
653impl_transactional_tuple_trees!(0, 1, 2);
654impl_transactional_tuple_trees!(0, 1, 2, 3);
655impl_transactional_tuple_trees!(0, 1, 2, 3, 4);
656impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5);
657impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6);
658impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7);
659impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8);
660impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
661impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
662impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
663impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
664impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13);