pub(crate) mod accept;
pub(crate) mod error_layer;
pub(crate) mod handle;
pub(crate) mod lifecycle;
pub(crate) mod options;
pub(crate) mod pipeline;
pub(crate) mod stack;
use std::{sync::Arc, time::Duration};
use tower::Service;
use crate::{Body, ConnectionEvent, IrohEndpoint};
use self::accept::{accept_loop, AcceptConfig};
use self::options::{
DEFAULT_CONCURRENCY, DEFAULT_DRAIN_TIMEOUT_MS, DEFAULT_MAX_CONNECTIONS_PER_PEER,
DEFAULT_MAX_REQUEST_BODY_BYTES, DEFAULT_REQUEST_TIMEOUT_MS,
};
pub(crate) use self::error_layer::HandleLayerErrorLayer;
pub use self::handle::ServeHandle;
pub use self::options::ServeOptions;
pub(crate) use self::options::DEFAULT_MAX_RESPONSE_BODY_BYTES;
pub(crate) type ConnectionEventFn = Arc<dyn Fn(ConnectionEvent) + Send + Sync>;
#[derive(Clone, Debug)]
pub struct RemoteNodeId(pub Arc<String>);
pub fn serve<S>(endpoint: IrohEndpoint, options: ServeOptions, svc: S) -> ServeHandle
where
S: Service<
hyper::Request<Body>,
Response = hyper::Response<Body>,
Error = std::convert::Infallible,
> + Clone
+ Send
+ Sync
+ 'static,
S::Future: Send + 'static,
{
serve_with_events(endpoint, options, svc, None)
}
pub fn serve_with_events<S>(
endpoint: IrohEndpoint,
options: ServeOptions,
svc: S,
on_connection_event: Option<ConnectionEventFn>,
) -> ServeHandle
where
S: Service<
hyper::Request<Body>,
Response = hyper::Response<Body>,
Error = std::convert::Infallible,
> + Clone
+ Send
+ Sync
+ 'static,
S::Future: Send + 'static,
{
let cfg = AcceptConfig {
max: options.max_concurrency.unwrap_or(DEFAULT_CONCURRENCY),
request_timeout: options
.request_timeout_ms
.map(Duration::from_millis)
.unwrap_or(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MS)),
max_conns_per_peer: options
.max_connections_per_peer
.unwrap_or(DEFAULT_MAX_CONNECTIONS_PER_PEER),
max_request_body_wire_bytes: options
.max_request_body_wire_bytes
.or(Some(DEFAULT_MAX_REQUEST_BODY_BYTES)),
max_request_body_decoded_bytes: options
.max_request_body_decoded_bytes
.or(Some(DEFAULT_MAX_REQUEST_BODY_BYTES)),
max_total_connections: options.max_total_connections,
drain_timeout: Duration::from_millis(
options.drain_timeout_ms.unwrap_or(DEFAULT_DRAIN_TIMEOUT_MS),
),
load_shed_enabled: options.load_shed.unwrap_or(true),
max_header_size: endpoint.max_header_size(),
stack_compression: endpoint.compression().cloned(),
decompression: options.decompression.unwrap_or(true),
};
let shutdown_notify = Arc::new(tokio::sync::Notify::new());
let drain_dur = cfg.drain_timeout;
let (done_tx, done_rx) = tokio::sync::watch::channel(false);
let join = tokio::spawn(accept_loop(
endpoint,
cfg,
svc,
on_connection_event,
shutdown_notify.clone(),
done_tx,
));
ServeHandle {
join,
shutdown_notify,
drain_timeout: drain_dur,
done_rx,
}
}