#![no_std]
#![allow(async_fn_in_trait)]
#![deny(
unsafe_code,
clippy::missing_safety_doc,
clippy::multiple_unsafe_ops_per_block,
clippy::undocumented_unsafe_blocks
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(any(feature = "alloc", test))]
extern crate alloc;
#[cfg(any(feature = "std", test))]
extern crate std;
#[cfg(feature = "json")]
mod json;
#[macro_use]
mod logging;
pub mod extract;
pub mod futures;
pub mod io;
pub mod request;
pub mod response;
pub mod routing;
mod sync;
pub mod time;
pub mod url_encoded;
#[cfg(test)]
mod tests;
#[doc(hidden)]
pub mod doctests_utils;
use core::marker::PhantomData;
pub use logging::LogDisplay;
pub use routing::Router;
pub use time::Timer;
use time::{Duration, TimerExt};
use crate::sync::oneshot_broadcast;
pub use response::response_stream::ResponseSent;
/// Errors that can end a connection while it is being served.
///
/// `E` is the error type of the underlying socket.
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Error<E: io::Error> {
/// The request could not be parsed. `serve_and_shutdown` returns this after it has written a
/// `400 Bad Request` response describing the problem.
#[error("Bad Request")]
BadRequest,
/// The underlying reader reported an error.
#[error("Read error: {0}")]
Read(#[source] E),
/// The `read_request` timeout elapsed while a request was being read.
#[error("Read timeout")]
ReadTimeout(crate::time::TimeoutError),
/// A write failed with the wrapped error (NOTE(review): constructed outside this file —
/// presumably by the response writer; confirm).
#[error("Write error: {0}")]
Write(#[source] E),
/// The `write` timeout elapsed before a write completed.
#[error("Write timeout")]
WriteTimeout(crate::time::TimeoutError),
}
/// Allows [`Error`] to be used wherever an [`io::Error`] is required.
impl<E: io::Error + 'static> io::Error for Error<E> {
    /// Maps each variant to an [`io::ErrorKind`]: wrapped errors report their own kind, and a
    /// malformed request is reported as invalid data.
    fn kind(&self) -> io::ErrorKind {
        match self {
            Self::Read(io_error) | Self::Write(io_error) => io_error.kind(),
            Self::ReadTimeout(timeout) | Self::WriteTimeout(timeout) => timeout.kind(),
            Self::BadRequest => io::ErrorKind::InvalidData,
        }
    }
}
/// Exchanges the inner and outer error types of a nested `Result`.
trait SwapErrors {
    /// The nested `Result` with its two error types swapped.
    type Output;

    /// Moves the inner error to the outside and the outer error to the inside.
    fn swap_errors(self) -> Self::Output;
}

impl<T, E0, E1> SwapErrors for Result<Result<T, E0>, E1> {
    type Output = Result<Result<T, E1>, E0>;

    fn swap_errors(self) -> Self::Output {
        match self {
            // The outer error becomes the inner one.
            Err(outer) => Ok(Err(outer)),
            // A success stays nested; an inner error becomes the outer one.
            Ok(inner) => inner.map(Ok),
        }
    }
}
/// The timeouts applied while serving a connection.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Timeouts {
    /// How long to wait for the first request on a connection to start arriving.
    pub start_read_request: Duration,
    /// How long to wait for each subsequent request on the same connection to start arriving.
    pub persistent_start_read_request: Duration,
    /// How long reading a request may take once it has started arriving.
    pub read_request: Duration,
    /// How long a write to the socket may take.
    pub write: Duration,
}

impl Timeouts {
    /// The default timeouts, usable in `const` contexts: 5 seconds to start the first request,
    /// 1 second to start later requests, 3 seconds to read a request and 1 second per write.
    pub const fn const_default() -> Self {
        let start_read_request = Duration::from_secs(5);
        let persistent_start_read_request = Duration::from_secs(1);
        let read_request = Duration::from_secs(3);
        let write = Duration::from_secs(1);

        Self {
            start_read_request,
            persistent_start_read_request,
            read_request,
            write,
        }
    }
}

