#[cfg(test)]
mod tests;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use crate::application::Application;
use crate::core::New;
use crate::mime_type::MimeType;
use crate::range::Range;
use crate::request::Request;
use crate::response::{Response, STATUS_CODE_REASON_PHRASE};
use crate::router::PathParams;
use crate::server::ConnectionInfo;
fn run_with_timeout<T, F>(duration: Duration, compute: F) -> Option<T>
where
T: Send + 'static,
F: FnOnce() -> T + Send + 'static,
{
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let _ = tx.send(compute());
});
rx.recv_timeout(duration).ok()
}
fn timeout_response() -> Response {
let cr = Range::get_content_range(
b"504 Gateway Timeout".to_vec(),
MimeType::TEXT_PLAIN.to_string(),
);
let mut r = Response::new();
r.status_code = *STATUS_CODE_REASON_PHRASE.n504_gateway_timeout.status_code;
r.reason_phrase = STATUS_CODE_REASON_PHRASE.n504_gateway_timeout.reason_phrase.to_string();
r.content_range_list = vec![cr];
r
}
pub fn with_timeout<F>(
duration: Duration,
handler: F,
) -> impl Fn(&Request, &PathParams, &ConnectionInfo) -> Response + Send + Sync + 'static
where
F: Fn(&Request, &PathParams, &ConnectionInfo) -> Response + Send + Sync + 'static,
{
let handler = Arc::new(handler);
move |req, params, conn| {
let handler = Arc::clone(&handler);
let req = req.clone();
let params = params.clone();
let conn = conn.clone();
run_with_timeout(duration, move || handler(&req, ¶ms, &conn))
.unwrap_or_else(timeout_response)
}
}
pub fn with_timeout_state<S, F>(
duration: Duration,
handler: F,
) -> impl Fn(&Request, &PathParams, &ConnectionInfo, &S) -> Response + Send + Sync + 'static
where
S: Clone + Send + Sync + 'static,
F: Fn(&Request, &PathParams, &ConnectionInfo, &S) -> Response + Send + Sync + 'static,
{
let handler = Arc::new(handler);
move |req, params, conn, state: &S| {
let handler = Arc::clone(&handler);
let req = req.clone();
let params = params.clone();
let conn = conn.clone();
let state = state.clone();
run_with_timeout(duration, move || handler(&req, ¶ms, &conn, &state))
.unwrap_or_else(timeout_response)
}
}
pub struct TimeoutLayer<A: ?Sized> {
inner: Arc<A>,
duration: Duration,
}
impl<A: Application + Send + Sync + 'static> TimeoutLayer<A> {
pub fn new(inner: A, duration: Duration) -> Self {
TimeoutLayer { inner: Arc::new(inner), duration }
}
}
impl<A: Application + Send + Sync + ?Sized + 'static> TimeoutLayer<A> {
pub fn from_arc(inner: Arc<A>, duration: Duration) -> Self {
TimeoutLayer { inner, duration }
}
}
impl<A: Application + Send + Sync + ?Sized + 'static> Application for TimeoutLayer<A> {
fn execute(&self, request: &Request, connection: &ConnectionInfo) -> Result<Response, String> {
let inner = Arc::clone(&self.inner);
let request = request.clone();
let connection = connection.clone();
match run_with_timeout(self.duration, move || inner.execute(&request, &connection)) {
Some(result) => result,
None => Ok(timeout_response()),
}
}
}
#[cfg(feature = "http2")]
mod async_timeout {
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use crate::request::Request;
use crate::response::Response;
use crate::router::PathParams;
use crate::server::ConnectionInfo;
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
pub fn with_timeout_async<S, F, Fut>(
duration: Duration,
handler: F,
) -> impl Fn(Request, PathParams, ConnectionInfo, Arc<S>) -> BoxFuture<Response> + Send + Sync + 'static
where
S: Send + Sync + 'static,
F: Fn(Request, PathParams, ConnectionInfo, Arc<S>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Response> + Send + 'static,
{
move |req, params, conn, state| {
let fut = handler(req, params, conn, state);
Box::pin(async move {
match tokio::time::timeout(duration, fut).await {
Ok(response) => response,
Err(_) => super::timeout_response(),
}
})
}
}
}
#[cfg(feature = "http2")]
pub use async_timeout::with_timeout_async;