use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
/// A bounded single-producer / single-consumer ring buffer.
///
/// The slot count is rounded up to a power of two so index wrapping can use
/// a bit mask, and one slot is always kept empty to distinguish "full" from
/// "empty" — so at most `capacity() - 1` items can be queued at once.
///
/// NOTE(review): the mutating methods take `&mut self`, so this API is only
/// callable from one thread at a time as written; the atomics express the
/// intended SPSC memory-ordering protocol rather than enabling sharing here.
pub struct LockFreeSPSCQueue<T> {
    // Ring storage; `None` marks an empty slot.
    buffer: Vec<Option<T>>,
    // Power-of-two slot count; `capacity - 1` is the wrap mask.
    capacity: usize,
    // Index of the next slot to pop (consumer side).
    head: AtomicUsize,
    // Index of the next slot to push (producer side).
    tail: AtomicUsize,
    // Intended to reduce false sharing between adjacent data.
    _padding: [u8; 64],
}

impl<T> LockFreeSPSCQueue<T> {
    /// Builds an empty queue whose slot count is `capacity` rounded up to
    /// the next power of two.
    pub fn new(capacity: usize) -> Self {
        let slots = capacity.next_power_of_two();
        Self {
            buffer: (0..slots).map(|_| None).collect(),
            capacity: slots,
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            _padding: [0; 64],
        }
    }

    /// Attempts to enqueue `item`. Returns `false` when the queue is full;
    /// note that in that case `item` is consumed and dropped.
    pub fn try_push(&mut self, item: T) -> bool {
        let write = self.tail.load(Ordering::Relaxed);
        let after = (write + 1) & (self.capacity - 1);
        // Full when advancing the tail would collide with the head.
        if after == self.head.load(Ordering::Acquire) {
            false
        } else {
            self.buffer[write] = Some(item);
            self.tail.store(after, Ordering::Release);
            true
        }
    }

    /// Dequeues the oldest item, or returns `None` when the queue is empty.
    pub fn try_pop(&mut self) -> Option<T> {
        let read = self.head.load(Ordering::Relaxed);
        if read == self.tail.load(Ordering::Acquire) {
            return None;
        }
        let item = self.buffer[read].take();
        self.head
            .store((read + 1) & (self.capacity - 1), Ordering::Release);
        item
    }

    /// `true` when no items are currently queued.
    pub fn is_empty(&self) -> bool {
        self.head.load(Ordering::Acquire) == self.tail.load(Ordering::Acquire)
    }

    /// Number of items currently queued.
    pub fn len(&self) -> usize {
        let read = self.head.load(Ordering::Acquire);
        let write = self.tail.load(Ordering::Acquire);
        if write < read {
            // Tail has wrapped past the end of the ring.
            self.capacity - read + write
        } else {
            write - read
        }
    }

    /// Total slot count (one more than the maximum number of queued items).
    pub fn capacity(&self) -> usize {
        self.capacity
    }
}
/// A cache entry whose metadata (hit count, last-access stamp, validity)
/// can be updated through shared references using atomics.
///
/// Cloning an entry is cheap (`Arc` reference bumps) and every clone shares
/// the same counters and validity flag.
#[derive(Clone)]
pub struct LockFreeCacheEntry<T: Clone> {
    // The cached value, shared out to readers via `Arc::clone`.
    data: Arc<T>,
    // Number of successful `get` calls on this entry and its clones.
    access_count: Arc<AtomicUsize>,
    // Logical timestamp of the most recent access (see `current_timestamp`).
    last_access: Arc<AtomicUsize>,
    // Cleared by `invalidate`; `get` returns `None` afterwards.
    valid: Arc<AtomicBool>,
}

impl<T: Clone> LockFreeCacheEntry<T> {
    /// Creates a valid entry wrapping `data`, with a zero access count and a
    /// fresh creation timestamp.
    pub fn new(data: T) -> Self {
        Self {
            data: Arc::new(data),
            access_count: Arc::new(AtomicUsize::new(0)),
            last_access: Arc::new(AtomicUsize::new(Self::current_timestamp())),
            valid: Arc::new(AtomicBool::new(true)),
        }
    }

    /// Returns the cached value if the entry is still valid, bumping the
    /// access count and last-access stamp as a side effect.
    pub fn get(&self) -> Option<Arc<T>> {
        if !self.valid.load(Ordering::Acquire) {
            return None;
        }
        self.access_count.fetch_add(1, Ordering::Relaxed);
        self.last_access
            .store(Self::current_timestamp(), Ordering::Release);
        Some(Arc::clone(&self.data))
    }

