#![forbid(unsafe_code)]
#![deny(missing_docs)]
pub mod client;
pub mod server;
pub mod codec;
#[macro_use]
pub mod error;
pub mod middleware;
pub mod redirect;
pub mod request;
pub mod response;
#[cfg(feature = "actix")]
#[doc(hidden)]
pub use ::actix_web as actix_export;
#[cfg(feature = "axum-no-default")]
#[doc(hidden)]
pub use ::axum as axum_export;
#[cfg(feature = "generic")]
#[doc(hidden)]
pub use ::bytes as bytes_export;
#[cfg(feature = "generic")]
#[doc(hidden)]
pub use ::http as http_export;
pub use bytes::Bytes;
use client::Client;
use codec::{Encoding, FromReq, FromRes, IntoReq, IntoRes};
#[doc(hidden)]
pub use const_format;
#[doc(hidden)]
pub use const_str;
use dashmap::DashMap;
pub use error::ServerFnError;
#[cfg(feature = "form-redirects")]
use error::ServerFnUrlError;
use error::{FromServerFnError, ServerFnErrorErr};
use futures::{pin_mut, SinkExt, Stream, StreamExt};
use http::Method;
use middleware::{BoxedService, Layer, Service};
use once_cell::sync::Lazy;
use redirect::call_redirect_hook;
use request::Req;
use response::{ClientRes, Res, TryRes};
#[cfg(feature = "rkyv")]
pub use rkyv;
#[doc(hidden)]
pub use serde;
#[doc(hidden)]
#[cfg(feature = "serde-lite")]
pub use serde_lite;
use server::Server;
use std::{
fmt::{Debug, Display},
future::Future,
marker::PhantomData,
ops::{Deref, DerefMut},
pin::Pin,
sync::Arc,
};
#[doc(hidden)]
pub use xxhash_rust;
/// Shorthand for the request type of the server backend chosen by the server
/// function `Fn`, i.e. `<Fn::Server as Server<Fn::Error>>::Request`.
type ServerFnServerRequest<Fn> = <<Fn as ServerFn>::Server as crate::Server<
<Fn as ServerFn>::Error,
>>::Request;
/// Shorthand for the response type of the server backend chosen by the server
/// function `Fn`, i.e. `<Fn::Server as Server<Fn::Error>>::Response`.
type ServerFnServerResponse<Fn> = <<Fn as ServerFn>::Server as crate::Server<
<Fn as ServerFn>::Error,
>>::Response;
/// A function whose body runs on the server and which can be invoked from
/// the client.
///
/// An implementor chooses a client backend, a server backend and a
/// [`Protocol`]; the provided methods connect those pieces.
pub trait ServerFn: Send + Sized {
    /// Path of this server function. It is returned by `url()`, handed to the
    /// protocol on the client side and attached to error responses.
    const PATH: &'static str;

    /// Client backend used by `run_on_client`.
    type Client: Client<Self::Error>;

    /// Server backend; it determines the request and response types used by
    /// `run_on_server` and `middlewares`.
    type Server: Server<Self::Error>;

    /// Protocol that carries `Self` to the server and `Self::Output` back.
    type Protocol: Protocol<
        Self,
        Self::Output,
        Self::Client,
        Self::Server,
        Self::Error,
    >;

    /// Value produced by a successful call.
    type Output: Send;

    /// Error produced by a failed call.
    type Error: FromServerFnError + Send + Sync;