impl Default for Timeouts {
    /// Same values as [`Timeouts::const_default`].
    fn default() -> Self {
        Timeouts::const_default()
    }
}
/// Whether a connection is closed after a response or kept open for further requests.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum KeepAlive {
    /// Close the connection once the response has been sent.
    Close,
    /// Keep the connection open so that further requests can be read from it.
    KeepAlive,
}

impl KeepAlive {
    /// The default behaviour, usable in `const` contexts: close the connection.
    pub const fn const_default() -> Self {
        KeepAlive::Close
    }
}

impl Default for KeepAlive {
    /// Same value as [`KeepAlive::const_default`].
    fn default() -> Self {
        KeepAlive::const_default()
    }
}

/// Formats the value as a `Connection` header token: `close` or `keep-alive`.
impl core::fmt::Display for KeepAlive {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let token = match *self {
            Self::Close => "close",
            Self::KeepAlive => "keep-alive",
        };

        // Delegate to `str`'s implementation so width and alignment flags are honoured.
        core::fmt::Display::fmt(token, f)
    }
}
impl KeepAlive {
    /// The connection behaviour implied by the HTTP version alone: `HTTP/1.1` connections are
    /// persistent unless stated otherwise, every other version is closed after the response.
    fn default_for_http_version(http_version: &str) -> Self {
        if http_version == "HTTP/1.1" {
            Self::KeepAlive
        } else {
            Self::Close
        }
    }

    /// Decides whether the connection should persist after responding to a request.
    ///
    /// - Without a `connection` header, the default for `http_version` applies.
    /// - If the header is `close`, or any option in its comma-separated list is `close`, the
    ///   connection is closed: `close` is honoured wherever it appears in the list, not only
    ///   when it is the sole value.
    /// - If any option is `upgrade`, the connection is closed after the response.
    /// - Otherwise the default for `http_version` applies.
    fn from_request(http_version: &str, headers: request::Headers) -> Self {
        match headers.get("connection") {
            None => Self::default_for_http_version(http_version),
            Some(close_header) if close_header == "close" => Self::Close,
            Some(connection_headers) => {
                // NOTE(review): whether surrounding whitespace and letter case are normalised
                // depends on the header value's `split` and `==` implementations — confirm.
                let must_close = connection_headers.split(b',').any(|connection_option| {
                    connection_option == "close" || connection_option == "upgrade"
                });

                if must_close {
                    Self::Close
                } else {
                    Self::default_for_http_version(http_version)
                }
            }
        }
    }
}
/// Server configuration: the timeouts to apply and whether connections may be kept alive.
#[derive(Debug, Clone)]
pub struct Config {
    /// The timeouts applied while reading requests and writing responses.
    pub timeouts: Timeouts,
    /// Whether connections are closed after each response or may be kept alive.
    pub connection: KeepAlive,
}

impl Config {
    /// Creates a configuration with the given timeouts which closes the connection after each
    /// response.
    pub const fn new(timeouts: Timeouts) -> Self {
        Self {
            connection: KeepAlive::Close,
            timeouts,
        }
    }

    /// The default configuration, usable in `const` contexts: default timeouts and the default
    /// connection behaviour.
    pub const fn const_default() -> Self {
        let timeouts = Timeouts::const_default();
        let connection = KeepAlive::const_default();

        Self {
            timeouts,
            connection,
        }
    }

    /// Returns this configuration changed to allow connections to be kept alive.
    pub const fn keep_connection_alive(self) -> Self {
        Self {
            connection: KeepAlive::KeepAlive,
            ..self
        }
    }

    /// Returns this configuration changed to close the connection after each response.
    pub const fn close_connection_after_response(self) -> Self {
        Self {
            connection: KeepAlive::Close,
            ..self
        }
    }
}

