grpcio 0.13.0

The Rust language implementation of gRPC, based on the gRPC C core library.
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::ptr;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;
use std::thread::{self, ThreadId};

use crate::error::{Error, Result};
use crate::grpc_sys::{self, gpr_clock_type, grpc_completion_queue};
use crate::task::UnfinishedWork;

pub use crate::grpc_sys::grpc_completion_type as EventType;
pub use crate::grpc_sys::grpc_event as Event;

/// `CompletionQueueHandle` enables notification of the completion of asynchronous actions.
///
/// It pairs the raw gRPC core completion queue pointer with a signed counter
/// that tracks both outstanding borrows and whether a shutdown was requested.
pub struct CompletionQueueHandle {
    // Raw completion queue pointer; created in `new` and destroyed in `Drop`.
    cq: *mut grpc_completion_queue,
    // When `ref_cnt` < 0, a shutdown is pending, completion queue should not
    // accept requests anymore; when `ref_cnt` == 0, completion queue should
    // be shutdown; When `ref_cnt` > 0, completion queue can accept requests
    // and should not be shutdown.
    ref_cnt: AtomicIsize,
}

// The raw `cq` pointer prevents `Send`/`Sync` from being derived automatically,
// so both are asserted by hand.
// NOTE(review): soundness relies on the gRPC core completion queue functions
// being callable from any thread — confirm against the gRPC core documentation.
unsafe impl Sync for CompletionQueueHandle {}
unsafe impl Send for CompletionQueueHandle {}

impl CompletionQueueHandle {
    pub fn new() -> CompletionQueueHandle {
        CompletionQueueHandle {
            cq: unsafe { grpc_sys::grpc_completion_queue_create_for_next(ptr::null_mut()) },
            ref_cnt: AtomicIsize::new(1),
        }
    }

    fn add_ref(&self) -> Result<()> {
        let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
        loop {
            if cnt <= 0 {
                // `shutdown` has been called, reject any requests.
                return Err(Error::QueueShutdown);
            }
            let new_cnt = cnt + 1;
            match self.ref_cnt.compare_exchange_weak(
                cnt,
                new_cnt,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => return Ok(()),
                Err(c) => cnt = c,
            }
        }
    }

    fn unref(&self) {
        let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
        let shutdown = loop {
            // If `shutdown` is not called, `cnt` > 0, so minus 1 to unref.
            // If `shutdown` is called, `cnt` < 0, so plus 1 to unref.
            let new_cnt = cnt - cnt.signum();
            match self.ref_cnt.compare_exchange_weak(
                cnt,
                new_cnt,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => break new_cnt == 0,
                Err(c) => cnt = c,
            }
        };
        if shutdown {
            unsafe {
                grpc_sys::grpc_completion_queue_shutdown(self.cq);
            }
        }
    }

    fn shutdown(&self) {
        let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
        let shutdown = loop {
            if cnt <= 0 {
                // `shutdown` is called, skipped.
                return;
            }
            // Make cnt negative to indicate that `shutdown` has been called.
            // Because `cnt` is initialized to 1, so minus 1 to make it reach
            // toward 0. That is `new_cnt = -(cnt - 1) = -cnt + 1`.
            let new_cnt = -cnt + 1;
            match self.ref_cnt.compare_exchange_weak(
                cnt,
                new_cnt,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => break new_cnt == 0,
                Err(c) => cnt = c,
            }
        };
        if shutdown {
            unsafe {
                grpc_sys::grpc_completion_queue_shutdown(self.cq);
            }
        }
    }
}

impl Drop for CompletionQueueHandle {
    /// Destroys the underlying completion queue when the handle is dropped.
    fn drop(&mut self) {
        let cq = self.cq;
        // `cq` is the pointer stored at construction time; the handle is being
        // dropped, so nothing reads the pointer through it afterwards.
        unsafe {
            grpc_sys::grpc_completion_queue_destroy(cq);
        }
    }
}

/// A counted borrow of a `CompletionQueue`.
///
/// Obtained from `CompletionQueue::borrow`, which increments the handle's
/// reference count; dropping the value releases that reference again.
pub struct CompletionQueueRef<'a> {
    // The queue this borrow was taken from.
    queue: &'a CompletionQueue,
}

impl<'a> CompletionQueueRef<'a> {
    /// Returns the raw completion queue pointer held by the borrowed queue's
    /// handle.
    pub fn as_ptr(&self) -> *mut grpc_completion_queue {
        let handle = &self.queue.handle;
        handle.cq
    }
}

impl<'a> Drop for CompletionQueueRef<'a> {
    /// Releases the reference taken when this borrow was created.
    fn drop(&mut self) {
        let handle = &self.queue.handle;
        handle.unref();
    }
}

