use crate::{Request, Response, Result};
use dashmap::DashMap;
use parking_lot::{Mutex, RwLock};
use socket2::{Domain, Protocol, Socket, Type};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::{TcpListener, TcpStream};
use tracing::{info, warn};
/// Tuning knobs for socket setup and request processing.
///
/// Construct via [`Default`] or one of the preset constructors
/// (`max_rps`, `high_throughput_api`, `memory_constrained`).
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
/// Enable `SO_REUSEPORT` so multiple listeners can share one address.
pub reuse_port: bool,
/// Enable `TCP_NODELAY` (disable Nagle's algorithm) on sockets.
pub tcp_nodelay: bool,
/// Enable `SO_REUSEADDR` for fast rebinding after restart.
pub reuse_addr: bool,
/// TCP keep-alive idle time; `None` leaves keep-alive off.
pub keep_alive: Option<Duration>,
/// Kernel send buffer size in bytes; `None` keeps the OS default.
pub send_buffer_size: Option<usize>,
/// Kernel receive buffer size in bytes; `None` keeps the OS default.
pub recv_buffer_size: Option<usize>,
/// Listen backlog passed to `listen(2)`.
pub backlog: u32,
/// Whether to pin workers to CPUs (consumed elsewhere — not used in this file).
pub cpu_affinity: bool,
/// Worker thread count; 0 lets the runtime choose.
pub worker_threads: usize,
/// Enable the cached fast path (see `FastPathProcessor`).
pub fast_path: bool,
/// Zero-copy flag (consumed elsewhere — not used in this file).
pub zero_copy: bool,
}
impl Default for PerformanceConfig {
fn default() -> Self {
Self {
reuse_port: true,
tcp_nodelay: true,
reuse_addr: true,
keep_alive: Some(Duration::from_secs(60)),
send_buffer_size: Some(256 * 1024), recv_buffer_size: Some(256 * 1024), backlog: 8192,
cpu_affinity: true,
worker_threads: 0, fast_path: true,
zero_copy: true,
}
}
}
impl PerformanceConfig {
    /// Preset tuned for maximum requests-per-second: 512 KiB buffers,
    /// a deep 16384 backlog, a short 30 s keep-alive, and two worker
    /// threads per CPU core. Everything else inherits the defaults.
    pub fn max_rps() -> Self {
        Self {
            keep_alive: Some(Duration::from_secs(30)),
            send_buffer_size: Some(512 * 1024),
            recv_buffer_size: Some(512 * 1024),
            backlog: 16384,
            worker_threads: num_cpus::get() * 2,
            ..Self::default()
        }
    }

    /// Preset for high-throughput APIs: a 1 MiB send buffer for large
    /// responses and a long 120 s keep-alive to maximize connection reuse.
    pub fn high_throughput_api() -> Self {
        Self {
            keep_alive: Some(Duration::from_secs(120)),
            send_buffer_size: Some(1024 * 1024),
            recv_buffer_size: Some(256 * 1024),
            ..Self::default()
        }
    }

    /// Preset for memory-constrained hosts: small buffers, a shallow
    /// backlog, a fixed two-worker pool, and the optional features
    /// (reuse-port, affinity, fast path, zero copy) switched off.
    pub fn memory_constrained() -> Self {
        Self {
            reuse_port: false,
            keep_alive: Some(Duration::from_secs(300)),
            send_buffer_size: Some(64 * 1024),
            recv_buffer_size: Some(32 * 1024),
            backlog: 1024,
            cpu_affinity: false,
            worker_threads: 2,
            fast_path: false,
            zero_copy: false,
            ..Self::default()
        }
    }
}
/// Lock-light runtime counters shared across connection handlers via `Arc`.
#[derive(Debug)]
pub struct PerformanceMetrics {
/// Total requests recorded since creation (or last `reset`).
pub requests_total: AtomicU64,
/// Last computed RPS value (written elsewhere — only read in this file).
pub requests_per_second: AtomicU64,
/// Gauge of currently open connections.
pub active_connections: AtomicUsize,
/// Sliding window of recent response times used for avg/p95/p99.
pub response_times: RwLock<Vec<Duration>>,
/// Memory usage gauge (not updated in this file).
pub memory_usage: AtomicUsize,
/// CPU usage gauge (not updated in this file).
pub cpu_usage: AtomicU64,
/// Total errors recorded via `record_error`.
pub error_count: AtomicU64,
/// Timestamp of the last metrics refresh/reset.
pub last_update: Mutex<Instant>,
}
impl PerformanceMetrics {
    /// Maximum samples kept in `response_times` before eviction kicks in.
    const WINDOW: usize = 10_000;
    /// Number of oldest samples dropped per eviction, so the O(n) shift
    /// from `drain` is paid once per thousand inserts, not per insert.
    const EVICT_BATCH: usize = 1_000;

