use core::time::Duration;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::watch;
use tonic::service::Routes;
use tonic::transport::Server as TonicServer;
use tsoracle_consensus::ConsensusDriver;
#[cfg(any(test, feature = "test-fakes"))]
use tsoracle_core::{CoreError, WindowGrant};
use tsoracle_core::{Epoch, PeerEndpoint};
use tsoracle_proto::v1::tso_service_server::TsoServiceServer;
use crate::bt::Bt;
use crate::clock::{Clock, SystemClock};
use crate::service::TsoServiceImpl;
use crate::serving_core::ServingCore;
/// Errors returned by [`ServerBuilder::build`].
#[derive(Debug, thiserror::Error)]
pub enum BuildError {
/// No consensus driver was supplied via [`ServerBuilder::consensus_driver`].
#[error("consensus_driver is required")]
MissingConsensusDriver,
}
/// Errors surfaced while building the router or running the server.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
/// Error reported by the tonic transport layer (TLS configuration or serving).
/// Converted automatically through `From`.
#[error("transport: {0}")]
Transport(#[from] tonic::transport::Error),
/// Error reported by the consensus layer. Converted automatically through `From`.
#[error("consensus: {0}")]
Consensus(#[from] tsoracle_consensus::ConsensusError),
/// Error reported by `tsoracle_core`. Converted automatically through `From`.
#[error("core: {0}")]
Core(#[from] tsoracle_core::CoreError),
/// The leader-watch task panicked. `payload` is the panic message rendered as a
/// string; `bt` is the [`Bt`] value captured where the panic was converted into
/// this error.
#[error("leader-watch task panicked: {payload}{bt}")]
WatchPanic { payload: String, bt: Bt },
/// The leadership stream of the consensus driver closed.
// NOTE(review): not constructed in this file outside tests; presumably raised by
// the leader-watch loop in `crate::fence` — confirm.
#[error("consensus driver leadership stream closed")]
WatchStreamClosed,
/// The gRPC reflection service could not be built from the embedded descriptor
/// set. Only present with the `reflection` feature.
#[cfg(feature = "reflection")]
#[error("failed to build gRPC reflection service from embedded descriptor set: {0}")]
ReflectionInit(#[source] tonic_reflection::server::Error),
}
/// Serving state published to subscribers obtained from [`Server::subscribe`].
#[derive(Clone, Debug)]
pub enum ServingState {
/// This node is not serving requests.
NotServing {
/// Endpoint of the current leader, when one was supplied at step-down.
// NOTE(review): presumably a redirect hint for clients — confirm against the service layer.
leader_endpoint: Option<PeerEndpoint>,
/// Epoch associated with that leader, when one was supplied at step-down.
leader_epoch: Option<Epoch>,
},
/// This node is serving requests.
Serving,
}
/// Default `shutdown_grace` used by `ServerBuilder::default()`: 10 seconds.
const DEFAULT_SHUTDOWN_GRACE: Duration = Duration::from_secs(10);
/// Builder for [`Server`]. Obtain one with [`Server::builder`] or `Default::default()`.
pub struct ServerBuilder {
/// Consensus driver; mandatory — `build` fails with `MissingConsensusDriver` when unset.
consensus: Option<Arc<dyn ConsensusDriver>>,
/// Clock override; `build` falls back to `SystemClock` when unset.
clock: Option<Arc<dyn Clock>>,
/// Passed to `ServingCore::new` by `build` and stored on the server.
window_ahead: Duration,
/// Stored on the server unchanged.
// NOTE(review): not read in this file; presumably consumed by the fencing /
// leader-watch logic — confirm.
failover_advance: Duration,
/// Upper bound used when waiting for background tasks to stop at shutdown.
shutdown_grace: Duration,
/// Interval handed to the heartbeat task; a zero interval means no heartbeat task
/// is spawned. The task only exists with the `tracing` feature.
heartbeat_interval: Duration,
/// Optional TLS configuration applied to the tonic server by the `serve_*` methods.
#[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
tls_config: Option<tonic::transport::ServerTlsConfig>,
}
impl Default for ServerBuilder {
    /// Returns a builder with no consensus driver, no clock override, a 3 s
    /// `window_ahead`, a 1 s `failover_advance`, the default shutdown grace and a
    /// 10 s heartbeat interval. TLS is off unless configured.
    fn default() -> Self {
        let window_ahead = Duration::from_secs(3);
        let failover_advance = Duration::from_secs(1);
        let heartbeat_interval = Duration::from_secs(10);

        Self {
            consensus: None,
            clock: None,
            window_ahead,
            failover_advance,
            shutdown_grace: DEFAULT_SHUTDOWN_GRACE,
            heartbeat_interval,
            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
            tls_config: None,
        }
    }
}
impl ServerBuilder {
    /// Sets the consensus driver. This is the only mandatory setting.
    pub fn consensus_driver(self, driver: Arc<dyn ConsensusDriver>) -> Self {
        Self {
            consensus: Some(driver),
            ..self
        }
    }

    /// Overrides the clock; without it `build` uses `SystemClock`.
    pub fn clock(self, clock: Arc<dyn Clock>) -> Self {
        Self {
            clock: Some(clock),
            ..self
        }
    }

    /// Sets the `window_ahead` duration handed to the serving core.
    pub fn window_ahead(self, window_ahead: Duration) -> Self {
        Self {
            window_ahead,
            ..self
        }
    }

    /// Sets the `failover_advance` duration stored on the server.
    pub fn failover_advance(self, failover_advance: Duration) -> Self {
        Self {
            failover_advance,
            ..self
        }
    }

    /// Sets how long shutdown waits for background tasks before aborting them.
    pub fn shutdown_grace(self, shutdown_grace: Duration) -> Self {
        Self {
            shutdown_grace,
            ..self
        }
    }

    /// Sets the heartbeat interval; a zero interval disables the heartbeat task.
    pub fn heartbeat_interval(self, interval: Duration) -> Self {
        Self {
            heartbeat_interval: interval,
            ..self
        }
    }

    /// Sets the TLS configuration applied by the `serve_*` methods.
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    pub fn tls_config(self, cfg: tonic::transport::ServerTlsConfig) -> Self {
        Self {
            tls_config: Some(cfg),
            ..self
        }
    }

    /// Finalizes the builder into a [`Server`].
    ///
    /// # Errors
    ///
    /// Returns [`BuildError::MissingConsensusDriver`] when no consensus driver was set.
    pub fn build(self) -> Result<Server, BuildError> {
        let Some(consensus) = self.consensus else {
            return Err(BuildError::MissingConsensusDriver);
        };
        let clock: Arc<dyn Clock> = match self.clock {
            Some(custom) => custom,
            None => Arc::new(SystemClock),
        };
        let core = Arc::new(ServingCore::new(self.window_ahead));
        let reporter = Arc::new(crate::reporter::Reporter::new());

        Ok(Server {
            consensus,
            clock,
            window_ahead: self.window_ahead,
            failover_advance: self.failover_advance,
            shutdown_grace: self.shutdown_grace,
            heartbeat_interval: self.heartbeat_interval,
            core,
            reporter,
            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
            tls_config: self.tls_config,
        })
    }
}
/// A configured server, produced by [`ServerBuilder::build`]. Consume it with
/// `into_router` or one of the `serve*` methods.
pub struct Server {
/// Consensus driver supplied to the builder.
pub(crate) consensus: Arc<dyn ConsensusDriver>,
/// Clock supplied to the builder, or `SystemClock` by default.
pub(crate) clock: Arc<dyn Clock>,
/// Same value that was passed to `ServingCore::new` at build time.
pub(crate) window_ahead: Duration,
/// Value configured on the builder.
// NOTE(review): not read in this file; presumably used by `crate::fence` — confirm.
pub(crate) failover_advance: Duration,
/// Upper bound used when waiting for background tasks to stop at shutdown.
pub(crate) shutdown_grace: Duration,
/// Heartbeat interval. Only read when the `tracing` feature is enabled, hence
/// the conditional `dead_code` allowance.
#[cfg_attr(not(feature = "tracing"), allow(dead_code))]
pub(crate) heartbeat_interval: Duration,
/// Shared serving core; holds the serving state that `subscribe` observes.
pub(crate) core: Arc<ServingCore>,
/// Shared reporter whose counters are incremented by the shutdown and heartbeat paths.
pub(crate) reporter: Arc<crate::reporter::Reporter>,
/// Optional TLS configuration applied by the `serve_*` methods.
#[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
}
/// Everything `Server::into_router_parts` produces: the gRPC routes plus the
/// handles and cancellation senders of the background tasks it spawned.
pub(crate) struct RouterParts {
/// gRPC routes (the TSO service, plus reflection when that feature is enabled).
pub routes: Routes,
/// Cancellation sender for the leader-watch task; sending on it or dropping it
/// resolves the task's cancel future.
pub cancel_tx: tokio::sync::oneshot::Sender<()>,
/// Join handle of the leader-watch task.
pub watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
/// Cancellation sender for the heartbeat task; `None` when no heartbeat task was spawned.
pub heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
/// Join handle of the heartbeat task; `None` when no heartbeat task was spawned.
pub heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
}
impl Server {
    /// Starts a [`ServerBuilder`] populated with the default settings.
    pub fn builder() -> ServerBuilder {
        Default::default()
    }

    /// Returns a watch receiver that observes this server's [`ServingState`].
    pub fn subscribe(&self) -> watch::Receiver<ServingState> {
        let core = &self.core;
        core.subscribe()
    }
}
impl Server {
pub fn into_router(self) -> Result<(Routes, WatchGuard), ServerError> {
let shutdown_grace = self.shutdown_grace;
let core = self.core.clone();
let reporter = self.reporter.clone();
let parts = self.into_router_parts()?;
Ok((
parts.routes,
WatchGuard {
cancel_tx: Some(parts.cancel_tx),
handle: Some(parts.watch_handle),
shutdown_grace,
core,
reporter,
heartbeat_cancel_tx: parts.heartbeat_cancel_tx,
heartbeat_handle: parts.heartbeat_handle,
},
))
}
fn into_router_parts(self) -> Result<RouterParts, ServerError> {
#[cfg(feature = "reflection")]
let reflection = build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET)?;
let server = Arc::new(self);
let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
let watch_server = server.clone();
let watch_handle = tokio::spawn(async move {
use futures::FutureExt;
let cancel = async move {
let _ = cancel_rx.await;
};
let outcome = std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(
watch_server.clone(),
cancel,
))
.catch_unwind()
.await;
match outcome {
Ok(result) => {
if let Err(ref _e) = result {
watch_server.core.step_down(None, None);
#[cfg(feature = "tracing")]
tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
}
result
}
Err(panic_payload) => {
watch_server.core.step_down(None, None);
#[cfg(feature = "tracing")]
tracing::error!("leader-watch panicked; serving disabled");
std::panic::resume_unwind(panic_payload);
}
}
});
let (heartbeat_cancel_tx, heartbeat_handle) = {
#[cfg(feature = "tracing")]
{
if server.heartbeat_interval.is_zero() {
(None, None)
} else {
use futures::FutureExt;
let (htx, hrx) = tokio::sync::oneshot::channel::<()>();
let hb_reporter = server.reporter.clone();
let hb_core = server.core.clone();
let hb_interval = server.heartbeat_interval;
let handle = tokio::spawn(async move {
let outcome =
std::panic::AssertUnwindSafe(crate::heartbeat::run_heartbeat(
hb_interval,
hb_core,
hb_reporter.clone(),
hrx,
))
.catch_unwind()
.await;
if outcome.is_err() {
hb_reporter.heartbeat_task_panicked.increment(1);
tracing::error!(
target: "tsoracle::heartbeat",
"heartbeat task panicked; liveness logs disabled until restart"
);
}
});
(Some(htx), Some(handle))
}
}
#[cfg(not(feature = "tracing"))]
{
(None, None)
}
};
let service = TsoServiceImpl { server };
#[allow(unused_mut)]
let mut routes = Routes::new(TsoServiceServer::new(service));
#[cfg(feature = "reflection")]
{
routes = routes.add_service(reflection);
}
Ok(RouterParts {
routes,
cancel_tx,
watch_handle,
heartbeat_cancel_tx,
heartbeat_handle,
})
}
pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
self.serve_with_shutdown(addr, futures::future::pending())
.await
}
pub async fn serve_with_shutdown(
self,
addr: SocketAddr,
shutdown: impl Future<Output = ()> + Send + 'static,
) -> Result<(), ServerError> {
#[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
let tls_config = self.tls_config.clone();
let shutdown_grace = self.shutdown_grace;
let core = self.core.clone();
let reporter = self.reporter.clone();
let parts = self.into_router_parts()?;
let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
let mut tonic = TonicServer::builder();
#[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
if let Some(cfg) = tls_config {
tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
}
let serve = tonic
.add_routes(parts.routes)
.serve_with_shutdown(addr, combined_shutdown);
serve_inner(
parts.cancel_tx,
parts.watch_handle,
parts.heartbeat_cancel_tx,
parts.heartbeat_handle,
serve,
cancel_tx,
shutdown_grace,
core,
reporter,
)
.await
}
pub async fn serve_with_listener(
self,
listener: tokio::net::TcpListener,
shutdown: impl Future<Output = ()> + Send + 'static,
) -> Result<(), ServerError> {
#[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
let tls_config = self.tls_config.clone();
let shutdown_grace = self.shutdown_grace;
let core = self.core.clone();
let reporter = self.reporter.clone();
let parts = self.into_router_parts()?;
let (combined_shutdown, cancel_tx) = combined_shutdown_with_cancel(shutdown);
let incoming = tonic::transport::server::TcpIncoming::from(listener);
let mut tonic = TonicServer::builder();
#[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
if let Some(cfg) = tls_config {
tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
}
let serve = tonic
.add_routes(parts.routes)
.serve_with_incoming_shutdown(incoming, combined_shutdown);
serve_inner(
parts.cancel_tx,
parts.watch_handle,
parts.heartbeat_cancel_tx,
parts.heartbeat_handle,
serve,
cancel_tx,
shutdown_grace,
core,
reporter,
)
.await
}
}
/// Owner of the background tasks spawned by `Server::into_router`. Dropping it
/// closes the serving gate and cancels the tasks; `shutdown` additionally waits
/// for them.
pub struct WatchGuard {
/// Cancellation sender for the leader-watch task; dropping it cancels the task.
cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
/// Join handle of the leader-watch task; `None` once taken by `shutdown`/`abort`.
handle: Option<tokio::task::JoinHandle<Result<(), ServerError>>>,
/// How long `shutdown` waits for each task before aborting it.
shutdown_grace: Duration,
/// Serving core whose gate is closed when the guard is dropped.
core: Arc<ServingCore>,
/// Reporter whose `shutdown_watch_aborted` counter is incremented when the
/// leader-watch task has to be aborted.
reporter: Arc<crate::reporter::Reporter>,
/// Cancellation sender for the heartbeat task, when one was spawned.
heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
/// Join handle of the heartbeat task, when one was spawned.
heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
}
impl WatchGuard {
    /// Stops the background tasks and waits for them, each bounded by the
    /// shutdown grace, aborting any task that does not stop in time.
    ///
    /// The serving gate is closed before any waiting starts, so the node stops
    /// serving immediately even when a task needs the whole grace period. This
    /// matches the user-shutdown path of `serve_inner`.
    ///
    /// # Errors
    ///
    /// Returns the error the leader-watch task ended with, or
    /// `ServerError::WatchPanic` when it panicked. A task that had to be
    /// aborted counts as a clean exit.
    pub async fn shutdown(mut self) -> Result<(), ServerError> {
        // Close the gate up front; otherwise it would only close when `Drop`
        // runs at the end of this function, after the bounded waits below.
        self.core.step_down(None, None);

        // Dropping the senders is the cancellation signal for both tasks.
        drop(self.heartbeat_cancel_tx.take());
        drop(self.cancel_tx.take());

        if let Some(mut hb_handle) = self.heartbeat_handle.take() {
            let timed_out = tokio::time::timeout(self.shutdown_grace, &mut hb_handle)
                .await
                .is_err();
            if timed_out {
                hb_handle.abort();
                // Wait for the abort to take effect; its join error is expected.
                let _ = (&mut hb_handle).await;
            }
        }

        let Some(mut handle) = self.handle.take() else {
            return Ok(());
        };
        let joined =
            await_watch_within_grace(&mut handle, self.shutdown_grace, &self.reporter).await;
        join_to_server_result(joined)
    }