    /// Returns `Self::PATH`.
    fn url() -> &'static str {
        Self::PATH
    }

    /// Middleware layers that should wrap this server function.
    ///
    /// The default implementation returns no layers.
    fn middlewares() -> Vec<
        Arc<
            dyn Layer<
                ServerFnServerRequest<Self>,
                ServerFnServerResponse<Self>,
            >,
        >,
    > {
        Vec::new()
    }

    /// The body of the server function; `run_on_server` passes it to the
    /// protocol as the function to execute.
    fn run_body(
        self,
    ) -> impl Future<Output = Result<Self::Output, Self::Error>> + Send;

    /// Runs the function for an incoming server request and always resolves
    /// to a response: protocol errors are turned into an error response.
    ///
    /// With the `form-redirects` feature, a request that accepts `text/html`
    /// additionally gets a redirect to the referer (or `/`).
    #[doc(hidden)]
    fn run_on_server(
        req: ServerFnServerRequest<Self>,
    ) -> impl Future<Output = ServerFnServerResponse<Self>> + Send {
        // `req` is moved into the protocol below, so whatever the redirect
        // handling needs is read from it before the future is built.
        #[cfg(feature = "form-redirects")]
        let accepts_html = req
            .accepts()
            .is_some_and(|accepted| accepted.contains("text/html"));
        #[cfg(feature = "form-redirects")]
        let mut referer = req.referer().as_deref().map(ToOwned::to_owned);

        async move {
            // Without `form-redirects`, `err` is unused and `res` is never
            // mutated, hence the lint allowance.
            #[allow(unused_variables, unused_mut)]
            let (mut res, err) =
                match Self::Protocol::run_server(req, Self::run_body).await {
                    Ok(res) => (res, None),
                    Err(e) => {
                        let serialized = e.ser();
                        let res =
                            <ServerFnServerResponse<Self>>::error_response(
                                Self::PATH,
                                serialized,
                            );
                        (res, Some(e))
                    }
                };

            #[cfg(feature = "form-redirects")]
            if accepts_html {
                match err {
                    // Failure: try to build a URL from the error and the
                    // referer (falling back to "/") and redirect there.
                    Some(err) => {
                        if let Ok(url) = ServerFnUrlError::new(Self::PATH, err)
                            .to_url(referer.as_deref().unwrap_or("/"))
                        {
                            referer = Some(url.to_string());
                        }
                    }
                    // Success: let `strip_error_info` clean up the referer.
                    None => {
                        if let Some(referer) = referer.as_mut() {
                            ServerFnUrlError::<Self::Error>::strip_error_info(
                                referer,
                            )
                        }
                    }
                }
                res.redirect(referer.as_deref().unwrap_or("/"));
            }

            res
        }
    }

    /// Runs the function from the client by delegating to
    /// `Self::Protocol::run_client` with `Self::PATH`.
    #[doc(hidden)]
    fn run_on_client(
        self,
    ) -> impl Future<Output = Result<Self::Output, Self::Error>> + Send {
        async move {
            let protocol_call = Self::Protocol::run_client(Self::PATH, self);
            protocol_call.await
        }
    }
}
/// Describes how a server function's input reaches the server and how its
/// output travels back to the client.
///
/// `Input` and `Output` are the function's argument and result types,
/// `Client` and `Server` are the two backends, and `E` is the error type
/// shared by both sides.
pub trait Protocol<Input, Output, Client, Server, E>
where
Server: crate::Server<E>,
Client: crate::Client<E>,
{
/// HTTP method associated with this protocol.
const METHOD: Method;
/// Server half: given the incoming `request` and the function body
/// `server_fn`, resolves to the response to send back, or to an error `E`.
fn run_server<F, Fut>(
request: Server::Request,
server_fn: F,
) -> impl Future<Output = Result<Server::Response, E>> + Send
where
F: Fn(Input) -> Fut + Send,
Fut: Future<Output = Result<Output, E>> + Send;
/// Client half: given the target `path` and the `input`, resolves to the
/// function's output, or to an error `E`.
fn run_client(
path: &str,
input: Input,
) -> impl Future<Output = Result<Output, E>> + Send;
}
/// Marker type for the request/response [`Protocol`]: the input is converted
/// through `InputProtocol` and the output through `OutputProtocol`.
///
/// It holds no data; `PhantomData` only carries the two type parameters.
pub struct Http<InputProtocol, OutputProtocol>(
PhantomData<(InputProtocol, OutputProtocol)>,
);
/// Request/response protocol: one request carries the input and one response
/// carries the output.
impl<InputProtocol, OutputProtocol, Input, Output, Client, Server, E>
    Protocol<Input, Output, Client, Server, E>
    for Http<InputProtocol, OutputProtocol>
