use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::{Context, Poll, Waker};
use super::service::{NamedService, ServiceDescriptor, ServiceHandler};
use super::status::Status;
use super::streaming::{Request, Response, Streaming};
/// Serving state reported for a service by the health service.
///
/// The discriminants are fixed via `#[repr(i32)]` so the value can be
/// converted to and from its integer form (see [`ServingStatus::from_i32`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[repr(i32)]
pub enum ServingStatus {
    /// Status is not known; this is the default value.
    #[default]
    Unknown = 0,
    /// The service is serving; the only variant considered healthy.
    Serving = 1,
    /// The service is explicitly not serving.
    NotServing = 2,
    /// No status is registered for the requested service.
    ServiceUnknown = 3,
}

impl ServingStatus {
    /// Returns `true` only for [`ServingStatus::Serving`].
    #[must_use]
    pub fn is_healthy(&self) -> bool {
        *self == Self::Serving
    }

    /// Converts a raw integer into a status.
    ///
    /// Returns `None` when `value` does not match any discriminant (0..=3).
    #[must_use]
    pub fn from_i32(value: i32) -> Option<Self> {
        const ALL: [ServingStatus; 4] = [
            ServingStatus::Unknown,
            ServingStatus::Serving,
            ServingStatus::NotServing,
            ServingStatus::ServiceUnknown,
        ];
        ALL.iter()
            .copied()
            .find(|candidate| *candidate as i32 == value)
    }
}

impl std::fmt::Display for ServingStatus {
    /// Writes the upper-snake-case name of the status (e.g. `NOT_SERVING`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Unknown => "UNKNOWN",
            Self::Serving => "SERVING",
            Self::NotServing => "NOT_SERVING",
            Self::ServiceUnknown => "SERVICE_UNKNOWN",
        };
        f.write_str(label)
    }
}
/// Request naming the service whose health is being queried.
#[derive(Debug, Clone, Default)]
pub struct HealthCheckRequest {
    /// Name of the service to check; the empty string addresses the server
    /// as a whole (see [`HealthCheckRequest::server`]).
    pub service: String,
}

impl HealthCheckRequest {
    /// Builds a request for the named `service`.
    #[must_use]
    pub fn new(service: impl Into<String>) -> Self {
        let service = service.into();
        Self { service }
    }

    /// Builds a server-level request, i.e. one with an empty service name.
    #[must_use]
    pub fn server() -> Self {
        Self {
            service: String::new(),
        }
    }
}
/// Response carrying the serving status of the queried service.
#[derive(Debug, Clone)]
pub struct HealthCheckResponse {
    /// Status reported for the queried service.
    pub status: ServingStatus,
}

impl HealthCheckResponse {
    /// Wraps `status` in a response.
    #[must_use]
    pub fn new(status: ServingStatus) -> Self {
        Self { status }
    }
}

