pub mod connection;
pub mod connection_service;
pub mod listener;
pub mod net_server;
pub mod protocol;
#[cfg(feature = "quic")]
pub mod quic;
pub mod route_connection;
pub mod stream;
#[cfg(feature = "tls")]
pub mod tls;
#[cfg(feature = "tls")]
pub use tls::{CertificateStore, CertificateStoreBuilder};
mod config;
#[cfg(feature = "metrics")]
pub mod metrics;
pub use config::{ConnectionLimits, ServerConfig};
pub use route_connection::RouteConnectionService;
use crate::core::socket_addr::SocketAddr as CoreSocketAddr;
use config::set_global_server_config;
pub use connection_service::{BoxError, ConnectionFuture, ConnectionService};
use listener::{Listen, ListenersBuilder};
pub use net_server::RateLimiterConfig;
use std::net::SocketAddr;
#[cfg(not(target_os = "windows"))]
use std::path::Path;
use std::time::Duration;
/// Boxed callback stored by [`Server::on_listen`]; it is called with a slice of
/// [`CoreSocketAddr`] values (presumably the addresses the server ended up
/// listening on — confirm against `NetServer`).
type ListenCallback = Box<dyn Fn(&[CoreSocketAddr]) + Send + Sync>;
/// Builder-style front end for configuring and starting a network server.
///
/// Every setter consumes the builder and returns it, so calls can be chained.
/// The collected state is handed to `net_server::NetServer` by
/// [`Server::serve`] or [`Server::run`].
pub struct Server {
    /// Collects the TCP / Unix-socket / custom listeners added to this server.
    listeners_builder: ListenersBuilder,
    /// Optional hook installed through [`Server::set_shutdown_callback`].
    shutdown_callback: Option<Box<dyn Fn() + Send + Sync>>,
    /// Optional hook installed through [`Server::on_listen`].
    listen_callback: Option<ListenCallback>,
    /// Rate-limiter settings; `None` means no limiter is configured here.
    rate_limiter_config: Option<RateLimiterConfig>,
    /// Graceful-shutdown wait; `None` means none is configured here.
    graceful_shutdown_duration: Option<Duration>,
    /// Server-wide configuration, including the connection limits.
    config: ServerConfig,
}
impl Default for Server {
fn default() -> Self {
Self::new()
}
}
impl Server {
    /// Creates an empty builder: no listeners, no callbacks, no rate limiter,
    /// no graceful-shutdown duration and a default [`ServerConfig`].
    pub fn new() -> Self {
        Self {
            listeners_builder: ListenersBuilder::new(),
            shutdown_callback: None,
            listen_callback: None,
            rate_limiter_config: None,
            graceful_shutdown_duration: None,
            config: ServerConfig::default(),
        }
    }

    /// Adds a listener bound to `addr`. May be called repeatedly to listen on
    /// several addresses.
    ///
    /// # Panics
    ///
    /// Panics with `"Failed to bind to address"` if the underlying
    /// `ListenersBuilder::bind` call returns an error.
    #[inline]
    pub fn bind(mut self, addr: SocketAddr) -> Self {
        self.listeners_builder
            .bind(addr)
            .expect("Failed to bind to address");
        self
    }

    /// Adds a listener bound to the Unix domain socket at `path`.
    ///
    /// Only available on non-Windows targets.
    ///
    /// # Panics
    ///
    /// Panics with `"Failed to bind to Unix socket"` if the underlying
    /// `ListenersBuilder::bind_unix` call returns an error.
    #[cfg(not(target_os = "windows"))]
    #[inline]
    pub fn bind_unix<P: AsRef<Path>>(mut self, path: P) -> Self {
        self.listeners_builder
            .bind_unix(&path)
            .expect("Failed to bind to Unix socket");
        self
    }