where
    Input: IntoReq<InputProtocol, Client::Request, E>
        + FromReq<InputProtocol, Server::Request, E>
        + Send,
    Output: IntoRes<OutputProtocol, Server::Response, E>
        + FromRes<OutputProtocol, Client::Response, E>
        + Send,
    E: FromServerFnError,
    InputProtocol: Encoding,
    OutputProtocol: Encoding,
    Client: crate::Client<E>,
    Server: crate::Server<E>,
{
    /// The method is dictated by the input encoding.
    const METHOD: Method = InputProtocol::METHOD;

    /// Decodes the input from `request`, runs `server_fn` on it and encodes
    /// the result as a response. Any of the three steps may fail with `E`.
    async fn run_server<F, Fut>(
        request: Server::Request,
        server_fn: F,
    ) -> Result<Server::Response, E>
    where
        F: Fn(Input) -> Fut + Send,
        Fut: Future<Output = Result<Output, E>> + Send,
    {
        let args = Input::from_req(request).await?;
        let value = server_fn(args).await?;
        Ok(Output::into_res(value).await?)
    }

    /// Sends `input` to `path` and decodes the response.
    ///
    /// A status in `400..=599` is treated as an error whose body is
    /// deserialized with `E::de`. After a successful decode, the redirect
    /// hook is called if the status is in `300..=399` or the response
    /// reports a redirect.
    async fn run_client(path: &str, input: Input) -> Result<Output, E>
    where
        Client: crate::Client<E>,
    {
        let req = input.into_req(path, OutputProtocol::CONTENT_TYPE)?;
        let res = Client::send(req).await?;

        // Decoding consumes the response, so read its metadata first.
        let status = res.status();
        let location = res.location();
        let has_redirect_header = res.has_redirect();

        if (400..=599).contains(&status) {
            let text = res.try_into_string().await?;
            return Err(E::de(&text));
        }

        let output = Output::from_res(res).await?;

        if (300..=399).contains(&status) || has_redirect_header {
            call_redirect_hook(&location);
        }
        Ok(output)
    }
}
/// Marker type for the websocket [`Protocol`]: items sent to the server are
/// handled by `InputEncoding`, items sent back by `OutputEncoding`.
///
/// It holds no data; `PhantomData` only carries the two type parameters.
pub struct Websocket<InputEncoding, OutputEncoding>(
PhantomData<(InputEncoding, OutputEncoding)>,
);
/// A pinned, boxed, `Send` stream whose items are `Result<T, E>`.
pub struct BoxedStream<T, E> {
// The type-erased inner stream.
stream: Pin<Box<dyn Stream<Item = Result<T, E>> + Send>>,
}
impl<T, E> From<BoxedStream<T, E>>
for Pin<Box<dyn Stream<Item = Result<T, E>> + Send>>
{
fn from(val: BoxedStream<T, E>) -> Self {
val.stream
}
}
// Gives shared access to the inner pinned stream through `*boxed_stream`.
impl<T, E> Deref for BoxedStream<T, E> {
type Target = Pin<Box<dyn Stream<Item = Result<T, E>> + Send>>;
/// Borrows the inner pinned stream.
fn deref(&self) -> &Self::Target {
&self.stream
}
}
// Gives mutable access to the inner pinned stream.
impl<T, E> DerefMut for BoxedStream<T, E> {
/// Mutably borrows the inner pinned stream.
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.stream
}
}
/// Prints only the type name; the inner stream is an opaque trait object, so
/// the impl needs no bounds on `T` or `E`.
impl<T, E> Debug for BoxedStream<T, E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct` emits exactly the struct name.
        f.write_str("BoxedStream")
    }
}
impl<T, E, S> From<S> for BoxedStream<T, E>
where
S: Stream<Item = Result<T, E>> + Send + 'static,
{
fn from(stream: S) -> Self {
BoxedStream {
stream: Box::pin(stream),
}
}
}
/// Websocket protocol: both the input and the output of the server function
/// are streams whose items are encoded to and decoded from `Bytes` frames.
impl<
        Input,
        InputItem,
        OutputItem,
        InputEncoding,
        OutputEncoding,
        Client,
        Server,
        E,
    > Protocol<Input, BoxedStream<OutputItem, E>, Client, Server, E>
    for Websocket<InputEncoding, OutputEncoding>
where
    Input: Deref<Target = BoxedStream<InputItem, E>>
        + Into<BoxedStream<InputItem, E>>
        + From<BoxedStream<InputItem, E>>,
    InputEncoding: Encodes<InputItem> + Decodes<InputItem>,
    OutputEncoding: Encodes<OutputItem> + Decodes<OutputItem>,
    Server: crate::Server<E>,
    E: FromServerFnError + Send,
    Client: crate::Client<E>,
    OutputItem: Send + 'static,
    InputItem: Send + 'static,
{
    const METHOD: Method = Method::GET;

    /// Turns `request` into a websocket, feeds the decoded incoming frames
    /// to `server_fn` as its input stream, and spawns a task that encodes
    /// the resulting output stream onto the socket.
    ///
    /// Returns the response produced by `try_into_websocket`.
    async fn run_server<F, Fut>(
        request: Server::Request,
        server_fn: F,
    ) -> Result<Server::Response, E>
    where
        F: Fn(Input) -> Fut + Send,
        Fut: Future<Output = Result<BoxedStream<OutputItem, E>, E>> + Send,
    {
        let (incoming, outgoing, response) =
            request.try_into_websocket().await?;

        // Incoming frames: transport errors pass through untouched, decode
        // failures become `Deserialization` errors.
        let decoded = incoming.map(|frame| {
            frame.and_then(|bytes| {
                InputEncoding::decode(bytes).map_err(|e| {
                    E::from_server_fn_error(ServerFnErrorErr::Deserialization(
                        e.to_string(),
                    ))
                })
            })
        });
        let input = BoxedStream {
            stream: Box::pin(decoded)
                as Pin<Box<dyn Stream<Item = Result<InputItem, E>> + Send>>,
        };

        let produced = server_fn(input.into()).await?;

        // Outgoing items: errors pass through, encode failures become
        // `Serialization` errors.
        let encoded = produced.stream.map(|item| {
            item.and_then(|value| {
                OutputEncoding::encode(value).map_err(|e| {
                    E::from_server_fn_error(ServerFnErrorErr::Serialization(
                        e.to_string(),
                    ))
                })
            })
        });

        // Forward frames until the output ends or the sink rejects one.
        Server::spawn(async move {
            pin_mut!(outgoing);
            pin_mut!(encoded);
            while let Some(frame) = encoded.next().await {
                if outgoing.send(frame).await.is_err() {
                    break;
                }
            }
        })?;

        Ok(response)
    }

    /// Opens a websocket to `path`, spawns a task that encodes and sends the
    /// items of `input`, and resolves to the stream of decoded items
    /// received from the server.
    fn run_client(
        path: &str,
        input: Input,
    ) -> impl Future<Output = Result<BoxedStream<OutputItem, E>, E>> + Send
    {
        // The conversion happens outside the future. NOTE(review): `Input`
        // has no `Send` bound while the boxed stream is `Send`, which is
        // presumably why it is converted before being captured.
        let input: BoxedStream<InputItem, E> = input.into();

        async move {
            let (incoming, sink) = Client::open_websocket(path).await?;

            // Forward items until the input ends or the sink rejects one.
            Client::spawn(async move {
                pin_mut!(input);
                pin_mut!(sink);
                while let Some(item) = input.stream.next().await {
                    let frame = item.and_then(|value| {
                        InputEncoding::encode(value).map_err(|e| {
                            E::from_server_fn_error(
                                ServerFnErrorErr::Serialization(e.to_string()),
                            )
                        })
                    });
                    if sink.send(frame).await.is_err() {
                        break;
                    }
                }
            });

            let decoded = incoming.map(|frame| {
                frame.and_then(|bytes| {
                    OutputEncoding::decode(bytes).map_err(|e| {
                        E::from_server_fn_error(
                            ServerFnErrorErr::Deserialization(e.to_string()),
                        )
                    })
                })
            });

            Ok(BoxedStream {
                stream: Box::pin(decoded)
                    as Pin<
                        Box<dyn Stream<Item = Result<OutputItem, E>> + Send>,
                    >,
            })
        }
    }
}
/// Associates a content-type string with an encoding.
pub trait ContentType {
/// The content type of this encoding, as a static string.
const CONTENT_TYPE: &'static str;
}
/// An encoding that can turn a value of type `T` into [`Bytes`].
pub trait Encodes<T>: ContentType {
/// Error returned when encoding fails; it must be displayable.
type Error: Display;
/// Encodes `output` into bytes.
fn encode(output: T) -> Result<Bytes, Self::Error>;
}
/// An encoding that can turn [`Bytes`] back into a value of type `T`.
pub trait Decodes<T> {
/// Error returned when decoding fails; it must be displayable.
type Error: Display;
/// Decodes a `T` from `bytes`.
fn decode(bytes: Bytes) -> Result<T, Self::Error>;
}
#[cfg(feature = "ssr")]
#[doc(hidden)]
pub use inventory;
/// Expands to a `once_cell::sync::Lazy` initializer that, on first access,
/// collects every `ServerFnTraitObj<$req, $res>` submitted through
/// `inventory` into a collection keyed by `(path, method)`.
///
/// `ServerFnTraitObj` and `inventory` are addressed through `$crate`, so the
/// call site does not need to import them. `once_cell` is still named
/// directly and therefore has to be resolvable where the macro is invoked.
#[macro_export]
macro_rules! initialize_server_fn_map {
    ($req:ty, $res:ty) => {
        once_cell::sync::Lazy::new(|| {
            $crate::inventory::iter::<$crate::ServerFnTraitObj<$req, $res>>
                .into_iter()
                .map(|obj| {
                    ((obj.path().to_string(), obj.method()), obj.clone())
                })
                .collect()
        })
    };
}
/// A list of shared middleware layers for the given request and response
/// types.
pub type MiddlewareSet<Req, Res> = Vec<Arc<dyn Layer<Req, Res>>>;
/// Type-erased description of a single server function: its path, method,
/// handler, middleware factory and error serializer, stored as plain values
/// and function pointers.
pub struct ServerFnTraitObj<Req, Res> {
// Path of the server function (`ServerFn::PATH` when built with `new`).
path: &'static str,
// HTTP method of the server function's protocol.
method: Method,
// Turns a request into a boxed future resolving to the response.
handler: fn(Req) -> Pin<Box<dyn Future<Output = Res> + Send>>,
// Produces the middleware layers for this function.
middleware: fn() -> MiddlewareSet<Req, Res>,
// Serializes a `ServerFnErrorErr` into a string.
ser: fn(ServerFnErrorErr) -> String,
}
impl<Req, Res> ServerFnTraitObj<Req, Res> {
pub const fn new<
S: ServerFn<
Server: crate::Server<S::Error, Request = Req, Response = Res>,
>,
>(
handler: fn(Req) -> Pin<Box<dyn Future<Output = Res> + Send>>,
) -> Self
where
Req: crate::Req<S::Error, WebsocketResponse = Res> + Send + 'static,
Res: crate::TryRes<S::Error> + Send + 'static,
{
Self {
path: S::PATH,
method: S::Protocol::METHOD,
handler,
middleware: S::middlewares,
ser: |e| S::Error::from_server_fn_error(e).ser(),
}
}
pub fn path(&self) -> &'static str {
self.path
}
pub fn method(&self) -> Method {
self.method.clone()
}
pub fn handler(&self, req: Req) -> impl Future<Output = Res> + Send {
(self.handler)(req)
}
pub fn middleware(&self) -> MiddlewareSet<Req, Res> {
(self.middleware)()
}
pub fn boxed(self) -> BoxedService<Req, Res>
where
Self: Service<Req, Res>,
Req: 'static,
Res: 'static,
{
BoxedService::new(self.ser, self)
}
}
/// Lets a registered server function be used as the innermost service.
impl<Req, Res> Service<Req, Res> for ServerFnTraitObj<Req, Res>
where
    Req: Send + 'static,
    Res: 'static,
{
    /// Returns a future that calls the stored handler with `req`.
    ///
    /// The serializer argument is ignored here.
    fn run(
        &mut self,
        req: Req,
        _ser: fn(ServerFnErrorErr) -> String,
    ) -> Pin<Box<dyn Future<Output = Res> + Send>> {
        // Copy the fn pointer so the future owns everything it needs and
        // does not borrow `self`. The handler is only called once the
        // returned future is polled.
        let call = self.handler;
        Box::pin(async move { call(req).await })
    }
}
/// Manual `Clone`: a derived impl would add `Req: Clone` and `Res: Clone`
/// bounds, which the stored function pointers do not need.
impl<Req, Res> Clone for ServerFnTraitObj<Req, Res> {
    fn clone(&self) -> Self {
        let Self {
            path,
            method,
            handler,
            middleware,
            ser,
        } = self;
        Self {
            path: *path,
            method: method.clone(),
            // Function pointers are `Copy`.
            handler: *handler,
            middleware: *middleware,
            ser: *ser,
        }
    }
}
/// Lazily initialized concurrent map from `(path, method)` to the registered
/// server function for that route.
#[allow(unused)] type LazyServerFnMap<Req, Res> =
Lazy<DashMap<(String, Method), ServerFnTraitObj<Req, Res>>>;
/// Makes `ServerFnTraitObj` collectable through `inventory`, so server
/// functions can be discovered without explicit registration.
#[cfg(feature = "ssr")]
impl<Req: 'static, Res: 'static> inventory::Collect
    for ServerFnTraitObj<Req, Res>
{
    /// Returns the registry that submitted entries are collected into.
    #[inline]
    fn registry() -> &'static inventory::Registry {
        // A `static` inside a generic function is not duplicated per
        // instantiation, so all `Req`/`Res` combinations share it.
        static REGISTRY: inventory::Registry = inventory::Registry::new();
        &REGISTRY
    }
}
/// Axum integration: the server backend type plus registration and dispatch
/// of server functions over `http::Request<Body>` / `http::Response<Body>`.
#[cfg(feature = "axum-no-default")]
pub mod axum {
    use crate::{
        error::FromServerFnError, middleware::BoxedService, LazyServerFnMap,
        Protocol, Server, ServerFn, ServerFnTraitObj,
    };
    use axum::body::Body;
    use http::{Method, Request, Response, StatusCode};
    use std::future::Future;

    // All server functions known to this backend, keyed by (path, method).
    static REGISTERED_SERVER_FUNCTIONS: LazyServerFnMap<
        Request<Body>,
        Response<Body>,
    > = initialize_server_fn_map!(Request<Body>, Response<Body>);

    /// Server backend for Axum.
    pub struct AxumServerFnBackend;

    impl<E: FromServerFnError + Send + Sync> Server<E> for AxumServerFnBackend {
        type Request = Request<Body>;
        type Response = Response<Body>;

        /// Spawns `future` on tokio when the `axum` feature is enabled;
        /// otherwise returns a `Request` error explaining that no runtime is
        /// available.
        #[allow(unused_variables)]
        fn spawn(
            future: impl Future<Output = ()> + Send + 'static,
        ) -> Result<(), E> {
            #[cfg(feature = "axum")]
            {
                tokio::spawn(future);
                Ok(())
            }
            #[cfg(not(feature = "axum"))]
            {
                Err(E::from_server_fn_error(
                    crate::error::ServerFnErrorErr::Request(
                        "No async runtime available. You need to either \
                         enable the full axum feature to pull in tokio, or \
                         implement the `Server` trait for your async runtime \
                         manually."
                            .into(),
                    ),
                ))
            }
        }
    }

    /// Inserts the server function `T` into the registry under its path and
    /// method, replacing any entry already stored for that key.
    pub fn register_explicit<T>()
    where
        T: ServerFn<
                Server: crate::Server<
                    T::Error,
                    Request = Request<Body>,
                    Response = Response<Body>,
                >,
            > + 'static,
    {
        let key: (String, Method) = (T::PATH.into(), T::Protocol::METHOD);
        let entry: ServerFnTraitObj<Request<Body>, Response<Body>> =
            ServerFnTraitObj::new::<T>(|req| Box::pin(T::run_on_server(req)));
        REGISTERED_SERVER_FUNCTIONS.insert(key, entry);
    }

    /// Iterates over the `(path, method)` pair of every registered server
    /// function.
    pub fn server_fn_paths() -> impl Iterator<Item = (&'static str, Method)> {
        REGISTERED_SERVER_FUNCTIONS
            .iter()
            .map(|registered| (registered.path(), registered.method()))
    }

    /// Dispatches `req` to the server function registered for its path and
    /// method.
    ///
    /// If none is registered, responds with `400 Bad Request` and an
    /// explanatory message.
    pub async fn handle_server_fn(req: Request<Body>) -> Response<Body> {
        let path = req.uri().path();
        match get_server_fn_service(path, req.method().clone()) {
            Some(mut service) => service.run(req).await,
            None => Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .body(Body::from(format!(
                    "Could not find a server function at the route {path}. \
                     \n\nIt's likely that either\n 1. The API prefix you \
                     specify in the `#[server]` macro doesn't match the \
                     prefix at which your server function handler is mounted, \
                     or \n2. You are on a platform that doesn't support \
                     automatic server function registration and you need to \
                     call ServerFn::register_explicit() on the server \
                     function type, somewhere in your `main` function.",
                )))
                .unwrap(),
        }
    }

    /// Looks up the server function for `path` and `method` and returns it
    /// as a service with each of its middleware layers applied in order.
    ///
    /// Returns `None` when nothing is registered for that key.
    pub fn get_server_fn_service(
        path: &str,
        method: Method,
    ) -> Option<BoxedService<Request<Body>, Response<Body>>> {
        let registered =
            REGISTERED_SERVER_FUNCTIONS.get(&(path.into(), method))?;
        let layers = (registered.middleware)();
        let innermost = registered.clone().boxed();
        let service = layers
            .into_iter()
            .fold(innermost, |service, layer| layer.layer(service));
        Some(service)
    }
}
/// Actix integration: the server backend type plus registration and dispatch
/// of server functions over `ActixRequest` / `ActixResponse`.
#[cfg(feature = "actix")]
pub mod actix {
    use crate::{
        error::FromServerFnError, middleware::BoxedService,
        request::actix::ActixRequest, response::actix::ActixResponse,
        server::Server, LazyServerFnMap, Protocol, ServerFn, ServerFnTraitObj,
    };
    use actix_web::{web::Payload, HttpRequest, HttpResponse};
    use http::Method;
    #[doc(hidden)]
    pub use send_wrapper::SendWrapper;
    use std::future::Future;

