use dashmap::DashMap;
use rustapi_core::{
middleware::{BoxedNext, MiddlewareLayer},
Request, Response, ResponseBody,
};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
#[derive(Clone)]
pub struct DedupConfig {
pub header_name: String,
pub ttl: Duration,
}
impl Default for DedupConfig {
fn default() -> Self {
Self {
header_name: "Idempotency-Key".to_string(),
ttl: Duration::from_secs(300), }
}
}
#[derive(Clone)]
pub struct DedupLayer {
config: DedupConfig,
store: Arc<DashMap<String, Instant>>,
}
impl DedupLayer {
pub fn new() -> Self {
Self {
config: DedupConfig::default(),
store: Arc::new(DashMap::new()),
}
}
pub fn header_name(mut self, name: impl Into<String>) -> Self {
self.config.header_name = name.into();
self
}
pub fn ttl(mut self, ttl: Duration) -> Self {
self.config.ttl = ttl;
self
}
}
impl Default for DedupLayer {
fn default() -> Self {
Self::new()
}
}
impl MiddlewareLayer for DedupLayer {
fn call(
&self,
req: Request,
next: BoxedNext,
) -> Pin<Box<dyn Future<Output = Response> + Send + 'static>> {
let config = self.config.clone();
let store = self.store.clone();
Box::pin(async move {
let key = if let Some(val) = req.headers().get(&config.header_name) {
match val.to_str() {
Ok(s) => s.to_string(),
Err(_) => return next(req).await, }
} else {
return next(req).await;
};
if let Some(created_at) = store.get(&key) {
if created_at.elapsed() < config.ttl {
return http::Response::builder()
.status(409) .header("Content-Type", "application/json")
.body(ResponseBody::Full(http_body_util::Full::new(bytes::Bytes::from(
serde_json::json!({
"error": {
"type": "duplicate_request",
"message": format!("Request with key '{}' has already been processed or is processing", key)
}
})
.to_string(),
))))
.unwrap();
} else {
drop(created_at);
store.remove(&key);
}
}
store.insert(key.clone(), Instant::now());
let response = next(req).await;
response
})
}
fn clone_box(&self) -> Box<dyn MiddlewareLayer> {
Box::new(self.clone())
}
}