Skip to main content

osproxy_sink/
conn.rs

1//! Counting the real TCP connection opens behind a pooled client, so pool reuse
2//! is observable (NFR-P, `docs/01` §7) without the pooled client exposing it.
3//!
4//! `hyper-util`'s legacy `Client` reuses connections from its pool but does not
5//! report whether a given request rode a fresh or a reused connection. We learn
6//! it the only honest way: the pool calls its *connector* exactly once per new
7//! connection and never for a reused one, so a thin [`CountingConnector`] that
8//! increments a counter on each [`Service::call`] turns "connections opened" into
9//! a number we can compare against "requests dispatched". A cluster whose opens
10//! stay far below its dispatches is amortizing handshakes: pool reuse, verified.
11
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::Arc;
16use std::task::{Context, Poll};
17
18use hyper::Uri;
19use tower_service::Service;
20
/// Connector wrapper that tallies connection opens on a shared atomic counter.
///
/// Holds the wrapped connector (for example `HttpConnector`) next to an
/// `Arc<AtomicU64>` supplied by the owning pool. The legacy `Client` clones its
/// connector, so this type derives `Clone`; cloning copies the `Arc` handle, so
/// every clone increments the very same counter.
#[derive(Clone)]
pub(crate) struct CountingConnector<C> {
    /// The wrapped connector that actually performs the connect.
    inner: C,
    /// Open-count shared with the pool and with every clone of this wrapper.
    opens: Arc<AtomicU64>,
}

impl<C> CountingConnector<C> {
    /// Builds a counting wrapper around `inner`.
    ///
    /// `opens` is the counter bumped once per new connection requested through
    /// this wrapper; hold on to another handle of the same `Arc` to read it.
    pub(crate) fn new(inner: C, opens: Arc<AtomicU64>) -> Self {
        CountingConnector { inner, opens }
    }
}
38
39impl<C> Service<Uri> for CountingConnector<C>
40where
41    C: Service<Uri>,
42    C::Future: Send + 'static,
43{
44    type Response = C::Response;
45    type Error = C::Error;
46    type Future = Pin<Box<dyn Future<Output = Result<C::Response, C::Error>> + Send>>;
47
48    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49        self.inner.poll_ready(cx)
50    }
51
52    fn call(&mut self, dst: Uri) -> Self::Future {
53        // The pool calls the connector only when it needs a *new* connection;
54        // a reused pooled connection never reaches here, so this is the true
55        // count of TCP (and TLS) handshakes performed for this cluster.
56        self.opens.fetch_add(1, Ordering::Relaxed);
57        Box::pin(self.inner.call(dst))
58    }
59}
60
/// Point-in-time connection-reuse counters for a single cluster pool.
///
/// `opened` counts the TCP connections established to the cluster and
/// `dispatched` counts the requests sent to it. Their difference is the reuse
/// figure: `dispatched - opened` requests travelled over a pooled connection
/// that was already open (NFR-P).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PoolStats {
    /// Number of connections the pool opened to the cluster (cold handshakes).
    pub opened: u64,
    /// Number of requests dispatched to the cluster, cold and reused alike.
    pub dispatched: u64,
}

impl PoolStats {
    /// Number of requests that were served over a reused pooled connection.
    ///
    /// Computed as `dispatched - opened`, saturating at zero so a snapshot in
    /// which `opened` exceeds `dispatched` yields `0` instead of underflowing.
    #[must_use]
    pub fn reused(&self) -> u64 {
        let Self { opened, dispatched } = *self;
        dispatched.saturating_sub(opened)
    }
}
80}