tycho_network/network/
connection.rs

1use std::net::SocketAddr;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use metrics::Label;
8use quinn::{ConnectionError, VarInt};
9use rustls_pki_types::CertificateDer;
10
11use crate::network::config::ConnectionMetricsLevel;
12use crate::network::crypto::peer_id_from_certificate;
13use crate::types::{Direction, InboundRequestMeta, PeerId};
14
15#[derive(Clone)]
16pub struct Connection {
17    inner: quinn::Connection,
18    request_meta: Arc<InboundRequestMeta>,
19}
20
21macro_rules! emit_gauges {
22    ($prefix:literal, $stats:expr, $labels:expr, [ $($field:ident),* $(,)? ]) => {
23        $(
24            metrics::gauge!(concat!($prefix, stringify!($field)), $labels.clone())
25                .set($stats.$field as f64);
26        )*
27    };
28}
29
30impl Connection {
31    pub const LIMIT_EXCEEDED_ERROR_CODE: VarInt = VarInt::from_u32(0xdead);
32
33    pub fn with_peer_id(
34        inner: quinn::Connection,
35        origin: Direction,
36        peer_id: PeerId,
37        connection_metrics: Option<ConnectionMetricsLevel>,
38    ) -> Self {
39        let connection = Self {
40            request_meta: Arc::new(InboundRequestMeta {
41                peer_id,
42                origin,
43                remote_address: inner.remote_address(),
44            }),
45            inner,
46        };
47
48        let conn = connection.inner.clone();
49
50        let Some(connection_metrics) = connection_metrics else {
51            return connection;
52        };
53
54        let peer_id = connection.request_meta.peer_id;
55        let remote_addr = connection.remote_address().to_string();
56
57        // we can't use `spawn_metrics_loop` here because we can't get arc reference to connection
58        tokio::spawn(async move {
59            const INTERVAL: Duration = Duration::from_secs(5);
60
61            let mut labels = vec![Label::new("peer_addr", remote_addr)];
62
63            if connection_metrics.should_export_peer_id() {
64                labels.push(Label::new("peer_id", peer_id.to_string()));
65                labels.shrink_to_fit();
66            }
67
68            loop {
69                let stats = conn.stats();
70
71                metrics::gauge!("tycho_network_connection_rtt_ms", labels.clone())
72                    .set(stats.path.rtt.as_millis() as f64);
73
74                metrics::gauge!("tycho_network_connection_invalid_messages", labels.clone()).set(
75                    stats.frame_rx.connection_close as f64 + stats.frame_rx.reset_stream as f64,
76                );
77
78                emit_gauges!("tycho_network_connection_", stats.path, labels, [
79                    cwnd,              // Congestion window size
80                    congestion_events, // Network congestion indicators
81                    lost_packets,      // Total packet loss
82                    sent_packets       // Baseline for loss calculations
83                ]);
84
85                emit_gauges!("tycho_network_connection_rx_", stats.udp_rx, labels, [
86                    bytes
87                ]);
88
89                emit_gauges!("tycho_network_connection_tx_", stats.udp_tx, labels, [
90                    bytes
91                ]);
92
93                // Frame RX
94                emit_gauges!(
95                    "tycho_network_connection_rx_",
96                    stats.frame_rx,
97                    labels.clone(),
98                    [
99                        acks,
100                        crypto,
101                        connection_close,
102                        data_blocked,
103                        max_data,
104                        max_stream_data,
105                        ping,
106                        reset_stream,
107                        stream_data_blocked,
108                        streams_blocked_bidi,
109                        stop_sending,
110                        stream
111                    ]
112                );
113
114                // Frame TX
115                emit_gauges!("tycho_network_connection_tx_", stats.frame_tx, labels, [
116                    acks,
117                    crypto,
118                    connection_close,
119                    data_blocked,
120                    max_data,
121                    max_stream_data,
122                    ping,
123                    reset_stream,
124                    stream_data_blocked,
125                    streams_blocked_bidi,
126                    stop_sending,
127                    stream
128                ]);
129
130                tokio::select! {
131                    _ = tokio::time::sleep(INTERVAL) => {}
132                    _ = conn.closed() => {
133                        tracing::debug!(
134                            %peer_id,
135                            addr = %conn.remote_address(),
136                            "connection metrics loop stopped",
137                        );
138                        return;
139                    },
140                }
141            }
142        });
143
144        connection
145    }
146
147    pub fn request_meta(&self) -> &Arc<InboundRequestMeta> {
148        &self.request_meta
149    }
150
151    pub fn peer_id(&self) -> &PeerId {
152        &self.request_meta.peer_id
153    }
154
155    pub fn stable_id(&self) -> usize {
156        self.inner.stable_id()
157    }
158
159    pub fn origin(&self) -> Direction {
160        self.request_meta.origin
161    }
162
163    pub fn remote_address(&self) -> SocketAddr {
164        self.request_meta.remote_address
165    }
166
167    pub fn close(&self) {
168        self.inner.close(0u8.into(), b"connection closed");
169    }
170
171    pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
172        self.inner
173            .open_bi()
174            .await
175            .map(|(send, recv)| (SendStream(send), RecvStream(recv)))
176    }
177
178    pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
179        self.inner
180            .accept_bi()
181            .await
182            .map(|(send, recv)| (SendStream(send), RecvStream(recv)))
183    }
184
185    pub async fn open_uni(&self) -> Result<SendStream, ConnectionError> {
186        self.inner.open_uni().await.map(SendStream)
187    }
188
189    pub async fn accept_uni(&self) -> Result<RecvStream, ConnectionError> {
190        self.inner.accept_uni().await.map(RecvStream)
191    }
192
193    pub fn stats(&self) -> quinn::ConnectionStats {
194        self.inner.stats()
195    }
196}
197
198impl std::fmt::Debug for Connection {
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        f.debug_struct("Connection")
201            .field("origin", &self.request_meta.origin)
202            .field("id", &self.stable_id())
203            .field("remote_address", &self.remote_address())
204            .field("peer_id", &self.request_meta.peer_id)
205            .finish_non_exhaustive()
206    }
207}
208
209#[repr(transparent)]
210pub struct SendStream(quinn::SendStream);
211
212impl Drop for SendStream {
213    fn drop(&mut self) {
214        _ = self.0.reset(0u8.into());
215    }
216}
217
218impl std::ops::Deref for SendStream {
219    type Target = quinn::SendStream;
220
221    #[inline]
222    fn deref(&self) -> &Self::Target {
223        &self.0
224    }
225}
226
227impl std::ops::DerefMut for SendStream {
228    #[inline]
229    fn deref_mut(&mut self) -> &mut Self::Target {
230        &mut self.0
231    }
232}
233
234impl tokio::io::AsyncWrite for SendStream {
235    #[inline]
236    fn poll_write(
237        mut self: Pin<&mut Self>,
238        cx: &mut Context<'_>,
239        buf: &[u8],
240    ) -> Poll<Result<usize, std::io::Error>> {
241        Pin::new(&mut self.0)
242            .poll_write(cx, buf)
243            .map_err(std::io::Error::from)
244    }
245
246    #[inline]
247    fn poll_flush(
248        mut self: Pin<&mut Self>,
249        cx: &mut Context<'_>,
250    ) -> Poll<Result<(), std::io::Error>> {
251        Pin::new(&mut self.0).poll_flush(cx)
252    }
253
254    #[inline]
255    fn poll_shutdown(
256        mut self: Pin<&mut Self>,
257        cx: &mut Context<'_>,
258    ) -> Poll<Result<(), std::io::Error>> {
259        Pin::new(&mut self.0).poll_flush(cx)
260    }
261}
262
263#[repr(transparent)]
264pub struct RecvStream(quinn::RecvStream);
265
266impl std::ops::Deref for RecvStream {
267    type Target = quinn::RecvStream;
268
269    #[inline]
270    fn deref(&self) -> &Self::Target {
271        &self.0
272    }
273}
274
275impl std::ops::DerefMut for RecvStream {
276    #[inline]
277    fn deref_mut(&mut self) -> &mut Self::Target {
278        &mut self.0
279    }
280}
281
282impl tokio::io::AsyncRead for RecvStream {
283    #[inline]
284    fn poll_read(
285        mut self: Pin<&mut Self>,
286        cx: &mut Context<'_>,
287        buf: &mut tokio::io::ReadBuf<'_>,
288    ) -> Poll<std::io::Result<()>> {
289        Pin::new(&mut self.0).poll_read(cx, buf)
290    }
291}
292
293pub(crate) fn extract_peer_id(connection: &quinn::Connection) -> Option<PeerId> {
294    connection.peer_identity().and_then(parse_peer_identity)
295}
296
297pub(crate) fn parse_peer_identity(identity: Box<dyn std::any::Any>) -> Option<PeerId> {
298    let certificate = identity
299        .downcast::<Vec<CertificateDer<'static>>>()
300        .ok()?
301        .into_iter()
302        .next()?;
303
304    peer_id_from_certificate(&certificate).ok()
305}