use crate::ebr::{AtomicShared, Guard, Shared};
use crate::LinkedList;
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::fmt::{self, Debug};
use std::mem::{needs_drop, MaybeUninit};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
/// A fixed-capacity, ordered array of key-value entries whose state lives in one atomic word.
///
/// `metadata` packs one rank field per entry slot (see [`Dimension`]); rank `0` marks an
/// uninitialized slot and the all-ones rank marks a removed (or reserved) slot. The two most
/// significant bits of the word are the `retired` and `frozen` flags.
pub struct Leaf<K, V>
where
    K: 'static + Clone + Ord,
    V: 'static + Clone,
{
    /// Packed per-slot ranks plus the `frozen` and `retired` flag bits.
    metadata: AtomicUsize,
    /// Key and value storage; slots start uninitialized and are filled by `write`.
    entry_array: EntryArray<K, V>,
    /// Link exposed through the `LinkedList` implementation.
    link: AtomicShared<Leaf<K, V>>,
}
/// Describes how entry ranks are packed into the metadata word of a [`Leaf`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Dimension {
    /// Number of entry slots in a leaf.
    pub num_entries: usize,
    /// Width, in bits, of the rank field kept in the metadata word for each slot.
    pub num_bits_per_entry: usize,
}
/// Outcome of `Leaf::insert`; every non-success variant hands the key-value pair back.
pub enum InsertResult<K, V> {
    /// The entry was written and made reachable.
    Success,
    /// An entry with an equal key is already present.
    Duplicate(K, V),
    /// No uninitialized slot is left in the leaf.
    Full(K, V),
    /// The leaf is frozen.
    Frozen(K, V),
    /// The leaf has retired.
    Retired(K, V),
    /// Not produced by the `Leaf` code visible here; presumably asks the caller to retry
    /// (TODO confirm against the callers).
    Retry(K, V),
}
/// Outcome of `Leaf::remove_if`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RemoveResult {
    /// The entry was marked removed and other reachable entries remain.
    Success,
    /// Not produced by the `Leaf` code visible here; presumably signals that the caller should
    /// clean up (TODO confirm against the callers).
    Cleanup,
    /// The entry was marked removed and, since no other reachable entry remained, the leaf
    /// was retired in the same update.
    Retired,
    /// The key was not found, the condition rejected the value, or the entry was removed
    /// concurrently.
    Fail,
    /// The leaf is frozen.
    Frozen,
}
impl<K, V> Leaf<K, V>
where
    K: 'static + Clone + Ord,
    V: 'static + Clone,
{
    #[inline]
    pub(super) const fn new() -> Leaf<K, V> {
        #[allow(clippy::uninit_assumed_init)]
        Leaf {
            metadata: AtomicUsize::new(0),
            entry_array: unsafe { MaybeUninit::uninit().assume_init() },
            link: AtomicShared::null(),
        }
    }
    /// Clears the frozen flag.
    ///
    /// Returns `true` if this call cleared the flag and `false` if the leaf was not frozen.
    #[inline]
    pub(super) fn thaw(&self) -> bool {
        let clear_frozen =
            |metadata: usize| Dimension::frozen(metadata).then(|| Dimension::thaw(metadata));
        self.metadata
            .fetch_update(Relaxed, Relaxed, clear_frozen)
            .is_ok()
    }
    /// Returns `true` if the retired flag is set in the current metadata.
    #[inline]
    pub(super) fn is_retired(&self) -> bool {
        let snapshot = self.metadata.load(Relaxed);
        Dimension::retired(snapshot)
    }
    /// Returns `true` if no slot holds a reachable entry, i.e. every slot rank is either the
    /// uninitialized rank or the removed rank.
    #[inline]
    pub(super) fn is_empty(&self) -> bool {
        let snapshot = self.metadata.load(Relaxed);
        (0..DIMENSION.num_entries).all(|slot| {
            let rank = DIMENSION.rank(snapshot, slot);
            rank == Dimension::uninit_rank() || rank == DIMENSION.removed_rank()
        })
    }
    /// Returns the key of the reachable entry with the highest rank, or `None` if there is no
    /// reachable entry.
    #[inline]
    pub(super) fn max_key(&self) -> Option<&K> {
        let (key, _) = self.max_entry()?;
        Some(key)
    }
    /// Returns the reachable entry with the highest rank, or `None` if there is no reachable
    /// entry.
    #[inline]
    pub(super) fn max_entry(&self) -> Option<(&K, &V)> {
        let rank_modulus = 1_usize << DIMENSION.num_bits_per_entry;
        let removed_rank = DIMENSION.removed_rank();
        let mut remaining = self.metadata.load(Acquire);
        // `(rank, slot)` of the best candidate seen so far.
        let mut highest: Option<(usize, usize)> = None;
        for slot in 0..DIMENSION.num_entries {
            if remaining == 0 {
                // Every slot from here on is uninitialized.
                break;
            }
            let rank = remaining % rank_modulus;
            remaining >>= DIMENSION.num_bits_per_entry;
            let best_rank = highest.map_or(0, |(best, _)| best);
            if rank != removed_rank && rank > best_rank {
                highest = Some((rank, slot));
            }
        }
        highest.map(|(_, slot)| (self.key_at(slot), self.value_at(slot)))
    }
    /// Inserts a key-value pair into the first uninitialized slot.
    ///
    /// Returns `Retired` or `Frozen` if the corresponding flag is set in the metadata. When no
    /// uninitialized slot is left, returns `Duplicate` if the key is already present and `Full`
    /// otherwise. Once a slot has been reserved, the result is whatever `post_insert` reports.
    /// Every non-success variant hands the key and value back to the caller.
    #[inline]
    pub(super) fn insert(&self, key: K, val: V) -> InsertResult<K, V> {
        let mut metadata = self.metadata.load(Acquire);
        'after_read_metadata: loop {
            if Dimension::retired(metadata) {
                return InsertResult::Retired(key, val);
            } else if Dimension::frozen(metadata) {
                return InsertResult::Frozen(key, val);
            }
            let mut mutable_metadata = metadata;
            for i in 0..DIMENSION.num_entries {
                let rank = mutable_metadata % (1_usize << DIMENSION.num_bits_per_entry);
                if rank == Dimension::uninit_rank() {
                    // Reserve the slot by giving it the removed rank, so that readers skip it
                    // until `post_insert` assigns its real rank.
                    let interim_metadata = DIMENSION.augment(metadata, i, DIMENSION.removed_rank());
                    if let Err(actual) =
                        self.metadata
                            .compare_exchange(metadata, interim_metadata, Acquire, Acquire)
                    {
                        // The metadata changed underneath us: start over from the new value.
                        metadata = actual;
                        continue 'after_read_metadata;
                    }
                    self.write(i, key, val);
                    return self.post_insert(i, interim_metadata);
                }
                mutable_metadata >>= DIMENSION.num_bits_per_entry;
            }
            // No free slot: tell a duplicate key apart from a genuinely full leaf.
            if self.search_slot(&key, metadata).is_some() {
                return InsertResult::Duplicate(key, val);
            }
            return InsertResult::Full(key, val);
        }
    }
    /// Writes the entry into slot `index` and assigns it rank `index + 1` without any of the
    /// checks performed by `insert`.
    ///
    /// The metadata is updated with a plain load and store rather than a compare-exchange, so
    /// this presumably must only be used while no other thread modifies the leaf (TODO
    /// confirm against the callers).
    #[inline]
    pub(super) fn insert_unchecked(&self, key: K, val: V, index: usize) {
        debug_assert!(index < DIMENSION.num_entries);
        let metadata = self.metadata.load(Relaxed);
        let new_metadata = DIMENSION.augment(metadata, index, index + 1);
        // Write the entry before the release-store publishes its rank.
        self.write(index, key, val);
        self.metadata.store(new_metadata, Release);
    }
    /// Marks the entry associated with `key` as removed if `condition` accepts its value.
    ///
    /// Returns `Frozen` if the leaf is frozen, `Fail` if the key is not found, the condition
    /// rejects the value, or the entry was removed concurrently, `Retired` if the entry was
    /// removed and the leaf was retired because no other reachable entry remained, and
    /// `Success` otherwise.
    #[inline]
    pub(super) fn remove_if<Q, F: FnMut(&V) -> bool>(
        &self,
        key: &Q,
        condition: &mut F,
    ) -> RemoveResult
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        let mut metadata = self.metadata.load(Acquire);
        if Dimension::frozen(metadata) {
            return RemoveResult::Frozen;
        }
        // Only entries whose rank lies strictly between the largest rank known to be below
        // `key` and the smallest rank known to be above it need to be compared.
        let mut min_max_rank = DIMENSION.removed_rank();
        let mut max_min_rank = 0;
        let mut mutable_metadata = metadata;
        for i in 0..DIMENSION.num_entries {
            if mutable_metadata == 0 {
                break;
            }
            let rank = mutable_metadata % (1_usize << DIMENSION.num_bits_per_entry);
            if rank < min_max_rank && rank > max_min_rank {
                match self.compare(i, key) {
                    Ordering::Less => {
                        if max_min_rank < rank {
                            max_min_rank = rank;
                        }
                    }
                    Ordering::Greater => {
                        if min_max_rank > rank {
                            min_max_rank = rank;
                        }
                    }
                    Ordering::Equal => {
                        // Found the key: retry the update until it succeeds or becomes moot.
                        loop {
                            if !condition(self.value_at(i)) {
                                return RemoveResult::Fail;
                            }
                            // Check whether any slot other than `i` holds a reachable entry.
                            let mut empty = true;
                            mutable_metadata = metadata;
                            for j in 0..DIMENSION.num_entries {
                                if mutable_metadata == 0 {
                                    break;
                                }
                                if i != j {
                                    let rank = mutable_metadata
                                        % (1_usize << DIMENSION.num_bits_per_entry);
                                    if rank != Dimension::uninit_rank()
                                        && rank != DIMENSION.removed_rank()
                                    {
                                        empty = false;
                                        break;
                                    }
                                }
                                mutable_metadata >>= DIMENSION.num_bits_per_entry;
                            }
                            // Setting every rank bit of slot `i` yields the removed rank.
                            let mut new_metadata = metadata | DIMENSION.rank_mask(i);
                            if empty {
                                new_metadata = Dimension::retire(new_metadata);
                            }
                            match self.metadata.compare_exchange(
                                metadata,
                                new_metadata,
                                Release,
                                Relaxed,
                            ) {
                                Ok(_) => {
                                    if empty {
                                        return RemoveResult::Retired;
                                    }
                                    return RemoveResult::Success;
                                }
                                Err(actual) => {
                                    // Another thread already removed this entry.
                                    if DIMENSION.rank(actual, i) == DIMENSION.removed_rank() {
                                        return RemoveResult::Fail;
                                    }
                                    if Dimension::frozen(actual) {
                                        return RemoveResult::Frozen;
                                    }
                                    // Unrelated change: retry against the new metadata.
                                    metadata = actual;
                                }
                            }
                        }
                    }
                };
            }
            mutable_metadata >>= DIMENSION.num_bits_per_entry;
        }
        RemoveResult::Fail
    }
    /// Returns a reference to the value associated with `key`, or `None` if no reachable entry
    /// has that key.
    #[inline]
    pub(super) fn search<Q>(&self, key: &Q) -> Option<&V>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        let snapshot = self.metadata.load(Acquire);
        let slot = self.search_slot(key, snapshot)?;
        Some(self.value_at(slot))
    }
    #[inline]
    pub(super) fn max_less<Q>(&self, mut mutable_metadata: usize, key: &Q) -> usize
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        let mut min_max_rank = DIMENSION.removed_rank();
        let mut max_min_rank = 0;
        let mut max_min_index = DIMENSION.num_entries;
        for i in 0..DIMENSION.num_entries {
            if mutable_metadata == 0 {
                break;
            }
            let rank = mutable_metadata % (1_usize << DIMENSION.num_bits_per_entry);
            if rank < min_max_rank && rank > max_min_rank {
                match self.compare(i, key) {
                    Ordering::Less => {
                        if max_min_rank < rank {
                            max_min_rank = rank;
                            max_min_index = i;
                        }
                    }
                    Ordering::Greater => {
                        if min_max_rank > rank {
                            min_max_rank = rank;
                        }
                    }
                    Ordering::Equal => {
                        min_max_rank = rank;
                    }
                }
            }
            mutable_metadata >>= DIMENSION.num_bits_per_entry;
        }
        max_min_index
    }
    /// Finds the entry with the smallest key that is greater than or equal to `key`.
    ///
    /// Returns the entry, if any, together with the metadata snapshot the search was based on.
    #[inline]
    pub(super) fn min_greater_equal<Q>(&self, key: &Q) -> (Option<(&K, &V)>, usize)
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        let metadata = self.metadata.load(Acquire);
        let rank_modulus = 1_usize << DIMENSION.num_bits_per_entry;
        // Exclusive rank window that can still contain a better answer.
        let mut upper_rank = DIMENSION.removed_rank();
        let mut lower_rank = 0;
        let mut candidate: Option<usize> = None;
        let mut remaining = metadata;
        for slot in 0..DIMENSION.num_entries {
            if remaining == 0 {
                break;
            }
            let rank = remaining % rank_modulus;
            remaining >>= DIMENSION.num_bits_per_entry;
            if rank <= lower_rank || rank >= upper_rank {
                continue;
            }
            let stored_key = self.key_at(slot);
            match stored_key.borrow().cmp(key) {
                Ordering::Less => lower_rank = rank,
                Ordering::Greater => {
                    upper_rank = rank;
                    candidate = Some(slot);
                }
                Ordering::Equal => return (Some((stored_key, self.value_at(slot))), metadata),
            }
        }
        let entry = candidate.map(|slot| (self.key_at(slot), self.value_at(slot)));
        (entry, metadata)
    }
    /// Returns `true` if the current metadata equals the supplied snapshot.
    #[inline]
    pub(super) fn validate(&self, metadata: usize) -> bool {
        let current = self.metadata.load(Relaxed);
        current == metadata
    }
    /// Sets the frozen flag.
    ///
    /// Returns `true` if this call set the flag and `false` if the leaf was already frozen.
    #[inline]
    pub(super) fn freeze(&self) -> bool {
        let set_frozen =
            |metadata: usize| (!Dimension::frozen(metadata)).then(|| Dimension::freeze(metadata));
        self.metadata
            .fetch_update(AcqRel, Acquire, set_frozen)
            .is_ok()
    }
    /// Freezes the leaf and clones its reachable entries into two new leaves.
    ///
    /// Entries are visited in ascending rank order; the first `optimal_boundary` of them go
    /// to `low_key_leaf` and the rest to `high_key_leaf`. Each target leaf is only allocated
    /// when it receives its first entry, so an argument that was `None` stays `None` if
    /// nothing is distributed to it.
    #[inline]
    pub(super) fn freeze_and_distribute(
        &self,
        low_key_leaf: &mut Option<Shared<Leaf<K, V>>>,
        high_key_leaf: &mut Option<Shared<Leaf<K, V>>>,
    ) {
        // NOTE(review): `unwrap_unchecked` is only sound if the leaf is not already frozen,
        // since the update closure returns `None` in that case; callers presumably guarantee
        // this — confirm.
        let metadata = unsafe {
            self.metadata
                .fetch_update(AcqRel, Acquire, |p| {
                    if Dimension::frozen(p) {
                        None
                    } else {
                        Some(Dimension::freeze(p))
                    }
                })
                .unwrap_unchecked()
        };
        let boundary = Self::optimal_boundary(metadata);
        // Scan against the pre-freeze snapshot returned by `fetch_update`.
        let scanner = Scanner {
            leaf: self,
            metadata,
            entry_index: DIMENSION.num_entries,
        };
        for (i, (k, v)) in scanner.enumerate() {
            if i < boundary {
                low_key_leaf
                    .get_or_insert_with(|| Shared::new(Leaf::new()))
                    .insert_unchecked(k.clone(), v.clone(), i);
            } else {
                // Slot indices in the high-key leaf restart from zero.
                high_key_leaf
                    .get_or_insert_with(|| Shared::new(Leaf::new()))
                    .insert_unchecked(k.clone(), v.clone(), i - boundary);
            };
        }
    }
    /// Computes how many of the lowest-ranked entries should go to the low-key leaf when the
    /// entries described by the metadata are distributed.
    ///
    /// Walking the slots in index order, the score rises whenever a reachable entry has a
    /// higher rank than the previous reachable one and falls otherwise; the clamped score is
    /// then shifted into the range `1..num_entries`.
    #[inline]
    pub(super) fn optimal_boundary(mut mutable_metadata: usize) -> usize {
        let rank_modulus = 1_usize << DIMENSION.num_bits_per_entry;
        let half = DIMENSION.num_entries / 2;
        let mut score: usize = DIMENSION.num_entries;
        let mut last_rank = 1;
        for _ in 0..DIMENSION.num_entries {
            let rank = mutable_metadata % rank_modulus;
            mutable_metadata >>= DIMENSION.num_bits_per_entry;
            if rank == 0 || rank == DIMENSION.removed_rank() {
                continue;
            }
            if last_rank < rank {
                score += 1;
            } else {
                score -= 1;
            }
            last_rank = rank;
        }
        score.clamp(half + 1, DIMENSION.num_entries + half - 1) - half
    }
    fn search_slot<Q>(&self, key: &Q, mut mutable_metadata: usize) -> Option<usize>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        let mut min_max_rank = DIMENSION.removed_rank();
        let mut max_min_rank = 0;
        for i in 0..DIMENSION.num_entries {
            if mutable_metadata == 0 {
                break;
            }
            let rank = mutable_metadata % (1_usize << DIMENSION.num_bits_per_entry);
            if rank < min_max_rank && rank > max_min_rank {
                match self.compare(i, key) {
                    Ordering::Less => {
                        if max_min_rank < rank {
                            max_min_rank = rank;
                        }
                    }
                    Ordering::Greater => {
                        if min_max_rank > rank {
                            min_max_rank = rank;
                        }
                    }
                    Ordering::Equal => {
                        return Some(i);
                    }
                }
            }
            mutable_metadata >>= DIMENSION.num_bits_per_entry;
        }
        None
    }
    /// Assigns the final rank to the entry that `insert` wrote into the reserved slot
    /// `free_slot_index`, making it reachable.
    ///
    /// `prev_metadata` is the metadata value that contains the reservation. Every entry with
    /// a greater key has its rank incremented in the same compare-exchange. If an equal key
    /// is found, or the leaf turns out to be frozen or retired when the exchange fails, the
    /// reservation is rolled back and the corresponding non-success result is returned.
    fn post_insert(&self, free_slot_index: usize, mut prev_metadata: usize) -> InsertResult<K, V> {
        let key = self.key_at(free_slot_index);
        loop {
            // Only entries whose rank lies strictly inside this window need a key comparison.
            let mut min_max_rank = DIMENSION.removed_rank();
            let mut max_min_rank = 0;
            let mut new_metadata = prev_metadata;
            let mut mutable_metadata = prev_metadata;
            for i in 0..DIMENSION.num_entries {
                if mutable_metadata == 0 {
                    break;
                }
                let rank = mutable_metadata % (1_usize << DIMENSION.num_bits_per_entry);
                if rank < min_max_rank && rank > max_min_rank {
                    match self.compare(i, key) {
                        Ordering::Less => {
                            if max_min_rank < rank {
                                max_min_rank = rank;
                            }
                        }
                        Ordering::Greater => {
                            if min_max_rank > rank {
                                min_max_rank = rank;
                            }
                            // Make room for the new entry below this one.
                            new_metadata = DIMENSION.augment(new_metadata, i, rank + 1);
                        }
                        Ordering::Equal => {
                            // Duplicate key: give the slot back.
                            return self.rollback(free_slot_index);
                        }
                    }
                } else if rank != DIMENSION.removed_rank() && rank > min_max_rank {
                    // Known to be greater than the new key without comparing.
                    new_metadata = DIMENSION.augment(new_metadata, i, rank + 1);
                }
                mutable_metadata >>= DIMENSION.num_bits_per_entry;
            }
            // The new entry ranks directly above the largest entry with a smaller key.
            let final_metadata = DIMENSION.augment(new_metadata, free_slot_index, max_min_rank + 1);
            if let Err(actual) =
                self.metadata
                    .compare_exchange(prev_metadata, final_metadata, AcqRel, Acquire)
            {
                if Dimension::frozen(actual) || Dimension::retired(actual) {
                    return self.rollback(free_slot_index);
                }
                // Unrelated change: recompute the ranks against the new metadata.
                prev_metadata = actual;
                continue;
            }
            return InsertResult::Success;
        }
    }
    fn rollback(&self, index: usize) -> InsertResult<K, V> {
        let (key, val) = self.take(index);
        let result = self
            .metadata
            .fetch_and(!DIMENSION.rank_mask(index), Relaxed)
            & (!DIMENSION.rank_mask(index));
        if Dimension::retired(result) {
            InsertResult::Retired(key, val)
        } else if Dimension::frozen(result) {
            InsertResult::Frozen(key, val)
        } else {
            InsertResult::Duplicate(key, val)
        }
    }
    /// Compares the key stored in slot `index` with `key`; the result is the ordering of the
    /// stored key relative to `key`.
    fn compare<Q>(&self, index: usize, key: &Q) -> Ordering
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        let stored: &Q = self.key_at(index).borrow();
        stored.cmp(key)
    }
    /// Moves the key and value out of slot `index`, leaving the slot's memory logically
    /// uninitialized.
    fn take(&self, index: usize) -> (K, V) {
        let key_ptr = self.entry_array.0[index].as_ptr();
        let val_ptr = self.entry_array.1[index].as_ptr();
        // SAFETY: relies on the caller passing a slot that has been written and whose
        // contents have not been taken out already.
        unsafe { (key_ptr.read(), val_ptr.read()) }
    }
    /// Stores `key` and `val` into slot `index` without dropping any previous contents.
    fn write(&self, index: usize, key: K, val: V) {
        let key_slot = self.entry_array.0[index].as_ptr().cast_mut();
        let val_slot = self.entry_array.1[index].as_ptr().cast_mut();
        // SAFETY: relies on the caller having exclusive use of the slot.
        // NOTE(review): the pointers come from a shared reference via `MaybeUninit::as_ptr`;
        // its documentation only permits writes through it inside an `UnsafeCell` — confirm
        // whether the storage should be wrapped accordingly.
        unsafe {
            key_slot.write(key);
            val_slot.write(val);
        }
    }
    /// Returns a reference to the key stored in slot `index`.
    const fn key_at(&self, index: usize) -> &K {
        let slot = &self.entry_array.0[index];
        // SAFETY: relies on the caller passing a slot whose key has been written.
        unsafe { &*slot.as_ptr() }
    }
    /// Returns a reference to the value stored in slot `index`.
    const fn value_at(&self, index: usize) -> &V {
        let slot = &self.entry_array.1[index];
        // SAFETY: relies on the caller passing a slot whose value has been written.
        unsafe { &*slot.as_ptr() }
    }
    /// Returns the slot index of the entry that follows slot `index` in rank order, judged
    /// against the supplied metadata snapshot.
    ///
    /// Passing `DIMENSION.num_entries` as `index` starts from before the first entry. The
    /// result is the slot with the smallest rank greater than the current one, excluding
    /// uninitialized and removed slots, or `DIMENSION.num_entries` if there is none.
    fn next(index: usize, mut mutable_metadata: usize) -> usize {
        debug_assert_ne!(index, usize::MAX);
        let current_rank = if index == DIMENSION.num_entries {
            0
        } else {
            DIMENSION.rank(mutable_metadata, index)
        };
        if current_rank >= DIMENSION.num_entries {
            return DIMENSION.num_entries;
        }
        let rank_modulus = 1_usize << DIMENSION.num_bits_per_entry;
        let mut best_rank = DIMENSION.removed_rank();
        let mut best_index = DIMENSION.num_entries;
        for slot in 0..DIMENSION.num_entries {
            if mutable_metadata == 0 {
                break;
            }
            let rank = mutable_metadata % rank_modulus;
            mutable_metadata >>= DIMENSION.num_bits_per_entry;
            if slot == index || rank == Dimension::uninit_rank() || rank >= best_rank {
                continue;
            }
            if rank == current_rank + 1 {
                // Immediate successor: no better candidate can exist.
                return slot;
            }
            if rank > current_rank {
                best_rank = rank;
                best_index = slot;
            }
        }
        best_index
    }
}
/// Drops the entries still stored in the leaf.
impl<K, V> Drop for Leaf<K, V>
where
    K: 'static + Clone + Ord,
    V: 'static + Clone,
{
    /// Takes out and drops the contents of every slot whose rank is not the uninitialized
    /// rank; nothing is done when neither `K` nor `V` needs dropping.
    #[inline]
    fn drop(&mut self) {
        if !needs_drop::<(K, V)>() {
            return;
        }
        let rank_modulus = 1_usize << DIMENSION.num_bits_per_entry;
        let mut remaining = self.metadata.load(Acquire);
        for slot in 0..DIMENSION.num_entries {
            if remaining == 0 {
                break;
            }
            let initialized = remaining % rank_modulus != Dimension::uninit_rank();
            remaining >>= DIMENSION.num_bits_per_entry;
            if initialized {
                // Removed entries still own their key and value, so they are dropped too.
                drop(self.take(slot));
            }
        }
    }
}
/// Lets leaves be chained together through their `link` field.
impl<K, V> LinkedList for Leaf<K, V>
where
    K: 'static + Clone + Ord,
    V: 'static + Clone,
{
    /// Returns the link field of this leaf.
    #[inline]
    fn link_ref(&self) -> &AtomicShared<Leaf<K, V>> {
        &self.link
    }
}
impl Dimension {
    /// Flag bit marking a frozen leaf: the second most significant bit.
    const FROZEN_FLAG: usize = 1_usize << (usize::BITS - 2);
    /// Flag bit marking a retired leaf: the most significant bit.
    const RETIRED_FLAG: usize = 1_usize << (usize::BITS - 1);

