osproxy_sink/conn.rs
1//! Counting the real TCP connection opens behind a pooled client, so pool reuse
2//! is observable (NFR-P, `docs/01` ยง7) without the pooled client exposing it.
3//!
4//! `hyper-util`'s legacy `Client` reuses connections from its pool but does not
5//! report whether a given request rode a fresh or a reused connection. We learn
6//! it the only honest way: the pool calls its *connector* exactly once per new
7//! connection and never for a reused one, so a thin [`CountingConnector`] that
8//! increments a counter on each [`Service::call`] turns "connections opened" into
9//! a number we can compare against "requests dispatched". A cluster whose opens
10//! stay far below its dispatches is amortizing handshakes, pool reuse, verified.
11
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::Arc;
16use std::task::{Context, Poll};
17
18use hyper::Uri;
19use tower_service::Service;
20
21/// A connector that counts how many connections its inner connector opens.
22///
23/// Wraps any pooled-client connector (e.g. `HttpConnector`), sharing an atomic
24/// open-count with the owning pool. `Clone` is required because the legacy
25/// `Client` clones its connector; clones share the same counter.
26#[derive(Clone)]
27pub(crate) struct CountingConnector<C> {
28 inner: C,
29 opens: Arc<AtomicU64>,
30}
31
32impl<C> CountingConnector<C> {
33 /// Wraps `inner`, incrementing `opens` on each new connection it opens.
34 pub(crate) fn new(inner: C, opens: Arc<AtomicU64>) -> Self {
35 Self { inner, opens }
36 }
37}
38
39impl<C> Service<Uri> for CountingConnector<C>
40where
41 C: Service<Uri>,
42 C::Future: Send + 'static,
43{
44 type Response = C::Response;
45 type Error = C::Error;
46 type Future = Pin<Box<dyn Future<Output = Result<C::Response, C::Error>> + Send>>;
47
48 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49 self.inner.poll_ready(cx)
50 }
51
52 fn call(&mut self, dst: Uri) -> Self::Future {
53 // The pool calls the connector only when it needs a *new* connection;
54 // a reused pooled connection never reaches here, so this is the true
55 // count of TCP (and TLS) handshakes performed for this cluster.
56 self.opens.fetch_add(1, Ordering::Relaxed);
57 Box::pin(self.inner.call(dst))
58 }
59}
60
61/// A snapshot of one cluster pool's connection-reuse counters.
62///
63/// `opened` is the number of TCP connections actually established; `dispatched`
64/// is the number of requests sent. The gap is reuse: `dispatched - opened`
65/// requests rode an already-open pooled connection (NFR-P).
66#[derive(Clone, Copy, PartialEq, Eq, Debug)]
67pub struct PoolStats {
68 /// Connections the pool opened to the cluster (cold handshakes).
69 pub opened: u64,
70 /// Requests dispatched to the cluster (cold + reused).
71 pub dispatched: u64,
72}
73
74impl PoolStats {
75 /// Requests that rode a reused pooled connection.
76 #[must_use]
77 pub fn reused(&self) -> u64 {
78 self.dispatched.saturating_sub(self.opened)
79 }
80}