// scc/tree_index/internal_node.rs

1use std::cmp::Ordering::{Equal, Greater, Less};
2use std::mem::forget;
3use std::ops::RangeBounds;
4use std::ptr;
5use std::sync::atomic::AtomicPtr;
6use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
7
8use saa::Lock;
9use sdd::{AtomicShared, Guard, Ptr, Shared, Tag};
10
11use super::leaf::{DIMENSION, InsertResult, Leaf, RemoveResult, Scanner};
12use super::leaf_node::RETIRED;
13use super::leaf_node::RemoveRangeState;
14use super::node::Node;
15use crate::Comparable;
16use crate::async_helper::TryWait;
17use crate::exit_guard::ExitGuard;
18
/// Internal node.
///
/// The layout of an internal node: `|ptr(children)/max(child keys)|...|ptr(children)|`.
pub struct InternalNode<K, V> {
    /// Children of the [`InternalNode`].
    ///
    /// Each entry associates the maximum key of a child with the child [`Node`]; lookups use
    /// `min_greater_equal` on this [`Leaf`] to route a key to the right child.
    pub(super) children: Leaf<K, AtomicShared<Node<K, V>>>,
    /// A child [`Node`] that has no upper key bound.
    ///
    /// It stores the maximum key in the node, and key-value pairs are first pushed to this [`Node`]
    /// until it splits.
    ///
    /// Its tag is set to `RETIRED` when the [`InternalNode`] has retired - see `retired`.
    pub(super) unbounded_child: AtomicShared<Node<K, V>>,
    /// Ongoing split operation.
    split_op: StructuralChange<K, V>,
    /// [`Lock`] to protect the [`InternalNode`].
    pub(super) lock: Lock,
}
35
/// [`Locker`] holds exclusive ownership of an [`InternalNode`].
pub(super) struct Locker<'n, K, V> {
    /// The exclusively owned [`InternalNode`]; presumably unlocked when the [`Locker`] is
    /// dropped - see `Locker::try_lock`.
    internal_node: &'n InternalNode<K, V>,
}
40
/// [`StructuralChange`] stores intermediate results during a split operation.
///
/// `AtomicPtr` members may point to values under the protection of the [`Guard`] used for the
/// split operation.
struct StructuralChange<K, V> {
    /// Key of the full origin node being split; null when the origin is the unbounded child
    /// (see the null check in `split_node`).
    origin_node_key: AtomicPtr<K>,
    /// The full origin [`Node`] that triggered the split.
    origin_node: AtomicShared<Node<K, V>>,
    /// The newly allocated [`Node`] receiving the lower part of the origin's entries.
    low_key_node: AtomicShared<Node<K, V>>,
    /// The key separating `low_key_node` from `high_key_node`.
    middle_key: AtomicPtr<K>,
    /// The newly allocated [`Node`] receiving the upper part of the origin's entries.
    high_key_node: AtomicShared<Node<K, V>>,
}
52
53impl<K, V> InternalNode<K, V> {
54    /// Creates a new empty internal node.
55    #[inline]
56    pub(super) fn new() -> InternalNode<K, V> {
57        InternalNode {
58            children: Leaf::new(),
59            unbounded_child: AtomicShared::null(),
60            split_op: StructuralChange::default(),
61            lock: Lock::default(),
62        }
63    }
64
65    /// Clears the internal node.
66    #[inline]
67    pub(super) fn clear(&self, guard: &Guard) {
68        let scanner = Scanner::new(&self.children);
69        for (_, child) in scanner {
70            let child_ptr = child.load(Acquire, guard);
71            if let Some(child) = child_ptr.as_ref() {
72                child.clear(guard);
73            }
74        }
75        let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
76        if let Some(unbounded) = unbounded_ptr.as_ref() {
77            unbounded.clear(guard);
78        }
79    }
80
81    /// Returns the depth of the node.
82    #[inline]
83    pub(super) fn depth(&self, depth: usize, guard: &Guard) -> usize {
84        let unbounded_ptr = self.unbounded_child.load(Relaxed, guard);
85        if let Some(unbounded_ref) = unbounded_ptr.as_ref() {
86            return unbounded_ref.depth(depth + 1, guard);
87        }
88        depth
89    }
90
91    /// Returns `true` if the [`InternalNode`] has retired.
92    #[inline]
93    pub(super) fn retired(&self) -> bool {
94        self.unbounded_child.tag(Acquire) == RETIRED
95    }
96}
97
98impl<K, V> InternalNode<K, V>
99where
100    K: 'static + Clone + Ord,
101    V: 'static + Clone,
102{
103    /// Searches for an entry containing the specified key.
104    #[inline]
105    pub(super) fn search_entry<'g, Q>(&self, key: &Q, guard: &'g Guard) -> Option<(&'g K, &'g V)>
106    where
107        K: 'g,
108        Q: Comparable<K> + ?Sized,
109    {
110        loop {
111            let (child, metadata) = self.children.min_greater_equal(key);
112            if let Some((_, child)) = child {
113                if let Some(child) = child.load(Acquire, guard).as_ref() {
114                    if self.children.validate(metadata) {
115                        // Data race resolution - see `LeafNode::search_entry`.
116                        return child.search_entry(key, guard);
117                    }
118                }
119            } else {
120                let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
121                if let Some(unbounded) = unbounded_ptr.as_ref() {
122                    if self.children.validate(metadata) {
123                        return unbounded.search_entry(key, guard);
124                    }
125                } else {
126                    return None;
127                }
128            }
129        }
130    }
131
132    /// Searches for the value associated with the specified key.
133    #[inline]
134    pub(super) fn search_value<'g, Q>(&self, key: &Q, guard: &'g Guard) -> Option<&'g V>
135    where
136        K: 'g,
137        Q: Comparable<K> + ?Sized,
138    {
139        loop {
140            let (child, metadata) = self.children.min_greater_equal(key);
141            if let Some((_, child)) = child {
142                if let Some(child) = child.load(Acquire, guard).as_ref() {
143                    if self.children.validate(metadata) {
144                        // Data race resolution - see `LeafNode::search_entry`.
145                        return child.search_value(key, guard);
146                    }
147                }
148            } else {
149                let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
150                if let Some(unbounded) = unbounded_ptr.as_ref() {
151                    if self.children.validate(metadata) {
152                        return unbounded.search_value(key, guard);
153                    }
154                } else {
155                    return None;
156                }
157            }
158        }
159    }
160
161    /// Returns the minimum key entry.
162    #[inline]
163    pub(super) fn min<'g>(&self, guard: &'g Guard) -> Option<Scanner<'g, K, V>> {
164        loop {
165            let mut retry = false;
166            let scanner = Scanner::new(&self.children);
167            let metadata = scanner.metadata();
168            for (_, child) in scanner {
169                let child_ptr = child.load(Acquire, guard);
170                if let Some(child) = child_ptr.as_ref() {
171                    if self.children.validate(metadata) {
172                        // Data race resolution - see `LeafNode::search_entry`.
173                        if let Some(scanner) = child.min(guard) {
174                            return Some(scanner);
175                        }
176                        continue;
177                    }
178                }
179                // It is not a hot loop - see `LeafNode::search_entry`.
180                retry = true;
181                break;
182            }
183            if retry {
184                continue;
185            }
186            let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
187            if let Some(unbounded) = unbounded_ptr.as_ref() {
188                if self.children.validate(metadata) {
189                    return unbounded.min(guard);
190                }
191                continue;
192            }
193            return None;
194        }
195    }
196
    /// Returns a [`Scanner`] pointing to an entry that is close enough to the entry with the
    /// maximum key among those keys that are smaller than or equal to the given key.
    ///
    /// Returns `None` if all keys in the [`InternalNode`] are greater than the given key.
    #[inline]
    pub(super) fn max_le_appr<'g, Q>(&self, key: &Q, guard: &'g Guard) -> Option<Scanner<'g, K, V>>
    where
        K: 'g,
        Q: Comparable<K> + ?Sized,
    {
        // Fast path: descend into the child with the greatest key strictly less than `key`.
        loop {
            if let Some(scanner) = Scanner::max_less(&self.children, key) {
                if let Some((_, child)) = scanner.get() {
                    if let Some(child) = child.load(Acquire, guard).as_ref() {
                        if self.children.validate(scanner.metadata()) {
                            // Data race resolution - see `LeafNode::search_entry`.
                            if let Some(scanner) = child.max_le_appr(key, guard) {
                                return Some(scanner);
                            }
                            // Fallback.
                            break;
                        }
                    }
                    // It is not a hot loop - see `LeafNode::search_entry`.
                    continue;
                }
            }
            // Fallback.
            break;
        }

        // Starts scanning from the minimum key.
        let mut min_scanner = self.min(guard)?;
        // Advance to the first entry: a fresh `Scanner` is positioned before the first entry,
        // as demonstrated by its use as an iterator in `min`.
        min_scanner.next();
        loop {
            if let Some((k, _)) = min_scanner.get() {
                if key.compare(k).is_ge() {
                    // The minimum key is less than or equal to `key`: close enough.
                    return Some(min_scanner);
                }
                // Every key is greater than `key`.
                break;
            }
            // The current leaf is exhausted; jump to the next linked leaf.
            min_scanner = min_scanner.jump(None, guard)?;
        }

        None
    }
