axum_conf/fluent/router.rs
1//! Core FluentRouter struct and initialization methods.
2
3use tokio_util::sync::CancellationToken;
4#[cfg(any(feature = "rate-limiting", feature = "deduplication"))]
5use tokio_util::task::AbortOnDropHandle;
6
7use {
8 super::shutdown::ShutdownNotifier,
9 crate::{Config, HttpMiddleware, Result, StaticDirRoute},
10 axum::Router,
11 serde::de::DeserializeOwned,
12 tokio::sync::broadcast,
13 tower_http::{services::fs::ServeDir, set_header::SetResponseHeaderLayer},
14};
15
16/// Fluent builder for axum::Router with configuration-based middleware setup.
17///
18/// This wrapper around `axum::Router` provides a fluent API for configuring middleware
19/// and routes based on the application configuration. Create instances using
20/// [`FluentRouter::without_state`] or [`FluentRouter::with_state`].
21///
22/// The router forwards layering and nesting calls to the underlying `axum::Router`,
23/// allowing middleware to be set up at any stage through dedicated `setup_*` methods.
24///
25/// If the configuration has a static files directory configured as a fallback,
26/// it will be automatically set up. For all other directories, call
27/// [`FluentRouter::setup_directories`] to install the necessary middleware.
28///
29/// # Graceful Shutdown
30///
31/// `FluentRouter` provides built-in support for graceful shutdown notifications.
32/// Components can subscribe to shutdown events or use a cancellation token:
33///
34/// ```rust,no_run
35/// use axum_conf::{Config, FluentRouter, ShutdownPhase};
36///
37/// # async fn example() -> axum_conf::Result<()> {
38/// let router = FluentRouter::without_state(Config::default())?;
39///
40/// // Option 1: Simple cancellation token for background tasks
41/// let token = router.cancellation_token();
42/// tokio::spawn(async move {
43/// loop {
44/// tokio::select! {
45/// _ = token.cancelled() => break,
46/// _ = do_work() => {}
47/// }
48/// }
49/// });
50///
51/// // Option 2: Subscribe to shutdown phases for complex cleanup
52/// let mut rx = router.shutdown_notifier().subscribe();
53/// tokio::spawn(async move {
54/// while let Ok(phase) = rx.recv().await {
55/// match phase {
56/// ShutdownPhase::Initiated => println!("Shutting down..."),
57/// _ => {}
58/// }
59/// }
60/// });
61/// # async fn do_work() {}
62/// # Ok(())
63/// # }
64/// ```
65pub struct FluentRouter<State = ()> {
66 pub(crate) config: Config,
67 pub(crate) state: State,
68 pub(crate) inner: Router<State>,
69 #[cfg(feature = "rate-limiting")]
70 pub(crate) governor_handle: Option<AbortOnDropHandle<()>>,
71 #[cfg(feature = "deduplication")]
72 pub(crate) dedup_cleanup_handle: Option<AbortOnDropHandle<()>>,
73 pub(crate) panic_channel: Option<tokio::sync::mpsc::Sender<String>>,
74 pub(crate) shutdown_notifier: ShutdownNotifier,
75 #[cfg(feature = "postgres")]
76 pub(crate) db_pool: sqlx_postgres::PgPool,
77 #[cfg(feature = "circuit-breaker")]
78 pub(crate) circuit_breaker_registry: crate::circuit_breaker::CircuitBreakerRegistry,
79}
80
81impl FluentRouter {
82 /// Creates a new `FluentRouter` without application state.
83 ///
84 /// Accepts any `Config<T>` and extracts the base configuration fields,
85 /// discarding the application-specific config. To preserve your app config,
86 /// clone `config.app` before calling this method.
87 ///
88 /// # Example
89 ///
90 /// ```rust,no_run
91 /// use axum_conf::{Config, FluentRouter};
92 /// use serde::Deserialize;
93 ///
94 /// #[derive(Debug, Clone, Default, Deserialize)]
95 /// struct MyAppConfig {
96 /// #[serde(default)]
97 /// api_key: String,
98 /// }
99 ///
100 /// # fn example() -> axum_conf::Result<()> {
101 /// let config: Config<MyAppConfig> = Config::default();
102 /// let my_settings = config.app.clone(); // Preserve app config
103 /// let router = FluentRouter::without_state(config)?;
104 /// // Use my_settings in your handlers via state
105 /// # Ok(())
106 /// # }
107 /// ```
108 pub fn without_state<T>(config: Config<T>) -> Result<FluentRouter<()>>
109 where
110 T: DeserializeOwned + Clone + Default,
111 {
112 // Extract base config, discarding app-specific fields
113 let base_config: Config<()> = Config {
114 http: config.http,
115 #[cfg(feature = "postgres")]
116 database: config.database,
117 logging: config.logging,
118 #[cfg(feature = "circuit-breaker")]
119 circuit_breaker: config.circuit_breaker,
120 app: (),
121 };
122 FluentRouter::<()>::with_state(base_config, ())
123 }
124}
125
126impl<State> FluentRouter<State>
127where
128 State: Clone + Send + Sync + 'static,
129{
130 /// Creates a new `FluentRouter` with the provided configuration.
131 ///
132 /// Validates the configuration and sets up any fallback static file directories.
133 /// Other static directories must be set up explicitly using `setup_directories()`.
134 /// If a configuration for a database pool is provided, the pool will be created
135 /// and made available for health checks. It can also be accessed via a call to
136 /// `db_pool()`.
137 ///
138 /// Accepts any `Config<T>` and extracts the base configuration fields,
139 /// discarding the application-specific config. To preserve your app config,
140 /// clone `config.app` before calling this method.
141 ///
142 /// # Arguments
143 ///
144 /// * `config` - The service configuration (with any app-specific type)
145 /// * `state` - The application state to be shared across handlers
146 ///
147 /// # Returns
148 ///
149 /// A `Result` containing the new `FluentRouter` or an error if configuration is invalid.
150 ///
151 /// # Errors
152 ///
153 /// Returns an error if:
154 /// - Configuration validation fails
155 /// - Fallback directories are marked as protected
156 /// - Required configuration values are missing
157 pub fn with_state<S, T>(config: Config<T>, state: S) -> Result<FluentRouter<S>>
158 where
159 S: Clone + Send + Sync + 'static,
160 T: DeserializeOwned + Clone + Default,
161 {
162 // Validate the configuration
163 config.validate()?;
164
165 #[cfg(feature = "postgres")]
166 let db_pool = config.create_pgpool()?;
167
168 #[cfg(feature = "circuit-breaker")]
169 let circuit_breaker_registry =
170 crate::circuit_breaker::CircuitBreakerRegistry::new(&config.circuit_breaker);
171
172 // Extract base config, discarding app-specific fields
173 let base_config = Config {
174 http: config.http,
175 #[cfg(feature = "postgres")]
176 database: config.database,
177 logging: config.logging,
178 #[cfg(feature = "circuit-breaker")]
179 circuit_breaker: config.circuit_breaker,
180 app: (),
181 };
182
183 // Create the base router and add public fallback files if configured
184 let me = FluentRouter {
185 config: base_config,
186 state,
187 inner: Router::new(),
188 #[cfg(feature = "rate-limiting")]
189 governor_handle: None,
190 #[cfg(feature = "deduplication")]
191 dedup_cleanup_handle: None,
192 panic_channel: None,
193 shutdown_notifier: ShutdownNotifier::default(),
194 #[cfg(feature = "postgres")]
195 db_pool,
196 #[cfg(feature = "circuit-breaker")]
197 circuit_breaker_registry,
198 };
199
200 me.setup_fallback_files()
201 }
202
203 /// Returns a reference to the shutdown notifier.
204 ///
205 /// Use this to subscribe to shutdown phase notifications for coordinated
206 /// cleanup across multiple components.
207 ///
208 /// # Example
209 ///
210 /// ```rust,no_run
211 /// use axum_conf::{Config, FluentRouter, ShutdownPhase};
212 ///
213 /// # async fn example() -> axum_conf::Result<()> {
214 /// let router = FluentRouter::without_state(Config::default())?;
215 /// let notifier = router.shutdown_notifier();
216 ///
217 /// // Subscribe from multiple places
218 /// let mut rx1 = notifier.subscribe();
219 /// let mut rx2 = notifier.subscribe();
220 ///
221 /// // Each subscriber receives all phases
222 /// tokio::spawn(async move {
223 /// while let Ok(phase) = rx1.recv().await {
224 /// match phase {
225 /// ShutdownPhase::Initiated => {
226 /// // Close external connections
227 /// }
228 /// ShutdownPhase::GracePeriodStarted { timeout } => {
229 /// // Log remaining time
230 /// }
231 /// ShutdownPhase::GracePeriodEnded => {
232 /// // Flush buffers
233 /// }
234 /// }
235 /// }
236 /// });
237 /// # Ok(())
238 /// # }
239 /// ```
240 #[must_use]
241 pub fn shutdown_notifier(&self) -> &ShutdownNotifier {
242 &self.shutdown_notifier
243 }
244
245 /// Returns a cancellation token that is triggered when shutdown begins.
246 ///
247 /// This is a convenience method equivalent to calling
248 /// `router.shutdown_notifier().cancellation_token()`.
249 ///
250 /// The token is triggered when the server receives a shutdown signal (SIGTERM/SIGINT).
251 /// Use it in background tasks to gracefully stop work.
252 ///
253 /// # Example
254 ///
255 /// ```rust,no_run
256 /// use axum_conf::{Config, FluentRouter};
257 /// use std::time::Duration;
258 ///
259 /// # async fn example() -> axum_conf::Result<()> {
260 /// let router = FluentRouter::without_state(Config::default())?;
261 /// let token = router.cancellation_token();
262 ///
263 /// // Background task that respects shutdown
264 /// tokio::spawn(async move {
265 /// let mut interval = tokio::time::interval(Duration::from_secs(60));
266 /// loop {
267 /// tokio::select! {
268 /// _ = token.cancelled() => {
269 /// tracing::info!("Periodic task stopping");
270 /// break;
271 /// }
272 /// _ = interval.tick() => {
273 /// // Do periodic work
274 /// tracing::debug!("Running periodic task");
275 /// }
276 /// }
277 /// }
278 /// });
279 /// # Ok(())
280 /// # }
281 /// ```
282 ///
283 /// # Multiple Tokens
284 ///
285 /// Each call returns a new clone of the token. All tokens share the same
286 /// cancellation state - when one is cancelled, all are cancelled:
287 ///
288 /// ```rust,no_run
289 /// # use axum_conf::{Config, FluentRouter};
290 /// # fn example() -> axum_conf::Result<()> {
291 /// let router = FluentRouter::without_state(Config::default())?;
292 ///
293 /// let token1 = router.cancellation_token();
294 /// let token2 = router.cancellation_token();
295 ///
296 /// // Both tokens will be cancelled simultaneously on shutdown
297 /// # Ok(())
298 /// # }
299 /// ```
300 #[must_use]
301 pub fn cancellation_token(&self) -> CancellationToken {
302 self.shutdown_notifier.cancellation_token()
303 }
304
305 /// Returns a receiver for shutdown phase notifications.
306 ///
307 /// This is a convenience method equivalent to calling
308 /// `router.shutdown_notifier().subscribe()`.
309 ///
310 /// Each call creates a new independent subscriber. Subscribers created
311 /// after a phase is emitted will not receive that phase.
312 ///
313 /// # Example
314 ///
315 /// ```rust,no_run
316 /// use axum_conf::{Config, FluentRouter, ShutdownPhase};
317 ///
318 /// # async fn example() -> axum_conf::Result<()> {
319 /// let router = FluentRouter::without_state(Config::default())?;
320 /// let mut rx = router.subscribe_to_shutdown();
321 ///
322 /// tokio::spawn(async move {
323 /// while let Ok(phase) = rx.recv().await {
324 /// tracing::info!("Shutdown phase: {:?}", phase);
325 /// }
326 /// });
327 /// # Ok(())
328 /// # }
329 /// ```
330 #[must_use]
331 pub fn subscribe_to_shutdown(&self) -> broadcast::Receiver<super::shutdown::ShutdownPhase> {
332 self.shutdown_notifier.subscribe()
333 }
334
335 /// Returns the configured PostgreSQL database pool.
336 #[cfg(feature = "postgres")]
337 pub fn db_pool(&self) -> sqlx_postgres::PgPool {
338 self.db_pool.clone()
339 }
340
341 /// Returns the circuit breaker registry.
342 ///
343 /// Use this to access circuit breakers for external service calls.
344 ///
345 /// # Example
346 ///
347 /// ```rust,no_run
348 /// # use axum_conf::{Config, FluentRouter};
349 /// # fn example(router: &FluentRouter) {
350 /// let breaker = router.circuit_breakers().get_or_default("payment-api");
351 /// if breaker.should_allow() {
352 /// // make external call
353 /// }
354 /// # }
355 /// ```
356 #[cfg(feature = "circuit-breaker")]
357 pub fn circuit_breakers(&self) -> &crate::circuit_breaker::CircuitBreakerRegistry {
358 &self.circuit_breaker_registry
359 }
360
361 /// Returns a guarded database pool with circuit breaker protection.
362 ///
363 /// The returned [`crate::circuit_breaker::GuardedPool`] wraps the underlying database pool and
364 /// tracks failures/successes for the specified target circuit breaker.
365 ///
366 /// # Arguments
367 ///
368 /// * `target` - The circuit breaker target name (e.g., "database")
369 ///
370 /// # Example
371 ///
372 /// ```rust,no_run
373 /// # use axum_conf::{Config, FluentRouter};
374 /// # async fn example(router: &FluentRouter) {
375 /// let pool = router.guarded_db_pool("database");
376 /// // Use pool.fetch_one(), pool.execute(), etc.
377 /// # }
378 /// ```
379 #[cfg(all(feature = "circuit-breaker", feature = "postgres"))]
380 pub fn guarded_db_pool(&self, target: &str) -> crate::circuit_breaker::GuardedPool {
381 crate::circuit_breaker::GuardedPool::new(
382 self.db_pool.clone(),
383 self.circuit_breaker_registry.clone(),
384 target,
385 )
386 }
387
388 /// Helper method to check if a middleware is enabled in the configuration.
389 /// Returns true if no middleware config is specified (all enabled by default),
390 /// or if the middleware is explicitly enabled/not excluded.
391 pub(crate) fn is_middleware_enabled(&self, middleware: HttpMiddleware) -> bool {
392 self.config
393 .http
394 .middleware
395 .as_ref()
396 .map(|config| config.is_enabled(middleware))
397 .unwrap_or(true) // If no middleware config, all are enabled
398 }
399
400 /// Sets a notification channel for panic messages.
401 ///
402 /// When configured, any panics caught by the panic handler middleware will
403 /// send a message to this channel. Useful for monitoring, alerting, or logging
404 /// panic events in production.
405 ///
406 /// # Arguments
407 ///
408 /// * `ch` - A tokio mpsc sender for panic notification messages
409 ///
410 /// # Examples
411 ///
412 /// ```rust,no_run
413 /// # use axum_conf::{Config, FluentRouter};
414 /// # async fn example() -> axum_conf::Result<()> {
415 /// let (tx, mut rx) = tokio::sync::mpsc::channel(100);
416 /// let config = Config::default();
417 ///
418 /// let router = FluentRouter::without_state(config)?
419 /// .with_panic_notification_channel(tx);
420 ///
421 /// // In another task, receive panic notifications
422 /// tokio::spawn(async move {
423 /// while let Some(panic_msg) = rx.recv().await {
424 /// eprintln!("Panic caught: {}", panic_msg);
425 /// }
426 /// });
427 /// # Ok(())
428 /// # }
429 /// ```
430 #[must_use]
431 pub fn with_panic_notification_channel(self, ch: tokio::sync::mpsc::Sender<String>) -> Self {
432 Self {
433 panic_channel: Some(ch),
434 ..self
435 }
436 }
437
438 /// Sets up all static directories configured in the HTTP section except the fallback one.
439 /// If protected is true, only protected directories will be added.
440 /// Otherwise only public directories are added.
441 pub fn setup_directories(mut self, protected: bool) -> Result<Self> {
442 // Add all other static directories
443 for dir in &self.config.http.directories {
444 if let StaticDirRoute::Route(route) = &dir.route
445 && dir.protected == protected
446 {
447 let serve_dir = ServeDir::new(&dir.directory)
448 .append_index_html_on_directories(true)
449 .precompressed_br()
450 .precompressed_gzip();
451
452 tracing::trace!(
453 %protected,
454 %route,
455 directory = %dir.directory,
456 "Setup static file mapping",
457 );
458
459 // Add cache headers if configured
460 if let Some(max_age) = dir.cache_max_age {
461 let cache_value = format!("public, max-age={}", max_age);
462 self.inner = self.inner.nest_service(
463 route,
464 tower::ServiceBuilder::new()
465 .layer(SetResponseHeaderLayer::if_not_present(
466 http::header::CACHE_CONTROL,
467 http::HeaderValue::from_str(&cache_value)?,
468 ))
469 .service(serve_dir),
470 );
471 } else {
472 self.inner = self.inner.nest_service(route, serve_dir);
473 }
474 }
475 }
476 Ok(self)
477 }
478
479 /// Sets up a fallback static file directory if configured.
480 pub fn setup_fallback_files(mut self) -> Result<Self> {
481 if let Some(dir) = self
482 .config
483 .http
484 .directories
485 .iter()
486 .find(|dir| dir.is_fallback())
487 {
488 let serve_dir = ServeDir::new(&dir.directory)
489 .append_index_html_on_directories(true)
490 .precompressed_br()
491 .precompressed_gzip();
492
493 // Add cache headers if configured
494 if let Some(max_age) = dir.cache_max_age {
495 let cache_value = format!("public, max-age={}", max_age);
496 tracing::trace!(%max_age, directory=%dir.directory, "Mapping fallback path");
497 self.inner = self.inner.fallback_service(
498 tower::ServiceBuilder::new()
499 .layer(SetResponseHeaderLayer::if_not_present(
500 http::header::CACHE_CONTROL,
501 http::HeaderValue::from_str(&cache_value)?,
502 ))
503 .service(serve_dir),
504 );
505 } else {
506 self.inner = self.inner.fallback_service(serve_dir);
507 }
508 }
509 Ok(self)
510 }
511
512 /// Sets up public (unprotected) static file directories.
513 ///
514 /// Convenience method that calls `setup_directories(false)` to serve
515 /// static files that don't require authentication.
516 ///
517 /// # Examples
518 ///
519 /// ```toml
520 /// [[http.directories]]
521 /// directory = "./public"
522 /// route = "/static"
523 /// protected = false
524 /// ```
525 pub fn setup_public_files(self) -> Result<Self> {
526 self.setup_directories(false)
527 }
528
529 /// Sets up protected static file directories that require authentication.
530 ///
531 /// Convenience method that calls `setup_directories(true)` to serve
532 /// static files only to authenticated users. Must be called after
533 /// authentication middleware is set up.
534 ///
535 /// # Examples
536 ///
537 /// ```toml
538 /// [[http.directories]]
539 /// directory = "./private"
540 /// route = "/downloads"
541 /// protected = true
542 /// ```
543 pub fn setup_protected_files(self) -> Result<Self> {
544 self.setup_directories(true)
545 }
546}