    /// Returns `true` if the frozen flag is set in `metadata`.
    const fn frozen(metadata: usize) -> bool {
        (metadata & Self::FROZEN_FLAG) != 0
    }
    /// Returns `metadata` with the frozen flag set.
    const fn freeze(metadata: usize) -> usize {
        metadata | Self::FROZEN_FLAG
    }
    /// Returns `metadata` with the frozen flag cleared.
    const fn thaw(metadata: usize) -> usize {
        metadata & !Self::FROZEN_FLAG
    }
    /// Returns `true` if the retired flag is set in `metadata`.
    const fn retired(metadata: usize) -> bool {
        (metadata & Self::RETIRED_FLAG) != 0
    }
    /// Returns `metadata` with the retired flag set.
    const fn retire(metadata: usize) -> usize {
        metadata | Self::RETIRED_FLAG
    }
    /// Returns the bit mask covering the rank field of slot `index`.
    const fn rank_mask(&self, index: usize) -> usize {
        self.removed_rank() << (index * self.num_bits_per_entry)
    }
    /// Extracts the rank of slot `index` from `metadata`.
    const fn rank(&self, metadata: usize, index: usize) -> usize {
        let shifted = metadata >> (index * self.num_bits_per_entry);
        shifted & self.removed_rank()
    }
    /// Rank value of a slot that has never been written.
    const fn uninit_rank() -> usize {
        0
    }
    /// Rank value of a removed slot: every bit of the rank field set.
    const fn removed_rank(&self) -> usize {
        (1_usize << self.num_bits_per_entry) - 1
    }
    /// Returns `metadata` with the rank of slot `index` replaced by `rank`.
    const fn augment(&self, metadata: usize, index: usize, rank: usize) -> usize {
        let cleared = metadata & !self.rank_mask(index);
        cleared | (rank << (index * self.num_bits_per_entry))
    }
}
/// The [`Dimension`] used by every [`Leaf`], selected by the size of `usize` in bytes.
///
/// In each configuration `num_entries * num_bits_per_entry` leaves the two most significant
/// bits of a `usize` free for the frozen and retired flags.
pub const DIMENSION: Dimension = match usize::BITS / 8 {
    // 8-bit `usize`: 2 * 2 = 4 rank bits.
    1 => Dimension {
        num_entries: 2,
        num_bits_per_entry: 2,
    },
    // 16-bit `usize`: 4 * 3 = 12 rank bits.
    2 => Dimension {
        num_entries: 4,
        num_bits_per_entry: 3,
    },
    // 32-bit `usize`: 7 * 4 = 28 rank bits.
    4 => Dimension {
        num_entries: 7,
        num_bits_per_entry: 4,
    },
    // 64-bit `usize`: 14 * 4 = 56 rank bits.
    8 => Dimension {
        num_entries: 14,
        num_bits_per_entry: 4,
    },
    // Any other width: 25 * 5 = 125 rank bits.
    _ => Dimension {
        num_entries: 25,
        num_bits_per_entry: 5,
    },
};
/// Entry storage of a [`Leaf`]: one array of possibly-uninitialized keys and one of
/// possibly-uninitialized values, each with `DIMENSION.num_entries` slots.
pub type EntryArray<K, V> = (
    [MaybeUninit<K>; DIMENSION.num_entries],
    [MaybeUninit<V>; DIMENSION.num_entries],
);
/// Iterates over the entries of a [`Leaf`] in rank order, based on a metadata snapshot.
pub struct Scanner<'l, K, V>
where
    K: 'static + Clone + Ord,
    V: 'static + Clone,
{
    /// The leaf being scanned.
    leaf: &'l Leaf<K, V>,
    /// Metadata snapshot that the scan is based on.
    metadata: usize,
    /// Slot index of the current entry; `DIMENSION.num_entries` before the scan has started
    /// and `usize::MAX` once it is exhausted.
    entry_index: usize,
}
impl<'l, K, V> Scanner<'l, K, V>
where
    K: 'static + Clone + Ord,
    V: 'static + Clone,
{
    /// Creates a [`Scanner`] positioned before the first entry of `leaf`, based on the
    /// metadata loaded at this point.
    #[inline]
    pub(super) fn new(leaf: &'l Leaf<K, V>) -> Scanner<'l, K, V> {
        let metadata = leaf.metadata.load(Acquire);
        Scanner {
            leaf,
            metadata,
            entry_index: DIMENSION.num_entries,
        }
    }
    /// Creates a [`Scanner`] positioned at the highest-ranked entry whose key is less than
    /// `key`, or returns `None` if the leaf has no such entry.
    #[inline]
    pub(super) fn max_less<Q>(leaf: &'l Leaf<K, V>, key: &Q) -> Option<Scanner<'l, K, V>>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        let metadata = leaf.metadata.load(Acquire);
        let entry_index = leaf.max_less(metadata, key);
        (entry_index != DIMENSION.num_entries).then(|| Scanner {
            leaf,
            metadata,
            entry_index,
        })
    }
    /// Returns the metadata snapshot that the scan is based on.
    #[inline]
    pub(super) const fn metadata(&self) -> usize {
        self.metadata
    }
    /// Returns the entry the scanner currently points to, or `None` if the scan has not
    /// started yet or is exhausted.
    #[inline]
    pub(super) const fn get(&self) -> Option<(&'l K, &'l V)> {
        if self.entry_index < DIMENSION.num_entries {
            let key = self.leaf.key_at(self.entry_index);
            let value = self.leaf.value_at(self.entry_index);
            Some((key, value))
        } else {
            None
        }
    }
    /// Returns the maximum key of the underlying leaf.
    ///
    /// This delegates to `Leaf::max_key`, which loads the leaf's current metadata rather
    /// than using the snapshot held by the scanner.
    #[inline]
    pub(super) fn max_key(&self) -> Option<&'l K> {
        self.leaf.max_key()
    }
    /// Follows the leaf links and returns a [`Scanner`] positioned at the first suitable
    /// entry of a subsequent leaf, or `None` if the chain ends first.
    ///
    /// Leaves without any entry to yield are skipped. If `min_allowed_key` is supplied and
    /// `is_clear` reports `false` for the current leaf, only an entry whose key is strictly
    /// greater than `min_allowed_key` is accepted.
    #[inline]
    pub(super) fn jump<'g, Q>(
        &self,
        min_allowed_key: Option<&Q>,
        guard: &'g Guard,
    ) -> Option<Scanner<'g, K, V>>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        let mut next_leaf_ptr = self.leaf.next_ptr(Acquire, guard);
        while let Some(next_leaf_ref) = next_leaf_ptr.as_ref() {
            let mut leaf_scanner = Scanner::new(next_leaf_ref);
            if let Some(key) = min_allowed_key {
                // NOTE(review): `is_clear` comes from the `LinkedList` trait, which is not
                // visible here; presumably a non-clear link means the current leaf has been
                // marked or unlinked, so the next leaf may hold keys that were already
                // visited — confirm.
                if !self.leaf.is_clear(Relaxed) {
                    while let Some(entry) = leaf_scanner.next() {
                        if key.cmp(entry.0.borrow()) == Ordering::Less {
                            return Some(leaf_scanner);
                        }
                    }
                    // No acceptable entry in this leaf: move on to the next one.
                    next_leaf_ptr = next_leaf_ref.next_ptr(Acquire, guard);
                    continue;
                }
            }
            if leaf_scanner.next().is_some() {
                return Some(leaf_scanner);
            }
            next_leaf_ptr = next_leaf_ref.next_ptr(Acquire, guard);
        }
        None
    }
    /// Advances the scanner to the next entry in rank order; once no entry is left the
    /// scanner is marked exhausted and further calls do nothing.
    fn proceed(&mut self) {
        if self.entry_index != usize::MAX {
            let following = Leaf::<K, V>::next(self.entry_index, self.metadata);
            self.entry_index = if following == DIMENSION.num_entries {
                usize::MAX
            } else {
                following
            };
        }
    }
}
/// Formats the scanner's position without requiring `K` or `V` to implement `Debug`.
impl<'l, K, V> Debug for Scanner<'l, K, V>
where
    K: 'static + Clone + Ord,
    V: 'static + Clone,
{
    /// Writes the metadata snapshot and the current entry index.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Scanner");
        builder.field("metadata", &self.metadata);
        builder.field("entry_index", &self.entry_index);
        builder.finish()
    }
}
/// Yields the reachable entries of the leaf in ascending rank order, as recorded in the
/// metadata snapshot held by the [`Scanner`].
impl<'l, K, V> Iterator for Scanner<'l, K, V>
where
    // The bounds mirror those on the `Scanner` struct and its other impls, including the
    // `'static` requirement.
    K: 'static + Clone + Ord,
    V: 'static + Clone,
{
    type Item = (&'l K, &'l V);
    /// Advances to the next entry and returns it, or `None` once the scan is exhausted.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.proceed();
        self.get()
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use proptest::prelude::*;
    use std::sync::atomic::AtomicBool;
    use tokio::sync::Barrier;
    #[cfg_attr(miri, ignore)]
    #[test]
    fn basic() {
        let leaf: Leaf<String, String> = Leaf::new();
        assert!(matches!(
            leaf.insert("MY GOODNESS!".to_owned(), "OH MY GOD!!".to_owned()),
            InsertResult::Success
        ));
        assert!(matches!(
            leaf.insert("GOOD DAY".to_owned(), "OH MY GOD!!".to_owned()),
            InsertResult::Success
        ));
        assert_eq!(leaf.search("MY GOODNESS!").unwrap(), "OH MY GOD!!");
        assert_eq!(leaf.search("GOOD DAY").unwrap(), "OH MY GOD!!");
        for i in 0..DIMENSION.num_entries {
            if let InsertResult::Full(k, v) = leaf.insert(i.to_string(), i.to_string()) {
                assert_eq!(i + 2, DIMENSION.num_entries);
                assert_eq!(k, i.to_string());
                assert_eq!(v, i.to_string());
                break;
            }
            assert_eq!(leaf.search(&i.to_string()).unwrap(), &i.to_string());
        }
        for i in 0..DIMENSION.num_entries {
            let result = leaf.remove_if(&i.to_string(), &mut |_| i >= 10);
            if i >= 10 && i + 2 < DIMENSION.num_entries {
                assert_eq!(result, RemoveResult::Success);
            } else {
                assert_eq!(result, RemoveResult::Fail);
            }
        }
        assert_eq!(
            leaf.remove_if("GOOD DAY", &mut |v| v == "OH MY"),
            RemoveResult::Fail
        );
        assert_eq!(
            leaf.remove_if("GOOD DAY", &mut |v| v == "OH MY GOD!!"),
            RemoveResult::Success
        );
        assert!(leaf.search("GOOD DAY").is_none());
        assert_eq!(
            leaf.remove_if("MY GOODNESS!", &mut |_| true),
            RemoveResult::Success
        );
        assert!(leaf.search("MY GOODNESS!").is_none());
        assert!(leaf.search("1").is_some());
        assert!(matches!(
            leaf.insert("1".to_owned(), "1".to_owned()),
            InsertResult::Duplicate(..)
        ));
        assert!(matches!(
            leaf.insert("100".to_owned(), "100".to_owned()),
            InsertResult::Full(..)
        ));
        let mut scanner = Scanner::new(&leaf);
        for i in 0..DIMENSION.num_entries {
            if let Some(e) = scanner.next() {
                assert_eq!(e.0, &i.to_string());
                assert_eq!(e.1, &i.to_string());
                assert_ne!(
                    leaf.remove_if(&i.to_string(), &mut |_| true),
                    RemoveResult::Fail
                );
            } else {
                break;
            }
        }
        assert!(matches!(
            leaf.insert("200".to_owned(), "200".to_owned()),
            InsertResult::Retired(..)
        ));
    }
    #[cfg_attr(miri, ignore)]
    #[test]
    fn calculate_boundary() {
        let leaf: Leaf<usize, usize> = Leaf::new();
        for i in 0..DIMENSION.num_entries {
            assert!(matches!(leaf.insert(i, i), InsertResult::Success));
        }
        assert_eq!(
            Leaf::<usize, usize>::optimal_boundary(leaf.metadata.load(Relaxed)),
            DIMENSION.num_entries - 1
        );
        let leaf: Leaf<usize, usize> = Leaf::new();
        for i in (0..DIMENSION.num_entries).rev() {
            assert!(matches!(leaf.insert(i, i), InsertResult::Success));
        }
        assert_eq!(
            Leaf::<usize, usize>::optimal_boundary(leaf.metadata.load(Relaxed)),
            1
        );
        let leaf: Leaf<usize, usize> = Leaf::new();
        for i in 0..DIMENSION.num_entries {
            if i < DIMENSION.num_entries / 2 {
                assert!(matches!(
                    leaf.insert(usize::MAX - i, usize::MAX - i),
                    InsertResult::Success
                ));
            } else {
                assert!(matches!(leaf.insert(i, i), InsertResult::Success));
            }
        }
        assert_eq!(
            Leaf::<usize, usize>::optimal_boundary(leaf.metadata.load(Relaxed)),
            DIMENSION.num_entries / 2
        );
    }
    #[cfg_attr(miri, ignore)]
    #[test]
    fn special() {
        let leaf: Leaf<usize, usize> = Leaf::new();
        assert!(matches!(leaf.insert(11, 17), InsertResult::Success));
        assert!(matches!(leaf.insert(17, 11), InsertResult::Success));
        let mut leaf1 = None;
        let mut leaf2 = None;
        leaf.freeze_and_distribute(&mut leaf1, &mut leaf2);
        assert_eq!(leaf1.as_ref().and_then(|l| l.search(&11)), Some(&17));
        assert_eq!(leaf1.as_ref().and_then(|l| l.search(&17)), Some(&11));
        assert!(leaf2.is_none());
        assert!(matches!(leaf.insert(1, 7), InsertResult::Frozen(..)));
        assert_eq!(leaf.remove_if(&17, &mut |_| true), RemoveResult::Frozen);
        assert!(matches!(leaf.insert(3, 5), InsertResult::Frozen(..)));
        assert!(leaf.thaw());
        assert!(matches!(leaf.insert(1, 7), InsertResult::Success));
        assert_eq!(leaf.remove_if(&1, &mut |_| true), RemoveResult::Success);
        assert_eq!(leaf.remove_if(&17, &mut |_| true), RemoveResult::Success);
        assert_eq!(leaf.remove_if(&11, &mut |_| true), RemoveResult::Retired);
        assert!(matches!(leaf.insert(5, 3), InsertResult::Retired(..)));
    }
    proptest! {
        // Property test: sequentially inserts the keys `0..insert` into a fresh leaf, then
        // sequentially removes the keys `0..remove`, checking the reported result of every
        // operation.
        //
        // `insert` is drawn from an inclusive range so that a completely filled leaf is generated
        // and the `InsertResult::Full` expectation below is actually exercised; with an exclusive
        // upper bound that branch could never be reached.
        #[cfg_attr(miri, ignore)]
        #[test]
        fn prop(insert in 0_usize..=DIMENSION.num_entries, remove in 0_usize..DIMENSION.num_entries) {
            let leaf: Leaf<usize, usize> = Leaf::new();
            assert!(leaf.is_empty());

            // Ascending insertions: right after inserting `i`, the greatest entry below `i` must
            // be `i - 1`.
            for i in 0..insert {
                assert!(matches!(leaf.insert(i, i), InsertResult::Success));
                if i != 0 {
                    let result = leaf.max_less(leaf.metadata.load(Relaxed), &i);
                    assert_eq!(*leaf.key_at(result), i - 1);
                    assert_eq!(*leaf.value_at(result), i - 1);
                }
            }
            if insert == 0 {
                assert_eq!(leaf.max_key(), None);
                assert!(leaf.is_empty());
            } else {
                assert_eq!(leaf.max_key(), Some(&(insert - 1)));
                assert!(!leaf.is_empty());
            }

            // Every key is already present, so re-inserting must be reported as a duplicate.
            for i in 0..insert {
                assert!(matches!(leaf.insert(i, i), InsertResult::Duplicate(..)));
                assert!(!leaf.is_empty());
                let result = leaf.min_greater_equal(&i);
                assert_eq!(result.0, Some((&i, &i)));
            }
            for i in 0..insert {
                assert_eq!(*leaf.search(&i).unwrap(), i);
            }

            // A leaf holding `DIMENSION.num_entries` entries must refuse a new key.
            if insert == DIMENSION.num_entries {
                assert!(matches!(leaf.insert(usize::MAX, usize::MAX), InsertResult::Full(..)));
            }

            for i in 0..remove {
                if i < insert {
                    if i == insert - 1 {
                        // Removing the last remaining entry retires the leaf; from then on every
                        // insertion must be rejected.
                        assert!(matches!(leaf.remove_if(&i, &mut |_| true), RemoveResult::Retired));
                        for j in 0..insert {
                            assert!(matches!(leaf.insert(j, j), InsertResult::Retired(..)));
                        }
                    } else {
                        assert!(matches!(leaf.remove_if(&i, &mut |_| true), RemoveResult::Success));
                    }
                } else {
                    // Keys that were never inserted cannot be removed.
                    assert!(matches!(leaf.remove_if(&i, &mut |_| true), RemoveResult::Fail));
                    assert!(leaf.is_empty());
                }
            }
        }
    }
    /// Concurrency test: `DIMENSION.num_entries + 3` tasks each try to insert a distinct key into
    /// one shared leaf, verify the leaf contents, and then remove their own key.
    ///
    /// Expectations checked per round: exactly `num_excess` insertions report `Full`, a scan
    /// yields strictly ascending keys with `key == value`, the filled leaf holds
    /// `DIMENSION.num_entries` entries, and at most one removal reports `Retired`.
    #[cfg_attr(miri, ignore)]
    #[tokio::test(flavor = "multi_thread", worker_threads = 16)]
    async fn update() {
        let num_excess = 3;
        let num_tasks = DIMENSION.num_entries + num_excess;
        for _ in 0..256 {
            let sync_point = Shared::new(Barrier::new(num_tasks));
            let shared_leaf: Shared<Leaf<usize, usize>> = Shared::new(Leaf::new());
            let full_count: Shared<AtomicUsize> = Shared::new(AtomicUsize::new(0));
            let retired_flag: Shared<AtomicUsize> = Shared::new(AtomicUsize::new(0));
            let mut handles = Vec::with_capacity(num_tasks);
            for id in 1..=num_tasks {
                let barrier = sync_point.clone();
                let leaf = shared_leaf.clone();
                let full = full_count.clone();
                let retire = retired_flag.clone();
                handles.push(tokio::spawn(async move {
                    // Phase 1: all tasks insert their own key at the same time.
                    barrier.wait().await;
                    let inserted = match leaf.insert(id, id) {
                        InsertResult::Full(k, v) => {
                            // The rejected pair must be handed back untouched.
                            assert_eq!(k, v);
                            assert_eq!(k, id);
                            full.fetch_add(1, Relaxed);
                            false
                        }
                        InsertResult::Success => {
                            assert_eq!(*leaf.search(&id).unwrap(), id);
                            true
                        }
                        InsertResult::Duplicate(_, _)
                        | InsertResult::Frozen(_, _)
                        | InsertResult::Retired(_, _)
                        | InsertResult::Retry(_, _) => unreachable!(),
                    };
                    {
                        // A scan running concurrently with insertions must still be ordered.
                        let mut last_key = 0;
                        let mut scanner = Scanner::new(&leaf);
                        for entry in scanner.by_ref() {
                            assert_eq!(entry.0, entry.1);
                            assert!(*entry.0 > last_key);
                            last_key = *entry.0;
                        }
                    }

                    // Phase 2: every insertion attempt has finished; inspect the settled leaf.
                    barrier.wait().await;
                    assert_eq!((*full).load(Relaxed), num_excess);
                    if inserted {
                        assert_eq!(*leaf.search(&id).unwrap(), id);
                    }
                    assert_eq!(Scanner::new(&leaf).count(), DIMENSION.num_entries);

                    // Phase 3: all tasks remove their own key at the same time.
                    barrier.wait().await;
                    match leaf.remove_if(&id, &mut |_| true) {
                        RemoveResult::Retired => {
                            assert!(inserted);
                            // No other task may have observed the retirement before.
                            assert_eq!(retire.swap(1, Relaxed), 0);
                        }
                        RemoveResult::Success => assert!(inserted),
                        RemoveResult::Fail => assert!(!inserted),
                        RemoveResult::Frozen | RemoveResult::Cleanup => unreachable!(),
                    };
                }));
            }
            let outcomes = futures::future::join_all(handles).await;
            assert!(outcomes.into_iter().all(|r| r.is_ok()));
        }
    }
    /// Concurrency test: for every key in `0..=workload_size`, sixteen tasks race to insert that
    /// key, then concurrently insert and remove the other keys while checking that the protected
    /// key stays searchable throughout.
    ///
    /// The removal predicate refuses entries whose value equals the protected key, so that entry
    /// is expected to survive every removal attempt.
    #[cfg_attr(miri, ignore)]
    #[tokio::test(flavor = "multi_thread", worker_threads = 16)]
    async fn durability() {
        let num_tasks = 16_usize;
        let workload_size = 8_usize;
        for _ in 0..16 {
            for target in 0..=workload_size {
                let sync_point = Shared::new(Barrier::new(num_tasks));
                let shared_leaf: Shared<Leaf<usize, usize>> = Shared::new(Leaf::new());
                let won: Shared<AtomicBool> = Shared::new(AtomicBool::new(false));
                let mut handles = Vec::with_capacity(num_tasks);
                for _ in 0..num_tasks {
                    let barrier = sync_point.clone();
                    let leaf = shared_leaf.clone();
                    let inserted = won.clone();
                    handles.push(tokio::spawn(async move {
                        // Phase 1: race to insert the protected key; at most one task may win.
                        barrier.wait().await;
                        if matches!(leaf.insert(target, target), InsertResult::Success) {
                            assert!(!inserted.swap(true, Relaxed));
                        }

                        // Phase 2: churn the other keys; the protected key must remain visible.
                        barrier.wait().await;
                        for i in 0..workload_size {
                            if i != target {
                                let _outcome = leaf.insert(i, i);
                            }
                            assert!(!leaf.is_retired());
                            assert_eq!(leaf.search(&target).unwrap(), &target);
                        }
                        for i in 0..workload_size {
                            let _outcome = leaf.remove_if(&i, &mut |stored| *stored != target);
                            assert_eq!(leaf.search(&target).unwrap(), &target);
                        }
                    }));
                }
                let outcomes = futures::future::join_all(handles).await;
                assert!(outcomes.into_iter().all(|r| r.is_ok()));
                // At least one task must have inserted the protected key.
                assert!((*won).load(Relaxed));
            }
        }
    }
}