#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(docsrs, feature(doc_cfg_hide))]
#![cfg_attr(docsrs, doc(cfg_hide(feature = "uring")))]
#![deny(
unreachable_pub,
missing_debug_implementations,
missing_docs,
clippy::pedantic
)]
#![allow(
// I WANT A LONG fn!
clippy::too_many_lines,
// I know what I'm doing with unwraps. They should all be motivated.
clippy::missing_panics_doc,
// When a parameter of a function is prefixed due to `#[cfg]` in an fn.
clippy::used_underscore_binding,
// Same as ↑.
clippy::unused_self,
// When a enum variant has been conditionally compiled away.
irrefutable_let_patterns,
)]
#![doc(html_favicon_url = "https://kvarn.org/favicon.svg")]
#![doc(html_logo_url = "https://kvarn.org/logo.svg")]
#![doc(html_root_url = "https://doc.kvarn.org/")]
#![cfg_attr(
not(feature = "async-networking"),
allow(clippy::must_use_candidate, clippy::unused_async)
)]
pub mod application;
pub mod comprash;
pub mod cors;
pub mod csp;
#[cfg(feature = "handover")]
pub mod ctl;
pub mod encryption;
pub mod error;
pub mod extensions;
pub mod host;
pub mod limiting;
pub mod prelude;
pub mod read;
pub mod shutdown;
#[cfg(all(feature = "uring", feature = "http3"))]
mod uring_udp;
pub mod vary;
pub mod websocket;
use prelude::{chrono::*, internals::*, networking::*, *};
pub use error::{default as default_error, default_response as default_error_response};
pub use extensions::{Extensions, Id};
pub use read::{file as read_file, file_cached as read_file_cached};
#[derive(Debug)]
#[must_use = "must start a server if creating a config"]
pub struct RunConfig {
ports: Vec<PortDescriptor>,
#[cfg(feature = "handover")]
ctl: bool,
#[cfg(feature = "handover")]
ctl_path: Option<PathBuf>,
#[cfg(feature = "handover")]
plugins: ctl::Plugins,
}
impl RunConfig {
pub fn new() -> Self {
RunConfig {
ports: vec![],
#[cfg(feature = "handover")]
ctl: true,
#[cfg(feature = "handover")]
ctl_path: None,
#[cfg(feature = "handover")]
plugins: ctl::Plugins::default(),
}
}
pub fn bind(mut self, port: PortDescriptor) -> Self {
self.ports.push(port);
self
}
#[cfg(feature = "handover")]
pub fn disable_ctl(mut self) -> Self {
self.ctl = false;
self
}
#[cfg(feature = "handover")]
pub fn set_ctl_path(mut self, path: impl AsRef<Path>) -> Self {
self.ctl_path = Some(path.as_ref().to_path_buf());
self
}
#[cfg(feature = "handover")]
pub fn add_plugin(mut self, name: impl AsRef<str>, plugin: ctl::Plugin) -> Self {
self.plugins.add_plugin(name, plugin);
self
}
#[allow(clippy::type_complexity)]
pub async fn execute(self) -> Arc<shutdown::Manager> {
#[cfg(feature = "async-networking")]
use socket2::{Domain, Protocol, Type};
#[cfg(feature = "https")]
encryption::attach_crypto_provider();
let RunConfig {
ports,
#[cfg(feature = "handover")]
ctl,
#[cfg(feature = "handover")]
ctl_path,
#[cfg(feature = "handover")]
plugins,
} = self;
info!("Starting server on {} ports.", ports.len());
let len = ports.len();
#[allow(unused_mut)] let mut shutdown_manager = unsafe { shutdown::Manager::new(len * 8) };
#[cfg(feature = "handover")]
let ports_clone = Arc::new(ports.clone());
#[cfg(feature = "uring")]
let instances = std::thread::available_parallelism()
.map(std::num::NonZeroUsize::get)
.unwrap_or(16);
#[cfg(not(feature = "uring"))]
let instances = 1;
let mut all_listeners: Vec<
Vec<(
Box<dyn Fn() -> AcceptManager + Send + Sync>,
Arc<PortDescriptor>,
)>,
> = Vec::with_capacity(instances);
let ports: Vec<_> = ports.into_iter().map(Arc::new).collect();
#[cfg(feature = "handover")]
let handover_path = ctl_path.unwrap_or_else(ctl::socket_path);
#[cfg(feature = "graceful-shutdown")]
{
shutdown_manager.handover_socket_path = Some(handover_path.clone());
}
let shutdown_manager = shutdown_manager.build();
for _ in 0..instances {
let mut listeners: Vec<(Box<dyn Fn() -> AcceptManager + Send + Sync>, _)> = Vec::new();
#[cfg(feature = "async-networking")]
for descriptor in &ports {
fn create_listener(
create_socket: impl Fn() -> socket2::Socket,
#[allow(unused_variables)] tcp: bool,
address: SocketAddr,
shutdown_manager: &shutdown::Manager,
#[allow(unused_variables)] descriptor: &PortDescriptor,
) -> AcceptManager {
let socket = create_socket();
socket
.set_nonblocking(true)
.expect("Failed to set `nonblocking` for socket.");
#[cfg(unix)]
let _ = socket.set_cloexec(true);
#[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
{
if socket.set_reuse_address(true).is_err()
|| socket.set_reuse_port(true).is_err()
{
error!("Failed to set reuse address/port. This is needed for graceful shutdown handover.");
}
}
socket
.bind(&address.into())
.expect("Failed to bind address");
#[cfg(all(feature = "http3", not(feature = "uring")))]
if !tcp {
return shutdown_manager.add_listener(shutdown::Listener::Udp(
h3_quinn::Endpoint::new(
quinn::EndpointConfig::default(),
Some(quinn::ServerConfig::with_crypto(Arc::new(
quinn::crypto::rustls::QuicServerConfig::try_from(
descriptor.data.make_config(),
)
.unwrap(),
))),
socket.into(),
h3_quinn::quinn::default_runtime().unwrap(),
)
.unwrap(),
));
}
#[cfg(all(feature = "http3", feature = "uring"))]
if !tcp {
let endpoint = h3_quinn::Endpoint::new_with_abstract_socket(
quinn::EndpointConfig::default(),
Some(quinn::ServerConfig::with_crypto(Arc::new(
quinn::crypto::rustls::QuicServerConfig::try_from(
descriptor.data.make_config(),
)
.unwrap(),
))),
Arc::new(
uring_udp::UringUdpSocket::new(
tokio_uring::net::UdpSocket::from_std(socket.into()),
address.is_ipv4(),
)
.expect("failed to change socket settings"),
),
h3_quinn::quinn::default_runtime().unwrap(),
)
.unwrap();
return shutdown_manager.add_listener(shutdown::Listener::Udp(endpoint));
}
socket
.listen(1024)
.expect("Failed to listen on bound address.");
#[cfg(feature = "uring")]
let listener = tokio_uring::net::TcpListener::from_std(socket.into());
#[cfg(not(feature = "uring"))]
let listener = TcpListener::from_std(socket.into()).unwrap();
shutdown_manager.add_listener(shutdown::Listener::Tcp(listener))
}
if matches!(descriptor.version, BindIpVersion::V4 | BindIpVersion::Both) {
let mgr = shutdown_manager.clone();
let d = descriptor.clone();
let listener = move || {
create_listener(
|| {
socket2::Socket::new(
Domain::IPV4,
Type::STREAM,
Some(Protocol::TCP),
)
.expect("Failed to create a new IPv4 socket configuration")
},
true,
SocketAddr::new(IpAddr::V4(net::Ipv4Addr::UNSPECIFIED), d.port),
&mgr,
&d,
)
};
listeners.push((Box::new(listener), Arc::clone(descriptor)));
}
if matches!(descriptor.version, BindIpVersion::V6 | BindIpVersion::Both) {
let mgr = shutdown_manager.clone();
let d = descriptor.clone();
let listener = move || {
create_listener(
|| {
socket2::Socket::new(
Domain::IPV6,
Type::STREAM,
Some(Protocol::TCP),
)
.expect("Failed to create a new IPv6 socket configuration")
},
true,
SocketAddr::new(IpAddr::V6(net::Ipv6Addr::UNSPECIFIED), d.port),
&mgr,
&d,
)
};
listeners.push((Box::new(listener), Arc::clone(descriptor)));
}
#[cfg(feature = "http3")]
if descriptor.server_config.is_some() {
if matches!(descriptor.version, BindIpVersion::V4 | BindIpVersion::Both) {
let mgr = shutdown_manager.clone();
let d = descriptor.clone();
let listener = move || {
create_listener(
|| {
socket2::Socket::new(
Domain::IPV4,
Type::DGRAM,
Some(Protocol::UDP),
)
.expect("Failed to create a new IPv4 socket configuration")
},
false,
SocketAddr::new(IpAddr::V4(net::Ipv4Addr::UNSPECIFIED), d.port),
&mgr,
&d,
)
};
listeners.push((Box::new(listener), Arc::clone(descriptor)));
}
if matches!(descriptor.version, BindIpVersion::V6 | BindIpVersion::Both) {
let mgr = shutdown_manager.clone();
let d = descriptor.clone();
let listener = move || {
create_listener(
|| {
socket2::Socket::new(
Domain::IPV6,
Type::DGRAM,
Some(Protocol::UDP),
)
.expect("Failed to create a new IPv6 socket configuration")
},
false,
SocketAddr::new(IpAddr::V6(net::Ipv6Addr::UNSPECIFIED), d.port),
&mgr,
&d,
)
};
listeners.push((Box::new(listener), Arc::clone(descriptor)));
}
}
}
#[cfg(not(feature = "async-networking"))]
for descriptor in &ports {
if matches!(descriptor.version, BindIpVersion::V4 | BindIpVersion::Both) {
let mgr = shutdown_manager.clone();
let d = descriptor.clone();
let listener = move || {
let listener = TcpListener::bind(SocketAddr::new(
IpAddr::V4(net::Ipv4Addr::UNSPECIFIED),
d.port,
))
.expect("Failed to bind to IPv4");
mgr.add_listener(shutdown::Listener::Tcp(listener))
};
listeners.push((Box::new(listener), descriptor.clone()));
}
if matches!(descriptor.version, BindIpVersion::V6 | BindIpVersion::Both) {
let mgr = shutdown_manager.clone();
let d = descriptor.clone();
let listener = move || {
let listener = TcpListener::bind(SocketAddr::new(
IpAddr::V6(net::Ipv6Addr::UNSPECIFIED),
d.port,
))
.expect("Failed to bind to IPv6");
mgr.add_listener(shutdown::Listener::Tcp(listener))
};
listeners.push((Box::new(listener), descriptor.clone()));
}
}
all_listeners.push(listeners);
}
#[cfg(feature = "handover")]
if ctl {
#[cfg(any(
not(feature = "graceful-shutdown"),
target_os = "illumos",
target_os = "solaris"
))]
ctl::listen(
plugins,
ports_clone,
Arc::clone(&shutdown_manager),
handover_path,
)
.await;
}
#[cfg(feature = "uring")]
info!("Starting {instances} threads with an executor and listener each.");
#[cfg(feature = "uring")]
for (n, listeners) in all_listeners.into_iter().enumerate() {
for (listener, descriptor) in listeners {
let shutdown_manager = Arc::clone(&shutdown_manager);
std::thread::spawn(move || {
tokio_uring::start(async move {
accept(listener(), descriptor, &shutdown_manager, n == 0)
.await
.expect("failed to accept message");
shutdown_manager.wait().await;
});
});
}
}
#[cfg(not(feature = "uring"))]
{
let listeners = all_listeners.into_iter().next().unwrap();
for (listener, descriptor) in listeners {
let shutdown_manager = Arc::clone(&shutdown_manager);
let future = async move {
accept(listener(), descriptor, &shutdown_manager, true)
.await
.expect("Failed to accept message!");
};
let _task = spawn(future).await;
}
}
#[cfg(feature = "handover")]
if ctl {
#[cfg(all(
feature = "graceful-shutdown",
not(target_os = "illumos"),
not(target_os = "solaris")
))]
ctl::listen(
plugins,
ports_clone,
Arc::clone(&shutdown_manager),
handover_path,
)
.await;
}
shutdown_manager
}
}
impl Default for RunConfig {
fn default() -> Self {
Self::new()
}
}
#[macro_export]
macro_rules! run_config {
($($port_descriptor:expr),+ $(,)?) => {
$crate::RunConfig::new()$(.bind($port_descriptor))+
};
}
macro_rules! ret_log_app_error {
($e:expr) => {
match $e {
Err(err) => {
if let application::Error::ClientRefusedResponse = &err {
return Ok(());
}
error!("An error occurred while sending a response. {:?}", &err);
return Err(err.into());
}
Ok(val) => val,
}
};
}
#[cfg(feature = "uring")]
#[allow(clippy::unused_async)]
#[inline]
pub async fn spawn<T: 'static>(task: impl Future<Output = T> + 'static) -> impl Future<Output = T> {
let handle = tokio_uring::spawn(task);
async move { handle.await.unwrap() }
}
#[cfg(any(not(feature = "async-networking"), not(feature = "uring")))]
#[allow(clippy::unused_async)]
#[inline]
pub async fn spawn<T: Send + 'static>(
task: impl Future<Output = T> + Send + 'static,
) -> impl Future<Output = T> {
#[cfg(not(feature = "async-networking"))]
{
task
}
#[cfg(feature = "async-networking")]
{
let handle = tokio::spawn(task);
async move { handle.await.unwrap() }
}
}
#[derive(Debug)]
pub enum Incoming {
Tcp(TcpStream),
#[cfg(feature = "http3")]
Udp(h3_quinn::quinn::Connection),
}
async fn accept(
mut listener: AcceptManager,
descriptor: Arc<PortDescriptor>,
shutdown_manager: &Arc<shutdown::Manager>,
first: bool,
) -> Result<(), io::Error> {
let local_addr = listener.inner().local_addr();
if first {
info!(
"Started listening on port {} using {}/{}",
local_addr.port(),
match listener.inner() {
shutdown::Listener::Tcp(_) => "TCP",
#[cfg(feature = "http3")]
shutdown::Listener::Udp(_) => "QUIC",
},
if local_addr.is_ipv4() { "IPv4" } else { "IPv6" }
);
}
let mut fails_without_accepting = 0usize;
let fails_without_accepting_threshold = 100;
loop {
let (stream, addr) = match listener.accept(shutdown_manager).await {
AcceptAction::Shutdown => {
if first {
info!(
"Closing listener on port {} with {}",
local_addr.port(),
if local_addr.is_ipv4() { "IPv4" } else { "IPv6" }
);
}
return Ok(());
}
AcceptAction::AcceptTcp(result) => match result {
Ok((stream, addr)) => (Incoming::Tcp(stream), addr),
Err(err) => {
#[cfg(feature = "graceful-shutdown")]
let connections = format!(
" {} current connections.",
shutdown_manager.get_connecions()
);
#[cfg(not(feature = "graceful-shutdown"))]
let connections = "";
error!("Failed to accept() on TCP listener.{connections}");
fails_without_accepting += 1;
if fails_without_accepting > fails_without_accepting_threshold {
return Err(err);
}
continue;
}
},
#[cfg(feature = "http3")]
AcceptAction::AcceptUdp(result) => match result {
Ok(stream) => {
let addr = stream.remote_address();
(Incoming::Udp(stream), addr)
}
Err(err) => {
if err.kind() == io::ErrorKind::TimedOut {
continue;
}
#[cfg(feature = "graceful-shutdown")]
let connections = format!(
" {} current connections.",
shutdown_manager.get_connecions()
);
#[cfg(not(feature = "graceful-shutdown"))]
let connections = "";
error!("Failed to accept() on UDP listener.{connections}");
fails_without_accepting += 1;
if fails_without_accepting > fails_without_accepting_threshold {
return Err(err);
}
continue;
}
},
};
fails_without_accepting = 0;
debug!(
"Accepting stream from {addr:?}: {}",
match stream {
Incoming::Tcp(_) => "TCP",
#[cfg(feature = "http3")]
Incoming::Udp(_) => "QUIC",
}
);
match descriptor.data.limiter().register(addr.ip()) {
LimitAction::Drop => {
drop(stream);
return Ok(());
}
LimitAction::Send | LimitAction::Passed => {}
}
let descriptor = Arc::clone(&descriptor);
#[cfg(feature = "graceful-shutdown")]
let shutdown_manager = Arc::clone(shutdown_manager);
let _task = spawn(async move {
#[cfg(feature = "graceful-shutdown")]
shutdown_manager.add_connection();
let _result = handle_connection(stream, addr, descriptor, || {
#[cfg(feature = "async-networking")]
{
#[cfg(feature = "graceful-shutdown")]
{
!shutdown_manager.get_shutdown(threading::Ordering::Relaxed)
}
#[cfg(not(feature = "graceful-shutdown"))]
{
true
}
}
#[cfg(not(feature = "async-networking"))]
{
false
}
})
.await;
#[cfg(feature = "graceful-shutdown")]
shutdown_manager.remove_connection();
})
.await;
}
}
pub async fn handle_connection(
stream: Incoming,
address: SocketAddr,
descriptor: Arc<PortDescriptor>,
mut continue_accepting: impl FnMut() -> bool,
) -> io::Result<()> {
let (mut http, sni, version) = match stream {
Incoming::Tcp(stream) => {
#[cfg(feature = "https")]
let encrypted = {
encryption::Encryption::new_tcp(stream, descriptor.server_config.clone())
.await
.map_err(|err| match err {
encryption::Error::Io(io) => io,
encryption::Error::Tls(tls) => {
io::Error::new(io::ErrorKind::InvalidData, tls)
}
})
}?;
#[cfg(not(feature = "https"))]
let encrypted = encryption::Encryption::new_tcp(stream);
let version = match encrypted.alpn_protocol() {
Some(b"h2") => Version::HTTP_2,
None | Some(b"http/1.1") => Version::HTTP_11,
Some(b"http/1.0") => Version::HTTP_10,
Some(b"http/0.9") => Version::HTTP_09,
Some(proto) => {
warn!(
"HTTP version not supported. \
Something is probably wrong with your alpn config. \
Client requested {}",
String::from_utf8_lossy(proto)
);
return Ok(());
}
};
let sni = encrypted.server_name().map(|s| s.to_compact_string());
debug!("New connection requesting hostname '{sni:?}'");
let http = application::HttpConnection::new(encrypted, version)
.await
.map_err::<io::Error, _>(application::Error::into)?;
(http, sni, version)
}
#[cfg(feature = "http3")]
Incoming::Udp(stream) => {
let handshake_data: Box<h3_quinn::quinn::crypto::rustls::HandshakeData> = stream
.handshake_data()
.expect("connection is established")
.downcast()
.expect("we're using rustls");
(
application::HttpConnection::Http3(
h3::server::builder()
.build(h3_quinn::Connection::new(stream))
.await
.map_err(application::Error::H3)?,
),
handshake_data.server_name.map(CompactString::from),
Version::HTTP_3,
)
}
};
debug!("Accepting requests from {}", address);
#[allow(unused_variables)]
let port = descriptor.port();
#[cfg(feature = "https")]
let secure = descriptor.server_config.is_some();
#[cfg(not(feature = "https"))]
let secure = false;
#[cfg(feature = "http3")]
let alt_svc_header = Some(format!("h3=\":{port}\";ma=2592000"));
#[cfg(not(feature = "http3"))]
let alt_svc_header: Option<String> = None;
let alt_svc_header = alt_svc_header.map(|h| Bytes::from(h.into_bytes()));
while let Ok((mut request, response_pipe)) = http
.accept(
descriptor
.data
.get_default()
.map(|host| host.name.as_bytes()),
)
.await
{
debug!("We got a new request on connection.");
trace!("Got request {:#?}", request);
let host = if let Some(host) = descriptor.data.get_from_request(&request, sni.as_deref()) {
host
} else {
debug!(
"Failed to get host: {}",
utils::parse::Error::NoHost.as_str()
);
let (mut response, body) = utils::split_response(
default_error(
StatusCode::CONFLICT,
None,
Some(b"The host you're looking for wasn't found."),
)
.await,
);
response_pipe.ensure_version_and_length(&mut response, body.len());
let mut body_pipe =
ret_log_app_error!(response_pipe.send_response(response, false).await);
ret_log_app_error!(body_pipe.send_with_maybe_close(body, true).await);
return Ok(());
};
match host.limiter.register(address.ip()) {
LimitAction::Drop => return Ok(()),
LimitAction::Send => {
let (mut response, body) = utils::split_response(limiting::get_too_many_requests());
response_pipe.ensure_version_and_length(&mut response, body.len());
let mut body_pipe =
ret_log_app_error!(response_pipe.send_response(response, false).await);
ret_log_app_error!(body_pipe.send_with_maybe_close(body, true).await);
continue;
}
LimitAction::Passed => {}
}
debug!("Accepting new connection from {} on {}", address, host.name);
debug_assert!(descriptor.data.get_host(&host.name).is_some());
let hostname = host.name.clone();
let moved_host_collection = Arc::clone(&descriptor.data);
let alt_svc_header = alt_svc_header.clone();
let future = async move {
let host = moved_host_collection.get_host(&hostname).unwrap();
#[allow(unused_mut)]
let mut response = handle_cache(&mut request, address, host).await;
if let Some(alt_svc_header) = alt_svc_header {
if secure && version != Version::HTTP_3 {
response.response.headers_mut().append(
HeaderName::from_static("alt-svc"),
HeaderValue::from_maybe_shared(alt_svc_header).unwrap(),
);
}
}
if let Err(err) = SendKind::Send(response_pipe)
.send(response, &request, host, address)
.await
{
error!("Got error when writing response: {:?}", err);
}
drop(request);
};
match version {
Version::HTTP_09 | Version::HTTP_10 | Version::HTTP_11 => future.await,
_ => {
let _task = spawn(future).await;
}
}
if !continue_accepting() {
break;
}
}
debug!("Connection finished.");
http.shutdown().await;
Ok(())
}
#[derive(Debug)]
pub enum SendKind {
Send(application::ResponsePipe),
Push(application::PushedResponsePipe),
}
impl SendKind {
#[inline]
pub fn ensure_version_and_length<T>(&self, response: &mut Response<T>, len: usize) {
match self {
Self::Send(p) => p.ensure_version_and_length(response, len),
Self::Push(p) => p.ensure_version(response),
}
}
#[inline]
pub async fn send(
self,
response: CacheReply,
request: &FatRequest,
host: &Host,
address: SocketAddr,
) -> io::Result<()> {
let CacheReply {
mut response,
identity_body,
sanitize_data: data,
future,
} = response;
let overriden_len = future.as_ref().and_then(|(_, len)| len.as_ref().copied());
if let Ok(data) = &data {
match data.apply_to_response(&mut response, overriden_len) {
Err(SanitizeError::RangeNotSatisfiable) => {
response = default_error(
StatusCode::RANGE_NOT_SATISFIABLE,
Some(host),
Some(b"Range start after end of body"),
)
.await;
}
Err(SanitizeError::UnsafePath) => {
response = default_error(StatusCode::BAD_REQUEST, Some(host), None).await;
}
Ok(()) => {}
}
}
#[allow(clippy::or_fun_call)] let len = overriden_len.unwrap_or(response.body().len());
self.ensure_version_and_length(&mut response, len);
let (mut response, body) = utils::split_response(response);
host.extensions
.resolve_package(&mut response, request, host, address)
.await;
match self {
SendKind::Send(response_pipe) => {
let mut body_pipe =
ret_log_app_error!(response_pipe.send_response(response, false).await);
if !body.is_empty()
&& (utils::method_has_response_body(request.method())
|| (!body.is_empty() && request.method() != Method::HEAD))
{
ret_log_app_error!(body_pipe.send_with_maybe_close(body, false).await);
}
if let Some((mut future, _)) = future {
future.call(&mut body_pipe, host).await;
}
host.extensions
.resolve_post(request, identity_body, &mut body_pipe, address, host)
.await;
ret_log_app_error!(body_pipe.close().await);
}
SendKind::Push(push_pipe) => {
let send_body =
utils::method_has_response_body(request.method()) || !body.is_empty();
let mut body_pipe = ret_log_app_error!(
push_pipe.send_response(response, !send_body && future.is_none())
);
if send_body {
ret_log_app_error!(
body_pipe
.send_with_maybe_close(body, future.is_none())
.await
);
}
if let Some((mut future, _)) = future {
future.call(&mut body_pipe, host).await;
}
if !send_body {
ret_log_app_error!(body_pipe.close().await);
}
}
}
Ok(())
}
}
pub struct CacheReply {
pub response: Response<Bytes>,
pub identity_body: Bytes,
pub sanitize_data: Result<utils::CriticalRequestComponents, SanitizeError>,
#[allow(clippy::doc_markdown)]
pub future: Option<(ResponsePipeFuture, Option<usize>)>,
}
impl Debug for CacheReply {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let mut s = f.debug_struct(utils::ident_str!(CacheReply));
utils::fmt_fields!(
s,
(self.response),
(self.identity_body),
(self.sanitize_data),
(self.future, &"[internal future]".as_clean()),
);
s.finish()
}
}
mod handle_cache_helpers {
use crate::prelude::*;
pub(super) async fn get_response(
request: &mut FatRequest,
host: &Host,
sanitize_data: &Result<utils::CriticalRequestComponents, SanitizeError>,
address: SocketAddr,
overide_uri: Option<&Uri>,
) -> (
comprash::CompressedResponse,
comprash::ClientCachePreference,
comprash::ServerCachePreference,
Option<(ResponsePipeFuture, Option<usize>)>,
comprash::PathQuery,
) {
let path_query = comprash::PathQuery::from(request.uri());
let (mut resp, mut client_cache, mut server_cache, compress, future) =
match sanitize_data {
Ok(_) => {
let path = if host.options.disable_fs {
None
} else if let Ok(decoded) =
percent_encoding::percent_decode_str(request.uri().path()).decode_utf8()
{
Some(utils::make_path(
&host.path,
host.options.public_data_dir.as_deref().unwrap_or("public"),
utils::parse::uri(&decoded).unwrap(),
None,
))
} else {
warn!("Invalid percent encoding in path.");
None
};
handle_request(request, overide_uri, address, host, &path).await
}
Err(err) => error::sanitize_error_into_response(*err, host).await,
}
.into_parts();
if host.options.disable_client_cache {
client_cache = comprash::ClientCachePreference::None;
}
host.extensions
.resolve_present(
request,
&mut resp,
&mut client_cache,
&mut server_cache,
host,
address,
)
.await;
let extension = match Path::new(request.uri().path())
.extension()
.and_then(std::ffi::OsStr::to_str)
{
Some(ext) => ext,
None => match host.options.extension_default.as_ref() {
Some(ext) => ext.as_str(),
None => "",
},
};
(
comprash::CompressedResponse::new(resp, compress, client_cache, extension),
client_cache,
server_cache,
future,
path_query,
)
}
pub(super) fn get_cache<'a, T>(
host: &'a Host,
server_cache: comprash::ServerCachePreference,
status: StatusCode,
method: &Method,
future: &Option<T>,
prepare_ext_cap: impl FnOnce(),
) -> Option<&'a comprash::ResponseCache> {
if future.is_none() {
if let Some(cache) = &host.response_cache {
let cache_action = (host.options.status_code_cache_filter)(status);
if server_cache.cache(cache_action, method) {
return Some(cache);
}
}
} else {
prepare_ext_cap();
}
None
}
pub(super) fn maybe_cache<T>(
host: &Host,
server_cache: comprash::ServerCachePreference,
path_query: PathQuery,
response: VariedResponse,
method: &Method,
future: &Option<T>,
) -> Option<VariedResponse> {
if let Some(cache) = get_cache(
host,
server_cache,
response.first().0.get_identity().status(),
method,
future,
|| {
debug!("Not caching; a Prepare extension has captured. If we cached, it would not be called again.");
},
) {
let key = if server_cache.query_matters() {
comprash::UriKey::PathQuery(path_query)
} else {
comprash::UriKey::Path(path_query.into_path())
};
debug!("Caching uri {:?}!", &key);
cache.insert_cache_item(key, response);
return None;
}
Some(response)
}
pub(super) async fn handle_vary_missing(
request: &mut FatRequest,
host: &Host,
sanitize_data: &Result<utils::CriticalRequestComponents, SanitizeError>,
address: SocketAddr,
overide_uri: Option<&Uri>,
uri_key: UriKey,
params: vary::CacheParams,
) -> (
Arc<(comprash::CompressedResponse, vary::HeaderCollection)>,
Option<(extensions::ResponsePipeFuture, Option<usize>)>,
) {
let (compressed_response, _, server_cache, future, path_query) =
get_response(request, host, sanitize_data, address, overide_uri).await;
let cached = if let Some(cache) = &host.response_cache {
{
match cache.get_cache_item(&uri_key).into_option() {
Some(t) => (uri_key, Some(t)),
None => match uri_key {
UriKey::Path(_) => (uri_key, None),
UriKey::PathQuery(path_query) => {
let uri_key = UriKey::Path(path_query.into_path());
let result = cache.get_cache_item(&uri_key).into_option();
(uri_key, result)
}
},
}
}
} else {
(uri_key, None)
};
#[allow(clippy::single_match_else)]
let arc = match cached {
(key, Some((resp, lifetime))) => {
let mut resp = (*resp).clone();
let a = Arc::clone(resp.push_response(compressed_response, params));
if let Some(cache) = &host.response_cache {
cache.insert(
0,
lifetime.2.map(|dur| {
dur.saturating_sub(
(OffsetDateTime::now_utc() - lifetime.0)
.max(time::Duration::ZERO)
.unsigned_abs(),
)
}),
key,
resp,
);
}
a
}
(_, None) => {
let vary_rules = host.vary.rules_from_request(request);
let varied_response = unsafe {
VariedResponse::new(compressed_response, request, vary_rules.as_ref())
};
let arc = Arc::clone(varied_response.first());
handle_cache_helpers::maybe_cache(
host,
server_cache,
path_query,
varied_response,
request.method(),
&future,
);
arc
}
};
(arc, future)
}
}
pub async fn handle_cache(
request: &mut FatRequest,
address: SocketAddr,
host: &Host,
) -> CacheReply {
let sanitize_data = utils::sanitize_request(request);
let overide_uri = host.extensions.resolve_prime(request, host, address).await;
let uri_key =
comprash::UriKey::path_and_query(overide_uri.as_ref().unwrap_or_else(|| request.uri()));
let cached = if let Some(cache) = &host.response_cache {
match cache.get_cache_item(&uri_key).into_option() {
Some(t) => (uri_key, Some(t)),
None => match uri_key {
UriKey::Path(_) => (uri_key, None),
UriKey::PathQuery(path_query) => {
let uri_key = UriKey::Path(path_query.into_path());
let result = cache.get_cache_item(&uri_key).into_option();
(uri_key, result)
}
},
}
} else {
(uri_key, None)
};
#[allow(clippy::single_match_else, clippy::unnested_or_patterns)]
let (response, identity, future) = match cached {
(uri_key, Some((resp, (creation, creation_formatted, _))))
if sanitize_data.is_ok()
&& matches!(request.method(), &Method::GET | &Method::HEAD) =>
{
debug!("Found in cache!");
let if_modified_since: Option<OffsetDateTime> =
if host.options.disable_if_modified_since {
None
} else {
request
.headers()
.get("if-modified-since")
.and_then(|h| h.to_str().ok())
.and_then(|s| {
time::PrimitiveDateTime::parse(s, &comprash::HTTP_DATE)
.ok()
.map(time::PrimitiveDateTime::assume_utc)
})
};
let client_request_is_fresh = if_modified_since.map_or(false, |timestamp| {
timestamp >= creation - 1.seconds()
});
let mut response_data = if client_request_is_fresh {
let mut response = Response::new(Bytes::new());
*response.status_mut() = StatusCode::NOT_MODIFIED;
(response, Bytes::new(), None)
} else {
let (resp_vary, future) = match resp.get_by_request(request) {
Ok(arc) => {
let arc = Arc::clone(arc);
(arc, None)
}
Err(params) => {
handle_cache_helpers::handle_vary_missing(
request,
host,
&sanitize_data,
address,
overide_uri.as_ref(),
uri_key,
params,
)
.await
}
};
let (resp, vary) = &*resp_vary;
let mut response = if future.as_ref().map_or(false, |(_, len)| len.is_some()) {
let body = resp.get_identity().body().clone();
utils::empty_clone_response(resp.get_identity()).map(|()| body)
} else {
match resp
.clone_preferred(
request,
&host.compression_options_oneshot,
&host.compression_options_cached,
true,
)
.await
{
Err(message) => {
error::default(
StatusCode::NOT_ACCEPTABLE,
Some(host),
Some(message.as_bytes()),
)
.await
}
Ok(response) => response,
}
};
vary::apply_header(&mut response, vary);
let identity_body = Bytes::clone(resp.get_identity().body());
(response, identity_body, future)
};
if !host.options.disable_if_modified_since {
response_data
.0
.headers_mut()
.insert("last-modified", creation_formatted);
}
response_data
}
_ => {
let sanitize_data = &sanitize_data;
let overide_uri = overide_uri.as_ref();
let (compressed_response, _, server_cache, future, path_query) =
handle_cache_helpers::get_response(
request,
host,
sanitize_data,
address,
overide_uri,
)
.await;
let vary_rules = host.vary.rules_from_request(request);
let varied_response =
unsafe { VariedResponse::new(compressed_response, request, vary_rules.as_ref()) };
let compressed_response = &varied_response.first().0;
let mut response = if future.as_ref().map_or(false, |(_, len)| len.is_some()) {
let body = compressed_response.get_identity().body().clone();
utils::empty_clone_response(compressed_response.get_identity()).map(|()| body)
} else {
let cache = handle_cache_helpers::get_cache(
host,
server_cache,
compressed_response.get_identity().status(),
request.method(),
&future,
|| {},
);
match compressed_response
.clone_preferred(
request,
&host.compression_options_oneshot,
&host.compression_options_cached,
cache.is_some(),
)
.await
{
Err(message) => {
error::default(
StatusCode::NOT_ACCEPTABLE,
Some(host),
Some(message.as_bytes()),
)
.await
}
Ok(response) => response,
}
};
let identity_body = Bytes::clone(compressed_response.get_identity().body());
let vary = &varied_response.first().1;
vary::apply_header(&mut response, vary);
let cache_rejected = handle_cache_helpers::maybe_cache(
host,
server_cache,
path_query,
varied_response,
request.method(),
&future,
);
if !host.options.disable_if_modified_since && cache_rejected.is_none() {
let last_modified = HeaderValue::from_str(
&OffsetDateTime::now_utc()
.format(&comprash::HTTP_DATE)
.expect("failed to format datetime"),
)
.expect("We know these bytes are valid.");
response
.headers_mut()
.insert("last-modified", last_modified);
}
(response, identity_body, future)
}
};
CacheReply {
response,
identity_body: identity,
sanitize_data,
future,
}
}
pub async fn handle_request(
request: &mut FatRequest,
overide_uri: Option<&Uri>,
address: SocketAddr,
host: &Host,
path: &Option<CompactString>,
) -> FatResponse {
let mut response = None;
let mut client_cache = None;
let mut server_cache = None;
let mut compress = None;
let mut future = None;
#[allow(unused_mut)]
let mut status = None;
{
if let Some(resp) = host
.extensions
.resolve_prepare(request, overide_uri, host, path, address)
.await
{
let resp = resp.into_parts();
response.replace(resp.0);
client_cache.replace(resp.1);
server_cache.replace(resp.2);
compress.replace(resp.3);
if let Some(f) = resp.4 {
future.replace(f);
}
}
}
if response.is_none() {
if let Some(path) = path {
match *request.method() {
Method::GET | Method::HEAD => {
if let Some(content) = read_file(&path, host.file_cache.as_ref()).await {
response = Some(Response::new(content));
}
}
_ => status = Some(StatusCode::METHOD_NOT_ALLOWED),
}
}
}
let response = match response {
Some(r) => r,
None => {
error::default_response(status.unwrap_or(StatusCode::NOT_FOUND), host, None)
.await
.response
}
};
macro_rules! maybe_with {
($response: expr, $option: expr, $method: tt) => {
if let Some(t) = $option {
$response = $response.$method(t);
}
};
}
let mut response = FatResponse::cache(response);
maybe_with!(response, client_cache, with_client_cache);
maybe_with!(response, server_cache, with_server_cache);
maybe_with!(response, compress, with_compress);
maybe_with!(response, future, with_future_and_maybe_len);
response
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[must_use]
pub enum BindIpVersion {
V4,
V6,
Both,
}
#[derive(Clone)]
#[must_use]
pub struct PortDescriptor {
port: u16,
#[cfg(feature = "https")]
server_config: Option<Arc<rustls::ServerConfig>>,
data: Arc<HostCollection>,
version: BindIpVersion,
}
impl PortDescriptor {
pub fn http(host_data: Arc<HostCollection>) -> Self {
Self {
port: 80,
#[cfg(feature = "https")]
server_config: None,
data: host_data,
version: BindIpVersion::Both,
}
}
#[cfg(feature = "https")]
pub fn https(host_data: Arc<HostCollection>) -> Self {
Self {
port: 443,
server_config: Some(Arc::new(host_data.make_config())),
data: host_data,
version: BindIpVersion::Both,
}
}
#[cfg(feature = "https")]
pub fn with_server_config(
port: u16,
host_data: Arc<HostCollection>,
server_config: Option<Arc<rustls::ServerConfig>>,
) -> Self {
Self {
port,
server_config,
data: host_data,
version: BindIpVersion::Both,
}
}
pub fn new(port: u16, host_data: Arc<HostCollection>) -> Self {
Self {
port,
#[cfg(feature = "https")]
server_config: Some(Arc::new(host_data.make_config())),
data: host_data,
version: BindIpVersion::Both,
}
}
pub fn unsecure(port: u16, host_data: Arc<HostCollection>) -> Self {
Self {
port,
#[cfg(feature = "https")]
server_config: None,
data: host_data,
version: BindIpVersion::Both,
}
}
pub fn ipv4_only(mut self) -> Self {
self.version = BindIpVersion::V4;
self
}
pub fn ipv6_only(mut self) -> Self {
self.version = BindIpVersion::V6;
self
}
}
impl PortDescriptor {
#[must_use]
pub fn port(&self) -> u16 {
self.port
}
#[cfg(feature = "https")]
#[must_use]
pub fn tls_config(&self) -> Option<&rustls::ServerConfig> {
self.server_config.as_deref()
}
pub fn hosts(&self) -> &HostCollection {
&self.data
}
pub fn internet_protocol(&self) -> BindIpVersion {
self.version
}
}
impl Debug for PortDescriptor {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let mut s = f.debug_struct(utils::ident_str!(PortDescriptor));
utils::fmt_fields!(
s,
(self.port),
#[cfg(feature = "https")]
(
self.server_config,
&self
.server_config
.as_ref()
.map(|_| "[opaque certificate]".as_clean())
),
(self.data),
(self.version),
);
s.finish()
}
}
pub type FatRequest = Request<application::Body>;
#[must_use = "send the response"]
pub struct FatResponse {
response: Response<Bytes>,
client: comprash::ClientCachePreference,
server: comprash::ServerCachePreference,
compress: comprash::CompressPreference,
future: Option<(ResponsePipeFuture, Option<usize>)>,
}
impl FatResponse {
pub fn new(
response: Response<Bytes>,
server_cache_preference: comprash::ServerCachePreference,
) -> Self {
Self {
response,
client: comprash::ClientCachePreference::Full,
server: server_cache_preference,
compress: comprash::CompressPreference::Full,
future: None,
}
}
pub fn cache(response: Response<Bytes>) -> Self {
Self::new(response, comprash::ServerCachePreference::Full)
}
pub fn no_cache(response: Response<Bytes>) -> Self {
Self {
response,
client: comprash::ClientCachePreference::None,
server: comprash::ServerCachePreference::None,
compress: comprash::CompressPreference::Full,
future: None,
}
}
pub fn with_client_cache(mut self, preference: comprash::ClientCachePreference) -> Self {
self.client = preference;
self
}
pub fn with_server_cache(mut self, preference: comprash::ServerCachePreference) -> Self {
self.server = preference;
self
}
pub fn with_compress(mut self, preference: comprash::CompressPreference) -> Self {
self.compress = preference;
self
}
pub fn with_future(mut self, future: ResponsePipeFuture) -> Self {
self.future = Some((future, None));
self
}
pub fn with_future_and_len(mut self, future: ResponsePipeFuture, new_len: usize) -> Self {
self.future = Some((future, Some(new_len)));
self
}
pub fn with_future_and_maybe_len(
mut self,
future: (ResponsePipeFuture, Option<usize>),
) -> Self {
self.future = Some(future);
self
}
pub fn with_content_type(mut self, content_type: &Mime) -> Self {
self.response.headers_mut().insert(
"content-type",
HeaderValue::from_maybe_shared::<Bytes>(content_type.to_string().into_bytes().into())
.unwrap(),
);
self
}
#[allow(clippy::type_complexity)] pub fn into_parts(
self,
) -> (
Response<Bytes>,
comprash::ClientCachePreference,
comprash::ServerCachePreference,
comprash::CompressPreference,
Option<(ResponsePipeFuture, Option<usize>)>,
) {
(
self.response,
self.client,
self.server,
self.compress,
self.future,
)
}
}
impl Debug for FatResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
enum BytesOrStr<'a> {
Str(&'a str),
Bytes,
}
impl<'a> Debug for BytesOrStr<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::Str(s) => f.write_str(s),
Self::Bytes => f.write_str("[binary data]"),
}
}
}
let response = utils::empty_clone_response(&self.response);
let body = if let Ok(s) = str::from_utf8(self.response.body()) {
BytesOrStr::Str(s)
} else {
BytesOrStr::Bytes
};
let response = response.map(|()| body);
let mut s = f.debug_struct(utils::ident_str!(FatResponse));
utils::fmt_fields!(
s,
(self.response, &response),
(self.client),
(self.server),
(self.compress),
(self.future, &"[opaque Future]".as_clean()),
);
s.finish()
}
}