use std::net::SocketAddr;
use std::time::Duration;
use http::{Method, StatusCode, Uri};
#[cfg(not(feature = "precise-timing"))]
pub use coarsetime::Instant;
#[cfg(feature = "precise-timing")]
pub use std::time::Instant;
#[cfg(feature = "json")]
mod serde_status_code {
use http::StatusCode;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<S: Serializer>(status: &StatusCode, s: S) -> Result<S::Ok, S::Error> {
status.as_u16().serialize(s)
}
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<StatusCode, D::Error> {
let code = u16::deserialize(d)?;
StatusCode::from_u16(code).map_err(serde::de::Error::custom)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "json", derive(serde::Serialize, serde::Deserialize))]
pub enum PoolOutcome {
Hit,
Coalesced,
Miss,
StaleRetry,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "json", derive(serde::Serialize, serde::Deserialize))]
pub enum NegotiatedProtocol {
Http1,
Http2,
Http3,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "json", derive(serde::Serialize, serde::Deserialize))]
pub enum TransferDirection {
Upload,
Download,
}
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "json", derive(serde::Serialize, serde::Deserialize))]
pub enum RequestPhase {
Started,
PoolCheckoutComplete {
outcome: PoolOutcome,
blocked_duration: Duration,
},
DnsResolved {
addrs: Vec<SocketAddr>,
duration: Duration,
},
TcpConnected {
remote_addr: SocketAddr,
duration: Duration,
protocol: NegotiatedProtocol,
},
TlsHandshakeComplete {
duration: Duration,
alpn_protocol: Option<String>,
peer_certificate_der: Option<Vec<u8>>,
},
RequestSent {
duration: Duration,
},
ResponseStarted {
waiting_duration: Duration,
},
ResponseComplete {
#[cfg_attr(feature = "json", serde(with = "serde_status_code"))]
status: StatusCode,
protocol: NegotiatedProtocol,
total_duration: Duration,
},
Failed {
error: String,
will_retry: bool,
elapsed: Duration,
},
BytesTransferred {
direction: TransferDirection,
chunk_bytes: u64,
cumulative_bytes: u64,
elapsed: Duration,
},
TransferComplete {
direction: TransferDirection,
total_bytes: u64,
transfer_duration: Duration,
throughput_bytes_per_sec: f32,
},
TransferAborted {
direction: TransferDirection,
bytes_transferred: u64,
elapsed: Duration,
error: String,
},
}
#[derive(Debug, Clone)]
pub struct RequestEvent {
pub method: Method,
pub uri: Uri,
pub phase: RequestPhase,
pub at: Instant,
}
pub trait RequestObserver: Send + Sync + 'static {
fn on_event(&self, event: &RequestEvent);
fn on_connection_event(&self, event: &ConnectionEvent);
}
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "json", derive(serde::Serialize, serde::Deserialize))]
pub enum ConnectionPhase {
Metrics {
remote_addr: SocketAddr,
protocol: NegotiatedProtocol,
bytes_sent: u64,
bytes_received: u64,
connection_age: Duration,
requests_served: u32,
closed: bool,
},
}
#[derive(Debug, Clone)]
pub struct ConnectionEvent {
pub phase: ConnectionPhase,
pub at: Instant,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn pool_outcome_debug() {
assert_eq!(format!("{:?}", PoolOutcome::Hit), "Hit");
assert_eq!(format!("{:?}", PoolOutcome::StaleRetry), "StaleRetry");
}
#[test]
fn negotiated_protocol_debug() {
assert_eq!(format!("{:?}", NegotiatedProtocol::Http1), "Http1");
assert_eq!(format!("{:?}", NegotiatedProtocol::Http2), "Http2");
assert_eq!(format!("{:?}", NegotiatedProtocol::Http3), "Http3");
}
#[test]
fn transfer_direction_debug() {
assert_eq!(format!("{:?}", TransferDirection::Upload), "Upload");
assert_eq!(format!("{:?}", TransferDirection::Download), "Download");
}
#[test]
fn request_phase_variants_are_constructible() {
let _ = RequestPhase::Started;
let _ = RequestPhase::PoolCheckoutComplete {
outcome: PoolOutcome::Hit,
blocked_duration: Duration::from_millis(5),
};
let _ = RequestPhase::DnsResolved {
addrs: vec!["127.0.0.1:80".parse().unwrap()],
duration: Duration::from_millis(10),
};
let _ = RequestPhase::TcpConnected {
remote_addr: "127.0.0.1:80".parse().unwrap(),
duration: Duration::from_millis(20),
protocol: NegotiatedProtocol::Http1,
};
let _ = RequestPhase::TlsHandshakeComplete {
duration: Duration::from_millis(30),
alpn_protocol: Some("h2".into()),
peer_certificate_der: None,
};
let _ = RequestPhase::RequestSent {
duration: Duration::from_millis(1),
};
let _ = RequestPhase::ResponseStarted {
waiting_duration: Duration::from_millis(50),
};
let _ = RequestPhase::ResponseComplete {
status: StatusCode::OK,
protocol: NegotiatedProtocol::Http2,
total_duration: Duration::from_millis(100),
};
let _ = RequestPhase::Failed {
error: "timeout".into(),
will_retry: false,
elapsed: Duration::from_secs(5),
};
let _ = RequestPhase::BytesTransferred {
direction: TransferDirection::Download,
chunk_bytes: 1024,
cumulative_bytes: 4096,
elapsed: Duration::from_millis(200),
};
let _ = RequestPhase::TransferComplete {
direction: TransferDirection::Upload,
total_bytes: 8192,
transfer_duration: Duration::from_millis(500),
throughput_bytes_per_sec: 16384.0,
};
let _ = RequestPhase::TransferAborted {
direction: TransferDirection::Download,
bytes_transferred: 2048,
elapsed: Duration::from_millis(100),
error: "connection reset".into(),
};
let _ = ConnectionPhase::Metrics {
remote_addr: "10.0.0.1:443".parse().unwrap(),
protocol: NegotiatedProtocol::Http2,
bytes_sent: 1024,
bytes_received: 65536,
connection_age: Duration::from_secs(30),
requests_served: 5,
closed: false,
};
}
#[test]
fn request_event_is_constructible() {
let event = RequestEvent {
method: Method::GET,
uri: "http://example.com/".parse().unwrap(),
phase: RequestPhase::Started,
at: Instant::now(),
};
assert_eq!(event.method, Method::GET);
}
use std::sync::{Arc, Mutex};
struct RecordingObserver {
events: Arc<Mutex<Vec<RequestPhase>>>,
}
impl RequestObserver for RecordingObserver {
fn on_event(&self, event: &RequestEvent) {
self.events.lock().unwrap().push(event.phase.clone());
}
fn on_connection_event(&self, _event: &ConnectionEvent) {}
}
#[test]
fn recording_observer_captures_events() {
let events = Arc::new(Mutex::new(Vec::new()));
let observer = RecordingObserver {
events: events.clone(),
};
observer.on_event(&RequestEvent {
method: Method::POST,
uri: "http://localhost/api".parse().unwrap(),
phase: RequestPhase::Started,
at: Instant::now(),
});
observer.on_event(&RequestEvent {
method: Method::POST,
uri: "http://localhost/api".parse().unwrap(),
phase: RequestPhase::ResponseComplete {
status: StatusCode::CREATED,
protocol: NegotiatedProtocol::Http1,
total_duration: Duration::from_millis(42),
},
at: Instant::now(),
});
let captured = events.lock().unwrap();
assert_eq!(captured.len(), 2);
assert!(matches!(captured[0], RequestPhase::Started));
assert!(matches!(captured[1], RequestPhase::ResponseComplete { .. }));
}
#[test]
fn request_event_fields_accessible() {
let event = RequestEvent {
method: Method::PUT,
uri: "https://api.example.com/resource/42".parse().unwrap(),
phase: RequestPhase::RequestSent {
duration: Duration::from_millis(15),
},
at: Instant::now(),
};
assert_eq!(event.method, Method::PUT);
assert_eq!(event.uri, "https://api.example.com/resource/42");
assert!(matches!(event.phase, RequestPhase::RequestSent { .. }));
}
#[test]
fn connection_event_is_constructible() {
let event = ConnectionEvent {
phase: ConnectionPhase::Metrics {
remote_addr: "192.168.1.1:8080".parse().unwrap(),
protocol: NegotiatedProtocol::Http1,
bytes_sent: 512,
bytes_received: 2048,
connection_age: Duration::from_secs(60),
requests_served: 10,
closed: true,
},
at: Instant::now(),
};
match &event.phase {
ConnectionPhase::Metrics {
remote_addr,
protocol,
bytes_sent,
bytes_received,
connection_age,
requests_served,
closed,
} => {
assert_eq!(*remote_addr, "192.168.1.1:8080".parse().unwrap());
assert_eq!(*protocol, NegotiatedProtocol::Http1);
assert_eq!(*bytes_sent, 512);
assert_eq!(*bytes_received, 2048);
assert_eq!(*connection_age, Duration::from_secs(60));
assert_eq!(*requests_served, 10);
assert!(*closed);
}
}
}
#[test]
fn connection_event_debug() {
let event = ConnectionEvent {
phase: ConnectionPhase::Metrics {
remote_addr: "10.0.0.1:443".parse().unwrap(),
protocol: NegotiatedProtocol::Http2,
bytes_sent: 0,
bytes_received: 0,
connection_age: Duration::from_millis(100),
requests_served: 0,
closed: false,
},
at: Instant::now(),
};
let debug = format!("{:?}", event);
assert!(debug.contains("Metrics"));
assert!(debug.contains("10.0.0.1:443"));
}
#[test]
fn connection_phase_metrics_clone_and_eq() {
let phase = ConnectionPhase::Metrics {
remote_addr: "127.0.0.1:80".parse().unwrap(),
protocol: NegotiatedProtocol::Http1,
bytes_sent: 100,
bytes_received: 200,
connection_age: Duration::from_secs(5),
requests_served: 3,
closed: false,
};
let cloned = phase.clone();
assert_eq!(phase, cloned);
}
#[test]
fn request_event_clone() {
let event = RequestEvent {
method: Method::DELETE,
uri: "http://example.com/item/1".parse().unwrap(),
phase: RequestPhase::Failed {
error: "connection refused".into(),
will_retry: true,
elapsed: Duration::from_secs(2),
},
at: Instant::now(),
};
let cloned = event.clone();
assert_eq!(cloned.method, Method::DELETE);
assert_eq!(cloned.phase, event.phase);
}
#[test]
fn connection_event_clone() {
let event = ConnectionEvent {
phase: ConnectionPhase::Metrics {
remote_addr: "10.0.0.5:443".parse().unwrap(),
protocol: NegotiatedProtocol::Http3,
bytes_sent: 4096,
bytes_received: 65536,
connection_age: Duration::from_secs(120),
requests_served: 50,
closed: true,
},
at: Instant::now(),
};
let cloned = event.clone();
assert_eq!(cloned.phase, event.phase);
}
#[test]
fn observer_on_connection_event_is_called() {
struct ConnRecorder {
phases: Arc<Mutex<Vec<ConnectionPhase>>>,
}
impl RequestObserver for ConnRecorder {
fn on_event(&self, _event: &RequestEvent) {}
fn on_connection_event(&self, event: &ConnectionEvent) {
self.phases.lock().unwrap().push(event.phase.clone());
}
}
let phases = Arc::new(Mutex::new(Vec::new()));
let observer = ConnRecorder {
phases: phases.clone(),
};
observer.on_connection_event(&ConnectionEvent {
phase: ConnectionPhase::Metrics {
remote_addr: "1.2.3.4:443".parse().unwrap(),
protocol: NegotiatedProtocol::Http2,
bytes_sent: 256,
bytes_received: 1024,
connection_age: Duration::from_secs(10),
requests_served: 2,
closed: false,
},
at: Instant::now(),
});
let captured = phases.lock().unwrap();
assert_eq!(captured.len(), 1);
assert!(matches!(captured[0], ConnectionPhase::Metrics { .. }));
}
#[cfg(feature = "json")]
#[test]
fn request_phase_serde_roundtrip_started() {
let phase = RequestPhase::Started;
let json = serde_json::to_string(&phase).unwrap();
let deserialized: RequestPhase = serde_json::from_str(&json).unwrap();
assert_eq!(phase, deserialized);
}
#[cfg(feature = "json")]
#[test]
fn request_phase_serde_roundtrip_response_complete() {
let phase = RequestPhase::ResponseComplete {
status: StatusCode::NOT_FOUND,
protocol: NegotiatedProtocol::Http2,
total_duration: Duration::from_millis(250),
};
let json = serde_json::to_string(&phase).unwrap();
let deserialized: RequestPhase = serde_json::from_str(&json).unwrap();
assert_eq!(phase, deserialized);
}
#[cfg(feature = "json")]
#[test]
fn request_phase_serde_roundtrip_failed() {
let phase = RequestPhase::Failed {
error: "timeout".into(),
will_retry: true,
elapsed: Duration::from_secs(30),
};
let json = serde_json::to_string(&phase).unwrap();
let deserialized: RequestPhase = serde_json::from_str(&json).unwrap();
assert_eq!(phase, deserialized);
}
#[cfg(feature = "json")]
#[test]
fn request_phase_serde_roundtrip_bytes_transferred() {
let phase = RequestPhase::BytesTransferred {
direction: TransferDirection::Upload,
chunk_bytes: 4096,
cumulative_bytes: 16384,
elapsed: Duration::from_millis(500),
};
let json = serde_json::to_string(&phase).unwrap();
let deserialized: RequestPhase = serde_json::from_str(&json).unwrap();
assert_eq!(phase, deserialized);
}
#[cfg(feature = "json")]
#[test]
fn request_phase_serde_roundtrip_transfer_complete() {
let phase = RequestPhase::TransferComplete {
direction: TransferDirection::Download,
total_bytes: 1048576,
transfer_duration: Duration::from_secs(2),
throughput_bytes_per_sec: 524288.0,
};
let json = serde_json::to_string(&phase).unwrap();
let deserialized: RequestPhase = serde_json::from_str(&json).unwrap();
assert_eq!(phase, deserialized);
}
#[cfg(feature = "json")]
#[test]
fn request_phase_serde_roundtrip_transfer_aborted() {
let phase = RequestPhase::TransferAborted {
direction: TransferDirection::Download,
bytes_transferred: 512,
elapsed: Duration::from_millis(100),
error: "connection reset".into(),
};
let json = serde_json::to_string(&phase).unwrap();
let deserialized: RequestPhase = serde_json::from_str(&json).unwrap();
assert_eq!(phase, deserialized);
}
#[cfg(feature = "json")]
#[test]
fn connection_phase_metrics_serde_roundtrip() {
let phase = ConnectionPhase::Metrics {
remote_addr: "10.0.0.1:443".parse().unwrap(),
protocol: NegotiatedProtocol::Http2,
bytes_sent: 1024,
bytes_received: 65536,
connection_age: Duration::from_secs(30),
requests_served: 5,
closed: false,
};
let json = serde_json::to_string(&phase).unwrap();
let deserialized: ConnectionPhase = serde_json::from_str(&json).unwrap();
assert_eq!(phase, deserialized);
}
#[cfg(feature = "json")]
#[test]
fn pool_outcome_serde_roundtrip() {
for outcome in [
PoolOutcome::Hit,
PoolOutcome::Coalesced,
PoolOutcome::Miss,
PoolOutcome::StaleRetry,
] {
let json = serde_json::to_string(&outcome).unwrap();
let deserialized: PoolOutcome = serde_json::from_str(&json).unwrap();
assert_eq!(outcome, deserialized);
}
}
#[cfg(feature = "json")]
#[test]
fn negotiated_protocol_serde_roundtrip() {
for proto in [
NegotiatedProtocol::Http1,
NegotiatedProtocol::Http2,
NegotiatedProtocol::Http3,
] {
let json = serde_json::to_string(&proto).unwrap();
let deserialized: NegotiatedProtocol = serde_json::from_str(&json).unwrap();
assert_eq!(proto, deserialized);
}
}
#[cfg(feature = "json")]
#[test]
fn transfer_direction_serde_roundtrip() {
for dir in [TransferDirection::Upload, TransferDirection::Download] {
let json = serde_json::to_string(&dir).unwrap();
let deserialized: TransferDirection = serde_json::from_str(&json).unwrap();
assert_eq!(dir, deserialized);
}
}
}