grpcio 0.13.0

The rust language implementation of gRPC, base on the gRPC c core library.
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

//! gRPC C Core binds a call to a completion queue, all the related readiness
//! will be forwarded to the completion queue. This module utilizes the mechanism
//! and using `Kicker` to wake up completion queue.
//!
//! Apparently, to minimize context switch, it's better to bind the future to the
//! same completion queue as its inner call. Hence method `Executor::spawn` is provided.

use std::cell::UnsafeCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use futures_util::task::{waker_ref, ArcWake};

use super::CallTag;
use crate::call::Call;
use crate::cq::{CompletionQueue, WorkQueue};
use crate::error::{Error, Result};
use crate::grpc_sys::{self, grpc_call_error};

/// A handle to a `Spawn`.
/// Inner future is expected to be polled in the same thread as cq.
type SpawnHandle = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// `Kicker` wakes up the completion queue that the inner call binds to.
pub(crate) struct Kicker {
    call: Call,
}

impl Kicker {
    pub fn from_call(call: Call) -> Kicker {
        Kicker { call }
    }

    /// Wakes up its completion queue.
    ///
    /// `tag` will be popped by `grpc_completion_queue_next` in the future.
    pub fn kick(&self, tag: Box<CallTag>) -> Result<()> {
        let _ref = self.call.cq.borrow()?;
        unsafe {
            let ptr = Box::into_raw(tag);
            let status = grpc_sys::grpcwrap_call_kick_completion_queue(self.call.call, ptr as _);
            if status == grpc_call_error::GRPC_CALL_OK {
                Ok(())
            } else {
                Err(Error::CallFailure(status))
            }
        }
    }
}

unsafe impl Sync for Kicker {}

impl Clone for Kicker {
    fn clone(&self) -> Kicker {
        // Bump call's reference count.
        let call = unsafe {
            grpc_sys::grpc_call_ref(self.call.call);
            self.call.call
        };
        let cq = self.call.cq.clone();
        Kicker {
            call: Call { call, cq },
        }
    }
}

/// When a future is scheduled, it becomes IDLE. When it's ready to be polled,
/// it will be notified via task.wake(), and marked as NOTIFIED. When executor
/// begins to poll the future, it's marked as POLLING. When the executor finishes
/// polling, the future can either be ready or not ready. In the former case, it's
/// marked as COMPLETED. If it's latter, it's marked as IDLE again.
///
/// Note it's possible the future is notified during polling, in which case, executor
/// should polling it when last polling is finished unless it returns ready.
const NOTIFIED: u8 = 1;
const IDLE: u8 = 2;
const POLLING: u8 = 3;
const COMPLETED: u8 = 4;

/// Maintains the spawned future with state, so that it can be notified and polled efficiently.
pub struct SpawnTask {
    handle: UnsafeCell<Option<SpawnHandle>>,
    state: AtomicU8,
    kicker: Kicker,
    queue: Arc<WorkQueue>,
}

/// `SpawnTask` access is guarded by `state` field, which guarantees Sync.
///
/// Sync is required by `ArcWake`.
unsafe impl Sync for SpawnTask {}

impl SpawnTask {
    fn new(s: SpawnHandle, kicker: Kicker, queue: Arc<WorkQueue>) -> SpawnTask {
        SpawnTask {
            handle: UnsafeCell::new(Some(s)),
            state: AtomicU8::new(IDLE),
            kicker,
            queue,
        }
    }

    /// Marks the state of this task to NOTIFIED.
    ///
    /// Returns true means the task was IDLE, needs to be scheduled.
    fn mark_notified(&self) -> bool {
        loop {
            match self.state.compare_exchange_weak(
                IDLE,
                NOTIFIED,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return true,
                Err(POLLING) => match self.state.compare_exchange_weak(
                    POLLING,
                    NOTIFIED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Err(IDLE) | Err(POLLING) => continue,
                    // If it succeeds, then executor will poll the future again;
                    // if it fails, then the future should be resolved. In both
                    // cases, no need to notify the future, hence return false.
                    _ => return false,
                },
                Err(IDLE) => continue,
                _ => return false,
            }
        }
    }
}

pub fn resolve(task: Arc<SpawnTask>, success: bool) {
    // it should always be canceled for now.
    assert!(success);
    poll(task, true);
}

/// A custom Waker.
///
/// It will push the inner future to work_queue if it's notified on the
/// same thread as inner cq.
impl ArcWake for SpawnTask {
    fn wake_by_ref(task: &Arc<Self>) {
        if !task.mark_notified() {
            return;
        }

        // It can lead to deadlock if poll the future immediately. So we need to
        // defer the work instead.
        if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
            match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
                // If the queue is shutdown, then the tag will be notified
                // eventually. So just skip here.
                Err(Error::QueueShutdown) => (),
                Err(e) => panic!("unexpected error when canceling call: {:?}", e),
                _ => (),
            }
        }
    }
}

/// Work that should be deferred to be handled.
///
/// Sometimes a work can't be done immediately as it might lead
/// to resource conflict, deadlock for example. So they will be
/// pushed into a queue and handled when current work is done.
pub struct UnfinishedWork(Arc<SpawnTask>);

impl UnfinishedWork {
    pub fn finish(self) {
        resolve(self.0, true);
    }
}

/// Poll the future.
///
/// `woken` indicates that if the cq is waken up by itself.
fn poll(task: Arc<SpawnTask>, woken: bool) {
    let mut init_state = if woken { NOTIFIED } else { IDLE };
    // TODO: maybe we need to break the loop to avoid hunger.
    loop {
        match task
            .state
            .compare_exchange(init_state, POLLING, Ordering::AcqRel, Ordering::Acquire)
        {
            Ok(_) => {}
            Err(COMPLETED) => return,
            Err(s) => panic!("unexpected state {}", s),
        }

        let waker = waker_ref(&task);
        let mut cx = Context::from_waker(&waker);

        // L208 "lock"s state, hence it's safe to get a mutable reference.
        match unsafe { &mut *task.handle.get() }
            .as_mut()
            .unwrap()
            .as_mut()
            .poll(&mut cx)
        {
            Poll::Ready(()) => {
                task.state.store(COMPLETED, Ordering::Release);
                unsafe { &mut *task.handle.get() }.take();
            }
            _ => {
                match task.state.compare_exchange(
                    POLLING,
                    IDLE,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => return,
                    Err(NOTIFIED) => {
                        init_state = NOTIFIED;
                    }
                    Err(s) => panic!("unexpected state {}", s),
                }
            }
        }
    }
}

/// An executor that drives a future in the gRPC poll thread, which
/// can reduce thread context switching.
pub(crate) struct Executor<'a> {
    cq: &'a CompletionQueue,
}

impl<'a> Executor<'a> {
    pub fn new(cq: &CompletionQueue) -> Executor<'_> {
        Executor { cq }
    }

    pub fn cq(&self) -> &CompletionQueue {
        self.cq
    }

    /// Spawn the future into inner poll loop.
    ///
    /// If you want to trace the future, you may need to create a sender/receiver
    /// pair by yourself.
    pub fn spawn<F>(&self, f: F, kicker: Kicker)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let s = Box::pin(f);
        let notify = Arc::new(SpawnTask::new(s, kicker, self.cq.worker.clone()));
        poll(notify, false)
    }
}