    // All server functions known to this backend, keyed by (path, method).
    static REGISTERED_SERVER_FUNCTIONS: LazyServerFnMap<
        ActixRequest,
        ActixResponse,
    > = initialize_server_fn_map!(ActixRequest, ActixResponse);

    /// Server backend for Actix.
    pub struct ActixServerFnBackend;

    impl<E: FromServerFnError + Send + Sync> Server<E> for ActixServerFnBackend {
        type Request = ActixRequest;
        type Response = ActixResponse;

        /// Spawns `future` with `actix_web::rt::spawn`; never fails.
        fn spawn(
            future: impl Future<Output = ()> + Send + 'static,
        ) -> Result<(), E> {
            actix_web::rt::spawn(future);
            Ok(())
        }
    }

    /// Inserts the server function `T` into the registry under its path and
    /// method, replacing any entry already stored for that key.
    pub fn register_explicit<T>()
    where
        T: ServerFn<
                Server: crate::Server<
                    T::Error,
                    Request = ActixRequest,
                    Response = ActixResponse,
                >,
            > + 'static,
    {
        REGISTERED_SERVER_FUNCTIONS.insert(
            (T::PATH.into(), T::Protocol::METHOD),
            ServerFnTraitObj::new::<T>(|req| Box::pin(T::run_on_server(req))),
        );
    }

