1use std::{
17 collections::HashMap,
18 fmt::{self, Debug},
19 hash::{BuildHasher, Hash as _, Hasher as _},
20 net::{IpAddr, Ipv6Addr},
21 pin::Pin,
22 sync::{Arc, Mutex},
23 task::{Poll, ready},
24};
25
26use bytes::BufMut as _;
27use chrono::Utc;
28use foldhash::fast::FixedState;
29use quinn::{AsyncUdpSocket, udp::RecvMeta};
30use scion_proto::{
31 address::SocketAddr,
32 packet::{ByEndpoint, ScionPacketUdp},
33};
34
35use super::{AsyncUdpUnderlaySocket, udp_polling::UdpPoller};
36use crate::{
37 path::manager::{PathPrefetcher, SyncPathManager},
38 quic::ScionQuinnConn,
39};
40
41pub struct Endpoint {
48 inner: quinn::Endpoint,
49 path_prefetcher: Arc<dyn PathPrefetcher + Send + Sync>,
50 address_translator: Arc<AddressTranslator>,
51}
52
53impl Endpoint {
54 pub fn new_with_abstract_socket(
56 config: quinn::EndpointConfig,
57 server_config: Option<quinn::ServerConfig>,
58 socket: Arc<dyn quinn::AsyncUdpSocket>,
59 runtime: Arc<dyn quinn::Runtime>,
60 pather: Arc<dyn PathPrefetcher + Send + Sync>,
61 address_translator: Arc<AddressTranslator>,
62 ) -> std::io::Result<Self> {
63 Ok(Self {
64 inner: quinn::Endpoint::new_with_abstract_socket(
65 config,
66 server_config,
67 socket,
68 runtime,
69 )?,
70 path_prefetcher: pather,
71 address_translator,
72 })
73 }
74
75 pub fn connect(
77 &self,
78 addr: scion_proto::address::SocketAddr,
79 server_name: &str,
80 ) -> Result<quinn::Connecting, quinn::ConnectError> {
81 let mapped_addr = self
82 .address_translator
83 .register_scion_address(addr.scion_address());
84 let local_addr = self
85 .address_translator
86 .lookup_scion_address(self.inner.local_addr().unwrap().ip())
87 .unwrap();
88 self.path_prefetcher
89 .prefetch_path(local_addr.isd_asn(), addr.isd_asn());
90 self.inner.connect(
91 std::net::SocketAddr::new(mapped_addr, addr.port()),
92 server_name,
93 )
94 }
95
96 pub async fn accept(&self) -> Result<Option<ScionQuinnConn>, quinn::ConnectionError> {
98 let incoming = self.inner.accept().await;
99 if let Some(incoming) = incoming {
100 let remote_socket_addr = incoming.remote_address();
101 let local_scion_addr = incoming
102 .local_ip()
103 .and_then(|ip| self.address_translator.lookup_scion_address(ip));
104 let conn = ScionQuinnConn {
105 inner: incoming.await?,
106 local_addr: local_scion_addr,
110 remote_addr: scion_proto::address::SocketAddr::new(
111 self.address_translator
112 .lookup_scion_address(remote_socket_addr.ip())
113 .or_else(|| {
114 panic!(
115 "no scion address for ip, this should never happen: {}",
116 remote_socket_addr.ip(),
117 );
118 })
119 .unwrap(),
120 remote_socket_addr.port(),
121 ),
122 };
123 Ok(Some(conn))
124 } else {
125 Ok(None)
126 }
127 }
128
129 pub fn set_default_client_config(&mut self, config: quinn::ClientConfig) {
131 self.inner.set_default_client_config(config);
132 }
133
134 pub async fn wait_idle(&self) {
136 self.inner.wait_idle().await;
137 }
138
139 pub fn local_addr(&self) -> std::io::Result<std::net::SocketAddr> {
141 self.inner.local_addr()
142 }
143}
144
145pub struct AddressTranslator {
148 build_hasher: FixedState,
149 addr_map: Mutex<HashMap<std::net::Ipv6Addr, scion_proto::address::ScionAddr>>,
150}
151
152impl Debug for AddressTranslator {
153 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154 write!(
155 f,
156 "AddressTranslatorImpl {{ {} }}",
157 self.addr_map
158 .lock()
159 .unwrap()
160 .iter()
161 .map(|(ip, addr)| format!("{ip} -> {addr}"))
162 .collect::<Vec<_>>()
163 .join(", ")
164 )
165 }
166}
167
168impl AddressTranslator {
169 pub fn new(build_hasher: FixedState) -> Self {
171 Self {
172 build_hasher,
173 addr_map: Mutex::new(HashMap::new()),
174 }
175 }
176
177 fn hash_scion_address(&self, addr: scion_proto::address::ScionAddr) -> std::net::Ipv6Addr {
178 let mut hasher = self.build_hasher.build_hasher();
179 hasher.write_u64(addr.isd_asn().to_u64());
180 addr.local_address().hash(&mut hasher);
181 Ipv6Addr::from(hasher.finish() as u128)
182 }
183
184 pub fn register_scion_address(
186 &self,
187 addr: scion_proto::address::ScionAddr,
188 ) -> std::net::IpAddr {
189 let ip = self.hash_scion_address(addr);
190 let mut addr_map = self.addr_map.lock().unwrap();
191 addr_map.entry(ip).or_insert(addr);
192 IpAddr::V6(ip)
193 }
194
195 pub fn lookup_scion_address(
197 &self,
198 ip: std::net::IpAddr,
199 ) -> Option<scion_proto::address::ScionAddr> {
200 let ip = match ip {
201 IpAddr::V6(ip) => ip,
202 IpAddr::V4(_) => return None,
203 };
204 self.addr_map.lock().unwrap().get(&ip).cloned()
205 }
206}
207
208impl Default for AddressTranslator {
209 fn default() -> Self {
210 Self {
211 build_hasher: FixedState::with_seed(42),
212 addr_map: Mutex::new(HashMap::new()),
213 }
214 }
215}
216
217pub(crate) struct ScionAsyncUdpSocket {
224 socket: Arc<dyn AsyncUdpUnderlaySocket>,
225 path_manager: Arc<dyn SyncPathManager + Send + Sync>,
226 address_translator: Arc<AddressTranslator>,
227}
228
229impl ScionAsyncUdpSocket {
230 pub fn new(
231 socket: Arc<dyn AsyncUdpUnderlaySocket>,
232 path_manager: Arc<dyn SyncPathManager + Send + Sync>,
233 address_translator: Arc<AddressTranslator>,
234 ) -> Self {
235 Self {
236 socket,
237 path_manager,
238 address_translator,
239 }
240 }
241}
242
243impl std::fmt::Debug for ScionAsyncUdpSocket {
244 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245 f.write_fmt(format_args!(
246 "ScionAsyncUdpSocket({})",
247 match self.local_addr() {
248 Ok(addr) => addr.to_string(),
249 Err(e) => e.to_string(),
250 }
251 ))
252 }
253}
254
255struct QuinnUdpPollerWrapper(Pin<Box<dyn UdpPoller>>);
258
259impl std::fmt::Debug for QuinnUdpPollerWrapper {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 self.0.fmt(f)
262 }
263}
264
265impl QuinnUdpPollerWrapper {
266 fn new(inner: Pin<Box<dyn UdpPoller>>) -> Self {
267 Self(inner)
268 }
269}
270
271impl quinn::UdpPoller for QuinnUdpPollerWrapper {
272 fn poll_writable(
273 mut self: Pin<&mut Self>,
274 cx: &mut std::task::Context,
275 ) -> Poll<std::io::Result<()>> {
276 self.0.as_mut().poll_writable(cx)
277 }
278}
279
280impl AsyncUdpSocket for ScionAsyncUdpSocket {
281 fn create_io_poller(self: Arc<Self>) -> std::pin::Pin<Box<dyn quinn::UdpPoller>> {
282 let socket = self.socket.clone();
283 let inner_poller = socket.create_io_poller();
284 let wrapper = QuinnUdpPollerWrapper::new(inner_poller);
285 Box::pin(wrapper)
286 }
287
288 fn try_send(&self, transmit: &quinn::udp::Transmit) -> std::io::Result<()> {
289 let buf = bytes::Bytes::copy_from_slice(transmit.contents);
290 let remote_scion_addr = SocketAddr::new(
291 self.address_translator
292 .lookup_scion_address(transmit.destination.ip())
293 .ok_or(std::io::Error::other(format!(
294 "no scion address for ip, this should never happen: {}",
295 transmit.destination.ip(),
296 )))?,
297 transmit.destination.port(),
298 );
299 let path = self.path_manager.try_cached_path(
300 self.socket.local_addr().isd_asn(),
301 remote_scion_addr.isd_asn(),
302 Utc::now(),
303 )?;
304
305 let path = match path {
306 Some(path) => path,
307 None => return Ok(()),
308 };
309
310 let packet = ScionPacketUdp::new(
311 ByEndpoint {
312 source: self.socket.local_addr(),
313 destination: remote_scion_addr,
314 },
315 path.data_plane_path.to_bytes_path(),
316 buf,
317 )
318 .map_err(|_| std::io::Error::other("failed to encode packet"))?;
319 self.socket.try_send(packet.into())
320 }
321
322 fn poll_recv(
323 &self,
324 cx: &mut std::task::Context,
325 bufs: &mut [std::io::IoSliceMut<'_>],
326 meta: &mut [quinn::udp::RecvMeta],
327 ) -> std::task::Poll<std::io::Result<usize>> {
328 match ready!(self.socket.poll_recv_from_with_path(cx)) {
329 Ok((remote, bytes, path)) => {
330 match path.to_reversed() {
331 Ok(path) => {
332 self.path_manager.register_path(
334 remote.isd_asn(),
335 self.socket.local_addr().isd_asn(),
336 Utc::now(),
337 path,
338 );
339 }
340 Err(e) => {
341 tracing::trace!("Failed to reverse path for registration: {}", e)
342 }
343 }
344
345 let remote_ip = self
346 .address_translator
347 .register_scion_address(remote.scion_address());
348
349 meta[0] = RecvMeta {
350 addr: std::net::SocketAddr::new(remote_ip, remote.port()),
351 len: bytes.len(),
352 ecn: None,
353 stride: bytes.len(),
354 dst_ip: self.socket.local_addr().local_address().map(|s| s.ip()),
355 };
356 bufs[0].as_mut().put_slice(&bytes);
357
358 Poll::Ready(Ok(1))
359 }
360 Err(e) => std::task::Poll::Ready(Err(e)),
361 }
362 }
363
364 fn local_addr(&self) -> std::io::Result<std::net::SocketAddr> {
365 Ok(std::net::SocketAddr::new(
366 self.address_translator
367 .register_scion_address(self.socket.local_addr().scion_address()),
368 self.socket.local_addr().port(),
369 ))
370 }
371}