impl Default for HealthCheckResponse {
    /// The default response reports [`ServingStatus::Unknown`].
    fn default() -> Self {
        Self::new(ServingStatus::Unknown)
    }
}
/// Shared registry of per-service serving statuses.
///
/// Every field is behind an `Arc`, so cloning a `HealthService` yields another
/// handle onto the same underlying state.
#[derive(Debug, Clone)]
pub struct HealthService {
// Current status per service name; the empty name is used for the server itself.
statuses: Arc<RwLock<HashMap<String, ServingStatus>>>,
// Per-service change counter, bumped whenever that service's status changes.
watch_versions: Arc<RwLock<HashMap<String, u64>>>,
// Number of live `HealthReporter`s per service name.
reporter_counts: Arc<RwLock<HashMap<String, usize>>>,
// Wakers of pending watch streams, keyed by service name and then by waiter id.
watch_waiters: Arc<Mutex<HashMap<String, HashMap<u64, Waker>>>>,
// Source of unique waiter ids (starts at 1).
next_waiter_id: Arc<AtomicU64>,
// Global change counter, bumped on every real status change of any service.
version: Arc<AtomicU64>,
}
impl HealthService {
/// Creates an empty health service with no registered statuses.
///
/// Waiter ids start at 1 and the global version starts at 0.
#[must_use]
pub fn new() -> Self {
    let statuses = Arc::new(RwLock::new(HashMap::new()));
    let watch_versions = Arc::new(RwLock::new(HashMap::new()));
    let reporter_counts = Arc::new(RwLock::new(HashMap::new()));
    let watch_waiters = Arc::new(Mutex::new(HashMap::new()));
    let next_waiter_id = Arc::new(AtomicU64::new(1));
    let version = Arc::new(AtomicU64::new(0));
    Self {
        statuses,
        watch_versions,
        reporter_counts,
        watch_waiters,
        next_waiter_id,
        version,
    }
}
#[must_use]
pub fn version(&self) -> u64 {
self.version.load(Ordering::Acquire)
}
/// Records `status` for `service`.
///
/// When the stored value actually changes, the per-service watch version and
/// the global version are bumped while the status lock is held, and pending
/// watchers are woken after the lock has been released. Re-setting the same
/// status is a no-op.
pub fn set_status(&self, service: impl Into<String>, status: ServingStatus) {
    let service: String = service.into();
    let changed = {
        let mut statuses = self.statuses.write();
        let previous = statuses.insert(service.clone(), status);
        let changed = previous != Some(status);
        if changed {
            self.bump_watch_version(&service);
            self.version.fetch_add(1, Ordering::Release);
        }
        changed
    };
    if changed {
        self.notify_watch_waiters(&service);
    }
}
/// Records `status` for the server itself, i.e. under the empty service name.
pub fn set_server_status(&self, status: ServingStatus) {
    self.set_status(String::new(), status);
}
/// Returns the status stored for `service`, or `None` when nothing is registered.
#[must_use]
pub fn get_status(&self, service: &str) -> Option<ServingStatus> {
    self.statuses.read().get(service).copied()
}
/// Returns `true` when `service` has a registered status and that status is healthy.
#[must_use]
pub fn is_serving(&self, service: &str) -> bool {
    match self.get_status(service) {
        Some(status) => status.is_healthy(),
        None => false,
    }
}
/// Removes every registered status.
///
/// Does nothing when no status is registered. Otherwise the watch version of
/// each removed service and the global version are bumped while the status
/// lock is held, and watchers are woken once the lock is released.
pub fn clear(&self) {
    let affected_services: Vec<String> = {
        let mut statuses = self.statuses.write();
        if statuses.is_empty() {
            return;
        }
        let names: Vec<String> = statuses.keys().cloned().collect();
        statuses.clear();
        self.bump_watch_versions(names.iter().cloned());
        self.version.fetch_add(1, Ordering::Release);
        names
    };
    self.notify_watch_waiters_for_services(affected_services);
}
/// Removes the status registered for `service`, if any.
///
/// Versions are bumped and watchers woken only when an entry was actually
/// removed; the wake-up happens after the status lock is released.
pub fn clear_status(&self, service: &str) {
    let removed = {
        let mut statuses = self.statuses.write();
        let removed = statuses.remove(service).is_some();
        if removed {
            self.bump_watch_version(service);
            self.version.fetch_add(1, Ordering::Release);
        }
        removed
    };
    if removed {
        self.notify_watch_waiters(service);
    }
}
/// Returns the names of all services with a registered status, sorted ascending.
#[must_use]
pub fn services(&self) -> Vec<String> {
    // The read guard is a temporary of this statement, so it is released
    // before sorting.
    let mut names: Vec<String> = self.statuses.read().keys().cloned().collect();
    names.sort();
    names
}
/// Takes a consistent `(status, version)` snapshot for a watcher of `service`.
///
/// * Empty service name (server-level watch): an explicitly registered
///   server status (see `set_server_status`) takes precedence, exactly as in
///   `check`. Without one, the status is aggregated over all registered
///   services: `ServiceUnknown` when nothing is registered, `Serving` when
///   every status is healthy, `NotServing` otherwise. The version is the
///   global change counter, so any change to any service is observed.
/// * Named service: the registered status, or `ServiceUnknown` when none is
///   registered, paired with that service's watch version (0 if never bumped).
///
/// The status lock is held until the version has been read, so the pair
/// cannot straddle a concurrent update.
#[allow(clippy::significant_drop_tightening)]
fn watched_status_and_version(&self, service: &str) -> (ServingStatus, u64) {
    let statuses = self.statuses.read();
    if service.is_empty() {
        // Honour an explicit server-level entry first so that Watch reports
        // the same status Check would return for the empty service name.
        let status = if let Some(&explicit) = statuses.get(service) {
            explicit
        } else if statuses.is_empty() {
            ServingStatus::ServiceUnknown
        } else if statuses.values().all(ServingStatus::is_healthy) {
            ServingStatus::Serving
        } else {
            ServingStatus::NotServing
        };
        let version = self.version();
        drop(statuses);
        (status, version)
    } else {
        let watch_versions = self.watch_versions.read();
        let status = statuses
            .get(service)
            .copied()
            .unwrap_or(ServingStatus::ServiceUnknown);
        let version = watch_versions.get(service).copied().unwrap_or(0);
        drop(watch_versions);
        drop(statuses);
        (status, version)
    }
}
/// Increments the watch version of `service` (saturating); the first bump yields 1.
fn bump_watch_version(&self, service: &str) {
    let mut watch_versions = self.watch_versions.write();
    let counter = watch_versions.entry(service.to_string()).or_insert(0);
    *counter = counter.saturating_add(1);
}
/// Increments the watch version of every service in `services` under a single
/// write lock; each counter saturates and starts at 1 on its first bump.
#[allow(clippy::significant_drop_tightening)]
fn bump_watch_versions<I>(&self, services: I)
where
    I: IntoIterator<Item = String>,
{
    let mut watch_versions = self.watch_versions.write();
    services.into_iter().for_each(|service| {
        let counter = watch_versions.entry(service).or_insert(0);
        *counter = counter.saturating_add(1);
    });
}
/// Registers one more live reporter for `service`.
fn acquire_reporter(&self, service: &str) {
    self.reporter_counts
        .write()
        .entry(service.to_string())
        .and_modify(|count| *count += 1)
        .or_insert(1);
}
/// Releases one reporter for `service`; when it was the last one, the
/// service's status is cleared. Delegates to the hook variant with a no-op hook.
fn release_reporter_and_maybe_clear_status(&self, service: &str) {
    let no_hook = || {};
    self.release_reporter_and_maybe_clear_status_with_hook(service, no_hook);
}
/// Releases one reporter for `service` and, when it was the last one, removes
/// the service's status.
///
/// `before_final_clear` is invoked only on the final-release path, after both
/// the reporter-count lock and the status lock have been acquired and before
/// the status entry is removed.
#[allow(clippy::significant_drop_tightening)]
fn release_reporter_and_maybe_clear_status_with_hook<F>(
&self,
service: &str,
before_final_clear: F,
) where
F: FnOnce(),
{
let mut reporter_counts = self.reporter_counts.write();
// No reporter is registered under this name: nothing to release.
let std::collections::hash_map::Entry::Occupied(mut entry) =
reporter_counts.entry(service.to_string())
else {
return;
};
// Other reporters remain: just decrement and keep the status untouched.
if *entry.get() > 1 {
*entry.get_mut() -= 1;
return;
}
// Final reporter: take the status lock while still holding the count lock,
// so the count removal and the status removal happen together.
let mut statuses = self.statuses.write();
before_final_clear();
let changed = statuses.remove(service).is_some();
entry.remove();
if changed {
self.bump_watch_version(service);
self.version.fetch_add(1, Ordering::Release);
}
drop(statuses);
drop(reporter_counts);
// Wake watchers only after both locks have been released.
if changed {
self.notify_watch_waiters(service);
}
}
/// Answers a one-shot health check.
///
/// * A service with a registered status (including an explicit entry for the
///   empty name) reports that status.
/// * The empty name without an explicit entry reports an aggregate:
///   `ServiceUnknown` when nothing is registered, `Serving` when every
///   registered status is healthy, `NotServing` otherwise.
///
/// # Errors
///
/// Returns a not-found [`Status`] when a non-empty service name has no
/// registered status.
pub fn check(&self, request: &HealthCheckRequest) -> Result<HealthCheckResponse, Status> {
    let statuses = self.statuses.read();
    let resolved = match statuses.get(&request.service) {
        Some(&explicit) => Some(explicit),
        None if !request.service.is_empty() => None,
        None if statuses.is_empty() => Some(ServingStatus::ServiceUnknown),
        None if statuses.values().all(ServingStatus::is_healthy) => Some(ServingStatus::Serving),
        None => Some(ServingStatus::NotServing),
    };
    // Release the lock before building the response or error message.
    drop(statuses);
    match resolved {
        Some(status) => Ok(HealthCheckResponse::new(status)),
        None => Err(Status::not_found(format!(
            "service '{}' not registered for health checking",
            request.service
        ))),
    }
}
/// Future-returning wrapper around [`HealthService::check`].
///
/// The check itself runs synchronously when this method is called; the
/// returned future only wraps the already-computed outcome in a `Response`.
#[must_use]
pub fn check_async(
    &self,
    request: &Request<HealthCheckRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<HealthCheckResponse>, Status>> + Send>> {
    let payload = request.get_ref();
    let outcome = self.check(payload);
    Box::pin(async move { outcome.map(Response::new) })
}
/// Creates a watch stream for the service named in `request`.
///
/// The stream is constructed synchronously when this method is called; the
/// returned future resolves to it wrapped in a `Response`.
#[must_use]
pub fn watch_async(
    &self,
    request: &Request<HealthCheckRequest>,
) -> Pin<Box<dyn Future<Output = Result<Response<HealthWatchStream>, Status>> + Send>> {
    let service_name = request.get_ref().service.clone();
    let stream = HealthWatchStream::new(self.clone(), service_name);
    Box::pin(async move { Ok(Response::new(stream)) })
}
/// Creates a watcher for `service`, seeded with the current status/version snapshot.
#[must_use]
pub fn watch(&self, service: impl Into<String>) -> HealthWatcher {
    let service_name: String = service.into();
    let (last_status, last_version) = self.watched_status_and_version(&service_name);
    let handle = self.clone();
    HealthWatcher {
        service: handle,
        service_name,
        last_status,
        last_version,
    }
}
/// Stores `waker` as the pending waiter for `service`.
///
/// A fresh waiter id is allocated and written to `waiter_id` when it is
/// `None`; otherwise the existing id is reused and its waker is replaced.
fn register_watch_waiter(&self, service: &str, waiter_id: &mut Option<u64>, waker: &Waker) {
    let id = match *waiter_id {
        Some(existing) => existing,
        None => {
            let fresh = self.next_waiter_id.fetch_add(1, Ordering::Relaxed);
            *waiter_id = Some(fresh);
            fresh
        }
    };
    self.watch_waiters
        .lock()
        .entry(service.to_string())
        .or_default()
        .insert(id, waker.clone());
}
/// Removes the waiter identified by `waiter_id` (if set) from `service`.
///
/// `waiter_id` is reset to `None`. The service's waiter map is dropped
/// entirely once its last waiter has been removed.
fn unregister_watch_waiter(&self, service: &str, waiter_id: &mut Option<u64>) {
    let Some(id) = waiter_id.take() else {
        return;
    };
    let mut waiters = self.watch_waiters.lock();
    if let Some(service_waiters) = waiters.get_mut(service) {
        service_waiters.remove(&id);
        if service_waiters.is_empty() {
            waiters.remove(service);
        }
    }
}
/// Wakes the watchers affected by a change to the single service `service`.
fn notify_watch_waiters(&self, service: &str) {
    self.notify_watch_waiters_for_services([service.to_owned()]);
}
fn notify_watch_waiters_for_services<I>(&self, services: I)
where
I: IntoIterator<Item = String>,
{
let mut keys = Vec::new();
for service in services {
if !keys.iter().any(|existing| existing == &service) {
keys.push(service.clone());
}
if !service.is_empty() && !keys.iter().any(|existing| existing.is_empty()) {
keys.push(String::new());
}
}
if keys.is_empty() {
return;
}
let mut waiters = self.watch_waiters.lock();
let mut wake_list = Vec::new();
for key in keys {
if let Some(service_waiters) = waiters.get_mut(&key) {
wake_list.extend(service_waiters.values().cloned());
}
}
drop(waiters);
for waker in wake_list {
waker.wake();
}
}
}
impl Default for HealthService {
fn default() -> Self {
Self::new()
}
}
/// Polling watcher for one service's status.
///
/// Remembers the last observed status and version so that later polls can
/// report whether anything changed in between.
#[derive(Debug)]
pub struct HealthWatcher {
// Handle onto the shared health service state.
service: HealthService,
// Name of the watched service; empty for the server-level watch.
service_name: String,
// Status captured by the most recent snapshot.
last_status: ServingStatus,
// Version captured by the most recent snapshot; a differing version means "changed".
last_version: u64,
}
/// Stream of status updates for one service, built on top of a [`HealthWatcher`].
#[derive(Debug)]
pub struct HealthWatchStream {
// Underlying watcher holding the last observed status/version.
watcher: HealthWatcher,
// Whether the initial status has already been emitted.
emitted_initial: bool,
// Id of this stream's waiter registration, if it is currently registered.
waiter_id: Option<u64>,
}
impl HealthWatchStream {
/// Creates a stream watching `service_name`; nothing has been emitted yet and
/// no waiter is registered.
fn new(service: HealthService, service_name: String) -> Self {
    let watcher = service.watch(service_name);
    Self {
        watcher,
        emitted_initial: false,
        waiter_id: None,
    }
}
fn clear_waiter_registration(&mut self) {
self.watcher
.service
.unregister_watch_waiter(&self.watcher.service_name, &mut self.waiter_id);
}
/// Core of `poll_next`, with a hook that runs between the first status check
/// and the waiter registration.
///
/// * The first call emits the status captured when the watcher was created.
/// * Later calls emit the current status whenever the watched version changed.
/// * Otherwise the waker is registered and the status is checked once more,
///   so a change landing between the first check and the registration is
///   still reported instead of being missed.
///
/// Any waiter registration is cleared before an item is returned.
fn poll_next_with_hook<F>(
    &mut self,
    cx: &mut Context<'_>,
    after_first_status_check: F,
) -> Poll<Option<Result<HealthCheckResponse, Status>>>
where
    F: FnOnce(&mut Self),
{
    if !self.emitted_initial {
        self.emitted_initial = true;
        self.clear_waiter_registration();
        let initial = self.watcher.status();
        return Poll::Ready(Some(Ok(HealthCheckResponse::new(initial))));
    }

    if let (true, status) = self.watcher.poll_status() {
        self.clear_waiter_registration();
        return Poll::Ready(Some(Ok(HealthCheckResponse::new(status))));
    }

    after_first_status_check(self);

    let service_name = self.watcher.service_name.clone();
    self.watcher
        .service
        .register_watch_waiter(&service_name, &mut self.waiter_id, cx.waker());

    // Re-check after registering to close the race with concurrent updates.
    match self.watcher.poll_status() {
        (true, status) => {
            self.clear_waiter_registration();
            Poll::Ready(Some(Ok(HealthCheckResponse::new(status))))
        }
        (false, _) => Poll::Pending,
    }
}
}
impl Drop for HealthWatchStream {
    /// Unregisters any pending waiter so a dropped stream leaves no waker behind.
    fn drop(&mut self) {
        self.watcher
            .service
            .unregister_watch_waiter(&self.watcher.service_name, &mut self.waiter_id);
    }
}
impl Streaming for HealthWatchStream {
    type Message = HealthCheckResponse;

