grpc 0.6.2

Rust implementation of gRPC
Documentation
use std::sync::Arc;

use std::panic::catch_unwind;
use std::panic::AssertUnwindSafe;

use bytes::Bytes;

use futures::future::Future;
use futures::stream::Stream;

use error::Error;

use method::*;
use req::*;
use resp::*;

use futures_misc::stream_single;
use misc::any_to_string;

pub trait MethodHandler<Req, Resp>
where
    Req: Send + 'static,
    Resp: Send + 'static,
{
    fn handle(&self, m: RequestOptions, req: StreamingRequest<Req>) -> StreamingResponse<Resp>;
}

pub struct MethodHandlerUnary<F> {
    f: Arc<F>,
}

pub struct MethodHandlerServerStreaming<F> {
    f: Arc<F>,
}

pub struct MethodHandlerClientStreaming<F> {
    f: Arc<F>,
}

pub struct MethodHandlerBidi<F> {
    f: Arc<F>,
}

impl<F> GrpcStreamingFlavor for MethodHandlerUnary<F> {
    type Flavor = GrpcStreamingUnary;

    fn streaming() -> GrpcStreaming {
        GrpcStreaming::Unary
    }
}

impl<F> GrpcStreamingFlavor for MethodHandlerClientStreaming<F> {
    type Flavor = GrpcStreamingClientStreaming;

    fn streaming() -> GrpcStreaming {
        GrpcStreaming::ClientStreaming
    }
}

impl<F> GrpcStreamingFlavor for MethodHandlerServerStreaming<F> {
    type Flavor = GrpcStreamingServerStreaming;

    fn streaming() -> GrpcStreaming {
        GrpcStreaming::ServerStreaming
    }
}

impl<F> GrpcStreamingFlavor for MethodHandlerBidi<F> {
    type Flavor = GrpcStreamingBidi;

    fn streaming() -> GrpcStreaming {
        GrpcStreaming::Bidi
    }
}

impl<F> MethodHandlerUnary<F> {
    pub fn new<Req, Resp>(f: F) -> Self
    where
        Req: Send + 'static,
        Resp: Send + 'static,
        F: Fn(RequestOptions, Req) -> SingleResponse<Resp> + Send + 'static,
    {
        MethodHandlerUnary { f: Arc::new(f) }
    }
}

impl<F> MethodHandlerClientStreaming<F> {
    pub fn new<Req, Resp>(f: F) -> Self
    where
        Req: Send + 'static,
        Resp: Send + 'static,
        F: Fn(RequestOptions, StreamingRequest<Req>) -> SingleResponse<Resp> + Send + 'static,
    {
        MethodHandlerClientStreaming { f: Arc::new(f) }
    }
}

impl<F> MethodHandlerServerStreaming<F> {
    pub fn new<Req, Resp>(f: F) -> Self
    where
        Req: Send + 'static,
        Resp: Send + 'static,
        F: Fn(RequestOptions, Req) -> StreamingResponse<Resp> + Send + 'static,
    {
        MethodHandlerServerStreaming { f: Arc::new(f) }
    }
}

impl<F> MethodHandlerBidi<F> {
    pub fn new<Req, Resp>(f: F) -> Self
    where
        Req: Send + 'static,
        Resp: Send + 'static,
        F: Fn(RequestOptions, StreamingRequest<Req>) -> StreamingResponse<Resp> + Send + 'static,
    {
        MethodHandlerBidi { f: Arc::new(f) }
    }
}

impl<Req, Resp, F> MethodHandler<Req, Resp> for MethodHandlerUnary<F>
where
    Req: Send + 'static,
    Resp: Send + 'static,
    F: Fn(RequestOptions, Req) -> SingleResponse<Resp> + Send + Sync + 'static,
{
    fn handle(&self, m: RequestOptions, req: StreamingRequest<Req>) -> StreamingResponse<Resp> {
        let f = self.f.clone();
        SingleResponse::new(stream_single(req.0).and_then(move |req| f(m, req).0)).into_stream()
    }
}

impl<Req: Send + 'static, Resp: Send + 'static, F> MethodHandler<Req, Resp>
    for MethodHandlerClientStreaming<F>
where
    Resp: Send + 'static,
    F: Fn(RequestOptions, StreamingRequest<Req>) -> SingleResponse<Resp> + Send + Sync + 'static,
{
    fn handle(&self, m: RequestOptions, req: StreamingRequest<Req>) -> StreamingResponse<Resp> {
        ((self.f)(m, req)).into_stream()
    }
}

impl<Req, Resp, F> MethodHandler<Req, Resp> for MethodHandlerServerStreaming<F>
where
    Req: Send + 'static,
    Resp: Send + 'static,
    F: Fn(RequestOptions, Req) -> StreamingResponse<Resp> + Send + Sync + 'static,
{
    fn handle(&self, o: RequestOptions, req: StreamingRequest<Req>) -> StreamingResponse<Resp> {
        let f = self.f.clone();
        StreamingResponse(Box::new(
            stream_single(req.0).and_then(move |req| f(o, req).0),
        ))
    }
}

impl<Req, Resp, F> MethodHandler<Req, Resp> for MethodHandlerBidi<F>
where
    Req: Send + 'static,
    Resp: Send + 'static,
    F: Fn(RequestOptions, StreamingRequest<Req>) -> StreamingResponse<Resp> + Send + Sync + 'static,
{
    fn handle(&self, m: RequestOptions, req: StreamingRequest<Req>) -> StreamingResponse<Resp> {
        (self.f)(m, req)
    }
}

pub(crate) trait MethodHandlerDispatch {
    fn start_request(
        &self,
        m: RequestOptions,
        grpc_frames: StreamingRequest<Bytes>,
    ) -> StreamingResponse<Vec<u8>>;
}

struct MethodHandlerDispatchImpl<Req, Resp> {
    desc: Arc<MethodDescriptor<Req, Resp>>,
    method_handler: Box<MethodHandler<Req, Resp> + Sync + Send>,
}

impl<Req, Resp> MethodHandlerDispatch for MethodHandlerDispatchImpl<Req, Resp>
where
    Req: Send + 'static,
    Resp: Send + 'static,
{
    fn start_request(
        &self,
        o: RequestOptions,
        req_grpc_frames: StreamingRequest<Bytes>,
    ) -> StreamingResponse<Vec<u8>> {
        let desc = self.desc.clone();
        let req = req_grpc_frames
            .0
            .and_then(move |frame| desc.req_marshaller.read(frame));
        let resp = catch_unwind(AssertUnwindSafe(|| {
            self.method_handler.handle(o, StreamingRequest::new(req))
        }));
        match resp {
            Ok(resp) => {
                let desc_copy = self.desc.clone();
                resp.and_then_items(move |resp| desc_copy.resp_marshaller.write(&resp))
            }
            Err(e) => {
                let message = any_to_string(e);
                StreamingResponse::err(Error::Panic(message))
            }
        }
    }
}

pub struct ServerMethod {
    pub(crate) name: String,
    pub(crate) dispatch: Box<MethodHandlerDispatch + Sync + Send>,
}

impl ServerMethod {
    pub fn new<Req, Resp, H>(method: Arc<MethodDescriptor<Req, Resp>>, handler: H) -> ServerMethod
    where
        Req: Send + 'static,
        Resp: Send + 'static,
        H: MethodHandler<Req, Resp> + 'static + Sync + Send,
    {
        ServerMethod {
            name: method.name.clone(),
            dispatch: Box::new(MethodHandlerDispatchImpl {
                desc: method,
                method_handler: Box::new(handler),
            }),
        }
    }
}