impl Default for Config {
    /// Same value as [`Config::const_default`].
    fn default() -> Self {
        Config::const_default()
    }
}
struct MapReadErrorReader<R: io::Read>(R);
impl<R: io::Read> io::ErrorType for MapReadErrorReader<R>
where
R::Error: 'static,
{
type Error = Error<R::Error>;
}
impl<R: io::Read> io::Read for MapReadErrorReader<R>
where
R::Error: 'static,
{
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
self.0.read(buf).await.map_err(Error::Read)
}
async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), io::ReadExactError<Self::Error>> {
self.0.read_exact(buf).await.map_err(|err| match err {
io::ReadExactError::UnexpectedEof => io::ReadExactError::UnexpectedEof,
io::ReadExactError::Other(err) => io::ReadExactError::Other(Error::Read(err)),
})
}
}
/// Summary of a connection that has finished being served.
pub struct DisconnectionInfo<S> {
    /// The number of requests handled on the connection.
    pub handled_requests_count: u64,
    /// The value produced by the shutdown signal, if the connection ended because of it.
    pub shutdown_reason: Option<S>,
}

impl<S> DisconnectionInfo<S> {
    /// The connection ended for a reason other than the shutdown signal.
    fn no_shutdown_reason(handled_requests_count: u64) -> Self {
        DisconnectionInfo {
            shutdown_reason: None,
            handled_requests_count,
        }
    }

