use bytes::Bytes;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender, TryRecvError, TrySendError};
use parking_lot::RwLock;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
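/// Wrapper that aligns its contents to a 64-byte cache line so that adjacent
/// values (for example the per-shard locks further below) do not share a line
/// and suffer false sharing between cores.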
#[repr(C, align(64))]
pub struct CacheAligned<T>(pub T);
impl<T> std::ops::Deref for CacheAligned<T> {
type Target = T;
fn deref(&self) -> &T {
&self.0
}
}
impl<T> std::ops::DerefMut for CacheAligned<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.0
}
}
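/// Multi-producer multi-consumer queue built on a crossbeam channel, with
/// best-effort statistics. The `len` and throughput counters use relaxed
/// atomics, so they are approximate while producers and consumers race.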
pub struct LockFreeQueue<T> {
sender: Sender<T>,
receiver: Receiver<T>,
capacity: usize,
len: AtomicUsize,
total_enqueued: AtomicU64,
total_dequeued: AtomicU64,
blocked_enqueues: AtomicU64,
}
impl<T> LockFreeQueue<T> {
pub fn bounded(capacity: usize) -> Self {
let (sender, receiver) = bounded(capacity);
Self {
sender,
receiver,
capacity,
len: AtomicUsize::new(0),
total_enqueued: AtomicU64::new(0),
total_dequeued: AtomicU64::new(0),
blocked_enqueues: AtomicU64::new(0),
}
}
pub fn unbounded() -> Self {
let (sender, receiver) = unbounded();
Self {
sender,
receiver,
capacity: usize::MAX,
len: AtomicUsize::new(0),
total_enqueued: AtomicU64::new(0),
total_dequeued: AtomicU64::new(0),
blocked_enqueues: AtomicU64::new(0),
}
}
pub fn try_push(&self, item: T) -> Result<(), T> {
match self.sender.try_send(item) {
Ok(()) => {
self.len.fetch_add(1, Ordering::Relaxed);
self.total_enqueued.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(TrySendError::Full(item)) => {
self.blocked_enqueues.fetch_add(1, Ordering::Relaxed);
Err(item)
}
Err(TrySendError::Disconnected(item)) => Err(item),
}
}
pub fn push(&self, item: T) -> Result<(), T> {
match self.sender.send(item) {
Ok(()) => {
self.len.fetch_add(1, Ordering::Relaxed);
self.total_enqueued.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(e) => Err(e.0),
}
}
pub fn try_pop(&self) -> Option<T> {
match self.receiver.try_recv() {
Ok(item) => {
self.len.fetch_sub(1, Ordering::Relaxed);
self.total_dequeued.fetch_add(1, Ordering::Relaxed);
Some(item)
}
Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => None,
}
}
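/// Blocking pop. Because the queue owns both the sender and the receiver, the
/// channel is never observed as disconnected; popping an empty queue with no
/// concurrent producers therefore blocks indefinitely. Prefer `try_pop` or
/// `pop_batch` when that is not acceptable.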
pub fn pop(&self) -> Option<T> {
match self.receiver.recv() {
Ok(item) => {
self.len.fetch_sub(1, Ordering::Relaxed);
self.total_dequeued.fetch_add(1, Ordering::Relaxed);
Some(item)
}
Err(_) => None,
}
}
pub fn pop_batch(&self, max: usize) -> Vec<T> {
let mut batch = Vec::with_capacity(max.min(64));
for _ in 0..max {
match self.receiver.try_recv() {
Ok(item) => {
batch.push(item);
}
Err(_) => break,
}
}
let count = batch.len();
if count > 0 {
self.len.fetch_sub(count, Ordering::Relaxed);
self.total_dequeued
.fetch_add(count as u64, Ordering::Relaxed);
}
batch
}
pub fn len(&self) -> usize {
self.len.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn fill_ratio(&self) -> f64 {
if self.capacity == usize::MAX || self.capacity == 0 {
0.0
} else {
self.len() as f64 / self.capacity as f64
}
}
pub fn stats(&self) -> QueueStats {
QueueStats {
len: self.len(),
capacity: self.capacity,
total_enqueued: self.total_enqueued.load(Ordering::Relaxed),
total_dequeued: self.total_dequeued.load(Ordering::Relaxed),
blocked_enqueues: self.blocked_enqueues.load(Ordering::Relaxed),
}
}
}
#[derive(Debug, Clone)]
pub struct QueueStats {
pub len: usize,
pub capacity: usize,
pub total_enqueued: u64,
pub total_dequeued: u64,
pub blocked_enqueues: u64,
}
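/// Configuration for `AppendOnlyLog`. `segment_size` is the byte capacity of
/// each in-memory segment and `max_segments` caps how many segments are
/// retained (the oldest are dropped on rotation). The segment buffer is always
/// allocated eagerly, so `preallocate` is currently informational only.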
#[derive(Debug, Clone)]
pub struct AppendLogConfig {
pub segment_size: usize,
pub max_segments: usize,
pub preallocate: bool,
}
impl Default for AppendLogConfig {
fn default() -> Self {
Self {
segment_size: 64 * 1024 * 1024,
max_segments: 4,
preallocate: true,
}
}
}
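/// A fixed-capacity in-memory segment. Writers reserve space by advancing
/// `write_pos` with a CAS and then copy a 4-byte big-endian length prefix
/// followed by the payload. Because the position is reserved before the copy
/// completes, a reader racing with an in-progress append may briefly observe a
/// zeroed or partially written entry at the tail; tail reads are best-effort.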
struct LogSegment {
data: UnsafeCell<Box<[u8]>>,
write_pos: AtomicUsize,
capacity: usize,
base_offset: u64,
sealed: AtomicBool,
}
unsafe impl Send for LogSegment {}
unsafe impl Sync for LogSegment {}
impl LogSegment {
fn new(base_offset: u64, capacity: usize, _preallocate: bool) -> Self {
let data = vec![0u8; capacity].into_boxed_slice();
Self {
data: UnsafeCell::new(data),
capacity,
write_pos: AtomicUsize::new(0),
base_offset,
sealed: AtomicBool::new(false),
}
}
#[inline]
fn data_ptr(&self) -> *mut u8 {
unsafe { (*self.data.get()).as_mut_ptr() }
}
fn try_append(&self, data: &[u8]) -> Option<(usize, u64)> {
if self.sealed.load(Ordering::Acquire) {
return None;
}
if data.len() > u32::MAX as usize {
return None;
}
let needed = 4 + data.len();
loop {
let current_pos = self.write_pos.load(Ordering::Acquire);
let new_pos = current_pos + needed;
if new_pos > self.capacity {
self.sealed.store(true, Ordering::Release);
return None;
}
match self.write_pos.compare_exchange_weak(
current_pos,
new_pos,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
let ptr = self.data_ptr();
unsafe {
let len = data.len() as u32;
let len_bytes = len.to_be_bytes();
std::ptr::copy_nonoverlapping(len_bytes.as_ptr(), ptr.add(current_pos), 4);
std::ptr::copy_nonoverlapping(
data.as_ptr(),
ptr.add(current_pos + 4),
data.len(),
);
}
let offset = self.base_offset + current_pos as u64;
return Some((current_pos, offset));
}
Err(_) => {
std::hint::spin_loop();
}
}
}
}
fn read(&self, position: usize) -> Option<&[u8]> {
let committed = self.write_pos.load(Ordering::Acquire);
if position + 4 > committed {
return None;
}
let ptr = self.data_ptr();
unsafe {
let mut len_bytes = [0u8; 4];
std::ptr::copy_nonoverlapping(ptr.add(position), len_bytes.as_mut_ptr(), 4);
let len = u32::from_be_bytes(len_bytes) as usize;
if position + 4 + len > committed {
return None;
}
Some(std::slice::from_raw_parts(ptr.add(position + 4), len))
}
}
fn committed_size(&self) -> usize {
self.write_pos.load(Ordering::Acquire)
}
fn is_sealed(&self) -> bool {
self.sealed.load(Ordering::Acquire)
}
}
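/// Segmented in-memory append-only log. `append` returns a stable byte offset
/// (the segment's base offset plus the position within the segment). When the
/// current segment fills up it is sealed and a new one is created; once more
/// than `max_segments` exist, the oldest segments are dropped and their
/// offsets become unreadable.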
pub struct AppendOnlyLog {
config: AppendLogConfig,
segments: RwLock<Vec<Arc<LogSegment>>>,
total_bytes: AtomicU64,
total_entries: AtomicU64,
}
impl AppendOnlyLog {
pub fn new(config: AppendLogConfig) -> Self {
let initial_segment = Arc::new(LogSegment::new(0, config.segment_size, config.preallocate));
Self {
config,
segments: RwLock::new(vec![initial_segment]),
total_bytes: AtomicU64::new(0),
total_entries: AtomicU64::new(0),
}
}
pub fn append(&self, data: &[u8]) -> u64 {
// An entry that cannot fit even in a fresh segment would make the rotation
// loop below spin forever, so reject it up front.
assert!(
data.len() <= u32::MAX as usize && 4 + data.len() <= self.config.segment_size,
"entry of {} bytes cannot fit in a {}-byte segment",
data.len(),
self.config.segment_size
);
loop {
{
let segments = self.segments.read();
if let Some(segment) = segments.last() {
if let Some((_, offset)) = segment.try_append(data) {
self.total_bytes
.fetch_add(data.len() as u64, Ordering::Relaxed);
self.total_entries.fetch_add(1, Ordering::Relaxed);
return offset;
}
}
}
self.rotate_segment();
}
}
pub fn append_batch(&self, entries: &[&[u8]]) -> Vec<u64> {
let mut offsets = Vec::with_capacity(entries.len());
for data in entries {
offsets.push(self.append(data));
}
offsets
}
fn rotate_segment(&self) {
let mut segments = self.segments.write();
if let Some(last) = segments.last() {
if !last.is_sealed() {
return;
}
}
let next_base = segments
.last()
.map(|s| s.base_offset + s.committed_size() as u64)
.unwrap_or(0);
let new_segment = Arc::new(LogSegment::new(
next_base,
self.config.segment_size,
self.config.preallocate,
));
segments.push(new_segment);
if segments.len() > self.config.max_segments {
let excess = segments.len() - self.config.max_segments;
segments.drain(..excess);
}
}
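/// Reads up to `max_entries` entries starting at `start_offset`. Returns an
/// empty vector if the offset predates the oldest retained segment.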
pub fn read(&self, start_offset: u64, max_entries: usize) -> Vec<Bytes> {
let segments = self.segments.read();
let mut entries = Vec::with_capacity(max_entries);
let mut found_start = false;
for segment in segments.iter() {
if entries.len() >= max_entries {
break;
}
if !found_start {
if segment.base_offset > start_offset {
break;
}
found_start = true;
}
let relative_pos = if segment.base_offset <= start_offset {
(start_offset - segment.base_offset) as usize
} else {
0
};
let mut pos = relative_pos;
while entries.len() < max_entries {
match segment.read(pos) {
Some(data) => {
entries.push(Bytes::copy_from_slice(data));
pos += 4 + data.len();
}
None => break,
}
}
}
entries
}
pub fn total_bytes(&self) -> u64 {
self.total_bytes.load(Ordering::Relaxed)
}
pub fn total_entries(&self) -> u64 {
self.total_entries.load(Ordering::Relaxed)
}
pub fn end_offset(&self) -> u64 {
let segments = self.segments.read();
segments
.last()
.map(|s| s.base_offset + s.committed_size() as u64)
.unwrap_or(0)
}
pub fn segment_count(&self) -> usize {
self.segments.read().len()
}
}
const SHARD_COUNT: usize = 64;
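/// Hash map sharded across `SHARD_COUNT` cache-aligned `RwLock`s to reduce
/// contention. Keys are routed to a shard by their hash, so operations on
/// different shards proceed in parallel. The `len` counter is maintained with
/// relaxed atomics and is therefore approximate under concurrent mutation.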
pub struct ConcurrentHashMap<K, V> {
shards: [CacheAligned<RwLock<HashMap<K, V>>>; SHARD_COUNT],
len: AtomicUsize,
}
impl<K: Hash + Eq + Clone, V: Clone> ConcurrentHashMap<K, V> {
pub fn new() -> Self {
let shards = std::array::from_fn(|_| CacheAligned(RwLock::new(HashMap::new())));
Self {
shards,
len: AtomicUsize::new(0),
}
}
fn shard_index(&self, key: &K) -> usize {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
key.hash(&mut hasher);
hasher.finish() as usize % SHARD_COUNT
}
pub fn insert(&self, key: K, value: V) -> Option<V> {
let shard_idx = self.shard_index(&key);
let mut shard = self.shards[shard_idx].write();
let old = shard.insert(key, value);
if old.is_none() {
self.len.fetch_add(1, Ordering::Relaxed);
}
old
}
pub fn get(&self, key: &K) -> Option<V> {
let shard_idx = self.shard_index(key);
let shard = self.shards[shard_idx].read();
shard.get(key).cloned()
}
pub fn contains_key(&self, key: &K) -> bool {
let shard_idx = self.shard_index(key);
let shard = self.shards[shard_idx].read();
shard.contains_key(key)
}
pub fn remove(&self, key: &K) -> Option<V> {
let shard_idx = self.shard_index(key);
let mut shard = self.shards[shard_idx].write();
let removed = shard.remove(key);
if removed.is_some() {
self.len.fetch_sub(1, Ordering::Relaxed);
}
removed
}
pub fn len(&self) -> usize {
self.len.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn update<F>(&self, key: &K, f: F) -> Option<V>
where
F: FnOnce(&mut V),
{
let shard_idx = self.shard_index(key);
let mut shard = self.shards[shard_idx].write();
if let Some(value) = shard.get_mut(key) {
f(value);
Some(value.clone())
} else {
None
}
}
pub fn get_or_insert(&self, key: K, default: V) -> V {
let shard_idx = self.shard_index(&key);
let mut shard = self.shards[shard_idx].write();
if let Some(value) = shard.get(&key) {
value.clone()
} else {
self.len.fetch_add(1, Ordering::Relaxed);
shard.insert(key, default.clone());
default
}
}
pub fn get_or_insert_with<F>(&self, key: K, f: F) -> V
where
F: FnOnce() -> V,
{
let shard_idx = self.shard_index(&key);
let mut shard = self.shards[shard_idx].write();
if let Some(value) = shard.get(&key) {
value.clone()
} else {
let value = f();
self.len.fetch_add(1, Ordering::Relaxed);
shard.insert(key, value.clone());
value
}
}
pub fn snapshot(&self) -> Vec<(K, V)> {
let mut entries = Vec::new();
for shard in &self.shards {
let shard = shard.read();
for (k, v) in shard.iter() {
entries.push((k.clone(), v.clone()));
}
}
entries
}
pub fn clear(&self) {
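// Take each shard's map while holding its write lock, but keep the old maps
// alive in `old_maps` so their (potentially large) deallocation happens after
// the lock has been released rather than while other threads wait on it.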
let mut old_maps = Vec::with_capacity(SHARD_COUNT);
for shard in &self.shards {
let mut guard = shard.write();
let old = std::mem::take(&mut *guard);
old_maps.push(old);
}
self.len.store(0, Ordering::Release);
}
}
impl<K: Hash + Eq + Clone, V: Clone> Default for ConcurrentHashMap<K, V> {
fn default() -> Self {
Self::new()
}
}
const MAX_HEIGHT: usize = 32;
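/// A node in the skip list. `forward[i]` is the next node at level `i`; only
/// the first `height` levels are ever linked. Nodes are heap-allocated and
/// never freed until the list itself is dropped (there is no remove), which
/// sidesteps concurrent memory-reclamation issues.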
struct SkipNode<K, V> {
key: K,
value: V,
forward: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT],
#[allow(dead_code)]
height: usize,
}
impl<K, V> SkipNode<K, V> {
fn new(key: K, value: V, height: usize) -> *mut Self {
let forward = std::array::from_fn(|_| AtomicPtr::new(std::ptr::null_mut()));
let node = Box::new(Self {
key,
value,
forward,
height,
});
Box::into_raw(node)
}
}
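/// Ordered map supporting concurrent lock-free inserts and reads. Node heights
/// follow a geometric distribution driven by a shared xorshift PRNG, lookups
/// descend from the highest level, and there is no removal. The head node is a
/// sentinel built from `K::default()`/`V::default()`; its key is never
/// compared, it only anchors the forward pointers.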
pub struct ConcurrentSkipList<K: Ord + Clone, V: Clone> {
head: *mut SkipNode<K, V>,
max_level: AtomicUsize,
len: AtomicUsize,
rand_state: AtomicU64,
}
unsafe impl<K: Ord + Clone + Send, V: Clone + Send> Send for ConcurrentSkipList<K, V> {}
unsafe impl<K: Ord + Clone + Sync, V: Clone + Sync> Sync for ConcurrentSkipList<K, V> {}
impl<K: Ord + Clone + Default, V: Clone + Default> ConcurrentSkipList<K, V> {
pub fn new() -> Self {
let head = SkipNode::new(K::default(), V::default(), MAX_HEIGHT);
let seed = Self::generate_seed();
Self {
head,
max_level: AtomicUsize::new(1),
len: AtomicUsize::new(0),
rand_state: AtomicU64::new(seed),
}
}
fn generate_seed() -> u64 {
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
let state = RandomState::new();
let mut hasher = state.build_hasher();
hasher.write_u64(std::process::id().into());
if let Ok(time) = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
hasher.write_u64(time.as_nanos() as u64);
}
hasher.write_usize(&hasher as *const _ as usize);
hasher.finish().max(1)
}
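// Each trailing zero bit in the xorshift output adds one level, giving the
// usual ~1/2 probability per additional level, capped at MAX_HEIGHT.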
fn random_level(&self) -> usize {
let mut level = 1;
let x = self
.rand_state
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |mut x| {
x ^= x << 13;
x ^= x >> 7;
x ^= x << 17;
Some(x)
})
.unwrap_or(1);
let mut bits = x;
while bits & 1 == 0 && level < MAX_HEIGHT {
level += 1;
bits >>= 1;
}
level
}
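/// Inserts a key/value pair. A second insert with an equal key adds another
/// node rather than updating the first. Linking proceeds bottom-up: each level
/// is attached with a CAS on the predecessor's forward pointer, and on
/// contention the predecessor is re-located by walking from the head again.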
pub fn insert(&self, key: K, value: V) {
let height = self.random_level();
let new_node = SkipNode::new(key.clone(), value, height);
let mut current_max = self.max_level.load(Ordering::Relaxed);
while height > current_max {
match self.max_level.compare_exchange_weak(
current_max,
height,
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(m) => current_max = m,
}
}
let mut update = [std::ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
let mut current = self.head;
#[allow(clippy::needless_range_loop)]
for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
unsafe {
loop {
let next = (*current).forward[i].load(Ordering::Acquire);
if next.is_null() || (*next).key >= key {
break;
}
current = next;
}
update[i] = current;
}
}
#[allow(clippy::needless_range_loop)]
for i in 0..height {
unsafe {
let mut pred = if update[i].is_null() {
self.head
} else {
update[i]
};
loop {
let next = (*pred).forward[i].load(Ordering::Acquire);
if !next.is_null() && (*next).key < key {
let mut cur = self.head;
loop {
let n = (*cur).forward[i].load(Ordering::Acquire);
if n.is_null() || (*n).key >= key {
break;
}
cur = n;
}
pred = cur;
continue;
}
(*new_node).forward[i].store(next, Ordering::Release);
match (*pred).forward[i].compare_exchange(
next,
new_node,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(_) => {
let mut cur = self.head;
loop {
let n = (*cur).forward[i].load(Ordering::Acquire);
if n.is_null() || (*n).key >= key {
break;
}
cur = n;
}
pred = cur;
}
}
}
}
}
self.len.fetch_add(1, Ordering::Relaxed);
}
pub fn get(&self, key: &K) -> Option<V> {
let mut current = self.head;
for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
unsafe {
loop {
let next = (*current).forward[i].load(Ordering::Acquire);
if next.is_null() {
break;
}
if (*next).key == *key {
return Some((*next).value.clone());
}
if (*next).key > *key {
break;
}
current = next;
}
}
}
None
}
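/// Returns the entry with the greatest key less than or equal to `key`.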
pub fn floor(&self, key: &K) -> Option<(K, V)> {
let mut current = self.head;
let mut result: Option<*mut SkipNode<K, V>> = None;
for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
unsafe {
loop {
let next = (*current).forward[i].load(Ordering::Acquire);
if next.is_null() {
break;
}
if (*next).key <= *key {
result = Some(next);
current = next;
} else {
break;
}
}
}
}
result.map(|node| unsafe { ((*node).key.clone(), (*node).value.clone()) })
}
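/// Returns the entry with the smallest key greater than or equal to `key`.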
pub fn ceiling(&self, key: &K) -> Option<(K, V)> {
let mut current = self.head;
for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
unsafe {
loop {
let next = (*current).forward[i].load(Ordering::Acquire);
if next.is_null() || (*next).key >= *key {
break;
}
current = next;
}
}
}
unsafe {
let next = (*current).forward[0].load(Ordering::Acquire);
if !next.is_null() {
Some(((*next).key.clone(), (*next).value.clone()))
} else {
None
}
}
}
pub fn len(&self) -> usize {
self.len.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
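/// Returns up to `limit` entries with keys in the inclusive range
/// `[start, end]`, in ascending key order.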
pub fn range(&self, start: &K, end: &K, limit: usize) -> Vec<(K, V)> {
let mut entries = Vec::with_capacity(limit.min(1000));
let mut current = self.head;
for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
unsafe {
loop {
let next = (*current).forward[i].load(Ordering::Acquire);
if next.is_null() || (*next).key >= *start {
break;
}
current = next;
}
}
}
unsafe {
let mut node = (*current).forward[0].load(Ordering::Acquire);
while !node.is_null() && entries.len() < limit {
if (*node).key > *end {
break;
}
entries.push(((*node).key.clone(), (*node).value.clone()));
node = (*node).forward[0].load(Ordering::Acquire);
}
}
entries
}
}
impl<K: Ord + Clone + Default, V: Clone + Default> Default for ConcurrentSkipList<K, V> {
fn default() -> Self {
Self::new()
}
}
impl<K: Ord + Clone, V: Clone> Drop for ConcurrentSkipList<K, V> {
fn drop(&mut self) {
let mut current = self.head;
unsafe {
while !current.is_null() {
let next = (*current).forward[0].load(Ordering::Relaxed);
drop(Box::from_raw(current));
current = next;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;
#[test]
fn test_lock_free_queue() {
let queue = LockFreeQueue::<i32>::bounded(100);
assert!(queue.is_empty());
queue.push(1).unwrap();
queue.push(2).unwrap();
queue.push(3).unwrap();
assert_eq!(queue.len(), 3);
assert_eq!(queue.pop(), Some(1));
assert_eq!(queue.pop(), Some(2));
assert_eq!(queue.pop(), Some(3));
assert!(queue.is_empty());
}
#[test]
fn test_lock_free_queue_concurrent() {
let queue = Arc::new(LockFreeQueue::<i32>::bounded(1000));
let mut handles = vec![];
for i in 0..4 {
let q = queue.clone();
handles.push(thread::spawn(move || {
for j in 0..250 {
q.push(i * 250 + j).unwrap();
}
}));
}
for h in handles {
h.join().unwrap();
}
assert_eq!(queue.len(), 1000);
let batch = queue.pop_batch(1000);
assert_eq!(batch.len(), 1000);
}
#[test]
fn test_append_only_log() {
let config = AppendLogConfig {
segment_size: 1024,
max_segments: 4,
preallocate: true,
};
let log = AppendOnlyLog::new(config);
let offset1 = log.append(b"hello");
let offset2 = log.append(b"world");
assert!(offset2 > offset1);
let entries = log.read(offset1, 10);
assert_eq!(entries.len(), 2);
assert_eq!(&entries[0][..], b"hello");
assert_eq!(&entries[1][..], b"world");
}
#[test]
fn test_concurrent_hash_map() {
let map = Arc::new(ConcurrentHashMap::<String, i32>::new());
let mut handles = vec![];
for i in 0..4 {
let m = map.clone();
handles.push(thread::spawn(move || {
for j in 0..250 {
m.insert(format!("key-{}-{}", i, j), i * 250 + j);
}
}));
}
for h in handles {
h.join().unwrap();
}
assert_eq!(map.len(), 1000);
assert_eq!(map.get(&"key-0-0".to_string()), Some(0));
assert_eq!(map.get(&"key-3-249".to_string()), Some(999));
}
#[test]
fn test_skip_list() {
let list = ConcurrentSkipList::<u64, String>::new();
list.insert(10, "ten".to_string());
list.insert(20, "twenty".to_string());
list.insert(5, "five".to_string());
list.insert(15, "fifteen".to_string());
assert_eq!(list.len(), 4);
assert_eq!(list.get(&10), Some("ten".to_string()));
assert_eq!(list.get(&99), None);
assert_eq!(list.floor(&12), Some((10, "ten".to_string())));
assert_eq!(list.floor(&15), Some((15, "fifteen".to_string())));
assert_eq!(list.ceiling(&12), Some((15, "fifteen".to_string())));
assert_eq!(list.ceiling(&1), Some((5, "five".to_string())));
}
#[test]
fn test_skip_list_range() {
let list = ConcurrentSkipList::<u64, String>::new();
for i in 0..100 {
list.insert(i * 10, format!("value-{}", i));
}
let range = list.range(&150, &350, 100);
assert!(!range.is_empty());
for (k, _) in &range {
assert!(*k >= 150 && *k <= 350);
}
}
#[test]
fn test_skip_list_concurrent_insert_no_lost_nodes() {
let list = Arc::new(ConcurrentSkipList::<u64, u64>::new());
let per_thread = 500;
let num_threads = 8;
let mut handles = vec![];
for t in 0..num_threads {
let l = list.clone();
handles.push(thread::spawn(move || {
for i in 0..per_thread {
let key = (i * num_threads + t) as u64;
l.insert(key, key);
}
}));
}
for h in handles {
h.join().unwrap();
}
let expected = (num_threads * per_thread) as usize;
assert_eq!(
list.len(),
expected,
"Expected {} entries but got {} — nodes were lost under concurrency",
expected,
list.len()
);
for i in 0..(num_threads * per_thread) {
let key = i as u64;
assert!(
list.get(&key).is_some(),
"Key {} was lost under concurrent insertion",
key
);
}
}
}