    /// Aborts both background tasks without waiting for them. Dropping the
    /// consumed guard then closes the serving gate.
    pub fn abort(mut self) {
        if let Some(handle) = self.handle.take() {
            handle.abort();
        }
        if let Some(hb_handle) = self.heartbeat_handle.take() {
            hb_handle.abort();
        }
    }

    /// Reports whether the leader-watch task has finished. Also `true` when the
    /// guard no longer holds the handle.
    pub fn is_finished(&self) -> bool {
        self.handle
            .as_ref()
            .is_none_or(|handle| handle.is_finished())
    }
}
impl Drop for WatchGuard {
    /// Closes the serving gate, releases both cancellation senders and aborts the
    /// heartbeat task. The leader-watch task is cancelled, not aborted.
    fn drop(&mut self) {
        self.core.step_down(None, None);

        // Assigning `None` drops the senders, which is the cancellation signal.
        self.cancel_tx = None;
        self.heartbeat_cancel_tx = None;

        if let Some(task) = self.heartbeat_handle.take() {
            task.abort();
        }
    }
}
/// Wraps a caller-supplied shutdown future with an internal cancellation channel.
///
/// Returns a future that completes when `shutdown` completes or when the
/// returned sender fires or is dropped, together with that sender.
fn combined_shutdown_with_cancel(
    shutdown: impl Future<Output = ()> + Send + 'static,
) -> (
    impl Future<Output = ()> + Send + 'static,
    tokio::sync::oneshot::Sender<()>,
) {
    let (internal_tx, internal_rx) = tokio::sync::oneshot::channel::<()>();
    let either_signal = async move {
        tokio::select! {
            _ = shutdown => {}
            _ = internal_rx => {}
        }
    };
    (either_signal, internal_tx)
}
/// Waits up to `grace` for the leader-watch task to finish.
///
/// On timeout the `shutdown_watch_aborted` counter is incremented, the task is
/// aborted, and the join result of the aborted task is returned instead.
async fn await_watch_within_grace(
    watch_handle: &mut tokio::task::JoinHandle<Result<(), ServerError>>,
    grace: Duration,
    reporter: &Arc<crate::reporter::Reporter>,
) -> Result<Result<(), ServerError>, tokio::task::JoinError> {
    let timed = tokio::time::timeout(grace, &mut *watch_handle).await;
    if let Ok(join_result) = timed {
        return join_result;
    }

    // The grace elapsed: record it, then force the task down.
    reporter.shutdown_watch_aborted.increment(1);
    #[cfg(feature = "tracing")]
    tracing::warn!(
        grace_ms = grace.as_millis() as u64,
        "leader-watch task did not stop within the shutdown grace; aborting it (a consensus-driver call likely exceeded its latency bound)"
    );
    watch_handle.abort();
    (&mut *watch_handle).await
}
/// Drives the tonic serve future together with the leader-watch task and tears
/// both down, plus the optional heartbeat task, whichever finishes first.
///
/// * `watch_cancel_tx` / `watch_handle` — cancellation sender and join handle of
///   the leader-watch task.
/// * `heartbeat_cancel_tx` / `heartbeat_handle` — the same for the heartbeat
///   task, when one exists.
/// * `serve_future` — the tonic serve future.
/// * `tonic_cancel_tx` — triggers tonic's graceful shutdown.
/// * `shutdown_grace` — bound on each wait for a task to stop.
/// * `core` — serving core whose gate is closed when serving ends first.
/// * `reporter` — receives the "watch aborted" counter increment.
///
/// # Errors
///
/// If the watch task finishes first, its error (or `WatchPanic`) wins over any
/// transport error from draining. If serving finishes first, only a transport
/// error is returned.
// The parameter list mirrors `RouterParts` plus the shutdown plumbing.
#[allow(clippy::too_many_arguments)]
async fn serve_inner<S>(
    watch_cancel_tx: tokio::sync::oneshot::Sender<()>,
    mut watch_handle: tokio::task::JoinHandle<Result<(), ServerError>>,
    heartbeat_cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
    serve_future: S,
    tonic_cancel_tx: tokio::sync::oneshot::Sender<()>,
    shutdown_grace: Duration,
    core: Arc<ServingCore>,
    reporter: Arc<crate::reporter::Reporter>,
) -> Result<(), ServerError>
where
    S: Future<Output = Result<(), tonic::transport::Error>>,
{
    tokio::pin!(serve_future);
    let outcome = tokio::select! {
        // Prefer the watch result when both are ready, so its error is not lost.
        biased;
        watch_result = &mut watch_handle => {
            // The watch ended on its own: ask tonic to drain, then combine results.
            let _ = tonic_cancel_tx.send(());
            let drain_result = serve_future.await;
            combine_watch_and_drain(watch_result, drain_result)
        }
        serve_result = &mut serve_future => {
            // Serving ended (user shutdown or transport error): close the gate
            // first, then cancel the watch and give it the grace period to stop.
            core.step_down(None, None);
            drop(watch_cancel_tx);
            let _ = await_watch_within_grace(&mut watch_handle, shutdown_grace, &reporter).await;
            // No `?` here: inside a `select!` handler it would return from the
            // whole function and skip the heartbeat teardown below.
            serve_result.map_err(ServerError::from)
        }
    };

    // Heartbeat teardown runs on every path: cancel, wait up to the grace, abort.
    drop(heartbeat_cancel_tx);
    if let Some(mut hb_handle) = heartbeat_handle {
        let timed_out = tokio::time::timeout(shutdown_grace, &mut hb_handle)
            .await
            .is_err();
        if timed_out {
            hb_handle.abort();
            let _ = (&mut hb_handle).await;
        }
    }
    outcome
}
/// Flattens the join result of the leader-watch task into a server result.
///
/// A completed task yields its own result, a panicked task becomes
/// `ServerError::WatchPanic`, and a cancelled (aborted) task counts as success.
fn join_to_server_result(
    join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
) -> Result<(), ServerError> {
    let join_err = match join_result {
        Ok(task_result) => return task_result,
        Err(join_err) => join_err,
    };

    if !join_err.is_panic() {
        // Cancellation is how shutdown stops a stuck task; not an error.
        return Ok(());
    }

    let payload = panic_payload_to_string(join_err.into_panic());
    Err(ServerError::WatchPanic {
        payload,
        bt: Bt::capture(),
    })
}
/// Merges the leader-watch outcome with the transport drain outcome.
///
/// A watch failure takes precedence; otherwise the drain result is returned,
/// with its error converted into `ServerError`.
fn combine_watch_and_drain<E>(
    watch_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
    drain_result: Result<(), E>,
) -> Result<(), ServerError>
where
    ServerError: From<E>,
{
    join_to_server_result(watch_result)?;
    drain_result.map_err(ServerError::from)
}
/// Builds the v1 gRPC reflection service from an encoded file descriptor set.
///
/// # Errors
///
/// Returns `ServerError::ReflectionInit` when the reflection builder rejects
/// the descriptor set.
#[cfg(feature = "reflection")]
fn build_reflection_service(
    descriptor_set: &[u8],
) -> Result<
    tonic_reflection::server::v1::ServerReflectionServer<
        impl tonic_reflection::server::v1::ServerReflection,
    >,
    ServerError,