    /// Registers an already constructed listener implementing [`Listen`].
    #[inline]
    pub fn listen<T: Listen + Send + Sync + 'static>(mut self, listener: T) -> Self {
        self.listeners_builder.add_listener(Box::new(listener));
        self
    }

    /// Stores `callback` as the shutdown hook, replacing any previous one.
    ///
    /// The hook is passed on to `NetServer` when the server is started
    /// (presumably invoked during shutdown — confirm against `NetServer`).
    pub fn set_shutdown_callback<F>(mut self, callback: F) -> Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        self.shutdown_callback = Some(Box::new(callback));
        self
    }

    /// Stores `callback` as the listen hook, replacing any previous one.
    ///
    /// The hook receives a slice of socket addresses (presumably the
    /// addresses actually being listened on — confirm against `NetServer`).
    pub fn on_listen<F>(mut self, callback: F) -> Self
    where
        F: Fn(&[CoreSocketAddr]) + Send + Sync + 'static,
    {
        self.listen_callback = Some(Box::new(callback));
        self
    }

    /// Sets the rate-limiter configuration applied when the server starts.
    pub fn with_rate_limiter(mut self, config: RateLimiterConfig) -> Self {
        self.rate_limiter_config = Some(config);
        self
    }

    /// Sets the graceful-shutdown duration applied when the server starts.
    pub fn with_shutdown(mut self, graceful_wait: Duration) -> Self {
        self.graceful_shutdown_duration = Some(graceful_wait);
        self
    }

    /// Replaces the entire [`ServerConfig`].
    ///
    /// This also overwrites limits set by an earlier
    /// [`Server::with_connection_limits`] call, so order matters.
    #[inline]
    pub fn with_config(mut self, config: ServerConfig) -> Self {
        self.config = config;
        self
    }

    /// Replaces only the connection limits inside the current config.
    #[inline]
    pub fn with_connection_limits(mut self, limits: ConnectionLimits) -> Self {
        self.config.connection_limits = limits;
        self
    }

    /// Starts the server asynchronously, handing connections to `handler`.
    ///
    /// Publishes the config through `set_global_server_config`, assembles the
    /// `NetServer` and awaits `NetServer::serve`.
    pub async fn serve<H>(self, handler: H)
    where
        H: ConnectionService + Clone,
    {
        self.into_net_server().serve(handler).await
    }

    /// Synchronous counterpart of [`Server::serve`]: performs the same setup
    /// and then delegates to `NetServer::run`.
    pub fn run<H>(self, handler: H)
    where
        H: ConnectionService + Clone,
    {
        self.into_net_server().run(handler)
    }

    /// Publishes the global config and turns the builder into a configured
    /// `NetServer`; shared by `serve` and `run` so both start identically.
    fn into_net_server(self) -> net_server::NetServer {
        let Self {
            listeners_builder,
            shutdown_callback,
            listen_callback,
            rate_limiter_config,
            graceful_shutdown_duration,
            config,
        } = self;

        // One clone goes to the global slot; the original is moved into the
        // NetServer instead of being cloned a second time.
        set_global_server_config(config.clone());
        let mut net = net_server::NetServer::from_parts(
            listeners_builder,
            shutdown_callback,
            listen_callback,
            config,
        );
        if let Some(limiter) = rate_limiter_config {
            net = net.with_rate_limiter(limiter);
        }
        if let Some(wait) = graceful_shutdown_duration {
            net = net.with_shutdown(wait);
        }
        net
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for the `Server` builder and the configuration types it
    // re-exports. Private fields are inspected directly because this module
    // is a child of the module that defines `Server`.
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex, MutexGuard};
    use std::time::Duration;

    // Serializes the tests that write and then read the global server config.
    // The test harness runs tests on parallel threads, and the global config
    // is presumably process-wide state (its implementation is not visible
    // here), so unsynchronized set/get pairs could observe each other.
    static GLOBAL_CONFIG_LOCK: Mutex<()> = Mutex::new(());

    // Takes the global-config lock, ignoring poisoning so that one failing
    // test does not cascade into the other.
    fn lock_global_config() -> MutexGuard<'static, ()> {
        GLOBAL_CONFIG_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn test_server_new() {
        let server = Server::new();
        assert!(server.shutdown_callback.is_none());
        assert!(server.listen_callback.is_none());
        assert!(server.rate_limiter_config.is_none());
        assert!(server.graceful_shutdown_duration.is_none());
        let _ = server.listeners_builder;
    }

    #[test]
    fn test_server_default() {
        let server = Server::default();
        assert!(server.shutdown_callback.is_none());
        assert!(server.listen_callback.is_none());
        assert!(server.rate_limiter_config.is_none());
        assert!(server.graceful_shutdown_duration.is_none());
        let _ = server.listeners_builder;
    }

    #[tokio::test]
    async fn test_server_bind() {
        let server = Server::new().bind("127.0.0.1:0".parse().unwrap());
        let _ = server.listeners_builder;
    }

    #[tokio::test]
    async fn test_server_bind_multiple() {
        let server = Server::new()
            .bind("127.0.0.1:0".parse().unwrap())
            .bind("127.0.0.1:0".parse().unwrap());
        let _ = server.listeners_builder;
    }

    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_server_bind_unix_type_check() {
        use std::path::Path;
        fn assert_bind_unix<T: AsRef<Path>>() {}
        assert_bind_unix::<std::path::PathBuf>();
        assert_bind_unix::<&str>();
    }

    #[test]
    fn test_server_listen() {
        use crate::server::listener::Listen;
        fn assert_listen<T: Listen + Send + Sync + 'static>() {}
        assert_listen::<crate::server::listener::Listener>();
    }

    #[test]
    fn test_server_set_shutdown_callback() {
        let callback_called = Arc::new(AtomicBool::new(false));
        let callback_called_clone = Arc::clone(&callback_called);
        let server = Server::new().set_shutdown_callback(move || {
            callback_called_clone.store(true, Ordering::SeqCst);
        });

        let callback = server
            .shutdown_callback
            .as_ref()
            .expect("shutdown callback should be stored");
        // Storing the callback must not run it.
        assert!(!callback_called.load(Ordering::SeqCst));
        callback();
        assert!(callback_called.load(Ordering::SeqCst));
    }

    #[test]
    fn test_server_on_listen() {
        // Sentinel value that the callback overwrites with the slice length.
        let seen_len = Arc::new(AtomicUsize::new(usize::MAX));
        let seen_len_clone = Arc::clone(&seen_len);
        let server = Server::new().on_listen(move |addrs| {
            seen_len_clone.store(addrs.len(), Ordering::SeqCst);
        });

        let callback = server
            .listen_callback
            .as_ref()
            .expect("listen callback should be stored");
        let no_addrs: &[CoreSocketAddr] = &[];
        callback(no_addrs);
        assert_eq!(seen_len.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn test_server_with_rate_limiter() {
        let config = RateLimiterConfig {
            capacity: 100,
            refill_every: Duration::from_millis(100),
            max_wait: Duration::from_secs(5),
        };
        let server = Server::new().with_rate_limiter(config);
        assert!(server.rate_limiter_config.is_some());
    }

    #[test]
    fn test_server_with_shutdown() {
        let duration = Duration::from_secs(30);
        let server = Server::new().with_shutdown(duration);
        assert_eq!(server.graceful_shutdown_duration, Some(duration));
    }

    #[test]
    fn test_server_with_config() {
        let config = ServerConfig::default();
        let server = Server::new().with_config(config);
        assert_eq!(server.config.connection_limits.handler_timeout, None);
        assert_eq!(server.config.connection_limits.max_body_size, None);
    }

    #[test]
    fn test_server_with_connection_limits() {
        let limits = ConnectionLimits {
            handler_timeout: Some(Duration::from_secs(30)),
            max_body_size: Some(1024 * 1024),
            h3_read_timeout: None,
            max_webtransport_frame_size: None,
            webtransport_read_timeout: None,
            max_webtransport_sessions: None,
            webtransport_datagram_max_size: None,
            webtransport_datagram_rate: None,
            webtransport_datagram_drop_metric: false,
            h3_chunk_size: None,
            h3_yield_bytes: None,
        };
        let server = Server::new().with_connection_limits(limits);
        assert_eq!(
            server.config.connection_limits.handler_timeout,
            Some(Duration::from_secs(30))
        );
        assert_eq!(
            server.config.connection_limits.max_body_size,
            Some(1024 * 1024)
        );
    }

    #[tokio::test]
    async fn test_server_builder_chain() {
        let server = Server::new()
            .bind("127.0.0.1:0".parse().unwrap())
            .on_listen(|_addrs| {})
            .with_rate_limiter(RateLimiterConfig {
                capacity: 1,
                refill_every: Duration::from_millis(10),
                max_wait: Duration::from_millis(10),
            })
            .with_shutdown(Duration::from_millis(1));
        assert!(server.rate_limiter_config.is_some());
        assert!(server.graceful_shutdown_duration.is_some());
        assert!(server.listen_callback.is_some());
    }

    #[tokio::test]
    async fn test_server_full_builder_chain() {
        let limits = ConnectionLimits {
            handler_timeout: Some(Duration::from_secs(60)),
            max_body_size: Some(512 * 1024),
            h3_read_timeout: None,
            max_webtransport_frame_size: None,
            webtransport_read_timeout: None,
            max_webtransport_sessions: None,
            webtransport_datagram_max_size: None,
            webtransport_datagram_rate: None,
            webtransport_datagram_drop_metric: false,
            h3_chunk_size: None,
            h3_yield_bytes: None,
        };
        let server = Server::new()
            .bind("127.0.0.1:0".parse().unwrap())
            .bind("127.0.0.1:0".parse().unwrap())
            .set_shutdown_callback(|| {})
            .on_listen(|_addrs| {})
            .with_rate_limiter(RateLimiterConfig {
                capacity: 10,
                refill_every: Duration::from_millis(100),
                max_wait: Duration::from_secs(2),
            })
            .with_shutdown(Duration::from_secs(30))
            .with_connection_limits(limits);
        assert!(server.shutdown_callback.is_some());
        assert!(server.listen_callback.is_some());
        assert!(server.rate_limiter_config.is_some());
        assert!(server.graceful_shutdown_duration.is_some());
        assert_eq!(
            server.config.connection_limits.handler_timeout,
            Some(Duration::from_secs(60))
        );
    }

    #[test]
    fn test_server_config_default() {
        let config = ServerConfig::default();
        assert_eq!(config.connection_limits.handler_timeout, None);
        assert_eq!(config.connection_limits.max_body_size, None);
        assert_eq!(config.connection_limits.h3_read_timeout, None);
    }

    #[test]
    fn test_server_config_with_limits() {
        let limits = ConnectionLimits {
            handler_timeout: Some(Duration::from_secs(120)),
            max_body_size: Some(2048 * 1024),
            h3_read_timeout: Some(Duration::from_secs(30)),
            max_webtransport_frame_size: None,
            webtransport_read_timeout: None,
            max_webtransport_sessions: None,
            webtransport_datagram_max_size: None,
            webtransport_datagram_rate: None,
            webtransport_datagram_drop_metric: false,
            h3_chunk_size: None,
            h3_yield_bytes: None,
        };
        let config = ServerConfig {
            connection_limits: limits,
            ..Default::default()
        };
        assert_eq!(
            config.connection_limits.handler_timeout,
            Some(Duration::from_secs(120))
        );
        assert_eq!(config.connection_limits.max_body_size, Some(2048 * 1024));
        assert_eq!(
            config.connection_limits.h3_read_timeout,
            Some(Duration::from_secs(30))
        );
    }

    #[test]
    fn test_server_config_clone() {
        let config = ServerConfig::default();
        let config_clone = config.clone();
        assert_eq!(
            config.connection_limits.handler_timeout,
            config_clone.connection_limits.handler_timeout
        );
        assert_eq!(
            config.connection_limits.max_body_size,
            config_clone.connection_limits.max_body_size
        );
    }

    #[test]
    fn test_rate_limiter_config() {
        let config = RateLimiterConfig {
            capacity: 1000,
            refill_every: Duration::from_millis(50),
            max_wait: Duration::from_secs(10),
        };
        assert_eq!(config.capacity, 1000);
        assert_eq!(config.refill_every, Duration::from_millis(50));
        assert_eq!(config.max_wait, Duration::from_secs(10));
    }

    #[test]
    fn test_rate_limiter_config_copy() {
        let config = RateLimiterConfig {
            capacity: 500,
            refill_every: Duration::from_millis(100),
            max_wait: Duration::from_secs(5),
        };
        // Using `config` after this assignment only compiles if the type is `Copy`.
        let config_copy = config;
        assert_eq!(config.capacity, config_copy.capacity);
        assert_eq!(config.refill_every, config_copy.refill_every);
        assert_eq!(config.max_wait, config_copy.max_wait);
    }

    #[test]
    fn test_connection_limits_default() {
        let limits = ConnectionLimits::default();
        assert_eq!(limits.handler_timeout, None);
        assert_eq!(limits.max_body_size, None);
        assert_eq!(limits.h3_read_timeout, None);
    }

    #[test]
    fn test_connection_limits_custom() {
        let limits = ConnectionLimits {
            handler_timeout: Some(Duration::from_secs(30)),
            max_body_size: Some(1024 * 1024),
            h3_read_timeout: Some(Duration::from_secs(20)),
            max_webtransport_frame_size: Some(4096),
            webtransport_read_timeout: None,
            max_webtransport_sessions: Some(10),
            webtransport_datagram_max_size: None,
            webtransport_datagram_rate: None,
            webtransport_datagram_drop_metric: true,
            h3_chunk_size: None,
            h3_yield_bytes: None,
        };
        assert_eq!(limits.handler_timeout, Some(Duration::from_secs(30)));
        assert_eq!(limits.max_body_size, Some(1024 * 1024));
        assert_eq!(limits.h3_read_timeout, Some(Duration::from_secs(20)));
        assert_eq!(limits.max_webtransport_frame_size, Some(4096));
        assert_eq!(limits.max_webtransport_sessions, Some(10));
        assert!(limits.webtransport_datagram_drop_metric);
    }

    #[test]
    fn test_connection_limits_no_timeout() {
        let limits = ConnectionLimits {
            handler_timeout: None,
            max_body_size: Some(512 * 1024),
            h3_read_timeout: None,
            max_webtransport_frame_size: None,
            webtransport_read_timeout: None,
            max_webtransport_sessions: None,
            webtransport_datagram_max_size: None,
            webtransport_datagram_rate: None,
            webtransport_datagram_drop_metric: false,
            h3_chunk_size: None,
            h3_yield_bytes: None,
        };
        assert_eq!(limits.handler_timeout, None);
        assert_eq!(limits.max_body_size, Some(512 * 1024));
    }

    #[test]
    fn test_duration_values() {
        let millis = Duration::from_millis(100);
        let secs = Duration::from_secs(1);
        let zero = Duration::ZERO;
        assert_eq!(millis.as_millis(), 100);
        assert_eq!(secs.as_secs(), 1);
        assert_eq!(zero.as_secs(), 0);
    }

    #[test]
    fn test_socket_addr_parsing() {
        let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
        let addr2: SocketAddr = "[::1]:8080".parse().unwrap();
        assert_eq!(addr1.port(), 8080);
        assert_eq!(addr2.port(), 8080);
        assert!(addr1.is_ipv4());
        assert!(addr2.is_ipv6());
    }

    #[test]
    fn test_socket_addr_any_port() {
        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        assert_eq!(addr.port(), 0);
    }

    #[test]
    fn test_global_server_config_default() {
        use config::{global_server_config, set_global_server_config};
        // Held for the whole set-then-read sequence.
        let _guard = lock_global_config();
        set_global_server_config(ServerConfig::default());
        let config = global_server_config();
        assert_eq!(config.connection_limits.handler_timeout, None);
        assert_eq!(config.connection_limits.max_body_size, None);
    }

    #[test]
    fn test_set_global_server_config() {
        use config::{global_server_config, set_global_server_config};
        let custom_config = ServerConfig {
            connection_limits: ConnectionLimits {
                handler_timeout: Some(Duration::from_secs(100)),
                max_body_size: Some(2048),
                h3_read_timeout: Some(Duration::from_secs(50)),
                max_webtransport_frame_size: None,
                webtransport_read_timeout: None,
                max_webtransport_sessions: None,
                webtransport_datagram_max_size: None,
                webtransport_datagram_rate: None,
                webtransport_datagram_drop_metric: false,
                h3_chunk_size: None,
                h3_yield_bytes: None,
            },
            ..Default::default()
        };
        // Held for the whole set-then-read sequence.
        let _guard = lock_global_config();
        set_global_server_config(custom_config);
        let config = global_server_config();
        assert_eq!(
            config.connection_limits.handler_timeout,
            Some(Duration::from_secs(100))
        );
        assert_eq!(config.connection_limits.max_body_size, Some(2048));
    }

    #[test]
    fn test_connection_service_trait_bound() {
        use crate::server::ConnectionService;
        fn assert_connection_service<T: ConnectionService + Clone>() {}
        assert_connection_service::<DummyConnectionService>();
    }

    // Minimal handler used only to check the `ConnectionService + Clone` bound.
    #[derive(Clone)]
    struct DummyConnectionService;

    impl ConnectionService for DummyConnectionService {
        fn call(
            &self,
            _stream: crate::server::connection::BoxedConnection,
            _remote_addr: crate::core::socket_addr::SocketAddr,
        ) -> crate::server::connection_service::ConnectionFuture {
            Box::pin(async { Ok(()) })
        }
    }

    #[test]
    fn test_listen_callback_type() {
        let callback_called = Arc::new(AtomicBool::new(false));
        let callback_called_clone = Arc::clone(&callback_called);
        let callback: ListenCallback = Box::new(move |addrs| {
            let _ = addrs.len();
            callback_called_clone.store(true, Ordering::SeqCst);
        });

        let no_addrs: &[CoreSocketAddr] = &[];
        callback(no_addrs);
        assert!(callback_called.load(Ordering::SeqCst));
    }

    #[test]
    fn test_server_config_combinations() {
        let config1 = ServerConfig {
            connection_limits: ConnectionLimits {
                handler_timeout: Some(Duration::from_secs(10)),
                ..Default::default()
            },
            ..Default::default()
        };
        let config2 = ServerConfig {
            connection_limits: ConnectionLimits {
                max_body_size: Some(1024),
                ..Default::default()
            },
            ..Default::default()
        };
        assert_eq!(
            config1.connection_limits.handler_timeout,
            Some(Duration::from_secs(10))
        );
        assert_eq!(config1.connection_limits.max_body_size, None);
        assert_eq!(config2.connection_limits.handler_timeout, None);
        assert_eq!(config2.connection_limits.max_body_size, Some(1024));
    }

    #[test]
    fn test_connection_limits_all_fields() {
        let limits = ConnectionLimits {
            handler_timeout: Some(Duration::from_secs(60)),
            max_body_size: Some(1048576),
            h3_read_timeout: Some(Duration::from_secs(30)),
            max_webtransport_frame_size: Some(16384),
            webtransport_read_timeout: Some(Duration::from_secs(20)),
            max_webtransport_sessions: Some(100),
            webtransport_datagram_max_size: Some(1350),
            webtransport_datagram_rate: Some(1000),
            webtransport_datagram_drop_metric: true,
            h3_chunk_size: None,
            h3_yield_bytes: None,
        };
        assert_eq!(limits.handler_timeout, Some(Duration::from_secs(60)));
        assert_eq!(limits.max_body_size, Some(1048576));
        assert_eq!(limits.h3_read_timeout, Some(Duration::from_secs(30)));
        assert_eq!(limits.max_webtransport_frame_size, Some(16384));
        assert_eq!(
            limits.webtransport_read_timeout,
            Some(Duration::from_secs(20))
        );
        assert_eq!(limits.max_webtransport_sessions, Some(100));
        assert_eq!(limits.webtransport_datagram_max_size, Some(1350));
        assert_eq!(limits.webtransport_datagram_rate, Some(1000));
        assert!(limits.webtransport_datagram_drop_metric);
    }

    #[tokio::test]
    async fn test_server_netserver_parts_construction() {
        let server = Server::new()
            .bind("127.0.0.1:0".parse().unwrap())
            .with_shutdown(Duration::from_secs(10));
        assert_eq!(
            server.graceful_shutdown_duration,
            Some(Duration::from_secs(10))
        );
    }

    #[test]
    fn test_server_zero_duration_shutdown() {
        let server = Server::new().with_shutdown(Duration::ZERO);
        assert_eq!(server.graceful_shutdown_duration, Some(Duration::ZERO));
    }

    #[test]
    fn test_server_large_duration() {
        let large_duration = Duration::from_secs(3600 * 24);
        let server = Server::new().with_shutdown(large_duration);
        assert_eq!(server.graceful_shutdown_duration, Some(large_duration));
    }

    #[test]
    fn test_rate_limiter_zero_capacity() {
        let config = RateLimiterConfig {
            capacity: 0,
            refill_every: Duration::from_millis(100),
            max_wait: Duration::from_secs(1),
        };
        let server = Server::new().with_rate_limiter(config);
        assert!(server.rate_limiter_config.is_some());
        assert_eq!(server.rate_limiter_config.as_ref().unwrap().capacity, 0);
    }

    #[test]
    fn test_rate_limiter_zero_refill() {
        let config = RateLimiterConfig {
            capacity: 100,
            refill_every: Duration::ZERO,
            max_wait: Duration::from_secs(1),
        };
        let server = Server::new().with_rate_limiter(config);
        assert!(server.rate_limiter_config.is_some());
        assert_eq!(
            server.rate_limiter_config.as_ref().unwrap().refill_every,
            Duration::ZERO
        );
    }

    #[test]
    fn test_connection_limits_zero_values() {
        let limits = ConnectionLimits {
            handler_timeout: Some(Duration::ZERO),
            max_body_size: Some(0),
            h3_read_timeout: Some(Duration::ZERO),
            max_webtransport_frame_size: Some(0),
            webtransport_read_timeout: Some(Duration::ZERO),
            max_webtransport_sessions: Some(0),
            webtransport_datagram_max_size: Some(0),
            webtransport_datagram_rate: Some(0),
            webtransport_datagram_drop_metric: false,
            h3_chunk_size: None,
            h3_yield_bytes: None,
        };
        let server = Server::new().with_connection_limits(limits);
        assert_eq!(
            server.config.connection_limits.handler_timeout,
            Some(Duration::ZERO)
        );
        assert_eq!(server.config.connection_limits.max_body_size, Some(0));
    }
}