use crossbeam::queue::{ArrayQueue, SegQueue};
use dashmap::DashMap;
use parking_lot::RwLock;
use std::hash::Hash;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Bounded FIFO queue that forwards every operation to a fixed-size
/// [`ArrayQueue`].
///
/// The capacity requested at construction is remembered alongside the queue so
/// that [`SpscQueue::is_full`] can compare the current length against it.
///
/// NOTE(review): the name suggests single-producer/single-consumer use, but
/// nothing in this type enforces that; all methods take `&self` and simply
/// delegate to the underlying queue.
#[derive(Debug)]
pub struct SpscQueue<T> {
    /// Backing storage for the queued items.
    inner: ArrayQueue<T>,
    /// Capacity that was passed to [`SpscQueue::new`].
    capacity: usize,
}

impl<T> SpscQueue<T> {
    /// Creates an empty queue that can hold up to `capacity` items.
    ///
    /// # Panics
    ///
    /// NOTE(review): `ArrayQueue::new` is documented to panic when `capacity`
    /// is zero; confirm against the crossbeam version in use.
    pub fn new(capacity: usize) -> Self {
        let inner = ArrayQueue::new(capacity);
        Self { inner, capacity }
    }

    /// Tries to append `item` to the queue.
    ///
    /// On failure the item is handed back unchanged in `Err` so the caller
    /// can retry or drop it.
    #[inline]
    pub fn push(&self, item: T) -> Result<(), T> {
        self.inner.push(item)
    }

    /// Removes and returns the next item, or `None` if nothing is queued.
    #[inline]
    pub fn pop(&self) -> Option<T> {
        self.inner.pop()
    }

    /// Returns the number of items currently queued.
    #[inline]
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` when no items are queued.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns `true` once the queue holds at least as many items as the
    /// capacity it was created with.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.capacity <= self.inner.len()
    }
}
/// Unbounded FIFO queue built on a [`SegQueue`], with the number of queued
/// items tracked in a separate atomic counter.
#[derive(Debug)]
pub struct MpmcQueue<T> {
    /// Backing storage for the queued items.
    inner: SegQueue<T>,
    /// Item counter maintained by `push` and `pop`; see [`MpmcQueue::len`].
    len: AtomicUsize,
}

impl<T> MpmcQueue<T> {
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self {
            inner: SegQueue::new(),
            len: AtomicUsize::new(0),
        }
    }

    /// Appends `item` to the back of the queue.
    #[inline]
    pub fn push(&self, item: T) {
        // Count the item *before* publishing it. If the counter were bumped
        // after the push, another thread could pop the item and decrement the
        // counter first, wrapping it from 0 to `usize::MAX` and making `len()`
        // report a nonsensical value until the increment finally lands.
        self.len.fetch_add(1, Ordering::Relaxed);
        self.inner.push(item);
    }

    /// Removes and returns the item at the front of the queue, or `None` if
    /// the queue is empty.
    #[inline]
    pub fn pop(&self) -> Option<T> {
        let item = self.inner.pop()?;
        // Only a successful pop gives back a slot in the counter.
        self.len.fetch_sub(1, Ordering::Relaxed);
        Some(item)
    }

    /// Returns the tracked number of items.
    ///
    /// The value is a snapshot: while a `push` is in flight on another thread
    /// it may briefly count an item that is not yet poppable, but it never
    /// wraps below zero.
    #[inline]
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    /// Returns `true` when the underlying queue holds no items.
    ///
    /// This asks the backing queue directly rather than consulting the
    /// counter, so it can momentarily disagree with `len() == 0` while other
    /// threads are pushing or popping.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}

impl<T> Default for MpmcQueue<T> {
    /// Equivalent to [`MpmcQueue::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Concurrent container exposing a `push`/`pop` interface on top of a
/// [`SegQueue`].
///
/// NOTE(review): despite the name, the backing `SegQueue` is a FIFO queue, so
/// `pop` hands back the *oldest* pushed item rather than the most recent one.
/// Callers that need LIFO order should not rely on this type as written;
/// confirm the intended ordering before changing it, since existing callers
/// may depend on the current behaviour.
#[derive(Debug)]
pub struct LockFreeStack<T> {
    /// Backing storage for the pushed items.
    inner: SegQueue<T>,
}

impl<T> LockFreeStack<T> {
    /// Creates an empty container.
    pub fn new() -> Self {
        let inner = SegQueue::new();
        Self { inner }
    }

    /// Adds `data` to the container.
    #[inline]
    pub fn push(&self, data: T) {
        self.inner.push(data);
    }

    /// Removes and returns an item, or `None` if the container is empty.
    ///
    /// The item returned is whichever one the backing queue pops (see the
    /// ordering note on the type).
    #[inline]
    pub fn pop(&self) -> Option<T> {
        self.inner.pop()
    }

