orx-concurrent-vec 1.7.0

An efficient, convenient and lightweight grow-only read & write concurrent data structure allowing high performance concurrent collection.
Documentation
use orx_concurrent_vec::*;
use std::{
    collections::HashSet,
    sync::{Arc, Mutex},
};

#[test]
fn into_inner_from() {
    let bag = ConcurrentVec::new();

    bag.push('a');
    bag.push('b');
    bag.push('c');
    bag.push('d');
    assert_eq!(
        vec!['a', 'b', 'c', 'd'],
        bag.iter().copied().collect::<Vec<_>>()
    );

    let mut split = bag.into_inner();
    assert_eq!(
        vec!['a', 'b', 'c', 'd'],
        split.iter().copied().flatten().collect::<Vec<_>>()
    );

    split.push(Some('e'));
    *split.get_mut(0).expect("exists") = Some('x');

    assert_eq!(
        vec!['x', 'b', 'c', 'd', 'e'],
        split.iter().copied().flatten().collect::<Vec<_>>()
    );

    let mut bag: ConcurrentVec<_> = split.into();
    assert_eq!(
        vec!['x', 'b', 'c', 'd', 'e'],
        bag.iter().copied().collect::<Vec<_>>()
    );

    bag.clear();
    assert!(bag.is_empty());

    let split = bag.into_inner();
    assert!(split.is_empty());
}

#[test]
fn ok_at_num_threads() {
    use std::thread::available_parallelism;
    let default_parallelism_approx = available_parallelism().expect("is-ok").get();

    let num_threads = default_parallelism_approx;
    let num_items_per_thread = 16384;

    let bag = ConcurrentVec::new();
    let bag_ref = &bag;
    std::thread::scope(|s| {
        for i in 0..num_threads {
            s.spawn(move || {
                for j in 0..num_items_per_thread {
                    bag_ref.push((i * 100000 + j) as i32);
                }
            });
        }
    });

    let pinned = bag.into_inner();
    assert_eq!(pinned.len(), num_threads * num_items_per_thread);
}

#[test]
fn push_indices() {
    let num_threads = 4;
    let num_items_per_thread = 16;

    let indices_set = Arc::new(Mutex::new(HashSet::new()));

    let bag = ConcurrentVec::new();
    let bag_ref = &bag;
    std::thread::scope(|s| {
        for i in 0..num_threads {
            let indices_set = indices_set.clone();
            s.spawn(move || {
                for j in 0..num_items_per_thread {
                    let idx = bag_ref.push(i * 100000 + j);
                    let mut set = indices_set.lock().expect("is ok");
                    set.insert(idx);
                }
            });
        }
    });

    let set = indices_set.lock().expect("is ok");
    assert_eq!(set.len(), 4 * 16);
    for i in 0..(4 * 16) {
        assert!(set.contains(&i));
    }
}

#[test]
fn extend_indices() {
    let num_threads = 4;
    let num_items_per_thread = 16;

    let indices_set = Arc::new(Mutex::new(HashSet::new()));

    let bag = ConcurrentVec::new();
    let bag_ref = &bag;
    std::thread::scope(|s| {
        for i in 0..num_threads {
            let indices_set = indices_set.clone();
            s.spawn(move || {
                let iter = (0..num_items_per_thread).map(|j| i * 100000 + j);

                let begin_idx = bag_ref.extend(iter);

                let mut set = indices_set.lock().expect("is ok");
                set.insert(begin_idx);
            });
        }
    });

    let set = indices_set.lock().expect("is ok");
    assert_eq!(set.len(), num_threads);
    for i in 0..num_threads {
        assert!(set.contains(&(i * num_items_per_thread)));
    }
}

#[test]
fn extend_n_items_indices() {
    let num_threads = 4;
    let num_items_per_thread = 16;

    let indices_set = Arc::new(Mutex::new(HashSet::new()));

    let bag = ConcurrentVec::new();
    let bag_ref = &bag;
    std::thread::scope(|s| {
        for i in 0..num_threads {
            let indices_set = indices_set.clone();
            s.spawn(move || {
                let iter = (0..num_items_per_thread).map(|j| i * 100000 + j);

                let begin_idx = unsafe { bag_ref.extend_n_items(iter, num_items_per_thread) };

                let mut set = indices_set.lock().expect("is ok");
                set.insert(begin_idx);
            });
        }
    });

    let set = indices_set.lock().expect("is ok");
    assert_eq!(set.len(), num_threads);
    for i in 0..num_threads {
        assert!(set.contains(&(i * num_items_per_thread)));
    }
}