1use std::{
4 io::{self, Result},
5 net::SocketAddr,
6 pin::Pin,
7 sync::{
8 atomic::{AtomicUsize, Ordering},
9 Arc,
10 },
11 task::{Context, Poll},
12 time::Duration,
13};
14
15use async_trait::async_trait;
16use futures::{AsyncRead, AsyncWrite};
17use futures_boring::{
18 ec, pkey,
19 ssl::{SslAlert, SslContextBuilder, SslMethod, SslVerifyError, SslVerifyMode, SslVersion},
20 x509::X509,
21};
22use futures_quic::{
23 quiche::{self, Config},
24 QuicConn, QuicConnect, QuicListener, QuicListenerBind, QuicStream,
25};
26
27use uuid::Uuid;
28use xstack::{
29 identity::PublicKey,
30 multiaddr::{is_quic_transport, Multiaddr, Protocol, ToSockAddr},
31 transport_syscall::{DriverConnection, DriverListener, DriverStream, DriverTransport},
32 KeyStore, P2pConn, ProtocolStream, Switch, TransportListener,
33};
34
35async fn create_quic_config(host_key: &KeyStore, timeout: Duration) -> io::Result<Config> {
36 let (cert, pk) = xstack_x509::generate(host_key).await?;
37
38 let cert = X509::from_der(&cert)?;
39
40 let pk = pkey::PKey::from_ec_key(ec::EcKey::private_key_from_der(&pk)?)?;
41
42 let mut ssl_context_builder = SslContextBuilder::new(SslMethod::tls())?;
43
44 ssl_context_builder.set_max_proto_version(Some(SslVersion::TLS1_3))?;
45 ssl_context_builder.set_min_proto_version(Some(SslVersion::TLS1_3))?;
46
47 ssl_context_builder.set_certificate(&cert)?;
48
49 ssl_context_builder.set_private_key(&pk)?;
50
51 ssl_context_builder.check_private_key()?;
52
53 ssl_context_builder.set_custom_verify_callback(
54 SslVerifyMode::PEER | SslVerifyMode::FAIL_IF_NO_PEER_CERT,
55 |ssl| {
56 let cert = ssl
57 .peer_certificate()
58 .ok_or(SslVerifyError::Invalid(SslAlert::CERTIFICATE_REQUIRED))?;
59
60 let cert = cert
61 .to_der()
62 .map_err(|_| SslVerifyError::Invalid(SslAlert::BAD_CERTIFICATE))?;
63
64 let peer_id = xstack_x509::verify(cert)
65 .map_err(|_| SslVerifyError::Invalid(SslAlert::BAD_CERTIFICATE))?
66 .to_peer_id();
67
68 log::trace!("ssl_server: verified peer={}", peer_id);
69
70 Ok(())
71 },
72 );
73
74 let mut config =
75 Config::with_boring_ssl_ctx_builder(quiche::PROTOCOL_VERSION, ssl_context_builder)
76 .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
77
78 config.set_initial_max_data(10_000_000);
79 config.set_initial_max_stream_data_bidi_local(1024 * 1024);
80 config.set_initial_max_stream_data_bidi_remote(1024 * 1024);
81 config.set_initial_max_streams_bidi(100);
82 config.set_initial_max_streams_uni(100);
83 config.set_max_idle_timeout(timeout.as_millis() as u64);
84
85 config.verify_peer(true);
86
87 config.set_application_protos(&[b"libp2p"]).unwrap();
88
89 config.set_disable_active_migration(false);
92
93 Ok(config)
94}
95
/// QUIC transport driver.
pub struct QuicTransport {
    /// Idle timeout handed to every QUIC configuration created by this transport.
    timeout: Duration,
    /// Counter shared with every connection object created by this transport.
    activities: Arc<AtomicUsize>,
}

