axum_conf/fluent/request.rs
1//! Request handling middleware: payload limits, concurrency, deduplication, request ID, sensitive headers.
2
3use super::router::FluentRouter;
4use crate::HttpMiddleware;
5
6use {
7 crate::utils::RequestIdGenerator,
8 http::HeaderName,
9 tower_http::request_id::{PropagateRequestIdLayer, SetRequestIdLayer},
10};
11
12#[cfg(feature = "payload-limit")]
13use {axum::extract::DefaultBodyLimit, tower_http::limit::RequestBodyLimitLayer};
14
15#[cfg(feature = "concurrency-limit")]
16use tower::limit::ConcurrencyLimitLayer;
17
18#[cfg(feature = "sensitive-headers")]
19use {
20 http::header::AUTHORIZATION, std::iter::once,
21 tower_http::sensitive_headers::SetSensitiveHeadersLayer,
22};
23
24#[cfg(feature = "deduplication")]
25use {
26 super::dedup::{self, DeduplicationLayer},
27 std::time::Duration,
28 tokio_util::task::AbortOnDropHandle,
29};
30
31impl<State> FluentRouter<State>
32where
33 State: Clone + Send + Sync + 'static,
34{
/// Installs the maximum request payload size limit.
///
/// Requests with bodies larger than the configured limit are rejected with
/// a `413 Payload Too Large` response.
///
/// Axum's built-in [`DefaultBodyLimit`] is disabled so that the configured
/// [`RequestBodyLimitLayer`] is the only body limit in effect.
///
/// # Configuration
///
/// ```toml
/// [http]
/// max_payload_size_bytes = "1MiB" # Supports KiB, MiB, GiB
/// ```
///
/// # Default
///
/// 32 KiB if not configured.
#[cfg(feature = "payload-limit")]
#[must_use]
pub fn setup_max_payload_size(mut self) -> Self {
    if !self.is_middleware_enabled(HttpMiddleware::MaxPayloadSize) {
        tracing::trace!("MaxPayloadSize middleware skipped (disabled in config)");
        return self;
    }

    tracing::trace!(
        max_payload_size = %self.config.http.max_payload_size_bytes,
        "MaxPayloadSize middleware enabled"
    );

    // Saturate rather than truncate: a plain `as usize` cast silently wraps on
    // targets where `usize` is narrower than 64 bits, which could turn a large
    // configured limit into a tiny (or zero) one.
    let max_bytes =
        usize::try_from(self.config.http.max_payload_size_bytes.as_u64()).unwrap_or(usize::MAX);

    self.inner = self
        .inner
        .layer(DefaultBodyLimit::disable())
        .layer(RequestBodyLimitLayer::new(max_bytes));
    self
}
70
71 /// No-op when `payload-limit` feature is disabled.
72 #[cfg(not(feature = "payload-limit"))]
73 #[must_use]
74 pub fn setup_max_payload_size(self) -> Self {
75 self
76 }
77
/// Sets up concurrent request limits.
///
/// Limits the number of requests being processed simultaneously by adding a
/// [`ConcurrencyLimitLayer`]. That layer applies backpressure: once the limit
/// is reached, additional requests wait for a free slot instead of being
/// rejected.
///
/// NOTE(review): earlier documentation stated that excess requests receive
/// `503 Service Unavailable`. This method adds no load-shedding layer, so
/// that only holds if one is configured elsewhere — confirm.
///
/// # Configuration
///
/// ```toml
/// [http]
/// max_concurrent_requests = 4096 # Default
/// ```
///
/// # Use Cases
///
/// - Prevent resource exhaustion under heavy load
/// - Maintain stable response times
/// - Protect downstream services
#[cfg(feature = "concurrency-limit")]
#[must_use]
pub fn setup_concurrency_limit(mut self) -> Self {
    if !self.is_middleware_enabled(HttpMiddleware::ConcurrencyLimit) {
        tracing::trace!("ConcurrencyLimit middleware skipped (disabled in config)");
        return self;
    }

    tracing::trace!(
        max_concurrent_requests = self.config.http.max_concurrent_requests,
        "ConcurrencyLimit middleware enabled"
    );

    // NOTE(review): `as usize` truncates silently if the config field is wider
    // than `usize` on the target — confirm the field type.
    let max_in_flight = self.config.http.max_concurrent_requests as usize;
    self.inner = self.inner.layer(ConcurrencyLimitLayer::new(max_in_flight));
    self
}
112
113 /// No-op when `concurrency-limit` feature is disabled.
114 #[cfg(not(feature = "concurrency-limit"))]
115 #[must_use]
116 pub fn setup_concurrency_limit(self) -> Self {
117 self
118 }
119
120 /// Sets up request ID generation and propagation.
121 ///
122 /// Adds two middleware layers:
123 /// 1. Generates or preserves `x-request-id` headers
124 /// 2. Propagates the request ID to response headers
125 ///
126 /// Request IDs are UUIDv7 values that enable:
127 /// - Distributed tracing across services
128 /// - Log correlation
129 /// - Request debugging
130 ///
131 /// If a request already has an `x-request-id` header, it is preserved.
132 #[must_use]
133 pub fn setup_request_id(mut self) -> Self {
134 if !self.is_middleware_enabled(HttpMiddleware::RequestId) {
135 tracing::trace!("RequestId middleware skipped (disabled in config)");
136 return self;
137 }
138
139 tracing::trace!("RequestId middleware enabled");
140 let x_request_id = HeaderName::from_static("x-request-id");
141 self.inner = self
142 .inner
143 .layer(SetRequestIdLayer::new(
144 x_request_id.clone(),
145 RequestIdGenerator,
146 ))
147 .layer(PropagateRequestIdLayer::new(x_request_id));
148 self
149 }
150
/// Sets up request deduplication middleware.
///
/// When enabled in configuration, this prevents duplicate requests (identified
/// by the same `x-request-id` header) from being processed simultaneously.
/// Instead, duplicate requests receive the cached response from the original request.
///
/// # Configuration
///
/// ```toml
/// [http.deduplication]
/// enabled = true
/// ttl = "5m"          # Keep responses cached for 5 minutes
/// max_entries = 10000 # Passed to the deduplication layer as its entry limit
/// ```
///
/// # Behavior
///
/// - If a request with the same ID is being processed, waits and returns the same response
/// - If a request with the same ID completed within TTL, returns the cached response
/// - Uses the `x-request-id` header as the idempotency key
/// - After TTL expires, the same request ID triggers a new request
///
/// The middleware is skipped (router returned unchanged) when:
/// - deduplication is disabled in the configuration,
/// - the `RequestId` middleware is disabled (an error is logged), or
/// - no `[http.deduplication]` section is present.
///
/// # Layer ordering
///
/// Call this **before** `setup_request_id()`. Axum processes layers from the
/// outside in, and layers added later wrap those added earlier, so the
/// request-ID layer must be added *after* this one for every request to carry
/// an `x-request-id` header by the time deduplication inspects it.
///
/// # Performance
///
/// Uses an in-memory cache with automatic expiration based on TTL. A
/// background task removes expired entries every 60 seconds; it is aborted
/// when the handle stored on the router is dropped.
///
/// # Implementation
///
/// Uses a custom session-free implementation ([`DeduplicationLayer`]) that
/// caches responses in memory. Only successful responses (2xx, 3xx) are
/// cached. Error responses are not cached to avoid caching transient errors.
///
/// # Panics
///
/// The cleanup task is started with `tokio::spawn`, which panics when called
/// outside of a Tokio runtime context.
#[cfg(feature = "deduplication")]
#[must_use]
pub fn setup_deduplication(mut self) -> Self {
    if !self.is_middleware_enabled(HttpMiddleware::RequestDeduplication) {
        tracing::trace!("Deduplication middleware skipped (disabled in config)");
        return self;
    }

    // Deduplication keys on `x-request-id`; without the RequestId middleware
    // there is no guarantee that header exists.
    if !self.is_middleware_enabled(HttpMiddleware::RequestId) {
        tracing::error!(
            "RequestId middleware must be enabled and added after deduplication because axum handles layers from the outside in."
        );
        return self;
    }

    let Some(dedup_config) = &self.config.http.deduplication else {
        // Make the skip visible instead of silently doing nothing.
        tracing::trace!(
            "Deduplication middleware skipped (no [http.deduplication] configuration)"
        );
        return self;
    };

    tracing::trace!(
        ttl = ?dedup_config.ttl,
        max_entries = dedup_config.max_entries,
        "Deduplication middleware enabled"
    );
    let layer =
        DeduplicationLayer::new(dedup_config.ttl, dedup_config.max_entries, "x-request-id");

    // Spawn cleanup task to remove expired entries every minute. The handle is
    // kept on the router so the task is aborted when the router is dropped.
    let tracker = layer.tracker();
    let cleanup_interval = Duration::from_secs(60);
    let handle = tokio::spawn(dedup::cleanup_task(tracker, cleanup_interval));
    self.dedup_cleanup_handle = Some(AbortOnDropHandle::new(handle));

    self.inner = self.inner.layer(layer);
    self
}
220
221 /// No-op when `deduplication` feature is disabled.
222 #[cfg(not(feature = "deduplication"))]
223 #[must_use]
224 pub fn setup_deduplication(self) -> Self {
225 self
226 }
227
/// Flags sensitive headers so their values are not written to logs.
///
/// Wraps the router in a [`SetSensitiveHeadersLayer`] that marks the
/// `Authorization` header as sensitive, keeping credentials out of log
/// output produced by other middleware.
///
/// # Protected Headers
///
/// - `Authorization` - Bearer tokens, Basic auth, etc.
///
/// The list is fixed in this method; protecting further headers requires
/// editing it.
#[cfg(feature = "sensitive-headers")]
#[must_use]
pub fn setup_sensitive_headers(mut self) -> Self {
    if self.is_middleware_enabled(HttpMiddleware::SensitiveHeaders) {
        tracing::trace!("SensitiveHeaders middleware enabled");
        let protected = once(AUTHORIZATION);
        let redaction_layer = SetSensitiveHeadersLayer::new(protected);
        self.inner = self.inner.layer(redaction_layer);
    } else {
        tracing::trace!("SensitiveHeaders middleware skipped (disabled in config)");
    }
    self
}
252
253 /// No-op when `sensitive-headers` feature is disabled.
254 #[cfg(not(feature = "sensitive-headers"))]
255 #[must_use]
256 pub fn setup_sensitive_headers(self) -> Self {
257 self
258 }
259}