use core::{alloc::Layout, hint, iter::FusedIterator, marker::PhantomData, mem, ptr::NonNull};
use std::alloc::{alloc, dealloc, handle_alloc_error};
use crate::index::hash_table::{
bitmask::{BitMask, BitMaskIter},
imp::Group,
};
use arcon_state::data::{Key, Value};
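// A two-lane, SwissTable-style raw table adapted from hashbrown's RawTable.
// The "mod" lane stores (key, value, hash) triples plus a per-bucket meta
// byte tracking modification state; the "read" lane is a lossy cache of
// (key, value) pairs for unmodified entries. Probing is bounded, so inserts
// can fail and trigger eviction (read lane) or a drain (mod lane).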
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionAllocErr {
CapacityOverflow,
AllocErr {
layout: Layout,
},
}
#[inline]
unsafe fn offset_from<T>(to: *const T, from: *const T) -> usize {
(to as usize - from as usize) / mem::size_of::<T>()
}
#[derive(Copy, Clone)]
#[allow(dead_code)]
enum Fallibility {
Fallible,
Infallible,
}
impl Fallibility {
#[inline]
fn capacity_overflow(self) -> CollectionAllocErr {
match self {
Fallibility::Fallible => CollectionAllocErr::CapacityOverflow,
Fallibility::Infallible => panic!("Hash table capacity overflow"),
}
}
#[inline]
fn alloc_err(self, layout: Layout) -> CollectionAllocErr {
match self {
Fallibility::Fallible => CollectionAllocErr::AllocErr { layout },
Fallibility::Infallible => handle_alloc_error(layout),
}
}
}
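// Control bytes follow hashbrown's convention: EMPTY, or the 7-bit h2 tag
// of a full bucket. The meta bytes below track per-bucket modification
// state in the mod lane: the top bit set means MODIFIED (dirty), clear
// means SAFE (clean), and a low bit marks entries touched since the last
// drain. `is_safe` only inspects the top bit.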
pub(crate) const EMPTY: u8 = 0b1111_1111;
pub(crate) const MODIFIED: u8 = 0b1111_1110;
pub(crate) const MODIFIED_TOUCHED: u8 = 0b1000_0001;
pub(crate) const SAFE: u8 = 0b0000_0000;
pub(crate) const SAFE_TOUCHED: u8 = 0b0000_0010;
#[inline]
fn is_safe(meta: u8) -> bool {
meta & 0x80 == 0
}
#[inline]
#[allow(clippy::cast_possible_truncation)]
fn h1(hash: u64) -> usize {
hash as usize
}
#[inline]
#[allow(clippy::cast_possible_truncation)]
fn h2(hash: u64) -> u8 {
    // Grab the top 7 bits of the hash; cap the shift width so this also
    // works on targets where usize is narrower than u64.
    let hash_len = usize::min(mem::size_of::<usize>(), mem::size_of::<u64>());
    let top7 = hash >> (hash_len * 8 - 7);
    (top7 & 0x7f) as u8
}
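// Group-sized triangular probing, as in hashbrown, but bounded: iteration
// stops after `probe_limit` groups, or once the stride outgrows the table,
// so a crowded probe path yields `None` instead of probing indefinitely.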
struct ProbeSeq {
bucket_mask: usize,
pos: usize,
stride: usize,
probe_counter: usize,
probe_limit: usize,
}
impl Iterator for ProbeSeq {
type Item = usize;
#[inline]
fn next(&mut self) -> Option<usize> {
if self.probe_counter >= self.probe_limit || self.stride >= self.bucket_mask {
return None;
}
let result = self.pos;
self.stride += Group::WIDTH;
self.pos += self.stride;
self.pos &= self.bucket_mask;
self.probe_counter += 1;
Some(result)
}
}
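// Computes a single allocation covering both lanes, laid out as
// [mod data | read data | mod ctrl | read ctrl]. Returns the full layout
// together with the offsets of the read data, the ctrl region, and the
// read ctrl bytes within the ctrl region. Each ctrl array has Group::WIDTH
// trailing bytes mirroring its head so that group loads can wrap around.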
#[inline]
fn calculate_layout<A, B>(
mod_buckets: usize,
read_buckets: usize,
) -> Option<(Layout, usize, usize, usize)> {
debug_assert!(mod_buckets.is_power_of_two());
debug_assert!(read_buckets.is_power_of_two());
let mod_data = Layout::array::<A>(mod_buckets).ok()?;
let read_data = Layout::array::<B>(read_buckets).ok()?;
let (bucket_layout, bucket_offset) = mod_data.extend(read_data).ok()?;
let mod_ctrl =
unsafe { Layout::from_size_align_unchecked(mod_buckets + Group::WIDTH, Group::WIDTH) };
let read_ctrl =
unsafe { Layout::from_size_align_unchecked(read_buckets + Group::WIDTH, Group::WIDTH) };
let (ctrl, read_ctrl_offset) = mod_ctrl.extend(read_ctrl).ok()?;
let (full_layout, ctrl_offset) = bucket_layout.extend(ctrl).ok()?;
Some((full_layout, bucket_offset, ctrl_offset, read_ctrl_offset))
}
fn meta_layout(buckets: usize) -> Option<Layout> {
debug_assert!(buckets.is_power_of_two());
let ctrl = unsafe { Layout::from_size_align_unchecked(buckets + Group::WIDTH, Group::WIDTH) };
Some(ctrl)
}
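// As in hashbrown, a `Bucket` pointer points one element *past* the slot it
// refers to: bucket data sits below the ctrl bytes and grows downwards, so
// `as_ptr` subtracts one element. For zero-sized types the "pointer" is
// just the index plus one, to stay non-null.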
pub struct Bucket<T> {
ptr: NonNull<T>,
}
impl<T> Clone for Bucket<T> {
#[inline]
fn clone(&self) -> Self {
Self { ptr: self.ptr }
}
}
impl<T> Bucket<T> {
#[inline]
unsafe fn from_base_index(base: NonNull<T>, index: usize) -> Self {
let ptr = if mem::size_of::<T>() == 0 {
(index + 1) as *mut T
} else {
base.as_ptr().sub(index)
};
Self {
ptr: NonNull::new_unchecked(ptr),
}
}
#[inline]
pub unsafe fn as_ptr(&self) -> *mut T {
if mem::size_of::<T>() == 0 {
mem::align_of::<T>() as *mut T
} else {
self.ptr.as_ptr().sub(1)
}
}
#[inline]
unsafe fn next_n(&self, offset: usize) -> Self {
let ptr = if mem::size_of::<T>() == 0 {
(self.ptr.as_ptr() as usize + offset) as *mut T
} else {
self.ptr.as_ptr().sub(offset)
};
Self {
ptr: NonNull::new_unchecked(ptr),
}
}
#[inline]
pub unsafe fn drop(&self) {
self.as_ptr().drop_in_place();
}
#[inline]
pub unsafe fn read(&self) -> T {
self.as_ptr().read()
}
#[inline]
pub unsafe fn write(&self, val: T) {
self.as_ptr().write(val);
}
#[inline]
pub unsafe fn as_ref<'a>(&self) -> &'a T {
&*self.as_ptr()
}
#[inline]
pub unsafe fn as_mut<'a>(&self) -> &'a mut T {
&mut *self.as_ptr()
}
}
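// Shared state for both lanes. Each `*_mask` is `buckets - 1` (bucket
// counts are powers of two), `mod_meta` is a separate allocation holding
// the mod lane's meta bytes, and `mod_counter` counts mod-lane entries
// currently marked MODIFIED.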
pub struct RawTable<K, V>
where
K: Key,
V: Value,
{
mod_ctrl: NonNull<u8>,
mod_mask: usize,
read_ctrl: NonNull<u8>,
read_mask: usize,
mod_bucket: NonNull<u8>,
mod_meta: NonNull<u8>,
mod_counter: usize,
items: usize,
marker: PhantomData<(K, V)>,
}
impl<K, V> RawTable<K, V>
where
K: Key,
V: Value,
{
#[inline]
unsafe fn new_uninitialized(
mod_lane_buckets: usize,
read_lane_buckets: usize,
        fallibility: Fallibility,
) -> Result<Self, CollectionAllocErr> {
debug_assert!(mod_lane_buckets.is_power_of_two());
debug_assert!(read_lane_buckets.is_power_of_two());
let (layout, bucket_offset, ctrl_offset, read_ctrl_offset) =
match calculate_layout::<(K, V, u64), (K, V)>(mod_lane_buckets, read_lane_buckets) {
Some(lco) => lco,
                None => return Err(fallibility.capacity_overflow()),
};
let table_alloc = match NonNull::new(alloc(layout)) {
Some(ptr) => ptr,
            None => return Err(fallibility.alloc_err(layout)),
};
let mod_ctrl = NonNull::new_unchecked(table_alloc.as_ptr().add(ctrl_offset));
let read_ctrl =
NonNull::new_unchecked(table_alloc.as_ptr().add(ctrl_offset + read_ctrl_offset));
let mod_bucket = NonNull::new_unchecked(mod_ctrl.as_ptr().sub(ctrl_offset - bucket_offset));
let layout = match meta_layout(mod_lane_buckets) {
Some(lco) => lco,
            None => return Err(fallibility.capacity_overflow()),
};
let mod_meta = match NonNull::new(alloc(layout)) {
Some(ptr) => ptr,
            None => return Err(fallibility.alloc_err(layout)),
};
Ok(Self {
items: 0,
mod_ctrl,
read_ctrl,
read_mask: read_lane_buckets - 1,
mod_meta,
mod_counter: 0,
mod_mask: mod_lane_buckets - 1,
mod_bucket,
marker: PhantomData,
})
}
fn try_with_capacity(
read_capacity: usize,
mod_capacity: usize,
        fallibility: Fallibility,
) -> Result<Self, CollectionAllocErr> {
        assert!(read_capacity > 32, "read capacity must be larger than 32");
        assert!(mod_capacity > 32, "mod capacity must be larger than 32");
unsafe {
            let result = Self::new_uninitialized(mod_capacity, read_capacity, fallibility)?;
result
.mod_ctrl(0)
.write_bytes(EMPTY, result.mod_ctrl_bytes());
result.ctrl(0).write_bytes(EMPTY, result.read_ctrl_bytes());
result.meta(0).write_bytes(SAFE, result.mod_ctrl_bytes());
Ok(result)
}
}
pub fn with_capacity(mod_capacity: usize, read_capacity: usize) -> Self {
Self::try_with_capacity(read_capacity, mod_capacity, Fallibility::Infallible)
.unwrap_or_else(|_| unsafe { hint::unreachable_unchecked() })
}
#[inline]
unsafe fn free_buckets(&mut self) {
let (layout, _, ctrl_offset, _) =
calculate_layout::<(K, V, u64), (K, V)>(self.mod_buckets(), self.read_buckets())
.unwrap_or_else(|| hint::unreachable_unchecked());
dealloc(self.mod_ctrl.as_ptr().sub(ctrl_offset), layout);
let mod_meta_layout =
meta_layout(self.mod_buckets()).unwrap_or_else(|| hint::unreachable_unchecked());
dealloc(self.mod_meta.as_ptr(), mod_meta_layout);
}
#[inline]
pub unsafe fn data_end(&self) -> NonNull<(K, V)> {
NonNull::new_unchecked(self.mod_ctrl.as_ptr() as *mut (K, V))
}
#[inline]
pub unsafe fn mod_data_end(&self) -> NonNull<(K, V, u64)> {
NonNull::new_unchecked(self.mod_bucket.as_ptr() as *mut (K, V, u64))
}
#[inline]
unsafe fn ctrl(&self, index: usize) -> *mut u8 {
debug_assert!(index < self.read_ctrl_bytes());
self.read_ctrl.as_ptr().add(index)
}
#[inline]
unsafe fn mod_ctrl(&self, index: usize) -> *mut u8 {
debug_assert!(index < self.mod_ctrl_bytes());
self.mod_ctrl.as_ptr().add(index)
}
#[inline]
unsafe fn meta(&self, index: usize) -> *mut u8 {
debug_assert!(index < self.mod_ctrl_bytes());
self.mod_meta.as_ptr().add(index)
}
#[inline]
pub unsafe fn read_bucket(&self, index: usize) -> Bucket<(K, V)> {
Bucket::from_base_index(
NonNull::new_unchecked(self.mod_ctrl.as_ptr() as *mut (K, V)),
index,
)
}
#[inline]
pub unsafe fn mod_bucket(&self, index: usize) -> Bucket<(K, V, u64)> {
Bucket::from_base_index(
NonNull::new_unchecked(self.mod_bucket.as_ptr() as *mut (K, V, u64)),
index,
)
}
#[inline]
fn probe_read(&self, hash: u64) -> ProbeSeq {
ProbeSeq {
bucket_mask: self.read_mask,
pos: h1(hash) & self.read_mask,
stride: 0,
probe_counter: 0,
            probe_limit: 16,
        }
}
#[inline]
fn probe_mod(&self, hash: u64) -> ProbeSeq {
ProbeSeq {
bucket_mask: self.mod_mask,
pos: h1(hash) & self.mod_mask,
stride: 0,
probe_counter: 0,
            probe_limit: 16,
        }
}
#[inline]
unsafe fn set_meta(&self, index: usize, meta: u8) {
self.set_lane_byte(index, meta, self.mod_meta.as_ptr(), self.mod_mask);
}
#[inline]
unsafe fn set_mod_ctrl(&self, index: usize, ctrl: u8) {
debug_assert!(index < self.mod_ctrl_bytes());
self.set_lane_byte(index, ctrl, self.mod_ctrl.as_ptr(), self.mod_mask);
}
#[inline]
unsafe fn set_read_ctrl(&self, index: usize, ctrl: u8) {
debug_assert!(index < self.read_ctrl_bytes());
self.set_lane_byte(index, ctrl, self.read_ctrl.as_ptr(), self.read_mask);
}
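    // Mirrors hashbrown's `set_ctrl`: in addition to the byte at `index`,
    // writes the corresponding byte in the trailing Group::WIDTH mirror
    // region (for indices past the head both writes hit the same byte), so
    // group loads that wrap the end of the array always see current data.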
#[inline]
unsafe fn set_lane_byte(&self, index: usize, ctrl: u8, ptr: *mut u8, mask: usize) {
let index2 = ((index.wrapping_sub(Group::WIDTH)) & mask) + Group::WIDTH;
*ptr.add(index) = ctrl;
*ptr.add(index2) = ctrl;
}
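    // Scans the bounded probe sequence for the first EMPTY slot. Returns
    // `None` once the probe limit is exhausted, signalling that the probe
    // path is full and the caller must evict or drain.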
#[inline]
unsafe fn find_insert_slot(
&self,
probe: impl Iterator<Item = usize>,
mask: usize,
ptr: *mut u8,
) -> Option<usize> {
for pos in probe {
let group = Group::load(ptr.add(pos));
if let Some(bit) = group.match_empty().lowest_set_bit() {
let result = (pos + bit) & mask;
return Some(result);
}
}
None
}
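    // The read lane is a lossy cache: if no empty slot is found within the
    // probe limit, every full bucket in the first probe group is dropped
    // and marked EMPTY to make room for the new record.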
#[inline(always)]
pub fn insert_read_lane(&mut self, hash: u64, record: (K, V)) {
unsafe {
if let Some(index) = self.find_insert_slot(
self.probe_read(hash),
self.read_mask,
self.read_ctrl.as_ptr(),
) {
let bucket = self.read_bucket(index);
self.set_read_ctrl(index, h2(hash));
bucket.write(record);
} else {
let pos = self.probe_read(hash).next().unwrap();
let group = Group::load(self.read_ctrl.as_ptr().add(pos));
for bit in group.match_full() {
let index = (pos + bit) & self.read_mask;
let bucket = self.read_bucket(index);
                    bucket.drop();
self.set_read_ctrl(index, EMPTY);
}
let insert_index = pos & self.read_mask;
let bucket = self.read_bucket(insert_index);
self.set_read_ctrl(insert_index, h2(hash));
bucket.write(record);
}
}
}
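    // Inserting into the mod lane can fail once the probe limit is hit. In
    // that case the record is handed back together with a drain iterator
    // over modified entries on this hash's probe path; the caller is
    // expected to drain it (persisting what it yields) and retry.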
#[inline(always)]
pub fn insert_mod_lane(
&mut self,
hash: u64,
value: (K, V),
) -> Option<(ProbeModIterator<K, V>, (K, V))> {
unsafe {
let index = match self.find_insert_slot(
self.probe_mod(hash),
self.mod_mask,
self.mod_ctrl.as_ptr(),
) {
Some(index) => index,
None => {
return Some((ProbeModIterator::new(self, hash), value));
}
};
self.set_mod_ctrl(index, h2(hash));
self.set_meta(index, MODIFIED);
let bucket = self.mod_bucket(index);
bucket.write((value.0, value.1, hash));
self.mod_counter += 1;
None
}
}
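    // Removes a matching entry, checking the mod lane first and falling
    // back to the read lane.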
#[inline]
pub fn remove(&mut self, hash: u64, mut eq: impl FnMut((&K, &V)) -> bool) -> Option<(K, V)> {
unsafe {
for pos in self.probe_mod(hash) {
let group = Group::load(self.mod_ctrl(pos));
for bit in group.match_byte(h2(hash)) {
let index = (pos + bit) & self.mod_mask;
let bucket = self.mod_bucket(index);
let &(ref key, ref value, _) = bucket.as_ref();
if eq((key, value)) {
let (key, value, _) = bucket.read();
self.set_mod_ctrl(index, EMPTY);
self.set_meta(index, SAFE);
return Some((key, value));
}
}
}
self.take_read_lane(hash, eq)
}
}
#[inline(always)]
pub fn find_mod_lane_mut(
&mut self,
hash: u64,
mut eq: impl FnMut((&K, &V)) -> bool,
) -> Option<&mut V> {
unsafe {
for pos in self.probe_mod(hash) {
let group = Group::load(self.mod_ctrl(pos));
for bit in group.match_byte(h2(hash)) {
let index = (pos + bit) & self.mod_mask;
let bucket = self.mod_bucket(index);
let &mut (ref key, ref mut value, _) = bucket.as_mut();
if eq((key, value)) {
if is_safe(*self.meta(index)) {
self.mod_counter += 1;
}
self.set_meta(index, MODIFIED_TOUCHED);
return Some(value);
}
}
}
None
}
}
#[inline(always)]
pub fn take_read_lane(
&mut self,
hash: u64,
mut eq: impl FnMut((&K, &V)) -> bool,
) -> Option<(K, V)> {
unsafe {
for pos in self.probe_read(hash) {
let group = Group::load(self.ctrl(pos));
for bit in group.match_byte(h2(hash)) {
let index = (pos + bit) & self.read_mask;
let bucket = self.read_bucket(index);
let &(ref key, ref value) = bucket.as_ref();
if eq((key, value)) {
self.set_read_ctrl(index, EMPTY);
return Some(bucket.read());
}
}
}
}
None
}
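    // Looks the key up in the mod lane first, then in the read lane. A hit
    // on a clean mod-lane entry is upgraded to SAFE_TOUCHED, so a later
    // drain knows the entry is read-hot and relocates it to the read lane.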
#[inline(always)]
pub fn find(&self, hash: u64, mut eq: impl FnMut((&K, &V)) -> bool) -> Option<(&K, &V)> {
unsafe {
for pos in self.probe_mod(hash) {
let group = Group::load(self.mod_ctrl(pos));
for bit in group.match_byte(h2(hash)) {
let index = (pos + bit) & self.mod_mask;
let bucket = self.mod_bucket(index);
let &(ref key, ref value, _) = bucket.as_ref();
if eq((key, value)) {
if is_safe(*self.meta(index)) {
self.set_meta(index, SAFE_TOUCHED);
}
return Some((key, value));
}
}
}
for pos in self.probe_read(hash) {
let group = Group::load(self.ctrl(pos));
for bit in group.match_byte(h2(hash)) {
let index = (pos + bit) & self.read_mask;
let bucket = self.read_bucket(index);
let &(ref key, ref value) = bucket.as_ref();
if eq((key, value)) {
return Some((key, value));
}
}
}
}
None
}
#[inline]
pub fn len(&self) -> usize {
self.items
}
#[inline]
pub fn mod_buckets(&self) -> usize {
self.mod_mask + 1
}
#[inline]
pub fn read_buckets(&self) -> usize {
self.read_mask + 1
}
#[inline]
fn read_ctrl_bytes(&self) -> usize {
self.read_mask + 1 + Group::WIDTH
}
#[inline]
fn mod_ctrl_bytes(&self) -> usize {
self.mod_mask + 1 + Group::WIDTH
}
#[inline]
pub(crate) unsafe fn iter_modified(&mut self) -> TableModIterator<K, V> {
TableModIterator::new(self, 0)
}
#[inline]
pub unsafe fn mod_lane_iter(&self) -> RawIter<(K, V, u64)> {
let data = Bucket::from_base_index(self.mod_data_end(), 0);
RawIter {
iter: RawIterRange::new(self.mod_ctrl.as_ptr(), data, self.mod_buckets()),
items: self.mod_buckets(),
}
}
#[inline]
pub unsafe fn read_lane_iter(&self) -> RawIter<(K, V)> {
let data = Bucket::from_base_index(self.data_end(), 0);
RawIter {
iter: RawIterRange::new(self.read_ctrl.as_ptr(), data, self.read_buckets()),
items: self.read_buckets(),
}
}
}
unsafe impl<K, V> Send for RawTable<K, V>
where
K: Key + Send,
V: Value + Send,
{
}
unsafe impl<K, V> Sync for RawTable<K, V>
where
    K: Key + Sync,
    V: Value + Sync,
{
}
impl<K, V> Drop for RawTable<K, V>
where
K: Key,
V: Value,
{
#[inline]
fn drop(&mut self) {
unsafe {
if mem::needs_drop::<(K, V)>() {
for item in self.mod_lane_iter() {
item.drop();
}
for item in self.read_lane_iter() {
item.drop();
}
}
self.free_buckets();
}
}
}
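// Drains MODIFIED entries along a single hash's probe path (created when an
// insert into the mod lane fails). Entries it yields are downgraded to SAFE
// and migrated into the read lane; if that migration freed nothing, `next`
// falls back to relocating SAFE_TOUCHED entries instead.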
pub struct ProbeModIterator<'a, K, V>
where
K: Key,
V: Value,
{
pub(crate) iter: RawModIterator<'a, K, V>,
hash: u64,
    _marker: PhantomData<&'a ()>,
}
impl<'a, K, V> ProbeModIterator<'a, K, V>
where
K: Key,
V: Value,
{
fn new(table: &'a mut RawTable<K, V>, hash: u64) -> Self {
unsafe {
let mut probe_seq = table.probe_mod(hash);
let pos = probe_seq.next().unwrap();
let group = Group::load(table.meta(pos));
let bitmask = group.match_modified().into_iter();
let iter = RawModIterator {
table,
probe_seq,
group,
pos,
bitmask,
erased_buckets: 0,
};
ProbeModIterator {
iter,
hash,
_marker: PhantomData,
}
}
}
}
impl<'a, K: 'a, V: 'a> Iterator for ProbeModIterator<'a, K, V>
where
K: Key,
V: Value,
{
type Item = (&'a K, &'a V);
#[inline]
    fn next(&mut self) -> Option<(&'a K, &'a V)> {
        if let Some(bucket) = self.iter.next() {
            return Some(bucket);
        }
        // The probe path holds no more modified entries. If none were
        // migrated to the read lane, relocate SAFE_TOUCHED entries along
        // the path instead so the insert that triggered this drain can
        // succeed.
        if self.iter.erased_buckets == 0 {
            let mut cleared_groups = 0;
            let raw_table = &mut self.iter.table;
            for pos in raw_table.probe_mod(self.hash) {
                unsafe {
                    let group = Group::load(raw_table.meta(pos));
                    let bitmask = group.match_byte(SAFE_TOUCHED);
                    if bitmask.any_bit_set() {
                        cleared_groups += 1;
                    }
                    for bit in bitmask {
                        let index = (pos + bit) & raw_table.mod_mask;
                        let bucket = raw_table.mod_bucket(index);
                        let (key, value, hash) = bucket.read();
                        raw_table.insert_read_lane(hash, (key, value));
                        raw_table.set_mod_ctrl(index, EMPTY);
                        raw_table.set_meta(index, SAFE);
                    }
                    // Clearing more than one group frees ample space; stop early.
                    if cleared_groups > 1 {
                        return None;
                    }
                }
            }
        }
        None
    }
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
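// The underlying probe-path drain: yields (key, value) for each MODIFIED
// bucket group by group, then flips the group's meta bytes to SAFE and
// moves the now-SAFE buckets out of the mod lane into the read lane,
// freeing their slots.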
pub struct RawModIterator<'a, K, V>
where
K: Key,
V: Value,
{
table: &'a mut RawTable<K, V>,
probe_seq: ProbeSeq,
group: Group,
pos: usize,
bitmask: BitMaskIter,
erased_buckets: usize,
}
impl<'a, K, V> Iterator for RawModIterator<'a, K, V>
where
K: Key,
V: Value,
{
type Item = (&'a K, &'a V);
fn next(&mut self) -> Option<(&'a K, &'a V)> {
unsafe {
loop {
if let Some(bit) = self.bitmask.next() {
let index = (self.pos + bit) & self.table.mod_mask;
self.table.mod_counter -= 1;
let bucket = self.table.mod_bucket(index);
                    let &(ref key, ref value, _) = bucket.as_ref();
return Some((key, value));
}
self.group = self.group.convert_mod_to_safe(self.table.meta(self.pos));
for bit in self.group.match_byte(SAFE) {
let index = (self.pos + bit) & self.table.mod_mask;
let bucket = self.table.mod_bucket(index);
self.table.set_mod_ctrl(index, EMPTY);
let (key, value, hash) = bucket.read();
self.table.insert_read_lane(hash, (key, value));
self.erased_buckets += 1;
}
if let Some(pos) = self.probe_seq.next() {
self.pos = pos;
self.group = Group::load(self.table.meta(self.pos));
self.bitmask = self.group.match_modified().into_iter();
} else {
return None;
}
}
}
}
}
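// Full-table drain: walks the meta bytes one aligned group at a time from
// the start of the mod lane, yielding every MODIFIED entry, downgrading
// each visited group to SAFE, and stopping early once `mod_counter`
// reaches zero.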
pub struct TableModIterator<'a, K, V>
where
K: Key,
V: Value,
{
table: &'a mut RawTable<K, V>,
group: Group,
pos: usize,
bitmask: BitMaskIter,
buckets: usize,
}
impl<'a, K, V> TableModIterator<'a, K, V>
where
K: Key,
V: Value,
{
fn new(table: &'a mut RawTable<K, V>, pos: usize) -> Self {
debug_assert_eq!(pos % Group::WIDTH, 0);
unsafe {
let group = Group::load_aligned(table.meta(pos));
let bitmask = group.match_modified().into_iter();
let buckets = table.mod_buckets();
TableModIterator {
table,
group,
pos,
bitmask,
buckets,
}
}
}
}
impl<'a, K, V> Iterator for TableModIterator<'a, K, V>
where
K: Key,
V: Value,
{
type Item = (&'a K, &'a V);
fn next(&mut self) -> Option<(&'a K, &'a V)> {
unsafe {
loop {
if let Some(bit) = self.bitmask.next() {
let index = (self.pos + bit) & self.table.mod_mask;
self.table.mod_counter -= 1;
let bucket = self.table.mod_bucket(index);
                    let &(ref key, ref value, _) = bucket.as_ref();
return Some((key, value));
}
self.group = self.group.convert_mod_to_safe(self.table.meta(self.pos));
if self.table.mod_counter == 0 {
return None;
}
                // Stop before stepping into the trailing mirror group, which
                // would revisit the first Group::WIDTH buckets.
                if self.pos + Group::WIDTH < self.buckets {
self.pos += Group::WIDTH;
self.group = Group::load_aligned(self.table.meta(self.pos));
self.bitmask = self.group.match_modified().into_iter();
} else {
return None;
}
}
}
}
}
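// Taken from hashbrown: iterates a ctrl array one aligned group at a time,
// yielding a `Bucket` for every full slot via the group's bitmask.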
pub(crate) struct RawIterRange<T> {
current_group: BitMask,
data: Bucket<T>,
next_ctrl: *const u8,
end: *const u8,
}
impl<T> RawIterRange<T> {
#[inline]
unsafe fn new(ctrl: *const u8, data: Bucket<T>, len: usize) -> Self {
debug_assert_ne!(len, 0);
debug_assert_eq!(ctrl as usize % Group::WIDTH, 0);
let end = ctrl.add(len);
let current_group = Group::load_aligned(ctrl).match_full();
let next_ctrl = ctrl.add(Group::WIDTH);
Self {
current_group,
data,
next_ctrl,
end,
}
}
}
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<T> Send for RawIterRange<T> where T: Send {}
unsafe impl<T> Sync for RawIterRange<T> where T: Sync {}
impl<T> Clone for RawIterRange<T> {
#[inline]
fn clone(&self) -> Self {
Self {
data: self.data.clone(),
next_ctrl: self.next_ctrl,
current_group: self.current_group,
end: self.end,
}
}
}
impl<T> Iterator for RawIterRange<T> {
type Item = Bucket<T>;
#[inline]
fn next(&mut self) -> Option<Bucket<T>> {
unsafe {
loop {
if let Some(index) = self.current_group.lowest_set_bit() {
self.current_group = self.current_group.remove_lowest_bit();
return Some(self.data.next_n(index));
}
if self.next_ctrl >= self.end {
return None;
}
self.current_group = Group::load_aligned(self.next_ctrl).match_full();
self.data = self.data.next_n(Group::WIDTH);
self.next_ctrl = self.next_ctrl.add(Group::WIDTH);
}
}
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(
0,
Some(unsafe { offset_from(self.end, self.next_ctrl) + Group::WIDTH }),
)
}
}
impl<T> FusedIterator for RawIterRange<T> {}
pub struct RawIter<T> {
pub(crate) iter: RawIterRange<T>,
items: usize,
}
impl<T> Clone for RawIter<T> {
#[inline]
fn clone(&self) -> Self {
Self {
iter: self.iter.clone(),
items: self.items,
}
}
}
impl<T> Iterator for RawIter<T> {
type Item = Bucket<T>;
#[inline]
fn next(&mut self) -> Option<Bucket<T>> {
if let Some(b) = self.iter.next() {
self.items -= 1;
Some(b)
} else {
None
}
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(self.items, Some(self.items))
}
}
impl<T> ExactSizeIterator for RawIter<T> {}
impl<T> FusedIterator for RawIter<T> {}
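// A minimal smoke-test sketch, not part of the original source: it assumes
// `u64` implements both `Key` and `Value` in `arcon_state`; substitute
// types your crate actually supports.
#[cfg(test)]
mod smoke {
    use super::*;
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    fn hash_of(key: &u64) -> u64 {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        hasher.finish()
    }

    #[test]
    fn insert_then_find() {
        // Both lane capacities must be powers of two larger than 32.
        let mut table: RawTable<u64, u64> = RawTable::with_capacity(64, 64);
        let hash = hash_of(&10);
        // `None` means the record landed in the mod lane without a drain.
        assert!(table.insert_mod_lane(hash, (10, 100)).is_none());
        assert_eq!(
            table.find(hash, |(k, _)| *k == 10).map(|(_, v)| *v),
            Some(100)
        );
    }
}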