1use std::{convert::Infallible, future::Future, net::SocketAddr};
2
3use tonic::server::NamedService;
4
5use crate::core::{CoreError, Service, ServiceFuture, ShutdownToken};
6
7pub async fn serve_health_with_shutdown<F>(
12 addr: SocketAddr,
13 shutdown: F,
14) -> Result<(), tonic::transport::Error>
15where
16 F: Future<Output = ()> + Send + 'static,
17{
18 let (_reporter, health_service) = tonic_health::server::health_reporter();
19
20 tonic::transport::Server::builder()
21 .add_service(health_service)
22 .serve_with_shutdown(addr, shutdown)
23 .await
24}
25
/// A tonic service paired with the name and socket address it is served under.
///
/// The wrapped service sits in a `Mutex<Option<S>>` so that `start`, which
/// only receives `&self`, can move it out; after it has been taken, the slot
/// holds `None`.
#[derive(Debug)]
pub struct TonicService<S> {
    /// Name reported by `Service::name` and included in error messages.
    name: String,
    /// Address the server binds to when started.
    addr: SocketAddr,
    /// The service to serve; `None` once it has been handed to a server.
    service: std::sync::Mutex<Option<S>>,
}
32
33impl<S> TonicService<S> {
34 pub fn new(name: impl Into<String>, addr: SocketAddr, service: S) -> Self {
36 Self {
37 name: name.into(),
38 addr,
39 service: std::sync::Mutex::new(Some(service)),
40 }
41 }
42
43 pub fn addr(&self) -> SocketAddr {
45 self.addr
46 }
47}
48
49impl<S> Service for TonicService<S>
50where
51 S: tower::Service<
52 http::Request<tonic::body::Body>,
53 Response = http::Response<tonic::body::Body>,
54 Error = Infallible,
55 > + NamedService
56 + Clone
57 + Send
58 + Sync
59 + 'static,
60 S::Future: Send + 'static,
61{
62 fn name(&self) -> &str {
63 &self.name
64 }
65
66 fn start(&self, shutdown: ShutdownToken) -> ServiceFuture<'_> {
67 Box::pin(async move {
68 let service = self
69 .service
70 .lock()
71 .expect("tonic service mutex")
72 .take()
73 .ok_or_else(|| {
74 CoreError::Service(format!("service {} already started", self.name))
75 })?;
76 tonic::transport::Server::builder()
77 .add_service(service)
78 .serve_with_shutdown(self.addr, async move {
79 shutdown.cancelled().await;
80 })
81 .await
82 .map_err(|error| {
83 CoreError::Service(format!("RPC service {} failed: {error}", self.name))
84 })
85 })
86 }
87}
88
/// A named service that serves only the gRPC health service on a socket address.
#[derive(Debug, Clone)]
pub struct TonicHealthService {
    /// Name reported by `Service::name` and included in error messages.
    name: String,
    /// Address the health server binds to when started.
    addr: SocketAddr,
}
94
95impl TonicHealthService {
96 pub fn new(name: impl Into<String>, addr: SocketAddr) -> Self {
98 Self {
99 name: name.into(),
100 addr,
101 }
102 }
103}
104
105impl Service for TonicHealthService {
106 fn name(&self) -> &str {
107 &self.name
108 }
109
110 fn start(&self, shutdown: ShutdownToken) -> ServiceFuture<'_> {
111 Box::pin(async move {
112 serve_health_with_shutdown(self.addr, async move {
113 shutdown.cancelled().await;
114 })
115 .await
116 .map_err(|error| {
117 CoreError::Service(format!("RPC service {} failed: {error}", self.name))
118 })
119 })
120 }
121}