#[macro_use]
extern crate log;
mod test_misc;
use std::sync::Arc;
use futures::stream::StreamExt;
use grpc::rt::*;
use grpc::*;
use futures::executor;
use futures::{future, TryFutureExt, TryStreamExt};
use std::thread;
use test_misc::*;
fn new_server<H>(service: &str, method: &str, handler: H) -> Server
where
H: MethodHandler<String, String> + 'static + Sync + Send,
H: GrpcStreamingFlavor,
{
let mut methods = Vec::new();
methods.push(ServerMethod::new(
string_string_method(
&format!("{}{}", service, method),
<H as GrpcStreamingFlavor>::streaming(),
),
handler,
));
let mut server = ServerBuilder::new_plain();
server.http.set_port(0);
server.add_service(ServerServiceDefinition::new(service, methods));
server.build().expect("server")
}
fn new_server_unary<H>(service: &str, method: &str, handler: H) -> Server
where
H: Fn(
ServerHandlerContext,
ServerRequestSingle<String>,
ServerResponseUnarySink<String>,
) -> grpc::Result<()>
+ Sync
+ Send
+ 'static,
{
new_server(service, method, MethodHandlerUnary::new(handler))
}
fn new_server_server_streaming<H>(service: &str, method: &str, handler: H) -> Server
where
H: Fn(
ServerHandlerContext,
ServerRequestSingle<String>,
ServerResponseSink<String>,
) -> grpc::Result<()>
+ Sync
+ Send
+ 'static,
{
new_server(service, method, MethodHandlerServerStreaming::new(handler))
}
fn new_server_client_streaming<H>(service: &str, method: &str, handler: H) -> Server
where
H: Fn(
ServerHandlerContext,
ServerRequest<String>,
ServerResponseUnarySink<String>,
) -> grpc::Result<()>
+ Sync
+ Send
+ 'static,
{
new_server(service, method, MethodHandlerClientStreaming::new(handler))
}
struct TesterUnary {
name: String,
client: Client,
_server: Server,
}
impl TesterUnary {
fn new<H>(handler: H) -> TesterUnary
where
H: Fn(
ServerHandlerContext,
ServerRequestSingle<String>,
ServerResponseUnarySink<String>,
) -> grpc::Result<()>
+ Sync
+ Send
+ 'static,
{
let server = new_server_unary("/text", "/Unary", handler);
let port = server.local_addr().port().expect("port");
TesterUnary {
name: "/text/Unary".to_owned(),
_server: server,
client: ClientBuilder::new(BIND_HOST, port).build().unwrap(),
}
}
fn call(&self, param: &str) -> GrpcFuture<String> {
self.client
.call_unary(
RequestOptions::new(),
param.to_owned(),
string_string_method(&self.name, GrpcStreaming::Unary),
)
.drop_metadata()
}
fn call_expect_error<F: FnOnce(&Error) -> bool>(&self, param: &str, expect: F) {
match executor::block_on(self.call(param)) {
Ok(r) => panic!("expecting error, got: {:?}", r),
Err(e) => assert!(expect(&e), "wrong error: {:?}", e),
}
}
fn call_expect_grpc_error<F: FnOnce(&str) -> bool>(&self, param: &str, expect: F) {
self.call_expect_error(param, |e| match e {
&Error::GrpcMessage(GrpcMessageError {
ref grpc_message, ..
}) if expect(&grpc_message) => true,
_ => false,
});
}
fn call_expect_grpc_error_contain(&self, param: &str, expect: &str) {
self.call_expect_grpc_error(param, |m| m.find(expect).is_some());
}
}
struct TesterServerStreaming {
name: String,
client: Client,
_server: Server,
}
impl TesterServerStreaming {
fn new<H>(handler: H) -> Self
where
H: Fn(
ServerHandlerContext,
ServerRequestSingle<String>,
ServerResponseSink<String>,
) -> grpc::Result<()>
+ Sync
+ Send
+ 'static,
{
let server = new_server_server_streaming("/test", "/ServerStreaming", handler);
let port = server.local_addr().port().expect("port");
TesterServerStreaming {
name: "/test/ServerStreaming".to_owned(),
_server: server,
client: ClientBuilder::new(BIND_HOST, port).build().unwrap(),
}
}
fn call(&self, param: &str) -> GrpcStream<String> {
self.client
.call_server_streaming(
RequestOptions::new(),
param.to_owned(),
string_string_method(&self.name, GrpcStreaming::ServerStreaming),
)
.drop_metadata()
}
}
struct TesterClientStreaming {
name: String,
client: Client,
_server: Server,
}
impl TesterClientStreaming {
fn new<H>(handler: H) -> Self
where
H: Fn(
ServerHandlerContext,
ServerRequest<String>,
ServerResponseUnarySink<String>,
) -> grpc::Result<()>
+ Sync
+ Send
+ 'static,
{
let server = new_server_client_streaming("/test", "/ClientStreaming", handler);
let port = server.local_addr().port().expect("port");
TesterClientStreaming {
name: "/test/ClientStreaming".to_owned(),
_server: server,
client: ClientBuilder::new(BIND_HOST, port).build().unwrap(),
}
}
fn call(&self) -> (ClientRequestSink<String>, SingleResponse<String>) {
executor::block_on(self.client.call_client_streaming(
RequestOptions::new(),
string_string_method(&self.name, GrpcStreaming::ClientStreaming),
))
.unwrap()
}
}
#[test]
fn unary() {
init_logger();
let tester = TesterUnary::new(|_m, req, resp| resp.finish(req.message));
assert_eq!("aa", executor::block_on(tester.call("aa")).unwrap());
}
#[test]
fn error_in_handler() {
init_logger();
let tester = TesterUnary::new(|_m, _req, _resp| Err(grpc::Error::Other("my error")));
tester.call_expect_grpc_error_contain("aa", "grpc server handler did not close the sender");
}
fn _panic_in_handler() {
init_logger();
let tester = TesterUnary::new(|_m, _req, _resp| panic!("icnap"));
tester.call_expect_grpc_error("aa", |m| {
m.find("Panic").is_some() && m.find("icnap").is_some()
});
}
#[test]
fn server_streaming() {
init_logger();
let test_sync = Arc::new(TestSync::new());
let test_sync_server = test_sync.clone();
let tester = TesterServerStreaming::new(move |_m, req, mut resp| {
let sync = test_sync_server.clone();
thread::spawn(move || {
for i in 0..3 {
sync.take(i * 2 + 1);
resp.send_data(format!("{}{}", req.message, i)).unwrap();
}
resp.send_trailers(Metadata::new()).unwrap();
});
Ok(())
});
let mut rs = tester.call("x");
test_sync.take(0);
executor::block_on(async {
assert_eq!("x0", rs.next().await.unwrap().unwrap());
test_sync.take(2);
assert_eq!("x1", rs.next().await.unwrap().unwrap());
test_sync.take(4);
assert_eq!("x2", rs.next().await.unwrap().unwrap());
assert!(rs.next().await.is_none());
});
}
#[test]
fn client_streaming() {
init_logger();
let tester = TesterClientStreaming::new(move |m, req, resp| {
let request_stream = req.into_stream();
m.ctx.loop_remote().spawn(
request_stream
.try_fold(String::new(), |mut s, message| {
s.push_str(&message);
future::ok::<_, Error>(s)
})
.map_ok(|r| {
resp.finish(r).unwrap();
})
.map_err(|_| ()),
);
Ok(())
});
let (mut tx, result) = tester.call();
tx.send_data("aa".to_owned()).expect("aa");
tx.send_data("bb".to_owned()).expect("bb");
tx.send_data("cc".to_owned()).expect("cc");
tx.finish().expect("finish");
assert_eq!(
"aabbcc",
executor::block_on(result.drop_metadata()).unwrap()
);
}