/// Shared state of the queue.
///
/// Every field lives in an `UnsafeCell` because `enqueue` and `dequeue` mutate the queue through `&self`.
/// The struct is `#[repr(C)]`, so the declaration order below is the memory layout; do not reorder fields.
#[derive(Debug)]
#[repr(C)]
struct QueueInternal<T, A>
where
    A: Allocator,
{
    /// Synchronisation state handed to `ccsynch_apply` by `enqueue`.
    enq: UnsafeCell<Synch<T>>,

    /// Synchronisation state handed to `ccsynch_apply` by `dequeue`.
    deq: UnsafeCell<Synch<T>>,

    /// Current head node; advanced by `dequeue`. Starts out as the dummy node created in `new`.
    head: UnsafeCell<NonNull<Node<T>>>,

    /// Current tail node; replaced by `enqueue`. Starts out as the same dummy node as `head`.
    tail: UnsafeCell<NonNull<Node<T>>>,

    /// Allocator used to free nodes (and, in `free_after_drop`, the queue's own memory).
    allocator: UnsafeCell<A>,
}
impl<T, A: Allocator> AllocatorOpened<A> for QueueInternal<T, A>
{
#[inline(always)]
fn allocator_opened(&mut self, allocator: A)
{
unsafe
{
Synch::ccsynch_init(&self.enq);
Synch::ccsynch_init(&self.deq);
write(&mut self.allocator, UnsafeCell::new(allocator))
}
}
}
impl<T, A: Allocator> Drop for QueueInternal<T, A>
{
#[inline(always)]
fn drop(&mut self)
{
self.clear(&|_data|{});
}
}
impl<T, A: Allocator> QueueInternal<T, A>
{
/// Returns a mutable reference to the allocator held in this queue's `UnsafeCell`.
///
/// NOTE(review): this produces `&mut A` from `&self`; nothing here stops two such references from being alive at once, so callers must not overlap their use.
#[inline(always)]
fn allocator(&self) -> &mut A
{
    let raw: *mut A = self.allocator.get();
    unsafe { &mut *raw }
}
/// Hands the memory at `this` back to `allocator` by calling `free_page_size`.
///
/// The allocator is taken by value rather than read from the queue.
/// NOTE(review): presumably this is because the queue's own storage is what is being released here — confirm against the caller.
#[inline(always)]
fn free_after_drop(this: NonNull<Self>, mut allocator: A)
{
allocator.free_page_size(this)
}
#[inline(always)]
fn new(mut allocator: A) -> NonNull<Self>
{
let mut queue = allocator.align_malloc_page_size();
unsafe
{
let queue: &mut Self = queue.as_mut();
Synch::ccsynch_init(&queue.enq);
Synch::ccsynch_init(&queue.deq);
let dummy = Node::dummy_node(&mut allocator);
write(&mut queue.head, UnsafeCell::new(dummy));
write(&mut queue.tail, UnsafeCell::new(dummy));
write(&mut queue.allocator, UnsafeCell::new(allocator))
}
queue
}
/// Releases the nodes referenced by `head` and `tail` through `Node::clearing_queue_drop`, forwarding `free_data` to it.
///
/// When `head` and `tail` refer to the same node (for example the single dummy node of an empty queue) that node is released exactly once.
///
/// NOTE(review): whether `Node::clearing_queue_drop` also walks nodes linked between head and tail is not visible here — verify against its definition.
#[inline(always)]
fn clear<FreeData: Fn(NonNull<T>)>(&mut self, free_data: &FreeData)
{
    /// Reads the node pointer out of `node` and hands it to `Node::clearing_queue_drop`.
    #[inline(always)]
    fn drop_node<T, A: Allocator, FreeData: Fn(NonNull<T>)>(node: &UnsafeCell<NonNull<Node<T>>>, allocator: &mut A, free_data: &FreeData)
    {
        let node = unsafe { *node.get() };
        Node::clearing_queue_drop(node, allocator, free_data)
    }

    let allocator = self.allocator();

    // Compare the node pointers stored in the cells. Comparing the results of `UnsafeCell::get()` directly
    // would compare the addresses of the `head` and `tail` fields themselves, which are never equal,
    // and would therefore release a shared node twice.
    // SAFETY: `&mut self` gives exclusive access to both cells for the duration of the reads.
    let (head, tail) = unsafe { (*self.head.get(), *self.tail.get()) };

    if head != tail
    {
        drop_node(&self.head, allocator, free_data);
        drop_node(&self.tail, allocator, free_data);
    }
    else
    {
        drop_node(&self.head, allocator, free_data);
    }
}
/// Appends `data` to the queue.
///
/// The node used is the spare cached in `handle.next` when there is one (the cache is emptied), otherwise a node obtained from `handle.allocate_next_node()`.
/// The node is filled in and then linked behind the tail via `ccsynch_apply` on the `enq` synchronisation state.
fn enqueue(&self, handle: &mut PerQueueThreadHandleInternal<T, A>, data: NonNull<T>)
{
    /// Serial step run through `ccsynch_apply`: links `node` behind the current tail, then makes `node` the tail.
    #[inline(always)]
    fn link_at_tail<T>(tail: &UnsafeCell<NonNull<Node<T>>>, node: &mut Node<T>)
    {
        let tail_slot = tail.get();
        unsafe
        {
            write(&mut (*tail_slot).as_mut().next, node);
            write(tail_slot, NonNull::new_unchecked(node))
        }
    }

    let spare = handle.next;
    unsafe
    {
        let mut claimed = if spare.is_not_null()
        {
            // Empty the cache so the same node cannot be handed out twice.
            handle.next = null_mut();
            NonNull::new_unchecked(spare)
        }
        else
        {
            handle.allocate_next_node()
        };

        let node = claimed.as_mut();
        node.data = data;
        node.next = null_mut();

        Self::ccsynch_apply(&self.enq, &mut handle.enq, link_at_tail, &self.tail, node)
    }
}
/// Removes the item at the front of the queue, returning `None` when the head node has no successor.
///
/// The node detached from the head is kept as the spare in `handle.next` if that slot is empty; otherwise it is freed with `Node::free_after_drop`.
fn dequeue(&self, handle: &mut PerQueueThreadHandleInternal<T, A>) -> Option<NonNull<T>>
{
    /// Serial step run through `ccsynch_apply`.
    ///
    /// If the head has a successor, the successor's `data` is copied into the old head, the successor becomes the new head, and the old head is reported in `result`; otherwise `result` is `None`.
    #[inline(always)]
    fn detach_head<T>(head: &UnsafeCell<NonNull<Node<T>>>, result: &mut Option<NonNull<Node<T>>>)
    {
        let head_slot = head.get();
        let mut old_head = unsafe { *head_slot };
        let successor = unsafe { old_head.as_ref() }.next;

        *result = if successor.is_not_null()
        {
            unsafe
            {
                let successor = NonNull::new_unchecked(successor);
                write(&mut old_head.as_mut().data, successor.as_ref().data);
                *head_slot = successor;
            }
            Some(old_head)
        }
        else
        {
            None
        };
    }

    let mut detached: Option<NonNull<Node<T>>> = None;
    unsafe
    {
        Self::ccsynch_apply(&self.deq, &mut handle.deq, detach_head, &self.head, &mut detached);

        // Nothing detached means the queue was empty.
        let node = detached?;
        let data = node.as_ref().data;

        if handle.next.is_not_null()
        {
            // The handle already caches a spare node, so this one is released.
            Node::free_after_drop(node, self.allocator())
        }
        else
        {
            write(&mut handle.next, node.as_ptr())
        }

        Some(data)
    }
}
/// Runs `apply(state, data)` through the combining protocol of `synch`.
///
/// Steps, as visible in this function:
///
/// 1. The handle's spare synch node is reset (`next` = null, `status` = `WAIT`) and swapped in as the tail of `synch`; the previous tail (`current`) becomes the handle's new spare.
/// 2. If `current` is in `WAIT`, the address of `data` is published in `current.data`, `current` is linked to the new tail, and this thread spins (calling `PAUSE()`) until the status changes.
/// 3. If the status is then anything other than `DONE`, this thread applies its own operation, then walks the linked nodes, applying each node's stored operation and marking it `DONE`, for at most `CCSYNCH_HELP_BOUND` nodes. The node at which it stops is marked `READY`.
///
/// # Safety
///
/// * `synch_handle.next` must point to a valid synch node that no other thread is using.
/// * Every caller sharing one `synch` must use the same `D`, because the address stored in a node's `data` field is read back as `&mut D`.
/// * NOTE(review): the node's `data` field is assumed to be pointer-sized — confirm against the `Synch` node definition.
#[inline(always)]
unsafe fn ccsynch_apply<D, Apply: Fn(&UnsafeCell<NonNull<Node<T>>>, &mut D)>(synch: &UnsafeCell<Synch<T>>, synch_handle: &mut SynchHandle<T>, apply: Apply, state: &UnsafeCell<NonNull<Node<T>>>, data: &mut D)
{
    let mut next = synch_handle.next;
    {
        let next = next.as_mut();
        write(&mut next.next, AtomicPtr::new(null_mut()));
        write(&mut next.status, AtomicU32::new(Status::WAIT as u32));
    }

    let mut current = Synch::swap_tail_returning_previous(synch, next);
    write(&mut synch_handle.next, current);

    let mut status = current.as_ref().acquire_status();
    if status == Status::WAIT
    {
        // Publish the *address* of `data`: the helping loop below reads this field back as `&mut D`.
        // (`transmute_copy(data)` would instead copy the leading bytes of `*data` into the field.)
        write(&mut current.as_mut().data, transmute_copy(&data));
        current.as_mut().release_next(next);

        // Spin until the status of our node leaves WAIT.
        loop
        {
            PAUSE();
            status = current.as_ref().acquire_status();
            if status != Status::WAIT
            {
                break;
            }
        }
    }

    if status != Status::DONE
    {
        // Our operation has not been applied on our behalf: apply it, then help the nodes linked after ours.
        apply(state, data);

        let mut current = next;
        let mut next = current.as_ref().acquire_next();

        let mut count: usize = 0;
        const CCSYNCH_HELP_BOUND: usize = 256;

        while next.is_not_null() && count < CCSYNCH_HELP_BOUND
        {
            apply(state, transmute_copy(&current.as_ref().data));
            current.as_mut().release_status_done();

            current = NonNull::new_unchecked(next);
            next = current.as_ref().acquire_next();
            count += 1;
        }

        // Mark the node we stopped at READY.
        current.as_mut().release_status_ready();
    }
}
}