use parking_lot::RwLock;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use crate::cx::Cx;
use super::service::{MethodDescriptor, NamedService, ServiceDescriptor, ServiceHandler};
use super::status::Status;
use super::streaming::{Request, Response};
pub type ReflectionAuthCallback = Arc<dyn Fn(&Cx, &str) -> Result<(), Status> + Send + Sync>;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReflectedMethod {
pub name: String,
pub path: String,
pub client_streaming: bool,
pub server_streaming: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReflectedService {
pub name: String,
pub methods: Vec<ReflectedMethod>,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReflectionListServicesRequest;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReflectionListServicesResponse {
pub services: Vec<String>,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReflectionDescribeServiceRequest {
pub service: String,
}
impl ReflectionDescribeServiceRequest {
#[must_use]
pub fn new(service: impl Into<String>) -> Self {
Self {
service: service.into(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReflectionDescribeServiceResponse {
pub service: ReflectedService,
}
#[derive(Clone, Default)]
enum ReflectionAuthMode {
#[default]
Locked,
Required(ReflectionAuthCallback),
Anonymous,
}
#[derive(Clone, Default)]
pub struct ReflectionService {
services: Arc<RwLock<BTreeMap<String, ReflectedService>>>,
auth: ReflectionAuthMode,
}
impl fmt::Debug for ReflectionService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let auth_label = match &self.auth {
ReflectionAuthMode::Locked => "Locked (no auth opt-in)",
ReflectionAuthMode::Required(_) => "Required(<fn>)",
ReflectionAuthMode::Anonymous => "Anonymous (dev only)",
};
f.debug_struct("ReflectionService")
.field("services", &self.services)
.field("auth", &auth_label)
.finish()
}
}
impl ReflectionService {
#[must_use]
pub fn new() -> Self {
Self {
services: Arc::new(RwLock::new(BTreeMap::new())),
auth: ReflectionAuthMode::Locked,
}
}
#[must_use]
pub fn with_auth<F>(mut self, callback: F) -> Self
where
F: Fn(&Cx, &str) -> Result<(), Status> + Send + Sync + 'static,
{
self.auth = ReflectionAuthMode::Required(Arc::new(callback));
self
}
#[must_use]
pub fn allow_anonymous(mut self) -> Self {
self.auth = ReflectionAuthMode::Anonymous;
self
}
#[must_use]
pub fn auth_installed(&self) -> bool {
matches!(self.auth, ReflectionAuthMode::Required(_))
}
#[allow(dead_code)]
fn check_auth(&self, method: &str) -> Result<(), Status> {
fn current_remote_cx(method: &str) -> Result<Cx, Status> {
let Some(cx) = Cx::current() else {
return Err(Status::permission_denied(format!(
"reflection.{method}: requires REMOTE capability"
)));
};
if !cx.has_remote() {
return Err(Status::permission_denied(format!(
"reflection.{method}: requires REMOTE capability"
)));
}
Ok(cx)
}
match &self.auth {
ReflectionAuthMode::Locked => Err(Status::permission_denied(format!(
"reflection.{method}: service is in Locked mode — call \
.with_auth(...) for production or .allow_anonymous() for \
dev/test before serving reflection RPCs"
))),
ReflectionAuthMode::Anonymous => {
current_remote_cx(method)?;
Ok(())
}
ReflectionAuthMode::Required(auth) => {
let cx = current_remote_cx(method)?;
auth(&cx, method)
}
}
}
#[must_use]
pub fn from_handlers<'a, I>(handlers: I) -> Self
where
I: IntoIterator<Item = &'a dyn ServiceHandler>,
{
let reflection = Self::new();
for handler in handlers {
reflection.register_handler(handler);
}
reflection
}
pub fn register_descriptor(&self, descriptor: &ServiceDescriptor) {
let reflected = ReflectedService {
name: descriptor.full_name(),
methods: descriptor
.methods
.iter()
.map(|method| ReflectedMethod {
name: method.name.to_string(),
path: method.path.to_string(),
client_streaming: method.client_streaming,
server_streaming: method.server_streaming,
})
.collect(),
};
self.services
.write()
.insert(reflected.name.clone(), reflected);
}
pub fn register_handler(&self, handler: &dyn ServiceHandler) {
self.register_descriptor(handler.descriptor());
}
pub fn list_services(&self) -> Result<Vec<String>, Status> {
self.check_auth("ListServices")?;
Ok(self.services.read().keys().cloned().collect())
}
pub fn describe_service(&self, service: &str) -> Result<ReflectedService, Status> {
self.check_auth("DescribeService")?;
self.services
.read()
.get(service)
.cloned()
.ok_or_else(|| Status::not_found(format!("service '{service}' not found")))
}
#[must_use]
pub fn list_services_async(
&self,
_request: &Request<ReflectionListServicesRequest>,
) -> Pin<
Box<dyn Future<Output = Result<Response<ReflectionListServicesResponse>, Status>> + Send>,
> {
let result = self
.list_services()
.map(|services| ReflectionListServicesResponse { services });
Box::pin(async move { result.map(Response::new) })
}
#[must_use]
pub fn describe_service_async(
&self,
request: &Request<ReflectionDescribeServiceRequest>,
) -> Pin<
Box<
dyn Future<Output = Result<Response<ReflectionDescribeServiceResponse>, Status>> + Send,
>,
> {
let result = self
.describe_service(&request.get_ref().service)
.map(|service| ReflectionDescribeServiceResponse { service });
Box::pin(async move { result.map(Response::new) })
}
}
impl NamedService for ReflectionService {
const NAME: &'static str = "grpc.reflection.v1alpha.ServerReflection";
}
impl ServiceHandler for ReflectionService {
fn descriptor(&self) -> &ServiceDescriptor {
static METHODS: &[MethodDescriptor] = &[MethodDescriptor::bidi_streaming(
"ServerReflectionInfo",
"/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo",
)];
static DESC: ServiceDescriptor =
ServiceDescriptor::new("ServerReflection", "grpc.reflection.v1alpha", METHODS);
&DESC
}
fn method_names(&self) -> Vec<&str> {
vec!["ServerReflectionInfo"]
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
use insta::assert_json_snapshot;
use serde_json::{Value, json};
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
struct EchoService;
impl ServiceHandler for EchoService {
fn descriptor(&self) -> &ServiceDescriptor {
static METHODS: &[MethodDescriptor] = &[
MethodDescriptor::unary("Ping", "/pkg.Echo/Ping"),
MethodDescriptor::server_streaming("Watch", "/pkg.Echo/Watch"),
];
static DESC: ServiceDescriptor = ServiceDescriptor::new("Echo", "pkg", METHODS);
&DESC
}
fn method_names(&self) -> Vec<&str> {
vec!["Ping", "Watch"]
}
}
struct EnumShapeService;
impl ServiceHandler for EnumShapeService {
fn descriptor(&self) -> &ServiceDescriptor {
static METHODS: &[MethodDescriptor] = &[
MethodDescriptor::unary("Unary", "/pkg.EnumShape/Unary"),
MethodDescriptor::server_streaming("ServerStream", "/pkg.EnumShape/ServerStream"),
MethodDescriptor::client_streaming("ClientStream", "/pkg.EnumShape/ClientStream"),
MethodDescriptor::bidi_streaming("BidiStream", "/pkg.EnumShape/BidiStream"),
];
static DESC: ServiceDescriptor = ServiceDescriptor::new("EnumShape", "pkg", METHODS);
&DESC
}
fn method_names(&self) -> Vec<&str> {
vec!["Unary", "ServerStream", "ClientStream", "BidiStream"]
}
}
fn method_kind(method: &ReflectedMethod) -> &'static str {
match (method.client_streaming, method.server_streaming) {
(false, false) => "unary",
(false, true) => "server_streaming",
(true, false) => "client_streaming",
(true, true) => "bidi_streaming",
}
}
fn reflected_service_snapshot(service: &ReflectedService) -> Value {
json!({
"service": service.name,
"methods": service.methods.iter().map(|method| {
json!({
"name": method.name,
"path": method.path,
"kind": method_kind(method),
})
}).collect::<Vec<_>>(),
})
}
fn install_remote_reflection_cx() -> crate::cx::cx::CurrentCxGuard {
Cx::set_current(Some(Cx::for_testing_with_remote(
crate::remote::RemoteCap::new(),
)))
}
#[test]
fn reflection_register_list_and_describe() {
init_test("reflection_register_list_and_describe");
let reflection = ReflectionService::new().allow_anonymous();
let echo = EchoService;
reflection.register_handler(&echo);
let _remote_cx = install_remote_reflection_cx();
let services = reflection
.list_services()
.expect("anonymous mode permits list");
crate::assert_with_log!(
services == vec!["pkg.Echo".to_string()],
"service list",
vec!["pkg.Echo".to_string()],
services
);
let described = reflection
.describe_service("pkg.Echo")
.expect("service exists");
crate::assert_with_log!(
described.methods.len() == 2,
"method count",
2,
described.methods.len()
);
crate::assert_with_log!(
described.methods[0].name == "Ping",
"first method name",
"Ping",
&described.methods[0].name
);
crate::assert_with_log!(
described.methods[1].server_streaming,
"server streaming flag",
true,
described.methods[1].server_streaming
);
crate::test_complete!("reflection_register_list_and_describe");
}
#[test]
fn reflection_describe_missing_service() {
init_test("reflection_describe_missing_service");
let reflection = ReflectionService::new().allow_anonymous();
let _remote_cx = install_remote_reflection_cx();
let err = reflection
.describe_service("pkg.Missing")
.expect_err("missing service should fail");
crate::assert_with_log!(
err.code() == super::super::status::Code::NotFound,
"not found code",
super::super::status::Code::NotFound,
err.code()
);
crate::test_complete!("reflection_describe_missing_service");
}
#[test]
fn reflection_async_helpers() {
init_test("reflection_async_helpers");
let reflection = ReflectionService::new().allow_anonymous();
let echo = EchoService;
reflection.register_handler(&echo);
let _remote_cx = install_remote_reflection_cx();
let list = futures_lite::future::block_on(
reflection.list_services_async(&Request::new(ReflectionListServicesRequest)),
)
.expect("list succeeds");
crate::assert_with_log!(
list.get_ref().services == vec!["pkg.Echo".to_string()],
"async list",
vec!["pkg.Echo".to_string()],
&list.get_ref().services
);
let describe = futures_lite::future::block_on(reflection.describe_service_async(
&Request::new(ReflectionDescribeServiceRequest::new("pkg.Echo")),
))
.expect("describe succeeds");
crate::assert_with_log!(
describe.get_ref().service.name == "pkg.Echo",
"async describe name",
"pkg.Echo",
&describe.get_ref().service.name
);
crate::test_complete!("reflection_async_helpers");
}
#[test]
fn reflection_service_traits() {
init_test("reflection_service_traits");
let reflection = ReflectionService::new().allow_anonymous();
crate::assert_with_log!(
ReflectionService::NAME == "grpc.reflection.v1alpha.ServerReflection",
"service name",
"grpc.reflection.v1alpha.ServerReflection",
ReflectionService::NAME
);
let desc = reflection.descriptor();
crate::assert_with_log!(
desc.full_name() == "grpc.reflection.v1alpha.ServerReflection",
"descriptor full name",
"grpc.reflection.v1alpha.ServerReflection",
desc.full_name()
);
let methods = reflection.method_names();
crate::assert_with_log!(
methods == vec!["ServerReflectionInfo"],
"method names match the descriptor-exposed RPCs",
vec!["ServerReflectionInfo"],
methods
);
crate::test_complete!("reflection_service_traits");
}
#[test]
fn reflection_descriptor_enum_output_snapshot() {
init_test("reflection_descriptor_enum_output_snapshot");
let reflection = ReflectionService::new().allow_anonymous();
let enum_shape = EnumShapeService;
reflection.register_handler(&enum_shape);
let _remote_cx = install_remote_reflection_cx();
let described = reflection
.describe_service("pkg.EnumShape")
.expect("enum-shape service exists");
assert_json_snapshot!(
"grpc_reflection_descriptor_enum_output",
reflected_service_snapshot(&described)
);
crate::test_complete!("reflection_descriptor_enum_output_snapshot");
}
#[test]
fn auth_allow_anonymous_is_open() {
init_test("auth_allow_anonymous_is_open");
let reflection = ReflectionService::new().allow_anonymous();
reflection.register_handler(&EchoService);
assert!(
!reflection.auth_installed(),
"anonymous mode must not report a production auth callback",
);
let _remote_cx = install_remote_reflection_cx();
assert!(reflection.list_services().is_ok());
assert!(reflection.describe_service("pkg.Echo").is_ok());
crate::test_complete!("auth_allow_anonymous_is_open");
}
#[test]
fn auth_callback_can_reject() {
init_test("auth_callback_can_reject");
let reflection = ReflectionService::new()
.register_for_test()
.with_auth(|_cx, method| Err(Status::permission_denied(format!("denied: {method}"))));
assert!(reflection.auth_installed());
let _remote_cx = install_remote_reflection_cx();
let err_list = reflection.list_services().expect_err("auth must reject");
assert_eq!(
err_list.code(),
super::super::status::Code::PermissionDenied
);
assert!(err_list.message().contains("denied: ListServices"));
let err_desc = reflection
.describe_service("pkg.Echo")
.expect_err("auth must reject");
assert_eq!(
err_desc.code(),
super::super::status::Code::PermissionDenied
);
assert!(err_desc.message().contains("denied: DescribeService"));
crate::test_complete!("auth_callback_can_reject");
}
#[test]
fn auth_method_name_is_passed_to_callback() {
init_test("auth_method_name_is_passed_to_callback");
let saw_list = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let saw_describe = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let saw_list_clone = saw_list.clone();
let saw_describe_clone = saw_describe.clone();
let reflection =
ReflectionService::new()
.register_for_test()
.with_auth(move |_cx, method| {
match method {
"ListServices" => {
saw_list_clone.store(true, std::sync::atomic::Ordering::Relaxed)
}
"DescribeService" => {
saw_describe_clone.store(true, std::sync::atomic::Ordering::Relaxed)
}
_ => {}
}
Ok(())
});
let _guard = install_remote_reflection_cx();
let _ = reflection.list_services();
let _ = reflection.describe_service("pkg.Echo");
drop(_guard);
assert!(
saw_list.load(std::sync::atomic::Ordering::Relaxed),
"callback must see ListServices"
);
assert!(
saw_describe.load(std::sync::atomic::Ordering::Relaxed),
"callback must see DescribeService"
);
crate::test_complete!("auth_method_name_is_passed_to_callback");
}
impl ReflectionService {
fn register_for_test(self) -> Self {
self.register_handler(&EchoService);
self
}
}
#[test]
fn mi4hzh_locked_default_rejects_list_services() {
init_test("mi4hzh_locked_default_rejects_list_services");
let reflection = ReflectionService::new().register_for_test();
let err = reflection
.list_services()
.expect_err("Locked mode must reject ListServices");
assert_eq!(err.code(), super::super::status::Code::PermissionDenied);
let msg = err.message();
assert!(
msg.contains(".with_auth") && msg.contains(".allow_anonymous"),
"PermissionDenied message must name both opt-ins: {msg}"
);
crate::test_complete!("mi4hzh_locked_default_rejects_list_services");
}
#[test]
fn mi4hzh_locked_default_rejects_describe_service() {
init_test("mi4hzh_locked_default_rejects_describe_service");
let reflection = ReflectionService::new().register_for_test();
let err = reflection
.describe_service("pkg.Echo")
.expect_err("Locked mode must reject DescribeService");
assert_eq!(err.code(), super::super::status::Code::PermissionDenied);
crate::test_complete!("mi4hzh_locked_default_rejects_describe_service");
}
#[test]
fn mi4hzh_allow_anonymous_unlocks_dev_mode() {
init_test("mi4hzh_allow_anonymous_unlocks_dev_mode");
let reflection = ReflectionService::new()
.register_for_test()
.allow_anonymous();
let _remote_cx = install_remote_reflection_cx();
assert!(reflection.list_services().is_ok());
assert!(reflection.describe_service("pkg.Echo").is_ok());
assert!(!reflection.auth_installed());
crate::test_complete!("mi4hzh_allow_anonymous_unlocks_dev_mode");
}
#[test]
fn mi4hzh_with_auth_reports_installed() {
init_test("mi4hzh_with_auth_reports_installed");
let reflection = ReflectionService::new()
.register_for_test()
.with_auth(|_cx, _method| Ok(()));
assert!(reflection.auth_installed());
crate::test_complete!("mi4hzh_with_auth_reports_installed");
}
#[test]
fn tlp8m9_reflection_requires_remote_capability() {
init_test("tlp8m9_reflection_requires_remote_capability");
let reflection = ReflectionService::new()
.register_for_test()
.allow_anonymous();
let cx = crate::cx::Cx::for_testing();
let _current = crate::cx::Cx::set_current(Some(cx));
let _restricted = crate::cx::Cx::push_restriction(crate::cx::cap::CapMask::none());
let observed_cx = crate::cx::Cx::current().expect("restricted cx should be current");
assert!(
!observed_cx.has_remote(),
"test precondition: restricted current cx must not expose REMOTE"
);
let result = reflection.list_services();
assert!(
result.is_err(),
"list_services should fail without REMOTE capability"
);
if let Err(status) = result {
assert_eq!(status.code(), super::super::status::Code::PermissionDenied);
assert!(
status.message().contains("requires REMOTE capability"),
"Error message should mention REMOTE capability: {}",
status.message()
);
}
crate::test_complete!("tlp8m9_reflection_requires_remote_capability");
}
#[test]
fn tlp8m9_reflection_allows_with_remote_capability() {
init_test("tlp8m9_reflection_allows_with_remote_capability");
let reflection = ReflectionService::new()
.register_for_test()
.allow_anonymous();
let full_cx = crate::cx::Cx::for_testing_with_remote(crate::remote::RemoteCap::new());
let _current = crate::cx::Cx::set_current(Some(full_cx));
let result = reflection.list_services();
assert!(
result.is_ok(),
"list_services should succeed with REMOTE capability"
);
crate::test_complete!("tlp8m9_reflection_allows_with_remote_capability");
}
#[test]
fn tlp8m9_capability_check_applies_to_all_methods() {
init_test("tlp8m9_capability_check_applies_to_all_methods");
let reflection = ReflectionService::new()
.register_for_test()
.allow_anonymous();
let restricted_cx = crate::cx::Cx::for_testing();
let _current = crate::cx::Cx::set_current(Some(restricted_cx));
let _restricted = crate::cx::Cx::push_restriction(crate::cx::cap::CapMask::none());
let list_result = reflection.list_services();
let describe_result = reflection.describe_service("test.TestService");
assert!(
list_result.is_err(),
"list_services should fail without REMOTE"
);
assert!(
describe_result.is_err(),
"describe_service should fail without REMOTE"
);
crate::test_complete!("tlp8m9_capability_check_applies_to_all_methods");
}
#[test]
fn tlp8m9_capability_check_with_auth_callback() {
init_test("tlp8m9_capability_check_with_auth_callback");
let reflection = ReflectionService::new()
.register_for_test()
.with_auth(|_cx, method| {
Err(super::super::status::Status::permission_denied(format!(
"denied: {method}"
)))
});
let restricted_cx = crate::cx::Cx::for_testing();
let _current = crate::cx::Cx::set_current(Some(restricted_cx));
let _restricted = crate::cx::Cx::push_restriction(crate::cx::cap::CapMask::none());
let result = reflection.list_services();
assert!(result.is_err());
if let Err(status) = result {
assert!(
status.message().contains("requires REMOTE capability"),
"Should get capability error, not auth callback error: {}",
status.message()
);
}
crate::test_complete!("tlp8m9_capability_check_with_auth_callback");
}
}