axum_idempotent/lib.rs
1//! Middleware for handling idempotent requests in axum applications.
2//!
3//! This crate provides middleware that ensures idempotency of HTTP requests. When an
4//! identical request is made, a cached response is returned instead of re-executing
5//! the handler, preventing duplicate operations like accidental double payments.
6//!
7//! ## How it Works
8//!
9//! The middleware operates in one of two modes:
10//!
11//! 1. **Direct Key Mode (Recommended):** By configuring `use_idempotency_key_header()`, the
12//! middleware uses a client-provided header (e.g., `Idempotency-Key`) value directly
13//! as the cache key. This is the most performant and observable method, as it avoids
14//! server-side hashing and uses an identifier known to both the client and server.
15//!
16//! 2. **Hashing Mode:** If not using a direct key, a unique hash is generated
17//! from the request's method, path, headers (configurable), and body. This hash is
18//! then used as the cache key.
19//!
20//! If a key is found in the session store, the cached response is returned immediately.
21//! If not, the request is processed by the handler, and the response is cached before
22//! being sent to the client.
23//!
24//! ## Features
25//!
26//! - Request deduplication using either a direct client-provided key or automatic request hashing.
27//! - Configurable response caching duration.
28//! - Fine-grained controls for hashing, including ignoring the request body or specific headers.
29//! - Observability through a replay header (default: `idempotency-replayed`) on cached responses.
30//! - Seamless integration with session-based storage via the `ruts` crate.
31//!
32//! ## Example
33//!
34//! ```rust,no_run
35//! use std::sync::Arc;
36//! use axum::{Router, routing::post};
37//! use ruts::{CookieOptions, SessionLayer};
38//! use axum_idempotent::{IdempotentLayer, IdempotentOptions};
39//! use tower_cookies::CookieManagerLayer;
40//! use ruts::store::memory::MemoryStore;
41//!
42//! #[tokio::main]
43//! async fn main() {
44//! // Your session store
45//! let store = Arc::new(MemoryStore::new());
46//!
47//! // Configure the idempotency layer to use the "Idempotency-Key" header
48//! let idempotent_options = IdempotentOptions::default()
49//! .use_idempotency_key_header(Some("Idempotency-Key"))
50//! .expire_after(60 * 5); // Cache responses for 5 minutes
51//!
52//! // Create the router
53//! let app = Router::new()
54//! .route("/payments", post(process_payment))
55//! .layer(IdempotentLayer::<MemoryStore>::new(idempotent_options))
56//! .layer(SessionLayer::new(store)
57//! .with_cookie_options(CookieOptions::build().name("session")))
58//! .layer(CookieManagerLayer::new());
59//!
60//! let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
61//! axum::serve(listener, app).await.unwrap();
62//! }
63//!
64//! async fn process_payment() -> &'static str {
65//! "Payment processed"
66//! }
67//! ```
68//!
69//! ## Default Behavior
70//!
71//! `axum-idempotent` is configured with safe defaults to prevent common issues.
72//!
73//! ### Ignored Status Codes
74//!
75//! To avoid caching transient server errors or certain client errors, responses with
76//! the following HTTP status codes are **not cached** by default:
77//! - `400 Bad Request`
78//! - `401 Unauthorized`
79//! - `403 Forbidden`
80//! - `408 Request Timeout`
81//! - `429 Too Many Requests`
82//! - `500 Internal Server Error`
83//! - `502 Bad Gateway`
84//! - `503 Service Unavailable`
85//! - `504 Gateway Timeout`
86//!
87//! ### Ignored Headers
88//!
//! In hashing mode, the following common client-specific headers are ignored by
//! default, so that requests from different clients are treated as identical if
//! the core parameters are the same. This does not apply when using
//! `use_idempotency_key_header`.
//!
//! - `user-agent`
//! - `accept`
//! - `accept-encoding`
//! - `accept-language`
//! - `cache-control`
//! - `connection`
//! - `cookie`
//! - `host`
//! - `pragma`
//! - `referer`
//! - `sec-fetch-dest`
//! - `sec-fetch-mode`
//! - `sec-fetch-site`
//! - `sec-ch-ua`
//! - `sec-ch-ua-mobile`
//! - `sec-ch-ua-platform`
110
111use axum::extract::Request;
112use axum::response::Response;
113use axum::RequestExt;
114use ruts::store::SessionStore;
115use ruts::Session;
116use std::error::Error;
117use std::future::Future;
118use std::marker::PhantomData;
119use std::pin::Pin;
120use std::task::{Context, Poll};
121use tower_layer::Layer;
122use tower_service::Service;
123
124mod utils;
125
126mod config;
127pub use crate::config::IdempotentOptions;
128use crate::utils::{bytes_to_response, hash_request, response_to_bytes};
129
130/// Service that handles idempotent request processing.
131#[derive(Clone, Debug)]
132pub struct IdempotentService<S, T> {
133 inner: S,
134 config: IdempotentOptions,
135 phantom: PhantomData<T>,
136}
137
138impl<S, T> IdempotentService<S, T> {
139 pub const fn new(inner: S, config: IdempotentOptions) -> Self {
140 IdempotentService::<S, T> {
141 inner,
142 config,
143 phantom: PhantomData,
144 }
145 }
146}
147
148impl<S, T> Service<Request> for IdempotentService<S, T>
149where
150 S: Service<Request, Response = Response> + Clone + Send + 'static,
151 S::Error: Send,
152 S::Future: Send + 'static,
153 T: SessionStore,
154{
155 type Response = S::Response;
156 type Error = S::Error;
157 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
158
159 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
160 self.inner.poll_ready(cx).map_err(Into::into)
161 }
162
163 fn call(&mut self, mut req: Request) -> Self::Future {
164 let clone = self.inner.clone();
165 let mut inner = std::mem::replace(&mut self.inner, clone);
166 let config = self.config.clone();
167
168 Box::pin(async move {
169 let session = match req.extract_parts::<Session<T>>().await {
170 Ok(session) => session,
171 Err(err) => {
172 tracing::error!("Failed to extract Session from request: {err:?}");
173 // Forward the request to the inner service without idempotency
174 return inner.call(req).await;
175 }
176 };
177
178 let (req, hash) = hash_request(req, &config).await;
179
180 if let Some(hash) = &hash {
181 match check_cached_response(hash, &session).await {
182 Ok(Some(mut res)) => {
183 res.headers_mut()
184 .insert(config.replay_header_name, "true".parse().unwrap());
185 return Ok(res)
186 },
187 Ok(None) => {} // No cached response, continue
188 Err(err) => {
189 tracing::error!("Failed to check idempotent cached response: {err:?}");
190 // Continue without cache
191 }
192 }
193 }
194
195 let res = inner.call(req).await?;
196 let status_code = res.status();
197 if !config.ignored_res_status_codes.contains(&status_code) {
198 if let Some(hash) = &hash {
199 let (res, response_bytes) = response_to_bytes(res).await;
200
201 #[cfg(feature = "layered-store")]
202 let result = session
203 .update(&hash, &response_bytes, Some(config.body_cache_ttl_secs), config.layered_hot_cache_ttl_secs)
204 .await;
205 #[cfg(not(feature = "layered-store"))]
206 let result = session
207 .update(&hash, &response_bytes, Some(config.body_cache_ttl_secs), None)
208 .await;
209
210 if let Err(err) = result {
211 tracing::error!("Failed to cache idempotent response: {err:?}");
212 }
213
214 return Ok(res)
215 }
216 }
217
218 Ok(res)
219 })
220 }
221}
222
223/// Layer to apply [`IdempotentService`] middleware in `axum`.
224///
225/// This layer caches responses in a session store and returns the cached response
226/// for identical requests within the configured expiration time.
227///
228/// # Example
229/// ```rust,no_run
230/// use std::sync::Arc;
231/// use axum::Router;
232/// use axum::routing::get;
233/// use ruts::{CookieOptions, SessionLayer};
234/// use axum_idempotent::{IdempotentLayer, IdempotentOptions};
235/// use tower_cookies::CookieManagerLayer;
236///
237/// #[tokio::main]
238/// async fn main() {
239/// use ruts::store::memory::MemoryStore;
240/// let store = Arc::new(MemoryStore::new());
241///
242/// let idempotent_options = IdempotentOptions::default().expire_after(3);
243/// let idempotent_layer = IdempotentLayer::<MemoryStore>::new(idempotent_options);
244///
245/// let app = Router::new()
246/// .route("/test", get(|| async { "Hello, World!"}))
247/// .layer(idempotent_layer)
248/// .layer(SessionLayer::new(store.clone())
249/// .with_cookie_options(CookieOptions::build().name("session").max_age(10).path("/")))
250/// .layer(CookieManagerLayer::new());
251/// let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
252/// axum::serve(listener, app).await.unwrap();
253/// }
254/// ```
255#[derive(Clone, Debug)]
256pub struct IdempotentLayer<T> {
257 config: IdempotentOptions,
258 phantom_data: PhantomData<T>,
259}
260
261impl<T> IdempotentLayer<T> {
262 pub const fn new(config: IdempotentOptions) -> Self {
263 IdempotentLayer {
264 config,
265 phantom_data: PhantomData,
266 }
267 }
268}
269
270impl<S, T> Layer<S> for IdempotentLayer<T> {
271 type Service = IdempotentService<S, T>;
272
273 fn layer(&self, service: S) -> Self::Service {
274 IdempotentService::new(service, self.config.clone())
275 }
276}
277
278async fn check_cached_response<T: SessionStore>(
279 hash: impl AsRef<str>,
280 session: &Session<T>,
281) -> Result<Option<Response>, Box<dyn Error + Send + Sync>> {
282 let response_bytes = session.get::<Vec<u8>>(hash.as_ref()).await?;
283
284 let res = if let Some(bytes) = response_bytes {
285 let response = bytes_to_response(bytes)?;
286
287 Some(response)
288 } else {
289 None
290 };
291
292 Ok(res)
293}