> {
    let builder = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(descriptor_set);
    match builder.build_v1() {
        Ok(service) => Ok(service),
        Err(init_err) => Err(ServerError::ReflectionInit(init_err)),
    }
}
/// Renders a panic payload as text.
///
/// `&'static str` and `String` payloads (what `panic!` produces) are returned
/// verbatim; any other payload type yields a fixed fallback message.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    panic
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_string())
        .or_else(|| panic.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "watch task panicked with non-string payload".to_string())
}
/// Test-only entry points, compiled for unit tests or with the `test-fakes` feature.
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Runs the leader-watch loop directly with a cancel future that never
    /// fires, so it ends only when the loop itself returns.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        let never_cancelled = futures::future::pending();
        crate::fence::run_leader_watch(self, never_cancelled).await
    }

    /// Requests a grant of `count` from the serving core at the clock's current time.
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        let now_ms = self.clock.now_ms();
        self.core.try_grant(now_ms, count)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Join handle type shared by the leader-watch task stand-ins.
    type WatchJoin = tokio::task::JoinHandle<Result<(), ServerError>>;

    /// Boxes a value the way a panic payload is carried.
    fn boxed_payload<T: std::any::Any + Send>(value: T) -> Box<dyn std::any::Any + Send> {
        Box::new(value)
    }

    /// Builds a synthetic `WatchPanic` error carrying `payload`.
    fn watch_panic(payload: &str) -> ServerError {
        ServerError::WatchPanic {
            payload: payload.into(),
            bt: Bt::capture(),
        }
    }

    /// Creates a serving core whose gate has been opened.
    fn serving_core() -> Arc<ServingCore> {
        let core = Arc::new(ServingCore::new(Duration::from_secs(3)));
        core.publish_serving();
        core
    }

    #[test]
    fn panic_payload_to_string_recovers_static_str() {
        let rendered = panic_payload_to_string(boxed_payload("watch boom"));
        assert_eq!(rendered, "watch boom");
    }

    #[test]
    fn panic_payload_to_string_recovers_owned_string() {
        let rendered = panic_payload_to_string(boxed_payload(String::from("formatted")));
        assert_eq!(rendered, "formatted");
    }

    #[test]
    fn panic_payload_to_string_falls_back_for_other_types() {
        struct Custom;
        let rendered = panic_payload_to_string(boxed_payload(Custom));
        assert_eq!(rendered, "watch task panicked with non-string payload",);
    }

    #[test]
    fn serving_transitions_publish_through_core() {
        let driver = Arc::new(crate::test_fakes::InMemoryDriver::new());
        let server = Server::builder()
            .consensus_driver(driver)
            .build()
            .expect("build must succeed");
        let hint = PeerEndpoint::try_from("new-leader:9000").unwrap();

        server.core.step_down(Some(hint.clone()), Some(Epoch(7)));

        let ServingState::NotServing {
            leader_endpoint,
            leader_epoch,
        } = server.core.serving_state()
        else {
            panic!("expected NotServing after step_down");
        };
        assert_eq!(leader_endpoint, Some(hint));
        assert_eq!(leader_epoch, Some(Epoch(7)));
    }

    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    #[test]
    fn builder_stores_tls_config() {
        use crate::test_fakes::InMemoryDriver;

        let built = Server::builder()
            .consensus_driver(Arc::new(InMemoryDriver::new()))
            .tls_config(tonic::transport::ServerTlsConfig::new())
            .build()
            .expect("build with tls_config must succeed");
        assert!(built.tls_config.is_some());
    }

    #[tokio::test]
    async fn join_to_server_result_passes_through_clean_outcome() {
        let joined = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
        assert!(join_to_server_result(joined).is_ok());
    }

    #[tokio::test]
    async fn join_to_server_result_forwards_inner_error() {
        let joined = tokio::spawn(async { Err::<(), ServerError>(watch_panic("synthetic")) }).await;
        match join_to_server_result(joined) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "synthetic"),
            other => panic!("expected forwarded WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_translates_panic_to_watch_panic() {
        let task = tokio::spawn(async {
            panic!("intentional");
            #[allow(unreachable_code)]
            Ok::<(), ServerError>(())
        });
        let joined = task.await;
        match join_to_server_result(joined) {
            Err(ServerError::WatchPanic { payload, .. }) => {
                assert!(payload.contains("intentional"))
            }
            other => panic!("expected WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_treats_cancellation_as_clean_exit() {
        let task: WatchJoin = tokio::spawn(async { futures::future::pending().await });
        task.abort();
        let joined = task.await;
        assert!(join_to_server_result(joined).is_ok());
    }

    #[tokio::test]
    async fn combine_watch_and_drain_surfaces_drain_error_when_watch_ok() {
        let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        let combined = combine_watch_and_drain(watch, drain);
        assert!(matches!(combined, Err(ServerError::WatchStreamClosed)));
    }

    #[tokio::test]
    async fn combine_watch_and_drain_returns_ok_when_both_succeed() {
        let watch = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
        let drain: Result<(), ServerError> = Ok(());
        assert!(combine_watch_and_drain(watch, drain).is_ok());
    }

    #[tokio::test]
    async fn combine_watch_and_drain_prefers_watch_error_over_drain_error() {
        let watch = tokio::spawn(async { Err::<(), ServerError>(watch_panic("watch")) }).await;
        let drain: Result<(), ServerError> = Err(ServerError::WatchStreamClosed);
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
            other => panic!("expected watch error to win, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn combine_watch_and_drain_returns_watch_error_when_drain_ok() {
        let watch = tokio::spawn(async { Err::<(), ServerError>(watch_panic("watch")) }).await;
        let drain: Result<(), ServerError> = Ok(());
        match combine_watch_and_drain(watch, drain) {
            Err(ServerError::WatchPanic { payload, .. }) => assert_eq!(payload, "watch"),
            other => panic!("expected forwarded watch error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn dropping_watch_guard_closes_serving_gate_synchronously() {
        let core = serving_core();
        let finished_watch: WatchJoin = tokio::spawn(async { Ok(()) });
        // Keep the receiver alive so the sender is dropped only by the guard.
        let (cancel_tx, _cancel_rx) = tokio::sync::oneshot::channel::<()>();

        let guard = WatchGuard {
            cancel_tx: Some(cancel_tx),
            handle: Some(finished_watch),
            shutdown_grace: Duration::from_secs(10),
            core: core.clone(),
            reporter: Arc::new(crate::reporter::Reporter::new()),
            heartbeat_cancel_tx: None,
            heartbeat_handle: None,
        };
        drop(guard);

        assert!(
            matches!(core.serving_state(), ServingState::NotServing { .. }),
            "dropping the WatchGuard must close the serving gate synchronously",
        );
    }

    #[tokio::test]
    async fn serve_inner_closes_serving_gate_on_user_shutdown() {
        let core = serving_core();
        // A watch task that never finishes forces the serve arm to win.
        let stuck_watch: WatchJoin = tokio::spawn(async { futures::future::pending().await });
        let (watch_cancel_tx, _watch_cancel_rx) = tokio::sync::oneshot::channel::<()>();
        let (tonic_cancel_tx, _tonic_cancel_rx) = tokio::sync::oneshot::channel::<()>();
        let already_drained = async { Ok::<(), tonic::transport::Error>(()) };

        let result = serve_inner(
            watch_cancel_tx,
            stuck_watch,
            None,
            None,
            already_drained,
            tonic_cancel_tx,
            Duration::from_millis(0),
            core.clone(),
            Arc::new(crate::reporter::Reporter::new()),
        )
        .await;

        assert!(
            result.is_ok(),
            "user shutdown must return Ok, got {result:?}"
        );
        assert!(
            matches!(core.serving_state(), ServingState::NotServing { .. }),
            "serve_inner's user-shutdown arm must close the serving gate synchronously",
        );
    }

    #[cfg(feature = "reflection")]
    #[test]
    fn build_reflection_service_accepts_embedded_descriptor_set() {
        let built = build_reflection_service(tsoracle_proto::FILE_DESCRIPTOR_SET);
        assert!(built.is_ok());
    }

    #[cfg(feature = "reflection")]
    #[test]
    fn build_reflection_service_maps_corrupt_descriptor_to_typed_error() {
        let corrupt = b"\xff\xff\xff\xff not a descriptor set";
        match build_reflection_service(corrupt).map(|_| ()) {
            Err(ServerError::ReflectionInit(_)) => {}
            other => panic!("expected ReflectionInit error, got {other:?}"),
        }
    }
}