1use std::{
17 collections::HashMap,
18 fmt::{self, Debug},
19 hash::{BuildHasher, Hash as _, Hasher as _},
20 io::ErrorKind,
21 net::{IpAddr, Ipv6Addr},
22 pin::Pin,
23 sync::{Arc, Mutex},
24 task::{Poll, ready},
25 time::{Duration, Instant},
26};
27
28use bytes::BufMut as _;
29use chrono::Utc;
30use foldhash::fast::FixedState;
31use quinn::{AsyncUdpSocket, udp::RecvMeta};
32use scion_proto::{
33 address::SocketAddr,
34 packet::{ByEndpoint, ScionPacketUdp},
35};
36
37use super::{AsyncUdpUnderlaySocket, udp_polling::UdpPoller};
38use crate::{
39 path::manager::{PathPrefetcher, SyncPathManager},
40 quic::ScionQuinnConn,
41};
42
43const IO_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(3);
45
46pub struct Endpoint {
53 inner: quinn::Endpoint,
54 path_prefetcher: Arc<dyn PathPrefetcher + Send + Sync>,
55 address_translator: Arc<AddressTranslator>,
56}
57
58impl Endpoint {
59 pub fn new_with_abstract_socket(
61 config: quinn::EndpointConfig,
62 server_config: Option<quinn::ServerConfig>,
63 socket: Arc<dyn quinn::AsyncUdpSocket>,
64 runtime: Arc<dyn quinn::Runtime>,
65 pather: Arc<dyn PathPrefetcher + Send + Sync>,
66 address_translator: Arc<AddressTranslator>,
67 ) -> std::io::Result<Self> {
68 Ok(Self {
69 inner: quinn::Endpoint::new_with_abstract_socket(
70 config,
71 server_config,
72 socket,
73 runtime,
74 )?,
75 path_prefetcher: pather,
76 address_translator,
77 })
78 }
79
80 pub fn connect(
82 &self,
83 addr: scion_proto::address::SocketAddr,
84 server_name: &str,
85 ) -> Result<quinn::Connecting, quinn::ConnectError> {
86 let mapped_addr = self
87 .address_translator
88 .register_scion_address(addr.scion_address());
89 let local_addr = self
90 .address_translator
91 .lookup_scion_address(self.inner.local_addr().unwrap().ip())
92 .unwrap();
93 self.path_prefetcher
94 .prefetch_path(local_addr.isd_asn(), addr.isd_asn());
95 self.inner.connect(
96 std::net::SocketAddr::new(mapped_addr, addr.port()),
97 server_name,
98 )
99 }
100
101 pub async fn accept(&self) -> Result<Option<ScionQuinnConn>, quinn::ConnectionError> {
103 let incoming = self.inner.accept().await;
104 if let Some(incoming) = incoming {
105 let remote_socket_addr = incoming.remote_address();
106 let local_scion_addr = incoming
107 .local_ip()
108 .and_then(|ip| self.address_translator.lookup_scion_address(ip));
109 let conn = ScionQuinnConn {
110 inner: incoming.await?,
111 local_addr: local_scion_addr,
115 remote_addr: scion_proto::address::SocketAddr::new(
116 self.address_translator
117 .lookup_scion_address(remote_socket_addr.ip())
118 .or_else(|| {
119 panic!(
120 "no scion address mapped for ip, this should never happen: {}",
121 remote_socket_addr.ip(),
122 );
123 })
124 .unwrap(),
125 remote_socket_addr.port(),
126 ),
127 };
128 Ok(Some(conn))
129 } else {
130 Ok(None)
131 }
132 }
133
134 pub fn set_default_client_config(&mut self, config: quinn::ClientConfig) {
136 self.inner.set_default_client_config(config);
137 }
138
139 pub async fn wait_idle(&self) {
141 self.inner.wait_idle().await;
142 }
143
144 pub fn local_addr(&self) -> std::io::Result<std::net::SocketAddr> {
146 self.inner.local_addr()
147 }
148}
149
150pub struct AddressTranslator {
153 build_hasher: FixedState,
154 addr_map: Mutex<HashMap<std::net::Ipv6Addr, scion_proto::address::ScionAddr>>,
155}
156
157impl Debug for AddressTranslator {
158 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159 write!(
160 f,
161 "AddressTranslatorImpl {{ {} }}",
162 self.addr_map
163 .lock()
164 .unwrap()
165 .iter()
166 .map(|(ip, addr)| format!("{ip} -> {addr}"))
167 .collect::<Vec<_>>()
168 .join(", ")
169 )
170 }
171}
172
173impl AddressTranslator {
174 pub fn new(build_hasher: FixedState) -> Self {
176 Self {
177 build_hasher,
178 addr_map: Mutex::new(HashMap::new()),
179 }
180 }
181
182 fn hash_scion_address(&self, addr: scion_proto::address::ScionAddr) -> std::net::Ipv6Addr {
183 let mut hasher = self.build_hasher.build_hasher();
184 hasher.write_u64(addr.isd_asn().to_u64());
185 addr.local_address().hash(&mut hasher);
186 Ipv6Addr::from(hasher.finish() as u128)
187 }
188
189 pub fn register_scion_address(
191 &self,
192 addr: scion_proto::address::ScionAddr,
193 ) -> std::net::IpAddr {
194 let ip = self.hash_scion_address(addr);
195 let mut addr_map = self.addr_map.lock().unwrap();
196 addr_map.entry(ip).or_insert(addr);
197 IpAddr::V6(ip)
198 }
199
200 pub fn lookup_scion_address(
202 &self,
203 ip: std::net::IpAddr,
204 ) -> Option<scion_proto::address::ScionAddr> {
205 let ip = match ip {
206 IpAddr::V6(ip) => ip,
207 IpAddr::V4(_) => return None,
208 };
209 self.addr_map.lock().unwrap().get(&ip).cloned()
210 }
211}
212
213impl Default for AddressTranslator {
214 fn default() -> Self {
215 Self {
216 build_hasher: FixedState::with_seed(42),
217 addr_map: Mutex::new(HashMap::new()),
218 }
219 }
220}
221
222pub(crate) struct ScionAsyncUdpSocket {
229 socket: Arc<dyn AsyncUdpUnderlaySocket>,
230 path_manager: Arc<dyn SyncPathManager + Send + Sync>,
231 address_translator: Arc<AddressTranslator>,
232 last_recv_error: Mutex<Instant>,
234 last_send_error: Mutex<Instant>,
236}
237
238impl ScionAsyncUdpSocket {
239 pub fn new(
240 socket: Arc<dyn AsyncUdpUnderlaySocket>,
241 path_manager: Arc<dyn SyncPathManager + Send + Sync>,
242 address_translator: Arc<AddressTranslator>,
243 ) -> Self {
244 let now = Instant::now();
245 let instant = now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now);
246 Self {
247 socket,
248 path_manager,
249 address_translator,
250 last_recv_error: Mutex::new(instant),
251 last_send_error: Mutex::new(instant),
252 }
253 }
254}
255
256impl std::fmt::Debug for ScionAsyncUdpSocket {
257 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258 f.write_fmt(format_args!(
259 "ScionAsyncUdpSocket({})",
260 match self.local_addr() {
261 Ok(addr) => addr.to_string(),
262 Err(e) => e.to_string(),
263 }
264 ))
265 }
266}
267
268struct QuinnUdpPollerWrapper(Pin<Box<dyn UdpPoller>>);
271
272impl std::fmt::Debug for QuinnUdpPollerWrapper {
273 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274 self.0.fmt(f)
275 }
276}
277
278impl QuinnUdpPollerWrapper {
279 fn new(inner: Pin<Box<dyn UdpPoller>>) -> Self {
280 Self(inner)
281 }
282}
283
284impl quinn::UdpPoller for QuinnUdpPollerWrapper {
285 fn poll_writable(
286 mut self: Pin<&mut Self>,
287 cx: &mut std::task::Context,
288 ) -> Poll<std::io::Result<()>> {
289 self.0.as_mut().poll_writable(cx)
290 }
291}
292
293impl AsyncUdpSocket for ScionAsyncUdpSocket {
294 fn create_io_poller(self: Arc<Self>) -> std::pin::Pin<Box<dyn quinn::UdpPoller>> {
295 let socket = self.socket.clone();
296 let inner_poller = socket.create_io_poller();
297 let wrapper = QuinnUdpPollerWrapper::new(inner_poller);
298 Box::pin(wrapper)
299 }
300
301 fn try_send(&self, transmit: &quinn::udp::Transmit) -> std::io::Result<()> {
302 let buf = bytes::Bytes::copy_from_slice(transmit.contents);
303 let remote_scion_addr = SocketAddr::new(
304 self.address_translator
305 .lookup_scion_address(transmit.destination.ip())
306 .ok_or(std::io::Error::other(format!(
307 "no scion address mapped for ip, this should never happen: {}",
308 transmit.destination.ip(),
309 )))?,
310 transmit.destination.port(),
311 );
312 let path = self.path_manager.try_cached_path(
313 self.socket.local_addr().isd_asn(),
314 remote_scion_addr.isd_asn(),
315 Utc::now(),
316 )?;
317
318 let path = match path {
319 Some(path) => path,
320 None => return Ok(()),
321 };
322
323 let packet = ScionPacketUdp::new(
324 ByEndpoint {
325 source: self.socket.local_addr(),
326 destination: remote_scion_addr,
327 },
328 path.data_plane_path.to_bytes_path(),
329 buf,
330 )
331 .map_err(|_| std::io::Error::other("failed to encode packet"))?;
332
333 match self.socket.try_send(packet.into()) {
334 Ok(_) => Ok(()),
335 Err(e) if e.kind() == ErrorKind::WouldBlock => Err(e),
336 Err(e) => {
337 debounced_warn(
339 &self.last_send_error,
340 "Failed to send on the underlying socket",
341 e,
342 );
343 Ok(())
344 }
345 }
346 }
347
348 fn poll_recv(
349 &self,
350 cx: &mut std::task::Context,
351 bufs: &mut [std::io::IoSliceMut<'_>],
352 meta: &mut [quinn::udp::RecvMeta],
353 ) -> std::task::Poll<std::io::Result<usize>> {
354 match ready!(self.socket.poll_recv_from_with_path(cx)) {
355 Ok((remote, bytes, path)) => {
356 match path.to_reversed() {
357 Ok(path) => {
358 self.path_manager.register_path(
360 remote.isd_asn(),
361 self.socket.local_addr().isd_asn(),
362 Utc::now(),
363 path,
364 );
365 }
366 Err(e) => {
367 tracing::trace!("Failed to reverse path for registration: {}", e)
368 }
369 }
370
371 let remote_ip = self
372 .address_translator
373 .register_scion_address(remote.scion_address());
374
375 meta[0] = RecvMeta {
376 addr: std::net::SocketAddr::new(remote_ip, remote.port()),
377 len: bytes.len(),
378 ecn: None,
379 stride: bytes.len(),
380 dst_ip: self.socket.local_addr().local_address().map(|s| s.ip()),
381 };
382 bufs[0].as_mut().put_slice(&bytes);
383
384 Poll::Ready(Ok(1))
385 }
386 Err(e) if e.kind() == ErrorKind::WouldBlock => Poll::Ready(Err(e)),
387 Err(e) => {
388 debounced_warn(
390 &self.last_recv_error,
391 "Failed to receive on the underlying socket",
392 e,
393 );
394
395 Poll::Pending
396 }
397 }
398 }
399
400 fn local_addr(&self) -> std::io::Result<std::net::SocketAddr> {
401 Ok(std::net::SocketAddr::new(
402 self.address_translator
403 .register_scion_address(self.socket.local_addr().scion_address()),
404 self.socket.local_addr().port(),
405 ))
406 }
407}
408
409fn debounced_warn(last_send_error: &Mutex<Instant>, msg: &str, err: impl core::fmt::Debug) {
415 let now = Instant::now();
416 let last_send_error = &mut *last_send_error.lock().expect("poisoned lock");
417 if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL {
418 *last_send_error = now;
419 tracing::warn!(?err, "{msg}");
420 }
421}