scion_stack/scionstack/
quic.rs

1// Copyright 2025 Anapaya Systems
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//   http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
//! SCION stack QUIC endpoint.
15
16use std::{
17    collections::HashMap,
18    fmt::{self, Debug},
19    hash::{BuildHasher, Hash as _, Hasher as _},
20    io::ErrorKind,
21    net::{IpAddr, Ipv6Addr},
22    pin::Pin,
23    sync::{Arc, Mutex},
24    task::{Poll, ready},
25    time::{Duration, Instant},
26};
27
28use bytes::BufMut as _;
29use chrono::Utc;
30use foldhash::fast::FixedState;
31use quinn::{AsyncUdpSocket, udp::RecvMeta};
32use scion_proto::{
33    address::SocketAddr,
34    packet::{ByEndpoint, ScionPacketUdp},
35};
36
37use super::{AsyncUdpUnderlaySocket, udp_polling::UdpPoller};
38use crate::{
39    path::manager::{PathPrefetcher, SyncPathManager},
40    quic::ScionQuinnConn,
41};
42
/// Minimum time between two logged IO errors: at most 1 IO error is logged every 3 seconds.
///
/// Send and receive errors are debounced independently, each with its own timestamp.
const IO_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(3);
45
/// A wrapper around a quinn::Endpoint that translates between SCION and ip:port addresses.
///
/// This is necessary because quinn expects a std::net::SocketAddr, but SCION uses
/// scion_proto::address::SocketAddr.
///
/// Addresses are mapped through the [`AddressTranslator`] handed to the endpoint on
/// construction.
pub struct Endpoint {
    /// The wrapped quinn endpoint; it only ever sees translated ip:port addresses.
    inner: quinn::Endpoint,
    /// Used by `connect` to request a path to the destination before the handshake starts.
    path_prefetcher: Arc<dyn PathPrefetcher + Send + Sync>,
    /// Maps SCION addresses to the IP addresses handed to quinn, and back.
    address_translator: Arc<AddressTranslator>,
}
57
58impl Endpoint {
59    /// Creates a new endpoint.
60    pub fn new_with_abstract_socket(
61        config: quinn::EndpointConfig,
62        server_config: Option<quinn::ServerConfig>,
63        socket: Arc<dyn quinn::AsyncUdpSocket>,
64        runtime: Arc<dyn quinn::Runtime>,
65        pather: Arc<dyn PathPrefetcher + Send + Sync>,
66        address_translator: Arc<AddressTranslator>,
67    ) -> std::io::Result<Self> {
68        Ok(Self {
69            inner: quinn::Endpoint::new_with_abstract_socket(
70                config,
71                server_config,
72                socket,
73                runtime,
74            )?,
75            path_prefetcher: pather,
76            address_translator,
77        })
78    }
79
80    /// Connect to the address.
81    pub fn connect(
82        &self,
83        addr: scion_proto::address::SocketAddr,
84        server_name: &str,
85    ) -> Result<quinn::Connecting, quinn::ConnectError> {
86        let mapped_addr = self
87            .address_translator
88            .register_scion_address(addr.scion_address());
89        let local_addr = self
90            .address_translator
91            .lookup_scion_address(self.inner.local_addr().unwrap().ip())
92            .unwrap();
93        self.path_prefetcher
94            .prefetch_path(local_addr.isd_asn(), addr.isd_asn());
95        self.inner.connect(
96            std::net::SocketAddr::new(mapped_addr, addr.port()),
97            server_name,
98        )
99    }
100
101    /// Accepts a new incoming connection.
102    pub async fn accept(&self) -> Result<Option<ScionQuinnConn>, quinn::ConnectionError> {
103        let incoming = self.inner.accept().await;
104        if let Some(incoming) = incoming {
105            let remote_socket_addr = incoming.remote_address();
106            let local_scion_addr = incoming
107                .local_ip()
108                .and_then(|ip| self.address_translator.lookup_scion_address(ip));
109            let conn = ScionQuinnConn {
110                inner: incoming.await?,
111                // XXX(uniquefine): For now the ScionAsyncUdpSocket does not have access to a
112                // packets destination address, so we cannot lookup the local SCION
113                // address.
114                local_addr: local_scion_addr,
115                remote_addr: scion_proto::address::SocketAddr::new(
116                    self.address_translator
117                        .lookup_scion_address(remote_socket_addr.ip())
118                        .or_else(|| {
119                            panic!(
120                                "no scion address mapped for ip, this should never happen: {}",
121                                remote_socket_addr.ip(),
122                            );
123                        })
124                        .unwrap(),
125                    remote_socket_addr.port(),
126                ),
127            };
128            Ok(Some(conn))
129        } else {
130            Ok(None)
131        }
132    }
133
134    /// Set the default QUIC client configuration.
135    pub fn set_default_client_config(&mut self, config: quinn::ClientConfig) {
136        self.inner.set_default_client_config(config);
137    }
138
139    /// Wait until all connections on the endpoint cleanly shut down.
140    pub async fn wait_idle(&self) {
141        self.inner.wait_idle().await;
142    }
143
144    /// Returns the local socket address of the endpoint.
145    pub fn local_addr(&self) -> std::io::Result<std::net::SocketAddr> {
146        self.inner.local_addr()
147    }
148}
149
/// Type that can translate between SCION and IP addresses.
///
/// Each registered SCION address is mapped to an IPv6 address derived from a hash of the
/// SCION address; the mapping is remembered so the IPv6 address can be translated back.
// TODO(uniquefine): Expiration or cleanup of translated addresses
pub struct AddressTranslator {
    /// Hasher factory used to derive the IPv6 address from a SCION address.
    build_hasher: FixedState,
    /// Reverse mapping from the derived IPv6 address to the registered SCION address.
    addr_map: Mutex<HashMap<std::net::Ipv6Addr, scion_proto::address::ScionAddr>>,
}
156
157impl Debug for AddressTranslator {
158    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159        write!(
160            f,
161            "AddressTranslatorImpl {{ {} }}",
162            self.addr_map
163                .lock()
164                .unwrap()
165                .iter()
166                .map(|(ip, addr)| format!("{ip} -> {addr}"))
167                .collect::<Vec<_>>()
168                .join(", ")
169        )
170    }
171}
172
173impl AddressTranslator {
174    /// Creates a new address translator.
175    pub fn new(build_hasher: FixedState) -> Self {
176        Self {
177            build_hasher,
178            addr_map: Mutex::new(HashMap::new()),
179        }
180    }
181
182    fn hash_scion_address(&self, addr: scion_proto::address::ScionAddr) -> std::net::Ipv6Addr {
183        let mut hasher = self.build_hasher.build_hasher();
184        hasher.write_u64(addr.isd_asn().to_u64());
185        addr.local_address().hash(&mut hasher);
186        Ipv6Addr::from(hasher.finish() as u128)
187    }
188
189    /// Registers the SCION address and returns the corresponding IP address.
190    pub fn register_scion_address(
191        &self,
192        addr: scion_proto::address::ScionAddr,
193    ) -> std::net::IpAddr {
194        let ip = self.hash_scion_address(addr);
195        let mut addr_map = self.addr_map.lock().unwrap();
196        addr_map.entry(ip).or_insert(addr);
197        IpAddr::V6(ip)
198    }
199
200    /// Looks up the SCION address for the given IP address.
201    pub fn lookup_scion_address(
202        &self,
203        ip: std::net::IpAddr,
204    ) -> Option<scion_proto::address::ScionAddr> {
205        let ip = match ip {
206            IpAddr::V6(ip) => ip,
207            IpAddr::V4(_) => return None,
208        };
209        self.addr_map.lock().unwrap().get(&ip).cloned()
210    }
211}
212
213impl Default for AddressTranslator {
214    fn default() -> Self {
215        Self {
216            build_hasher: FixedState::with_seed(42),
217            addr_map: Mutex::new(HashMap::new()),
218        }
219    }
220}
221
/// A path-aware UDP socket that implements the [quinn::AsyncUdpSocket] trait.
///
/// The socket translates the SCION addresses of incoming packets to IP addresses that
/// are used by quinn.
/// To connect to a SCION destination, the destination SCION address must first be registered
/// with the [AddressTranslator].
pub(crate) struct ScionAsyncUdpSocket {
    /// The underlay socket packets are sent on and received from.
    socket: Arc<dyn AsyncUdpUnderlaySocket>,
    /// Supplies cached paths for sending and is fed the reversed paths of received packets.
    path_manager: Arc<dyn SyncPathManager + Send + Sync>,
    /// Maps between SCION addresses and the IP addresses exchanged with quinn.
    address_translator: Arc<AddressTranslator>,
    /// The last time a poll_recv error was logged.
    last_recv_error: Mutex<Instant>,
    /// The last time a try_send error was logged.
    last_send_error: Mutex<Instant>,
}
237
238impl ScionAsyncUdpSocket {
239    pub fn new(
240        socket: Arc<dyn AsyncUdpUnderlaySocket>,
241        path_manager: Arc<dyn SyncPathManager + Send + Sync>,
242        address_translator: Arc<AddressTranslator>,
243    ) -> Self {
244        let now = Instant::now();
245        let instant = now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now);
246        Self {
247            socket,
248            path_manager,
249            address_translator,
250            last_recv_error: Mutex::new(instant),
251            last_send_error: Mutex::new(instant),
252        }
253    }
254}
255
256impl std::fmt::Debug for ScionAsyncUdpSocket {
257    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258        f.write_fmt(format_args!(
259            "ScionAsyncUdpSocket({})",
260            match self.local_addr() {
261                Ok(addr) => addr.to_string(),
262                Err(e) => e.to_string(),
263            }
264        ))
265    }
266}
267
268/// A wrapper that implements quinn::UdpPoller by delegating to scionstack::UdpPoller
269/// This allows scionstack to remain decoupled from the quinn crate
270struct QuinnUdpPollerWrapper(Pin<Box<dyn UdpPoller>>);
271
272impl std::fmt::Debug for QuinnUdpPollerWrapper {
273    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274        self.0.fmt(f)
275    }
276}
277
278impl QuinnUdpPollerWrapper {
279    fn new(inner: Pin<Box<dyn UdpPoller>>) -> Self {
280        Self(inner)
281    }
282}
283
284impl quinn::UdpPoller for QuinnUdpPollerWrapper {
285    fn poll_writable(
286        mut self: Pin<&mut Self>,
287        cx: &mut std::task::Context,
288    ) -> Poll<std::io::Result<()>> {
289        self.0.as_mut().poll_writable(cx)
290    }
291}
292
293impl AsyncUdpSocket for ScionAsyncUdpSocket {
294    fn create_io_poller(self: Arc<Self>) -> std::pin::Pin<Box<dyn quinn::UdpPoller>> {
295        let socket = self.socket.clone();
296        let inner_poller = socket.create_io_poller();
297        let wrapper = QuinnUdpPollerWrapper::new(inner_poller);
298        Box::pin(wrapper)
299    }
300
301    fn try_send(&self, transmit: &quinn::udp::Transmit) -> std::io::Result<()> {
302        let buf = bytes::Bytes::copy_from_slice(transmit.contents);
303        let remote_scion_addr = SocketAddr::new(
304            self.address_translator
305                .lookup_scion_address(transmit.destination.ip())
306                .ok_or(std::io::Error::other(format!(
307                    "no scion address mapped for ip, this should never happen: {}",
308                    transmit.destination.ip(),
309                )))?,
310            transmit.destination.port(),
311        );
312        let path = self.path_manager.try_cached_path(
313            self.socket.local_addr().isd_asn(),
314            remote_scion_addr.isd_asn(),
315            Utc::now(),
316        )?;
317
318        let path = match path {
319            Some(path) => path,
320            None => return Ok(()),
321        };
322
323        let packet = ScionPacketUdp::new(
324            ByEndpoint {
325                source: self.socket.local_addr(),
326                destination: remote_scion_addr,
327            },
328            path.data_plane_path.to_bytes_path(),
329            buf,
330        )
331        .map_err(|_| std::io::Error::other("failed to encode packet"))?;
332
333        match self.socket.try_send(packet.into()) {
334            Ok(_) => Ok(()),
335            Err(e) if e.kind() == ErrorKind::WouldBlock => Err(e),
336            Err(e) => {
337                // XXX: We only log the error such that the quinn connection driver doesn't quit.
338                debounced_warn(
339                    &self.last_send_error,
340                    "Failed to send on the underlying socket",
341                    e,
342                );
343                Ok(())
344            }
345        }
346    }
347
348    fn poll_recv(
349        &self,
350        cx: &mut std::task::Context,
351        bufs: &mut [std::io::IoSliceMut<'_>],
352        meta: &mut [quinn::udp::RecvMeta],
353    ) -> std::task::Poll<std::io::Result<usize>> {
354        match ready!(self.socket.poll_recv_from_with_path(cx)) {
355            Ok((remote, bytes, path)) => {
356                match path.to_reversed() {
357                    Ok(path) => {
358                        // Register the path for later reuse
359                        self.path_manager.register_path(
360                            remote.isd_asn(),
361                            self.socket.local_addr().isd_asn(),
362                            Utc::now(),
363                            path,
364                        );
365                    }
366                    Err(e) => {
367                        tracing::trace!("Failed to reverse path for registration: {}", e)
368                    }
369                }
370
371                let remote_ip = self
372                    .address_translator
373                    .register_scion_address(remote.scion_address());
374
375                meta[0] = RecvMeta {
376                    addr: std::net::SocketAddr::new(remote_ip, remote.port()),
377                    len: bytes.len(),
378                    ecn: None,
379                    stride: bytes.len(),
380                    dst_ip: self.socket.local_addr().local_address().map(|s| s.ip()),
381                };
382                bufs[0].as_mut().put_slice(&bytes);
383
384                Poll::Ready(Ok(1))
385            }
386            Err(e) if e.kind() == ErrorKind::WouldBlock => Poll::Ready(Err(e)),
387            Err(e) => {
388                // XXX: We only log the error such that the endpoint driver doesn't quit.
389                debounced_warn(
390                    &self.last_recv_error,
391                    "Failed to receive on the underlying socket",
392                    e,
393                );
394
395                Poll::Pending
396            }
397        }
398    }
399
400    fn local_addr(&self) -> std::io::Result<std::net::SocketAddr> {
401        Ok(std::net::SocketAddr::new(
402            self.address_translator
403                .register_scion_address(self.socket.local_addr().scion_address()),
404            self.socket.local_addr().port(),
405        ))
406    }
407}
408
409/// Logs a warning message when an error occurs.
410///
411/// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`]
412/// has elapsed since the last error was logged.
413// Inspired by quinn's `log_sendmsg_error`.
414fn debounced_warn(last_send_error: &Mutex<Instant>, msg: &str, err: impl core::fmt::Debug) {
415    let now = Instant::now();
416    let last_send_error = &mut *last_send_error.lock().expect("poisoned lock");
417    if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL {
418        *last_send_error = now;
419        tracing::warn!(?err, "{msg}");
420    }
421}