use std::cell::Cell;
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
static CONNECTION_COUNTER: AtomicU64 = AtomicU64::new(1);
pub struct ConnectionMetrics {
pub id: u64,
pub start_time: Instant,
gauge: Rc<Cell<usize>>,
closed: bool,
}
impl ConnectionMetrics {
pub fn new(gauge: Rc<Cell<usize>>) -> Self {
let id = CONNECTION_COUNTER.fetch_add(1, Ordering::Relaxed);
let current = gauge.get();
gauge.set(current + 1);
tracing::debug!(
connection.id = id,
connection.active = current + 1,
"connection accepted"
);
Self {
id,
start_time: Instant::now(),
gauge,
closed: false,
}
}
pub fn close(mut self) -> std::time::Duration {
let duration = self.start_time.elapsed();
let current = self.gauge.get();
if current > 0 {
self.gauge.set(current - 1);
}
self.closed = true;
tracing::debug!(
connection.id = self.id,
connection.duration_ms = duration.as_millis() as u64,
connection.active = self.gauge.get(),
"connection closed"
);
duration
}
}
impl Drop for ConnectionMetrics {
fn drop(&mut self) {
if !self.closed {
let current = self.gauge.get();
if current > 0 {
self.gauge.set(current - 1);
}
}
}
}
pub fn record_server_start(addr: std::net::SocketAddr, config: &super::ServerConfig) {
let io_driver = super::kernel_check::detect_io_driver();
tracing::info!(
server.addr = %addr,
server.io_driver = %io_driver,
server.max_connections = config.max_connections,
server.max_h2_streams = config.max_h2_streams,
server.workers = config.workers,
server.header_read_timeout_ms = config.header_read_timeout.map(|d| d.as_millis() as u64),
server.body_read_timeout_ms = config.body_read_timeout.map(|d| d.as_millis() as u64),
server.connection_timeout_ms = config.connection_timeout.map(|d| d.as_millis() as u64),
server.drain_timeout_ms = config.drain_timeout.as_millis() as u64,
"harrow-monoio server starting"
);
if io_driver == super::kernel_check::IoDriver::Epoll {
tracing::warn!(
"io_uring unavailable — falling back to epoll. For io_uring, run with --security-opt seccomp=unconfined or a custom seccomp profile."
);
}
}
pub fn record_server_shutdown() {
tracing::info!("harrow-monoio server shutting down");
}
pub fn record_drain_complete(active_connections: usize) {
if active_connections == 0 {
tracing::info!("all connections drained successfully");
} else {
tracing::warn!(
connections.still_active = active_connections,
"drain incomplete, connections still active"
);
}
}
pub fn record_drain_timeout(timeout_secs: u64, active_connections: usize) {
tracing::warn!(
drain.timeout_secs = timeout_secs,
connections.still_active = active_connections,
"drain timeout exceeded"
);
}
pub fn record_connection_limit_rejected(max_connections: usize) {
tracing::warn!(
server.max_connections = max_connections,
"connection rejected: limit reached"
);
}
pub fn record_accept_error<E: std::fmt::Display>(error: E) {
tracing::error!(error = %error, "accept failed");
}
pub fn record_tcp_nodelay_error<E: std::fmt::Display>(error: E) {
tracing::warn!(error = %error, "failed to set TCP_NODELAY");
}
pub fn connection_span(
connection_id: u64,
remote_addr: Option<std::net::SocketAddr>,
) -> tracing::Span {
let span = tracing::info_span!(
"http_connection",
connection.id = connection_id,
connection.remote_addr = remote_addr.map(|a| a.to_string()),
);
span
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_connection_metrics() {
let gauge = Rc::new(Cell::new(0));
let metrics1 = ConnectionMetrics::new(gauge.clone());
assert_eq!(gauge.get(), 1);
let id1 = metrics1.id;
let metrics2 = ConnectionMetrics::new(gauge.clone());
assert_eq!(gauge.get(), 2);
let id2 = metrics2.id;
assert_ne!(id1, id2);
let duration1 = metrics1.close();
assert_eq!(gauge.get(), 1);
assert!(duration1.as_secs() < 1);
let duration2 = metrics2.close();
assert_eq!(gauge.get(), 0);
assert!(duration2.as_secs() < 1);
}
#[test]
fn test_connection_metrics_drop() {
let gauge = Rc::new(Cell::new(0));
{
let _metrics = ConnectionMetrics::new(gauge.clone());
assert_eq!(gauge.get(), 1);
}
assert_eq!(gauge.get(), 0);
}
}