    /// Creates a zeroed metrics instance wrapped in an `Arc` for sharing.
    /// The response-time window is pre-allocated so steady-state recording
    /// does not reallocate.
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            requests_total: AtomicU64::new(0),
            requests_per_second: AtomicU64::new(0),
            active_connections: AtomicUsize::new(0),
            response_times: RwLock::new(Vec::with_capacity(Self::WINDOW)),
            memory_usage: AtomicUsize::new(0),
            cpu_usage: AtomicU64::new(0),
            error_count: AtomicU64::new(0),
            last_update: Mutex::new(Instant::now()),
        })
    }

    /// Records one completed request: bumps the total and appends its
    /// response time to the sliding window, evicting the oldest batch
    /// when the window is full.
    pub fn record_request(&self, response_time: Duration) {
        self.requests_total.fetch_add(1, Ordering::Relaxed);
        let mut times = self.response_times.write();
        if times.len() >= Self::WINDOW {
            times.drain(0..Self::EVICT_BATCH);
        }
        times.push(response_time);
    }

    /// Last published requests-per-second value.
    pub fn current_rps(&self) -> u64 {
        self.requests_per_second.load(Ordering::Relaxed)
    }

    /// Mean response time over the current window; zero when empty.
    pub fn avg_response_time(&self) -> Duration {
        let times = self.response_times.read();
        if times.is_empty() {
            return Duration::ZERO;
        }
        let total: Duration = times.iter().sum();
        total / times.len() as u32
    }

    /// Shared p-quantile implementation for `p95`/`p99`: clones the window
    /// (so the read lock is released before sorting), sorts it unstably
    /// (no allocation; ordering of equal durations is irrelevant), and
    /// returns the sample at the requested quantile. Zero when empty.
    fn percentile(&self, quantile: f64) -> Duration {
        let mut times = self.response_times.read().clone();
        if times.is_empty() {
            return Duration::ZERO;
        }
        times.sort_unstable();
        let index = (times.len() as f64 * quantile) as usize;
        times[index.min(times.len() - 1)]
    }

    /// 95th-percentile response time over the current window.
    pub fn p95_response_time(&self) -> Duration {
        self.percentile(0.95)
    }

    /// 99th-percentile response time over the current window.
    pub fn p99_response_time(&self) -> Duration {
        self.percentile(0.99)
    }

    /// Total requests recorded since creation or last `reset`.
    pub fn total_requests(&self) -> u64 {
        self.requests_total.load(Ordering::Relaxed)
    }

    /// Currently open connection count.
    pub fn active_connections(&self) -> usize {
        self.active_connections.load(Ordering::Relaxed)
    }

    /// Total errors recorded since creation or last `reset`.
    pub fn error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }

    /// Error rate as a percentage of total requests; 0.0 when no requests
    /// have been recorded (avoids division by zero).
    pub fn error_rate(&self) -> f64 {
        let total = self.total_requests();
        if total == 0 {
            return 0.0;
        }
        let errors = self.error_count();
        (errors as f64 / total as f64) * 100.0
    }

    /// Records one failed request.
    pub fn record_error(&self) {
        self.error_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Clears every counter and the response-time window, and stamps
    /// `last_update` with the current instant.
    pub fn reset(&self) {
        self.requests_total.store(0, Ordering::Relaxed);
        self.requests_per_second.store(0, Ordering::Relaxed);
        self.active_connections.store(0, Ordering::Relaxed);
        self.error_count.store(0, Ordering::Relaxed);
        self.response_times.write().clear();
        *self.last_update.lock() = Instant::now();
    }
}
/// A tokio `TcpListener` whose underlying socket was created with the
/// options from a `PerformanceConfig`, bundled with shared metrics.
pub struct OptimizedTcpListener {
// The tokio listener built from the pre-configured raw socket.
listener: TcpListener,
// Retained so accepted connections can be tuned the same way.
config: PerformanceConfig,
// Shared metrics; handed out via `metrics()`.
metrics: Arc<PerformanceMetrics>,
}
impl OptimizedTcpListener {
/// Binds a listener at `addr` with every socket option from `config`
/// applied before the socket starts listening.
///
/// The sequence is order-sensitive: options are set on the raw socket
/// (in `create_optimized_socket`), then bind, then listen, and the
/// socket must be non-blocking *before* it is handed to tokio via
/// `from_std`.
///
/// # Errors
/// Propagates any socket-creation, bind, listen, or registration failure.
pub async fn bind(addr: SocketAddr, config: PerformanceConfig) -> Result<Self> {
let socket = create_optimized_socket(&addr, &config)?;
socket.bind(&addr.into())?;
socket.listen(config.backlog as i32)?;
// tokio requires the std listener to already be in non-blocking mode.
socket.set_nonblocking(true)?;
let std_listener = std::net::TcpListener::from(socket);
let listener = TcpListener::from_std(std_listener)?;
info!(
"Optimized TCP listener bound to {} with config: {:?}",
addr, config
);
Ok(Self {
listener,
config,
metrics: PerformanceMetrics::new(),
})
}
/// Accepts one connection and applies per-connection tuning
/// (`TCP_NODELAY`, keep-alive) to it.
///
/// A tuning failure is logged but deliberately not fatal — the
/// un-tuned connection is still returned.
///
/// # Errors
/// Returns any error from the underlying `accept`.
pub async fn accept(&self) -> Result<(TcpStream, SocketAddr)> {
let (stream, addr) = self.listener.accept().await?;
if let Err(e) = optimize_connection(&stream, &self.config).await {
warn!("Failed to optimize connection from {}: {}", addr, e);
}
// NOTE(review): this gauge is incremented here but never decremented
// anywhere in this file — confirm the connection-close path decrements
// `active_connections`, otherwise it only ever grows.
self.metrics
.active_connections
.fetch_add(1, Ordering::Relaxed);
Ok((stream, addr))
}
/// Returns a clone of the shared metrics handle.
pub fn metrics(&self) -> Arc<PerformanceMetrics> {
Arc::clone(&self.metrics)
}
/// Local address the listener is bound to.
pub fn local_addr(&self) -> Result<SocketAddr> {
self.listener.local_addr().map_err(Into::into)
}
}
/// Creates a raw TCP socket for `addr`'s address family with every option
/// from `config` applied, ready for the caller to bind and listen on.
///
/// # Errors
/// Returns any error from socket creation or from the individual
/// `setsockopt` calls.
fn create_optimized_socket(addr: &SocketAddr, config: &PerformanceConfig) -> Result<Socket> {
    let domain = if addr.is_ipv4() { Domain::IPV4 } else { Domain::IPV6 };
    let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;

    // Reuse and latency options — these must go on before bind().
    socket.set_reuse_address(config.reuse_addr)?;
    socket.set_reuse_port(config.reuse_port)?;
    socket.set_nodelay(config.tcp_nodelay)?;

    // Keep-alive: switch it on, then set the idle time.
    if let Some(idle) = config.keep_alive {
        socket.set_keepalive(true)?;
        socket.set_tcp_keepalive(&socket2::TcpKeepalive::new().with_time(idle))?;
    }

    // Kernel buffer sizing, only where the config overrides the OS default.
    if let Some(bytes) = config.send_buffer_size {
        socket.set_send_buffer_size(bytes)?;
    }
    if let Some(bytes) = config.recv_buffer_size {
        socket.set_recv_buffer_size(bytes)?;
    }

    Ok(socket)
}
async fn optimize_connection(stream: &TcpStream, config: &PerformanceConfig) -> Result<()> {
use socket2::Socket;
use std::os::fd::{AsRawFd, FromRawFd};
stream.set_nodelay(config.tcp_nodelay)?;
if let Some(keep_alive) = config.keep_alive {
let raw_fd = stream.as_raw_fd();
let socket = unsafe { Socket::from_raw_fd(raw_fd) };
socket.set_keepalive(true)?;
socket.set_tcp_keepalive(&socket2::TcpKeepalive::new().with_time(keep_alive))?;
std::mem::forget(socket);
}
Ok(())
}
/// Request processor that serves cacheable GET responses from an in-memory
/// cache, falling back to the router for everything else.
pub struct FastPathProcessor {
// Router invoked on every cache miss and every non-GET request.
router: Arc<crate::Router>,
// Shared metrics; every processed request is recorded here.
metrics: Arc<PerformanceMetrics>,
// Response cache. NOTE(review): no eviction or TTL in this file — only
// `clear_cache` empties it; confirm callers bound its growth.
cache: DashMap<String, Arc<Response>>,
}
impl FastPathProcessor {
/// Creates a processor over `router` with a cache pre-sized for 1000
/// entries (capacity only — the cache itself is unbounded).
pub fn new(router: Arc<crate::Router>, metrics: Arc<PerformanceMetrics>) -> Self {
Self {
router,
metrics,
cache: DashMap::with_capacity(1000),
}
}
/// Handles one request: GETs are first checked against the cache; on a
/// miss (or any non-GET) the router handles it, and cacheable GET
/// responses are stored for next time. Every outcome records its
/// elapsed time in the metrics.
///
/// # Errors
/// Propagates any error from the router.
pub async fn process(&self, request: Request) -> Result<Response> {
let start = Instant::now();
let method = request.method.clone();
let path = request.uri.path().to_string();
// Cache lookup is keyed by the raw URI path only — the query string is
// ignored. NOTE(review): confirm cached endpoints do not vary by query
// parameters, or responses could be served to the wrong query.
if method == http::Method::GET {
let cache_key = request.uri.path();
if let Some(cached) = self.cache.get(cache_key) {
self.metrics.record_request(start.elapsed());
return Ok((**cached).clone());
}
}
let response = self.router.handle(request).await?;
// NOTE(review): inserts are keyed by `response.cache_key(&path)` while
// the lookup above uses the bare path. If `cache_key` returns anything
// other than the raw path, inserted entries can never be hit — verify
// the two key forms agree.
if method == http::Method::GET && response.is_cacheable() {
let cache_key = response.cache_key(&path);
self.cache.insert(cache_key, Arc::new(response.clone()));
}
self.metrics.record_request(start.elapsed());
Ok(response)
}
/// Drops every cached response; the next GET for each path re-populates.
pub fn clear_cache(&self) {
self.cache.clear();
}
}