243
    /// Inserts a key-value pair.
    ///
    /// The key-value pair is moved back into `key`/`val` whenever the operation has to be
    /// retried within this node.
    ///
    /// # Errors
    ///
    /// Returns the key-value pair if a retry is required.
    #[inline]
    pub(super) fn insert<W: TryWait>(
        &self,
        mut key: K,
        mut val: V,
        async_wait: &mut W,
        guard: &Guard,
    ) -> Result<InsertResult<K, V>, (K, V)> {
        loop {
            // Route the key to the first child whose maximum key is `>= key`.
            let (child, metadata) = self.children.min_greater_equal(&key);
            if let Some((child_key, child)) = child {
                let child_ptr = child.load(Acquire, guard);
                if let Some(child_ref) = child_ptr.as_ref() {
                    if self.children.validate(metadata) {
                        // Data race resolution - see `LeafNode::search_entry`.
                        let insert_result = child_ref.insert(key, val, async_wait, guard)?;
                        match insert_result {
                            InsertResult::Success
                            | InsertResult::Duplicate(..)
                            | InsertResult::Frozen(..) => return Ok(insert_result),
                            InsertResult::Full(k, v) => {
                                // The child is full: split it, then retry locally if asked to.
                                let split_result = self.split_node(
                                    (k, v),
                                    Some(child_key),
                                    child_ptr,
                                    child,
                                    false,
                                    async_wait,
                                    guard,
                                )?;
                                if let InsertResult::Retry(k, v) = split_result {
                                    key = k;
                                    val = v;
                                    continue;
                                }
                                return Ok(split_result);
                            }
                            InsertResult::Retired(k, v) => {
                                debug_assert!(child_ref.retired());
                                // Try to absorb the retired child; on success this node may
                                // itself retire, which is propagated to the caller.
                                if self.coalesce(guard) == RemoveResult::Retired {
                                    debug_assert!(self.retired());
                                    return Ok(InsertResult::Retired(k, v));
                                }
                                return Err((k, v));
                            }
                            InsertResult::Retry(k, v) => {
                                // `child` has been split, therefore it can be retried.
                                if self.cleanup_link(&k, false, guard) {
                                    key = k;
                                    val = v;
                                    continue;
                                }
                                return Ok(InsertResult::Retry(k, v));
                            }
                        };
                    }
                }
                // It is not a hot loop - see `LeafNode::search_entry`.
                continue;
            }

            // No bounded child covers the key: use the unbounded child.
            let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
            if let Some(unbounded) = unbounded_ptr.as_ref() {
                debug_assert!(unbounded_ptr.tag() == Tag::None);
                if !self.children.validate(metadata) {
                    // Data race resolution - see `LeafNode::search_entry`.
                    continue;
                }
                let insert_result = unbounded.insert(key, val, async_wait, guard)?;
                match insert_result {
                    InsertResult::Success
                    | InsertResult::Duplicate(..)
                    | InsertResult::Frozen(..) => return Ok(insert_result),
                    InsertResult::Full(k, v) => {
                        // The unbounded child is full: split it in place.
                        let split_result = self.split_node(
                            (k, v),
                            None,
                            unbounded_ptr,
                            &self.unbounded_child,
                            false,
                            async_wait,
                            guard,
                        )?;
                        if let InsertResult::Retry(k, v) = split_result {
                            key = k;
                            val = v;
                            continue;
                        }
                        return Ok(split_result);
                    }
                    InsertResult::Retired(k, v) => {
                        debug_assert!(unbounded.retired());
                        if self.coalesce(guard) == RemoveResult::Retired {
                            debug_assert!(self.retired());
                            return Ok(InsertResult::Retired(k, v));
                        }
                        return Err((k, v));
                    }
                    InsertResult::Retry(k, v) => {
                        if self.cleanup_link(&k, false, guard) {
                            key = k;
                            val = v;
                            continue;
                        }
                        return Ok(InsertResult::Retry(k, v));
                    }
                };
            }
            // The unbounded child is gone: this node has retired, so the caller must restart.
            debug_assert!(unbounded_ptr.tag() == RETIRED);
            return Ok(InsertResult::Retired(key, val));
        }
    }
