xstack_quic/
lib.rs

1//! A [***libp2p quic transport***](https://docs.libp2p.io/concepts/transports/quic/) implementation.
2
3use std::{
4    io::{self, Result},
5    net::SocketAddr,
6    pin::Pin,
7    sync::{
8        atomic::{AtomicUsize, Ordering},
9        Arc,
10    },
11    task::{Context, Poll},
12    time::Duration,
13};
14
15use async_trait::async_trait;
16use futures::{AsyncRead, AsyncWrite};
17use futures_boring::{
18    ec, pkey,
19    ssl::{SslAlert, SslContextBuilder, SslMethod, SslVerifyError, SslVerifyMode, SslVersion},
20    x509::X509,
21};
22use futures_quic::{
23    quiche::{self, Config},
24    QuicConn, QuicConnect, QuicListener, QuicListenerBind, QuicStream,
25};
26
27use uuid::Uuid;
28use xstack::{
29    identity::PublicKey,
30    multiaddr::{is_quic_transport, Multiaddr, Protocol, ToSockAddr},
31    transport_syscall::{DriverConnection, DriverListener, DriverStream, DriverTransport},
32    KeyStore, P2pConn, ProtocolStream, Switch, TransportListener,
33};
34
35async fn create_quic_config(host_key: &KeyStore, timeout: Duration) -> io::Result<Config> {
36    let (cert, pk) = xstack_x509::generate(host_key).await?;
37
38    let cert = X509::from_der(&cert)?;
39
40    let pk = pkey::PKey::from_ec_key(ec::EcKey::private_key_from_der(&pk)?)?;
41
42    let mut ssl_context_builder = SslContextBuilder::new(SslMethod::tls())?;
43
44    ssl_context_builder.set_max_proto_version(Some(SslVersion::TLS1_3))?;
45    ssl_context_builder.set_min_proto_version(Some(SslVersion::TLS1_3))?;
46
47    ssl_context_builder.set_certificate(&cert)?;
48
49    ssl_context_builder.set_private_key(&pk)?;
50
51    ssl_context_builder.check_private_key()?;
52
53    ssl_context_builder.set_custom_verify_callback(
54        SslVerifyMode::PEER | SslVerifyMode::FAIL_IF_NO_PEER_CERT,
55        |ssl| {
56            let cert = ssl
57                .peer_certificate()
58                .ok_or(SslVerifyError::Invalid(SslAlert::CERTIFICATE_REQUIRED))?;
59
60            let cert = cert
61                .to_der()
62                .map_err(|_| SslVerifyError::Invalid(SslAlert::BAD_CERTIFICATE))?;
63
64            let peer_id = xstack_x509::verify(cert)
65                .map_err(|_| SslVerifyError::Invalid(SslAlert::BAD_CERTIFICATE))?
66                .to_peer_id();
67
68            log::trace!("ssl_server: verified peer={}", peer_id);
69
70            Ok(())
71        },
72    );
73
74    let mut config =
75        Config::with_boring_ssl_ctx_builder(quiche::PROTOCOL_VERSION, ssl_context_builder)
76            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
77
78    config.set_initial_max_data(10_000_000);
79    config.set_initial_max_stream_data_bidi_local(1024 * 1024);
80    config.set_initial_max_stream_data_bidi_remote(1024 * 1024);
81    config.set_initial_max_streams_bidi(100);
82    config.set_initial_max_streams_uni(100);
83    config.set_max_idle_timeout(timeout.as_millis() as u64);
84
85    config.verify_peer(true);
86
87    config.set_application_protos(&[b"libp2p"]).unwrap();
88
89    // config.enable_early_data();
90
91    config.set_disable_active_migration(false);
92
93    Ok(config)
94}
95
/// A libp2p transport that runs over the QUIC protocol.
pub struct QuicTransport {
    /// Handed to the QUIC configuration as the maximum idle timeout of each connection.
    timeout: Duration,
    /// Shared counter of live `QuicP2pConn` handles created through this transport.
    activities: Arc<AtomicUsize>,
}

