use std::sync::Arc;
use std::thread::{self, ThreadId};
use std::ptr;
use futures::executor::{self, Notify, Spawn};
use futures::{Async, Future};
use grpc_sys::{self, GprTimespec, GrpcAlarm};
use cq::CompletionQueue;
use error::{Error, Result};
use super::lock::SpinLock;
use super::CallTag;
type BoxFuture<T, E> = Box<Future<Item = T, Error = E> + Send>;
struct Alarm {
alarm: *mut GrpcAlarm,
}
impl Alarm {
fn new(cq: &CompletionQueue, tag: Box<CallTag>) -> Result<Alarm> {
let alarm = unsafe {
let ptr = Box::into_raw(tag);
let timeout = GprTimespec::inf_future();
let cq_ref = cq.borrow()?;
let alarm = grpc_sys::grpc_alarm_create(ptr::null_mut());
grpc_sys::grpc_alarm_set(alarm, cq_ref.as_ptr(), timeout, ptr as _, ptr::null_mut());
alarm
};
Ok(Alarm { alarm: alarm })
}
fn alarm(&mut self) {
unsafe { grpc_sys::grpc_alarm_cancel(self.alarm) }
}
}
impl Drop for Alarm {
fn drop(&mut self) {
unsafe { grpc_sys::grpc_alarm_destroy(self.alarm) }
}
}
pub struct SpawnHandle {
f: Option<Spawn<BoxFuture<(), ()>>>,
cq: CompletionQueue,
alarm: Option<Alarm>,
alarmed: bool,
}
impl SpawnHandle {
pub fn new(s: Spawn<BoxFuture<(), ()>>, cq: CompletionQueue) -> SpawnHandle {
SpawnHandle {
f: Some(s),
cq: cq,
alarm: None,
alarmed: false,
}
}
pub fn notify(&mut self, tag: Box<CallTag>) {
self.alarm.take();
let mut alarm = match Alarm::new(&self.cq, tag) {
Ok(a) => a,
Err(Error::QueueShutdown) => {
return;
}
Err(e) => panic!("failed to create alarm: {:?}", e),
};
alarm.alarm();
self.alarm = Some(alarm);
}
}
#[derive(Clone)]
pub struct SpawnNotify {
handle: Arc<SpinLock<SpawnHandle>>,
worker_id: ThreadId,
}
impl SpawnNotify {
fn new(s: Spawn<BoxFuture<(), ()>>, cq: CompletionQueue) -> SpawnNotify {
SpawnNotify {
worker_id: cq.worker_id(),
handle: Arc::new(SpinLock::new(SpawnHandle::new(s, cq))),
}
}
pub fn resolve(self, success: bool) {
assert!(!success);
poll(Arc::new(self.clone()), true);
}
}
unsafe impl Send for SpawnNotify {}
unsafe impl Sync for SpawnNotify {}
impl Notify for SpawnNotify {
fn notify(&self, _: usize) {
if thread::current().id() == self.worker_id {
poll(Arc::new(self.clone()), false)
} else {
let mut handle = self.handle.lock();
if handle.alarmed {
return;
}
handle.notify(Box::new(CallTag::Spawn(self.clone())));
handle.alarmed = true;
}
}
}
fn poll(notify: Arc<SpawnNotify>, woken: bool) {
let mut handle = notify.handle.lock();
if woken {
handle.alarmed = false;
}
if handle.f.is_none() {
return;
}
match handle.f.as_mut().unwrap().poll_future_notify(¬ify, 0) {
Err(_) | Ok(Async::Ready(_)) => {
handle.f.take();
return;
}
_ => {}
}
}
pub struct Executor<'a> {
cq: &'a CompletionQueue,
}
impl<'a> Executor<'a> {
pub fn new(cq: &CompletionQueue) -> Executor {
Executor { cq: cq }
}
pub(crate) fn cq(&self) -> &CompletionQueue {
self.cq
}
pub fn spawn<F>(&self, f: F)
where
F: Future<Item = (), Error = ()> + Send + 'static,
{
let s = executor::spawn(Box::new(f) as BoxFuture<_, _>);
let notify = Arc::new(SpawnNotify::new(s, self.cq.clone()));
poll(notify, false)
}
}