356
    /// Removes an entry associated with the given key.
    ///
    /// `condition` is forwarded to the child [`Node`] containing the key to decide whether the
    /// entry is actually removed.
    ///
    /// # Errors
    ///
    /// Returns an error if a retry is required.
    #[inline]
    pub(super) fn remove_if<Q, F: FnMut(&V) -> bool, W>(
        &self,
        key: &Q,
        condition: &mut F,
        async_wait: &mut W,
        guard: &Guard,
    ) -> Result<RemoveResult, ()>
    where
        Q: Comparable<K> + ?Sized,
        W: TryWait,
    {
        loop {
            // Route the key to the first child whose maximum key is `>= key`.
            let (child, metadata) = self.children.min_greater_equal(key);
            if let Some((_, child)) = child {
                let child_ptr = child.load(Acquire, guard);
                if let Some(child) = child_ptr.as_ref() {
                    if self.children.validate(metadata) {
                        // Data race resolution - see `LeafNode::search_entry`.
                        let result =
                            child.remove_if::<_, _, _>(key, condition, async_wait, guard)?;
                        if result == RemoveResult::Cleanup {
                            // Try to clean up obsolete leaf links here; if this succeeds the
                            // caller does not need to propagate `Cleanup` any further.
                            if self.cleanup_link(key, false, guard) {
                                return Ok(RemoveResult::Success);
                            }
                            return Ok(RemoveResult::Cleanup);
                        }
                        if result == RemoveResult::Retired {
                            // The child has retired: coalesce the children.
                            return Ok(self.coalesce(guard));
                        }
                        return Ok(result);
                    }
                }
                // It is not a hot loop - see `LeafNode::search_entry`.
                continue;
            }
            // No bounded child covers the key: use the unbounded child.
            let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
            if let Some(unbounded) = unbounded_ptr.as_ref() {
                debug_assert!(unbounded_ptr.tag() == Tag::None);
                if !self.children.validate(metadata) {
                    // Data race resolution - see `LeafNode::search_entry`.
                    continue;
                }
                let result = unbounded.remove_if::<_, _, _>(key, condition, async_wait, guard)?;
                if result == RemoveResult::Cleanup {
                    if self.cleanup_link(key, false, guard) {
                        return Ok(RemoveResult::Success);
                    }
                    return Ok(RemoveResult::Cleanup);
                }
                if result == RemoveResult::Retired {
                    return Ok(self.coalesce(guard));
                }
                return Ok(result);
            }
            // No unbounded child: the key cannot exist in this sub-tree.
            return Ok(RemoveResult::Fail);
        }
    }
420
    /// Removes a range of entries.
    ///
    /// `start_unbounded` indicates that the start of `range` lies below this sub-tree, and
    /// `valid_lower_max_leaf`/`valid_upper_min_node` carry the nodes bordering the removed range
    /// so that the leaves adjacent to the range can be reconnected during recursion
    /// (NOTE(review): semantics inferred from the in-body comments - confirm with callers).
    ///
    /// Returns the number of remaining children.
    ///
    /// # Errors
    ///
    /// Returns an error if the [`InternalNode`] could not be locked.
    #[allow(clippy::too_many_lines)]
    #[inline]
    pub(super) fn remove_range<'g, Q, R: RangeBounds<Q>, W: TryWait>(
        &self,
        range: &R,
        start_unbounded: bool,
        valid_lower_max_leaf: Option<&'g Leaf<K, V>>,
        valid_upper_min_node: Option<&'g Node<K, V>>,
        async_wait: &mut W,
        guard: &'g Guard,
    ) -> Result<usize, ()>
    where
        Q: Comparable<K> + ?Sized,
    {
        debug_assert!(valid_lower_max_leaf.is_none() || start_unbounded);
        debug_assert!(valid_lower_max_leaf.is_none() || valid_upper_min_node.is_none());

        // The whole operation is performed under the node lock.
        let Some(_lock) = Locker::try_lock(self) else {
            async_wait.try_wait(&self.lock);
            return Err(());
        };

        let mut current_state = RemoveRangeState::Below;
        // Starts at `1` to account for the unbounded child.
        let mut num_children = 1;
        // The last child that may contain keys below the range, with its key.
        let mut lower_border = None;
        // The first child that may contain keys above the range.
        let mut upper_border = None;

        // Classify each bounded child against the range, unlinking the fully contained ones.
        for (key, node) in Scanner::new(&self.children) {
            current_state = current_state.next(key, range, start_unbounded);
            match current_state {
                RemoveRangeState::Below => {
                    num_children += 1;
                }
                RemoveRangeState::MaybeBelow => {
                    debug_assert!(!start_unbounded);
                    num_children += 1;
                    lower_border.replace((Some(key), node));
                }
                RemoveRangeState::FullyContained => {
                    // There can be another thread inserting keys into the node, and this may
                    // render those concurrent operations completely ineffective.
                    self.children.remove_if(key, &mut |_| true);
                    node.swap((None, Tag::None), AcqRel);
                }
                RemoveRangeState::MaybeAbove => {
                    if valid_upper_min_node.is_some() {
                        // `valid_upper_min_node` is not in this sub-tree.
                        self.children.remove_if(key, &mut |_| true);
                        node.swap((None, Tag::None), AcqRel);
                    } else {
                        num_children += 1;
                        upper_border.replace(node);
                    }
                    break;
                }
            }
        }

        // Now, examine the unbounded child.
        match current_state {
            RemoveRangeState::Below => {
                // The unbounded child is the only child, or all the children are below the range.
                debug_assert!(lower_border.is_none() && upper_border.is_none());
                if valid_upper_min_node.is_some() {
                    lower_border.replace((None, &self.unbounded_child));
                } else {
                    upper_border.replace(&self.unbounded_child);
                }
            }
            RemoveRangeState::MaybeBelow => {
                debug_assert!(!start_unbounded);
                debug_assert!(lower_border.is_some() && upper_border.is_none());
                upper_border.replace(&self.unbounded_child);
            }
            RemoveRangeState::FullyContained => {
                debug_assert!(upper_border.is_none());
                upper_border.replace(&self.unbounded_child);
            }
            RemoveRangeState::MaybeAbove => {
                debug_assert!(upper_border.is_some());
            }
        }

        // Recurse into the border children to finish the removal and relink leaves.
        if let Some(lower_leaf) = valid_lower_max_leaf {
            // It is currently in the middle of a recursive call: pass `lower_leaf` to connect leaves.
            debug_assert!(start_unbounded && lower_border.is_none() && upper_border.is_some());
            if let Some(upper_node) = upper_border.and_then(|n| n.load(Acquire, guard).as_ref()) {
                upper_node.remove_range(range, true, Some(lower_leaf), None, async_wait, guard)?;
            }
        } else if let Some(upper_node) = valid_upper_min_node {
            // Pass `upper_node` to the lower leaf to connect leaves, so that this method can be
            // recursively invoked on `upper_node`.
            debug_assert!(lower_border.is_some());
            if let Some((Some(key), lower_node)) = lower_border {
                // Promote the lower border child to the unbounded child slot, then unlink it.
                self.children.remove_if(key, &mut |_| true);
                self.unbounded_child
                    .swap((lower_node.get_shared(Acquire, guard), Tag::None), AcqRel);
                lower_node.swap((None, Tag::None), Release);
            }
            if let Some(lower_node) = self.unbounded_child.load(Acquire, guard).as_ref() {
                lower_node.remove_range(
                    range,
                    start_unbounded,
                    None,
                    Some(upper_node),
                    async_wait,
                    guard,
                )?;
            }
        } else {
            // Topmost invocation for this range: recurse into the surviving borders.
            let lower_node = lower_border.and_then(|n| n.1.load(Acquire, guard).as_ref());
            let upper_node = upper_border.and_then(|n| n.load(Acquire, guard).as_ref());
            match (lower_node, upper_node) {
                (_, None) => (),
                (None, Some(upper_node)) => {
                    upper_node.remove_range(
                        range,
                        start_unbounded,
                        None,
                        None,
                        async_wait,
                        guard,
                    )?;
                }
                (Some(lower_node), Some(upper_node)) => {
                    debug_assert!(!ptr::eq(lower_node, upper_node));
                    lower_node.remove_range(
                        range,
                        start_unbounded,
                        None,
                        Some(upper_node),
                        async_wait,
                        guard,
                    )?;
                }
            }
        }

        Ok(num_children)
    }
