concurrent-map 5.0.37

a lock-free linearizable B+ tree
Documentation
use std::time::Instant;

use concurrent_map::ConcurrentMap;

/// Instrumented global allocator used to observe heap usage during the test.
///
/// Every allocation is forwarded to the system allocator while byte counters
/// are updated, and memory is filled with a recognisable pattern on both
/// allocation (`0xa1`) and deallocation (`0xde`) so that reads of
/// uninitialised or freed memory show up as obviously bogus values.
mod alloc {
    use std::alloc::{Layout, System};
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[global_allocator]
    static ALLOCATOR: Alloc = Alloc;

    /// Bytes allocated since the counter was last drained by `allocated()`.
    static ALLOCATED: AtomicUsize = AtomicUsize::new(0);
    /// Bytes freed since the counter was last drained by `freed()`.
    static FREED: AtomicUsize = AtomicUsize::new(0);
    /// Bytes currently allocated and not yet freed.
    static RESIDENT: AtomicUsize = AtomicUsize::new(0);

    /// Returns the megabytes (10^6 bytes, rounded down) allocated since the
    /// previous call, resetting the counter to zero.
    pub fn allocated() -> usize {
        ALLOCATED.swap(0, Ordering::Relaxed) / 1_000_000
    }

    /// Returns the megabytes (10^6 bytes, rounded down) freed since the
    /// previous call, resetting the counter to zero.
    pub fn freed() -> usize {
        FREED.swap(0, Ordering::Relaxed) / 1_000_000
    }

    /// Returns the megabytes (10^6 bytes, rounded down) currently resident.
    /// Unlike `allocated()` and `freed()`, this does not reset anything.
    pub fn resident() -> usize {
        RESIDENT.load(Ordering::Relaxed) / 1_000_000
    }

    /// Zero-sized handle implementing the counting allocator.
    #[derive(Default, Debug, Clone, Copy)]
    struct Alloc;

    unsafe impl std::alloc::GlobalAlloc for Alloc {
        /// Allocates through the system allocator, records the size, and
        /// fills the new memory with `0xa1`.
        ///
        /// Returns null on allocation failure, as the `GlobalAlloc` contract
        /// prescribes; the counters are left untouched in that case.
        unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
            // SAFETY: the caller upholds the `GlobalAlloc::alloc` contract for
            // `layout`, which is forwarded unchanged.
            let ret = unsafe { System.alloc(layout) };
            if ret.is_null() {
                // Panicking here would unwind out of the global allocator
                // (undefined behaviour) and formatting the panic message may
                // itself allocate. Signal failure by returning null and let
                // the caller decide how to abort.
                return ret;
            }
            ALLOCATED.fetch_add(layout.size(), Ordering::Relaxed);
            RESIDENT.fetch_add(layout.size(), Ordering::Relaxed);
            // SAFETY: `ret` is non-null and was just allocated with `layout`,
            // so it is valid for writes of `layout.size()` bytes.
            unsafe { std::ptr::write_bytes(ret, 0xa1, layout.size()) };
            ret
        }

        /// Overwrites the block with `0xde`, records the size as freed, and
        /// returns the memory to the system allocator.
        unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
            // SAFETY: the caller guarantees `ptr` is a live block allocated
            // with `layout`, so it is valid for writes of `layout.size()` bytes.
            unsafe { std::ptr::write_bytes(ptr, 0xde, layout.size()) };
            FREED.fetch_add(layout.size(), Ordering::Relaxed);
            RESIDENT.fetch_sub(layout.size(), Ordering::Relaxed);
            // SAFETY: same contract as above, forwarded unchanged.
            unsafe { System.dealloc(ptr, layout) }
        }
    }
}

