use parking_lot::RwLock;
use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use super::service::{MethodDescriptor, NamedService, ServiceDescriptor, ServiceHandler};
use super::status::Status;
use super::streaming::{Request, Response};
/// Owned description of a single RPC method as exposed through reflection.
///
/// `ReflectionService::register_descriptor` builds one of these per method by copying
/// the fields of the registered method descriptor.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReflectedMethod {
    /// Method name, for example `Ping`.
    pub name: String,
    /// Method path, for example `/pkg.Echo/Ping`.
    pub path: String,
    /// Copy of the descriptor's `client_streaming` flag.
    pub client_streaming: bool,
    /// Copy of the descriptor's `server_streaming` flag.
    pub server_streaming: bool,
}
/// Owned description of a registered service and the methods it exposes.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReflectedService {
    /// Full service name as reported by the descriptor's `full_name()`; this is also
    /// the key the service is stored under in `ReflectionService`.
    pub name: String,
    /// Methods in the order they were listed by the registered descriptor.
    pub methods: Vec<ReflectedMethod>,
}
/// Request message for `ReflectionService::list_services_async`.
///
/// It carries no fields; the list operation takes no parameters.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ReflectionListServicesRequest;
/// Response message for `ReflectionService::list_services_async`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ReflectionListServicesResponse {
    /// Full names of the registered services, as returned by
    /// `ReflectionService::list_services`.
    pub services: Vec<String>,
}
/// Request message for `ReflectionService::describe_service_async`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ReflectionDescribeServiceRequest {
    /// Full name of the service to describe, for example `pkg.Echo`.
    pub service: String,
}

impl ReflectionDescribeServiceRequest {
    /// Creates a request for the service called `service`.
    ///
    /// Accepts anything convertible into a `String`, so both `&str` and `String`
    /// arguments work without an explicit conversion at the call site.
    #[must_use]
    pub fn new(service: impl Into<String>) -> Self {
        let service = service.into();
        Self { service }
    }
}
/// Response message for `ReflectionService::describe_service_async`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReflectionDescribeServiceResponse {
    /// Description of the requested service.
    pub service: ReflectedService,
}
/// Registry of service descriptions that can be listed and looked up by full name.
///
/// The registry lives behind an `Arc`, so cloning a `ReflectionService` yields another
/// handle to the same underlying map rather than an independent copy.
#[derive(Clone, Debug, Default)]
pub struct ReflectionService {
    /// Registered services keyed by full name. A `BTreeMap` iterates its keys in
    /// sorted order, which is the order `list_services` reports them in.
    services: Arc<RwLock<BTreeMap<String, ReflectedService>>>,
}
impl ReflectionService {
    /// Creates a reflection service with an empty registry.
    #[must_use]
    pub fn new() -> Self {
        let registry = BTreeMap::new();
        Self {
            services: Arc::new(RwLock::new(registry)),
        }
    }

    /// Builds a reflection service pre-populated from `handlers`.
    ///
    /// Handlers are registered in iteration order; a later handler replaces an earlier
    /// one whose descriptor reports the same full name.
    #[must_use]
    pub fn from_handlers<'a, I>(handlers: I) -> Self
    where
        I: IntoIterator<Item = &'a dyn ServiceHandler>,
    {
        let reflection = Self::new();
        handlers
            .into_iter()
            .for_each(|handler| reflection.register_handler(handler));
        reflection
    }

    /// Records `descriptor` in the registry under its full name.
    ///
    /// Any entry already stored under that name is replaced.
    pub fn register_descriptor(&self, descriptor: &ServiceDescriptor) {
        // Build the owned snapshot before taking the write lock so the lock is held
        // only for the map insertion itself.
        let mut methods = Vec::new();
        for method in descriptor.methods.iter() {
            methods.push(ReflectedMethod {
                name: method.name.to_string(),
                path: method.path.to_string(),
                client_streaming: method.client_streaming,
                server_streaming: method.server_streaming,
            });
        }
        let reflected = ReflectedService {
            name: descriptor.full_name(),
            methods,
        };
        let key = reflected.name.clone();

        let mut registry = self.services.write();
        registry.insert(key, reflected);
    }

    /// Records the descriptor reported by `handler`.
    ///
    /// Shorthand for `register_descriptor(handler.descriptor())`.
    pub fn register_handler(&self, handler: &dyn ServiceHandler) {
        let descriptor = handler.descriptor();
        self.register_descriptor(descriptor);
    }

    /// Returns the full names of all registered services in sorted order.
    #[must_use]
    pub fn list_services(&self) -> Vec<String> {
        let registry = self.services.read();
        registry.keys().cloned().collect()
    }

    /// Returns a copy of the description registered under `service`.
    ///
    /// # Errors
    ///
    /// Returns `Status::not_found` when no service is registered under that name.
    pub fn describe_service(&self, service: &str) -> Result<ReflectedService, Status> {
        let registry = self.services.read();
        match registry.get(service) {
            Some(found) => Ok(found.clone()),
            None => Err(Status::not_found(format!(
                "service '{service}' not found"
            ))),
        }
    }