    /// Marks this entry (and all clones sharing its flag) as invalid.
    pub fn invalidate(&self) {
        self.valid.store(false, Ordering::Release);
    }

    /// Whether the entry has not been invalidated yet.
    pub fn is_valid(&self) -> bool {
        self.valid.load(Ordering::Acquire)
    }

    /// Total number of successful `get` calls.
    pub fn access_count(&self) -> usize {
        self.access_count.load(Ordering::Relaxed)
    }

    /// Logical timestamp of the last successful `get` (or of creation).
    pub fn last_access(&self) -> usize {
        self.last_access.load(Ordering::Relaxed)
    }

    /// Process-wide monotonically increasing logical clock; this is a
    /// counter, not wall-clock time.
    fn current_timestamp() -> usize {
        static COUNTER: AtomicUsize = AtomicUsize::new(0);
        COUNTER.fetch_add(1, Ordering::Relaxed)
    }
}

/// Fixed-capacity cache with open addressing (linear probing).
///
/// NOTE(review): only the lookup path is implemented in this chunk; without
/// an insert method visible here, `len()` reports whatever `size` was set to
/// elsewhere.
pub struct LockFreeCache<K: Eq + std::hash::Hash + Clone, V: Clone> {
    // Flat probe table; a `None` slot terminates a probe chain.
    entries: Vec<Option<(K, LockFreeCacheEntry<V>)>>,
    // Number of occupied slots, maintained atomically.
    size: AtomicUsize,
    // Total slot count; also the modulus for hashing and probing.
    capacity: usize,
}

impl<K: Eq + std::hash::Hash + Clone, V: Clone> LockFreeCache<K, V> {
    /// Creates an empty cache with exactly `capacity` slots.
    pub fn new(capacity: usize) -> Self {
        Self {
            entries: (0..capacity).map(|_| None).collect(),
            size: AtomicUsize::new(0),
            capacity,
        }
    }

    /// Total slot count of the table.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Number of occupied slots as tracked by `size`.
    pub fn len(&self) -> usize {
        self.size.load(Ordering::Relaxed)
    }

    /// `true` when no slots are occupied.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Home slot index for `key`. Requires `capacity > 0` (callers guard).
    fn hash(&self, key: &K) -> usize {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::Hasher;
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) % self.capacity
    }

    /// Looks up `key` by linear probing from its home slot. Probing stops at
    /// the first empty slot (end of the chain) or after a full table scan.
    ///
    /// Fixes over the previous version: the gratuitous `unsafe` raw-pointer
    /// dereference is replaced by equivalent safe indexing (the method holds
    /// `&self`, so a shared borrow of the slot is always sound and free), and
    /// a zero-capacity cache now returns `None` instead of panicking on the
    /// `% 0` inside `hash`.
    pub fn get(&self, key: &K) -> Option<Arc<V>> {
        if self.capacity == 0 {
            return None;
        }
        let start = self.hash(key);
        for probe in 0..self.capacity {
            let index = (start + probe) % self.capacity;
            match &self.entries[index] {
                Some((k, entry)) if k == key => return entry.get(),
                // Occupied by a different key: keep probing.
                Some(_) => {}
                // Empty slot terminates the probe chain.
                None => return None,
            }
        }
        None
    }

    /// Whether a valid entry for `key` is present (counts as an access).
    pub fn contains_key(&self, key: &K) -> bool {
        self.get(key).is_some()
    }
}
/// Success / failure / retry counters for lock-free operations.
///
/// All counters use relaxed ordering: they are independent statistics and
/// impose no synchronization on the operations they describe.
#[derive(Debug, Default)]
pub struct LockFreeStats {
    pub successes: AtomicUsize,
    pub failures: AtomicUsize,
    pub retries: AtomicUsize,
}

impl LockFreeStats {
    /// Creates a zeroed counter set.
    pub fn new() -> Self {
        Self::default()
    }

    /// Bumps the success counter.
    pub fn record_success(&self) {
        self.successes.fetch_add(1, Ordering::Relaxed);
    }

    /// Bumps the failure counter.
    pub fn record_failure(&self) {
        self.failures.fetch_add(1, Ordering::Relaxed);
    }

    /// Bumps the retry counter.
    pub fn record_retry(&self) {
        self.retries.fetch_add(1, Ordering::Relaxed);
    }

