dynec 0.2.1

An opinionated ECS-like framework
Documentation
use std::collections::BTreeSet;
use std::sync::Arc;
use std::{iter, ops};

use parking_lot::lock_api::ArcMutexGuard;
use parking_lot::{Mutex, RawMutex};

use super::{iter_gaps, Ealloc, Shard, ShardAssigner, Snapshot};
use crate::entity::raw::Atomic;
use crate::entity::Raw;

mod recycler;
pub use recycler::{BTreeHint, Recycler};

/// Per-shard storage: one independently lockable, reference-counted slot per shard.
///
/// Each slot is locked through its `Arc` when handed out to an online shard, and accessed
/// through `Arc::get_mut` (no locking) while the allocator is offline.
type MutableShards<T> = Vec<Arc<Mutex<T>>>;

/// The default allocator supporting atomically-allocated new IDs and arbitrary recycler.
///
/// Fresh IDs are taken from a shared atomic gauge. Deallocated IDs are queued and, on flush,
/// spread over the per-shard recyclers so that later allocations can reuse them.
#[derive(Debug)]
pub struct Recycling<RawT: Raw, T: Recycler<RawT>, S: ShardAssigner> {
    /// Whether `mark_need_flush` was called since the last `flush`.
    flush_mark:         bool,
    /// The next never-used ID; shared with every shard, which bumps it atomically when its
    /// recycler has nothing to offer.
    global_gauge:       Arc<RawT::Atomic>,
    /// The sorted set of IDs currently available for reuse, as of the last flush.
    /// Shared (by `Arc` clone) with every snapshot taken from this allocator.
    recyclable:         Arc<BTreeSet<RawT>>,
    /// The recyclers holding the reusable IDs assigned to each shard.
    recycler_shards:    MutableShards<T>,
    /// Chooses the shard used for offline allocation and shuffles newly created shards.
    shard_assigner:     S,
    /// The IDs passed to `queue_deallocate` since the last flush, waiting to be distributed
    /// to the recyclers.
    dealloc_queue:      Vec<RawT>,
    /// Per shard, the IDs that were taken out of a recycler and handed out again since the
    /// last flush; `flush` removes them from `recyclable`.
    reuse_queue_shards: MutableShards<Vec<RawT>>,
}

impl<RawT: Raw, T: Recycler<RawT>, S: ShardAssigner> Recycling<RawT, T, S> {
    /// Builds an empty allocator with `num_shards` shards driven by the given shard assigner.
    ///
    /// Only intended for unit tests: the Archetype API has no way to supply a dynamic shard
    /// assigner.
    pub(crate) fn new_with_shard_assigner(num_shards: usize, shard_assigner: S) -> Self {
        let global_gauge = Arc::new(RawT::new());

        // One default-constructed recycler and one empty reuse queue per shard.
        let recycler_shards: MutableShards<T> =
            iter::repeat_with(Arc::default).take(num_shards).collect();
        let reuse_queue_shards: MutableShards<Vec<RawT>> =
            iter::repeat_with(Arc::default).take(num_shards).collect();

        Self {
            flush_mark: false,
            global_gauge,
            recyclable: Arc::default(),
            recycler_shards,
            shard_assigner,
            dealloc_queue: Vec::new(),
            reuse_queue_shards,
        }
    }

    /// Borrows the recycler of shard `index` without locking.
    ///
    /// # Panics
    /// Panics if `index` is not a valid shard index, or if an online shard still holds a
    /// reference to this recycler.
    fn get_recycler_offline(sharded_recyclers: &mut MutableShards<T>, index: usize) -> &mut T {
        let slot = sharded_recyclers.get_mut(index).expect("index out of bounds");
        let mutex = Arc::get_mut(slot).expect("shards are dropped in offline mode");
        mutex.get_mut()
    }

    /// Borrows the reuse queue of shard `index` without locking.
    ///
    /// # Panics
    /// Panics if `index` is not a valid shard index, or if an online shard still holds a
    /// reference to this queue.
    fn get_reuse_queue_offline(
        reuse_queues: &mut MutableShards<Vec<RawT>>,
        index: usize,
    ) -> &mut Vec<RawT> {
        let slot = reuse_queues.get_mut(index).expect("index out of bounds");
        let mutex = Arc::get_mut(slot).expect("shards are dropped in offline mode");
        mutex.get_mut()
    }

    /// Iterates over the ID ranges produced by `iter_gaps` from the current gauge value and
    /// the recyclable set, i.e. the IDs below the gauge that are not marked recyclable.
    fn iter_allocated_chunks_offline(
        &mut self,
    ) -> impl iter::FusedIterator<Item = ops::Range<RawT>> + '_ {
        let gauge = self.global_gauge.load();
        let recycled = self.recyclable.iter().copied();
        iter_gaps(gauge, recycled)
    }
}