    /// Returns `true` when the container holds no items.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}

impl<T> Default for LockFreeStack<T> {
    /// Equivalent to [`LockFreeStack::new`].
    fn default() -> Self {
        LockFreeStack::new()
    }
}
#[derive(Debug, Clone)]
pub struct LockFreeMap<K, V>
where
K: Eq + Hash + Clone,
V: Clone,
{
inner: Arc<DashMap<K, V>>,
}
impl<K, V> LockFreeMap<K, V>
where
K: Eq + Hash + Clone,
V: Clone,
{
pub fn new() -> Self {
Self {
inner: Arc::new(DashMap::new()),
}
}
pub fn with_capacity(capacity: usize) -> Self {
Self {
inner: Arc::new(DashMap::with_capacity(capacity)),
}
}
#[inline]
pub fn insert(&self, key: K, value: V) -> Option<V> {
self.inner.insert(key, value)
}
#[inline]
pub fn get(&self, key: &K) -> Option<V> {
self.inner.get(key).map(|v| v.clone())
}
#[inline]
pub fn remove(&self, key: &K) -> Option<(K, V)> {
self.inner.remove(key)
}
#[inline]
pub fn contains_key(&self, key: &K) -> bool {
self.inner.contains_key(key)
}
#[inline]
pub fn len(&self) -> usize {
self.inner.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
#[inline]
pub fn clear(&self) {
self.inner.clear()
}
}
impl<K, V> Default for LockFreeMap<K, V>
where
K: Eq + Hash + Clone,
V: Clone,
{
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct RingBuffer<T> {
buffer: Arc<Vec<RwLock<Option<T>>>>,
capacity: usize,
write_pos: AtomicUsize,
read_pos: AtomicUsize,
}
impl<T: Clone> RingBuffer<T> {
pub fn new(capacity: usize) -> Self {
let mut buffer = Vec::with_capacity(capacity);
for _ in 0..capacity {
buffer.push(RwLock::new(None));
}
Self {
buffer: Arc::new(buffer),
capacity,
write_pos: AtomicUsize::new(0),
read_pos: AtomicUsize::new(0),
}
}
pub fn write(&self, item: T) -> bool {
let write_pos = self.write_pos.load(Ordering::Acquire);
let next_write = (write_pos + 1) % self.capacity;
let read_pos = self.read_pos.load(Ordering::Acquire);
if next_write == read_pos {
return false;
}
let mut slot = self.buffer[write_pos].write();
*slot = Some(item);
drop(slot);
self.write_pos.store(next_write, Ordering::Release);
true
}
pub fn read(&self) -> Option<T> {
let read_pos = self.read_pos.load(Ordering::Acquire);
let write_pos = self.write_pos.load(Ordering::Acquire);
if read_pos == write_pos {
return None;
}
let mut slot = self.buffer[read_pos].write();
let item = slot.take();
drop(slot);
if item.is_some() {
let next_read = (read_pos + 1) % self.capacity;
self.read_pos.store(next_read, Ordering::Release);
}
item
}
pub fn len(&self) -> usize {
let write_pos = self.write_pos.load(Ordering::Relaxed);
let read_pos = self.read_pos.load(Ordering::Relaxed);
if write_pos >= read_pos {
write_pos - read_pos
} else {
self.capacity - read_pos + write_pos
}
}
pub fn is_empty(&self) -> bool {
let write_pos = self.write_pos.load(Ordering::Relaxed);
let read_pos = self.read_pos.load(Ordering::Relaxed);
write_pos == read_pos
}
pub fn is_full(&self) -> bool {
self.len() >= self.capacity - 1 }
}
/// Shareable `usize` counter backed by an [`AtomicUsize`].
///
/// All operations use relaxed memory ordering: each update to the counter is
/// atomic, but no ordering is imposed on surrounding memory accesses.
#[derive(Debug)]
pub struct AtomicCounter {
    /// Current counter value.
    value: AtomicUsize,
}

impl AtomicCounter {
    /// Memory ordering applied to every access of the counter.
    const ORDER: Ordering = Ordering::Relaxed;

    /// Creates a counter starting at `initial`.
    pub const fn new(initial: usize) -> Self {
        AtomicCounter {
            value: AtomicUsize::new(initial),
        }
    }

    /// Adds one to the counter and returns the value it held *before* the
    /// addition. Wraps around to 0 on overflow.
    #[inline]
    pub fn increment(&self) -> usize {
        self.value.fetch_add(1, Self::ORDER)
    }

    /// Subtracts one from the counter and returns the value it held *before*
    /// the subtraction. Wraps around to `usize::MAX` when the counter is 0.
    #[inline]
    pub fn decrement(&self) -> usize {
        self.value.fetch_sub(1, Self::ORDER)
    }

    /// Returns the current value.
    #[inline]
    pub fn get(&self) -> usize {
        self.value.load(Self::ORDER)
    }

    /// Overwrites the counter with `value`.
    #[inline]
    pub fn set(&self, value: usize) {
        self.value.store(value, Self::ORDER);
    }

    /// Sets the counter back to zero.
    #[inline]
    pub fn reset(&self) {
        self.set(0);
    }
}

impl Default for AtomicCounter {
    /// Returns a counter starting at zero.
    fn default() -> Self {
        AtomicCounter::new(0)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// One producer and one consumer thread move ten items through the queue
    /// and the consumer must see them in insertion order.
    #[test]
    fn test_spsc_queue() {
        let queue = Arc::new(SpscQueue::new(10));

        let producer = {
            let tx = Arc::clone(&queue);
            thread::spawn(move || {
                for value in 0..10 {
                    // Retry until the queue accepts the item.
                    while tx.push(value).is_err() {
                        thread::yield_now();
                    }
                }
            })
        };

        let consumer = {
            let rx = Arc::clone(&queue);
            thread::spawn(move || {
                let mut received = Vec::new();
                while received.len() < 10 {
                    if let Some(value) = rx.pop() {
                        received.push(value);
                    }
                }
                received
            })
        };

        producer.join().unwrap();
        let received = consumer.join().unwrap();

        assert_eq!(received.len(), 10);
        for (expected, &actual) in received.iter().enumerate() {
            assert_eq!(actual, expected);
        }
    }

    /// Four producers push 25 distinct values each; after they finish, the
    /// queue must yield exactly the values 0..100 (in any order).
    #[test]
    fn test_mpmc_queue() {
        let queue = Arc::new(MpmcQueue::new());

        let producers: Vec<_> = (0..4)
            .map(|worker| {
                let q = Arc::clone(&queue);
                thread::spawn(move || {
                    for offset in 0..25 {
                        q.push(worker * 25 + offset);
                    }
                })
            })
            .collect();
        for producer in producers {
            producer.join().unwrap();
        }

        let mut drained = Vec::new();
        while let Some(value) = queue.pop() {
            drained.push(value);
        }
        assert_eq!(drained.len(), 100);

        drained.sort();
        for (expected, &actual) in drained.iter().enumerate() {
            assert_eq!(actual, expected);
        }
    }

    /// Four threads push 25 values each; all 100 must be poppable afterwards.
    #[test]
    fn test_lock_free_stack() {
        let stack = Arc::new(LockFreeStack::new());

        let pushers: Vec<_> = (0..4)
            .map(|worker| {
                let s = Arc::clone(&stack);
                thread::spawn(move || {
                    for offset in 0..25 {
                        s.push(worker * 25 + offset);
                    }
                })
            })
            .collect();
        for pusher in pushers {
            pusher.join().unwrap();
        }

        let mut drained = Vec::new();
        while let Some(value) = stack.pop() {
            drained.push(value);
        }
        assert_eq!(drained.len(), 100);
    }

    /// Four threads insert disjoint key ranges; every key must then map to
    /// its formatted value.
    #[test]
    fn test_lock_free_map() {
        let map = Arc::new(LockFreeMap::new());

        let writers: Vec<_> = (0..4)
            .map(|worker| {
                let m = Arc::clone(&map);
                thread::spawn(move || {
                    for offset in 0..25 {
                        let key = worker * 25 + offset;
                        m.insert(key, format!("value_{}", key));
                    }
                })
            })
            .collect();
        for writer in writers {
            writer.join().unwrap();
        }

        assert_eq!(map.len(), 100);
        for key in 0..100 {
            assert_eq!(map.get(&key), Some(format!("value_{}", key)));
        }
    }

    /// Fills a ten-slot buffer to its usable limit of nine items, checks that
    /// it rejects further writes, then verifies ordering across a wrap-around.
    #[test]
    fn test_ring_buffer() {
        let buffer = RingBuffer::new(10);

        for value in 0..9 {
            assert!(buffer.write(value));
        }
        assert_eq!(buffer.len(), 9);
        assert!(buffer.is_full());
        assert!(!buffer.write(99));

        // Free five slots, then refill them so the write cursor wraps.
        for expected in 0..5 {
            assert_eq!(buffer.read(), Some(expected));
        }
        for value in 9..14 {
            assert!(buffer.write(value));
        }

        let mut remaining = Vec::new();
        while let Some(value) = buffer.read() {
            remaining.push(value);
        }
        assert_eq!(remaining, vec![5, 6, 7, 8, 9, 10, 11, 12, 13]);
    }

    /// Ten threads increment the counter 1000 times each; no increment may be
    /// lost.
    #[test]
    fn test_atomic_counter() {
        let counter = Arc::new(AtomicCounter::new(0));

        let workers: Vec<_> = (0..10)
            .map(|_| {
                let c = Arc::clone(&counter);
                thread::spawn(move || {
                    for _ in 0..1000 {
                        c.increment();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(counter.get(), 10000);
    }
}