564
565    /// Splits a full node.
566    ///
567    /// # Errors
568    ///
569    /// Returns an error if a retry is required.
570    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
571    pub(super) fn split_node<W: TryWait>(
572        &self,
573        entry: (K, V),
574        full_node_key: Option<&K>,
575        full_node_ptr: Ptr<Node<K, V>>,
576        full_node: &AtomicShared<Node<K, V>>,
577        root_split: bool,
578        async_wait: &mut W,
579        guard: &Guard,
580    ) -> Result<InsertResult<K, V>, (K, V)> {
581        let target = full_node_ptr.as_ref().unwrap();
582        if !self.lock.try_lock() {
583            target.rollback(guard);
584            async_wait.try_wait(&self.lock);
585            return Err(entry);
586        }
587        debug_assert!(!self.retired());
588
589        if full_node_ptr != full_node.load(Relaxed, guard) {
590            self.lock.release_lock();
591            target.rollback(guard);
592            return Err(entry);
593        }
594
595        let prev = self
596            .split_op
597            .origin_node
598            .swap((full_node.get_shared(Relaxed, guard), Tag::None), Relaxed)
599            .0;
600        debug_assert!(prev.is_none());
601
602        if let Some(full_node_key) = full_node_key {
603            self.split_op
604                .origin_node_key
605                .store(ptr::from_ref(full_node_key).cast_mut(), Relaxed);
606        }
607
608        let mut exit_guard = ExitGuard::new(true, |rollback| {
609            if rollback {
610                self.rollback(guard);
611            }
612        });
613        match target {
614            Node::Internal(full_internal_node) => {
615                // Copies nodes except for the known full node to the newly allocated internal node entries.
616                let internal_nodes = (
617                    Shared::new(Node::new_internal_node()),
618                    Shared::new(Node::new_internal_node()),
619                );
620                let Node::Internal(low_key_nodes) = internal_nodes.0.as_ref() else {
621                    unreachable!()
622                };
623                let Node::Internal(high_key_nodes) = internal_nodes.1.as_ref() else {
624                    unreachable!()
625                };
626
627                // Builds a list of valid nodes.
628                #[allow(clippy::type_complexity)]
629                let mut entry_array: [Option<(
630                    Option<&K>,
631                    AtomicShared<Node<K, V>>,
632                )>;
633                    DIMENSION.num_entries + 2] = Default::default();
634                let mut num_entries = 0;
635                let scanner = Scanner::new(&full_internal_node.children);
636                let recommended_boundary = Leaf::<K, V>::optimal_boundary(scanner.metadata());
637                for entry in scanner {
638                    if unsafe {
639                        full_internal_node
640                            .split_op
641                            .origin_node_key
642                            .load(Relaxed)
643                            .as_ref()
644                            .map_or_else(|| false, |key| entry.0 == key)
645                    } {
646                        let low_key_node_ptr = full_internal_node
647                            .split_op
648                            .low_key_node
649                            .load(Relaxed, guard);
650                        if !low_key_node_ptr.is_null() {
651                            entry_array[num_entries].replace((
652                                Some(unsafe {
653                                    full_internal_node
654                                        .split_op
655                                        .middle_key
656                                        .load(Relaxed)
657                                        .as_ref()
658                                        .unwrap()
659                                }),
660                                full_internal_node
661                                    .split_op
662                                    .low_key_node
663                                    .clone(Relaxed, guard),
664                            ));
665                            num_entries += 1;
666                        }
667                        let high_key_node_ptr = full_internal_node
668                            .split_op
669                            .high_key_node
670                            .load(Relaxed, guard);
671                        if !high_key_node_ptr.is_null() {
672                            entry_array[num_entries].replace((
673                                Some(entry.0),
674                                full_internal_node
675                                    .split_op
676                                    .high_key_node
677                                    .clone(Relaxed, guard),
678                            ));
679                            num_entries += 1;
680                        }
681                    } else {
682                        entry_array[num_entries]
683                            .replace((Some(entry.0), entry.1.clone(Acquire, guard)));
684                        num_entries += 1;
685                    }
686                }
687                if full_internal_node
688                    .split_op
689                    .origin_node_key
690                    .load(Relaxed)
691                    .is_null()
692                {
693                    // If the origin is an unbounded node, assign the high key node to the high key
694                    // node's unbounded.
695                    let low_key_node_ptr = full_internal_node
696                        .split_op
697                        .low_key_node
698                        .load(Relaxed, guard);
699                    if !low_key_node_ptr.is_null() {
700                        entry_array[num_entries].replace((
701                            Some(unsafe {
702                                full_internal_node
703                                    .split_op
704                                    .middle_key
705                                    .load(Relaxed)
706                                    .as_ref()
707                                    .unwrap()
708                            }),
709                            full_internal_node
710                                .split_op
711                                .low_key_node
712                                .clone(Relaxed, guard),
713                        ));
714                        num_entries += 1;
715                    }
716                    let high_key_node_ptr = full_internal_node
717                        .split_op
718                        .high_key_node
719                        .load(Relaxed, guard);
720                    if !high_key_node_ptr.is_null() {
721                        entry_array[num_entries].replace((
722                            None,
723                            full_internal_node
724                                .split_op
725                                .high_key_node
726                                .clone(Relaxed, guard),
727                        ));
728                        num_entries += 1;
729                    }
730                } else {
731                    // If the origin is a bounded node, assign the unbounded node to the high key
732                    // node's unbounded.
733                    entry_array[num_entries].replace((
734                        None,
735                        full_internal_node.unbounded_child.clone(Relaxed, guard),
736                    ));
737                    num_entries += 1;
738                }
739                debug_assert!(num_entries >= 2);
740
741                let low_key_node_array_size = recommended_boundary.min(num_entries - 1);
742                for (i, entry) in entry_array.iter().enumerate() {
743                    if let Some((k, v)) = entry {
744                        match (i + 1).cmp(&low_key_node_array_size) {
745                            Less => {
746                                low_key_nodes.children.insert_unchecked(
747                                    k.unwrap().clone(),
748                                    v.clone(Relaxed, guard),
749                                    i,
750                                );
751                            }
752                            Equal => {
753                                if let Some(&k) = k.as_ref() {
754                                    self.split_op
755                                        .middle_key
756                                        .store(ptr::from_ref(k).cast_mut(), Relaxed);
757                                }
758                                low_key_nodes
759                                    .unbounded_child
760                                    .swap((v.get_shared(Relaxed, guard), Tag::None), Relaxed);
761                            }
762                            Greater => {
763                                if let Some(k) = k.cloned() {
764                                    high_key_nodes.children.insert_unchecked(
765                                        k,
766                                        v.clone(Relaxed, guard),
767                                        i - low_key_node_array_size,
768                                    );
769                                } else {
770                                    high_key_nodes
771                                        .unbounded_child
772                                        .swap((v.get_shared(Relaxed, guard), Tag::None), Relaxed);
773                                }
774                            }
775                        }
776                    } else {
777                        break;
778                    }
779                }
780
781                // Turns the new nodes into internal nodes.
782                self.split_op
783                    .low_key_node
784                    .swap((Some(internal_nodes.0), Tag::None), Relaxed);
785                self.split_op
786                    .high_key_node
787                    .swap((Some(internal_nodes.1), Tag::None), Relaxed);
788            }
789            Node::Leaf(full_leaf_node) => {
790                // Copies leaves except for the known full leaf to the newly allocated leaf node entries.
791                let leaf_nodes = (
792                    Shared::new(Node::new_leaf_node()),
793                    Shared::new(Node::new_leaf_node()),
794                );
795                let low_key_leaf_node = if let Node::Leaf(low_key_leaf_node) = leaf_nodes.0.as_ref()
796                {
797                    Some(low_key_leaf_node)
798                } else {
799                    None
800                };
801                let high_key_leaf_node =
802                    if let Node::Leaf(high_key_leaf_node) = &leaf_nodes.1.as_ref() {
803                        Some(high_key_leaf_node)
804                    } else {
805                        None
806                    };
807
808                self.split_op.middle_key.store(
809                    ptr::from_ref(full_leaf_node.split_leaf_node(
810                        low_key_leaf_node.unwrap(),
811                        high_key_leaf_node.unwrap(),
812                        guard,
813                    ))
814                    .cast_mut(),
815                    Relaxed,
816                );
817
818                // Turns the new leaves into leaf nodes.
819                self.split_op
820                    .low_key_node
821                    .swap((Some(leaf_nodes.0), Tag::None), Relaxed);
822                self.split_op
823                    .high_key_node
824                    .swap((Some(leaf_nodes.1), Tag::None), Relaxed);
825            }
826        }
827
828        // Inserts the newly allocated internal nodes into the main array.
829        match self.children.insert(
830            unsafe {
831                self.split_op
832                    .middle_key
833                    .load(Relaxed)
834                    .as_ref()
835                    .unwrap()
836                    .clone()
837            },
838            self.split_op.low_key_node.clone(Relaxed, guard),
839        ) {
840            InsertResult::Success => (),
841            InsertResult::Duplicate(..) | InsertResult::Frozen(..) | InsertResult::Retry(..) => {
842                unreachable!()
843            }
844            InsertResult::Full(..) | InsertResult::Retired(..) => {
845                // Insertion failed: expects that the parent splits this node.
846                *exit_guard = false;
847                return Ok(InsertResult::Full(entry.0, entry.1));
848            }
849        }
850        *exit_guard = false;
851
852        // Replace the full node with the high-key node.
853        let unused_node = full_node
854            .swap(
855                (
856                    self.split_op.high_key_node.get_shared(Relaxed, guard),
857                    Tag::None,
858                ),
859                Release,
860            )
861            .0;
862
863        if root_split {
864            // Return without unlocking it.
865            return Ok(InsertResult::Retry(entry.0, entry.1));
866        }
867
868        // Unlock the node.
869        self.finish_split();
870
871        // Drop the deprecated nodes.
872        if let Some(unused_node) = unused_node {
873            // Clean up the split operation by committing it.
874            unused_node.commit(guard);
875            let _: bool = unused_node.release();
876        }
877
878        // Since a new node has been inserted, the caller can retry.
879        Ok(InsertResult::Retry(entry.0, entry.1))
880    }
881
882    /// Finishes splitting the [`InternalNode`].
883    #[inline]
884    pub(super) fn finish_split(&self) {
885        let origin = self.split_op.reset();
886        self.lock.release_lock();
887        origin.map(Shared::release);
888    }
889
890    /// Commits an ongoing structural change recursively.
891    #[inline]
892    pub(super) fn commit(&self, guard: &Guard) {
893        let origin = self.split_op.reset();
894
895        // Prevent further exclusive access to the internal node.
896        self.lock.poison_lock();
897        if let Some(origin) = origin {
898            origin.commit(guard);
899            let _: bool = origin.release();
900        }
901    }
902
903    /// Rolls back the ongoing split operation recursively.
904    #[inline]
905    pub(super) fn rollback(&self, guard: &Guard) {
906        let origin = self.split_op.reset();
907        self.lock.release_lock();
908        if let Some(origin) = origin {
909            origin.rollback(guard);
910            let _: bool = origin.release();
911        }
912    }
913
914    /// Cleans up logically deleted leaves in the linked list.
915    ///
916    /// Returns `false` if the target leaf node does not exist in the subtree.
917    #[inline]
918    pub(super) fn cleanup_link<'g, Q>(&self, key: &Q, traverse_max: bool, guard: &'g Guard) -> bool
919    where
920        K: 'g,
921        Q: Comparable<K> + ?Sized,
922    {
923        if traverse_max {
924            // It just has to search for the maximum leaf node in the tree.
925            if let Some(unbounded) = self.unbounded_child.load(Acquire, guard).as_ref() {
926                return unbounded.cleanup_link(key, true, guard);
927            }
928        } else if let Some(child_scanner) = Scanner::max_less(&self.children, key) {
929            if let Some((_, child)) = child_scanner.get() {
930                if let Some(child) = child.load(Acquire, guard).as_ref() {
931                    return child.cleanup_link(key, true, guard);
932                }
933            }
934        }
935        false
936    }
937
    /// Tries to coalesce nodes.
    ///
    /// Removes retired children from the node; when the unbounded child itself is retired, it is
    /// replaced with the maximum-key bounded child if one exists. Returns
    /// [`RemoveResult::Retired`] when the node has become completely empty,
    /// [`RemoveResult::Cleanup`] when at least one child was deallocated, and
    /// [`RemoveResult::Success`] otherwise.
    fn coalesce(&self, guard: &Guard) -> RemoveResult {
        let mut node_deleted = false;
        while let Some(lock) = Locker::try_lock(self) {
            // Tracks the live child with the maximum key: the candidate replacement for the
            // unbounded child in case it is retired.
            let mut max_key_entry = None;
            for (key, node) in Scanner::new(&self.children) {
                let node_ptr = node.load(Acquire, guard);
                let node_ref = node_ptr.as_ref().unwrap();
                if node_ref.retired() {
                    let result = self.children.remove_if(key, &mut |_| true);
                    debug_assert_ne!(result, RemoveResult::Fail);

                    // Once the key is removed, it is safe to deallocate the node as the validation
                    // loop ensures the absence of readers.
                    if let Some(node) = node.swap((None, Tag::None), Release).0 {
                        let _: bool = node.release();
                        node_deleted = true;
                    }
                } else {
                    max_key_entry.replace((key, node));
                }
            }

            // The unbounded node is replaced with the maximum key node if retired.
            let unbounded_ptr = self.unbounded_child.load(Acquire, guard);
            let fully_empty = if let Some(unbounded) = unbounded_ptr.as_ref() {
                if unbounded.retired() {
                    if let Some((key, max_key_child)) = max_key_entry {
                        // Promote the maximum-key bounded child into the unbounded child slot.
                        if let Some(obsolete_node) = self
                            .unbounded_child
                            .swap(
                                (max_key_child.get_shared(Relaxed, guard), Tag::None),
                                Release,
                            )
                            .0
                        {
                            debug_assert!(obsolete_node.retired());
                            let _: bool = obsolete_node.release();
                            node_deleted = true;
                        }
                        // The promoted child is removed from the bounded children array.
                        let result = self.children.remove_if(key, &mut |_| true);
                        debug_assert_ne!(result, RemoveResult::Fail);
                        if let Some(node) = max_key_child.swap((None, Tag::None), Release).0 {
                            let _: bool = node.release();
                            node_deleted = true;
                        }
                        false
                    } else {
                        // No live bounded children remain: mark the unbounded child slot as
                        // retired so the whole node can be retired.
                        if let Some(obsolete_node) =
                            self.unbounded_child.swap((None, RETIRED), Release).0
                        {
                            debug_assert!(obsolete_node.retired());
                            let _: bool = obsolete_node.release();
                            node_deleted = true;
                        }
                        true
                    }
                } else {
                    false
                }
            } else {
                debug_assert!(unbounded_ptr.tag() == RETIRED);
                true
            };

            if fully_empty {
                return RemoveResult::Retired;
            }

            drop(lock);
            // Re-check after unlocking: another thread may have retired a child in the meantime.
            if !self.has_retired_node(guard) {
                break;
            }
        }

        if node_deleted {
            RemoveResult::Cleanup
        } else {
            RemoveResult::Success
        }
    }