    /// Future-returning counterpart of [`Self::list_services`].
    ///
    /// The service list is captured when this method is called, not when the future is
    /// polled; the returned future only wraps that snapshot in a `Response`.
    #[must_use]
    pub fn list_services_async(
        &self,
        _request: &Request<ReflectionListServicesRequest>,
    ) -> Pin<
        Box<dyn Future<Output = Result<Response<ReflectionListServicesResponse>, Status>> + Send>,
    > {
        let services = self.list_services();
        Box::pin(async move {
            let response = ReflectionListServicesResponse { services };
            Ok(Response::new(response))
        })
    }

    /// Future-returning counterpart of [`Self::describe_service`].
    ///
    /// The lookup runs when this method is called, not when the future is polled; the
    /// returned future resolves to the precomputed description or `Status` error.
    #[must_use]
    pub fn describe_service_async(
        &self,
        request: &Request<ReflectionDescribeServiceRequest>,
    ) -> Pin<
        Box<
            dyn Future<Output = Result<Response<ReflectionDescribeServiceResponse>, Status>> + Send,
        >,
    > {
        let wanted = &request.get_ref().service;
        let lookup = self.describe_service(wanted);
        Box::pin(async move {
            match lookup {
                Ok(service) => Ok(Response::new(ReflectionDescribeServiceResponse { service })),
                Err(status) => Err(status),
            }
        })
    }
}
// Associates the reflection service with its fully-qualified service name.
impl NamedService for ReflectionService {
// Same string the `ServiceHandler` descriptor's `full_name()` is asserted to
// equal in the `reflection_service_traits` test.
const NAME: &'static str = "grpc.reflection.v1alpha.ServerReflection";
}
impl ServiceHandler for ReflectionService {
    /// Returns the static descriptor of the reflection service itself.
    ///
    /// The descriptor declares a single bidirectional-streaming method,
    /// `ServerReflectionInfo`, in package `grpc.reflection.v1alpha`.
    fn descriptor(&self) -> &ServiceDescriptor {
        static REFLECTION_METHODS: &[MethodDescriptor] = &[MethodDescriptor::bidi_streaming(
            "ServerReflectionInfo",
            "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo",
        )];
        static REFLECTION_DESCRIPTOR: ServiceDescriptor = ServiceDescriptor::new(
            "ServerReflection",
            "grpc.reflection.v1alpha",
            REFLECTION_METHODS,
        );
        &REFLECTION_DESCRIPTOR
    }

