mod executor;
mod promise;
mod callback;
mod lock;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use futures::{Async, Future, Poll};
use futures::task::{self, Task};
use call::{BatchContext, Call};
use call::server::RequestContext;
use cq::CompletionQueue;
use error::{Error, Result};
use self::executor::SpawnNotify;
use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
use self::promise::{Batch as BatchPromise, Shutdown as ShutdownPromise};
use server::Inner as ServerInner;
pub use self::executor::Executor;
pub use self::promise::BatchType;
pub use self::lock::SpinLock;
pub struct NotifyHandle<T> {
result: Option<Result<T>>,
task: Option<Task>,
stale: bool,
}
impl<T> NotifyHandle<T> {
fn new() -> NotifyHandle<T> {
NotifyHandle {
result: None,
task: None,
stale: false,
}
}
fn set_result(&mut self, res: Result<T>) -> Option<Task> {
self.result = Some(res);
self.task.take()
}
}
type Inner<T> = SpinLock<NotifyHandle<T>>;
fn new_inner<T>() -> Arc<Inner<T>> {
Arc::new(SpinLock::new(NotifyHandle::new()))
}
pub fn check_alive<T>(f: &CqFuture<T>) -> Result<()> {
let guard = f.inner.lock();
match guard.result {
None => Ok(()),
Some(Err(Error::RpcFailure(ref status))) => {
Err(Error::RpcFinished(Some(status.to_owned())))
}
Some(Ok(_)) | Some(Err(_)) => Err(Error::RpcFinished(None)),
}
}
pub struct CqFuture<T> {
inner: Arc<Inner<T>>,
}
impl<T> CqFuture<T> {
fn new(inner: Arc<Inner<T>>) -> CqFuture<T> {
CqFuture { inner: inner }
}
}
impl<T> Future for CqFuture<T> {
type Item = T;
type Error = Error;
fn poll(&mut self) -> Poll<T, Error> {
let mut guard = self.inner.lock();
if guard.stale {
panic!("Resolved future is not supposed to be polled again.");
}
if let Some(res) = guard.result.take() {
guard.stale = true;
return Ok(Async::Ready(res?));
}
if guard.task.is_none() || !guard.task.as_ref().unwrap().will_notify_current() {
guard.task = Some(task::current());
}
Ok(Async::NotReady)
}
}
pub type BatchMessage = Option<Vec<u8>>;
pub type BatchFuture = CqFuture<BatchMessage>;
pub enum CallTag {
Batch(BatchPromise),
Request(RequestCallback),
UnaryRequest(UnaryRequestCallback),
Abort(Abort),
Shutdown(ShutdownPromise),
Spawn(SpawnNotify),
}
impl CallTag {
pub fn batch_pair(ty: BatchType) -> (BatchFuture, CallTag) {
let inner = new_inner();
let batch = BatchPromise::new(ty, inner.clone());
(CqFuture::new(inner), CallTag::Batch(batch))
}
pub fn request(inner: Arc<ServerInner>) -> CallTag {
CallTag::Request(RequestCallback::new(inner))
}
pub fn shutdown_pair() -> (CqFuture<()>, CallTag) {
let inner = new_inner();
let shutdown = ShutdownPromise::new(inner.clone());
(CqFuture::new(inner), CallTag::Shutdown(shutdown))
}
pub fn abort(call: Call) -> CallTag {
CallTag::Abort(Abort::new(call))
}
pub fn unary_request(ctx: RequestContext, inner: Arc<ServerInner>) -> CallTag {
let cb = UnaryRequestCallback::new(ctx, inner);
CallTag::UnaryRequest(cb)
}
pub fn batch_ctx(&self) -> Option<&BatchContext> {
match *self {
CallTag::Batch(ref prom) => Some(prom.context()),
CallTag::UnaryRequest(ref cb) => Some(cb.batch_ctx()),
CallTag::Abort(ref cb) => Some(cb.batch_ctx()),
_ => None,
}
}
pub fn request_ctx(&self) -> Option<&RequestContext> {
match *self {
CallTag::Request(ref prom) => Some(prom.context()),
CallTag::UnaryRequest(ref cb) => Some(cb.request_ctx()),
_ => None,
}
}
pub fn resolve(self, cq: &CompletionQueue, success: bool) {
match self {
CallTag::Batch(prom) => prom.resolve(success),
CallTag::Request(cb) => cb.resolve(cq, success),
CallTag::UnaryRequest(cb) => cb.resolve(cq, success),
CallTag::Abort(_) => {}
CallTag::Shutdown(prom) => prom.resolve(success),
CallTag::Spawn(notify) => notify.resolve(success),
}
}
}
impl Debug for CallTag {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match *self {
CallTag::Batch(ref ctx) => write!(f, "CallTag::Batch({:?})", ctx),
CallTag::Request(_) => write!(f, "CallTag::Request(..)"),
CallTag::UnaryRequest(_) => write!(f, "CallTag::UnaryRequest(..)"),
CallTag::Abort(_) => write!(f, "CallTag::Abort(..)"),
CallTag::Shutdown(_) => write!(f, "CallTag::Shutdown"),
CallTag::Spawn(_) => write!(f, "CallTag::Spawn"),
}
}
}
#[cfg(test)]
mod tests {
use std::thread;
use std::sync::*;
use std::sync::mpsc::*;
use super::*;
use env::Environment;
#[test]
fn test_resolve() {
let env = Environment::new(1);
let (cq_f1, tag1) = CallTag::shutdown_pair();
let (cq_f2, tag2) = CallTag::shutdown_pair();
let (tx, rx) = mpsc::channel();
let handler = thread::spawn(move || {
tx.send(cq_f1.wait()).unwrap();
tx.send(cq_f2.wait()).unwrap();
});
assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
tag1.resolve(&env.pick_cq(), true);
assert!(rx.recv().unwrap().is_ok());
assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
tag2.resolve(&env.pick_cq(), false);
match rx.recv() {
Ok(Err(Error::ShutdownFailed)) => {}
res => panic!("expect shutdown failed, but got {:?}", res),
}
handler.join().unwrap();
}
}