Skip to main content

rustapi_core/app/
run.rs

1#[cfg(feature = "dashboard")]
2use super::helpers::{
3    infer_route_feature_gates, is_dashboard_replay_eligible, openapi_tags_for_route,
4};
5use super::types::RustApi;
6use crate::error::Result;
7use crate::middleware::BodyLimitLayer;
8use crate::response::IntoResponse;
9use crate::server::Server;
10
11impl RustApi {
12    async fn prepare_for_serve(&mut self, addr: &str) {
13        self.maybe_dump_openapi();
14        self.print_hot_reload_banner(addr);
15        self.apply_health_endpoints();
16        self.apply_status_page();
17        #[cfg(feature = "dashboard")]
18        self.apply_dashboard();
19        if let Some(limit) = self.body_limit {
20            self.layers.prepend(Box::new(BodyLimitLayer::new(limit)));
21        }
22        for hook in std::mem::take(&mut self.lifecycle_hooks.on_start) {
23            hook().await;
24        }
25    }
26
27    pub(super) fn print_hot_reload_banner(&self, addr: &str) -> Option<bool> {
28        if !self.hot_reload {
29            return None;
30        }
31
32        let is_under_watcher = std::env::var("RUSTAPI_HOT_RELOAD")
33            .map(|v| v == "1")
34            .unwrap_or(false);
35
36        std::env::set_var("RUSTAPI_HOT_RELOAD", "1");
37
38        tracing::info!("Hot-reload mode enabled");
39
40        if is_under_watcher {
41            tracing::info!("   File watcher active - changes will trigger rebuild + restart");
42        } else {
43            tracing::info!("   Tip: Run with `cargo rustapi run --watch` for automatic hot-reload");
44        }
45
46        tracing::info!("   Listening on http://{addr}");
47        Some(is_under_watcher)
48    }
49
50    async fn run_shutdown_hooks(hooks: Vec<crate::events::LifecycleHook>) {
51        for hook in hooks {
52            hook().await;
53        }
54    }
55
56    pub(super) fn apply_status_page(&mut self) {
57        if let Some(config) = &self.status_config {
58            let monitor = std::sync::Arc::new(crate::status::StatusMonitor::new());
59
60            self.layers
61                .push(Box::new(crate::status::StatusLayer::new(monitor.clone())));
62
63            use crate::router::MethodRouter;
64            use std::collections::HashMap;
65
66            let monitor = monitor.clone();
67            let config = config.clone();
68            let path = config.path.clone();
69
70            let handler: crate::handler::BoxedHandler = std::sync::Arc::new(move |_| {
71                let monitor = monitor.clone();
72                let config = config.clone();
73                Box::pin(async move {
74                    crate::status::status_handler(monitor, config)
75                        .await
76                        .into_response()
77                })
78            });
79
80            let mut handlers = HashMap::new();
81            handlers.insert(http::Method::GET, handler);
82            let method_router = MethodRouter::from_boxed(handlers);
83
84            let router = std::mem::take(&mut self.router);
85            self.router = router.route(&path, method_router);
86        }
87    }
88
89    #[cfg(feature = "dashboard")]
90    pub(super) fn apply_dashboard(&mut self) {
91        use crate::dashboard::{DashboardMetrics, RouteInventoryItem};
92        use crate::handler::BoxedHandler;
93        use crate::response::Body;
94        use crate::router::MethodRouter;
95        use std::collections::HashMap;
96
97        let mut config = match self.dashboard_config.take() {
98            Some(c) => c,
99            None => return,
100        };
101        config.normalize_paths();
102
103        let mut inventory: Vec<RouteInventoryItem> = self
104            .router
105            .registered_routes()
106            .values()
107            .map(|info| {
108                let methods: Vec<String> = info.methods.iter().map(|m| m.to_string()).collect();
109                let health_eligible = self
110                    .health_endpoint_config
111                    .as_ref()
112                    .map(|health| {
113                        info.path == health.health_path
114                            || info.path == health.readiness_path
115                            || info.path == health.liveness_path
116                    })
117                    .unwrap_or(false);
118
119                RouteInventoryItem::new(info.path.clone(), methods)
120                    .with_tags(openapi_tags_for_route(
121                        &self.openapi_spec,
122                        &info.path,
123                        &info.methods,
124                    ))
125                    .with_feature_gates(infer_route_feature_gates(&info.path))
126                    .health_eligible(health_eligible)
127                    .replay_eligible(is_dashboard_replay_eligible(&info.path, health_eligible))
128            })
129            .collect();
130        inventory.sort_by(|a, b| a.path.cmp(&b.path));
131
132        let metrics = std::sync::Arc::new(DashboardMetrics::new_with_replay_admin_path(
133            inventory,
134            config.replay_api_path.clone(),
135        ));
136
137        let router = std::mem::take(&mut self.router);
138        self.router = router.state(std::sync::Arc::clone(&metrics));
139
140        let prefix = config.path.trim_end_matches('/').to_owned();
141
142        fn not_found() -> crate::response::Response {
143            http::Response::builder()
144                .status(404)
145                .body(Body::Full(http_body_util::Full::new(bytes::Bytes::from(
146                    "Not Found",
147                ))))
148                .unwrap()
149        }
150
151        {
152            let metrics_c = std::sync::Arc::clone(&metrics);
153            let config_c = config.clone();
154            let handler: BoxedHandler = std::sync::Arc::new(move |req| {
155                let metrics = std::sync::Arc::clone(&metrics_c);
156                let cfg = config_c.clone();
157                Box::pin(async move {
158                    let headers = req.headers().clone();
159                    let method = req.method().to_string();
160                    let path = req.uri().path().to_owned();
161                    crate::dashboard::routes::dispatch(&headers, &method, &path, &metrics, &cfg)
162                        .await
163                        .unwrap_or_else(not_found)
164                })
165            });
166            let mut h = HashMap::new();
167            h.insert(http::Method::GET, handler);
168            let router = std::mem::take(&mut self.router);
169            self.router = router.route(&prefix, MethodRouter::from_boxed(h));
170        }
171
172        {
173            let metrics_c = std::sync::Arc::clone(&metrics);
174            let config_c = config.clone();
175            let wildcard_path = format!("{}/*path", prefix);
176            let handler: BoxedHandler = std::sync::Arc::new(move |req| {
177                let metrics = std::sync::Arc::clone(&metrics_c);
178                let cfg = config_c.clone();
179                Box::pin(async move {
180                    let headers = req.headers().clone();
181                    let method = req.method().to_string();
182                    let path = req.uri().path().to_owned();
183                    crate::dashboard::routes::dispatch(&headers, &method, &path, &metrics, &cfg)
184                        .await
185                        .unwrap_or_else(not_found)
186                })
187            });
188            let mut h = HashMap::new();
189            h.insert(http::Method::GET, handler);
190            let router = std::mem::take(&mut self.router);
191            self.router = router.route(&wildcard_path, MethodRouter::from_boxed(h));
192        }
193    }
194
195    #[cfg(feature = "dashboard")]
196    pub fn dashboard(mut self, config: crate::dashboard::DashboardConfig) -> Self {
197        self.dashboard_config = Some(config);
198        self
199    }
200
201    pub async fn run(mut self, addr: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
202        self.prepare_for_serve(addr).await;
203
204        let shutdown_hooks = std::mem::take(&mut self.lifecycle_hooks.on_shutdown);
205        let server = Server::new(self.router, self.layers, self.interceptors);
206        let result = server.run(addr).await;
207        Self::run_shutdown_hooks(shutdown_hooks).await;
208        result
209    }
210
211    /// Run the server with graceful shutdown signal
212    pub async fn run_with_shutdown<F>(
213        mut self,
214        addr: impl AsRef<str>,
215        signal: F,
216    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
217    where
218        F: std::future::Future<Output = ()> + Send + 'static,
219    {
220        self.prepare_for_serve(addr.as_ref()).await;
221
222        let shutdown_hooks = std::mem::take(&mut self.lifecycle_hooks.on_shutdown);
223        let server = Server::new(self.router, self.layers, self.interceptors);
224        server.run_with_shutdown(addr.as_ref(), signal).await?;
225        Self::run_shutdown_hooks(shutdown_hooks).await;
226        Ok(())
227    }
228
229    /// Run HTTP/3 with TLS certificates and a graceful shutdown signal.
230    #[cfg(feature = "http3")]
231    pub async fn run_http3_with_shutdown<F>(
232        mut self,
233        config: crate::http3::Http3Config,
234        signal: F,
235    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
236    where
237        F: std::future::Future<Output = ()> + Send + 'static,
238    {
239        use std::sync::Arc;
240
241        let addr = config.socket_addr();
242        self.prepare_for_serve(&addr).await;
243
244        let shutdown_hooks = std::mem::take(&mut self.lifecycle_hooks.on_shutdown);
245        let server = crate::http3::Http3Server::new(
246            &config,
247            Arc::new(self.router.clone()),
248            Arc::new(self.layers.clone()),
249            Arc::new(self.interceptors.clone()),
250        )
251        .await?;
252
253        server.run_with_shutdown(signal).await?;
254        Self::run_shutdown_hooks(shutdown_hooks).await;
255        Ok(())
256    }
257
258    #[cfg(feature = "http3")]
259    pub async fn run_http3(
260        mut self,
261        config: crate::http3::Http3Config,
262    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
263        use std::sync::Arc;
264
265        let addr = config.socket_addr();
266        self.prepare_for_serve(&addr).await;
267
268        let shutdown_hooks = std::mem::take(&mut self.lifecycle_hooks.on_shutdown);
269        let server = crate::http3::Http3Server::new(
270            &config,
271            Arc::new(self.router.clone()),
272            Arc::new(self.layers.clone()),
273            Arc::new(self.interceptors.clone()),
274        )
275        .await?;
276
277        let result = server.run().await;
278        Self::run_shutdown_hooks(shutdown_hooks).await;
279        result
280    }
281
282    /// Run HTTP/3 (self-signed) with a graceful shutdown signal.
283    #[cfg(feature = "http3-dev")]
284    pub async fn run_http3_dev_with_shutdown<F>(
285        mut self,
286        addr: &str,
287        signal: F,
288    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
289    where
290        F: std::future::Future<Output = ()> + Send + 'static,
291    {
292        use std::sync::Arc;
293
294        self.prepare_for_serve(addr).await;
295
296        let shutdown_hooks = std::mem::take(&mut self.lifecycle_hooks.on_shutdown);
297        let server = crate::http3::Http3Server::new_with_self_signed(
298            addr,
299            Arc::new(self.router.clone()),
300            Arc::new(self.layers.clone()),
301            Arc::new(self.interceptors.clone()),
302        )
303        .await?;
304
305        server.run_with_shutdown(signal).await?;
306        Self::run_shutdown_hooks(shutdown_hooks).await;
307        Ok(())
308    }
309
310    #[cfg(feature = "http3-dev")]
311    pub async fn run_http3_dev(
312        mut self,
313        addr: &str,
314    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
315        use std::sync::Arc;
316
317        self.prepare_for_serve(addr).await;
318
319        let shutdown_hooks = std::mem::take(&mut self.lifecycle_hooks.on_shutdown);
320        let server = crate::http3::Http3Server::new_with_self_signed(
321            addr,
322            Arc::new(self.router.clone()),
323            Arc::new(self.layers.clone()),
324            Arc::new(self.interceptors.clone()),
325        )
326        .await?;
327
328        let result = server.run().await;
329        Self::run_shutdown_hooks(shutdown_hooks).await;
330        result
331    }
332
333    /// Configure HTTP/3 support for `run_http3` and `run_dual_stack`.
334    #[cfg(feature = "http3")]
335    pub fn with_http3(mut self, cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
336        self.http3_config = Some(crate::http3::Http3Config::new(cert_path, key_path));
337        self
338    }
339
340    /// Run HTTP/1.1 and HTTP/3 together with a graceful shutdown signal.
341    #[cfg(feature = "http3")]
342    pub async fn run_dual_stack_with_shutdown<F>(
343        mut self,
344        http_addr: &str,
345        signal: F,
346    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
347    where
348        F: std::future::Future<Output = ()> + Send + 'static,
349    {
350        use std::sync::Arc;
351
352        let mut config = self
353            .http3_config
354            .take()
355            .ok_or("HTTP/3 config not set. Use .with_http3(...)")?;
356
357        let http_socket: std::net::SocketAddr = http_addr.parse()?;
358        config.bind_addr = if http_socket.ip().is_ipv6() {
359            format!("[{}]", http_socket.ip())
360        } else {
361            http_socket.ip().to_string()
362        };
363        config.port = http_socket.port();
364        let http_addr = http_socket.to_string();
365
366        self.prepare_for_serve(&http_addr).await;
367
368        let shutdown_hooks = std::mem::take(&mut self.lifecycle_hooks.on_shutdown);
369        let router = Arc::new(self.router);
370        let layers = Arc::new(self.layers);
371        let interceptors = Arc::new(self.interceptors);
372
373        let http1_server =
374            Server::from_shared(router.clone(), layers.clone(), interceptors.clone());
375        let http3_server =
376            crate::http3::Http3Server::new(&config, router, layers, interceptors).await?;
377
378        tracing::info!(
379            http1_addr = %http_addr,
380            http3_addr = %config.socket_addr(),
381            "Starting dual-stack HTTP/1.1 + HTTP/3 servers"
382        );
383
384        let notify = std::sync::Arc::new(tokio::sync::Notify::new());
385        let notify_for_signal = notify.clone();
386        tokio::spawn(async move {
387            signal.await;
388            notify_for_signal.notify_waiters();
389        });
390        let wait_for_shutdown = {
391            let notify = notify.clone();
392            async move {
393                notify.notified().await;
394            }
395        };
396        let wait_for_shutdown_http3 = async move {
397            notify.notified().await;
398        };
399
400        tokio::try_join!(
401            http1_server.run_with_shutdown(&http_addr, wait_for_shutdown),
402            http3_server.run_with_shutdown(wait_for_shutdown_http3),
403        )?;
404        Self::run_shutdown_hooks(shutdown_hooks).await;
405        Ok(())
406    }
407
408    #[cfg(feature = "http3")]
409    pub async fn run_dual_stack(
410        mut self,
411        http_addr: &str,
412    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
413        use std::sync::Arc;
414
415        let mut config = self
416            .http3_config
417            .take()
418            .ok_or("HTTP/3 config not set. Use .with_http3(...)")?;
419
420        let http_socket: std::net::SocketAddr = http_addr.parse()?;
421        config.bind_addr = if http_socket.ip().is_ipv6() {
422            format!("[{}]", http_socket.ip())
423        } else {
424            http_socket.ip().to_string()
425        };
426        config.port = http_socket.port();
427        let http_addr = http_socket.to_string();
428
429        self.prepare_for_serve(&http_addr).await;
430
431        let shutdown_hooks = std::mem::take(&mut self.lifecycle_hooks.on_shutdown);
432        let router = Arc::new(self.router);
433        let layers = Arc::new(self.layers);
434        let interceptors = Arc::new(self.interceptors);
435
436        let http1_server =
437            Server::from_shared(router.clone(), layers.clone(), interceptors.clone());
438        let http3_server =
439            crate::http3::Http3Server::new(&config, router, layers, interceptors).await?;
440
441        tracing::info!(
442            http1_addr = %http_addr,
443            http3_addr = %config.socket_addr(),
444            "Starting dual-stack HTTP/1.1 + HTTP/3 servers"
445        );
446
447        tokio::try_join!(http1_server.run(&http_addr), http3_server.run(),)?;
448        Self::run_shutdown_hooks(shutdown_hooks).await;
449        Ok(())
450    }
451}