use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use ohno::ErrorLabel;
use opentelemetry::{KeyValue, Value};
use tick::{Clock, Stopwatch};
/// Shared, cheaply cloneable metadata describing a single connection.
///
/// The handle wraps its state in an [`Arc`], so cloning copies only the
/// pointer and every clone observes the same age, pool index, maximum age
/// and poisoned flag.
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
// Reference-counted state shared by all clones of this handle.
inner: Arc<ConnectionInfoInner>,
}
/// State stored behind the `Arc` held by `ConnectionInfo`.
///
/// Only `poisoned` is mutable after construction; it is an atomic so it can be
/// updated through a shared reference.
#[derive(Debug)]
struct ConnectionInfoInner {
// Obtained from the clock in `ConnectionInfo::new`; its elapsed time is what
// `ConnectionInfo::age` reports.
stopwatch: Stopwatch,
// Value supplied by the caller at construction and returned unchanged.
// NOTE(review): presumably the connection's slot within its pool — confirm against the caller.
pool_index: usize,
// Upper bound on the age before `is_expired` returns true; `None` means the
// connection is never considered expired.
max_age: Option<Duration>,
// Starts as `false` and is set to `true` by `mark_poisoned`; nothing in this
// file resets it.
poisoned: AtomicBool,
}
impl ConnectionInfo {
    /// Creates the metadata handle for a connection.
    ///
    /// A stopwatch is obtained from `clock` here and later drives
    /// [`age`](Self::age). The connection starts out not poisoned. Passing
    /// `None` for `max_age` means [`is_expired`](Self::is_expired) never
    /// returns `true`.
    pub(crate) fn new(clock: &Clock, pool_index: usize, max_age: Option<Duration>) -> Self {
        let state = ConnectionInfoInner {
            stopwatch: clock.stopwatch(),
            pool_index,
            max_age,
            poisoned: AtomicBool::new(false),
        };

        Self { inner: Arc::new(state) }
    }

    /// Returns the time elapsed on the stopwatch captured at construction.
    #[must_use]
    pub fn age(&self) -> Duration {
        let inner = &self.inner;
        inner.stopwatch.elapsed()
    }

    /// Returns the pool index that was supplied at construction.
    #[must_use]
    pub fn pool_index(&self) -> usize {
        let inner = &self.inner;
        inner.pool_index
    }

    /// Returns `true` once [`mark_poisoned`](Self::mark_poisoned) has been
    /// called on this handle or any of its clones.
    #[must_use]
    pub fn poisoned(&self) -> bool {
        // Acquire load, pairing with the Release store in `mark_poisoned`.
        let flag = &self.inner.poisoned;
        flag.load(Ordering::Acquire)
    }

    /// Returns the configured maximum age, or `None` when no limit was set.
    #[must_use]
    pub fn max_age(&self) -> Option<Duration> {
        let inner = &self.inner;
        inner.max_age
    }

    /// Reports whether the connection has outlived its maximum age.
    ///
    /// The comparison is strict: a connection whose age equals the limit is
    /// not yet expired. Without a limit the result is always `false`.
    pub(crate) fn is_expired(&self) -> bool {
        let Some(limit) = self.max_age() else {
            return false;
        };

        self.age() > limit
    }

