#![no_std]
#![allow(unsafe_code)]
extern crate alloc;
use alloc::boxed::Box;
use core::fmt;
use core::ptr::{self, NonNull};
use core::sync::atomic::{AtomicPtr, Ordering};
/// One link of the queue's chain, heap-allocated and passed around as a raw pointer.
struct Node<T> {
    /// Pointer to the node queued after this one; null while nothing follows.
    next: AtomicPtr<Node<T>>,
    /// Payload slot: `None` in the dummy node that `LockFreeQueue::new` creates,
    /// `Some(value)` in every node that `push` creates.
    data: Option<T>,
}
/// A FIFO queue stored as a singly linked chain of `Node`s behind two atomic pointers.
///
/// The node `head` points at never carries a queued value itself; the front
/// element is the one stored in `head.next`.
pub struct LockFreeQueue<T> {
    /// Node that `push` tries to append after; may briefly trail the real last node.
    tail: AtomicPtr<Node<T>>,
    /// Front sentinel node; `pop` and `peek` look at the node linked behind it.
    head: AtomicPtr<Node<T>>,
}
// Both fields are `AtomicPtr`, which is `Send + Sync` for every pointee, so the
// auto traits would otherwise apply to *any* `T`. Writing the impls out by hand
// replaces the automatic ones and narrows them to payloads that may cross threads.
//
// SAFETY: values enter through `push` and leave through `pop` by value, so a `T`
// may end up on a different thread than the one that queued it; `T: Send`
// permits exactly that.
// NOTE(review): `peek` calls `T::clone` through `&self`; confirm that `T: Send`
// alone is a sufficient bound for `Sync` given that method.
unsafe impl<T> Send for LockFreeQueue<T> where T: Send {}
unsafe impl<T> Sync for LockFreeQueue<T> where T: Send {}
impl<T> Node<T> {
    /// Moves a fresh, unlinked node holding `data` onto the heap and hands back
    /// the raw pointer. The caller becomes responsible for eventually turning it
    /// back into a `Box` so the allocation is released.
    fn new(data: Option<T>) -> *mut Self {
        let unlinked = Self {
            next: AtomicPtr::new(ptr::null_mut()),
            data,
        };
        Box::into_raw(Box::new(unlinked))
    }

    /// Converts a raw node pointer into a shared reference; a null pointer
    /// yields `None`.
    ///
    /// # Safety
    ///
    /// A non-null `ptr` must point to a live, properly initialised `Node<T>`
    /// that is neither freed nor mutably aliased for as long as the returned
    /// reference (whose lifetime `'a` is chosen by the caller) is in use.
    unsafe fn as_ref<'a>(ptr: *mut Self) -> Option<&'a Self> {
        let node = NonNull::new(ptr)?;
        Some(&*node.as_ptr())
    }
}
impl<T> LockFreeQueue<T> {
pub fn new() -> Self {
let dummy = Node::new(None);
Self {
head: AtomicPtr::new(dummy),
tail: AtomicPtr::new(dummy),
}
}
pub fn push(&self, value: T) {
let new_node = Node::new(Some(value));
loop {
let tail = self.tail.load(Ordering::Acquire);
let tail_ref = unsafe { Node::as_ref(tail) };
let Some(tail_ref) = tail_ref else {
continue;
};
let next = tail_ref.next.load(Ordering::Acquire);
if ptr::null_mut() == next {
match tail_ref.next.compare_exchange(
ptr::null_mut(),
new_node,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => {
let _ = self.tail.compare_exchange(
tail,
new_node,
Ordering::Release,
Ordering::Relaxed,
);
break;
}
Err(_) => continue,
}
} else {
let _ =
self.tail
.compare_exchange(tail, next, Ordering::Release, Ordering::Relaxed);
}
}
}
pub fn pop(&self) -> Option<T> {
loop {
let head = self.head.load(Ordering::Acquire);
let head_ref = unsafe { Node::as_ref(head) };
let Some(head_ref) = head_ref else {
continue;
};
let next = head_ref.next.load(Ordering::Acquire);
if ptr::null_mut() == next {
return None;
}
let next_ref = unsafe { Node::as_ref(next) };
let Some(next_ref) = next_ref else {
continue;
};
if self
.head
.compare_exchange(head, next, Ordering::Release, Ordering::Relaxed)
.is_ok()
{
unsafe {
let old_head = Box::from_raw(head);
return ptr::read(&next_ref.data);
}
}
}
}
pub fn is_empty(&self) -> bool {
let head = self.head.load(Ordering::Acquire);
let head_ref = unsafe { Node::as_ref(head) };
let Some(head_ref) = head_ref else {
return true;
};
head_ref.next.load(Ordering::Acquire).is_null()
}
}
impl<T: Clone> LockFreeQueue<T> {
    /// Returns a clone of the front value without removing it, or `None` when
    /// no node is linked behind the head node.
    ///
    /// NOTE(review): the clone is taken through a reference to a node that a
    /// concurrent `pop` may unlink and free; confirm callers do not race
    /// `peek` against `pop`.
    pub fn peek(&self) -> Option<T> {
        // SAFETY: `self.head` only ever holds pointers produced by `Node::new`.
        let sentinel = unsafe { Node::as_ref(self.head.load(Ordering::Acquire)) }?;
        let first = sentinel.next.load(Ordering::Acquire);
        // A null `first` means the queue is empty; `as_ref` maps it to `None`.
        // SAFETY: a non-null `next` was linked by `push` and came from `Node::new`.
        let front = unsafe { Node::as_ref(first) }?;
        front.data.clone()
    }
}
impl<T> Default for LockFreeQueue<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> Drop for LockFreeQueue<T> {
fn drop(&mut self) {
while self.pop().is_some() {}
let head = self.head.load(Ordering::Relaxed);
unsafe {
let _ = Box::from_raw(head);
}
}
}
impl<T: fmt::Debug> fmt::Debug for LockFreeQueue<T> {
    /// Formats the queue as `LockFreeQueue { is_empty: .. }`.
    ///
    /// Only the result of `is_empty` is shown; the queued values themselves
    /// are not visited.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let empty = self.is_empty();
        let mut out = f.debug_struct("LockFreeQueue");
        out.field("is_empty", &empty);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// One value goes in and comes back out, toggling `is_empty` along the way.
    #[test]
    fn test_basic_operations() {
        let q = LockFreeQueue::new();
        assert!(q.is_empty());

        q.push(1);
        assert!(!q.is_empty());

        let popped = q.pop();
        assert_eq!(popped, Some(1));
        assert!(q.is_empty());
    }

    /// A hundred values come back out in the order they were pushed.
    #[test]
    fn test_multiple_pushes_pops() {
        let q = LockFreeQueue::new();
        (0..100).for_each(|value| q.push(value));
        (0..100).for_each(|expected| assert_eq!(q.pop(), Some(expected)));
        assert!(q.is_empty());
    }

    /// `peek` shows the front value repeatedly without consuming it.
    #[test]
    fn test_peek() {
        let q = LockFreeQueue::new();
        assert_eq!(q.peek(), None);

        q.push(1);
        // Two peeks in a row must both see the same front value.
        for _ in 0..2 {
            assert_eq!(q.peek(), Some(1));
        }
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.peek(), None);
    }
}