use crate::{
context, util::deadline_compat, util::AsDuration, util::Compact, ClientMessage,
ClientMessageKind, PollIo, Request, Response, ServerError, Transport,
};
use fnv::FnvHashMap;
use futures::{
channel::mpsc,
future::{abortable, AbortHandle},
prelude::*,
ready,
stream::Fuse,
task::{Context, Poll},
try_ready,
};
use humantime::format_rfc3339;
use log::{debug, error, info, trace, warn};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::{
error::Error as StdError,
io,
marker::PhantomData,
net::SocketAddr,
pin::Pin,
time::{Instant, SystemTime},
};
use tokio_timer::timeout;
use trace::{self, TraceId};
mod filter;
#[derive(Debug)]
pub struct Server<Req, Resp> {
config: Config,
ghost: PhantomData<(Req, Resp)>,
}
impl<Req, Resp> Default for Server<Req, Resp> {
fn default() -> Self {
new(Config::default())
}
}
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct Config {
pub max_connections: usize,
pub max_connections_per_ip: usize,
pub max_in_flight_requests_per_connection: usize,
pub pending_response_buffer: usize,
}
impl Default for Config {
fn default() -> Self {
Config {
max_connections: 1_000_000,
max_connections_per_ip: 1_000,
max_in_flight_requests_per_connection: 1_000,
pending_response_buffer: 100,
}
}
}
pub fn new<Req, Resp>(config: Config) -> Server<Req, Resp> {
Server {
config,
ghost: PhantomData,
}
}
impl<Req, Resp> Server<Req, Resp> {
pub fn config(&self) -> &Config {
&self.config
}
pub fn incoming<S, T>(
self,
listener: S,
) -> impl Stream<Item = io::Result<Channel<Req, Resp, T>>>
where
Req: Send,
Resp: Send,
S: Stream<Item = io::Result<T>>,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
{
self::filter::ConnectionFilter::filter(listener, self.config.clone())
}
}
#[derive(Debug)]
pub struct Running<S, F> {
incoming: S,
request_handler: F,
}
impl<S, F> Running<S, F> {
unsafe_pinned!(incoming: S);
unsafe_unpinned!(request_handler: F);
}
impl<S, T, Req, Resp, F, Fut> Future for Running<S, F>
where
S: Sized + Stream<Item = io::Result<Channel<Req, Resp, T>>>,
Req: Send + 'static,
Resp: Send + 'static,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send + 'static,
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
while let Some(channel) = ready!(self.as_mut().incoming().poll_next(cx)) {
match channel {
Ok(channel) => {
let peer = channel.client_addr;
if let Err(e) =
crate::spawn(channel.respond_with(self.as_mut().request_handler().clone()))
{
warn!("[{}] Failed to spawn connection handler: {:?}", peer, e);
}
}
Err(e) => {
warn!("Incoming connection error: {}", e);
}
}
}
info!("Server shutting down.");
Poll::Ready(())
}
}
pub trait Handler<T, Req, Resp>
where
Self: Sized + Stream<Item = io::Result<Channel<Req, Resp, T>>>,
Req: Send,
Resp: Send,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
{
fn respond_with<F, Fut>(self, request_handler: F) -> Running<Self, F>
where
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{
Running {
incoming: self,
request_handler,
}
}
}
impl<T, Req, Resp, S> Handler<T, Req, Resp> for S
where
S: Sized + Stream<Item = io::Result<Channel<Req, Resp, T>>>,
Req: Send,
Resp: Send,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
{
}
#[derive(Debug)]
pub struct Channel<Req, Resp, T> {
transport: Fuse<T>,
closed_connections: mpsc::UnboundedSender<SocketAddr>,
config: Config,
client_addr: SocketAddr,
ghost: PhantomData<(Req, Resp)>,
}
impl<Req, Resp, T> Drop for Channel<Req, Resp, T> {
fn drop(&mut self) {
trace!("[{}] Closing channel.", self.client_addr);
if self
.closed_connections
.unbounded_send(self.client_addr)
.is_err()
{
warn!(
"[{}] Failed to send closed connection message.",
self.client_addr
);
}
}
}
impl<Req, Resp, T> Channel<Req, Resp, T> {
unsafe_pinned!(transport: Fuse<T>);
}
impl<Req, Resp, T> Channel<Req, Resp, T>
where
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
Req: Send,
Resp: Send,
{
pub(crate) fn start_send(mut self: Pin<&mut Self>, response: Response<Resp>) -> io::Result<()> {
self.as_mut().transport().start_send(response)
}
pub(crate) fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
self.as_mut().transport().poll_ready(cx)
}
pub(crate) fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
self.as_mut().transport().poll_flush(cx)
}
pub(crate) fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> PollIo<ClientMessage<Req>> {
self.as_mut().transport().poll_next(cx)
}
pub fn client_addr(&self) -> &SocketAddr {
&self.client_addr
}
pub fn respond_with<F, Fut>(self, f: F) -> impl Future<Output = ()>
where
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
Req: 'static,
Resp: 'static,
{
let (responses_tx, responses) = mpsc::channel(self.config.pending_response_buffer);
let responses = responses.fuse();
let peer = self.client_addr;
ClientHandler {
channel: self,
f,
pending_responses: responses,
responses_tx,
in_flight_requests: FnvHashMap::default(),
}
.unwrap_or_else(move |e| {
info!("[{}] ClientHandler errored out: {}", peer, e);
})
}
}
#[derive(Debug)]
struct ClientHandler<Req, Resp, T, F> {
channel: Channel<Req, Resp, T>,
pending_responses: Fuse<mpsc::Receiver<(context::Context, Response<Resp>)>>,
responses_tx: mpsc::Sender<(context::Context, Response<Resp>)>,
in_flight_requests: FnvHashMap<u64, AbortHandle>,
f: F,
}
impl<Req, Resp, T, F> ClientHandler<Req, Resp, T, F> {
unsafe_pinned!(channel: Channel<Req, Resp, T>);
unsafe_pinned!(in_flight_requests: FnvHashMap<u64, AbortHandle>);
unsafe_pinned!(pending_responses: Fuse<mpsc::Receiver<(context::Context, Response<Resp>)>>);
unsafe_pinned!(responses_tx: mpsc::Sender<(context::Context, Response<Resp>)>);
unsafe_unpinned!(f: F);
}
impl<Req, Resp, T, F, Fut> ClientHandler<Req, Resp, T, F>
where
Req: Send + 'static,
Resp: Send + 'static,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{
fn poll_ready_if_throttling(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
if self.in_flight_requests.len()
>= self.channel.config.max_in_flight_requests_per_connection
{
let peer = self.as_mut().channel().client_addr;
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
info!(
"[{}] In-flight requests at max ({}), and transport is not ready.",
peer,
self.as_mut().in_flight_requests().len(),
);
try_ready!(self.as_mut().channel().poll_flush(cx));
}
}
Poll::Ready(Ok(()))
}
fn pump_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollIo<()> {
ready!(self.as_mut().poll_ready_if_throttling(cx)?);
Poll::Ready(match ready!(self.as_mut().channel().poll_next(cx)?) {
Some(message) => {
match message.message {
ClientMessageKind::Request(request) => {
self.handle_request(message.trace_context, request)?;
}
ClientMessageKind::Cancel { request_id } => {
self.cancel_request(&message.trace_context, request_id);
}
}
Some(Ok(()))
}
None => {
trace!("[{}] Read half closed", self.channel.client_addr);
None
}
})
}
fn pump_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
read_half_closed: bool,
) -> PollIo<()> {
match self.as_mut().poll_next_response(cx)? {
Poll::Ready(Some((_, response))) => {
self.as_mut().channel().start_send(response)?;
Poll::Ready(Some(Ok(())))
}
Poll::Ready(None) => {
ready!(self.as_mut().channel().poll_flush(cx)?);
Poll::Ready(None)
}
Poll::Pending => {
ready!(self.as_mut().channel().poll_flush(cx)?);
if read_half_closed && self.as_mut().in_flight_requests().is_empty() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}
}
fn poll_next_response(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> PollIo<(context::Context, Response<Resp>)> {
while let Poll::Pending = self.as_mut().channel().poll_ready(cx)? {
ready!(self.as_mut().channel().poll_flush(cx)?);
}
let peer = self.as_mut().channel().client_addr;
match ready!(self.as_mut().pending_responses().poll_next(cx)) {
Some((ctx, response)) => {
if self
.as_mut()
.in_flight_requests()
.remove(&response.request_id)
.is_some()
{
self.as_mut().in_flight_requests().compact(0.1);
}
trace!(
"[{}/{}] Staging response. In-flight requests = {}.",
ctx.trace_id(),
peer,
self.as_mut().in_flight_requests().len(),
);
Poll::Ready(Some(Ok((ctx, response))))
}
None => {
trace!("[{}] No new responses.", peer);
Poll::Ready(None)
}
}
}
fn handle_request(
mut self: Pin<&mut Self>,
trace_context: trace::Context,
request: Request<Req>,
) -> io::Result<()> {
let request_id = request.id;
let peer = self.as_mut().channel().client_addr;
let ctx = context::Context {
deadline: request.deadline,
trace_context,
};
let request = request.message;
if self.as_mut().in_flight_requests().len()
>= self
.as_mut()
.channel()
.config
.max_in_flight_requests_per_connection
{
debug!(
"[{}/{}] Client has reached in-flight request limit ({}/{}).",
ctx.trace_id(),
peer,
self.as_mut().in_flight_requests().len(),
self.as_mut()
.channel()
.config
.max_in_flight_requests_per_connection
);
self.as_mut().channel().start_send(Response {
request_id,
message: Err(ServerError {
kind: io::ErrorKind::WouldBlock,
detail: Some("Server throttled the request.".into()),
}),
})?;
return Ok(());
}
let deadline = ctx.deadline;
let timeout = deadline.as_duration();
trace!(
"[{}/{}] Received request with deadline {} (timeout {:?}).",
ctx.trace_id(),
peer,
format_rfc3339(deadline),
timeout,
);
let mut response_tx = self.as_mut().responses_tx().clone();
let trace_id = *ctx.trace_id();
let response = self.as_mut().f().clone()(ctx, request);
let response = deadline_compat::Deadline::new(response, Instant::now() + timeout).then(
async move |result| {
let response = Response {
request_id,
message: match result {
Ok(message) => Ok(message),
Err(e) => Err(make_server_error(e, trace_id, peer, deadline)),
},
};
trace!("[{}/{}] Sending response.", trace_id, peer);
await!(response_tx.send((ctx, response)).unwrap_or_else(|_| ()));
},
);
let (abortable_response, abort_handle) = abortable(response);
crate::spawn(abortable_response.map(|_| ())).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!(
"Could not spawn response task. Is shutdown: {}",
e.is_shutdown()
),
)
})?;
self.as_mut()
.in_flight_requests()
.insert(request_id, abort_handle);
Ok(())
}
fn cancel_request(mut self: Pin<&mut Self>, trace_context: &trace::Context, request_id: u64) {
if let Some(cancel_handle) = self.as_mut().in_flight_requests().remove(&request_id) {
self.as_mut().in_flight_requests().compact(0.1);
cancel_handle.abort();
let remaining = self.as_mut().in_flight_requests().len();
trace!(
"[{}/{}] Request canceled. In-flight requests = {}",
trace_context.trace_id,
self.channel.client_addr,
remaining,
);
} else {
trace!(
"[{}/{}] Received cancellation, but response handler \
is already complete.",
trace_context.trace_id,
self.channel.client_addr
);
}
}
}
impl<Req, Resp, T, F, Fut> Future for ClientHandler<Req, Resp, T, F>
where
Req: Send + 'static,
Resp: Send + 'static,
T: Transport<Item = ClientMessage<Req>, SinkItem = Response<Resp>> + Send,
F: FnOnce(context::Context, Req) -> Fut + Send + 'static + Clone,
Fut: Future<Output = io::Result<Resp>> + Send + 'static,
{
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
trace!("[{}] ClientHandler::poll", self.channel.client_addr);
loop {
let read = self.as_mut().pump_read(cx)?;
match (
read,
self.as_mut().pump_write(cx, read == Poll::Ready(None))?,
) {
(Poll::Ready(None), Poll::Ready(None)) => {
info!("[{}] Client disconnected.", self.channel.client_addr);
return Poll::Ready(Ok(()));
}
(read @ Poll::Ready(Some(())), write) | (read, write @ Poll::Ready(Some(()))) => {
trace!(
"[{}] read: {:?}, write: {:?}.",
self.channel.client_addr,
read,
write
)
}
(read, write) => {
trace!(
"[{}] read: {:?}, write: {:?} (not ready).",
self.channel.client_addr,
read,
write,
);
return Poll::Pending;
}
}
}
}
}
fn make_server_error(
e: timeout::Error<io::Error>,
trace_id: TraceId,
peer: SocketAddr,
deadline: SystemTime,
) -> ServerError {
if e.is_elapsed() {
debug!(
"[{}/{}] Response did not complete before deadline of {}s.",
trace_id,
peer,
format_rfc3339(deadline)
);
ServerError {
kind: io::ErrorKind::TimedOut,
detail: Some(format!(
"Response did not complete before deadline of {}s.",
format_rfc3339(deadline)
)),
}
} else if e.is_timer() {
error!(
"[{}/{}] Response failed because of an issue with a timer: {}",
trace_id, peer, e
);
ServerError {
kind: io::ErrorKind::Other,
detail: Some(format!("{}", e)),
}
} else if e.is_inner() {
let e = e.into_inner().unwrap();
ServerError {
kind: e.kind(),
detail: Some(e.description().into()),
}
} else {
error!("[{}/{}] Unexpected response failure: {}", trace_id, peer, e);
ServerError {
kind: io::ErrorKind::Other,
detail: Some(format!("Server unexpectedly failed to respond: {}", e)),
}
}
}