1#![warn(missing_docs)]
32#![warn(rustdoc::missing_crate_level_docs)]
33
34use rustapi_core::RustApi;
35use std::error::Error;
36use std::future::Future;
37use std::pin::Pin;
38use tokio::sync::watch;
39
40pub type BoxError = Box<dyn Error + Send + Sync>;
42
43pub type Result<T> = std::result::Result<T, BoxError>;
45
46pub type ShutdownFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
48
49pub use tonic;
51
52pub use prost;
54
55fn to_boxed_error<E>(err: E) -> BoxError
56where
57 E: Error + Send + Sync + 'static,
58{
59 Box::new(err)
60}
61
62pub async fn run_concurrently<HF, GF, HE, GE>(http_future: HF, grpc_future: GF) -> Result<()>
68where
69 HF: Future<Output = std::result::Result<(), HE>> + Send,
70 GF: Future<Output = std::result::Result<(), GE>> + Send,
71 HE: Error + Send + Sync + 'static,
72 GE: Error + Send + Sync + 'static,
73{
74 let http_task = async move { http_future.await.map_err(to_boxed_error) };
75 let grpc_task = async move { grpc_future.await.map_err(to_boxed_error) };
76
77 let (_http_ok, _grpc_ok) = tokio::try_join!(http_task, grpc_task)?;
78 Ok(())
79}
80
81pub async fn run_rustapi_and_grpc<GF, GE>(
86 app: RustApi,
87 http_addr: impl AsRef<str>,
88 grpc_future: GF,
89) -> Result<()>
90where
91 GF: Future<Output = std::result::Result<(), GE>> + Send,
92 GE: Error + Send + Sync + 'static,
93{
94 let http_addr = http_addr.as_ref().to_string();
95 let http_task = async move { app.run(&http_addr).await };
96 let grpc_task = async move { grpc_future.await.map_err(to_boxed_error) };
97
98 let (_http_ok, _grpc_ok) = tokio::try_join!(http_task, grpc_task)?;
99 Ok(())
100}
101
102pub async fn run_rustapi_and_grpc_with_shutdown<GF, GE, SF, F>(
136 app: RustApi,
137 http_addr: impl AsRef<str>,
138 shutdown_signal: SF,
139 grpc_with_shutdown: F,
140) -> Result<()>
141where
142 GF: Future<Output = std::result::Result<(), GE>> + Send,
143 GE: Error + Send + Sync + 'static,
144 SF: Future<Output = ()> + Send + 'static,
145 F: FnOnce(ShutdownFuture) -> GF,
146{
147 let http_addr = http_addr.as_ref().to_string();
148 let (shutdown_tx, shutdown_rx) = watch::channel(false);
149
150 let shutdown_dispatch = tokio::spawn(async move {
152 shutdown_signal.await;
153 let _ = shutdown_tx.send(true);
154 });
155
156 let http_shutdown = shutdown_notifier(shutdown_rx.clone());
157 let grpc_shutdown = shutdown_notifier(shutdown_rx);
158
159 let http_task = async move { app.run_with_shutdown(&http_addr, http_shutdown).await };
160 let grpc_task = async move {
161 grpc_with_shutdown(Box::pin(grpc_shutdown))
162 .await
163 .map_err(to_boxed_error)
164 };
165
166 let joined = tokio::try_join!(http_task, grpc_task).map(|_| ());
167
168 shutdown_dispatch.abort();
169 let _ = shutdown_dispatch.await;
170
171 joined
172}
173
174async fn shutdown_notifier(mut rx: watch::Receiver<bool>) {
175 if *rx.borrow() {
176 return;
177 }
178
179 while rx.changed().await.is_ok() {
180 if *rx.borrow() {
181 break;
182 }
183 }
184}
185
186#[cfg(test)]
187mod tests {
188 use super::*;
189 use rustapi_core::get;
190 use std::io;
191 use tokio::sync::oneshot;
192 use tokio::time::{sleep, timeout, Duration};
193
194 #[tokio::test]
195 async fn run_concurrently_returns_ok_when_both_succeed() {
196 let http = async { Ok::<(), io::Error>(()) };
197 let grpc = async { Ok::<(), io::Error>(()) };
198
199 let result = run_concurrently(http, grpc).await;
200 assert!(result.is_ok());
201 }
202
203 #[tokio::test]
204 async fn run_concurrently_returns_err_when_any_fails() {
205 let http = async { Err::<(), _>(io::Error::other("http failed")) };
206
207 let grpc = async {
208 sleep(Duration::from_millis(20)).await;
209 Ok::<(), io::Error>(())
210 };
211
212 let result = run_concurrently(http, grpc).await;
213 assert!(result.is_err());
214 }
215
216 #[tokio::test]
217 async fn run_rustapi_and_grpc_with_shutdown_stops_both_servers() {
218 async fn health() -> &'static str {
219 "ok"
220 }
221
222 let app = RustApi::new().route("/health", get(health));
223 let grpc_addr = "127.0.0.1:0".parse().expect("valid socket addr");
224 let (tx, rx) = oneshot::channel::<()>();
225
226 let run_future = run_rustapi_and_grpc_with_shutdown(
227 app,
228 "127.0.0.1:0",
229 async move {
230 let _ = rx.await;
231 },
232 move |shutdown| {
233 let (_reporter, health_service) = tonic_health::server::health_reporter();
234 tonic::transport::Server::builder()
235 .add_service(health_service)
236 .serve_with_shutdown(grpc_addr, shutdown)
237 },
238 );
239
240 tokio::spawn(async move {
241 sleep(Duration::from_millis(75)).await;
242 let _ = tx.send(());
243 });
244
245 let result = timeout(Duration::from_secs(3), run_future).await;
246 assert!(result.is_ok(), "servers should stop before timeout");
247 assert!(
248 result.expect("timeout checked").is_ok(),
249 "graceful shutdown should succeed"
250 );
251 }
252}