    /// Iterates over the `(path, method)` pair of every registered server
    /// function.
    pub fn server_fn_paths() -> impl Iterator<Item = (&'static str, Method)> {
        REGISTERED_SERVER_FUNCTIONS
            .iter()
            .map(|item| (item.path(), item.method()))
    }

    /// Dispatches the request to the server function registered for its path
    /// and method.
    ///
    /// If none is registered (including when the request uses a method no
    /// server function can be registered under), responds with
    /// `400 Bad Request` and an explanatory message.
    pub async fn handle_server_fn(
        req: HttpRequest,
        payload: Payload,
    ) -> HttpResponse {
        let path = req.uri().path();
        let method = req.method();
        if let Some(mut service) = get_server_fn_service(path, method) {
            service
                .run(ActixRequest::from((req, payload)))
                .await
                .0
                .take()
        } else {
            HttpResponse::BadRequest().body(format!(
                "Could not find a server function at the route {path}. \
                 \n\nIt's likely that either\n 1. The API prefix you specify \
                 in the `#[server]` macro doesn't match the prefix at which \
                 your server function handler is mounted, or \n2. You are on \
                 a platform that doesn't support automatic server function \
                 registration and you need to call \
                 ServerFn::register_explicit() on the server function type, \
                 somewhere in your `main` function.",
            ))
        }
    }