impl<RawT: Raw, T: Recycler<RawT>, S: ShardAssigner> Ealloc for Recycling<RawT, T, S> {
    type Raw = RawT;
    type AllocHint = T::Hint;
    /// An online shard: shares the global gauge and holds the locks on one recycler and one
    /// reuse queue for as long as it lives.
    type Shard = RecyclingShard<
        Arc<RawT::Atomic>,
        ArcMutexGuard<RawMutex, T>,
        ArcMutexGuard<RawMutex, Vec<RawT>>,
    >;

    /// Creates an allocator with `num_shards` shards and the default shard assigner.
    fn new(num_shards: usize) -> Self { Self::new_with_shard_assigner(num_shards, S::default()) }

    /// Appends one shard per recycler to `vec`, each mapped through `f`, then lets the shard
    /// assigner shuffle the newly appended entries (and only those).
    ///
    /// Every recycler and reuse queue is locked here; the guards are stored inside the shards
    /// and released when the shards are dropped.
    fn shards<U, F: Fn(Self::Shard) -> U>(&mut self, vec: &mut Vec<U>, f: F) {
        // Entries already present in `vec` belong to the caller and must not be shuffled.
        let slice_start = vec.len();

        vec.extend(
            iter::zip(self.recycler_shards.iter(), self.reuse_queue_shards.iter())
                .map(|(recycler, reuse_queue)| RecyclingShard {
                    global_gauge: Arc::clone(&self.global_gauge),
                    recycler:     Arc::clone(recycler).lock_arc(),
                    reuse_queue:  Arc::clone(reuse_queue).lock_arc(),
                })
                .map(f),
        );
        let my_slice = vec.get_mut(slice_start..).expect("just inserted");
        self.shard_assigner.shuffle_shards(my_slice);
    }

    /// Captures the current gauge value together with a shared handle to the recyclable set.
    fn snapshot(&self) -> Snapshot<Self::Raw> {
        Snapshot { gauge: self.global_gauge.load(), recyclable: Arc::clone(&self.recyclable) }
    }

    /// Allocates an ID while offline, using the shard picked by the shard assigner.
    ///
    /// # Panics
    /// Panics if an online shard is still alive, or if the assigner selects an invalid shard.
    fn allocate(&mut self, hint: Self::AllocHint) -> Self::Raw {
        let shard_id =
            self.shard_assigner.select_for_offline_allocation(self.recycler_shards.len());
        let recycler = Self::get_recycler_offline(&mut self.recycler_shards, shard_id);
        let reuse_queue = Self::get_reuse_queue_offline(&mut self.reuse_queue_shards, shard_id);

        // A temporary shard over plain borrows; no locking is needed while offline.
        let mut shard = RecyclingShard { global_gauge: &*self.global_gauge, recycler, reuse_queue };
        shard.allocate(hint)
    }

    /// Queues `id` for recycling; it only becomes reusable after the next `flush`.
    fn queue_deallocate(&mut self, id: RawT) { self.dealloc_queue.push(id); }

    /// Synchronizes the recyclable set with the allocations and deallocations that happened
    /// since the last flush, and distributes the deallocated IDs over the recyclers so that
    /// their sizes end up as even as possible.
    ///
    /// # Panics
    /// Panics if any shard returned by `shards` is still alive.
    fn flush(&mut self) {
        self.flush_mark = false;

        let mut ids = &self.dealloc_queue[..];
        {
            // `snapshot()` hands out clones of this `Arc`, so it may still be shared here.
            // `make_mut` clones the set in that case, leaving outstanding snapshots with the
            // state they captured instead of panicking.
            let recyclable = Arc::make_mut(&mut self.recyclable);

            // Reused IDs are removed *before* the deallocated IDs are inserted: an ID that was
            // reused and then deallocated again since the last flush appears in both queues
            // and must end up recyclable, because it is handed to a recycler below.
            for shard in &mut self.reuse_queue_shards {
                let queue = Arc::get_mut(shard)
                    .expect("all exposed shards should be dropped before flush")
                    .get_mut();

                for item in queue.drain(..) {
                    recyclable.remove(&item);
                }
            }

            recyclable.extend(ids);
        }

        // try to distribute `ids` to all shards evenly.
        let mut shards: Vec<_> = self
            .recycler_shards
            .iter_mut()
            .map(|recycler| {
                Arc::get_mut(recycler)
                    .expect("all exposed shards should be dropped before flush")
                    .get_mut()
            })
            .collect();

        // the shards with the smallest recycle count come first because we assign IDs to them
        // first.
        shards.sort_by_key(|recycler| recycler.len());

        let mut target_sizes: Vec<_> = shards.iter().map(|recycler| recycler.len()).collect();
        distribute_sorted(&mut target_sizes, ids.len());

        for (i, recycler) in shards.iter_mut().enumerate() {
            let take: usize =
                *target_sizes.get(i).expect("target_sizes collected from shards.iter()")
                    - recycler.len();
            // take >= 0 assuming correctness of distribute_sorted

            // Hand the next `take` IDs to this recycler and keep the rest for later shards.
            let (left, right) = ids.split_at(take);
            recycler.extend(left.iter().copied());
            ids = right;
        }

        self.dealloc_queue.clear();
    }