    /// Returns the names of the methods declared by [`Self::descriptor`].
    fn method_names(&self) -> Vec<&str> {
        Vec::from(["ServerReflectionInfo"])
    }
}
#[cfg(test)]
mod tests {
// Unit tests for `ReflectionService`: registration, listing, lookup, the
// future-returning helpers, the trait implementations, and a JSON snapshot of a
// described service.
use super::*;
use insta::assert_json_snapshot;
use serde_json::{Value, json};
// Shared per-test setup: initializes the crate's test logging and invokes the
// crate's `test_phase!` macro with the test name.
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
// Fixture handler describing service `Echo` in package `pkg` with a unary `Ping`
// method and a server-streaming `Watch` method.
struct EchoService;
impl ServiceHandler for EchoService {
fn descriptor(&self) -> &ServiceDescriptor {
static METHODS: &[MethodDescriptor] = &[
MethodDescriptor::unary("Ping", "/pkg.Echo/Ping"),
MethodDescriptor::server_streaming("Watch", "/pkg.Echo/Watch"),
];
static DESC: ServiceDescriptor = ServiceDescriptor::new("Echo", "pkg", METHODS);
&DESC
}
fn method_names(&self) -> Vec<&str> {
vec!["Ping", "Watch"]
}
}
// Fixture handler with one method per `MethodDescriptor` constructor (unary,
// server-streaming, client-streaming, bidirectional), used by the snapshot test.
struct EnumShapeService;
impl ServiceHandler for EnumShapeService {
fn descriptor(&self) -> &ServiceDescriptor {
static METHODS: &[MethodDescriptor] = &[
MethodDescriptor::unary("Unary", "/pkg.EnumShape/Unary"),
MethodDescriptor::server_streaming("ServerStream", "/pkg.EnumShape/ServerStream"),
MethodDescriptor::client_streaming("ClientStream", "/pkg.EnumShape/ClientStream"),
MethodDescriptor::bidi_streaming("BidiStream", "/pkg.EnumShape/BidiStream"),
];
static DESC: ServiceDescriptor = ServiceDescriptor::new("EnumShape", "pkg", METHODS);
&DESC
}
fn method_names(&self) -> Vec<&str> {
vec!["Unary", "ServerStream", "ClientStream", "BidiStream"]
}
}
// Maps the two streaming flags of a reflected method to a stable label for the
// snapshot output.
fn method_kind(method: &ReflectedMethod) -> &'static str {
match (method.client_streaming, method.server_streaming) {
(false, false) => "unary",
(false, true) => "server_streaming",
(true, false) => "client_streaming",
(true, true) => "bidi_streaming",
}
}
// Renders a reflected service as JSON: the service name plus, per method, its
// name, path, and the label produced by `method_kind`.
fn reflected_service_snapshot(service: &ReflectedService) -> Value {
json!({
"service": service.name,
"methods": service.methods.iter().map(|method| {
json!({
"name": method.name,
"path": method.path,
"kind": method_kind(method),
})
}).collect::<Vec<_>>(),
})
}
// Registering a handler makes its service appear in `list_services` and makes
// `describe_service` return its methods in descriptor order.
#[test]
fn reflection_register_list_and_describe() {
init_test("reflection_register_list_and_describe");
let reflection = ReflectionService::new();
let echo = EchoService;
reflection.register_handler(&echo);
let services = reflection.list_services();
crate::assert_with_log!(
services == vec!["pkg.Echo".to_string()],
"service list",
vec!["pkg.Echo".to_string()],
services
);
let described = reflection
.describe_service("pkg.Echo")
.expect("service exists");
crate::assert_with_log!(
described.methods.len() == 2,
"method count",
2,
described.methods.len()
);
crate::assert_with_log!(
described.methods[0].name == "Ping",
"first method name",
"Ping",
&described.methods[0].name
);
crate::assert_with_log!(
described.methods[1].server_streaming,
"server streaming flag",
true,
described.methods[1].server_streaming
);
crate::test_complete!("reflection_register_list_and_describe");
}
// Describing a name that was never registered yields a `NotFound` status.
#[test]
fn reflection_describe_missing_service() {
init_test("reflection_describe_missing_service");
let reflection = ReflectionService::new();
let err = reflection
.describe_service("pkg.Missing")
.expect_err("missing service should fail");
crate::assert_with_log!(
err.code() == super::super::status::Code::NotFound,
"not found code",
super::super::status::Code::NotFound,
err.code()
);
crate::test_complete!("reflection_describe_missing_service");
}
// The future-returning helpers resolve to the same data as their synchronous
// counterparts; the futures are driven to completion with `block_on`.
#[test]
fn reflection_async_helpers() {
init_test("reflection_async_helpers");
let reflection = ReflectionService::new();
let echo = EchoService;
reflection.register_handler(&echo);
let list = futures_lite::future::block_on(
reflection.list_services_async(&Request::new(ReflectionListServicesRequest)),
)
.expect("list succeeds");
crate::assert_with_log!(
list.get_ref().services == vec!["pkg.Echo".to_string()],
"async list",
vec!["pkg.Echo".to_string()],
&list.get_ref().services
);
let describe = futures_lite::future::block_on(reflection.describe_service_async(
&Request::new(ReflectionDescribeServiceRequest::new("pkg.Echo")),
))
.expect("describe succeeds");
crate::assert_with_log!(
describe.get_ref().service.name == "pkg.Echo",
"async describe name",
"pkg.Echo",
&describe.get_ref().service.name
);
crate::test_complete!("reflection_async_helpers");
}
// `NamedService::NAME`, the descriptor's full name, and `method_names` must all
// agree on the reflection service's own identity.
#[test]
fn reflection_service_traits() {
init_test("reflection_service_traits");
let reflection = ReflectionService::new();
crate::assert_with_log!(
ReflectionService::NAME == "grpc.reflection.v1alpha.ServerReflection",
"service name",
"grpc.reflection.v1alpha.ServerReflection",
ReflectionService::NAME
);
let desc = reflection.descriptor();
crate::assert_with_log!(
desc.full_name() == "grpc.reflection.v1alpha.ServerReflection",
"descriptor full name",
"grpc.reflection.v1alpha.ServerReflection",
desc.full_name()
);
let methods = reflection.method_names();
crate::assert_with_log!(
methods == vec!["ServerReflectionInfo"],
"method names match the descriptor-exposed RPCs",
vec!["ServerReflectionInfo"],
methods
);
crate::test_complete!("reflection_service_traits");
}
// Snapshot of the JSON rendering of a service that has one method of each
// streaming kind, stored under the explicit snapshot name given below.
#[test]
fn reflection_descriptor_enum_output_snapshot() {
init_test("reflection_descriptor_enum_output_snapshot");
let reflection = ReflectionService::new();
let enum_shape = EnumShapeService;
reflection.register_handler(&enum_shape);
let described = reflection
.describe_service("pkg.EnumShape")
.expect("enum-shape service exists");
assert_json_snapshot!(
"grpc_reflection_descriptor_enum_output",
reflected_service_snapshot(&described)
);
crate::test_complete!("reflection_descriptor_enum_output_snapshot");
}
}