    /// Looks up the server function for `path` and `method` and returns it
    /// as a service with each of its middleware layers applied in order.
    ///
    /// Returns `None` when nothing is registered for that key, or when
    /// `method` cannot be represented as an `http::Method`.
    pub fn get_server_fn_service(
        path: &str,
        method: &actix_web::http::Method,
    ) -> Option<BoxedService<ActixRequest, ActixResponse>> {
        // The registry is keyed by this crate's `http::Method`, so the Actix
        // method is re-parsed from its textual form. Unlike a fixed
        // variant-by-variant mapping, this also accepts extension methods,
        // so an unusual method sent by a client results in a failed lookup
        // rather than a panic.
        let method = Method::from_bytes(method.as_str().as_bytes()).ok()?;
        REGISTERED_SERVER_FUNCTIONS.get(&(path.into(), method)).map(
            |server_fn| {
                let middleware = (server_fn.middleware)();
                let mut service = server_fn.clone().boxed();
                for middleware in middleware {
                    service = middleware.layer(service);
                }
                service
            },
        )
    }
}
/// A mock server backend built on the browser mock request and response
/// types.
pub mod mock {
use std::future::Future;
/// Server backend whose request and response types are
/// `BrowserMockReq` / `BrowserMockRes`. Its `spawn` must never be called.
pub struct BrowserMockServer;
impl<E: Send + 'static> crate::server::Server<E> for BrowserMockServer {
type Request = crate::request::BrowserMockReq;
type Response = crate::response::BrowserMockRes;
// Spawning is treated as impossible for this backend: reaching this
// function panics via `unreachable!()`.
fn spawn(
_: impl Future<Output = ()> + Send + 'static,
) -> Result<(), E> {
unreachable!()
}
}
}