use std::sync::Arc;
use bytes::Bytes;
use httpbis;
use httpbis::Header;
use httpbis::Headers;
use stream_item::ItemOrMetadata;
use result::Result;
use tls_api;
use tls_api_stub;
use futures::stream;
use futures::stream::Stream;
use futures::Future;
use error::*;
use grpc::*;
use grpc_frame::*;
use httpbis::AnySocketAddr;
use httpbis::DataOrTrailers;
use httpbis::HttpStreamAfterHeaders;
use metadata::Metadata;
use req::*;
use resp::*;
use server_method::*;
/// Describes one gRPC service: the prefix it is registered under and the
/// methods that can be dispatched on it.
pub struct ServerServiceDefinition {
/// Prefix handed to the HTTP layer when the service is registered
/// (see `ServerBuilder::add_service`).
pub prefix: String,
/// Methods of this service; `find_method` looks them up by their `name`.
pub methods: Vec<ServerMethod>,
}
impl ServerServiceDefinition {
pub fn new(prefix: &str, methods: Vec<ServerMethod>) -> ServerServiceDefinition {
ServerServiceDefinition {
prefix: prefix.to_owned(),
methods: methods,
}
}
pub fn find_method(&self, name: &str) -> Option<&ServerMethod> {
self.methods.iter().filter(|m| m.name == name).next()
}
pub fn handle_method(
&self,
name: &str,
o: RequestOptions,
message: StreamingRequest<Bytes>,
) -> StreamingResponse<Vec<u8>> {
match self.find_method(name) {
Some(method) => method.dispatch.start_request(o, message),
None => StreamingResponse::no_metadata(Box::new(stream::once(Err(
Error::GrpcMessage(GrpcMessageError {
grpc_status: GrpcStatus::Unimplemented as i32,
grpc_message: String::from("Unimplemented method"),
}),
)))),
}
}
}
/// Configuration of a gRPC server.
///
/// The struct currently has no fields, so every value is identical.
#[derive(Default, Debug, Clone)]
pub struct ServerConf {}

impl ServerConf {
    /// Returns a configuration equal to `ServerConf::default()`.
    pub fn new() -> ServerConf {
        ServerConf {}
    }
}
/// Builder for `Server`, parameterized by the TLS acceptor type `A`
/// (defaults to `tls_api_stub::TlsAcceptor`).
pub struct ServerBuilder<A: tls_api::TlsAcceptor = tls_api_stub::TlsAcceptor> {
/// Underlying HTTP server builder: services are registered on it and
/// `build` uses it to create the server.
pub http: httpbis::ServerBuilder<A>,
/// Configuration value; the constructors initialize it with
/// `ServerConf::new()`.
pub conf: ServerConf,
}
impl ServerBuilder<tls_api_stub::TlsAcceptor> {
    /// Creates a builder whose acceptor type is the stub TLS acceptor.
    ///
    /// Same as calling `new` with `A = tls_api_stub::TlsAcceptor`.
    pub fn new_plain() -> ServerBuilder {
        Self::new()
    }
}
impl<A: tls_api::TlsAcceptor> ServerBuilder<A> {
    /// Creates a builder with a fresh HTTP server builder and a default
    /// configuration.
    pub fn new() -> ServerBuilder<A> {
        let http = httpbis::ServerBuilder::new();
        let conf = ServerConf::new();
        ServerBuilder {
            http: http,
            conf: conf,
        }
    }

    /// Unix-only constructor; produces the same builder as `new`.
    #[cfg(unix)]
    pub fn new_unix() -> ServerBuilder<A> {
        Self::new()
    }

    /// Registers `def` with the HTTP layer under `def.prefix`.
    ///
    /// The definition is wrapped in a `GrpcHttpService` and shared via `Arc`.
    pub fn add_service(&mut self, def: ServerServiceDefinition) {
        // Copy the prefix first: `def` is moved into the service right after.
        let prefix = def.prefix.clone();
        let service = GrpcHttpService {
            service_definition: Arc::new(def),
        };
        self.http.service.set_service(&prefix, Arc::new(service));
    }