    /// Polls for the next status update; delegates to `poll_next_with_hook`
    /// with a no-op hook.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Message, Status>>> {
        let this = Pin::get_mut(self);
        this.poll_next_with_hook(cx, |_stream| {})
    }
}
impl HealthWatcher {
/// Refreshes the snapshot and reports whether the watched version changed
/// since the previous snapshot.
pub fn changed(&mut self) -> bool {
    let (changed, _status) = self.poll_status();
    changed
}
#[must_use]
pub fn status(&self) -> ServingStatus {
self.last_status
}
/// Refreshes the snapshot and returns `(changed, current_status)`.
///
/// `changed` is `true` when the watched version differs from the one seen by
/// the previous snapshot, even if the status value itself is the same again.
pub fn poll_status(&mut self) -> (bool, ServingStatus) {
    let (status, version) = self.service.watched_status_and_version(&self.service_name);
    let previous_version = std::mem::replace(&mut self.last_version, version);
    self.last_status = status;
    (previous_version != version, status)
}
}
impl NamedService for HealthService {
// Fully-qualified service name; it matches the prefix of the method paths
// listed in the service descriptor.
const NAME: &'static str = "grpc.health.v1.Health";
}
impl ServiceHandler for HealthService {
    /// Returns the static descriptor for the `Health` service, listing the
    /// unary `Check` method and the server-streaming `Watch` method.
    fn descriptor(&self) -> &ServiceDescriptor {
        use super::service::MethodDescriptor;

        static HEALTH_METHODS: &[MethodDescriptor] = &[
            MethodDescriptor::unary("Check", "/grpc.health.v1.Health/Check"),
            MethodDescriptor::server_streaming("Watch", "/grpc.health.v1.Health/Watch"),
        ];
        static HEALTH_DESCRIPTOR: ServiceDescriptor =
            ServiceDescriptor::new("Health", "grpc.health.v1", HEALTH_METHODS);

        &HEALTH_DESCRIPTOR
    }

    /// Returns the method names exposed by the service.
    fn method_names(&self) -> Vec<&str> {
        Vec::from(["Check", "Watch"])
    }
}
/// Handle that reports the status of one service.
///
/// Creating a reporter registers it with the health service; dropping it
/// releases that registration, and the service's status is cleared once the
/// last reporter for the name is gone.
#[derive(Debug)]
pub struct HealthReporter {
    /// Shared health service state.
    service: HealthService,
    /// Name of the service this reporter speaks for.
    service_name: String,
}