/// Stress test asserting that resident heap memory returns to its starting
/// level after a heavily used `ConcurrentMap` is dropped.
///
/// For each of `ROUNDS` rounds, `concurrency` threads are released together
/// by a barrier. Each thread inserts, reads back, CASes, iterates over and
/// finally removes `n` keys that no other thread in that round touches. Once
/// every round has finished the tree is dropped and the megabyte-granular
/// resident counter from the `alloc` module is compared with its value from
/// before the tree was created.
#[test]
fn leak_check() {
    /// Number of times a fresh batch of worker threads is run against the tree.
    const ROUNDS: usize = 64;

    // Keys handled by each thread in each round.
    let n: u32 = 16 * 1024;

    let concurrency = std::thread::available_parallelism()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or(8)
        * 2;

    // Enough low bits to hold any thread index, so that shifting a counter
    // left by `shift` and OR-ing in the thread index yields per-thread keys.
    let shift = concurrency.next_power_of_two().trailing_zeros();

    let run = |tree: ConcurrentMap<u32, u32, 5>, barrier: &std::sync::Barrier, low_bits: u32| {
        let unique_key = |key: u32| (key << shift) | low_bits;

        // Start all workers of this round at the same moment.
        barrier.wait();

        // Insert every key, checking it is absent before and present after.
        for key in 0..n {
            let i = unique_key(key);
            assert_eq!(tree.get(&i), None);
            tree.insert(i, i);
            assert_eq!(tree.get(&i), Some(i), "failed to get key {i}");
        }
        for key in 0..n {
            let i = unique_key(key);
            assert_eq!(tree.get(&i), Some(i), "failed to get key {i}");
        }

        // Swap each value for a different one via compare-and-swap.
        for key in 0..n {
            let i = unique_key(key);
            assert_eq!(
                tree.cas(i, Some(&i), Some(unique_key(key * 2))),
                Ok(Some(i)),
                "failed to cas key {i}"
            );
        }

        // The iterator must expose the post-CAS value for all of our keys.
        let visible: std::collections::HashMap<u32, u32> = tree.iter().collect();
        for key in 0..n {
            let i = unique_key(key);
            let v = unique_key(key * 2);
            assert_eq!(visible.get(&i).copied(), Some(v), "failed to get key {i}");
        }

        // Remove everything again and confirm the keys are gone.
        for key in 0..n {
            let i = unique_key(key);
            let v = unique_key(key * 2);
            assert_eq!(tree.remove(&i), Some(v));
        }
        for key in 0..n {
            let i = unique_key(key);
            assert_eq!(tree.get(&i), None, "failed to get key {i}");
        }
    };

    let before = Instant::now();
    let resident_before = alloc::resident();

    let tree = ConcurrentMap::default();
    std::thread::scope(|s| {
        for _ in 0..ROUNDS {
            let barrier = std::sync::Arc::new(std::sync::Barrier::new(concurrency));
            let mut threads = vec![];
            for i in 0..concurrency {
                let tree_2 = tree.clone();
                let barrier_2 = barrier.clone();

                let thread = s.spawn(move || run(tree_2, &barrier_2, u32::try_from(i).unwrap()));
                threads.push(thread);
            }
            // Finish the whole round before starting the next one.
            for thread in threads {
                thread.join().unwrap();
            }
        }
    });

    drop(tree);

    let resident_after = alloc::resident();

    // Each thread in each round inserts `n` items, so the total is the
    // product of all three factors rather than `n` alone.
    let total_items = u64::from(n) * (concurrency * ROUNDS) as u64;
    let elapsed_micros = before.elapsed().as_micros().max(1);

    println!(
        "{:.2} million wps {} mb allocated {} mb freed {} mb resident to insert {} items",
        total_items as f64 / elapsed_micros as f64,
        alloc::allocated(),
        alloc::freed(),
        resident_after,
        total_items,
    );

    // Compare directly instead of subtracting: `usize` subtraction would
    // overflow if resident memory shrank. The message reports the growth.
    assert_eq!(
        resident_after,
        resident_before,
        "leaked {}mb",
        resident_after.saturating_sub(resident_before)
    );
}