use std::cmp::Ordering::{Equal, Greater, Less};
use std::mem::forget;
use std::ops::RangeBounds;
use std::ptr;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

use saa::Lock;
use sdd::{AtomicShared, Guard, Ptr, Shared, Tag};

use super::leaf::{DIMENSION, InsertResult, Leaf, RemoveResult, Scanner};
use super::leaf_node::RETIRED;
use super::leaf_node::RemoveRangeState;
use super::node::Node;
use crate::Comparable;
use crate::async_helper::TryWait;
use crate::exit_guard::ExitGuard;

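/// [`InternalNode`] is an internal node of the B+ tree: it stores no values of its own and
/// routes every operation to one of its child [`Node`]s.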
pub struct InternalNode<K, V> {
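    /// Children of the [`InternalNode`]; each child is keyed by an upper bound of the keys
    /// stored in it.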
    pub(super) children: Leaf<K, AtomicShared<Node<K, V>>>,

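    /// A child [`Node`] without an upper key bound: it covers the keys greater than every key
    /// in `children`, and its tag is set to `RETIRED` when the node retires.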
    pub(super) unbounded_child: AtomicShared<Node<K, V>>,

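    /// The state of an ongoing split operation.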
    split_op: StructuralChange<K, V>,

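    /// The lock protecting structural changes to the node.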
    pub(super) lock: Lock,
}

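/// [`Locker`] holds exclusive access to an [`InternalNode`] and releases the lock on drop.
///
/// A minimal usage sketch:
///
/// ```text
/// if let Some(locker) = Locker::try_lock(node) {
///     // Structural changes to `node` are safe here.
/// } // The lock is released when `locker` is dropped.
/// ```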
pub(super) struct Locker<'n, K, V> {
    internal_node: &'n InternalNode<K, V>,
}

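/// [`StructuralChange`] stores the intermediate state of a split: the origin node being split,
/// the low-key and high-key nodes replacing it, and the key separating the two.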
struct StructuralChange<K, V> {
    origin_node_key: AtomicPtr<K>,
    origin_node: AtomicShared<Node<K, V>>,
    low_key_node: AtomicShared<Node<K, V>>,
    middle_key: AtomicPtr<K>,
    high_key_node: AtomicShared<Node<K, V>>,
}

impl<K, V> InternalNode<K, V> {
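    /// Creates an empty [`InternalNode`].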
    #[inline]
    pub(super) fn new() -> InternalNode<K, V> {
        InternalNode {
            children: Leaf::new(),
            unbounded_child: AtomicShared::null(),
            split_op: StructuralChange::default(),
            lock: Lock::default(),
        }
    }

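    /// Clears the node by recursively clearing all of its children.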
    #[inline]
    pub(super) fn clear(&self, guard: &Guard) {
        let scanner = Scanner::new(&self.children);
        for (_, child) in scanner {
            let child_ptr = child.load(Acquire, guard);
            if let Some(child) = child_ptr.as_ref() {
                child.clear(guard);
            }
        }
        let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
        if let Some(unbounded) = unbounded_ptr.as_ref() {
            unbounded.clear(guard);
        }
    }

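    /// Returns the depth of the subtree rooted at this node by following the unbounded child.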
    #[inline]
    pub(super) fn depth(&self, depth: usize, guard: &Guard) -> usize {
        let unbounded_ptr = self.unbounded_child.load(Relaxed, guard);
        if let Some(unbounded_ref) = unbounded_ptr.as_ref() {
            return unbounded_ref.depth(depth + 1, guard);
        }
        depth
    }

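    /// Returns `true` if the node is retired, denoted by the `RETIRED` tag on the unbounded
    /// child.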
    #[inline]
    pub(super) fn retired(&self) -> bool {
        self.unbounded_child.tag(Acquire) == RETIRED
    }
}

impl<K, V> InternalNode<K, V>
where
    K: 'static + Clone + Ord,
    V: 'static + Clone,
{
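    /// Searches for the entry associated with the given key, retrying whenever a concurrent
    /// structural change invalidates the child metadata that was read.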
    #[inline]
    pub(super) fn search_entry<'g, Q>(&self, key: &Q, guard: &'g Guard) -> Option<(&'g K, &'g V)>
    where
        K: 'g,
        Q: Comparable<K> + ?Sized,
    {
        loop {
            let (child, metadata) = self.children.min_greater_equal(key);
            if let Some((_, child)) = child {
                if let Some(child) = child.load(Acquire, guard).as_ref() {
                    if self.children.validate(metadata) {
                        return child.search_entry(key, guard);
                    }
                }
            } else {
                let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
                if let Some(unbounded) = unbounded_ptr.as_ref() {
                    if self.children.validate(metadata) {
                        return unbounded.search_entry(key, guard);
                    }
                } else {
                    return None;
                }
            }
        }
    }

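    /// Searches for the value associated with the given key, retrying whenever a concurrent
    /// structural change invalidates the child metadata that was read.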
    #[inline]
    pub(super) fn search_value<'g, Q>(&self, key: &Q, guard: &'g Guard) -> Option<&'g V>
    where
        K: 'g,
        Q: Comparable<K> + ?Sized,
    {
        loop {
            let (child, metadata) = self.children.min_greater_equal(key);
            if let Some((_, child)) = child {
                if let Some(child) = child.load(Acquire, guard).as_ref() {
                    if self.children.validate(metadata) {
                        return child.search_value(key, guard);
                    }
                }
            } else {
                let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
                if let Some(unbounded) = unbounded_ptr.as_ref() {
                    if self.children.validate(metadata) {
                        return unbounded.search_value(key, guard);
                    }
                } else {
                    return None;
                }
            }
        }
    }

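    /// Returns a [`Scanner`] pointing to the minimum key entry in the subtree.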
    #[inline]
    pub(super) fn min<'g>(&self, guard: &'g Guard) -> Option<Scanner<'g, K, V>> {
        loop {
            let mut retry = false;
            let scanner = Scanner::new(&self.children);
            let metadata = scanner.metadata();
            for (_, child) in scanner {
                let child_ptr = child.load(Acquire, guard);
                if let Some(child) = child_ptr.as_ref() {
                    if self.children.validate(metadata) {
                        if let Some(scanner) = child.min(guard) {
                            return Some(scanner);
                        }
                        continue;
                    }
                }
                retry = true;
                break;
            }
            if retry {
                continue;
            }
            let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
            if let Some(unbounded) = unbounded_ptr.as_ref() {
                if self.children.validate(metadata) {
                    return unbounded.min(guard);
                }
                continue;
            }
            return None;
        }
    }

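    /// Returns a [`Scanner`] pointing to an entry whose key is close to, and not greater than,
    /// the given key, falling back to the minimum entry of the subtree when no smaller entry
    /// is found.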
    #[inline]
    pub(super) fn max_le_appr<'g, Q>(&self, key: &Q, guard: &'g Guard) -> Option<Scanner<'g, K, V>>
    where
        K: 'g,
        Q: Comparable<K> + ?Sized,
    {
        loop {
            if let Some(scanner) = Scanner::max_less(&self.children, key) {
                if let Some((_, child)) = scanner.get() {
                    if let Some(child) = child.load(Acquire, guard).as_ref() {
                        if self.children.validate(scanner.metadata()) {
                            if let Some(scanner) = child.max_le_appr(key, guard) {
                                return Some(scanner);
                            }
                            break;
                        }
                    }
                    continue;
                }
            }
            break;
        }

        let mut min_scanner = self.min(guard)?;
        min_scanner.next();
        loop {
            if let Some((k, _)) = min_scanner.get() {
                if key.compare(k).is_ge() {
                    return Some(min_scanner);
                }
                break;
            }
            min_scanner = min_scanner.jump(None, guard)?;
        }

        None
    }

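    /// Inserts a key-value pair into the subtree, splitting a full child in place when needed.
    ///
    /// Returns an error along with the key-value pair when the operation has to be retried,
    /// e.g., when the structural-change lock could not be acquired.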
    #[inline]
    pub(super) fn insert<W: TryWait>(
        &self,
        mut key: K,
        mut val: V,
        async_wait: &mut W,
        guard: &Guard,
    ) -> Result<InsertResult<K, V>, (K, V)> {
        loop {
            let (child, metadata) = self.children.min_greater_equal(&key);
            if let Some((child_key, child)) = child {
                let child_ptr = child.load(Acquire, guard);
                if let Some(child_ref) = child_ptr.as_ref() {
                    if self.children.validate(metadata) {
                        let insert_result = child_ref.insert(key, val, async_wait, guard)?;
                        match insert_result {
                            InsertResult::Success
                            | InsertResult::Duplicate(..)
                            | InsertResult::Frozen(..) => return Ok(insert_result),
                            InsertResult::Full(k, v) => {
                                let split_result = self.split_node(
                                    (k, v),
                                    Some(child_key),
                                    child_ptr,
                                    child,
                                    false,
                                    async_wait,
                                    guard,
                                )?;
                                if let InsertResult::Retry(k, v) = split_result {
                                    key = k;
                                    val = v;
                                    continue;
                                }
                                return Ok(split_result);
                            }
                            InsertResult::Retired(k, v) => {
                                debug_assert!(child_ref.retired());
                                if self.coalesce(guard) == RemoveResult::Retired {
                                    debug_assert!(self.retired());
                                    return Ok(InsertResult::Retired(k, v));
                                }
                                return Err((k, v));
                            }
                            InsertResult::Retry(k, v) => {
                                if self.cleanup_link(&k, false, guard) {
                                    key = k;
                                    val = v;
                                    continue;
                                }
                                return Ok(InsertResult::Retry(k, v));
                            }
                        };
                    }
                }
                continue;
            }

            let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
            if let Some(unbounded) = unbounded_ptr.as_ref() {
                debug_assert!(unbounded_ptr.tag() == Tag::None);
                if !self.children.validate(metadata) {
                    continue;
                }
                let insert_result = unbounded.insert(key, val, async_wait, guard)?;
                match insert_result {
                    InsertResult::Success
                    | InsertResult::Duplicate(..)
                    | InsertResult::Frozen(..) => return Ok(insert_result),
                    InsertResult::Full(k, v) => {
                        let split_result = self.split_node(
                            (k, v),
                            None,
                            unbounded_ptr,
                            &self.unbounded_child,
                            false,
                            async_wait,
                            guard,
                        )?;
                        if let InsertResult::Retry(k, v) = split_result {
                            key = k;
                            val = v;
                            continue;
                        }
                        return Ok(split_result);
                    }
                    InsertResult::Retired(k, v) => {
                        debug_assert!(unbounded.retired());
                        if self.coalesce(guard) == RemoveResult::Retired {
                            debug_assert!(self.retired());
                            return Ok(InsertResult::Retired(k, v));
                        }
                        return Err((k, v));
                    }
                    InsertResult::Retry(k, v) => {
                        if self.cleanup_link(&k, false, guard) {
                            key = k;
                            val = v;
                            continue;
                        }
                        return Ok(InsertResult::Retry(k, v));
                    }
                };
            }
            debug_assert!(unbounded_ptr.tag() == RETIRED);
            return Ok(InsertResult::Retired(key, val));
        }
    }

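    /// Removes an entry if the given condition is met, coalescing the children when the removal
    /// retires a child node.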
    #[inline]
    pub(super) fn remove_if<Q, F: FnMut(&V) -> bool, W>(
        &self,
        key: &Q,
        condition: &mut F,
        async_wait: &mut W,
        guard: &Guard,
    ) -> Result<RemoveResult, ()>
    where
        Q: Comparable<K> + ?Sized,
        W: TryWait,
    {
        loop {
            let (child, metadata) = self.children.min_greater_equal(key);
            if let Some((_, child)) = child {
                let child_ptr = child.load(Acquire, guard);
                if let Some(child) = child_ptr.as_ref() {
                    if self.children.validate(metadata) {
                        let result =
                            child.remove_if::<_, _, _>(key, condition, async_wait, guard)?;
                        if result == RemoveResult::Cleanup {
                            if self.cleanup_link(key, false, guard) {
                                return Ok(RemoveResult::Success);
                            }
                            return Ok(RemoveResult::Cleanup);
                        }
                        if result == RemoveResult::Retired {
                            return Ok(self.coalesce(guard));
                        }
                        return Ok(result);
                    }
                }
                continue;
            }
            let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
            if let Some(unbounded) = unbounded_ptr.as_ref() {
                debug_assert!(unbounded_ptr.tag() == Tag::None);
                if !self.children.validate(metadata) {
                    continue;
                }
                let result = unbounded.remove_if::<_, _, _>(key, condition, async_wait, guard)?;
                if result == RemoveResult::Cleanup {
                    if self.cleanup_link(key, false, guard) {
                        return Ok(RemoveResult::Success);
                    }
                    return Ok(RemoveResult::Cleanup);
                }
                if result == RemoveResult::Retired {
                    return Ok(self.coalesce(guard));
                }
                return Ok(result);
            }
            return Ok(RemoveResult::Fail);
        }
    }

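    /// Removes the entries within the given key range, descending into the children containing
    /// the range borders.
    ///
    /// `start_unbounded` indicates that the range starts below any key in this node, while
    /// `valid_lower_max_leaf` and `valid_upper_min_node` identify the border leaf and node that
    /// are passed further down the tree while the borders are cleaned up. Returns the number of
    /// children remaining in this node.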
    #[allow(clippy::too_many_lines)]
    #[inline]
    pub(super) fn remove_range<'g, Q, R: RangeBounds<Q>, W: TryWait>(
        &self,
        range: &R,
        start_unbounded: bool,
        valid_lower_max_leaf: Option<&'g Leaf<K, V>>,
        valid_upper_min_node: Option<&'g Node<K, V>>,
        async_wait: &mut W,
        guard: &'g Guard,
    ) -> Result<usize, ()>
    where
        Q: Comparable<K> + ?Sized,
    {
        debug_assert!(valid_lower_max_leaf.is_none() || start_unbounded);
        debug_assert!(valid_lower_max_leaf.is_none() || valid_upper_min_node.is_none());

        let Some(_lock) = Locker::try_lock(self) else {
            async_wait.try_wait(&self.lock);
            return Err(());
        };

        let mut current_state = RemoveRangeState::Below;
        let mut num_children = 1;
        let mut lower_border = None;
        let mut upper_border = None;

        for (key, node) in Scanner::new(&self.children) {
            current_state = current_state.next(key, range, start_unbounded);
            match current_state {
                RemoveRangeState::Below => {
                    num_children += 1;
                }
                RemoveRangeState::MaybeBelow => {
                    debug_assert!(!start_unbounded);
                    num_children += 1;
                    lower_border.replace((Some(key), node));
                }
                RemoveRangeState::FullyContained => {
                    self.children.remove_if(key, &mut |_| true);
                    node.swap((None, Tag::None), AcqRel);
                }
                RemoveRangeState::MaybeAbove => {
                    if valid_upper_min_node.is_some() {
                        self.children.remove_if(key, &mut |_| true);
                        node.swap((None, Tag::None), AcqRel);
                    } else {
                        num_children += 1;
                        upper_border.replace(node);
                    }
                    break;
                }
            }
        }

        match current_state {
            RemoveRangeState::Below => {
                debug_assert!(lower_border.is_none() && upper_border.is_none());
                if valid_upper_min_node.is_some() {
                    lower_border.replace((None, &self.unbounded_child));
                } else {
                    upper_border.replace(&self.unbounded_child);
                }
            }
            RemoveRangeState::MaybeBelow => {
                debug_assert!(!start_unbounded);
                debug_assert!(lower_border.is_some() && upper_border.is_none());
                upper_border.replace(&self.unbounded_child);
            }
            RemoveRangeState::FullyContained => {
                debug_assert!(upper_border.is_none());
                upper_border.replace(&self.unbounded_child);
            }
            RemoveRangeState::MaybeAbove => {
                debug_assert!(upper_border.is_some());
            }
        }

        if let Some(lower_leaf) = valid_lower_max_leaf {
            debug_assert!(start_unbounded && lower_border.is_none() && upper_border.is_some());
            if let Some(upper_node) = upper_border.and_then(|n| n.load(Acquire, guard).as_ref()) {
                upper_node.remove_range(range, true, Some(lower_leaf), None, async_wait, guard)?;
            }
        } else if let Some(upper_node) = valid_upper_min_node {
            debug_assert!(lower_border.is_some());
            if let Some((Some(key), lower_node)) = lower_border {
                self.children.remove_if(key, &mut |_| true);
                self.unbounded_child
                    .swap((lower_node.get_shared(Acquire, guard), Tag::None), AcqRel);
                lower_node.swap((None, Tag::None), Release);
            }
            if let Some(lower_node) = self.unbounded_child.load(Acquire, guard).as_ref() {
                lower_node.remove_range(
                    range,
                    start_unbounded,
                    None,
                    Some(upper_node),
                    async_wait,
                    guard,
                )?;
            }
        } else {
            let lower_node = lower_border.and_then(|n| n.1.load(Acquire, guard).as_ref());
            let upper_node = upper_border.and_then(|n| n.load(Acquire, guard).as_ref());
            match (lower_node, upper_node) {
                (_, None) => (),
                (None, Some(upper_node)) => {
                    upper_node.remove_range(
                        range,
                        start_unbounded,
                        None,
                        None,
                        async_wait,
                        guard,
                    )?;
                }
                (Some(lower_node), Some(upper_node)) => {
                    debug_assert!(!ptr::eq(lower_node, upper_node));
                    lower_node.remove_range(
                        range,
                        start_unbounded,
                        None,
                        Some(upper_node),
                        async_wait,
                        guard,
                    )?;
                }
            }
        }

        Ok(num_children)
    }

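    /// Splits a full node into a low-key node and a high-key node.
    ///
    /// The low-key node is inserted into `children` under the middle key, and the full node is
    /// then replaced with the high-key node; when `root_split` is set, `finish_split` is left to
    /// the caller. Returns `InsertResult::Full` when this node has no room for the low-key node,
    /// leaving the split state in `split_op` for the caller to commit or roll back. A rough
    /// sketch of the transformation:
    ///
    /// ```text
    /// before: children[.., k -> full_node, ..]
    /// after:  children[.., middle_key -> low_key_node, k -> high_key_node, ..]
    /// ```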
    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
    pub(super) fn split_node<W: TryWait>(
        &self,
        entry: (K, V),
        full_node_key: Option<&K>,
        full_node_ptr: Ptr<Node<K, V>>,
        full_node: &AtomicShared<Node<K, V>>,
        root_split: bool,
        async_wait: &mut W,
        guard: &Guard,
    ) -> Result<InsertResult<K, V>, (K, V)> {
        let target = full_node_ptr.as_ref().unwrap();
        if !self.lock.try_lock() {
            target.rollback(guard);
            async_wait.try_wait(&self.lock);
            return Err(entry);
        }
        debug_assert!(!self.retired());

        if full_node_ptr != full_node.load(Relaxed, guard) {
            self.lock.release_lock();
            target.rollback(guard);
            return Err(entry);
        }

        let prev = self
            .split_op
            .origin_node
            .swap((full_node.get_shared(Relaxed, guard), Tag::None), Relaxed)
            .0;
        debug_assert!(prev.is_none());

        if let Some(full_node_key) = full_node_key {
            self.split_op
                .origin_node_key
                .store(ptr::from_ref(full_node_key).cast_mut(), Relaxed);
        }

        let mut exit_guard = ExitGuard::new(true, |rollback| {
            if rollback {
                self.rollback(guard);
            }
        });
        match target {
            Node::Internal(full_internal_node) => {
                let internal_nodes = (
                    Shared::new(Node::new_internal_node()),
                    Shared::new(Node::new_internal_node()),
                );
                let Node::Internal(low_key_nodes) = internal_nodes.0.as_ref() else {
                    unreachable!()
                };
                let Node::Internal(high_key_nodes) = internal_nodes.1.as_ref() else {
                    unreachable!()
                };

                #[allow(clippy::type_complexity)]
                let mut entry_array: [Option<(Option<&K>, AtomicShared<Node<K, V>>)>;
                    DIMENSION.num_entries + 2] = Default::default();
                let mut num_entries = 0;
                let scanner = Scanner::new(&full_internal_node.children);
                let recommended_boundary = Leaf::<K, V>::optimal_boundary(scanner.metadata());
                for entry in scanner {
                    if unsafe {
                        full_internal_node
                            .split_op
                            .origin_node_key
                            .load(Relaxed)
                            .as_ref()
                            .map_or_else(|| false, |key| entry.0 == key)
                    } {
                        let low_key_node_ptr = full_internal_node
                            .split_op
                            .low_key_node
                            .load(Relaxed, guard);
                        if !low_key_node_ptr.is_null() {
                            entry_array[num_entries].replace((
                                Some(unsafe {
                                    full_internal_node
                                        .split_op
                                        .middle_key
                                        .load(Relaxed)
                                        .as_ref()
                                        .unwrap()
                                }),
                                full_internal_node
                                    .split_op
                                    .low_key_node
                                    .clone(Relaxed, guard),
                            ));
                            num_entries += 1;
                        }
                        let high_key_node_ptr = full_internal_node
                            .split_op
                            .high_key_node
                            .load(Relaxed, guard);
                        if !high_key_node_ptr.is_null() {
                            entry_array[num_entries].replace((
                                Some(entry.0),
                                full_internal_node
                                    .split_op
                                    .high_key_node
                                    .clone(Relaxed, guard),
                            ));
                            num_entries += 1;
                        }
                    } else {
                        entry_array[num_entries]
                            .replace((Some(entry.0), entry.1.clone(Acquire, guard)));
                        num_entries += 1;
                    }
                }
                if full_internal_node
                    .split_op
                    .origin_node_key
                    .load(Relaxed)
                    .is_null()
                {
                    let low_key_node_ptr = full_internal_node
                        .split_op
                        .low_key_node
                        .load(Relaxed, guard);
                    if !low_key_node_ptr.is_null() {
                        entry_array[num_entries].replace((
                            Some(unsafe {
                                full_internal_node
                                    .split_op
                                    .middle_key
                                    .load(Relaxed)
                                    .as_ref()
                                    .unwrap()
                            }),
                            full_internal_node
                                .split_op
                                .low_key_node
                                .clone(Relaxed, guard),
                        ));
                        num_entries += 1;
                    }
                    let high_key_node_ptr = full_internal_node
                        .split_op
                        .high_key_node
                        .load(Relaxed, guard);
                    if !high_key_node_ptr.is_null() {
                        entry_array[num_entries].replace((
                            None,
                            full_internal_node
                                .split_op
                                .high_key_node
                                .clone(Relaxed, guard),
                        ));
                        num_entries += 1;
                    }
                } else {
                    entry_array[num_entries].replace((
                        None,
                        full_internal_node.unbounded_child.clone(Relaxed, guard),
                    ));
                    num_entries += 1;
                }
                debug_assert!(num_entries >= 2);

                let low_key_node_array_size = recommended_boundary.min(num_entries - 1);
                for (i, entry) in entry_array.iter().enumerate() {
                    if let Some((k, v)) = entry {
                        match (i + 1).cmp(&low_key_node_array_size) {
                            Less => {
                                low_key_nodes.children.insert_unchecked(
                                    k.unwrap().clone(),
                                    v.clone(Relaxed, guard),
                                    i,
                                );
                            }
                            Equal => {
                                if let Some(&k) = k.as_ref() {
                                    self.split_op
                                        .middle_key
                                        .store(ptr::from_ref(k).cast_mut(), Relaxed);
                                }
                                low_key_nodes
                                    .unbounded_child
                                    .swap((v.get_shared(Relaxed, guard), Tag::None), Relaxed);
                            }
                            Greater => {
                                if let Some(k) = k.cloned() {
                                    high_key_nodes.children.insert_unchecked(
                                        k,
                                        v.clone(Relaxed, guard),
                                        i - low_key_node_array_size,
                                    );
                                } else {
                                    high_key_nodes
                                        .unbounded_child
                                        .swap((v.get_shared(Relaxed, guard), Tag::None), Relaxed);
                                }
                            }
                        }
                    } else {
                        break;
                    }
                }

                self.split_op
                    .low_key_node
                    .swap((Some(internal_nodes.0), Tag::None), Relaxed);
                self.split_op
                    .high_key_node
                    .swap((Some(internal_nodes.1), Tag::None), Relaxed);
            }
            Node::Leaf(full_leaf_node) => {
                let leaf_nodes = (
                    Shared::new(Node::new_leaf_node()),
                    Shared::new(Node::new_leaf_node()),
                );
                let low_key_leaf_node = if let Node::Leaf(low_key_leaf_node) = leaf_nodes.0.as_ref()
                {
                    Some(low_key_leaf_node)
                } else {
                    None
                };
                let high_key_leaf_node =
                    if let Node::Leaf(high_key_leaf_node) = &leaf_nodes.1.as_ref() {
                        Some(high_key_leaf_node)
                    } else {
                        None
                    };

                self.split_op.middle_key.store(
                    ptr::from_ref(full_leaf_node.split_leaf_node(
                        low_key_leaf_node.unwrap(),
                        high_key_leaf_node.unwrap(),
                        guard,
                    ))
                    .cast_mut(),
                    Relaxed,
                );

                self.split_op
                    .low_key_node
                    .swap((Some(leaf_nodes.0), Tag::None), Relaxed);
                self.split_op
                    .high_key_node
                    .swap((Some(leaf_nodes.1), Tag::None), Relaxed);
            }
        }

        match self.children.insert(
            unsafe {
                self.split_op
                    .middle_key
                    .load(Relaxed)
                    .as_ref()
                    .unwrap()
                    .clone()
            },
            self.split_op.low_key_node.clone(Relaxed, guard),
        ) {
            InsertResult::Success => (),
            InsertResult::Duplicate(..) | InsertResult::Frozen(..) | InsertResult::Retry(..) => {
                unreachable!()
            }
            InsertResult::Full(..) | InsertResult::Retired(..) => {
                *exit_guard = false;
                return Ok(InsertResult::Full(entry.0, entry.1));
            }
        }
        *exit_guard = false;

        let unused_node = full_node
            .swap(
                (
                    self.split_op.high_key_node.get_shared(Relaxed, guard),
                    Tag::None,
                ),
                Release,
            )
            .0;

        if root_split {
            return Ok(InsertResult::Retry(entry.0, entry.1));
        }

        self.finish_split();

        if let Some(unused_node) = unused_node {
            unused_node.commit(guard);
            let _: bool = unused_node.release();
        }

        Ok(InsertResult::Retry(entry.0, entry.1))
    }

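    /// Finishes a split by clearing `split_op`, releasing the lock, and then releasing the
    /// origin node.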
    #[inline]
    pub(super) fn finish_split(&self) {
        let origin = self.split_op.reset();
        self.lock.release_lock();
        if let Some(origin) = origin {
            let _: bool = origin.release();
        }
    }

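    /// Commits an ongoing structural change by poisoning the lock and committing the origin
    /// node, preventing any further structural changes to this node.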
    #[inline]
    pub(super) fn commit(&self, guard: &Guard) {
        let origin = self.split_op.reset();

        self.lock.poison_lock();

        if let Some(origin) = origin {
            origin.commit(guard);
            let _: bool = origin.release();
        }
    }

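    /// Rolls back an ongoing structural change, releasing the lock and rolling back the origin
    /// node.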
    #[inline]
    pub(super) fn rollback(&self, guard: &Guard) {
        let origin = self.split_op.reset();
        self.lock.release_lock();
        if let Some(origin) = origin {
            origin.rollback(guard);
            let _: bool = origin.release();
        }
    }

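    /// Cleans up logically deleted entries in the leaf linked list reachable from the given key,
    /// returning `true` on success.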
    #[inline]
    pub(super) fn cleanup_link<'g, Q>(&self, key: &Q, traverse_max: bool, guard: &'g Guard) -> bool
    where
        K: 'g,
        Q: Comparable<K> + ?Sized,
    {
        if traverse_max {
            if let Some(unbounded) = self.unbounded_child.load(Acquire, guard).as_ref() {
                return unbounded.cleanup_link(key, true, guard);
            }
        } else if let Some(child_scanner) = Scanner::max_less(&self.children, key) {
            if let Some((_, child)) = child_scanner.get() {
                if let Some(child) = child.load(Acquire, guard).as_ref() {
                    return child.cleanup_link(key, true, guard);
                }
            }
        }
        false
    }

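    /// Removes retired children, promoting the maximum-key child to the unbounded child when the
    /// unbounded child itself has retired.
    ///
    /// Returns `RemoveResult::Retired` when the node ends up with no children at all.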
    fn coalesce(&self, guard: &Guard) -> RemoveResult {
        let mut node_deleted = false;
        while let Some(lock) = Locker::try_lock(self) {
            let mut max_key_entry = None;
            for (key, node) in Scanner::new(&self.children) {
                let node_ptr = node.load(Acquire, guard);
                let node_ref = node_ptr.as_ref().unwrap();
                if node_ref.retired() {
                    let result = self.children.remove_if(key, &mut |_| true);
                    debug_assert_ne!(result, RemoveResult::Fail);

                    if let Some(node) = node.swap((None, Tag::None), Release).0 {
                        let _: bool = node.release();
                        node_deleted = true;
                    }
                } else {
                    max_key_entry.replace((key, node));
                }
            }

            let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
            let fully_empty = if let Some(unbounded) = unbounded_ptr.as_ref() {
                if unbounded.retired() {
                    if let Some((key, max_key_child)) = max_key_entry {
                        if let Some(obsolete_node) = self
                            .unbounded_child
                            .swap(
                                (max_key_child.get_shared(Relaxed, guard), Tag::None),
                                Release,
                            )
                            .0
                        {
                            debug_assert!(obsolete_node.retired());
                            let _: bool = obsolete_node.release();
                            node_deleted = true;
                        }
                        let result = self.children.remove_if(key, &mut |_| true);
                        debug_assert_ne!(result, RemoveResult::Fail);
                        if let Some(node) = max_key_child.swap((None, Tag::None), Release).0 {
                            let _: bool = node.release();
                            node_deleted = true;
                        }
                        false
                    } else {
                        if let Some(obsolete_node) =
                            self.unbounded_child.swap((None, RETIRED), Release).0
                        {
                            debug_assert!(obsolete_node.retired());
                            let _: bool = obsolete_node.release();
                            node_deleted = true;
                        }
                        true
                    }
                } else {
                    false
                }
            } else {
                debug_assert!(unbounded_ptr.tag() == RETIRED);
                true
            };

            if fully_empty {
                return RemoveResult::Retired;
            }

            drop(lock);
            if !self.has_retired_node(guard) {
                break;
            }
        }

        if node_deleted {
            RemoveResult::Cleanup
        } else {
            RemoveResult::Success
        }
    }

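    /// Returns `true` if the node has a retired child that needs to be coalesced.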
    fn has_retired_node(&self, guard: &Guard) -> bool {
        let mut has_valid_node = false;
        for entry in Scanner::new(&self.children) {
            let leaf_ptr = entry.1.load(Relaxed, guard);
            if let Some(leaf) = leaf_ptr.as_ref() {
                if leaf.retired() {
                    return true;
                }
                has_valid_node = true;
            }
        }
        if !has_valid_node {
            let unbounded_ptr = self.unbounded_child.load(Relaxed, guard);
            if let Some(unbounded) = unbounded_ptr.as_ref() {
                if unbounded.retired() {
                    return true;
                }
            }
        }
        false
    }
}

impl<'n, K, V> Locker<'n, K, V> {
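    /// Acquires exclusive access to the [`InternalNode`], returning `None` if the lock is held
    /// by another thread.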
    #[inline]
    pub(super) fn try_lock(internal_node: &'n InternalNode<K, V>) -> Option<Locker<'n, K, V>> {
        if internal_node.lock.try_lock() {
            Some(Locker { internal_node })
        } else {
            None
        }
    }

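    /// Poisons the lock instead of releasing it, permanently retiring the node.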
    pub(super) fn unlock_retire(self) {
        self.internal_node.lock.poison_lock();
        forget(self);
    }
}

impl<K, V> Drop for Locker<'_, K, V> {
    #[inline]
    fn drop(&mut self) {
        self.internal_node.lock.release_lock();
    }
}

impl<K, V> StructuralChange<K, V> {
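    /// Clears the split state and returns the origin node so that the caller can release it.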
    fn reset(&self) -> Option<Shared<Node<K, V>>> {
        self.origin_node_key.store(ptr::null_mut(), Relaxed);
        self.low_key_node.swap((None, Tag::None), Relaxed);
        self.middle_key.store(ptr::null_mut(), Relaxed);
        self.high_key_node.swap((None, Tag::None), Relaxed);
        self.origin_node.swap((None, Tag::None), Relaxed).0
    }
}

impl<K, V> Default for StructuralChange<K, V> {
    #[inline]
    fn default() -> Self {
        Self {
            origin_node_key: AtomicPtr::default(),
            origin_node: AtomicShared::null(),
            low_key_node: AtomicShared::null(),
            middle_key: AtomicPtr::default(),
            high_key_node: AtomicShared::null(),
        }
    }
}

#[cfg(not(feature = "loom"))]
#[cfg(test)]
mod test {
    use super::*;
    use std::sync::atomic::AtomicBool;
    use tokio::sync::Barrier;

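    /// Creates a three-level tree for testing: an internal node whose unbounded child is another
    /// internal node over an empty leaf node.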
    fn new_level_3_node() -> InternalNode<usize, usize> {
        InternalNode {
            children: Leaf::new(),
            unbounded_child: AtomicShared::new(Node::Internal(InternalNode {
                children: Leaf::new(),
                unbounded_child: AtomicShared::new(Node::new_leaf_node()),
                split_op: StructuralChange::default(),
                lock: Lock::default(),
            })),
            split_op: StructuralChange::default(),
            lock: Lock::default(),
        }
    }

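    /// Inserts keys on a single thread until the node reports `Full`, then removes all of the
    /// inserted keys, expecting the node to eventually retire.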
    #[test]
    fn bulk() {
        let internal_node = new_level_3_node();
        let guard = Guard::new();
        assert_eq!(internal_node.depth(1, &guard), 3);

        let data_size = if cfg!(miri) { 256 } else { 8192 };
        for k in 0..data_size {
            match internal_node.insert(k, k, &mut (), &guard) {
                Ok(result) => match result {
                    InsertResult::Success => {
                        assert_eq!(internal_node.search_entry(&k, &guard), Some((&k, &k)));
                    }
                    InsertResult::Duplicate(..)
                    | InsertResult::Frozen(..)
                    | InsertResult::Retired(..) => unreachable!(),
                    InsertResult::Full(_, _) => {
                        internal_node.rollback(&guard);
                        for j in 0..k {
                            assert_eq!(internal_node.search_entry(&j, &guard), Some((&j, &j)));
                            if j == k - 1 {
                                assert!(matches!(
                                    internal_node.remove_if::<_, _, _>(
                                        &j,
                                        &mut |_| true,
                                        &mut (),
                                        &guard
                                    ),
                                    Ok(RemoveResult::Retired)
                                ));
                            } else {
                                assert!(
                                    internal_node
                                        .remove_if::<_, _, _>(&j, &mut |_| true, &mut (), &guard)
                                        .is_ok(),
                                );
                            }
                            assert_eq!(internal_node.search_entry(&j, &guard), None);
                        }
                        break;
                    }
                    InsertResult::Retry(k, v) => {
                        let result = internal_node.insert(k, v, &mut (), &guard);
                        assert!(result.is_ok());
                        assert_eq!(internal_node.search_entry(&k, &guard), Some((&k, &k)));
                    }
                },
                Err((k, v)) => {
                    let result = internal_node.insert(k, v, &mut (), &guard);
                    assert!(result.is_ok());
                    assert_eq!(internal_node.search_entry(&k, &guard), Some((&k, &k)));
                }
            }
        }
    }

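    /// Exercises concurrent insertion and removal over disjoint per-task key ranges.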
    #[cfg_attr(miri, ignore)]
    #[tokio::test(flavor = "multi_thread", worker_threads = 16)]
    async fn parallel() {
        let num_tasks = 8;
        let workload_size = 64;
        let barrier = Shared::new(Barrier::new(num_tasks));
        for _ in 0..64 {
            let internal_node = Shared::new(new_level_3_node());
            assert!(
                internal_node
                    .insert(usize::MAX, usize::MAX, &mut (), &Guard::new())
                    .is_ok()
            );
            let mut task_handles = Vec::with_capacity(num_tasks);
            for task_id in 0..num_tasks {
                let barrier_clone = barrier.clone();
                let internal_node_clone = internal_node.clone();
                task_handles.push(tokio::task::spawn(async move {
                    barrier_clone.wait().await;
                    let guard = Guard::new();
                    let mut max_key = None;
                    let range = (task_id * workload_size)..((task_id + 1) * workload_size);
                    for id in range.clone() {
                        loop {
                            if let Ok(r) = internal_node_clone.insert(id, id, &mut (), &guard) {
                                match r {
                                    InsertResult::Success => {
                                        match internal_node_clone.insert(id, id, &mut (), &guard) {
                                            Ok(InsertResult::Duplicate(..)) | Err(_) => (),
                                            _ => unreachable!(),
                                        }
                                        break;
                                    }
                                    InsertResult::Full(..) => {
                                        internal_node_clone.rollback(&guard);
                                        max_key.replace(id);
                                        break;
                                    }
                                    InsertResult::Frozen(..) | InsertResult::Retry(..) => (),
                                    _ => unreachable!(),
                                }
                            }
                        }
                        if max_key.is_some() {
                            break;
                        }
                    }
                    for id in range.clone() {
                        if max_key == Some(id) {
                            break;
                        }
                        assert_eq!(
                            internal_node_clone.search_entry(&id, &guard),
                            Some((&id, &id))
                        );
                    }
                    for id in range {
                        if max_key == Some(id) {
                            break;
                        }
                        loop {
                            if let Ok(r) = internal_node_clone.remove_if::<_, _, _>(
                                &id,
                                &mut |_| true,
                                &mut (),
                                &guard,
                            ) {
                                match r {
                                    RemoveResult::Success
                                    | RemoveResult::Cleanup
                                    | RemoveResult::Fail => break,
                                    RemoveResult::Frozen | RemoveResult::Retired => unreachable!(),
                                }
                            }
                        }
                        assert!(internal_node_clone.search_entry(&id, &guard).is_none());
                        if let Ok(RemoveResult::Success) = internal_node_clone.remove_if::<_, _, _>(
                            &id,
                            &mut |_| true,
                            &mut (),
                            &guard,
                        ) {
                            unreachable!()
                        }
                    }
                }));
            }

            for r in futures::future::join_all(task_handles).await {
                assert!(r.is_ok());
            }
            assert!(
                internal_node
                    .remove_if::<_, _, _>(&usize::MAX, &mut |_| true, &mut (), &Guard::new())
                    .is_ok()
            );
        }
    }

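    /// Verifies that a fixed key inserted first remains visible while other keys are inserted
    /// and removed concurrently.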
    #[cfg_attr(miri, ignore)]
    #[tokio::test(flavor = "multi_thread", worker_threads = 16)]
    async fn durability() {
        let num_tasks = 8_usize;
        let num_iterations = 64;
        let workload_size = 64_usize;
        for k in 0..64 {
            let fixed_point = k * 16;
            for _ in 0..=num_iterations {
                let barrier = Shared::new(Barrier::new(num_tasks));
                let internal_node = Shared::new(new_level_3_node());
                let inserted: Shared<AtomicBool> = Shared::new(AtomicBool::new(false));
                let mut task_handles = Vec::with_capacity(num_tasks);
                for _ in 0..num_tasks {
                    let barrier_clone = barrier.clone();
                    let internal_node_clone = internal_node.clone();
                    let inserted_clone = inserted.clone();
                    task_handles.push(tokio::spawn(async move {
                        {
                            barrier_clone.wait().await;
                            let guard = Guard::new();
                            match internal_node_clone.insert(
                                fixed_point,
                                fixed_point,
                                &mut (),
                                &guard,
                            ) {
                                Ok(InsertResult::Success) => {
                                    assert!(!inserted_clone.swap(true, Relaxed));
                                }
                                Ok(InsertResult::Full(_, _) | InsertResult::Retired(_, _)) => {
                                    internal_node_clone.rollback(&guard);
                                }
                                _ => (),
                            }
                            assert_eq!(
                                internal_node_clone
                                    .search_entry(&fixed_point, &guard)
                                    .unwrap(),
                                (&fixed_point, &fixed_point)
                            );
                        }
                        {
                            barrier_clone.wait().await;
                            let guard = Guard::new();
                            for i in 0..workload_size {
                                if i != fixed_point {
                                    if let Ok(
                                        InsertResult::Full(_, _) | InsertResult::Retired(_, _),
                                    ) = internal_node_clone.insert(i, i, &mut (), &guard)
                                    {
                                        internal_node_clone.rollback(&guard);
                                    }
                                }
                                assert_eq!(
                                    internal_node_clone
                                        .search_entry(&fixed_point, &guard)
                                        .unwrap(),
                                    (&fixed_point, &fixed_point)
                                );
                            }
                            for i in 0..workload_size {
                                let max_scanner = internal_node_clone
                                    .max_le_appr(&fixed_point, &guard)
                                    .unwrap();
                                assert!(*max_scanner.get().unwrap().0 <= fixed_point);
                                let mut min_scanner = internal_node_clone.min(&guard).unwrap();
                                if let Some((f, v)) = min_scanner.next() {
                                    assert_eq!(*f, *v);
                                    assert!(*f <= fixed_point);
                                } else {
                                    let (f, v) =
                                        min_scanner.jump(None, &guard).unwrap().get().unwrap();
                                    assert_eq!(*f, *v);
                                    assert!(*f <= fixed_point);
                                }
                                let _result = internal_node_clone.remove_if::<_, _, _>(
                                    &i,
                                    &mut |v| *v != fixed_point,
                                    &mut (),
                                    &guard,
                                );
                                assert_eq!(
                                    internal_node_clone
                                        .search_entry(&fixed_point, &guard)
                                        .unwrap(),
                                    (&fixed_point, &fixed_point)
                                );
                            }
                        }
                    }));
                }
                for r in futures::future::join_all(task_handles).await {
                    assert!(r.is_ok());
                }
                assert!((*inserted).load(Relaxed));
            }
        }
    }
}