impl HealthReporter {
    /// Creates a reporter for `service_name` and counts it as a live reporter.
    #[must_use]
    pub fn new(service: HealthService, service_name: impl Into<String>) -> Self {
        let service_name: String = service_name.into();
        service.acquire_reporter(&service_name);
        Self {
            service,
            service_name,
        }
    }

    /// Marks the service as `Serving`.
    pub fn set_serving(&self) {
        self.report(ServingStatus::Serving);
    }

    /// Marks the service as `NotServing`.
    pub fn set_not_serving(&self) {
        self.report(ServingStatus::NotServing);
    }

    /// Returns the currently registered status, or `Unknown` when none is set.
    #[must_use]
    pub fn status(&self) -> ServingStatus {
        match self.service.get_status(&self.service_name) {
            Some(current) => current,
            None => ServingStatus::Unknown,
        }
    }

    /// Stores `status` for this reporter's service.
    fn report(&self, status: ServingStatus) {
        self.service.set_status(self.service_name.as_str(), status);
    }
}

impl Drop for HealthReporter {
    /// Releases this reporter; the final release clears the service's status.
    fn drop(&mut self) {
        let name = self.service_name.as_str();
        self.service.release_reporter_and_maybe_clear_status(name);
    }
}
/// Builder that collects initial statuses before creating a [`HealthService`].
#[derive(Debug, Default)]
pub struct HealthServiceBuilder {
    /// Statuses to install on build; a later entry for the same name replaces
    /// an earlier one.
    statuses: HashMap<String, ServingStatus>,
}

impl HealthServiceBuilder {
    /// Creates a builder with no statuses.
    #[must_use]
    pub fn new() -> Self {
        Self {
            statuses: HashMap::new(),
        }
    }

    /// Registers `status` for `service`.
    #[must_use]
    pub fn add(mut self, service: impl Into<String>, status: ServingStatus) -> Self {
        let name: String = service.into();
        self.statuses.insert(name, status);
        self
    }

    /// Registers every service in `services` as `Serving`.
    #[must_use]
    pub fn add_serving(mut self, services: impl IntoIterator<Item = impl Into<String>>) -> Self {
        self.statuses.extend(
            services
                .into_iter()
                .map(|service| (service.into(), ServingStatus::Serving)),
        );
        self
    }

    /// Creates the health service and installs all collected statuses.
    #[must_use]
    pub fn build(self) -> HealthService {
        let service = HealthService::new();
        self.statuses
            .into_iter()
            .for_each(|(name, status)| service.set_status(name, status));
        service
    }
}
#[cfg(test)]
mod tests {
use super::*;
use insta::assert_json_snapshot;
use serde_json::json;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, Wake, Waker};
// Shared test prologue: initialises test logging and marks the start of the
// test phase named `name`.
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
// Runs a health check for `query` (empty string = server-level check) and
// renders the query plus the resulting status code/text as a JSON value.
// Panics if the check returns an error.
fn health_response_snapshot(service: &HealthService, query: &str) -> serde_json::Value {
    let request = match query {
        "" => HealthCheckRequest::server(),
        named => HealthCheckRequest::new(named),
    };
    let response = service
        .check(&request)
        .expect("health snapshot queries should succeed");
    let status = response.status;
    json!({
        "query": request.service,
        "status_code": status as i32,
        "status_text": status.to_string(),
    })
}
// Waker backend that only counts how many times it has been woken.
#[derive(Default)]
struct CountingWake {
    // Number of wake-ups observed so far.
    wakes: AtomicUsize,
}