    /// The connection ended because the shutdown signal produced `shutdown_reason`.
    fn with_shutdown_reason(handled_requests_count: u64, shutdown_reason: S) -> Self {
        let shutdown_reason = Some(shutdown_reason);

        DisconnectionInfo {
            shutdown_reason,
            handled_requests_count,
        }
    }
}
/// Serves HTTP requests arriving on `socket` using `app`, then shuts down or aborts the socket.
///
/// Requests are read into `http_buffer` and handled one at a time until the connection is not
/// kept alive, a start-of-request timeout elapses, an error occurs, or `shutdown_signal`
/// resolves. `shutdown_signal` yields the shutdown reason together with the time an in-flight
/// request handler is allowed to finish in.
///
/// Returns the number of handled requests and the shutdown reason, if any.
///
/// # Errors
///
/// Returns an error if reading or writing fails or times out, or [`Error::BadRequest`] if a
/// request could not be parsed. On error the socket is aborted and any abort failure is ignored.
/// An error from shutting down or aborting the socket after a successful run is also returned.
async fn serve_and_shutdown<
Runtime,
T: Timer<Runtime>,
P: routing::PathRouter,
S: io::Socket<Runtime>,
ShutdownReason,
ShutdownSignal: core::future::Future<Output = (ShutdownReason, Duration)>,
>(
app: &Router<P>,
timer: &mut T,
config: &Config,
http_buffer: &mut [u8],
mut socket: S,
shutdown_signal: ShutdownSignal,
) -> Result<DisconnectionInfo<ShutdownReason>, Error<S::Error>> {
// Shared with the request reader; consulted after serving to choose between shutdown and abort.
let mut connection_flags = request::ConnectionFlags::new();
// Serving runs inside an async block so that `return` and `?` leave only the block and the
// socket clean-up at the end of the function always runs.
let result: Result<DisconnectionInfo<ShutdownReason>, Error<S::Error>> = async {
let (reader, writer) = socket.split();
let reader = MapReadErrorReader(reader);
// Every write made through `writer` is subject to the configured write timeout.
let mut writer = time::WriteWithTimeout {
inner: writer,
timer,
timeout_duration: config.timeouts.write,
_runtime: PhantomData,
};
let mut request_reader = request::Reader::new(reader, http_buffer, &mut connection_flags);
let mut shutdown_signal = core::pin::pin!(shutdown_signal);
// Used to tell an in-flight request that shutdown has been requested.
let mut shutdown_broadcast = oneshot_broadcast::Signal::core();
let shutdown_broadcast = shutdown_broadcast.make_signal();
// Yields 0, 1, 2, ... on successive calls, saturating instead of overflowing.
let mut request_count_iter = {
let mut n = 0_u64;
move || {
let request_count = n;
n = n.saturating_add(1);
request_count
}
};
loop {
// The number of requests handled so far on this connection.
let request_count = request_count_iter();
// Wait for the next request to start arriving, racing against the shutdown signal. The first
// request on a connection gets a different timeout from subsequent requests.
let request_is_pending = match timer
.run_with_timeout(
if request_count == 0 {
config.timeouts.start_read_request
} else {
config.timeouts.persistent_start_read_request
},
futures::select_either(
shutdown_signal.as_mut(),
request_reader.request_is_pending(),
),
)
.await
{
// Shutdown was requested while no request was in flight.
Ok(futures::Either::First((shutdown_reason, _))) => {
return Ok(DisconnectionInfo::with_shutdown_reason(
request_count,
shutdown_reason,
));
}
Ok(futures::Either::Second(Ok(Some(request_is_pending)))) => request_is_pending,
// No further request (presumably the peer closed the connection — confirm against
// `request_is_pending`) or none started in time: end the connection without an error.
Ok(futures::Either::Second(Ok(None))) | Err(time::TimeoutError) => {
return Ok(DisconnectionInfo::no_shutdown_reason(request_count))
}
Ok(futures::Either::Second(Err(err))) => return Err(err),
};
// Per-request signal, notified once the read-request timeout has elapsed.
let mut read_request_timeout_signal = oneshot_broadcast::Signal::core();
let read_request_timeout_signal = read_request_timeout_signal.make_signal();
let request_signals = request::RequestSignals {
shutdown_signal: shutdown_broadcast.listen(),
read_request_timeout_signal: read_request_timeout_signal.listen(),
make_read_timeout_error: || Error::ReadTimeout(crate::time::TimeoutError),
};
// Resolves to a read-timeout error after `read_request` has elapsed, notifying the signal first.
let mut read_request_timeout = core::pin::pin!(async {
let timeout = timer.timeout(config.timeouts.read_request).await;
read_request_timeout_signal.notify(());
Error::ReadTimeout(timeout)
});
// Read the request, racing against the read timeout. `first_is_error` presumably turns the
// timeout winning the race into `Err` — confirm against `futures::Either`.
let request = futures::select_either(
read_request_timeout.as_mut(),
request_reader.read(request_is_pending, request_signals),
)
.await
.first_is_error()?;
match request {
Ok(request) => {
// A server configured to close connections always closes; otherwise the request decides.
let connection_header = match config.connection {
KeepAlive::Close => KeepAlive::Close,
KeepAlive::KeepAlive => KeepAlive::from_request(
request.parts.http_version(),
request.parts.headers(),
),
};
// Keep driving the read timeout while the request is handled so that its signal is still
// notified; that branch then waits forever, so it never produces the result itself.
let mut handle_request = core::pin::pin!(crate::futures::select(
async {
read_request_timeout.await;
core::future::pending().await
},
app.handle_request(
request,
response::ResponseStream::new(&mut writer, connection_header),
)
));
return Ok(
match crate::futures::select_either(
shutdown_signal.as_mut(),
handle_request.as_mut(),
)
.await
{
// Shutdown requested mid-request: notify the handler, then allow it `shutdown_timeout` to
// finish. The request counts as handled only if its response was sent in that time.
futures::Either::First((shutdown_reason, shutdown_timeout)) => {
shutdown_broadcast.notify(());
DisconnectionInfo::with_shutdown_reason(
match timer
.run_with_timeout(shutdown_timeout, handle_request)
.await
.swap_errors()?
{
Ok(ResponseSent(_)) => request_count + 1,
Err(time::TimeoutError) => request_count,
},
shutdown_reason,
)
}
// The handler finished: read the next request if the connection is kept alive, otherwise
// report the connection as finished.
futures::Either::Second(response_sent) => {
let ResponseSent(_) = response_sent?;
if let KeepAlive::KeepAlive = connection_header {
continue;
}
DisconnectionInfo::no_shutdown_reason(request_count + 1)
}
},
);
}
Err(err) => {
use response::IntoResponse;
// Parse failures are answered with a 400 response; I/O failures are returned as they are.
let message = match err {
request::ReadError::BadRequestLine => "Bad Request Line",
request::ReadError::HeaderDoesNotContainColon => {
"Invalid Header line: No ':' character"
}
request::ReadError::UnexpectedEof => "Unexpected EOF while reading request",
request::ReadError::IO(err) => return Err(err),
};
// Writing the error response is bounded by the write timeout.
let ResponseSent { .. } = timer
.run_with_timeout(
config.timeouts.write,
(response::StatusCode::BAD_REQUEST, message).write_to(
response::Connection::empty(&mut Default::default()),
response::ResponseStream::new(writer, KeepAlive::Close),
),
)
.await
.map_err(Error::WriteTimeout)??;
return Err(Error::BadRequest);
}
}
}
}
.await;
match result {
Ok(disconnection_info) => {
// Abort if the connection flags require it, otherwise shut the socket down.
if connection_flags.connection_must_be_aborted() {
socket.abort(&config.timeouts, timer).await?;
} else {
socket.shutdown(&config.timeouts, timer).await?;
}
Ok(disconnection_info)
}
Err(error) => {
// Best effort: a failure to abort is ignored so that the original error is reported.
let _ = socket.abort(&config.timeouts, timer).await;
Err(error)
}
}
}
/// The shutdown reason type of a [`Server`] without graceful shutdown. It has no variants, so
/// no value of this type can exist.
pub enum NoGracefulShutdown {}
impl NoGracefulShutdown {
/// Converts this value into any other type. As no value of this type can exist, this can
/// never actually run.
pub fn into_never<T>(self) -> T {
match self {}
}
}
/// An HTTP server: an application together with everything needed to serve connections.
///
/// `ShutdownSignal` is a future which, when it resolves, requests that the server stop serving.
pub struct Server<
'a,
Runtime,
T: Timer<Runtime>,
P: routing::PathRouter,
ShutdownSignal: core::future::Future,
> {
// The router which handles requests.
app: &'a Router<P>,
// Used to apply the timeouts in `config`.
timer: T,
config: &'a Config,
// The buffer that requests are read into.
http_buffer: &'a mut [u8],
shutdown_signal: ShutdownSignal,
// Records the `Runtime` type parameter without storing a value of that type.
_runtime: PhantomData<fn(&Runtime)>,
}
impl<'a, Runtime, T: Timer<Runtime>, P: routing::PathRouter>
    Server<'a, Runtime, T, P, core::future::Pending<(NoGracefulShutdown, Duration)>>
{
    /// Creates a server using a custom timer, without graceful shutdown.
    ///
    /// Requests are handled by `app`, timeouts come from `config`, and requests are read into
    /// `http_buffer`.
    pub fn custom(
        app: &'a Router<P>,
        timer: T,
        config: &'a Config,
        http_buffer: &'a mut [u8],
    ) -> Self {
        // A signal which never resolves: the server is never asked to shut down.
        let shutdown_signal = core::future::pending();

        Self {
            app,
            timer,
            config,
            http_buffer,
            shutdown_signal,
            _runtime: PhantomData,
        }
    }

    /// Returns a server which stops once `shutdown_signal` resolves.
    ///
    /// The output of `shutdown_signal` becomes the shutdown reason, and `shutdown_timeout` is
    /// the time a request being handled when the signal resolves is given to finish.
    #[allow(clippy::type_complexity)]
    pub fn with_graceful_shutdown<ShutdownSignal: core::future::Future>(
        self,
        shutdown_signal: ShutdownSignal,
        shutdown_timeout: impl Into<Duration>,
    ) -> Server<
        'a,
        Runtime,
        T,
        P,
        impl core::future::Future<Output = (ShutdownSignal::Output, Duration)>,
    > {
        let shutdown_timeout = shutdown_timeout.into();

        // Pair the caller's shutdown reason with the timeout so both travel together.
        let shutdown_signal = async move { (shutdown_signal.await, shutdown_timeout) };

        Server {
            app: self.app,
            timer: self.timer,
            config: self.config,
            http_buffer: self.http_buffer,
            shutdown_signal,
            _runtime: PhantomData,
        }
    }
}
impl<
        Runtime,
        T: Timer<Runtime>,
        P: routing::PathRouter,
        ShutdownReason,
        ShutdownSignal: core::future::Future<Output = (ShutdownReason, Duration)>,
    > Server<'_, Runtime, T, P, ShutdownSignal>
{
    /// Serves requests read from `socket`, consuming the server.
    ///
    /// Returns the number of requests handled and the shutdown reason, if the shutdown signal
    /// ended the connection.
    ///
    /// # Errors
    ///
    /// Returns an error if serving the connection fails.
    pub async fn serve<S: io::Socket<Runtime>>(
        mut self,
        socket: S,
    ) -> Result<DisconnectionInfo<ShutdownReason>, Error<S::Error>> {
        serve_and_shutdown(
            self.app,
            &mut self.timer,
            self.config,
            self.http_buffer,
            socket,
            self.shutdown_signal,
        )
        .await
    }
}
/// Marker type selecting the Tokio runtime.
#[cfg(any(feature = "tokio", test))]
#[doc(hidden)]
pub struct TokioRuntime;