    /// Requests that the next `flush_if_marked` call performs a flush.
    fn mark_need_flush(&mut self) { self.flush_mark = true; }

    /// Flushes only if `mark_need_flush` was called since the last flush.
    fn flush_if_marked(&mut self) {
        if self.flush_mark {
            self.flush();
        }
    }
}

/// Raises the smallest entries of `sizes` so that exactly `total` is added overall and the
/// raised entries end up as level as possible (differing by at most one).
///
/// `sizes` must be sorted in ascending order. Entries that are already above the final level
/// are left untouched. Nothing changes when `sizes` is empty or `total` is zero.
fn distribute_sorted(sizes: &mut [usize], total: usize) {
    // `level` is the common height the first `participants` entries have been raised to so
    // far, and `spent` is how much of `total` that raising has consumed.
    let mut level = 0;
    let mut spent = 0;
    let mut participants = 0;

    for (index, &current) in sizes.iter().enumerate() {
        // Cost of lifting all `index` previous entries up to the height of this one.
        let cost = (current - level) * index;
        if spent + cost >= total {
            break;
        }

        spent += cost;
        level = current;
        participants = index + 1;
    }

    if participants == 0 {
        return; // no shards need updating
    }

    // Spread what is left evenly; the last `extra` participants receive one more.
    let leftover = total - spent;
    let base = level + leftover / participants;
    let extra = leftover % participants;
    let first_with_extra = participants - extra;

    for (index, slot) in sizes[..participants].iter_mut().enumerate() {
        *slot = if index < first_with_extra { base } else { base + 1 };
    }
}

/// [`Shard`] implementation for [`Recycling`].
///
/// The three fields are generic handles rather than concrete types so that the same
/// allocation logic serves both online shards (an `Arc` to the gauge plus owned mutex guards)
/// and the temporary offline shard built from plain borrows.
pub struct RecyclingShard<GaugeRef, RecyclerRef, ReuseQueueRef> {
    /// Handle to the shared gauge that yields never-used IDs.
    global_gauge: GaugeRef,
    /// Handle to the recycler this shard polls for reusable IDs first.
    recycler:     RecyclerRef,
    /// Handle to the list recording which IDs this shard took out of the recycler.
    reuse_queue:  ReuseQueueRef,
}

impl<RawT: Raw, GaugeRef, RecyclerRef, ReuseQueueRef> Shard
    for RecyclingShard<GaugeRef, RecyclerRef, ReuseQueueRef>
where
    GaugeRef: ops::Deref<Target = RawT::Atomic> + Send + 'static,
    RecyclerRef: ops::DerefMut + Send + 'static,
    <RecyclerRef as ops::Deref>::Target: Recycler<RawT>,
    ReuseQueueRef: ops::DerefMut<Target = Vec<RawT>> + Send + 'static,
{
    type Raw = RawT;
    type Hint = <RecyclerRef::Target as Recycler<RawT>>::Hint;

    /// Allocates an ID, preferring one offered by the recycler and falling back to a
    /// never-used ID from the global gauge.
    ///
    /// Every reused ID is recorded in the reuse queue. The allocator relies on that record
    /// at flush time to drop the ID from its recyclable set; without it the ID would still be
    /// reported as free even though it is in use again.
    fn allocate(&mut self, hint: Self::Hint) -> RawT {
        match self.recycler.poll(hint) {
            Some(id) => {
                self.reuse_queue.push(id);
                id
            }
            None => self.global_gauge.fetch_add(1),
        }
    }
}

impl<RawT: Raw, T: Recycler<RawT>, GaugeRef, RecyclerRef, ReuseQueueRef>
    RecyclingShard<GaugeRef, RecyclerRef, ReuseQueueRef>
where
    GaugeRef: ops::Deref<Target = RawT::Atomic>,
    RecyclerRef: ops::DerefMut<Target = T>,
    ReuseQueueRef: ops::DerefMut<Target = Vec<RawT>>,
{
    /// Allocates an ID without requiring the handles to be `Send + 'static`, so it also works
    /// on a shard built from plain borrows.
    ///
    /// An ID offered by the recycler is preferred and recorded in the reuse queue; otherwise
    /// a never-used ID is taken from the global gauge.
    fn allocate(&mut self, hint: T::Hint) -> RawT {
        match self.recycler.poll(hint) {
            Some(reused) => {
                self.reuse_queue.push(reused);
                reused
            }
            None => self.global_gauge.fetch_add(1),
        }
    }
}

#[cfg(test)]
mod tests;