impl Wake for CountingWake {
    // Waking by value counts exactly like waking by reference.
    fn wake(self: Arc<Self>) {
        Wake::wake_by_ref(&self);
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}
// Builds a `Waker` that increments the shared `counter` on every wake-up.
fn counting_waker(counter: &Arc<CountingWake>) -> Waker {
    let shared = Arc::clone(counter);
    Waker::from(shared)
}
/// `from_i32` maps 0..=3 to the matching variants and rejects 4.
#[test]
fn serving_status_from_i32() {
init_test("serving_status_from_i32");
crate::assert_with_log!(
ServingStatus::from_i32(0) == Some(ServingStatus::Unknown),
"0",
Some(ServingStatus::Unknown),
ServingStatus::from_i32(0)
);
crate::assert_with_log!(
ServingStatus::from_i32(1) == Some(ServingStatus::Serving),
"1",
Some(ServingStatus::Serving),
ServingStatus::from_i32(1)
);
crate::assert_with_log!(
ServingStatus::from_i32(2) == Some(ServingStatus::NotServing),
"2",
Some(ServingStatus::NotServing),
ServingStatus::from_i32(2)
);
crate::assert_with_log!(
ServingStatus::from_i32(3) == Some(ServingStatus::ServiceUnknown),
"3",
Some(ServingStatus::ServiceUnknown),
ServingStatus::from_i32(3)
);
// Any value outside the known discriminants yields `None`.
let none = ServingStatus::from_i32(4).is_none();
crate::assert_with_log!(none, "4 none", true, none);
crate::test_complete!("serving_status_from_i32");
}
/// Only `Serving` counts as healthy; every other variant does not.
#[test]
fn serving_status_is_healthy() {
init_test("serving_status_is_healthy");
let unknown = ServingStatus::Unknown.is_healthy();
crate::assert_with_log!(!unknown, "unknown healthy", false, unknown);
let serving = ServingStatus::Serving.is_healthy();
crate::assert_with_log!(serving, "serving healthy", true, serving);
let not_serving = ServingStatus::NotServing.is_healthy();
crate::assert_with_log!(!not_serving, "not serving healthy", false, not_serving);
let service_unknown = ServingStatus::ServiceUnknown.is_healthy();
crate::assert_with_log!(
!service_unknown,
"service unknown healthy",
false,
service_unknown
);
crate::test_complete!("serving_status_is_healthy");
}
/// `Display` renders the upper-snake-case names for `Serving` and `NotServing`.
#[test]
fn serving_status_display() {
init_test("serving_status_display");
let serving = ServingStatus::Serving.to_string();
crate::assert_with_log!(serving == "SERVING", "serving", "SERVING", serving);
let not_serving = ServingStatus::NotServing.to_string();
crate::assert_with_log!(
not_serving == "NOT_SERVING",
"not serving",
"NOT_SERVING",
not_serving
);
crate::test_complete!("serving_status_display");
}
/// `set_status` stores a status and a later `set_status` overwrites it.
#[test]
fn health_service_set_and_get() {
init_test("health_service_set_and_get");
let service = HealthService::new();
service.set_status("test.Service", ServingStatus::Serving);
let status = service.get_status("test.Service");
crate::assert_with_log!(
status == Some(ServingStatus::Serving),
"serving",
Some(ServingStatus::Serving),
status
);
service.set_status("test.Service", ServingStatus::NotServing);
let status = service.get_status("test.Service");
crate::assert_with_log!(
status == Some(ServingStatus::NotServing),
"not serving",
Some(ServingStatus::NotServing),
status
);
crate::test_complete!("health_service_set_and_get");
}
/// `is_serving` is false for unregistered and non-serving services and true
/// only while the service is `Serving`.
#[test]
fn health_service_is_serving() {
init_test("health_service_is_serving");
let service = HealthService::new();
let unknown = service.is_serving("unknown");
crate::assert_with_log!(!unknown, "unknown not serving", false, unknown);
service.set_status("test", ServingStatus::Serving);
let serving = service.is_serving("test");
crate::assert_with_log!(serving, "test serving", true, serving);
service.set_status("test", ServingStatus::NotServing);
let serving = service.is_serving("test");
crate::assert_with_log!(!serving, "test not serving", false, serving);
crate::test_complete!("health_service_is_serving");
}
/// `check` returns the registered status for a known service and a NotFound
/// error for an unregistered, non-empty service name.
#[test]
fn health_service_check() {
init_test("health_service_check");
let service = HealthService::new();
service.set_status("test.Service", ServingStatus::Serving);
let req = HealthCheckRequest::new("test.Service");
let resp = service.check(&req).unwrap();
crate::assert_with_log!(
resp.status == ServingStatus::Serving,
"serving",
ServingStatus::Serving,
resp.status
);
let req = HealthCheckRequest::new("unknown.Service");
let err = service.check(&req).unwrap_err();
let code = err.code();
crate::assert_with_log!(
code == super::super::status::Code::NotFound,
"not found",
super::super::status::Code::NotFound,
code
);
crate::test_complete!("health_service_check");
}
/// Server-level checks (empty service name): aggregate over registered
/// services until an explicit server status is set, which then takes precedence.
#[test]
fn health_service_server_status() {
init_test("health_service_server_status");
let service = HealthService::new();
let req = HealthCheckRequest::server();
// Nothing registered yet.
let resp = service.check(&req).unwrap();
crate::assert_with_log!(
resp.status == ServingStatus::ServiceUnknown,
"service unknown",
ServingStatus::ServiceUnknown,
resp.status
);
// All registered services healthy.
service.set_status("test", ServingStatus::Serving);
let resp = service.check(&req).unwrap();
crate::assert_with_log!(
resp.status == ServingStatus::Serving,
"serving",
ServingStatus::Serving,
resp.status
);
// One unhealthy service degrades the aggregate.
service.set_status("test2", ServingStatus::NotServing);
let resp = service.check(&req).unwrap();
crate::assert_with_log!(
resp.status == ServingStatus::NotServing,
"not serving",
ServingStatus::NotServing,
resp.status
);
// An explicit server status overrides the aggregate.
service.set_server_status(ServingStatus::Serving);
let resp = service.check(&req).unwrap();
crate::assert_with_log!(
resp.status == ServingStatus::Serving,
"server serving",
ServingStatus::Serving,
resp.status
);
crate::test_complete!("health_service_server_status");
}
/// Snapshot of check responses for the server and for services in each of the
/// `Serving`, `NotServing` and `Unknown` states.
#[test]
fn health_check_response_statuses_snapshot() {
init_test("health_check_response_statuses_snapshot");
let service = HealthService::new();
service.set_server_status(ServingStatus::Unknown);
service.set_status("svc.serving", ServingStatus::Serving);
service.set_status("svc.not_serving", ServingStatus::NotServing);
service.set_status("svc.unknown", ServingStatus::Unknown);
let snapshot = json!({
"server": health_response_snapshot(&service, ""),
"service_queries": [
health_response_snapshot(&service, "svc.serving"),
health_response_snapshot(&service, "svc.not_serving"),
health_response_snapshot(&service, "svc.unknown"),
],
});
assert_json_snapshot!("health_check_response_statuses", snapshot);
crate::test_complete!("health_check_response_statuses_snapshot");
}
/// `clear_status` removes a single service; `clear` removes the rest.
#[test]
fn health_service_clear() {
init_test("health_service_clear");
let service = HealthService::new();
service.set_status("a", ServingStatus::Serving);
service.set_status("b", ServingStatus::Serving);
service.clear_status("a");
let a_none = service.get_status("a").is_none();
crate::assert_with_log!(a_none, "a cleared", true, a_none);
let b_some = service.get_status("b").is_some();
crate::assert_with_log!(b_some, "b still set", true, b_some);
service.clear();
let b_none = service.get_status("b").is_none();
crate::assert_with_log!(b_none, "b cleared", true, b_none);
crate::test_complete!("health_service_clear");
}
/// The global version is bumped only by real changes: clearing nothing and
/// re-setting the same status leave it untouched.
#[test]
fn health_version_only_tracks_real_changes() {
init_test("health_version_only_tracks_real_changes");
let service = HealthService::new();
let v0 = service.version();
service.clear();
crate::assert_with_log!(
service.version() == v0,
"clear empty is no-op",
v0,
service.version()
);
service.clear_status("missing");
crate::assert_with_log!(
service.version() == v0,
"clear missing is no-op",
v0,
service.version()
);
service.set_status("svc", ServingStatus::Serving);
let v1 = service.version();
crate::assert_with_log!(v1 > v0, "initial set increments", true, v1 > v0);
service.set_status("svc", ServingStatus::Serving);
crate::assert_with_log!(
service.version() == v1,
"idempotent set does not increment",
v1,
service.version()
);
service.set_status("svc", ServingStatus::NotServing);
crate::assert_with_log!(
service.version() > v1,
"real status transition increments",
true,
service.version() > v1
);
crate::test_complete!("health_version_only_tracks_real_changes");
}
/// A watcher on one named service does not report changes made to another.
#[test]
fn health_watcher_ignores_unrelated_service_changes() {
init_test("health_watcher_ignores_unrelated_service_changes");
let service = HealthService::new();
service.set_status("a", ServingStatus::Serving);
service.set_status("b", ServingStatus::Serving);
let mut watcher_a = service.watch("a");
let mut watcher_b = service.watch("b");
service.set_status("a", ServingStatus::NotServing);
let changed_a = watcher_a.changed();
crate::assert_with_log!(changed_a, "watcher a sees change", true, changed_a);
let changed_b = watcher_b.changed();
crate::assert_with_log!(
!changed_b,
"watcher b ignores unrelated change",
false,
changed_b
);
crate::assert_with_log!(
watcher_b.status() == ServingStatus::Serving,
"watcher b status unchanged",
ServingStatus::Serving,
watcher_b.status()
);
crate::test_complete!("health_watcher_ignores_unrelated_service_changes");
}
/// Watching an unregistered service reports `ServiceUnknown` until the service
/// is registered, at which point the watcher observes the change.
#[test]
fn health_watcher_unknown_service_reports_service_unknown() {
init_test("health_watcher_unknown_service_reports_service_unknown");
let service = HealthService::new();
let mut watcher = service.watch("missing");
crate::assert_with_log!(
watcher.status() == ServingStatus::ServiceUnknown,
"unknown service reports watch sentinel",
ServingStatus::ServiceUnknown,
watcher.status()
);
let (changed, status) = watcher.poll_status();
crate::assert_with_log!(!changed, "initial unknown poll is stable", false, changed);
crate::assert_with_log!(
status == ServingStatus::ServiceUnknown,
"poll_status reports service unknown",
ServingStatus::ServiceUnknown,
status
);
service.set_status("missing", ServingStatus::Serving);
let (changed, status) = watcher.poll_status();
crate::assert_with_log!(changed, "registration is observed", true, changed);
crate::assert_with_log!(
status == ServingStatus::Serving,
"watcher sees serving after registration",
ServingStatus::Serving,
status
);
crate::test_complete!("health_watcher_unknown_service_reports_service_unknown");
}
/// A status that flips away and back between polls is still reported as a
/// change by both `changed()` and `poll_status()`, with the final status.
#[test]
fn health_watcher_reports_named_service_transient_round_trip() {
init_test("health_watcher_reports_named_service_transient_round_trip");
let service = HealthService::new();
service.set_status("svc", ServingStatus::Serving);
let mut changed_watcher = service.watch("svc");
let mut poll_watcher = service.watch("svc");
// Round trip: Serving -> NotServing -> Serving.
service.set_status("svc", ServingStatus::NotServing);
service.set_status("svc", ServingStatus::Serving);
let changed = changed_watcher.changed();
crate::assert_with_log!(
changed,
"changed() observes transient round trip",
true,
changed
);
crate::assert_with_log!(
changed_watcher.status() == ServingStatus::Serving,
"effective status returns to serving",
ServingStatus::Serving,
changed_watcher.status()
);
let (poll_changed, polled_status) = poll_watcher.poll_status();
crate::assert_with_log!(
poll_changed,
"poll_status observes transient round trip",
true,
poll_changed
);
crate::assert_with_log!(
polled_status == ServingStatus::Serving,
"poll_status reports current effective status",
ServingStatus::Serving,
polled_status
);
crate::test_complete!("health_watcher_reports_named_service_transient_round_trip");
}
/// The server-level watcher (empty name) also observes a transient round trip
/// of a named service and reports the recovered aggregate status.
#[test]
fn health_watcher_reports_server_transient_round_trip() {
init_test("health_watcher_reports_server_transient_round_trip");
let service = HealthService::new();
service.set_status("svc", ServingStatus::Serving);
let mut watcher = service.watch("");
service.set_status("svc", ServingStatus::NotServing);
service.set_status("svc", ServingStatus::Serving);
let (changed, status) = watcher.poll_status();
crate::assert_with_log!(
changed,
"server watcher observes aggregate transient round trip",
true,
changed
);
crate::assert_with_log!(
status == ServingStatus::Serving,
"server watcher reports recovered aggregate status",
ServingStatus::Serving,
status
);
crate::test_complete!("health_watcher_reports_server_transient_round_trip");
}
/// `watch` must keep the statuses lock held while it reads the watch version.
///
/// The test blocks the version read by holding the `watch_versions` write
/// lock, then checks that the statuses lock is observed as held by the
/// watcher thread in the meantime.
#[test]
fn health_watch_initial_snapshot_is_atomic_for_named_services() {
init_test("health_watch_initial_snapshot_is_atomic_for_named_services");
let service = HealthService::new();
service.set_status("svc", ServingStatus::Serving);
// Holding this guard stalls the watcher thread at its version read.
let version_guard = service.watch_versions.write();
let watch_service = service.clone();
let (watcher_tx, watcher_rx) = std::sync::mpsc::channel();
let handle = std::thread::spawn(move || {
let watcher = watch_service.watch("svc");
watcher_tx.send(watcher).unwrap();
});
// Spin until the statuses lock can no longer be taken for writing, i.e.
// until the watcher thread holds its read lock.
let mut status_lock_held = false;
for _ in 0..10_000 {
if service.statuses.try_write().is_none() {
status_lock_held = true;
break;
}
std::thread::yield_now();
}
crate::assert_with_log!(
status_lock_held,
"watch constructor must hold statuses lock until version snapshot completes",
true,
status_lock_held
);
drop(version_guard);
let mut watcher = watcher_rx.recv().unwrap();
handle.join().unwrap();
crate::assert_with_log!(
watcher.status() == ServingStatus::Serving,
"initial status snapshot preserved",
ServingStatus::Serving,
watcher.status()
);
service.set_status("svc", ServingStatus::NotServing);
let changed = watcher.changed();
crate::assert_with_log!(
changed,
"watcher still observes later transition",
true,
changed
);
crate::assert_with_log!(
watcher.status() == ServingStatus::NotServing,
"watcher reports new status after transition",
ServingStatus::NotServing,
watcher.status()
);
crate::test_complete!("health_watch_initial_snapshot_is_atomic_for_named_services");
}
/// The watch stream emits the initial status immediately, then stays pending
/// until a status change wakes it exactly once and the new status is emitted.
#[test]
fn health_watch_async_emits_initial_status_and_wakes_on_change() {
init_test("health_watch_async_emits_initial_status_and_wakes_on_change");
let service = HealthService::new();
service.set_status("svc", ServingStatus::Serving);
let request = Request::new(HealthCheckRequest::new("svc"));
let response = futures_lite::future::block_on(service.watch_async(&request))
.expect("watch_async should construct a stream");
let mut stream = response.into_inner();
let first = futures_lite::future::block_on(futures_lite::future::poll_fn(|cx| {
Streaming::poll_next(Pin::new(&mut stream), cx)
}));
let first_ok = matches!(
first,
Some(Ok(HealthCheckResponse {
status: ServingStatus::Serving
}))
);
crate::assert_with_log!(
first_ok,
"initial watch snapshot is emitted immediately",
true,
first_ok
);
// Poll manually with a counting waker so wake-ups can be observed.
let wake_counter = Arc::new(CountingWake::default());
let waker = counting_waker(&wake_counter);
let mut cx = Context::from_waker(&waker);
let pending = matches!(
Streaming::poll_next(Pin::new(&mut stream), &mut cx),
Poll::Pending
);
crate::assert_with_log!(
pending,
"stream waits for the next health transition",
true,
pending
);
service.set_status("svc", ServingStatus::NotServing);
crate::assert_with_log!(
wake_counter.wakes.load(Ordering::SeqCst) == 1,
"status change wakes pending async watch",
1,
wake_counter.wakes.load(Ordering::SeqCst)
);
let next = futures_lite::future::block_on(futures_lite::future::poll_fn(|cx| {
Streaming::poll_next(Pin::new(&mut stream), cx)
}));
let next_ok = matches!(
next,
Some(Ok(HealthCheckResponse {
status: ServingStatus::NotServing
}))
);
crate::assert_with_log!(
next_ok,
"watch stream emits changed status after wake",
true,
next_ok
);
crate::test_complete!("health_watch_async_emits_initial_status_and_wakes_on_change");
}
/// Dropping a pending watch stream removes its waiter registration.
#[test]
fn health_watch_async_drop_unregisters_pending_waiter() {
init_test("health_watch_async_drop_unregisters_pending_waiter");
let service = HealthService::new();
service.set_status("svc", ServingStatus::Serving);
let request = Request::new(HealthCheckRequest::new("svc"));
let response = futures_lite::future::block_on(service.watch_async(&request))
.expect("watch_async should construct a stream");
let mut stream = response.into_inner();
// Consume the initial status so the next poll parks the stream.
let _ = futures_lite::future::block_on(futures_lite::future::poll_fn(|cx| {
Streaming::poll_next(Pin::new(&mut stream), cx)
}));
let wake_counter = Arc::new(CountingWake::default());
let waker = counting_waker(&wake_counter);
let mut cx = Context::from_waker(&waker);
assert!(matches!(
Streaming::poll_next(Pin::new(&mut stream), &mut cx),
Poll::Pending
));
let waiter_count_before_drop = service
.watch_waiters
.lock()
.get("svc")
.map_or(0, std::collections::HashMap::len);
crate::assert_with_log!(
waiter_count_before_drop == 1,
"pending watch registers exactly one waiter",
1,
waiter_count_before_drop
);
drop(stream);
let waiter_count_after_drop = service
.watch_waiters
.lock()
.get("svc")
.map_or(0, std::collections::HashMap::len);
crate::assert_with_log!(
waiter_count_after_drop == 0,
"dropping a pending watch unregisters waiter state",
0,
waiter_count_after_drop
);
crate::test_complete!("health_watch_async_drop_unregisters_pending_waiter");
}
/// A status change that lands between the stream's first status check and its
/// waiter registration is still reported by the same poll.
///
/// The hook passed to `poll_next_with_hook` injects the change at exactly
/// that point.
#[test]
fn health_watch_async_rechecks_after_waiter_registration() {
init_test("health_watch_async_rechecks_after_waiter_registration");
let service = HealthService::new();
service.set_status("svc", ServingStatus::Serving);
let request = Request::new(HealthCheckRequest::new("svc"));
let response = futures_lite::future::block_on(service.watch_async(&request))
.expect("watch_async should construct a stream");
let mut stream = response.into_inner();
// Consume the initial status first.
let _ = futures_lite::future::block_on(futures_lite::future::poll_fn(|cx| {
Streaming::poll_next(Pin::new(&mut stream), cx)
}));
let wake_counter = Arc::new(CountingWake::default());
let waker = counting_waker(&wake_counter);
let mut cx = Context::from_waker(&waker);
let poll = stream.poll_next_with_hook(&mut cx, |stream| {
stream
.watcher
.service
.set_status("svc", ServingStatus::NotServing);
});
let changed = matches!(
poll,
Poll::Ready(Some(Ok(HealthCheckResponse {
status: ServingStatus::NotServing
})))
);
crate::assert_with_log!(
changed,
"watch stream must not miss transition between status check and waiter registration",
true,
format!("{poll:?}")
);
let waiter_count = service
.watch_waiters
.lock()
.get("svc")
.map_or(0, std::collections::HashMap::len);
crate::assert_with_log!(
waiter_count == 0,
"caught transition clears waiter registration instead of parking forever",
0,
waiter_count
);
crate::assert_with_log!(
wake_counter.wakes.load(Ordering::SeqCst) == 0,
"recheck path resolves inline without depending on an out-of-band wake",
0,
wake_counter.wakes.load(Ordering::SeqCst)
);
crate::test_complete!("health_watch_async_rechecks_after_waiter_registration");
}
/// Registers two services out of order and checks that `services()` lists them sorted.
#[test]
fn health_service_services() {
    init_test("health_service_services");
    let health = HealthService::new();
    // "b" goes in before "a" so a sorted listing cannot come from insertion order.
    health.set_status("b", ServingStatus::NotServing);
    health.set_status("a", ServingStatus::Serving);

    let listed = health.services();
    let expected = vec!["a".to_string(), "b".to_string()];
    crate::assert_with_log!(
        listed == expected,
        "services are returned in deterministic sorted order",
        expected,
        listed
    );
    crate::test_complete!("health_service_services");
}
/// Drives a single `HealthReporter`: marks its service as serving, then drops the
/// reporter and checks that the service no longer has a status.
#[test]
fn health_reporter() {
    init_test("health_reporter");
    let health = HealthService::new();

    let scoped = HealthReporter::new(health.clone(), "my.Service");
    scoped.set_serving();
    let reported = scoped.status();
    crate::assert_with_log!(
        reported == ServingStatus::Serving,
        "serving",
        ServingStatus::Serving,
        reported
    );
    let is_serving = health.is_serving("my.Service");
    crate::assert_with_log!(is_serving, "service serving", true, is_serving);

    // End the reporter's lifetime explicitly before inspecting the service again.
    drop(scoped);
    let cleared = health.get_status("my.Service").is_none();
    crate::assert_with_log!(cleared, "cleared on drop", true, cleared);
    crate::test_complete!("health_reporter");
}
/// Two reporters share one service name: dropping the first must leave the status and
/// version untouched, and only dropping the second clears the registration.
#[test]
fn health_reporter_only_final_drop_clears_shared_service_status() {
    init_test("health_reporter_only_final_drop_clears_shared_service_status");
    let health = HealthService::new();
    let first = HealthReporter::new(health.clone(), "shared.Service");
    let second = HealthReporter::new(health.clone(), "shared.Service");
    first.set_serving();
    let version_after_set = health.version();

    // Non-final drop: status and version must be exactly as they were.
    drop(first);
    let status_after_first_drop = health.get_status("shared.Service");
    crate::assert_with_log!(
        status_after_first_drop == Some(ServingStatus::Serving),
        "first drop preserves shared registration",
        Some(ServingStatus::Serving),
        status_after_first_drop
    );
    let version_after_first_drop = health.version();
    crate::assert_with_log!(
        version_after_first_drop == version_after_set,
        "non-final drop does not clear or bump version",
        version_after_set,
        version_after_first_drop
    );

    // The surviving reporter can still change the shared status.
    second.set_not_serving();
    let status_after_update = health.get_status("shared.Service");
    crate::assert_with_log!(
        status_after_update == Some(ServingStatus::NotServing),
        "remaining reporter still controls shared service state",
        Some(ServingStatus::NotServing),
        status_after_update
    );

    // Final drop: the registration disappears.
    drop(second);
    let cleared = health.get_status("shared.Service").is_none();
    crate::assert_with_log!(
        cleared,
        "final drop clears shared registration",
        true,
        cleared
    );
    crate::test_complete!("health_reporter_only_final_drop_clears_shared_service_status");
}
/// Regression test: releasing the last reporter for a service must not erase the status
/// written by a replacement reporter that is created around the same time.
///
/// The original reporter is wrapped in `ManuallyDrop` so its destructor never runs; the
/// test calls `release_reporter_and_maybe_clear_status_with_hook` directly instead and
/// uses the hook to signal a helper thread that creates the replacement reporter.
#[test]
fn health_reporter_final_drop_does_not_clear_replacement_reporter_status() {
init_test("health_reporter_final_drop_does_not_clear_replacement_reporter_status");
let service = HealthService::new();
let reporter = HealthReporter::new(service.clone(), "race.Service");
reporter.set_serving();
// Suppress the reporter's own drop; the release is driven by hand below.
let _reporter = std::mem::ManuallyDrop::new(reporter);
// `attempt_*` tells the helper thread to start; `created_*` reports that the
// replacement exists and has published its status.
let (attempt_tx, attempt_rx) = std::sync::mpsc::channel();
let (created_tx, created_rx) = std::sync::mpsc::channel();
let service_for_thread = service.clone();
let handle = std::thread::spawn(move || {
// Wait until the main thread's release hook has fired.
attempt_rx.recv().unwrap();
let replacement = HealthReporter::new(service_for_thread.clone(), "race.Service");
replacement.set_not_serving();
created_tx.send(()).unwrap();
// Hand the reporter back so the main thread decides when it is dropped.
replacement
});
// NOTE(review): the hook only signals the helper and yields; it does not wait for
// the replacement to exist, so whether the replacement is created before the release
// finishes depends on scheduling — confirm this is the intended coverage.
service.release_reporter_and_maybe_clear_status_with_hook("race.Service", || {
attempt_tx.send(()).unwrap();
std::thread::yield_now();
});
// Block until the replacement has been created and set to `NotServing`.
created_rx.recv().unwrap();
crate::assert_with_log!(
service.get_status("race.Service") == Some(ServingStatus::NotServing),
"replacement reporter survives final-drop clear window",
Some(ServingStatus::NotServing),
service.get_status("race.Service")
);
// Dropping the replacement (now the only live reporter) must clear the status.
let replacement = handle.join().unwrap();
drop(replacement);
crate::assert_with_log!(
service.get_status("race.Service").is_none(),
"replacement final drop still clears registration",
true,
service.get_status("race.Service").is_none()
);
crate::test_complete!(
"health_reporter_final_drop_does_not_clear_replacement_reporter_status"
);
}
/// Builds a service with one explicit `NotServing` entry plus three entries added via
/// `add_serving`, then checks each registered status.
#[test]
fn health_service_builder() {
    init_test("health_service_builder");
    let built = HealthServiceBuilder::new()
        .add("explicit", ServingStatus::NotServing)
        .add_serving(["a", "b", "c"])
        .build();

    // Read every status up front; the lookups are independent of each other.
    let explicit_status = built.get_status("explicit");
    let status_a = built.get_status("a");
    let status_b = built.get_status("b");
    let status_c = built.get_status("c");

    crate::assert_with_log!(
        explicit_status == Some(ServingStatus::NotServing),
        "explicit",
        Some(ServingStatus::NotServing),
        explicit_status
    );
    crate::assert_with_log!(
        status_a == Some(ServingStatus::Serving),
        "a",
        Some(ServingStatus::Serving),
        status_a
    );
    crate::assert_with_log!(
        status_b == Some(ServingStatus::Serving),
        "b",
        Some(ServingStatus::Serving),
        status_b
    );
    crate::assert_with_log!(
        status_c == Some(ServingStatus::Serving),
        "c",
        Some(ServingStatus::Serving),
        status_c
    );
    crate::test_complete!("health_service_builder");
}
/// Pins the fully qualified service name exposed through `HealthService::NAME`.
#[test]
fn health_service_named_service() {
    init_test("health_service_named_service");
    let expected = "grpc.health.v1.Health";
    let actual = HealthService::NAME;
    crate::assert_with_log!(actual == expected, "name", expected, actual);
    crate::test_complete!("health_service_named_service");
}
/// Checks the descriptor's name, package and method count.
#[test]
fn health_service_descriptor() {
    init_test("health_service_descriptor");
    let health = HealthService::new();
    let descriptor = health.descriptor();
    let method_count = descriptor.methods.len();

    crate::assert_with_log!(
        descriptor.name == "Health",
        "name",
        "Health",
        descriptor.name
    );
    crate::assert_with_log!(
        descriptor.package == "grpc.health.v1",
        "package",
        "grpc.health.v1",
        descriptor.package
    );
    crate::assert_with_log!(method_count == 2, "methods len", 2, method_count);
    crate::test_complete!("health_service_descriptor");
}
/// Checks that the advertised method names include both `Check` and `Watch`.
#[test]
fn health_service_method_names() {
    init_test("health_service_method_names");
    let health = HealthService::new();
    let advertised = health.method_names();

    let check_listed = advertised.iter().any(|name| *name == "Check");
    let watch_listed = advertised.iter().any(|name| *name == "Watch");

    crate::assert_with_log!(check_listed, "has Check", true, check_listed);
    crate::assert_with_log!(watch_listed, "has Watch", true, watch_listed);
    crate::test_complete!("health_service_method_names");
}
/// Covers both constructors: `new` stores the given name, `server` leaves it empty.
#[test]
fn health_check_request_constructors() {
    init_test("health_check_request_constructors");

    let named = HealthCheckRequest::new("my.Service");
    crate::assert_with_log!(
        named.service == "my.Service",
        "service",
        "my.Service",
        named.service
    );

    // The server-wide request is the one with an empty service name.
    let server_wide = HealthCheckRequest::server();
    crate::assert_with_log!(
        server_wide.service.is_empty(),
        "service",
        "",
        server_wide.service
    );
    crate::test_complete!("health_check_request_constructors");
}
/// A cloned `HealthService` must observe status changes made through the original.
#[test]
fn health_service_clone() {
    init_test("health_service_clone");
    let original = HealthService::new();
    let handle = original.clone();

    // Write through one handle, read through the other.
    original.set_status("test", ServingStatus::Serving);
    let seen_by_clone = handle.get_status("test");

    crate::assert_with_log!(
        seen_by_clone == Some(ServingStatus::Serving),
        "serving",
        Some(ServingStatus::Serving),
        seen_by_clone
    );
    crate::test_complete!("health_service_clone");
}
/// Exercises the traits derived on `ServingStatus`: `Default`, `Debug`, `Clone`, `Copy`,
/// `PartialEq`/`Eq` and `Hash`.
#[test]
// `ServingStatus` is `Copy`, but this test exists to call `Clone::clone` explicitly.
#[allow(clippy::clone_on_copy)]
fn serving_status_debug_clone_copy_eq_hash_default() {
    use std::collections::HashSet;

    assert_eq!(ServingStatus::default(), ServingStatus::Unknown);

    let statuses = [
        ServingStatus::Unknown,
        ServingStatus::Serving,
        ServingStatus::NotServing,
        ServingStatus::ServiceUnknown,
    ];
    for s in &statuses {
        // One bitwise copy and one real `Clone::clone` call; both must equal the source.
        let copied = *s;
        let cloned = s.clone();
        assert_eq!(copied, cloned);
        assert_eq!(cloned, *s);
        assert!(!format!("{s:?}").is_empty());
    }

    // Every variant must hash and compare as distinct.
    let mut set = HashSet::new();
    for s in &statuses {
        set.insert(*s);
    }
    assert_eq!(set.len(), 4);

    // Re-inserting an existing variant is rejected and leaves the set unchanged.
    assert!(!set.insert(ServingStatus::Serving));
    assert_eq!(set.len(), 4);
}
/// Exercises the `Default`, `Debug` and `Clone` derives on `HealthCheckRequest`.
#[test]
fn health_check_request_debug_clone_default() {
    let def = HealthCheckRequest::default();
    assert!(def.service.is_empty());
    let dbg = format!("{def:?}");
    assert!(dbg.contains("HealthCheckRequest"), "{dbg}");

    // Actually call `clone` (the source stays alive for comparison) rather than moving.
    let cloned_default = def.clone();
    assert_eq!(cloned_default.service, "");
    assert_eq!(cloned_default.service, def.service);

    // A non-empty name makes sure the clone carries the field contents across.
    let named = HealthCheckRequest::new("my.Service");
    let cloned_named = named.clone();
    assert_eq!(cloned_named.service, "my.Service");
    assert_eq!(cloned_named.service, named.service);
}
/// Exercises the `Default`, `Debug` and `Clone` implementations on `HealthCheckResponse`.
#[test]
fn health_check_response_debug_clone_default() {
    let def = HealthCheckResponse::default();
    assert_eq!(def.status, ServingStatus::Unknown);
    let dbg = format!("{def:?}");
    assert!(dbg.contains("HealthCheckResponse"), "{dbg}");

    // Actually call `clone` (the source stays alive for comparison) rather than moving.
    let resp = HealthCheckResponse::new(ServingStatus::Serving);
    let cloned = resp.clone();
    assert_eq!(cloned.status, ServingStatus::Serving);
    assert_eq!(cloned.status, resp.status);
}
}