use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use std::hash::{BuildHasher, Hash, Hasher};
use std::mem::ManuallyDrop;
use std::num::NonZeroU8;
use std::sync::Mutex;
#[cfg(feature = "_serial")]
pub mod serial;
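/// A thread-safe string interner backed by a lock-free `DashMap` for lookups
/// and a fixed set of member storage arenas for the interned bytes.
///
/// Callers obtain a [`ConcurrentInternerMember`] via [`get_member`](Self::get_member),
/// intern strings through it, and later resolve [`IStr`] handles with
/// [`get_string`](Self::get_string) or by [`freeze`](Self::freeze)-ing the interner.
///
/// Illustrative usage (a minimal sketch, not a doctest; it assumes
/// `ahash::RandomState` as the hasher, mirroring the tests at the bottom of
/// this file):
///
/// ```ignore
/// use std::num::NonZeroU8;
///
/// let interner = ConcurrentInterner::<ahash::RandomState>::new(
///     NonZeroU8::new(4).unwrap(),
///     Default::default(),
/// );
/// let istr = {
///     // Each thread should hold its own member while interning.
///     let mut member = interner.get_member();
///     member.intern("hello")
/// }; // Dropping the member returns its arena to the interner.
/// assert_eq!(interner.get_string(istr), "hello");
///
/// // Once interning is done, freeze for cheap borrowed access.
/// let frozen = interner.freeze();
/// assert_eq!(frozen.get_str(istr), "hello");
/// ```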
pub struct ConcurrentInterner<RS: Send + Sync + Clone + BuildHasher> {
indices: ConcurrentInternerMap<RS>,
storage: Mutex<ConcurrentInternerStorage>,
}
type ConcurrentInternerMap<RS> = DashMap<SSOStringRef, IStr, RS>;
impl<RS: Send + Sync + Clone + BuildHasher> std::fmt::Debug for ConcurrentInterner<RS> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut dbg_map = f.debug_map();
for kvref in self.indices.iter() {
let key_bytes = unsafe { kvref.key().as_bytes() };
let key_str = std::str::from_utf8(key_bytes).expect("invariant violation: invalid utf-8 in storage");
dbg_map.entry(&key_str, kvref.value());
}
dbg_map.finish()
}
}
impl<RS: Send + Sync + Clone + BuildHasher> ConcurrentInterner<RS> {
const LOCK_ACQUIRE_ERROR_MSG: &'static str = "Unable to acquire lock for interner storage.";
}
impl<RS: Send + Sync + Clone + BuildHasher> ConcurrentInterner<RS> {
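    /// Creates an interner with `arena_count` member arenas.
    ///
    /// `arena_count` bounds how many [`ConcurrentInternerMember`]s can be
    /// checked out at once (typically one per interning thread);
    /// `random_state` seeds the hasher of the shared index map.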
pub fn new(arena_count: NonZeroU8, random_state: RS) -> ConcurrentInterner<RS> {
ConcurrentInterner {
indices: ConcurrentInternerMap::with_capacity_and_hasher(64, random_state),
storage: Mutex::new(ConcurrentInternerStorage {
handed_out_count: 0,
boxes: (0..arena_count.get())
.map(|i| Box::new(ConcurrentInternerMemberStorage::new(i)))
.collect(),
}),
}
}
}
impl<RS: Send + Sync + Clone + BuildHasher> ConcurrentInterner<RS> {
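    /// Checks out a member arena for interning; typically one per thread.
    ///
    /// Panics if all `arena_count` members are already handed out. Dropping
    /// the returned member gives its arena back to the interner.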
pub fn get_member(&self) -> ConcurrentInternerMember<'_, RS> {
let mut lock = self.storage.lock().expect(Self::LOCK_ACQUIRE_ERROR_MSG);
let storage = lock
.boxes
.pop()
.expect("All ConcurrentInternerMembers are taken!");
lock.handed_out_count += 1;
ConcurrentInternerMember {
indices: &self.indices,
storage: ManuallyDrop::new(storage),
owner: self,
}
}
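    /// Resolves an [`IStr`] to an owned `String`.
    ///
    /// Locks the storage mutex on every call; once interning is finished,
    /// prefer [`freeze`](Self::freeze) and [`FrozenInterner::get_str`] for
    /// borrowed, lock-free access. All members must have been dropped so the
    /// arenas sit at their arena-id positions (asserted in debug builds).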
pub fn get_string(&self, istr: IStr) -> String {
debug_assert!(self.is_organized(), "Forgot to call reorganize?");
let storage = &self
.storage
.lock()
.expect(Self::LOCK_ACQUIRE_ERROR_MSG)
.boxes[istr.arena_id() as usize];
storage.get_str(istr).to_string()
}
}
impl<RS: Send + Sync + Clone + BuildHasher> ConcurrentInterner<RS> {
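    /// Consumes the interner, trading the ability to intern further strings
    /// for lock-free, borrowed lookups via [`FrozenInterner::get_str`]. All
    /// members must have been dropped first (asserted in debug builds).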
pub fn freeze(self) -> FrozenInterner<RS> {
debug_assert!(self.is_organized(), "Forgot to call reorganize?");
FrozenInterner {
indices: self.indices,
frozen_storage: self
.storage
.into_inner()
.expect(Self::LOCK_ACQUIRE_ERROR_MSG),
}
}
}
impl<RS: Send + Sync + Clone + BuildHasher> ConcurrentInterner<RS> {
fn is_organized(&self) -> bool {
let storage_vec = self.storage.lock().expect(Self::LOCK_ACQUIRE_ERROR_MSG);
        storage_vec
            .boxes
            .iter()
            .enumerate()
            .all(|(i, member)| (member.arena_id as usize) == i)
}
}
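/// A per-thread handle for interning strings.
///
/// Each member owns one storage arena: new strings are appended to it and
/// published in the shared index map. Dropping the member returns the arena
/// to the owning [`ConcurrentInterner`].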
pub struct ConcurrentInternerMember<'i, RS: Send + Sync + Clone + BuildHasher> {
indices: &'i ConcurrentInternerMap<RS>,
storage: ManuallyDrop<Box<ConcurrentInternerMemberStorage>>,
owner: &'i ConcurrentInterner<RS>,
}
impl<RS: Send + Sync + Clone + BuildHasher> Drop for ConcurrentInternerMember<'_, RS> {
fn drop(&mut self) {
let mut guard = self
.owner
.storage
.lock()
.expect(ConcurrentInterner::<RS>::LOCK_ACQUIRE_ERROR_MSG);
let storage = unsafe { ManuallyDrop::take(&mut self.storage) };
guard.boxes.push(storage);
guard.handed_out_count -= 1;
if guard.handed_out_count == 0 {
guard.boxes.sort_by_key(|s| s.arena_id);
}
}
}
impl<'i, RS: Send + Sync + Clone + BuildHasher> ConcurrentInternerMember<'i, RS> {
const STORAGE_CHUNK_SIZE: usize = ConcurrentInternerMemberStorage::STORAGE_CHUNK_SIZE;
const CHUNK_SLICE_ERROR_MSG: &'static str =
ConcurrentInternerMemberStorage::CHUNK_SLICE_ERROR_MSG;
}
impl<'i, RS: Send + Sync + Clone + BuildHasher> ConcurrentInternerMember<'i, RS> {
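    /// Interns `s`, returning a stable [`IStr`] handle.
    ///
    /// Fast path: if the string is already in the shared map, its existing
    /// handle is returned. Otherwise the bytes are copied into this member's
    /// arena (the current chunk if they fit, a fresh chunk otherwise, or the
    /// `huge_strings` list if they exceed a whole chunk) and the new handle is
    /// published. If another thread published the same string first, that
    /// thread's handle wins and our copy is counted as wasted bytes.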
pub fn intern(&mut self, s: &str) -> IStr {
let sref = SSOStringRef::new(s);
if let Some(istr) = self.indices.get(&sref) {
            return *istr.value();
}
let istr: IStr;
let potential_new_key: SSOStringRef;
let storage = self.storage.as_mut();
let composite_index: CompositeIndex;
if s.len() <= storage.bytes_left {
let chunk_index = storage.chunks.len() - 1;
let start = Self::STORAGE_CHUNK_SIZE - storage.bytes_left;
let range = start..start + s.len();
storage.chunks[chunk_index][range.clone()].copy_from_slice(s.as_bytes());
storage.bytes_left -= s.len();
potential_new_key = unsafe {
SSOStringRef::new_unchecked(
&storage.chunks[chunk_index]
.get(range.clone())
.expect(Self::CHUNK_SLICE_ERROR_MSG),
)
};
debug_assert!(start <= u16::MAX as usize);
composite_index =
CompositeIndex::new_chunk_index(chunk_index, start as u16, s.len() as u16);
} else if s.len() <= Self::STORAGE_CHUNK_SIZE {
let chunk_index = storage.chunks.len();
storage.chunks.push(Box::new(
[0; ConcurrentInternerMemberStorage::STORAGE_CHUNK_SIZE],
));
let range = 0..s.len();
storage.chunks[chunk_index][range.clone()].copy_from_slice(s.as_bytes());
storage.bytes_left = Self::STORAGE_CHUNK_SIZE - s.len();
potential_new_key = unsafe {
SSOStringRef::new_unchecked(
&storage.chunks[chunk_index]
.get(range)
.expect(Self::CHUNK_SLICE_ERROR_MSG),
)
};
composite_index = CompositeIndex::new_chunk_index(chunk_index, 0, s.len() as u16);
} else {
let huge_index = storage.huge_strings.len();
storage.huge_strings.push(s.to_string());
potential_new_key = SSOStringRef::new(storage.huge_strings[huge_index].as_str());
composite_index = CompositeIndex::new_huge_index(huge_index);
}
let arena_local_index = storage.alloc_indices.len();
storage.alloc_indices.push(composite_index);
istr = IStr::new(storage.arena_id, arena_local_index);
match self.indices.entry(potential_new_key) {
Entry::Occupied(o) => {
self.storage.wasted_bytes += s.len();
return *o.get();
}
Entry::Vacant(v) => {
v.insert(istr);
return istr;
}
}
}
}
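/// A compact, copyable handle to an interned string: the owning arena's id in
/// the top 8 bits and the arena-local allocation index in the low 24 bits.
/// For example, arena id 3 with local index 5 is stored as `0x0300_0005`.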
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct IStr {
raw: u32,
}
impl std::fmt::Debug for IStr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IStr")
.field("arena_id", &self.arena_id())
.field("arena_local_id", &self.arena_local_id())
.finish()
}
}
impl IStr {
fn new(arena_id: u8, arena_local_index: usize) -> IStr {
        // The arena-local index must fit in the low 24 bits of `raw`.
        std::debug_assert!(arena_local_index < (1 << 24));
IStr {
raw: ((arena_id as u32) << 24) | (arena_local_index as u32),
}
}
const fn arena_id(&self) -> u8 {
(self.raw >> 24) as u8
}
const fn arena_local_id(&self) -> u32 {
(self.raw << 8) >> 8
}
}
struct ConcurrentInternerStorage {
handed_out_count: u8,
boxes: Vec<Box<ConcurrentInternerMemberStorage>>,
}
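/// Backing storage for one arena: fixed-size byte chunks for ordinary
/// strings, a `huge_strings` spill list for strings longer than a chunk, and
/// `alloc_indices`, which maps an [`IStr`]'s arena-local index to where its
/// bytes live. `bytes_left` tracks the free space in the newest chunk;
/// `wasted_bytes` counts bytes copied for strings that lost an insertion race.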
struct ConcurrentInternerMemberStorage {
arena_id: u8,
bytes_left: usize,
alloc_indices: Vec<CompositeIndex>,
chunks: Vec<Box<[u8; Self::STORAGE_CHUNK_SIZE]>>,
huge_strings: Vec<String>,
wasted_bytes: usize,
}
impl ConcurrentInternerMemberStorage {
#[cfg(all(target_arch = "aarch64", any(target_os = "macos", target_os = "ios")))]
const STORAGE_CHUNK_SIZE: usize = 16384 - 24;
#[cfg(not(all(target_arch = "aarch64", any(target_os = "macos", target_os = "ios"))))]
    const STORAGE_CHUNK_SIZE: usize = 4096 - 24;
const CHUNK_SLICE_ERROR_MSG: &'static str = "Trying to slice chunk with out-of-bounds indices.";
}
impl ConcurrentInternerMemberStorage {
fn new(original_index: u8) -> ConcurrentInternerMemberStorage {
ConcurrentInternerMemberStorage {
arena_id: original_index,
bytes_left: Self::STORAGE_CHUNK_SIZE,
alloc_indices: vec![],
chunks: vec![Box::new([0; Self::STORAGE_CHUNK_SIZE])],
huge_strings: vec![],
wasted_bytes: 0,
}
}
}
impl ConcurrentInternerMemberStorage {
fn get_str(&self, istr: IStr) -> &str {
let alloc_id = istr.arena_local_id();
let composite_index = self.alloc_indices[alloc_id as usize];
if let Some(huge_index) = composite_index.huge_index() {
return &self.huge_strings[huge_index as usize];
}
let (chunk_index, chunk_start_offset, len) = composite_index
.chunk_info()
.expect("Unexpected huge string index instead of chunk index.");
let (chunk_index, chunk_start_offset, len) = (
chunk_index as usize,
chunk_start_offset as usize,
len as usize,
);
let bytes = self.chunks[chunk_index]
.get(chunk_start_offset..(chunk_start_offset + len))
.expect(Self::CHUNK_SLICE_ERROR_MSG);
std::str::from_utf8(bytes).expect("Expected valid UTF-8")
}
}
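/// An immutable interner produced by [`ConcurrentInterner::freeze`]. Lookups
/// borrow directly from the frozen storage, so no locking is needed.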
pub struct FrozenInterner<RS: Send + Sync + Clone + BuildHasher> {
indices: ConcurrentInternerMap<RS>,
frozen_storage: ConcurrentInternerStorage,
}
impl<RS: Send + Sync + Clone + BuildHasher> From<ConcurrentInterner<RS>> for FrozenInterner<RS> {
fn from(interner: ConcurrentInterner<RS>) -> FrozenInterner<RS> {
interner.freeze()
}
}
impl<RS: Send + Sync + Clone + BuildHasher> std::fmt::Debug for FrozenInterner<RS> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut dbg_map = f.debug_map();
for kvref in self.indices.iter() {
let key_bytes = unsafe { kvref.key().as_bytes() };
let key_str = std::str::from_utf8(key_bytes).expect("invariant violation: invalid utf-8 in storage");
dbg_map.entry(&key_str, kvref.value());
}
dbg_map.finish()
}
}
impl<RS: Send + Sync + Clone + BuildHasher> FrozenInterner<RS> {
pub fn get_str(&self, istr: IStr) -> &str {
self.frozen_storage.boxes[istr.arena_id() as usize].get_str(istr)
}
}
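/// A 32-bit index whose top bit selects the namespace: 0 means an index into
/// an arena's `chunks`, 1 means an index into its `huge_strings`.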
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
struct ArenaLocalIndex {
raw: u32,
}
impl ArenaLocalIndex {
#[inline]
fn new_chunk_index(index: u32) -> ArenaLocalIndex {
debug_assert!(index >> 31 == 0);
ArenaLocalIndex { raw: index }
}
#[inline]
fn new_huge_index(index: u32) -> ArenaLocalIndex {
debug_assert!(index >> 31 == 0);
ArenaLocalIndex {
raw: (1 << 31) | index,
}
}
}
impl ArenaLocalIndex {
#[inline]
const fn get_index_unchecked(&self) -> u32 {
(self.raw << 1) >> 1
}
#[inline]
const fn is_huge(&self) -> bool {
(self.raw >> 31) == 1
}
#[inline]
const fn chunk_index(&self) -> Option<u32> {
if self.is_huge() {
None
} else {
Some(self.get_index_unchecked())
}
}
#[inline]
const fn huge_index(&self) -> Option<u32> {
if self.is_huge() {
Some(self.get_index_unchecked())
} else {
None
}
}
}
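/// The location of one interned string inside an arena: either a
/// (chunk, start offset, length) triple or, for huge strings, an index into
/// `huge_strings` (offset and length are then unused `u16::MAX` sentinels).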
#[derive(Copy, Clone, Debug)]
struct CompositeIndex {
arena_local_index: ArenaLocalIndex,
chunk_start_offset: u16,
len: u16,
}
impl CompositeIndex {
#[inline]
fn new_huge_index(index: usize) -> CompositeIndex {
debug_assert!(index <= u32::MAX as usize);
CompositeIndex {
arena_local_index: ArenaLocalIndex::new_huge_index(index as u32),
            chunk_start_offset: u16::MAX,
            len: u16::MAX,
        }
    }
#[inline]
fn new_chunk_index(index: usize, chunk_start_offset: u16, len: u16) -> CompositeIndex {
debug_assert!(index <= u32::MAX as usize);
debug_assert!(
(chunk_start_offset as usize) < ConcurrentInternerMemberStorage::STORAGE_CHUNK_SIZE
);
debug_assert!(
chunk_start_offset as usize + (len as usize)
<= ConcurrentInternerMemberStorage::STORAGE_CHUNK_SIZE
);
CompositeIndex {
arena_local_index: ArenaLocalIndex::new_chunk_index(index as u32),
chunk_start_offset,
len,
}
}
}
impl CompositeIndex {
#[inline]
const fn huge_index(&self) -> Option<u32> {
self.arena_local_index.huge_index()
}
#[inline]
const fn chunk_info(&self) -> Option<(u32, u16, u16)> {
if let Some(chunk_index) = self.arena_local_index.chunk_index() {
Some((chunk_index, self.chunk_start_offset, self.len))
} else {
None
}
}
}
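/// A small-string-optimized key for the index map.
///
/// Strings of at most `size_of::<ByteSlice>()` bytes that contain no NUL byte
/// are stored inline in `bytes`; longer (or NUL-containing) strings are stored
/// as a raw `ByteSlice` pointing into interner storage. `ByteSlice` keeps its
/// first byte zero (see its `len` encoding), so `bytes[0] != 0` identifies the
/// inline representation. The `Send`/`Sync` impls below rely on the pointed-to
/// bytes living inside the interner's own storage, which outlives the map.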
struct SSOStringRef {
inner: SSOStringRefInner,
}
#[repr(C)]
union SSOStringRefInner {
sref: ByteSlice,
bytes: [u8; ByteSlice::SIZE_IN_BYTES],
}
unsafe impl Send for SSOStringRef {}
unsafe impl Sync for SSOStringRef {}
impl PartialEq for SSOStringRef {
fn eq(&self, other: &SSOStringRef) -> bool {
if unsafe { self.inner.sref.len != other.inner.sref.len } {
return false;
}
unsafe {
if self.inner.sref.len == 0 {
return true;
} else if self.has_inline_data() {
std::debug_assert!(other.has_inline_data());
return self.inner.sref.ptr == other.inner.sref.ptr;
} else {
return (self.inner.sref.ptr == other.inner.sref.ptr)
|| (self.inner.sref.as_bytes() == other.inner.sref.as_bytes());
}
}
}
}
impl Eq for SSOStringRef {}
impl Hash for SSOStringRef {
fn hash<H: Hasher>(&self, state: &mut H) {
(unsafe { self.as_bytes() }).hash(state);
}
}
impl SSOStringRef {
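    /// # Safety
    ///
    /// `s` must be valid UTF-8 (the `Debug` impls above rely on this), and
    /// when the non-inline representation is chosen the bytes behind `s` must
    /// stay alive and unmoved for as long as the returned key is in use.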
unsafe fn new_unchecked(s: &[u8]) -> SSOStringRef {
assert!(
s.len() <= ByteSlice::MAX_LEN,
"Trying to intern string bigger than 16 MiB on 32-bit?"
);
if s.len() == 0 {
SSOStringRef {
inner: SSOStringRefInner {
bytes: [0; ByteSlice::SIZE_IN_BYTES],
},
}
} else if s.len() <= ByteSlice::SIZE_IN_BYTES && s.iter().all(|&c| c != 0) {
let mut sso_stringref = SSOStringRef {
inner: SSOStringRefInner {
bytes: [0; ByteSlice::SIZE_IN_BYTES],
},
};
for (i, &c) in s.iter().enumerate() {
sso_stringref.inner.bytes[i] = c;
}
sso_stringref
} else {
SSOStringRef {
inner: SSOStringRefInner {
sref: ByteSlice::new(s),
},
}
}
}
pub fn new(s: &str) -> SSOStringRef {
unsafe { Self::new_unchecked(s.as_bytes()) }
}
}
impl SSOStringRef {
fn has_inline_data(&self) -> bool {
unsafe { self.inner.bytes[0] != 0 }
}
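    /// # Safety
    ///
    /// For the non-inline representation, the caller must guarantee that the
    /// bytes referenced by the inner `ByteSlice` are still alive; the returned
    /// slice borrows them for `'a`.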
unsafe fn as_bytes<'a>(&'a self) -> &'a [u8] {
let s: &'a [u8];
if self.has_inline_data() {
let mut i = 1;
while i < ByteSlice::SIZE_IN_BYTES && self.inner.bytes[i] != 0 {
i += 1;
            }
            s = self
.inner
.bytes
.get(0..i)
.expect("Unexpected out-of-bounds byte index for SSOStringRef.");
} else if self.inner.sref.len() == 0 {
s = &[];
} else {
            s = self.inner.sref.as_bytes();
}
return s;
}
}
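/// A raw `(len, ptr)` pair referring to bytes owned by interner storage.
///
/// The length is encoded so that the first byte of the struct is always zero:
/// on little-endian targets `len` is stored shifted left by 8, and on
/// big-endian targets the `MAX_LEN` cap keeps the most significant byte zero.
/// `SSOStringRef` uses that zero byte to distinguish this representation from
/// inline data.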
#[derive(Copy, Clone)]
#[repr(C)]
struct ByteSlice {
len: usize,
ptr: *const u8,
}
impl ByteSlice {
    const MAX_LEN: usize = usize::MAX >> 8;
const SIZE_IN_BYTES: usize = std::mem::size_of::<Self>();
}
impl ByteSlice {
#[cfg(target_endian = "little")]
fn new(s: &[u8]) -> ByteSlice {
std::debug_assert!(s.len() <= Self::MAX_LEN);
ByteSlice {
len: s.len() << 8,
ptr: s.as_ptr(),
}
}
#[cfg(target_endian = "big")]
fn new(s: &[u8]) -> ByteSlice {
std::debug_assert!(s.len() <= Self::MAX_LEN);
ByteSlice {
len: s.len(),
ptr: s.as_ptr(),
}
}
}
impl ByteSlice {
#[cfg(target_endian = "little")]
fn len(&self) -> usize {
self.len >> 8
}
#[cfg(target_endian = "big")]
fn len(&self) -> usize {
self.len
}
unsafe fn as_bytes<'a>(self) -> &'a [u8] {
        debug_assert!(!self.ptr.is_null());
std::slice::from_raw_parts(self.ptr, self.len())
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_istr_size() {
        assert_eq!(std::mem::size_of::<crate::IStr>(), 4);
}
use std::num::NonZeroU8;
use crate::{
ArenaLocalIndex, ConcurrentInterner, ConcurrentInternerMemberStorage, IStr, SSOStringRef,
};
use quickcheck::*;
quickcheck! {
fn test_sso_stringref_eq1(a: String, b: String) -> bool {
(a == b) == (SSOStringRef::new(&a) == SSOStringRef::new(&b))
}
fn test_sso_stringref_eq2(s: String) -> bool {
SSOStringRef::new(&s) == SSOStringRef::new(&s.clone())
}
}
quickcheck! {
fn test_arena_local_index_roundtrip(i: u32) -> bool {
let i = i & !(1 << 31);
assert_eq!(Some(i), ArenaLocalIndex::new_chunk_index(i).chunk_index());
assert_eq!(Some(i), ArenaLocalIndex::new_huge_index(i).huge_index());
return true;
}
}
quickcheck! {
fn test_insertion(vs: Vec<String>, n: u8, near_huge_idxs: Vec<u8>, huge_idxs: Vec<u8>) -> bool {
let mut vs = vs;
if vs.len() == 0 {
return true;
}
let chunk_size = ConcurrentInternerMemberStorage::STORAGE_CHUNK_SIZE;
let mut near_huge_idxs = near_huge_idxs;
near_huge_idxs.truncate(3);
for i in near_huge_idxs.iter() {
let near_huge_idx = *i as usize;
if near_huge_idx < vs.len() {
vs[near_huge_idx] = (0 .. (chunk_size - 1 - near_huge_idx)).map(|_| 'k').collect();
}
}
let mut huge_idxs = huge_idxs;
huge_idxs.truncate(3);
for i in huge_idxs.iter() {
let huge_idx = *i as usize;
if huge_idx < vs.len() {
vs[huge_idx] = (0 .. (chunk_size + 1 + huge_idx)).map(|_| 'a').collect();
}
}
let n = NonZeroU8::new((n % 8) + 1).unwrap();
let interner = ConcurrentInterner::<ahash::RandomState>::new(n, Default::default());
let v: Vec<Vec<IStr>> = crossbeam::scope(|scope| {
let interner_ref = &interner;
let vsref = &vs;
let mut handles = vec![];
for _ in 0 .. n.get() {
handles.push(scope.spawn(move |_| {
let mut vout = Vec::with_capacity(vsref.len());
let mut member = interner_ref.get_member();
for s in vsref.iter() {
vout.push(member.intern(&s));
}
vout
}));
}
handles
.into_iter().map(|h| h.join().expect("Thread error!"))
.collect()
}).expect("crossbeam::scope had an error");
let istrs0 = &v[0];
assert!(istrs0.len() == vs.len());
for istrs in v.get(1 ..).expect("v.len() != 0").iter() {
assert!(istrs.len() == vs.len());
for (i1, i2) in istrs0.iter().zip(istrs.iter()) {
assert!(*i1 == *i2);
}
}
for istrs in v.iter() {
for (istr, s) in istrs.iter().zip(vs.iter()) {
assert!(interner.get_string(*istr) == *s);
}
}
return true;
}
}
}