    /// Builds the server, consuming the builder.
    ///
    /// If no thread name was configured on the HTTP builder, it is set to
    /// `"grpc-server-loop"`. Errors from the HTTP builder are propagated.
    pub fn build(mut self) -> Result<Server> {
        if self.http.conf.thread_name.is_none() {
            self.http.conf.thread_name = Some("grpc-server-loop".to_owned());
        }
        let http_server = self.http.build()?;
        Ok(Server {
            server: http_server,
        })
    }
}
/// A built gRPC server; wraps the `httpbis::Server` produced by
/// `ServerBuilder::build`.
pub struct Server {
// Underlying HTTP server that every method below delegates to.
server: httpbis::Server,
}
impl Server {
/// Returns the local address reported by the underlying HTTP server.
pub fn local_addr(&self) -> &AnySocketAddr {
self.server.local_addr()
}
/// Returns whether the underlying HTTP server reports itself as alive.
pub fn is_alive(&self) -> bool {
self.server.is_alive()
}
}
/// Adapter that exposes a `ServerServiceDefinition` through the
/// `httpbis::Service` trait.
struct GrpcHttpService {
// Shared definition whose `handle_method` serves incoming requests.
service_definition: Arc<ServerServiceDefinition>,
}
/// Builds a response with `:status` 500, `message` in the
/// `HEADER_GRPC_MESSAGE` header, and an empty stream after the headers.
fn http_response_500(message: &str) -> httpbis::Response {
    let status = Header::new(":status", "500");
    let grpc_message = Header::new(HEADER_GRPC_MESSAGE, message.to_owned());
    httpbis::Response::headers_and_stream(
        Headers(vec![status, grpc_message]),
        httpbis::HttpStreamAfterHeaders::empty(),
    )
}
/// Serves `httpbis` requests by dispatching them to the wrapped service
/// definition.
impl httpbis::Service for GrpcHttpService {
/// Handles one incoming request.
///
/// Reads the `:path` header, decodes request metadata from the headers,
/// dispatches to `ServerServiceDefinition::handle_method`, and converts the
/// gRPC response into initial headers plus a stream of data and trailers.
/// A missing `:path` header or a metadata decoding failure yields a 500
/// response instead.
fn start_request(&self, headers: Headers, req: HttpStreamAfterHeaders) -> httpbis::Response {
// The whole `:path` value is later used as the method name.
let path = match headers.get_opt(":path") {
Some(path) => path.to_owned(),
None => return http_response_500("no :path header"),
};
let grpc_request = GrpcFrameFromHttpFramesStreamRequest::new(req);
// `headers` is consumed here; `path` was copied out above.
let metadata = match Metadata::from_headers(headers) {
Ok(metadata) => metadata,
Err(_) => return http_response_500("decode metadata error"),
};
let request_options = RequestOptions { metadata: metadata };
let grpc_response = self.service_definition.handle_method(
&path,
request_options,
StreamingRequest::new(grpc_request),
);
httpbis::Response::new(grpc_response.0.map_err(httpbis::Error::from).map(
|(metadata, grpc_frames)| {
// Initial response headers: status 200 and the gRPC content type,
// followed by the response metadata converted to headers.
let mut init_headers = Headers(vec![
Header::new(":status", "200"),
Header::new("content-type", "application/grpc"),
]);
init_headers.extend(metadata.into_headers());
let s2 = grpc_frames
// Each response frame is encoded with `write_grpc_frame_to_vec` and
// sent as intermediate data.
.map_items(|frame| {
DataOrTrailers::intermediate_data(Bytes::from(write_grpc_frame_to_vec(
&frame,
)))
}).then_items(|result| match result {
Ok(part) => Ok(part),
Err(e) => {
// Errors are not propagated as stream errors: they are turned into
// trailers carrying a status and message. A `GrpcMessage` error
// supplies its own values; any other error is reported with the
// `Internal` status and its `Debug` rendering.
let (grpc_status, grpc_message) = match e {
Error::GrpcMessage(GrpcMessageError {
grpc_status,
grpc_message,
}) => (grpc_status, grpc_message),
e => (GrpcStatus::Internal as i32, format!("error: {:?}", e)),
};
Ok(DataOrTrailers::Trailers(Headers(vec![
Header::new(HEADER_GRPC_STATUS, format!("{}", grpc_status)),
Header::new(HEADER_GRPC_MESSAGE, grpc_message),
])))
}
}).0
// Trailing metadata becomes trailers: status "0" first, then the
// metadata headers.
.map(|item| match item {
ItemOrMetadata::Item(part) => part,
ItemOrMetadata::TrailingMetadata(trailing_metadata) => {
DataOrTrailers::Trailers({
let mut trailing =
Headers(vec![Header::new(HEADER_GRPC_STATUS, "0")]);
trailing.extend(trailing_metadata.into_headers());
trailing
})
}
}).map_err(httpbis::Error::from);
// Trailers with status "0", appended after `s2` finishes.
// NOTE(review): when `s2` has already yielded a `Trailers` item (error
// or trailing metadata above), this chain yields a second one —
// presumably the HTTP layer ends the stream at the first; confirm.
let s3 = stream::once(Ok(DataOrTrailers::Trailers(Headers(vec![Header::new(
HEADER_GRPC_STATUS,
"0",
)]))));
let http_parts = HttpStreamAfterHeaders::new(s2.chain(s3));
(init_headers, http_parts)
},
))
}
}