/// `WorkQueue` stores the unfinished work of a completion queue.
///
/// Every completion queue has a work queue, and every work queue belongs
/// to exactly one completion queue. `WorkQueue` is a short path for future
/// notifications. When a future is ready to be polled, there are two ways
/// to notify it.
/// 1. If it's in the same thread where the future is spawned, the future
///    will be pushed into `WorkQueue` and be polled when the current call tag
///    is handled;
/// 2. If not, the future will be wrapped as a call tag and pushed into the
///    completion queue and finally popped at the call to `grpc_completion_queue_next`.
pub struct WorkQueue {
    // Id of the thread that created the queue; `push_work` only accepts work
    // from this thread.
    id: ThreadId,
    // Pending work items. Wrapped in `UnsafeCell` so they can be mutated
    // through `&self` without a lock.
    pending_work: UnsafeCell<VecDeque<UnfinishedWork>>,
}

// `UnsafeCell` is not `Sync`, so `Sync` (and `Send`) are asserted by hand.
// `push_work` checks the calling thread before touching the inner queue;
// `pop_work` is `unsafe` and leaves that check to its caller.
// NOTE(review): whether `Send` is sound also depends on `UnfinishedWork`,
// which is defined elsewhere — confirm.
unsafe impl Sync for WorkQueue {}
unsafe impl Send for WorkQueue {}

// Number of slots preallocated for a new `WorkQueue`; `pop_work` also uses it
// as the capacity threshold above which it considers shrinking the buffer.
const QUEUE_CAPACITY: usize = 4096;

impl WorkQueue {
    /// Creates a work queue bound to the calling thread, with room for
    /// `QUEUE_CAPACITY` items preallocated.
    pub fn new() -> WorkQueue {
        WorkQueue {
            id: thread::current().id(),
            pending_work: UnsafeCell::new(VecDeque::with_capacity(QUEUE_CAPACITY)),
        }
    }

    /// Pushes an unfinished work into the inner queue.
    ///
    /// If the method is not called from the same thread where the queue was
    /// created, the work is returned to the caller and nothing is pushed.
    pub fn push_work(&self, work: UnfinishedWork) -> Option<UnfinishedWork> {
        if self.id != thread::current().id() {
            return Some(work);
        }
        // SAFETY: the check above guarantees we are on the creating thread.
        // Exclusive access additionally relies on `pop_work` callers honoring
        // its same-thread contract.
        let queue = unsafe { &mut *self.pending_work.get() };
        queue.push_back(work);
        None
    }

    /// Pops one unfinished work.
    ///
    /// Before popping, an oversized and less than half full buffer is shrunk,
    /// but never below `QUEUE_CAPACITY`, so the preallocation made in `new`
    /// is kept and later pushes do not have to regrow the buffer from scratch.
    ///
    /// # Safety
    ///
    /// It should only be called from the same thread where the queue is created.
    /// Otherwise it leads to undefined behavior.
    pub unsafe fn pop_work(&self) -> Option<UnfinishedWork> {
        let queue = &mut *self.pending_work.get();
        if queue.capacity() > QUEUE_CAPACITY && queue.len() < queue.capacity() / 2 {
            queue.shrink_to(QUEUE_CAPACITY);
        }
        // NOTE(review): items are pushed and popped at the back, so work is
        // handled newest-first — confirm this ordering is intended.
        queue.pop_back()
    }
}

/// A cloneable front end to a completion queue.
///
/// Clones share the same handle and the same work queue through `Arc`.
#[derive(Clone)]
pub struct CompletionQueue {
    // Shared handle owning the raw completion queue and its reference count.
    handle: Arc<CompletionQueueHandle>,
    // Work queue associated with this completion queue.
    pub(crate) worker: Arc<WorkQueue>,
}

impl CompletionQueue {
    /// Builds a `CompletionQueue` from a shared handle and its work queue.
    pub fn new(handle: Arc<CompletionQueueHandle>, worker: Arc<WorkQueue>) -> CompletionQueue {
        Self { handle, worker }
    }

    /// Blocks until an event is available or the completion queue is being
    /// shut down.
    ///
    /// The wait uses an infinite deadline (`gpr_inf_future`).
    pub fn next(&self) -> Event {
        let cq = self.handle.cq;
        unsafe {
            let deadline = grpc_sys::gpr_inf_future(gpr_clock_type::GPR_CLOCK_REALTIME);
            grpc_sys::grpc_completion_queue_next(cq, deadline, ptr::null_mut())
        }
    }

    /// Takes a counted borrow of the queue.
    ///
    /// # Errors
    ///
    /// Propagates the error from the handle's `add_ref`, which rejects new
    /// references once shutdown has been requested.
    pub fn borrow(&self) -> Result<CompletionQueueRef<'_>> {
        self.handle
            .add_ref()
            .map(|()| CompletionQueueRef { queue: self })
    }

    /// Begin destruction of a completion queue.
    ///
    /// Once all possible events are drained then `next()` will start to produce
    /// `Event::QueueShutdown` events only.
    pub fn shutdown(&self) {
        self.handle.shutdown();
    }

    /// Returns the id of the thread that created the associated work queue.
    pub fn worker_id(&self) -> ThreadId {
        let worker = &self.worker;
        worker.id
    }
}