1use std::net::SocketAddr;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use metrics::Label;
8use quinn::{ConnectionError, VarInt};
9use rustls_pki_types::CertificateDer;
10
11use crate::network::config::ConnectionMetricsLevel;
12use crate::network::crypto::peer_id_from_certificate;
13use crate::types::{Direction, InboundRequestMeta, PeerId};
14
15#[derive(Clone)]
16pub struct Connection {
17 inner: quinn::Connection,
18 request_meta: Arc<InboundRequestMeta>,
19}
20
21macro_rules! emit_gauges {
22 ($prefix:literal, $stats:expr, $labels:expr, [ $($field:ident),* $(,)? ]) => {
23 $(
24 metrics::gauge!(concat!($prefix, stringify!($field)), $labels.clone())
25 .set($stats.$field as f64);
26 )*
27 };
28}
29
30impl Connection {
31 pub const LIMIT_EXCEEDED_ERROR_CODE: VarInt = VarInt::from_u32(0xdead);
32
33 pub fn with_peer_id(
34 inner: quinn::Connection,
35 origin: Direction,
36 peer_id: PeerId,
37 connection_metrics: Option<ConnectionMetricsLevel>,
38 ) -> Self {
39 let connection = Self {
40 request_meta: Arc::new(InboundRequestMeta {
41 peer_id,
42 origin,
43 remote_address: inner.remote_address(),
44 }),
45 inner,
46 };
47
48 let conn = connection.inner.clone();
49
50 let Some(connection_metrics) = connection_metrics else {
51 return connection;
52 };
53
54 let peer_id = connection.request_meta.peer_id;
55 let remote_addr = connection.remote_address().to_string();
56
57 tokio::spawn(async move {
59 const INTERVAL: Duration = Duration::from_secs(5);
60
61 let mut labels = vec![Label::new("peer_addr", remote_addr)];
62
63 if connection_metrics.should_export_peer_id() {
64 labels.push(Label::new("peer_id", peer_id.to_string()));
65 labels.shrink_to_fit();
66 }
67
68 loop {
69 let stats = conn.stats();
70
71 metrics::gauge!("tycho_network_connection_rtt_ms", labels.clone())
72 .set(stats.path.rtt.as_millis() as f64);
73
74 metrics::gauge!("tycho_network_connection_invalid_messages", labels.clone()).set(
75 stats.frame_rx.connection_close as f64 + stats.frame_rx.reset_stream as f64,
76 );
77
78 emit_gauges!("tycho_network_connection_", stats.path, labels, [
79 cwnd, congestion_events, lost_packets, sent_packets ]);
84
85 emit_gauges!("tycho_network_connection_rx_", stats.udp_rx, labels, [
86 bytes
87 ]);
88
89 emit_gauges!("tycho_network_connection_tx_", stats.udp_tx, labels, [
90 bytes
91 ]);
92
93 emit_gauges!(
95 "tycho_network_connection_rx_",
96 stats.frame_rx,
97 labels.clone(),
98 [
99 acks,
100 crypto,
101 connection_close,
102 data_blocked,
103 max_data,
104 max_stream_data,
105 ping,
106 reset_stream,
107 stream_data_blocked,
108 streams_blocked_bidi,
109 stop_sending,
110 stream
111 ]
112 );
113
114 emit_gauges!("tycho_network_connection_tx_", stats.frame_tx, labels, [
116 acks,
117 crypto,
118 connection_close,
119 data_blocked,
120 max_data,
121 max_stream_data,
122 ping,
123 reset_stream,
124 stream_data_blocked,
125 streams_blocked_bidi,
126 stop_sending,
127 stream
128 ]);
129
130 tokio::select! {
131 _ = tokio::time::sleep(INTERVAL) => {}
132 _ = conn.closed() => {
133 tracing::debug!(
134 %peer_id,
135 addr = %conn.remote_address(),
136 "connection metrics loop stopped",
137 );
138 return;
139 },
140 }
141 }
142 });
143
144 connection
145 }
146
147 pub fn request_meta(&self) -> &Arc<InboundRequestMeta> {
148 &self.request_meta
149 }
150
151 pub fn peer_id(&self) -> &PeerId {
152 &self.request_meta.peer_id
153 }
154
155 pub fn stable_id(&self) -> usize {
156 self.inner.stable_id()
157 }
158
159 pub fn origin(&self) -> Direction {
160 self.request_meta.origin
161 }
162
163 pub fn remote_address(&self) -> SocketAddr {
164 self.request_meta.remote_address
165 }
166
167 pub fn close(&self) {
168 self.inner.close(0u8.into(), b"connection closed");
169 }
170
171 pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
172 self.inner
173 .open_bi()
174 .await
175 .map(|(send, recv)| (SendStream(send), RecvStream(recv)))
176 }
177
178 pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
179 self.inner
180 .accept_bi()
181 .await
182 .map(|(send, recv)| (SendStream(send), RecvStream(recv)))
183 }
184
185 pub async fn open_uni(&self) -> Result<SendStream, ConnectionError> {
186 self.inner.open_uni().await.map(SendStream)
187 }
188
189 pub async fn accept_uni(&self) -> Result<RecvStream, ConnectionError> {
190 self.inner.accept_uni().await.map(RecvStream)
191 }
192
193 pub fn stats(&self) -> quinn::ConnectionStats {
194 self.inner.stats()
195 }
196}
197
198impl std::fmt::Debug for Connection {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 f.debug_struct("Connection")
201 .field("origin", &self.request_meta.origin)
202 .field("id", &self.stable_id())
203 .field("remote_address", &self.remote_address())
204 .field("peer_id", &self.request_meta.peer_id)
205 .finish_non_exhaustive()
206 }
207}
208
209#[repr(transparent)]
210pub struct SendStream(quinn::SendStream);
211
212impl Drop for SendStream {
213 fn drop(&mut self) {
214 _ = self.0.reset(0u8.into());
215 }
216}
217
218impl std::ops::Deref for SendStream {
219 type Target = quinn::SendStream;
220
221 #[inline]
222 fn deref(&self) -> &Self::Target {
223 &self.0
224 }
225}
226
227impl std::ops::DerefMut for SendStream {
228 #[inline]
229 fn deref_mut(&mut self) -> &mut Self::Target {
230 &mut self.0
231 }
232}
233
234impl tokio::io::AsyncWrite for SendStream {
235 #[inline]
236 fn poll_write(
237 mut self: Pin<&mut Self>,
238 cx: &mut Context<'_>,
239 buf: &[u8],
240 ) -> Poll<Result<usize, std::io::Error>> {
241 Pin::new(&mut self.0)
242 .poll_write(cx, buf)
243 .map_err(std::io::Error::from)
244 }
245
246 #[inline]
247 fn poll_flush(
248 mut self: Pin<&mut Self>,
249 cx: &mut Context<'_>,
250 ) -> Poll<Result<(), std::io::Error>> {
251 Pin::new(&mut self.0).poll_flush(cx)
252 }
253
254 #[inline]
255 fn poll_shutdown(
256 mut self: Pin<&mut Self>,
257 cx: &mut Context<'_>,
258 ) -> Poll<Result<(), std::io::Error>> {
259 Pin::new(&mut self.0).poll_flush(cx)
260 }
261}
262
263#[repr(transparent)]
264pub struct RecvStream(quinn::RecvStream);
265
266impl std::ops::Deref for RecvStream {
267 type Target = quinn::RecvStream;
268
269 #[inline]
270 fn deref(&self) -> &Self::Target {
271 &self.0
272 }
273}
274
275impl std::ops::DerefMut for RecvStream {
276 #[inline]
277 fn deref_mut(&mut self) -> &mut Self::Target {
278 &mut self.0
279 }
280}
281
282impl tokio::io::AsyncRead for RecvStream {
283 #[inline]
284 fn poll_read(
285 mut self: Pin<&mut Self>,
286 cx: &mut Context<'_>,
287 buf: &mut tokio::io::ReadBuf<'_>,
288 ) -> Poll<std::io::Result<()>> {
289 Pin::new(&mut self.0).poll_read(cx, buf)
290 }
291}
292
293pub(crate) fn extract_peer_id(connection: &quinn::Connection) -> Option<PeerId> {
294 connection.peer_identity().and_then(parse_peer_identity)
295}
296
297pub(crate) fn parse_peer_identity(identity: Box<dyn std::any::Any>) -> Option<PeerId> {
298 let certificate = identity
299 .downcast::<Vec<CertificateDer<'static>>>()
300 .ok()?
301 .into_iter()
302 .next()?;
303
304 peer_id_from_certificate(&certificate).ok()
305}