    /// Sets the poisoned flag; calling this more than once has no further effect.
    pub(crate) fn mark_poisoned(&self) {
        // Release store, pairing with the Acquire load in `poisoned`.
        let flag = &self.inner.poisoned;
        flag.store(true, Ordering::Release);
    }
}
/// Maps the negotiated protocol of `connected` to a protocol-version attribute value.
///
/// Yields `"2"` when the connection reports HTTP/2 was negotiated and `"1"`
/// in every other case.
pub(crate) fn connection_network_protocol_version(connected: &hyper_util::client::legacy::connect::Connected) -> Value {
    let version = if connected.is_negotiated_h2() { "2" } else { "1" };
    Value::from(version)
}
pub(crate) fn create_connection_attributes(
uri: &templated_uri::BaseUri,
connected: &hyper_util::client::legacy::connect::Connected,
) -> Vec<KeyValue> {
use opentelemetry_semantic_conventions::trace::{NETWORK_PROTOCOL_VERSION, SERVER_ADDRESS, SERVER_PORT, URL_SCHEME};
vec![
KeyValue::new(SERVER_ADDRESS, uri.authority().host().to_string()),
KeyValue::new(SERVER_PORT, server_port_attribute(uri)),
KeyValue::new(URL_SCHEME, uri.scheme().to_string()),
KeyValue::new(NETWORK_PROTOCOL_VERSION, connection_network_protocol_version(connected)),
]
}
pub(crate) fn create_connection_failure_attributes(uri: &templated_uri::BaseUri, error_type: ErrorLabel) -> Vec<KeyValue> {
use opentelemetry_semantic_conventions::trace::{SERVER_ADDRESS, SERVER_PORT, URL_SCHEME};
vec![
KeyValue::new(SERVER_ADDRESS, uri.authority().host().to_string()),
KeyValue::new(SERVER_PORT, server_port_attribute(uri)),
KeyValue::new(URL_SCHEME, uri.scheme().to_string()),
KeyValue::new(opentelemetry_semantic_conventions::attribute::ERROR_TYPE, error_type.into_cow()),
]
}
/// Converts the URI's effective port into the integer used for the server-port attribute.
///
/// Returns `-1` as a sentinel when the URI has no effective port.
fn server_port_attribute(uri: &templated_uri::BaseUri) -> i64 {
    match uri.effective_port() {
        Some(port) => i64::from(port),
        None => -1,
    }
}
// Unit tests for `ConnectionInfo` and the connection attribute helpers.
// The module is compiled only for tests and is excluded from coverage
// instrumentation when the `coverage_nightly` cfg is set.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use super::*;
use crate::error_labels::LABEL_CONNECT;
use crate::testing::sorted_attributes;
// The constructor stores the pool index and max age as given and starts unpoisoned.
#[test]
fn new_records_initial_state() {
let info = ConnectionInfo::new(&Clock::new_frozen(), 3, Some(Duration::from_mins(1)));
assert_eq!(info.pool_index(), 3);
assert_eq!(info.max_age(), Some(Duration::from_mins(1)));
assert!(!info.poisoned());
}
// Poisoning flips the flag, and a second call leaves it set.
#[test]
fn mark_poisoned_is_observable_and_idempotent() {
let info = ConnectionInfo::new(&Clock::new_frozen(), 0, None);
assert!(!info.poisoned());
info.mark_poisoned();
assert!(info.poisoned());
info.mark_poisoned();
assert!(info.poisoned());
}
// Without a max age the connection is not reported as expired.
#[test]
fn is_expired_false_without_max_age() {
let info = ConnectionInfo::new(&Clock::new_frozen(), 0, None);
assert!(!info.is_expired());
}
// With a frozen clock the age stays at zero, so even a 1 ns max age is not exceeded.
#[test]
fn frozen_clock_keeps_age_zero() {
let info = ConnectionInfo::new(&Clock::new_frozen(), 0, Some(Duration::from_nanos(1)));
assert_eq!(info.age(), Duration::ZERO);
assert!(!info.is_expired());
}
// Snapshot test of the success attributes for an HTTPS URI with an explicit port.
// Skipped under Miri.
// NOTE(review): presumably because the snapshot machinery does file I/O — confirm.
#[test]
#[cfg_attr(miri, ignore)]
fn create_connection_attributes_emits_expected_keys() {
let uri = templated_uri::BaseUri::from_static("https://example.com:8443");
let connected = hyper_util::client::legacy::connect::Connected::new();
let attrs = create_connection_attributes(&uri, &connected);
// Attributes are sorted before snapshotting, so the snapshot does not pin their order.
insta::assert_debug_snapshot!(sorted_attributes(&attrs));
}
// A connection flagged as negotiated HTTP/2 maps to version "2".
#[test]
fn connection_network_protocol_version_h2() {
let connected = hyper_util::client::legacy::connect::Connected::new().negotiated_h2();
assert_eq!(connection_network_protocol_version(&connected).as_str(), "2");
}
// A default `Connected` (no HTTP/2 negotiation) maps to version "1".
#[test]
fn connection_network_protocol_version_default_is_1() {
let connected = hyper_util::client::legacy::connect::Connected::new();
assert_eq!(connection_network_protocol_version(&connected).as_str(), "1");
}
// Advancing a controlled clock past the max age makes the connection expire.
#[test]
fn is_expired_true_with_clock_control() {
let control = tick::ClockControl::new();
let clock = control.to_clock();
let info = ConnectionInfo::new(&clock, 0, Some(Duration::from_secs(1)));
assert!(!info.is_expired());
control.advance(Duration::from_secs(2));
assert!(info.is_expired());
}
// Expiry uses a strict comparison: age == max_age is not expired, one nanosecond later is.
#[test]
fn is_expired_false_at_exact_max_age_boundary() {
let control = tick::ClockControl::new();
let clock = control.to_clock();
let max_age = Duration::from_secs(5);
let info = ConnectionInfo::new(&clock, 0, Some(max_age));
control.advance(max_age);
assert_eq!(info.age(), max_age);
assert!(!info.is_expired());
control.advance(Duration::from_nanos(1));
assert!(info.is_expired());
}
// A URI with no effective port (checked below for an "ftp" scheme without an
// explicit port) yields the -1 sentinel.
#[test]
fn server_port_attribute_uses_negative_one_sentinel_when_unknown() {
use templated_uri::{Authority, BasePath, Origin, Scheme};
let origin = Origin::from_parts(Scheme::try_from("ftp").unwrap(), Authority::from_static("example.com"));
let uri = templated_uri::BaseUri::from_parts(origin, BasePath::default());
// Guard the precondition so the sentinel assertion below is meaningful.
assert_eq!(uri.effective_port(), None);
assert_eq!(server_port_attribute(&uri), -1);
}
// An explicit port in the URI is returned as-is.
#[test]
fn server_port_attribute_returns_explicit_port() {
let uri = templated_uri::BaseUri::from_static("https://example.com:8443");
assert_eq!(server_port_attribute(&uri), 8443);
}
// Snapshot test of the failure attributes, including the error type label.
// Skipped under Miri.
// NOTE(review): presumably because the snapshot machinery does file I/O — confirm.
#[test]
#[cfg_attr(miri, ignore)]
fn create_connection_failure_attributes_includes_error_type() {
let uri = templated_uri::BaseUri::from_static("https://example.com");
let attrs = create_connection_failure_attributes(&uri, LABEL_CONNECT);
insta::assert_debug_snapshot!(sorted_attributes(&attrs));
}
}