use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::ptr;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;
use std::thread::{self, ThreadId};
use crate::error::{Error, Result};
use crate::grpc_sys::{self, gpr_clock_type, grpc_completion_queue};
use crate::task::UnfinishedWork;
pub use crate::grpc_sys::grpc_completion_type as EventType;
pub use crate::grpc_sys::grpc_event as Event;
pub struct CompletionQueueHandle {
cq: *mut grpc_completion_queue,
ref_cnt: AtomicIsize,
}
unsafe impl Sync for CompletionQueueHandle {}
unsafe impl Send for CompletionQueueHandle {}
impl CompletionQueueHandle {
pub fn new() -> CompletionQueueHandle {
CompletionQueueHandle {
cq: unsafe { grpc_sys::grpc_completion_queue_create_for_next(ptr::null_mut()) },
ref_cnt: AtomicIsize::new(1),
}
}
fn add_ref(&self) -> Result<()> {
let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
loop {
if cnt <= 0 {
return Err(Error::QueueShutdown);
}
let new_cnt = cnt + 1;
match self.ref_cnt.compare_exchange_weak(
cnt,
new_cnt,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => return Ok(()),
Err(c) => cnt = c,
}
}
}
fn unref(&self) {
let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
let shutdown = loop {
let new_cnt = cnt - cnt.signum();
match self.ref_cnt.compare_exchange_weak(
cnt,
new_cnt,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break new_cnt == 0,
Err(c) => cnt = c,
}
};
if shutdown {
unsafe {
grpc_sys::grpc_completion_queue_shutdown(self.cq);
}
}
}
fn shutdown(&self) {
let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
let shutdown = loop {
if cnt <= 0 {
return;
}
let new_cnt = -cnt + 1;
match self.ref_cnt.compare_exchange_weak(
cnt,
new_cnt,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break new_cnt == 0,
Err(c) => cnt = c,
}
};
if shutdown {
unsafe {
grpc_sys::grpc_completion_queue_shutdown(self.cq);
}
}
}
}
impl Drop for CompletionQueueHandle {
fn drop(&mut self) {
unsafe { grpc_sys::grpc_completion_queue_destroy(self.cq) }
}
}
pub struct CompletionQueueRef<'a> {
queue: &'a CompletionQueue,
}
impl<'a> CompletionQueueRef<'a> {
pub fn as_ptr(&self) -> *mut grpc_completion_queue {
self.queue.handle.cq
}
}
impl<'a> Drop for CompletionQueueRef<'a> {
fn drop(&mut self) {
self.queue.handle.unref();
}
}
pub struct WorkQueue {
id: ThreadId,
pending_work: UnsafeCell<VecDeque<UnfinishedWork>>,
}
unsafe impl Sync for WorkQueue {}
unsafe impl Send for WorkQueue {}
const QUEUE_CAPACITY: usize = 4096;
impl WorkQueue {
pub fn new() -> WorkQueue {
WorkQueue {
id: std::thread::current().id(),
pending_work: UnsafeCell::new(VecDeque::with_capacity(QUEUE_CAPACITY)),
}
}
pub fn push_work(&self, work: UnfinishedWork) -> Option<UnfinishedWork> {
if self.id == thread::current().id() {
unsafe { &mut *self.pending_work.get() }.push_back(work);
None
} else {
Some(work)
}
}
pub unsafe fn pop_work(&self) -> Option<UnfinishedWork> {
let queue = &mut *self.pending_work.get();
if queue.capacity() > QUEUE_CAPACITY && queue.len() < queue.capacity() / 2 {
queue.shrink_to_fit();
}
{ &mut *self.pending_work.get() }.pop_back()
}
}
#[derive(Clone)]
pub struct CompletionQueue {
handle: Arc<CompletionQueueHandle>,
pub(crate) worker: Arc<WorkQueue>,
}
impl CompletionQueue {
pub fn new(handle: Arc<CompletionQueueHandle>, worker: Arc<WorkQueue>) -> CompletionQueue {
CompletionQueue { handle, worker }
}
pub fn next(&self) -> Event {
unsafe {
let inf = grpc_sys::gpr_inf_future(gpr_clock_type::GPR_CLOCK_REALTIME);
grpc_sys::grpc_completion_queue_next(self.handle.cq, inf, ptr::null_mut())
}
}
pub fn borrow(&self) -> Result<CompletionQueueRef<'_>> {
self.handle.add_ref()?;
Ok(CompletionQueueRef { queue: self })
}
pub fn shutdown(&self) {
self.handle.shutdown()
}
pub fn worker_id(&self) -> ThreadId {
self.worker.id
}
}