impl Default for QuicTransport {
    /// Creates a transport with a five second timeout and a zeroed counter.
    fn default() -> Self {
        const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

        QuicTransport {
            activities: Arc::new(AtomicUsize::new(0)),
            timeout: DEFAULT_TIMEOUT,
        }
    }
}
110
111#[async_trait]
112impl DriverTransport for QuicTransport {
113 fn name(&self) -> &str {
114 "quic"
115 }
116 fn activities(&self) -> usize {
117 self.activities.load(Ordering::Relaxed)
118 }
119 async fn bind(&self, switch: &Switch, laddr: &Multiaddr) -> Result<TransportListener> {
120 let quic_config = create_quic_config(&switch.keystore, self.timeout).await?;
121
122 let laddrs = laddr.to_sockaddr()?;
123
124 let listener = QuicListener::bind(laddrs, quic_config).await?;
125
126 let laddr = listener.local_addrs().next().unwrap().clone();
127
128 Ok(QuicP2pListener::new(listener, laddr, self.activities.clone()).into())
129 }
130
131 async fn connect(&self, switch: &Switch, raddr: &Multiaddr) -> Result<P2pConn> {
133 let mut quic_config = create_quic_config(&switch.keystore, self.timeout).await?;
134
135 let raddr = raddr.to_sockaddr()?;
136
137 let laddr = if raddr.is_ipv4() {
138 "0.0.0.0:0"
139 } else {
140 "[::]:0"
141 };
142
143 let conn = QuicConn::connect(None, laddr, raddr, &mut quic_config).await?;
144
145 let (laddr, raddr) = conn
146 .path()
147 .await
148 .ok_or(io::Error::new(io::ErrorKind::Other, "quic: no valid path"))?;
149
150 let cert = conn.peer_cert().await.ok_or(io::Error::new(
151 io::ErrorKind::NotFound,
152 "quic: peer cert not found",
153 ))?;
154
155 let public_key = xstack_x509::verify(cert)?;
156
157 let conn = QuicP2pConn::new(laddr, raddr, conn, public_key, self.activities.clone());
158
159 Ok(conn.into())
160 }
161
162 fn multiaddr_hint(&self, addr: &Multiaddr) -> bool {
164 is_quic_transport(addr)
165 }
166}
167
168struct QuicP2pListener {
169 listener: QuicListener,
170 laddr: SocketAddr,
171 conn_counter: Arc<AtomicUsize>,
172}
173
174impl QuicP2pListener {
175 fn new(listener: QuicListener, laddr: SocketAddr, conn_counter: Arc<AtomicUsize>) -> Self {
176 Self {
177 laddr,
178 listener,
179 conn_counter,
180 }
181 }
182}
183
184#[async_trait]
185impl DriverListener for QuicP2pListener {
186 async fn accept(&mut self) -> Result<P2pConn> {
188 let conn = self.listener.accept().await?;
189
190 let cert = conn.peer_cert().await.ok_or(io::Error::new(
191 io::ErrorKind::NotFound,
192 "quic: peer cert not found",
193 ))?;
194
195 let public_key = xstack_x509::verify(cert)?;
196
197 let peer_addr = conn.peer_addr(self.laddr).await.ok_or(io::Error::new(
198 io::ErrorKind::NotFound,
199 "quic: peer path not found",
200 ))?;
201
202 Ok(QuicP2pConn::new(
203 self.laddr.clone(),
204 peer_addr,
205 conn,
206 public_key,
207 self.conn_counter.clone(),
208 )
209 .into())
210 }
211
212 fn local_addr(&self) -> Result<Multiaddr> {
214 let mut addr = Multiaddr::from(self.laddr.ip());
215 addr.push(Protocol::Udp(self.laddr.port()));
216 addr.push(Protocol::QuicV1);
217
218 Ok(addr)
219 }
220}
221
222#[derive(Clone)]
223struct QuicP2pConn {
224 laddr: Multiaddr,
225 raddr: Multiaddr,
226 conn: Arc<QuicConn>,
227 public_key: PublicKey,
228 id: String,
229 counter: Arc<AtomicUsize>,
230 activities: Arc<AtomicUsize>,
231}
232
233impl Drop for QuicP2pConn {
234 fn drop(&mut self) {
235 self.activities.fetch_sub(1, Ordering::Relaxed);
236 }
237}
238
239impl QuicP2pConn {
240 fn new(
241 laddr: SocketAddr,
242 raddr: SocketAddr,
243 conn: QuicConn,
244 public_key: PublicKey,
245 activities: Arc<AtomicUsize>,
246 ) -> Self {
247 activities.fetch_add(1, Ordering::Relaxed);
248
249 let mut m_laddr = Multiaddr::from(laddr.ip());
250 m_laddr.push(Protocol::Udp(laddr.port()));
251 m_laddr.push(Protocol::QuicV1);
252
253 let mut m_raddr = Multiaddr::from(raddr.ip());
254 m_raddr.push(Protocol::Udp(raddr.port()));
255 m_raddr.push(Protocol::QuicV1);
256
257 Self {
258 id: Uuid::new_v4().to_string(),
259 laddr: m_laddr,
260 raddr: m_raddr,
261 conn: Arc::new(conn),
262 public_key,
263 counter: Default::default(),
264 activities,
265 }
266 }
267}
268
269#[async_trait]
270impl DriverConnection for QuicP2pConn {
271 fn id(&self) -> &str {
272 &self.id
273 }
274
275 fn local_addr(&self) -> &Multiaddr {
280 &self.laddr
281 }
282
283 fn peer_addr(&self) -> &Multiaddr {
285 &self.raddr
286 }
287
288 async fn accept(&mut self) -> io::Result<ProtocolStream> {
292 let stream = self.conn.accept().await?;
293
294 Ok(QuicP2pStream::new(
295 self.id.clone(),
296 stream,
297 self.public_key.clone(),
298 self.laddr.clone(),
299 self.raddr.clone(),
300 self.counter.clone(),
301 )
302 .into())
303 }
304
305 async fn connect(&mut self) -> Result<ProtocolStream> {
306 let stream = self.conn.open(true).await?;
307
308 Ok(QuicP2pStream::new(
309 self.id.clone(),
310 stream,
311 self.public_key.clone(),
312 self.laddr.clone(),
313 self.raddr.clone(),
314 self.counter.clone(),
315 )
316 .into())
317 }
318
319 fn close(&mut self) -> io::Result<()> {
320 self.conn.close()?;
321
322 Ok(())
323 }
324
325 fn is_closed(&self) -> bool {
327 self.conn.is_closed()
328 }
329
330 fn clone(&self) -> P2pConn {
332 self.activities.fetch_add(1, Ordering::Relaxed);
333 Clone::clone(self).into()
334 }
335
336 fn public_key(&self) -> &PublicKey {
338 &self.public_key
339 }
340
341 fn actives(&self) -> usize {
342 self.counter.load(Ordering::Relaxed)
343 }
344}
345
346struct QuicP2pStream {
347 conn_id: String,
348 id: String,
349 stream: QuicStream,
350 public_key: PublicKey,
351 laddr: Multiaddr,
352 raddr: Multiaddr,
353 counter: Arc<AtomicUsize>,
354}
355
356impl Drop for QuicP2pStream {
357 fn drop(&mut self) {
358 self.counter.fetch_sub(1, Ordering::Relaxed);
359 }
360}
361
362impl QuicP2pStream {
363 fn new(
364 conn_id: String,
365 stream: QuicStream,
366 public_key: PublicKey,
367 laddr: Multiaddr,
368 raddr: Multiaddr,
369 counter: Arc<AtomicUsize>,
370 ) -> Self {
371 counter.fetch_add(1, Ordering::Relaxed);
372
373 Self {
374 conn_id,
375 counter,
376 id: format!("quic({:?},{})", stream.scid(), stream.id()),
377 stream,
378 public_key,
379 laddr,
380 raddr,
381 }
382 }
383}
384
385#[async_trait]
386impl DriverStream for QuicP2pStream {
387 fn conn_id(&self) -> &str {
388 &self.conn_id
389 }
390 fn id(&self) -> &str {
391 &self.id
392 }
393 fn public_key(&self) -> &PublicKey {
395 &self.public_key
396 }
397
398 fn local_addr(&self) -> &Multiaddr {
400 &self.laddr
401 }
402
403 fn peer_addr(&self) -> &Multiaddr {
405 &self.raddr
406 }
407 fn poll_read(
409 mut self: std::pin::Pin<&mut Self>,
410 cx: &mut Context<'_>,
411 buf: &mut [u8],
412 ) -> Poll<Result<usize>> {
413 Pin::new(&mut self.stream).poll_read(cx, buf)
414 }
415
416 fn poll_write(
418 mut self: std::pin::Pin<&mut Self>,
419 cx: &mut Context<'_>,
420 buf: &[u8],
421 ) -> Poll<Result<usize>> {
422 Pin::new(&mut self.stream).poll_write(cx, buf)
423 }
424
425 fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
427 Pin::new(&mut self.stream).poll_flush(cx)
428 }
429
430 fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
432 Pin::new(&mut self.stream).poll_close(cx)
433 }
434}
435
#[cfg(test)]
mod tests {

    use async_trait::async_trait;
    use xstack::{Result, Switch};
    use xstack_spec::transport::{transport_specs, TransportSpecContext};

    use super::*;

    /// Spec context that builds switches backed by the QUIC transport.
    struct QuicMock;

    #[async_trait]
    impl TransportSpecContext for QuicMock {
        /// Creates a switch named "test" bound to a loopback QUIC address
        /// with UDP port 0.
        async fn create_switch(&self) -> Result<Switch> {
            let builder = Switch::new("test")
                .transport(QuicTransport::default())
                .transport_bind(["/ip4/127.0.0.1/udp/0/quic-v1"]);

            Ok(builder.create().await?)
        }
    }

    /// Runs the shared transport spec suite against the QUIC transport.
    #[futures_test::test]
    async fn test_specs() {
        transport_specs(QuicMock).await.unwrap();
    }
}