Skip to main content

rustapi_grpc/
lib.rs

1//! # rustapi-grpc
2//!
3//! gRPC integration helpers for RustAPI using [`tonic`].
4//!
5//! This crate keeps RustAPI's facade approach: your app code stays simple while you can
6//! run a RustAPI HTTP server and a Tonic gRPC server side-by-side in the same process.
7//!
8//! ## Quick start
9//!
10//! ```rust,ignore
11//! use rustapi_rs::grpc::{run_rustapi_and_grpc, tonic};
12//! use rustapi_rs::prelude::*;
13//!
14//! #[rustapi_rs::get("/health")]
15//! async fn health() -> &'static str { "ok" }
16//!
17//! #[tokio::main]
18//! async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
19//!     let http_app = RustApi::new().route("/health", get(health));
20//!
21//!     let grpc_addr = "127.0.0.1:50051".parse()?;
22//!     let grpc_server = tonic::transport::Server::builder()
23//!         .add_service(MyGreeterServer::new(MyGreeter::default()))
24//!         .serve(grpc_addr);
25//!
26//!     run_rustapi_and_grpc(http_app, "127.0.0.1:8080", grpc_server).await?;
27//!     Ok(())
28//! }
29//! ```
30
31#![warn(missing_docs)]
32#![warn(rustdoc::missing_crate_level_docs)]
33
34use rustapi_core::RustApi;
35use std::error::Error;
36use std::future::Future;
37use std::pin::Pin;
38use tokio::sync::watch;
39
40/// Boxed error used by this crate.
41pub type BoxError = Box<dyn Error + Send + Sync>;
42
43/// Result type used by this crate.
44pub type Result<T> = std::result::Result<T, BoxError>;
45
46/// Shutdown future type used by gRPC server builders.
47pub type ShutdownFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
48
49/// Re-export `tonic` so users can use a single dependency surface from `rustapi-rs`.
50pub use tonic;
51
52/// Re-export `prost` for protobuf message derives and runtime types.
53pub use prost;
54
55fn to_boxed_error<E>(err: E) -> BoxError
56where
57    E: Error + Send + Sync + 'static,
58{
59    Box::new(err)
60}
61
62/// Run two independent servers/tasks concurrently.
63///
64/// This is useful for running a RustAPI HTTP server together with a Tonic gRPC server.
65///
66/// The function returns when one of the futures returns an error, or when both complete successfully.
67pub async fn run_concurrently<HF, GF, HE, GE>(http_future: HF, grpc_future: GF) -> Result<()>
68where
69    HF: Future<Output = std::result::Result<(), HE>> + Send,
70    GF: Future<Output = std::result::Result<(), GE>> + Send,
71    HE: Error + Send + Sync + 'static,
72    GE: Error + Send + Sync + 'static,
73{
74    let http_task = async move { http_future.await.map_err(to_boxed_error) };
75    let grpc_task = async move { grpc_future.await.map_err(to_boxed_error) };
76
77    let (_http_ok, _grpc_ok) = tokio::try_join!(http_task, grpc_task)?;
78    Ok(())
79}
80
81/// Run a `RustApi` HTTP server and any gRPC future side-by-side.
82///
83/// `grpc_future` is typically a Tonic server future:
84/// `tonic::transport::Server::builder().add_service(...).serve(addr)`.
85pub async fn run_rustapi_and_grpc<GF, GE>(
86    app: RustApi,
87    http_addr: impl AsRef<str>,
88    grpc_future: GF,
89) -> Result<()>
90where
91    GF: Future<Output = std::result::Result<(), GE>> + Send,
92    GE: Error + Send + Sync + 'static,
93{
94    let http_addr = http_addr.as_ref().to_string();
95    let http_task = async move { app.run(&http_addr).await };
96    let grpc_task = async move { grpc_future.await.map_err(to_boxed_error) };
97
98    let (_http_ok, _grpc_ok) = tokio::try_join!(http_task, grpc_task)?;
99    Ok(())
100}
101
102/// Run RustAPI HTTP and gRPC servers together with a shared shutdown signal.
103///
104/// This helper lets you provide a single shutdown signal (for example `tokio::signal::ctrl_c()`)
105/// and uses it for both servers.
106///
107/// # Example
108///
109/// ```rust,ignore
110/// use rustapi_rs::grpc::{run_rustapi_and_grpc_with_shutdown, tonic};
111/// use rustapi_rs::prelude::*;
112///
113/// #[rustapi_rs::get("/health")]
114/// async fn health() -> &'static str { "ok" }
115///
116/// #[tokio::main]
117/// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
118///     let app = RustApi::new().route("/health", get(health));
119///     let grpc_addr = "127.0.0.1:50051".parse()?;
120///
121///     run_rustapi_and_grpc_with_shutdown(
122///         app,
123///         "127.0.0.1:8080",
124///         tokio::signal::ctrl_c(),
125///         move |shutdown| {
126///             tonic::transport::Server::builder()
127///                 .add_service(MyGreeterServer::new(MyGreeter::default()))
128///                 .serve_with_shutdown(grpc_addr, shutdown)
129///         },
130///     ).await?;
131///
132///     Ok(())
133/// }
134/// ```
135pub async fn run_rustapi_and_grpc_with_shutdown<GF, GE, SF, F>(
136    app: RustApi,
137    http_addr: impl AsRef<str>,
138    shutdown_signal: SF,
139    grpc_with_shutdown: F,
140) -> Result<()>
141where
142    GF: Future<Output = std::result::Result<(), GE>> + Send,
143    GE: Error + Send + Sync + 'static,
144    SF: Future<Output = ()> + Send + 'static,
145    F: FnOnce(ShutdownFuture) -> GF,
146{
147    let http_addr = http_addr.as_ref().to_string();
148    let (shutdown_tx, shutdown_rx) = watch::channel(false);
149
150    // Fan out a single shutdown signal to both servers.
151    let shutdown_dispatch = tokio::spawn(async move {
152        shutdown_signal.await;
153        let _ = shutdown_tx.send(true);
154    });
155
156    let http_shutdown = shutdown_notifier(shutdown_rx.clone());
157    let grpc_shutdown = shutdown_notifier(shutdown_rx);
158
159    let http_task = async move { app.run_with_shutdown(&http_addr, http_shutdown).await };
160    let grpc_task = async move {
161        grpc_with_shutdown(Box::pin(grpc_shutdown))
162            .await
163            .map_err(to_boxed_error)
164    };
165
166    let joined = tokio::try_join!(http_task, grpc_task).map(|_| ());
167
168    shutdown_dispatch.abort();
169    let _ = shutdown_dispatch.await;
170
171    joined
172}
173
174async fn shutdown_notifier(mut rx: watch::Receiver<bool>) {
175    if *rx.borrow() {
176        return;
177    }
178
179    while rx.changed().await.is_ok() {
180        if *rx.borrow() {
181            break;
182        }
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::*;
    use rustapi_core::get;
    use std::io;
    use tokio::sync::oneshot;
    use tokio::time::{sleep, timeout, Duration};

    /// Two immediately successful futures yield `Ok`.
    #[tokio::test]
    async fn run_concurrently_returns_ok_when_both_succeed() {
        let outcome = run_concurrently(
            async { Ok::<(), io::Error>(()) },
            async { Ok::<(), io::Error>(()) },
        )
        .await;

        assert!(outcome.is_ok());
    }

    /// A failure on one side is reported even though the other side would succeed.
    #[tokio::test]
    async fn run_concurrently_returns_err_when_any_fails() {
        let failing_http = async { Err::<(), _>(io::Error::other("http failed")) };
        let slow_grpc = async {
            sleep(Duration::from_millis(20)).await;
            Ok::<(), io::Error>(())
        };

        assert!(run_concurrently(failing_http, slow_grpc).await.is_err());
    }

    /// One shutdown signal brings down both the HTTP and the gRPC server.
    #[tokio::test]
    async fn run_rustapi_and_grpc_with_shutdown_stops_both_servers() {
        async fn health() -> &'static str {
            "ok"
        }

        let (stop_tx, stop_rx) = oneshot::channel::<()>();
        let grpc_addr = "127.0.0.1:0".parse().expect("valid socket addr");
        let app = RustApi::new().route("/health", get(health));

        let servers = run_rustapi_and_grpc_with_shutdown(
            app,
            "127.0.0.1:0",
            async move {
                let _ = stop_rx.await;
            },
            move |shutdown| {
                let (_reporter, health_service) = tonic_health::server::health_reporter();
                tonic::transport::Server::builder()
                    .add_service(health_service)
                    .serve_with_shutdown(grpc_addr, shutdown)
            },
        );

        // Fire the shutdown signal shortly after the servers start being polled.
        tokio::spawn(async move {
            sleep(Duration::from_millis(75)).await;
            let _ = stop_tx.send(());
        });

        let outcome = timeout(Duration::from_secs(3), servers).await;
        assert!(outcome.is_ok(), "servers should stop before timeout");
        assert!(
            outcome.expect("timeout checked").is_ok(),
            "graceful shutdown should succeed"
        );
    }
}
252}