#[cfg(any(feature = "tokio", test))]
impl<'a, P: routing::PathRouter>
    Server<
        'a,
        TokioRuntime,
        time::TokioTimer,
        P,
        core::future::Pending<(NoGracefulShutdown, time::Duration)>,
    >
{
    /// Creates a server which uses the Tokio timer, without graceful shutdown.
    ///
    /// Requests are handled by `app`, timeouts come from `config`, and requests are read into
    /// `http_buffer`.
    pub fn new_tokio(app: &'a Router<P>, config: &'a Config, http_buffer: &'a mut [u8]) -> Self {
        Self::custom(app, time::TokioTimer, config, http_buffer)
    }
}
/// Marker type selecting the Embassy runtime.
#[cfg(feature = "embassy")]
#[doc(hidden)]
pub struct EmbassyRuntime;

#[cfg(feature = "embassy")]
impl<'a, P: routing::PathRouter>
    Server<
        'a,
        EmbassyRuntime,
        time::EmbassyTimer,
        P,
        core::future::Pending<(NoGracefulShutdown, Duration)>,
    >
{
    /// Creates a server which uses the Embassy timer, without graceful shutdown.
    ///
    /// Requests are handled by `app`, timeouts come from `config`, and requests are read into
    /// `http_buffer`.
    pub fn new(app: &'a Router<P>, config: &'a Config, http_buffer: &'a mut [u8]) -> Self {
        Self::custom(app, time::EmbassyTimer, config, http_buffer)
    }
}
#[cfg(feature = "embassy")]
impl<
'a,
P: routing::PathRouter,
ShutdownReason,
ShutdownSignal: core::future::Future<Output = (ShutdownReason, embassy_time::Duration)>,
> Server<'a, EmbassyRuntime, time::EmbassyTimer, P, ShutdownSignal>
{
/// Accepts TCP connections on `port` one at a time and serves each of them, until the
/// shutdown signal resolves.
///
/// `task_id` identifies this task in log messages. `tcp_rx_buffer` and `tcp_tx_buffer` are
/// reused as the socket buffers of every accepted connection.
///
/// Returns the shutdown reason produced by the shutdown signal. Errors accepting or serving a
/// connection are logged, after which the next connection is accepted.
pub async fn listen_and_serve(
self,
task_id: impl LogDisplay,
stack: embassy_net::Stack<'_>,
port: u16,
tcp_rx_buffer: &mut [u8],
tcp_tx_buffer: &mut [u8],
) -> ShutdownReason {
let Self {
app,
mut timer,
config,
http_buffer,
shutdown_signal,
_runtime,
} = self;
// Pinned once so the same signal can be polled across every iteration of the loop.
let mut shutdown_signal = core::pin::pin!(shutdown_signal);
loop {
// Wait for a connection, racing against the shutdown signal. A new socket is created over
// the same buffers on each iteration.
let mut socket = match futures::select_either(shutdown_signal.as_mut(), async {
let mut socket =
embassy_net::tcp::TcpSocket::new(stack, tcp_rx_buffer, tcp_tx_buffer);
log_info!("{}: Listening on TCP:{}...", task_id, port);
socket.accept(port).await.map(|()| socket)
})
.await
{
// Shutdown was requested while waiting for a connection.
futures::Either::First((shutdown_reason, _)) => return shutdown_reason,
futures::Either::Second(Err(error)) => {
log_warn!("{}: accept error: {:?}", task_id, error);
continue;
}
futures::Either::Second(Ok(socket)) => socket,
};
let remote_endpoint = socket.remote_endpoint();
log_info!(
"{}: Received connection from {:?}",
task_id,
remote_endpoint
);
// Configure the socket's keep-alive interval and timeout.
socket.set_keep_alive(Some(embassy_time::Duration::from_secs(30)));
socket.set_timeout(Some(embassy_time::Duration::from_secs(45)));
// Only the arm carrying a shutdown reason actually returns; every other arm `continue`s the
// loop to accept the next connection.
return match serve_and_shutdown(
app,
&mut timer,
config,
http_buffer,
socket,
shutdown_signal.as_mut(),
)
.await
{
Ok(DisconnectionInfo {
handled_requests_count,
shutdown_reason,
}) => {
log_info!(
"{} requests handled from {:?}",
handled_requests_count,
remote_endpoint
);
match shutdown_reason {
Some(shutdown_reason) => shutdown_reason,
None => continue,
}
}
// A failed connection is logged and does not stop the server.
Err(err) => {
log_error!("{:?}", crate::logging::Debug2Format(&err));
continue;
}
};
}
}
}
/// A type which can build a [`Router`] without state.
pub trait AppBuilder {
/// The path router of the built [`Router`].
type PathRouter: routing::PathRouter;
/// Consumes `self` and builds the [`Router`].
fn build_app(self) -> Router<Self::PathRouter>;
}
/// A type which can build a [`Router`] whose handlers use state of type `State`.
pub trait AppWithStateBuilder {
/// The state type of the built [`Router`].
type State;
/// The path router of the built [`Router`].
type PathRouter: routing::PathRouter<Self::State>;
/// Consumes `self` and builds the [`Router`].
fn build_app(self) -> Router<Self::PathRouter, Self::State>;
}
impl<T: AppBuilder> AppWithStateBuilder for T {
type State = ();
type PathRouter = <Self as AppBuilder>::PathRouter;
fn build_app(self) -> Router<Self::PathRouter, Self::State> {
<Self as AppBuilder>::build_app(self)
}
}
/// The type of the [`Router`] returned by [`AppWithStateBuilder::build_app`] for `Props`.
pub type AppRouter<Props> =
Router<<Props as AppWithStateBuilder>::PathRouter, <Props as AppWithStateBuilder>::State>;
/// Stores `$val` in a `static` of type `$t` and evaluates to the result of initialising it.
///
/// The value is placed in a `static_cell::StaticCell<$t>` declared at the point of use and
/// initialised with `init`. Any attributes given after a trailing comma are applied to the
/// `static` declaration.
///
/// NOTE(review): per `static_cell`'s documentation, `init` returns a `&'static mut` reference
/// and panics if the cell has already been initialised — confirm against the version in use.
#[macro_export]
macro_rules! make_static {
// Without attributes: forward to the general form with an empty attribute list.
($t:ty, $val:expr) => ($crate::make_static!($t, $val,));
// General form: declare the static, applying the given attributes, then initialise it.
($t:ty, $val:expr, $(#[$m:meta])*) => {{
$(#[$m])*
static STATIC_CELL: static_cell::StaticCell<$t> = static_cell::StaticCell::new();
STATIC_CELL.init($val)
}};
}