1019
1020    /// Checks if the [`InternalNode`] has a retired [`Node`].
1021    fn has_retired_node(&self, guard: &Guard) -> bool {
1022        let mut has_valid_node = false;
1023        for entry in Scanner::new(&self.children) {
1024            let leaf_ptr = entry.1.load(Relaxed, guard);
1025            if let Some(leaf) = leaf_ptr.as_ref() {
1026                if leaf.retired() {
1027                    return true;
1028                }
1029                has_valid_node = true;
1030            }
1031        }
1032        if !has_valid_node {
1033            let unbounded_ptr = self.unbounded_child.load(Relaxed, guard);
1034            if let Some(unbounded) = unbounded_ptr.as_ref() {
1035                if unbounded.retired() {
1036                    return true;
1037                }
1038            }
1039        }
1040        false
1041    }
1042}
1043
1044impl<'n, K, V> Locker<'n, K, V> {
1045    /// Acquires exclusive lock on the [`InternalNode`].
1046    #[inline]
1047    pub(super) fn try_lock(internal_node: &'n InternalNode<K, V>) -> Option<Locker<'n, K, V>> {
1048        if internal_node.lock.try_lock() {
1049            Some(Locker { internal_node })
1050        } else {
1051            None
1052        }
1053    }
1054
1055    /// Retires the node with the lock released.
1056    pub(super) fn unlock_retire(self) {
1057        self.internal_node.lock.poison_lock();
1058        forget(self);
1059    }
1060}
1061
impl<K, V> Drop for Locker<'_, K, V> {
    #[inline]
    fn drop(&mut self) {
        // Relinquish exclusive ownership of the `InternalNode` when the `Locker` goes out of
        // scope; `unlock_retire` bypasses this destructor via `forget`.
        self.internal_node.lock.release_lock();
    }
}
1068
1069impl<K, V> StructuralChange<K, V> {
1070    fn reset(&self) -> Option<Shared<Node<K, V>>> {
1071        self.origin_node_key.store(ptr::null_mut(), Relaxed);
1072        self.low_key_node.swap((None, Tag::None), Relaxed);
1073        self.middle_key.store(ptr::null_mut(), Relaxed);
1074        self.high_key_node.swap((None, Tag::None), Relaxed);
1075        self.origin_node.swap((None, Tag::None), Relaxed).0
1076    }
1077}
1078
1079impl<K, V> Default for StructuralChange<K, V> {
1080    #[inline]
1081    fn default() -> Self {
1082        Self {
1083            origin_node_key: AtomicPtr::default(),
1084            origin_node: AtomicShared::null(),
1085            low_key_node: AtomicShared::null(),
1086            middle_key: AtomicPtr::default(),
1087            high_key_node: AtomicShared::null(),
1088        }
1089    }
1090}
1091
1092#[cfg(not(feature = "loom"))]
1093#[cfg(test)]
1094mod test {
1095    use super::*;
1096    use std::sync::atomic::AtomicBool;
1097    use tokio::sync::Barrier;
1098
1099    fn new_level_3_node() -> InternalNode<usize, usize> {
1100        InternalNode {
1101            children: Leaf::new(),
1102            unbounded_child: AtomicShared::new(Node::Internal(InternalNode {
1103                children: Leaf::new(),
1104                unbounded_child: AtomicShared::new(Node::new_leaf_node()),
1105                split_op: StructuralChange::default(),
1106                lock: Lock::default(),
1107            })),
1108            split_op: StructuralChange::default(),
1109            lock: Lock::default(),
1110        }
1111    }
1112
    // Fills the node until an insertion reports `Full`, then verifies that every previously
    // inserted entry is still searchable and removable, and that removing the last one retires
    // the node.
    #[test]
    fn bulk() {
        let internal_node = new_level_3_node();
        let guard = Guard::new();
        assert_eq!(internal_node.depth(1, &guard), 3);

        // Miri is slow; use a smaller workload under it.
        let data_size = if cfg!(miri) { 256 } else { 8192 };
        for k in 0..data_size {
            match internal_node.insert(k, k, &mut (), &guard) {
                Ok(result) => match result {
                    InsertResult::Success => {
                        assert_eq!(internal_node.search_entry(&k, &guard), Some((&k, &k)));
                    }
                    InsertResult::Duplicate(..)
                    | InsertResult::Frozen(..)
                    | InsertResult::Retired(..) => unreachable!(),
                    InsertResult::Full(_, _) => {
                        // The node is full: abandon the pending split, then drain every entry
                        // inserted so far.
                        internal_node.rollback(&guard);
                        for j in 0..k {
                            assert_eq!(internal_node.search_entry(&j, &guard), Some((&j, &j)));
                            if j == k - 1 {
                                // Removing the final entry must retire the whole node.
                                assert!(matches!(
                                    internal_node.remove_if::<_, _, _>(
                                        &j,
                                        &mut |_| true,
                                        &mut (),
                                        &guard
                                    ),
                                    Ok(RemoveResult::Retired)
                                ));
                            } else {
                                assert!(
                                    internal_node
                                        .remove_if::<_, _, _>(&j, &mut |_| true, &mut (), &guard)
                                        .is_ok(),
                                );
                            }
                            assert_eq!(internal_node.search_entry(&j, &guard), None);
                        }
                        break;
                    }
                    InsertResult::Retry(k, v) => {
                        // A split completed underneath: retrying must succeed.
                        let result = internal_node.insert(k, v, &mut (), &guard);
                        assert!(result.is_ok());
                        assert_eq!(internal_node.search_entry(&k, &guard), Some((&k, &k)));
                    }
                },
                Err((k, v)) => {
                    // Transient failure: a retry must succeed.
                    let result = internal_node.insert(k, v, &mut (), &guard);
                    assert!(result.is_ok());
                    assert_eq!(internal_node.search_entry(&k, &guard), Some((&k, &k)));
                }
            }
        }
    }
1168
    // Tasks insert and remove disjoint key ranges concurrently while `usize::MAX` stays resident,
    // verifying that insertions, duplicate detection, and removals behave under contention.
    #[cfg_attr(miri, ignore)]
    #[tokio::test(flavor = "multi_thread", worker_threads = 16)]
    async fn parallel() {
        let num_tasks = 8;
        let workload_size = 64;
        let barrier = Shared::new(Barrier::new(num_tasks));
        for _ in 0..64 {
            let internal_node = Shared::new(new_level_3_node());
            assert!(
                internal_node
                    .insert(usize::MAX, usize::MAX, &mut (), &Guard::new())
                    .is_ok()
            );
            let mut task_handles = Vec::with_capacity(num_tasks);
            for task_id in 0..num_tasks {
                let barrier_clone = barrier.clone();
                let internal_node_clone = internal_node.clone();
                task_handles.push(tokio::task::spawn(async move {
                    barrier_clone.wait().await;
                    let guard = Guard::new();
                    // Set when the node reports `Full`; keys at and beyond it are skipped.
                    let mut max_key = None;
                    // Each task owns a disjoint key range.
                    let range = (task_id * workload_size)..((task_id + 1) * workload_size);
                    for id in range.clone() {
                        loop {
                            if let Ok(r) = internal_node_clone.insert(id, id, &mut (), &guard) {
                                match r {
                                    InsertResult::Success => {
                                        // Re-inserting the same key must report a duplicate.
                                        match internal_node_clone.insert(id, id, &mut (), &guard) {
                                            Ok(InsertResult::Duplicate(..)) | Err(_) => (),
                                            _ => unreachable!(),
                                        }
                                        break;
                                    }
                                    InsertResult::Full(..) => {
                                        internal_node_clone.rollback(&guard);
                                        max_key.replace(id);
                                        break;
                                    }
                                    InsertResult::Frozen(..) | InsertResult::Retry(..) => (),
                                    _ => unreachable!(),
                                }
                            }
                        }
                        if max_key.is_some() {
                            break;
                        }
                    }
                    // Every key inserted before hitting `Full` must be observable.
                    for id in range.clone() {
                        if max_key == Some(id) {
                            break;
                        }
                        assert_eq!(
                            internal_node_clone.search_entry(&id, &guard),
                            Some((&id, &id))
                        );
                    }
                    // Remove the inserted keys; a second removal of the same key must not succeed.
                    for id in range {
                        if max_key == Some(id) {
                            break;
                        }
                        loop {
                            if let Ok(r) = internal_node_clone.remove_if::<_, _, _>(
                                &id,
                                &mut |_| true,
                                &mut (),
                                &guard,
                            ) {
                                match r {
                                    RemoveResult::Success
                                    | RemoveResult::Cleanup
                                    | RemoveResult::Fail => break,
                                    RemoveResult::Frozen | RemoveResult::Retired => unreachable!(),
                                }
                            }
                        }
                        assert!(internal_node_clone.search_entry(&id, &guard).is_none());
                        if let Ok(RemoveResult::Success) = internal_node_clone.remove_if::<_, _, _>(
                            &id,
                            &mut |_| true,
                            &mut (),
                            &guard,
                        ) {
                            unreachable!()
                        }
                    }
                }));
            }

            for r in futures::future::join_all(task_handles).await {
                assert!(r.is_ok());
            }
            assert!(
                internal_node
                    .remove_if::<_, _, _>(&usize::MAX, &mut |_| true, &mut (), &Guard::new())
                    .is_ok()
            );
        }
    }
1267
    // Verifies that a designated key (`fixed_point`) remains continuously observable while other
    // keys are concurrently inserted and removed around it.
    #[cfg_attr(miri, ignore)]
    #[tokio::test(flavor = "multi_thread", worker_threads = 16)]
    async fn durability() {
        let num_tasks = 8_usize;
        let num_iterations = 64;
        let workload_size = 64_usize;
        for k in 0..64 {
            let fixed_point = k * 16;
            for _ in 0..=num_iterations {
                let barrier = Shared::new(Barrier::new(num_tasks));
                let internal_node = Shared::new(new_level_3_node());
                // Exactly one task must win the initial insertion of `fixed_point`.
                let inserted: Shared<AtomicBool> = Shared::new(AtomicBool::new(false));
                let mut task_handles = Vec::with_capacity(num_tasks);
                for _ in 0..num_tasks {
                    let barrier_clone = barrier.clone();
                    let internal_node_clone = internal_node.clone();
                    let inserted_clone = inserted.clone();
                    task_handles.push(tokio::spawn(async move {
                        {
                            // Phase 1: all tasks race to insert `fixed_point`.
                            barrier_clone.wait().await;
                            let guard = Guard::new();
                            match internal_node_clone.insert(
                                fixed_point,
                                fixed_point,
                                &mut (),
                                &guard,
                            ) {
                                Ok(InsertResult::Success) => {
                                    // Only a single winner may flip the flag.
                                    assert!(!inserted_clone.swap(true, Relaxed));
                                }
                                Ok(InsertResult::Full(_, _) | InsertResult::Retired(_, _)) => {
                                    internal_node_clone.rollback(&guard);
                                }
                                _ => (),
                            }
                            assert_eq!(
                                internal_node_clone
                                    .search_entry(&fixed_point, &guard)
                                    .unwrap(),
                                (&fixed_point, &fixed_point)
                            );
                        }
                        {
                            // Phase 2: churn other keys; `fixed_point` must stay visible after
                            // every operation.
                            barrier_clone.wait().await;
                            let guard = Guard::new();
                            for i in 0..workload_size {
                                if i != fixed_point {
                                    if let Ok(
                                        InsertResult::Full(_, _) | InsertResult::Retired(_, _),
                                    ) = internal_node_clone.insert(i, i, &mut (), &guard)
                                    {
                                        internal_node_clone.rollback(&guard);
                                    }
                                }
                                assert_eq!(
                                    internal_node_clone
                                        .search_entry(&fixed_point, &guard)
                                        .unwrap(),
                                    (&fixed_point, &fixed_point)
                                );
                            }
                            for i in 0..workload_size {
                                // Approximate max-less-than-or-equal scans must never overshoot
                                // `fixed_point`.
                                let max_scanner = internal_node_clone
                                    .max_le_appr(&fixed_point, &guard)
                                    .unwrap();
                                assert!(*max_scanner.get().unwrap().0 <= fixed_point);
                                let mut min_scanner = internal_node_clone.min(&guard).unwrap();
                                if let Some((f, v)) = min_scanner.next() {
                                    assert_eq!(*f, *v);
                                    assert!(*f <= fixed_point);
                                } else {
                                    let (f, v) =
                                        min_scanner.jump(None, &guard).unwrap().get().unwrap();
                                    assert_eq!(*f, *v);
                                    assert!(*f <= fixed_point);
                                }
                                // The condition protects `fixed_point` from removal.
                                let _result = internal_node_clone.remove_if::<_, _, _>(
                                    &i,
                                    &mut |v| *v != fixed_point,
                                    &mut (),
                                    &guard,
                                );
                                assert_eq!(
                                    internal_node_clone
                                        .search_entry(&fixed_point, &guard)
                                        .unwrap(),
                                    (&fixed_point, &fixed_point)
                                );
                            }
                        }
                    }));
                }
                for r in futures::future::join_all(task_handles).await {
                    assert!(r.is_ok());
                }
                assert!((*inserted).load(Relaxed));
            }
        }
    }
1367}