    /// Current success count.
    pub fn successes(&self) -> usize {
        self.successes.load(Ordering::Relaxed)
    }

    /// Current failure count.
    pub fn failures(&self) -> usize {
        self.failures.load(Ordering::Relaxed)
    }

    /// Current retry count.
    pub fn retries(&self) -> usize {
        self.retries.load(Ordering::Relaxed)
    }

    /// Fraction of recorded outcomes that succeeded, or `0.0` before any
    /// outcome has been recorded. Retries are not part of the denominator.
    pub fn success_rate(&self) -> f64 {
        match self.successes() + self.failures() {
            0 => 0.0,
            total => self.successes() as f64 / total as f64,
        }
    }

    /// Zeroes every counter.
    pub fn reset(&self) {
        for counter in [&self.successes, &self.failures, &self.retries] {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_spsc_queue_basic() {
        let mut q = LockFreeSPSCQueue::new(4);
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
        // Fill with three items (the usable maximum for 4 slots).
        for v in 1..=3 {
            assert!(q.try_push(v));
        }
        assert_eq!(q.len(), 3);
        assert!(!q.is_empty());
        assert_eq!(q.try_pop(), Some(1));
        assert_eq!(q.try_pop(), Some(2));
        assert_eq!(q.len(), 1);
        assert_eq!(q.try_pop(), Some(3));
        assert!(q.is_empty());
        assert_eq!(q.try_pop(), None);
    }

    #[test]
    fn test_spsc_queue_full() {
        // Two slots hold a single item; the second push must fail.
        let mut q = LockFreeSPSCQueue::new(2);
        assert!(q.try_push(1));
        assert!(!q.try_push(2));
        assert_eq!(q.try_pop(), Some(1));
        assert!(q.try_push(3));
    }

    #[test]
    fn test_spsc_queue_wraparound() {
        let mut q = LockFreeSPSCQueue::new(4);
        assert!(q.try_push(1));
        assert!(q.try_push(2));
        assert_eq!(q.try_pop(), Some(1));
        // Indices wrap around the ring here.
        assert!(q.try_push(3));
        assert!(q.try_push(4));
        for expected in [Some(2), Some(3), Some(4), None] {
            assert_eq!(q.try_pop(), expected);
        }
    }

    #[test]
    fn test_cache_entry_basic() {
        let e = LockFreeCacheEntry::new(42);
        assert!(e.is_valid());
        assert_eq!(*e.get().expect("get should succeed"), 42);
        assert_eq!(e.access_count(), 1);
        e.invalidate();
        assert!(!e.is_valid());
        assert!(e.get().is_none());
    }

    #[test]
    fn test_cache_entry_access_count() {
        let e = LockFreeCacheEntry::new("test");
        assert_eq!(e.access_count(), 0);
        e.get();
        assert_eq!(e.access_count(), 1);
        e.get();
        e.get();
        assert_eq!(e.access_count(), 3);
    }

    #[test]
    fn test_lock_free_cache_basic() {
        let c: LockFreeCache<String, i32> = LockFreeCache::new(10);
        assert_eq!(c.capacity(), 10);
        assert_eq!(c.len(), 0);
        assert!(c.is_empty());
    }

    #[test]
    fn test_lock_free_cache_contains() {
        let c = LockFreeCache::<String, i32>::new(10);
        assert!(!c.contains_key(&"test".to_string()));
    }

    #[test]
    fn test_lockfree_stats() {
        let s = LockFreeStats::new();
        assert_eq!(s.successes(), 0);
        assert_eq!(s.failures(), 0);
        assert_eq!(s.retries(), 0);
        s.record_success();
        s.record_success();
        s.record_failure();
        assert_eq!(s.successes(), 2);
        assert_eq!(s.failures(), 1);
        // 2 successes out of 3 outcomes.
        assert!((s.success_rate() - 0.666).abs() < 0.01);
        s.reset();
        assert_eq!(s.successes(), 0);
        assert_eq!(s.failures(), 0);
    }

    #[test]
    fn test_spsc_queue_capacity() {
        // Requested 7 rounds up to the next power of two.
        let q = LockFreeSPSCQueue::<i32>::new(7);
        assert_eq!(q.capacity(), 8);
    }

    #[test]
    fn test_cache_entry_timestamp() {
        // The logical clock is shared and monotonic, so a later entry
        // always carries a larger creation stamp.
        let earlier = LockFreeCacheEntry::new(1);
        let later = LockFreeCacheEntry::new(2);
        assert!(later.last_access() > earlier.last_access());
    }
}