impl Default for QuicTransport {
    /// Create a transport with a five-second timeout and a zeroed activity counter.
    fn default() -> Self {
        let timeout = Duration::from_secs(5);
        let activities = Arc::new(AtomicUsize::new(0));

        Self {
            timeout,
            activities,
        }
    }
}
110
111#[async_trait]
112impl DriverTransport for QuicTransport {
113    fn name(&self) -> &str {
114        "quic"
115    }
116    fn activities(&self) -> usize {
117        self.activities.load(Ordering::Relaxed)
118    }
119    async fn bind(&self, switch: &Switch, laddr: &Multiaddr) -> Result<TransportListener> {
120        let quic_config = create_quic_config(&switch.keystore, self.timeout).await?;
121
122        let laddrs = laddr.to_sockaddr()?;
123
124        let listener = QuicListener::bind(laddrs, quic_config).await?;
125
126        let laddr = listener.local_addrs().next().unwrap().clone();
127
128        Ok(QuicP2pListener::new(listener, laddr, self.activities.clone()).into())
129    }
130
131    /// Connect to peer with remote peer [`raddr`](Multiaddr).
132    async fn connect(&self, switch: &Switch, raddr: &Multiaddr) -> Result<P2pConn> {
133        let mut quic_config = create_quic_config(&switch.keystore, self.timeout).await?;
134
135        let raddr = raddr.to_sockaddr()?;
136
137        let laddr = if raddr.is_ipv4() {
138            "0.0.0.0:0"
139        } else {
140            "[::]:0"
141        };
142
143        let conn = QuicConn::connect(None, laddr, raddr, &mut quic_config).await?;
144
145        let (laddr, raddr) = conn
146            .path()
147            .await
148            .ok_or(io::Error::new(io::ErrorKind::Other, "quic: no valid path"))?;
149
150        let cert = conn.peer_cert().await.ok_or(io::Error::new(
151            io::ErrorKind::NotFound,
152            "quic: peer cert not found",
153        ))?;
154
155        let public_key = xstack_x509::verify(cert)?;
156
157        let conn = QuicP2pConn::new(laddr, raddr, conn, public_key, self.activities.clone());
158
159        Ok(conn.into())
160    }
161
162    /// Check if this transport support the protocol stack represented by the `addr`.
163    fn multiaddr_hint(&self, addr: &Multiaddr) -> bool {
164        is_quic_transport(addr)
165    }
166}
167
168struct QuicP2pListener {
169    listener: QuicListener,
170    laddr: SocketAddr,
171    conn_counter: Arc<AtomicUsize>,
172}
173
174impl QuicP2pListener {
175    fn new(listener: QuicListener, laddr: SocketAddr, conn_counter: Arc<AtomicUsize>) -> Self {
176        Self {
177            laddr,
178            listener,
179            conn_counter,
180        }
181    }
182}
183
184#[async_trait]
185impl DriverListener for QuicP2pListener {
186    /// Accept next incoming connection between local and peer.
187    async fn accept(&mut self) -> Result<P2pConn> {
188        let conn = self.listener.accept().await?;
189
190        let cert = conn.peer_cert().await.ok_or(io::Error::new(
191            io::ErrorKind::NotFound,
192            "quic: peer cert not found",
193        ))?;
194
195        let public_key = xstack_x509::verify(cert)?;
196
197        let peer_addr = conn.peer_addr(self.laddr).await.ok_or(io::Error::new(
198            io::ErrorKind::NotFound,
199            "quic: peer path not found",
200        ))?;
201
202        Ok(QuicP2pConn::new(
203            self.laddr.clone(),
204            peer_addr,
205            conn,
206            public_key,
207            self.conn_counter.clone(),
208        )
209        .into())
210    }
211
212    /// Returns the local address that this listener is bound to.
213    fn local_addr(&self) -> Result<Multiaddr> {
214        let mut addr = Multiaddr::from(self.laddr.ip());
215        addr.push(Protocol::Udp(self.laddr.port()));
216        addr.push(Protocol::QuicV1);
217
218        Ok(addr)
219    }
220}
221
222#[derive(Clone)]
223struct QuicP2pConn {
224    laddr: Multiaddr,
225    raddr: Multiaddr,
226    conn: Arc<QuicConn>,
227    public_key: PublicKey,
228    id: String,
229    counter: Arc<AtomicUsize>,
230    activities: Arc<AtomicUsize>,
231}
232
233impl Drop for QuicP2pConn {
234    fn drop(&mut self) {
235        self.activities.fetch_sub(1, Ordering::Relaxed);
236    }
237}
238
239impl QuicP2pConn {
240    fn new(
241        laddr: SocketAddr,
242        raddr: SocketAddr,
243        conn: QuicConn,
244        public_key: PublicKey,
245        activities: Arc<AtomicUsize>,
246    ) -> Self {
247        activities.fetch_add(1, Ordering::Relaxed);
248
249        let mut m_laddr = Multiaddr::from(laddr.ip());
250        m_laddr.push(Protocol::Udp(laddr.port()));
251        m_laddr.push(Protocol::QuicV1);
252
253        let mut m_raddr = Multiaddr::from(raddr.ip());
254        m_raddr.push(Protocol::Udp(raddr.port()));
255        m_raddr.push(Protocol::QuicV1);
256
257        Self {
258            id: Uuid::new_v4().to_string(),
259            laddr: m_laddr,
260            raddr: m_raddr,
261            conn: Arc::new(conn),
262            public_key,
263            counter: Default::default(),
264            activities,
265        }
266    }
267}
268
269#[async_trait]
270impl DriverConnection for QuicP2pConn {
271    fn id(&self) -> &str {
272        &self.id
273    }
274
275    /// Returns local bind address.
276    ///
277    /// This can be useful, for example, when binding to port 0 to figure out which port was
278    /// actually bound.
279    fn local_addr(&self) -> &Multiaddr {
280        &self.laddr
281    }
282
283    /// Returns the remote address that this connection is connected to.
284    fn peer_addr(&self) -> &Multiaddr {
285        &self.raddr
286    }
287
288    /// Accept newly incoming stream for reading/writing.
289    ///
290    /// If the connection is dropping or has been dropped, this function will returns `None`.
291    async fn accept(&mut self) -> io::Result<ProtocolStream> {
292        let stream = self.conn.accept().await?;
293
294        Ok(QuicP2pStream::new(
295            self.id.clone(),
296            stream,
297            self.public_key.clone(),
298            self.laddr.clone(),
299            self.raddr.clone(),
300            self.counter.clone(),
301        )
302        .into())
303    }
304
305    async fn connect(&mut self) -> Result<ProtocolStream> {
306        let stream = self.conn.open(true).await?;
307
308        Ok(QuicP2pStream::new(
309            self.id.clone(),
310            stream,
311            self.public_key.clone(),
312            self.laddr.clone(),
313            self.raddr.clone(),
314            self.counter.clone(),
315        )
316        .into())
317    }
318
319    fn close(&mut self) -> io::Result<()> {
320        self.conn.close()?;
321
322        Ok(())
323    }
324
325    /// Returns true if this connection is closed or is closing.
326    fn is_closed(&self) -> bool {
327        self.conn.is_closed()
328    }
329
330    /// Creates a new independently owned handle to the underlying socket.
331    fn clone(&self) -> P2pConn {
332        self.activities.fetch_add(1, Ordering::Relaxed);
333        Clone::clone(self).into()
334    }
335
336    /// Return the remote peer's public key.
337    fn public_key(&self) -> &PublicKey {
338        &self.public_key
339    }
340
341    fn actives(&self) -> usize {
342        self.counter.load(Ordering::Relaxed)
343    }
344}
345
346struct QuicP2pStream {
347    conn_id: String,
348    id: String,
349    stream: QuicStream,
350    public_key: PublicKey,
351    laddr: Multiaddr,
352    raddr: Multiaddr,
353    counter: Arc<AtomicUsize>,
354}
355
356impl Drop for QuicP2pStream {
357    fn drop(&mut self) {
358        self.counter.fetch_sub(1, Ordering::Relaxed);
359    }
360}
361
362impl QuicP2pStream {
363    fn new(
364        conn_id: String,
365        stream: QuicStream,
366        public_key: PublicKey,
367        laddr: Multiaddr,
368        raddr: Multiaddr,
369        counter: Arc<AtomicUsize>,
370    ) -> Self {
371        counter.fetch_add(1, Ordering::Relaxed);
372
373        Self {
374            conn_id,
375            counter,
376            id: format!("quic({:?},{})", stream.scid(), stream.id()),
377            stream,
378            public_key,
379            laddr,
380            raddr,
381        }
382    }
383}
384
385#[async_trait]
386impl DriverStream for QuicP2pStream {
387    fn conn_id(&self) -> &str {
388        &self.conn_id
389    }
390    fn id(&self) -> &str {
391        &self.id
392    }
393    /// Return the remote peer's public key.
394    fn public_key(&self) -> &PublicKey {
395        &self.public_key
396    }
397
398    /// Returns the local address that this stream is bound to.
399    fn local_addr(&self) -> &Multiaddr {
400        &self.laddr
401    }
402
403    /// Returns the remote address that this stream is connected to.
404    fn peer_addr(&self) -> &Multiaddr {
405        &self.raddr
406    }
407    /// Attempt to read data via this stream.
408    fn poll_read(
409        mut self: std::pin::Pin<&mut Self>,
410        cx: &mut Context<'_>,
411        buf: &mut [u8],
412    ) -> Poll<Result<usize>> {
413        Pin::new(&mut self.stream).poll_read(cx, buf)
414    }
415
416    /// Attempt to write data via this stream.
417    fn poll_write(
418        mut self: std::pin::Pin<&mut Self>,
419        cx: &mut Context<'_>,
420        buf: &[u8],
421    ) -> Poll<Result<usize>> {
422        Pin::new(&mut self.stream).poll_write(cx, buf)
423    }
424
425    /// Attempt to flush the write data.
426    fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
427        Pin::new(&mut self.stream).poll_flush(cx)
428    }
429
430    /// Close this connection.
431    fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
432        Pin::new(&mut self.stream).poll_close(cx)
433    }
434}
435
#[cfg(test)]
mod tests {
    use async_trait::async_trait;
    use xstack::{Result, Switch};
    use xstack_spec::transport::{transport_specs, TransportSpecContext};

    use super::*;

    /// Spec context whose switches use the QUIC transport on a loopback UDP port.
    struct QuicMock;

    #[async_trait]
    impl TransportSpecContext for QuicMock {
        /// Build a switch bound to `127.0.0.1` with port 0.
        async fn create_switch(&self) -> Result<Switch> {
            let builder = Switch::new("test")
                .transport(QuicTransport::default())
                .transport_bind(["/ip4/127.0.0.1/udp/0/quic-v1"]);

            let switch = builder.create().await?;

            Ok(switch)
        }
    }

    /// Run the shared transport specification suite against the QUIC transport.
    #[futures_test::test]
    async fn test_specs() {
        transport_specs(QuicMock).await.unwrap();
    }
}