Skip to main content

scion_stack/scionstack/
quic.rs

1// Copyright 2025 Anapaya Systems
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//   http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
//! SCION stack QUIC endpoint.
15
16use std::{
17    collections::HashMap,
18    fmt::{self, Debug},
19    hash::{BuildHasher, Hash as _, Hasher as _},
20    io::ErrorKind,
21    net::{self, IpAddr, Ipv6Addr},
22    pin::Pin,
23    sync::{Arc, Mutex},
24    task::{Poll, ready},
25    time::{Duration, Instant},
26};
27
28use anapaya_quinn::{AsyncUdpSocket, udp::RecvMeta};
29use bytes::BufMut as _;
30use chrono::Utc;
31use foldhash::fast::FixedState;
32use scion_proto::{
33    address::SocketAddr,
34    packet::{ByEndpoint, ScionPacketUdp},
35};
36
37use super::{AsyncUdpUnderlaySocket, udp_polling::UdpPoller};
38use crate::{
39    path::manager::traits::{PathPrefetcher, SyncPathManager},
40    quic::ScionQuinnConn,
41};
42
/// Minimum time between two logged I/O errors of the same kind.
///
/// Used by `debounced_warn` to rate-limit warnings, so at most one I/O error is logged
/// every 3 seconds for sends and for receives respectively.
const IO_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(3);
45
46/// A wrapper around a anapaya_quinn::Endpoint that translates between SCION and ip:port addresses.
47///
48/// This is necessary because anapaya_quinn expects a std::net::SocketAddr, but SCION uses
49/// scion_proto::address::SocketAddr.
50///
51/// Addresses are mapped by the provided ScionAsyncUdpSocket.
52pub struct Endpoint {
53    inner: anapaya_quinn::Endpoint,
54    socket: Arc<ScionAsyncUdpSocket>,
55    path_prefetcher: Arc<dyn PathPrefetcher + Send + Sync>,
56    address_translator: Arc<AddressTranslator>,
57    local_scion_addr: scion_proto::address::SocketAddr,
58}
59
60impl Endpoint {
61    /// Creates a new endpoint.
62    pub(crate) fn new_with_abstract_socket(
63        config: anapaya_quinn::EndpointConfig,
64        server_config: Option<anapaya_quinn::ServerConfig>,
65        socket: Arc<ScionAsyncUdpSocket>,
66        local_scion_addr: scion_proto::address::SocketAddr,
67        runtime: Arc<dyn anapaya_quinn::Runtime>,
68        pather: Arc<dyn PathPrefetcher + Send + Sync>,
69        address_translator: Arc<AddressTranslator>,
70    ) -> std::io::Result<Self> {
71        Ok(Self {
72            inner: anapaya_quinn::Endpoint::new_with_abstract_socket(
73                config,
74                server_config,
75                socket.clone(),
76                runtime,
77            )?,
78            socket,
79            path_prefetcher: pather,
80            address_translator,
81            local_scion_addr,
82        })
83    }
84
85    /// Connect to the address.
86    pub fn connect(
87        &self,
88        addr: scion_proto::address::SocketAddr,
89        server_name: &str,
90    ) -> Result<anapaya_quinn::Connecting, anapaya_quinn::ConnectError> {
91        let mapped_addr = self
92            .address_translator
93            .register_scion_address(addr.scion_address());
94        let local_addr = self
95            .address_translator
96            .lookup_scion_address(self.inner.local_addr().unwrap().ip())
97            .unwrap();
98        self.path_prefetcher
99            .prefetch_path(local_addr.isd_asn(), addr.isd_asn());
100        self.inner.connect(
101            std::net::SocketAddr::new(mapped_addr, addr.port()),
102            server_name,
103        )
104    }
105
106    /// Accepts a new incoming connection.
107    pub async fn accept(&self) -> Result<Option<ScionQuinnConn>, anapaya_quinn::ConnectionError> {
108        let incoming = self.inner.accept().await;
109        if let Some(incoming) = incoming {
110            let remote_socket_addr = incoming.remote_address();
111            let local_scion_addr = incoming
112                .local_ip()
113                .and_then(|ip| self.address_translator.lookup_scion_address(ip));
114            let conn = ScionQuinnConn {
115                inner: incoming.await?,
116                // XXX(uniquefine): For now the ScionAsyncUdpSocket does not have access to a
117                // packets destination address, so we cannot lookup the local SCION
118                // address.
119                local_addr: local_scion_addr,
120                remote_addr: scion_proto::address::SocketAddr::new(
121                    self.address_translator
122                        .lookup_scion_address(remote_socket_addr.ip())
123                        .or_else(|| {
124                            panic!(
125                                "no scion address mapped for ip, this should never happen: {}",
126                                remote_socket_addr.ip(),
127                            );
128                        })
129                        .unwrap(),
130                    remote_socket_addr.port(),
131                ),
132            };
133            Ok(Some(conn))
134        } else {
135            Ok(None)
136        }
137    }
138
139    /// Set the default QUIC client configuration.
140    pub fn set_default_client_config(&mut self, config: anapaya_quinn::ClientConfig) {
141        self.inner.set_default_client_config(config);
142    }
143
144    /// Wait until all connections on the endpoint cleanly shut down.
145    pub async fn wait_idle(&self) {
146        self.inner.wait_idle().await;
147    }
148
149    /// Returns the local socket address of the endpoint.
150    pub fn local_addr(&self) -> std::io::Result<std::net::SocketAddr> {
151        self.inner.local_addr()
152    }
153
154    /// Returns the local SCION address of the endpoint.
155    pub fn local_scion_addr(&self) -> scion_proto::address::SocketAddr {
156        self.local_scion_addr
157    }
158
159    /// Snap data plane address the endpoint is connected to, if any.
160    pub fn snap_data_plane(&self) -> Option<net::SocketAddr> {
161        self.socket.snap_data_plane()
162    }
163}
164
165/// Type that can translate between SCION and IP addresses.
166// TODO(uniquefine): Expiration or cleanup of translated addresses
167pub struct AddressTranslator {
168    build_hasher: FixedState,
169    addr_map: Mutex<HashMap<std::net::Ipv6Addr, scion_proto::address::ScionAddr>>,
170}
171
172impl Debug for AddressTranslator {
173    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
174        write!(
175            f,
176            "AddressTranslatorImpl {{ {} }}",
177            self.addr_map
178                .lock()
179                .unwrap()
180                .iter()
181                .map(|(ip, addr)| format!("{ip} -> {addr}"))
182                .collect::<Vec<_>>()
183                .join(", ")
184        )
185    }
186}
187
188impl AddressTranslator {
189    /// Creates a new address translator.
190    pub fn new(build_hasher: FixedState) -> Self {
191        Self {
192            build_hasher,
193            addr_map: Mutex::new(HashMap::new()),
194        }
195    }
196
197    fn hash_scion_address(&self, addr: scion_proto::address::ScionAddr) -> std::net::Ipv6Addr {
198        let mut hasher = self.build_hasher.build_hasher();
199        hasher.write_u64(addr.isd_asn().to_u64());
200        addr.local_address().hash(&mut hasher);
201        Ipv6Addr::from(hasher.finish() as u128)
202    }
203
204    /// Registers the SCION address and returns the corresponding IP address.
205    pub fn register_scion_address(
206        &self,
207        addr: scion_proto::address::ScionAddr,
208    ) -> std::net::IpAddr {
209        let ip = self.hash_scion_address(addr);
210        let mut addr_map = self.addr_map.lock().unwrap();
211        addr_map.entry(ip).or_insert(addr);
212        IpAddr::V6(ip)
213    }
214
215    /// Looks up the SCION address for the given IP address.
216    pub fn lookup_scion_address(
217        &self,
218        ip: std::net::IpAddr,
219    ) -> Option<scion_proto::address::ScionAddr> {
220        let ip = match ip {
221            IpAddr::V6(ip) => ip,
222            IpAddr::V4(_) => return None,
223        };
224        self.addr_map.lock().unwrap().get(&ip).cloned()
225    }
226}
227
228impl Default for AddressTranslator {
229    fn default() -> Self {
230        Self {
231            build_hasher: FixedState::with_seed(42),
232            addr_map: Mutex::new(HashMap::new()),
233        }
234    }
235}
236
237/// A path-aware UDP socket that implements the [anapaya_quinn::AsyncUdpSocket] trait.
238///
239/// The socket translates the SCION addresses of incoming packets to IP addresses that
240/// are used by quinn.
241/// To connect to a SCION destination, the destination SCION address must first be registered
242/// with the [AddressTranslator].
243pub(crate) struct ScionAsyncUdpSocket {
244    socket: Arc<dyn AsyncUdpUnderlaySocket>,
245    path_manager: Arc<dyn SyncPathManager + Send + Sync>,
246    address_translator: Arc<AddressTranslator>,
247    /// The last time a poll_recv error was logged.
248    last_recv_error: Mutex<Instant>,
249    /// The last time a try_send error was logged.
250    last_send_error: Mutex<Instant>,
251}
252
253impl ScionAsyncUdpSocket {
254    pub fn new(
255        socket: Arc<dyn AsyncUdpUnderlaySocket>,
256        path_manager: Arc<dyn SyncPathManager + Send + Sync>,
257        address_translator: Arc<AddressTranslator>,
258    ) -> Self {
259        let now = Instant::now();
260        let instant = now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now);
261        Self {
262            socket,
263            path_manager,
264            address_translator,
265            last_recv_error: Mutex::new(instant),
266            last_send_error: Mutex::new(instant),
267        }
268    }
269
270    /// Returns the address of the SNAP data plane address the socket is connected to, if any.
271    pub fn snap_data_plane(&self) -> Option<net::SocketAddr> {
272        self.socket.snap_data_plane()
273    }
274}
275
276impl std::fmt::Debug for ScionAsyncUdpSocket {
277    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278        f.write_fmt(format_args!(
279            "ScionAsyncUdpSocket({})",
280            match self.local_addr() {
281                Ok(addr) => addr.to_string(),
282                Err(e) => e.to_string(),
283            }
284        ))
285    }
286}
287
288/// A wrapper that implements anapaya_quinn::UdpPoller by delegating to scionstack::UdpPoller
289/// This allows scionstack to remain decoupled from the quinn crate
290struct QuinnUdpPollerWrapper(Pin<Box<dyn UdpPoller>>);
291
292impl std::fmt::Debug for QuinnUdpPollerWrapper {
293    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294        self.0.fmt(f)
295    }
296}
297
298impl QuinnUdpPollerWrapper {
299    fn new(inner: Pin<Box<dyn UdpPoller>>) -> Self {
300        Self(inner)
301    }
302}
303
304impl anapaya_quinn::UdpPoller for QuinnUdpPollerWrapper {
305    fn poll_writable(
306        mut self: Pin<&mut Self>,
307        cx: &mut std::task::Context,
308    ) -> Poll<std::io::Result<()>> {
309        self.0.as_mut().poll_writable(cx)
310    }
311}
312
313impl AsyncUdpSocket for ScionAsyncUdpSocket {
314    fn create_io_poller(self: Arc<Self>) -> std::pin::Pin<Box<dyn anapaya_quinn::UdpPoller>> {
315        let socket = self.socket.clone();
316        let inner_poller = socket.create_io_poller();
317        let wrapper = QuinnUdpPollerWrapper::new(inner_poller);
318        Box::pin(wrapper)
319    }
320
321    fn try_send(&self, transmit: &anapaya_quinn::udp::Transmit) -> std::io::Result<()> {
322        let buf = bytes::Bytes::copy_from_slice(transmit.contents);
323        let remote_scion_addr = SocketAddr::new(
324            self.address_translator
325                .lookup_scion_address(transmit.destination.ip())
326                .ok_or(std::io::Error::other(format!(
327                    "no scion address mapped for ip, this should never happen: {}",
328                    transmit.destination.ip(),
329                )))?,
330            transmit.destination.port(),
331        );
332        let path = self.path_manager.try_cached_path(
333            self.socket.local_addr().isd_asn(),
334            remote_scion_addr.isd_asn(),
335            Utc::now(),
336        )?;
337
338        let path = match path {
339            Some(path) => path,
340            None => return Ok(()),
341        };
342
343        let packet = ScionPacketUdp::new(
344            ByEndpoint {
345                source: self.socket.local_addr(),
346                destination: remote_scion_addr,
347            },
348            path.data_plane_path.to_bytes_path(),
349            buf,
350        )
351        .map_err(|_| std::io::Error::other("failed to encode packet"))?;
352
353        match self.socket.try_send(packet.into()) {
354            Ok(_) => Ok(()),
355            Err(e) if e.kind() == ErrorKind::WouldBlock => Err(e),
356            Err(e) => {
357                // XXX: We only log the error such that the quinn connection driver doesn't quit.
358                debounced_warn(
359                    &self.last_send_error,
360                    "Failed to send on the underlying socket",
361                    e,
362                );
363                Ok(())
364            }
365        }
366    }
367
368    fn poll_recv(
369        &self,
370        cx: &mut std::task::Context,
371        bufs: &mut [std::io::IoSliceMut<'_>],
372        meta: &mut [anapaya_quinn::udp::RecvMeta],
373    ) -> std::task::Poll<std::io::Result<usize>> {
374        match ready!(self.socket.poll_recv_from_with_path(cx)) {
375            Ok((remote, bytes, path)) => {
376                match path.to_reversed() {
377                    Ok(path) => {
378                        // Register the path for later reuse
379                        self.path_manager.register_path(
380                            remote.isd_asn(),
381                            self.socket.local_addr().isd_asn(),
382                            Utc::now(),
383                            path,
384                        );
385                    }
386                    Err(e) => {
387                        tracing::trace!("Failed to reverse path for registration: {}", e)
388                    }
389                }
390
391                let remote_ip = self
392                    .address_translator
393                    .register_scion_address(remote.scion_address());
394
395                meta[0] = RecvMeta {
396                    addr: std::net::SocketAddr::new(remote_ip, remote.port()),
397                    len: bytes.len(),
398                    ecn: None,
399                    stride: bytes.len(),
400                    dst_ip: self.socket.local_addr().local_address().map(|s| s.ip()),
401                };
402                bufs[0].as_mut().put_slice(&bytes);
403
404                Poll::Ready(Ok(1))
405            }
406            Err(e) if e.kind() == ErrorKind::WouldBlock => Poll::Ready(Err(e)),
407            Err(e) => {
408                // XXX: We only log the error such that the endpoint driver doesn't quit.
409                debounced_warn(
410                    &self.last_recv_error,
411                    "Failed to receive on the underlying socket",
412                    e,
413                );
414
415                Poll::Pending
416            }
417        }
418    }
419
420    fn local_addr(&self) -> std::io::Result<std::net::SocketAddr> {
421        Ok(std::net::SocketAddr::new(
422            self.address_translator
423                .register_scion_address(self.socket.local_addr().scion_address()),
424            self.socket.local_addr().port(),
425        ))
426    }
427}
428
429/// Logs a warning message when an error occurs.
430///
431/// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`]
432/// has elapsed since the last error was logged.
433// Inspired by quinn's `log_sendmsg_error`.
434fn debounced_warn(last_send_error: &Mutex<Instant>, msg: &str, err: impl core::fmt::Debug) {
435    let now = Instant::now();
436    let last_send_error = &mut *last_send_error.lock().expect("poisoned lock");
437    if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL {
438        *last_send_error = now;
439        tracing::warn!(?err, "{msg}");
440    }
441}