1use std::{
17 collections::HashMap,
18 fmt::{self, Debug},
19 hash::{BuildHasher, Hash as _, Hasher as _},
20 io::ErrorKind,
21 net::{self, IpAddr, Ipv6Addr},
22 pin::Pin,
23 sync::{Arc, Mutex},
24 task::{Poll, ready},
25 time::{Duration, Instant},
26};
27
28use anapaya_quinn::{AsyncUdpSocket, udp::RecvMeta};
29use bytes::BufMut as _;
30use chrono::Utc;
31use foldhash::fast::FixedState;
32use scion_proto::{
33 address::SocketAddr,
34 packet::{ByEndpoint, ScionPacketUdp},
35};
36
37use super::{AsyncUdpUnderlaySocket, udp_polling::UdpPoller};
38use crate::{
39 path::manager::traits::{PathPrefetcher, SyncPathManager},
40 quic::ScionQuinnConn,
41};
42
43const IO_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(3);
45
46pub struct Endpoint {
53 inner: anapaya_quinn::Endpoint,
54 socket: Arc<ScionAsyncUdpSocket>,
55 path_prefetcher: Arc<dyn PathPrefetcher + Send + Sync>,
56 address_translator: Arc<AddressTranslator>,
57 local_scion_addr: scion_proto::address::SocketAddr,
58}
59
60impl Endpoint {
61 pub(crate) fn new_with_abstract_socket(
63 config: anapaya_quinn::EndpointConfig,
64 server_config: Option<anapaya_quinn::ServerConfig>,
65 socket: Arc<ScionAsyncUdpSocket>,
66 local_scion_addr: scion_proto::address::SocketAddr,
67 runtime: Arc<dyn anapaya_quinn::Runtime>,
68 pather: Arc<dyn PathPrefetcher + Send + Sync>,
69 address_translator: Arc<AddressTranslator>,
70 ) -> std::io::Result<Self> {
71 Ok(Self {
72 inner: anapaya_quinn::Endpoint::new_with_abstract_socket(
73 config,
74 server_config,
75 socket.clone(),
76 runtime,
77 )?,
78 socket,
79 path_prefetcher: pather,
80 address_translator,
81 local_scion_addr,
82 })
83 }
84
85 pub fn connect(
87 &self,
88 addr: scion_proto::address::SocketAddr,
89 server_name: &str,
90 ) -> Result<anapaya_quinn::Connecting, anapaya_quinn::ConnectError> {
91 let mapped_addr = self
92 .address_translator
93 .register_scion_address(addr.scion_address());
94 let local_addr = self
95 .address_translator
96 .lookup_scion_address(self.inner.local_addr().unwrap().ip())
97 .unwrap();
98 self.path_prefetcher
99 .prefetch_path(local_addr.isd_asn(), addr.isd_asn());
100 self.inner.connect(
101 std::net::SocketAddr::new(mapped_addr, addr.port()),
102 server_name,
103 )
104 }
105
106 pub async fn accept(&self) -> Result<Option<ScionQuinnConn>, anapaya_quinn::ConnectionError> {
108 let incoming = self.inner.accept().await;
109 if let Some(incoming) = incoming {
110 let remote_socket_addr = incoming.remote_address();
111 let local_scion_addr = incoming
112 .local_ip()
113 .and_then(|ip| self.address_translator.lookup_scion_address(ip));
114 let conn = ScionQuinnConn {
115 inner: incoming.await?,
116 local_addr: local_scion_addr,
120 remote_addr: scion_proto::address::SocketAddr::new(
121 self.address_translator
122 .lookup_scion_address(remote_socket_addr.ip())
123 .or_else(|| {
124 panic!(
125 "no scion address mapped for ip, this should never happen: {}",
126 remote_socket_addr.ip(),
127 );
128 })
129 .unwrap(),
130 remote_socket_addr.port(),
131 ),
132 };
133 Ok(Some(conn))
134 } else {
135 Ok(None)
136 }
137 }
138
139 pub fn set_default_client_config(&mut self, config: anapaya_quinn::ClientConfig) {
141 self.inner.set_default_client_config(config);
142 }
143
144 pub async fn wait_idle(&self) {
146 self.inner.wait_idle().await;
147 }
148
149 pub fn local_addr(&self) -> std::io::Result<std::net::SocketAddr> {
151 self.inner.local_addr()
152 }
153
154 pub fn local_scion_addr(&self) -> scion_proto::address::SocketAddr {
156 self.local_scion_addr
157 }
158
159 pub fn snap_data_plane(&self) -> Option<net::SocketAddr> {
161 self.socket.snap_data_plane()
162 }
163}
164
165pub struct AddressTranslator {
168 build_hasher: FixedState,
169 addr_map: Mutex<HashMap<std::net::Ipv6Addr, scion_proto::address::ScionAddr>>,
170}
171
172impl Debug for AddressTranslator {
173 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
174 write!(
175 f,
176 "AddressTranslatorImpl {{ {} }}",
177 self.addr_map
178 .lock()
179 .unwrap()
180 .iter()
181 .map(|(ip, addr)| format!("{ip} -> {addr}"))
182 .collect::<Vec<_>>()
183 .join(", ")
184 )
185 }
186}
187
188impl AddressTranslator {
189 pub fn new(build_hasher: FixedState) -> Self {
191 Self {
192 build_hasher,
193 addr_map: Mutex::new(HashMap::new()),
194 }
195 }
196
197 fn hash_scion_address(&self, addr: scion_proto::address::ScionAddr) -> std::net::Ipv6Addr {
198 let mut hasher = self.build_hasher.build_hasher();
199 hasher.write_u64(addr.isd_asn().to_u64());
200 addr.local_address().hash(&mut hasher);
201 Ipv6Addr::from(hasher.finish() as u128)
202 }
203
204 pub fn register_scion_address(
206 &self,
207 addr: scion_proto::address::ScionAddr,
208 ) -> std::net::IpAddr {
209 let ip = self.hash_scion_address(addr);
210 let mut addr_map = self.addr_map.lock().unwrap();
211 addr_map.entry(ip).or_insert(addr);
212 IpAddr::V6(ip)
213 }
214
215 pub fn lookup_scion_address(
217 &self,
218 ip: std::net::IpAddr,
219 ) -> Option<scion_proto::address::ScionAddr> {
220 let ip = match ip {
221 IpAddr::V6(ip) => ip,
222 IpAddr::V4(_) => return None,
223 };
224 self.addr_map.lock().unwrap().get(&ip).cloned()
225 }
226}
227
228impl Default for AddressTranslator {
229 fn default() -> Self {
230 Self {
231 build_hasher: FixedState::with_seed(42),
232 addr_map: Mutex::new(HashMap::new()),
233 }
234 }
235}
236
237pub(crate) struct ScionAsyncUdpSocket {
244 socket: Arc<dyn AsyncUdpUnderlaySocket>,
245 path_manager: Arc<dyn SyncPathManager + Send + Sync>,
246 address_translator: Arc<AddressTranslator>,
247 last_recv_error: Mutex<Instant>,
249 last_send_error: Mutex<Instant>,
251}
252
253impl ScionAsyncUdpSocket {
254 pub fn new(
255 socket: Arc<dyn AsyncUdpUnderlaySocket>,
256 path_manager: Arc<dyn SyncPathManager + Send + Sync>,
257 address_translator: Arc<AddressTranslator>,
258 ) -> Self {
259 let now = Instant::now();
260 let instant = now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now);
261 Self {
262 socket,
263 path_manager,
264 address_translator,
265 last_recv_error: Mutex::new(instant),
266 last_send_error: Mutex::new(instant),
267 }
268 }
269
270 pub fn snap_data_plane(&self) -> Option<net::SocketAddr> {
272 self.socket.snap_data_plane()
273 }
274}
275
276impl std::fmt::Debug for ScionAsyncUdpSocket {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 f.write_fmt(format_args!(
279 "ScionAsyncUdpSocket({})",
280 match self.local_addr() {
281 Ok(addr) => addr.to_string(),
282 Err(e) => e.to_string(),
283 }
284 ))
285 }
286}
287
288struct QuinnUdpPollerWrapper(Pin<Box<dyn UdpPoller>>);
291
292impl std::fmt::Debug for QuinnUdpPollerWrapper {
293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294 self.0.fmt(f)
295 }
296}
297
298impl QuinnUdpPollerWrapper {
299 fn new(inner: Pin<Box<dyn UdpPoller>>) -> Self {
300 Self(inner)
301 }
302}
303
304impl anapaya_quinn::UdpPoller for QuinnUdpPollerWrapper {
305 fn poll_writable(
306 mut self: Pin<&mut Self>,
307 cx: &mut std::task::Context,
308 ) -> Poll<std::io::Result<()>> {
309 self.0.as_mut().poll_writable(cx)
310 }
311}
312
313impl AsyncUdpSocket for ScionAsyncUdpSocket {
314 fn create_io_poller(self: Arc<Self>) -> std::pin::Pin<Box<dyn anapaya_quinn::UdpPoller>> {
315 let socket = self.socket.clone();
316 let inner_poller = socket.create_io_poller();
317 let wrapper = QuinnUdpPollerWrapper::new(inner_poller);
318 Box::pin(wrapper)
319 }
320
321 fn try_send(&self, transmit: &anapaya_quinn::udp::Transmit) -> std::io::Result<()> {
322 let buf = bytes::Bytes::copy_from_slice(transmit.contents);
323 let remote_scion_addr = SocketAddr::new(
324 self.address_translator
325 .lookup_scion_address(transmit.destination.ip())
326 .ok_or(std::io::Error::other(format!(
327 "no scion address mapped for ip, this should never happen: {}",
328 transmit.destination.ip(),
329 )))?,
330 transmit.destination.port(),
331 );
332 let path = self.path_manager.try_cached_path(
333 self.socket.local_addr().isd_asn(),
334 remote_scion_addr.isd_asn(),
335 Utc::now(),
336 )?;
337
338 let path = match path {
339 Some(path) => path,
340 None => return Ok(()),
341 };
342
343 let packet = ScionPacketUdp::new(
344 ByEndpoint {
345 source: self.socket.local_addr(),
346 destination: remote_scion_addr,
347 },
348 path.data_plane_path.to_bytes_path(),
349 buf,
350 )
351 .map_err(|_| std::io::Error::other("failed to encode packet"))?;
352
353 match self.socket.try_send(packet.into()) {
354 Ok(_) => Ok(()),
355 Err(e) if e.kind() == ErrorKind::WouldBlock => Err(e),
356 Err(e) => {
357 debounced_warn(
359 &self.last_send_error,
360 "Failed to send on the underlying socket",
361 e,
362 );
363 Ok(())
364 }
365 }
366 }
367
368 fn poll_recv(
369 &self,
370 cx: &mut std::task::Context,
371 bufs: &mut [std::io::IoSliceMut<'_>],
372 meta: &mut [anapaya_quinn::udp::RecvMeta],
373 ) -> std::task::Poll<std::io::Result<usize>> {
374 match ready!(self.socket.poll_recv_from_with_path(cx)) {
375 Ok((remote, bytes, path)) => {
376 match path.to_reversed() {
377 Ok(path) => {
378 self.path_manager.register_path(
380 remote.isd_asn(),
381 self.socket.local_addr().isd_asn(),
382 Utc::now(),
383 path,
384 );
385 }
386 Err(e) => {
387 tracing::trace!("Failed to reverse path for registration: {}", e)
388 }
389 }
390
391 let remote_ip = self
392 .address_translator
393 .register_scion_address(remote.scion_address());
394
395 meta[0] = RecvMeta {
396 addr: std::net::SocketAddr::new(remote_ip, remote.port()),
397 len: bytes.len(),
398 ecn: None,
399 stride: bytes.len(),
400 dst_ip: self.socket.local_addr().local_address().map(|s| s.ip()),
401 };
402 bufs[0].as_mut().put_slice(&bytes);
403
404 Poll::Ready(Ok(1))
405 }
406 Err(e) if e.kind() == ErrorKind::WouldBlock => Poll::Ready(Err(e)),
407 Err(e) => {
408 debounced_warn(
410 &self.last_recv_error,
411 "Failed to receive on the underlying socket",
412 e,
413 );
414
415 Poll::Pending
416 }
417 }
418 }
419
420 fn local_addr(&self) -> std::io::Result<std::net::SocketAddr> {
421 Ok(std::net::SocketAddr::new(
422 self.address_translator
423 .register_scion_address(self.socket.local_addr().scion_address()),
424 self.socket.local_addr().port(),
425 ))
426 }
427}
428
429fn debounced_warn(last_send_error: &Mutex<Instant>, msg: &str, err: impl core::fmt::Debug) {
435 let now = Instant::now();
436 let last_send_error = &mut *last_send_error.lock().expect("poisoned lock");
437 if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL {
438 *last_send_error = now;
439 tracing::warn!(?err, "{msg}");
440 }
441}